Skip to content

[JAVA] Use prepared statement leads Memory leak #50

@Ma1oneZhang

Description

@Ma1oneZhang

Describe the bug, including details regarding any error messages, version, and platform.

Hi arrow maintainer, i find some memory leak in my flight-sql usage. I use arrow-16.0.0, and works on AMD Ryzen 3970X machine.

package org.example;

import org.apache.arrow.flight.*;
import org.apache.arrow.flight.grpc.CredentialCallOption;
import org.apache.arrow.flight.sql.FlightSqlClient;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.*;
import org.apache.arrow.vector.types.pojo.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class SqlRunner {

    private static final Logger log = LoggerFactory.getLogger(SqlRunner.class);

    static void run_flight_sql() throws Exception {
        try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) {
            final Location clientLocation = Location.forGrpcInsecure("127.0.0.1", 8360);
            try (FlightClient client = FlightClient.builder(allocator, clientLocation).build();
                 FlightSqlClient sqlClient = new FlightSqlClient(client)) {

                Optional<CredentialCallOption> credentialCallOption = client.authenticateBasicToken("admin", "public");
                CallHeaders headers = new FlightCallHeaders();
                headers.insert("database", "test");

                Set<CallOption> options = new HashSet<>();
                credentialCallOption.ifPresent(options::add);
                options.add(new HeaderCallOption(headers));
               // use the sql to query is ok
               try {
                   String query = "SELECT count(*) from test.sx1;";
                   executeQuery(sqlClient, query, options);
               } catch (Exception e){
                   e.printStackTrace();
                   throw e;
               }

               // where memory leak happens
                try {
                    try (FlightSqlClient.PreparedStatement preparedStatement = sqlClient.prepare("insert into table sx1 (sid, value, flag) values(?, ?, ?);", options.toArray(new CallOption[0]))) {
                        insertBatch(sqlClient, preparedStatement, allocator, options);
                    }
                } catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }

    private static void executeQuery(FlightSqlClient sqlClient, String query, Set<CallOption> options) throws Exception {
        final FlightInfo info = sqlClient.execute(query, options.toArray(new CallOption[0]));
        final Ticket ticket = info.getEndpoints().get(0).getTicket();
        try (FlightStream stream = sqlClient.getStream(ticket, options.toArray(new CallOption[0]))) {
            while (stream.next()) {
                try (VectorSchemaRoot schemaRoot = stream.getRoot()) {
                    log.info(schemaRoot.contentToTSVString());
                }
            }
        }
    }

    private static void insertBatch(FlightSqlClient sqlClient, FlightSqlClient.PreparedStatement preparedStatement, BufferAllocator allocator, Set<CallOption> options) throws Exception {
        try (IntVector sids = new IntVector("sid", allocator);
             Float4Vector values = new Float4Vector("value", allocator);
             TinyIntVector flags = new TinyIntVector("flag", allocator)) {

            sids.allocateNew(100);
            values.allocateNew(100);
            flags.allocateNew(100);

            for (int i = 0; i < 100; i++) {
                sids.setSafe(i, i);
                values.setSafe(i, (float) i);
                flags.setSafe(i, (byte) i);
            }

            List<Field> fields = Arrays.asList(sids.getField(), values.getField(), flags.getField());
            List<FieldVector> fieldVectors = Arrays.asList(sids, values, flags);
            try (VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(fields, fieldVectors)){
                vectorSchemaRoot.setRowCount(100);

                preparedStatement.setParameters(vectorSchemaRoot);
                FlightInfo info = preparedStatement.execute();

                final Ticket ticket = info.getEndpoints().get(0).getTicket();
                try (FlightStream stream = sqlClient.getStream(ticket, options.toArray(new CallOption[0]))) {
                    while (stream.next()) {
                        try (VectorSchemaRoot schemaRoot = stream.getRoot()) {
                            List<FieldVector> vectors = schemaRoot.getFieldVectors();
                            for (int i = 0; i < vectors.size(); i++) {
                                System.out.printf("%d %s\n", i, vectors.get(i));
                            }
                        }
                    }
                }
                preparedStatement.clearParameters();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        run_flight_sql();
    }
}

That's the memory allocator verboseString, is it a bug? or server side bad implement?

=================================================================
Allocator(ROOT) 0/16/1424/2147483647 (res/actual/peak/limit)
  child allocators: 1
    Allocator(flight-client) 0/16/272/9223372036854775807 (res/actual/peak/limit)
      child allocators: 0
      ledgers: 1
        ledger[6] allocator: flight-client), isOwning: , size: , references: 1, life: 1242736490302663..0, allocatorManager: [, life: ] holds 1 buffers. 
            ArrowBuf[23], address:139720841494544, capacity:16
       event log for: ArrowBuf[23]
         1242736490544534 create()
                  at org.apache.arrow.memory.util.HistoricalLog$Event.<init>(HistoricalLog.java:180)
                  at org.apache.arrow.memory.util.HistoricalLog.recordEvent(HistoricalLog.java:85)
                  at org.apache.arrow.memory.ArrowBuf.<init>(ArrowBuf.java:98)
                  at org.apache.arrow.memory.BufferLedger.newArrowBuf(BufferLedger.java:259)
                  at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:352)
                  at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:328)
                  at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:291)
                  at org.apache.arrow.flight.PutResult.fromProtocol(PutResult.java:82)
                  at org.apache.arrow.flight.FlightClient$SetStreamObserver.onNext(FlightClient.java:466)
                  at org.apache.arrow.flight.FlightClient$SetStreamObserver.onNext(FlightClient.java:454)
                  at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:468)
                  at io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
                  at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInternal(ClientCallImpl.java:657)
                  at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:644)
                  at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
                  at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
                  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                  at java.lang.Thread.run(Thread.java:750)

      reservations: 0
  ledgers: 0
  reservations: 0

=================================================================

Component(s)

Java

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type: bugSomething isn't working

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions