Skip to content

Commit

Permalink
apacheGH-36912: [Java] JDBC driver stops consuming roots if it sees a…
Browse files Browse the repository at this point in the history
…n empty root
  • Loading branch information
jduo committed Nov 5, 2023
1 parent fc8c6b7 commit e5b99d7
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,8 @@ private CloseableEndpointStreamPair next(final EndpointStreamSupplier endpointSt
futures.remove(future);
try {
final CloseableEndpointStreamPair endpoint = future.get();
// Get the next FlightStream with content.
// The stream is non-empty.
if (endpoint.getStream().getRoot().getRowCount() > 0) {
// Get the next FlightStream that has a root with content.
if (endpoint != null) {
return endpoint;
}
} catch (final ExecutionException | InterruptedException | CancellationException e) {
Expand Down Expand Up @@ -178,8 +177,12 @@ public synchronized void enqueue(final CloseableEndpointStreamPair endpointReque
endpointsToClose.add(endpointRequest);
futures.add(completionService.submit(() -> {
// `FlightStream#next` will block until new data can be read or stream is over.
endpointRequest.getStream().next();
return endpointRequest;
while (endpointRequest.getStream().next()) {
if (endpointRequest.getStream().getRoot().getRowCount() > 0) {
return endpointRequest;
}
}
return null;
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,4 +455,17 @@ allocator, forGrpcInsecure("localhost", 0), rootProducer)
}
}
}

@Test
public void testShouldRunSelectQueryWithEmptyVectorsEmbedded() throws Exception {
try (Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(
CoreMockedSqlProducers.LEGACY_REGULAR_WITH_EMPTY_SQL_CMD)) {
long rowCount = 0;
while (resultSet.next()) {
++rowCount;
}
assertEquals(2, rowCount);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public final class CoreMockedSqlProducers {
public static final String LEGACY_REGULAR_SQL_CMD = "SELECT * FROM TEST";
public static final String LEGACY_METADATA_SQL_CMD = "SELECT * FROM METADATA";
public static final String LEGACY_CANCELLATION_SQL_CMD = "SELECT * FROM TAKES_FOREVER";
public static final String LEGACY_REGULAR_WITH_EMPTY_SQL_CMD = "SELECT * FROM TEST_EMPTIES";

private CoreMockedSqlProducers() {
// Prevent instantiation.
Expand All @@ -80,9 +81,44 @@ public static MockFlightSqlProducer getLegacyProducer() {
addLegacyRegularSqlCmdSupport(producer);
addLegacyMetadataSqlCmdSupport(producer);
addLegacyCancellationSqlCmdSupport(producer);
addQueryWithEmbeddedEmptyRoot(producer);
return producer;
}

private static void addQueryWithEmbeddedEmptyRoot(final MockFlightSqlProducer producer) {
final Schema querySchema = new Schema(ImmutableList.of(
new Field(
"ID",
new FieldType(true, new ArrowType.Int(64, true),
null),
null)
));

final List<Consumer<ServerStreamListener>> resultProducers = new ArrayList<>();
Consumer<ServerStreamListener> dataRoot = listener -> {
try (final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
final VectorSchemaRoot root = VectorSchemaRoot.create(querySchema, allocator)) {
root.allocateNew();
root.setRowCount(0);
listener.start(root);
listener.putNext(); // empty root
((BigIntVector) root.getVector("ID")).setSafe(0, 100L);
root.setRowCount(1);
listener.putNext(); // data root
root.clear();
root.setRowCount(0);
listener.putNext(); // empty root
((BigIntVector) root.getVector("ID")).setSafe(0, 100L);
root.setRowCount(1);
listener.putNext(); // data root
} finally {
listener.completed();
}
};
resultProducers.add(dataRoot);
producer.addSelectQuery(LEGACY_REGULAR_WITH_EMPTY_SQL_CMD, querySchema, resultProducers);
}

private static void addLegacyRegularSqlCmdSupport(final MockFlightSqlProducer producer) {
final Schema querySchema = new Schema(ImmutableList.of(
new Field(
Expand Down

0 comments on commit e5b99d7

Please sign in to comment.