diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java index 53d010a25f56b..1b28b2d86a300 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/example/integration/IntegrationTestClient.java @@ -40,6 +40,7 @@ import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.ipc.JsonFileReader; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.Validator; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -102,12 +103,8 @@ private static void testStream(BufferAllocator allocator, Location server, Fligh throws IOException { // 1. Read data from JSON and upload to server. FlightDescriptor descriptor = FlightDescriptor.path(inputPath); - VectorSchemaRoot jsonRoot; try (JsonFileReader reader = new JsonFileReader(new File(inputPath), allocator); VectorSchemaRoot root = VectorSchemaRoot.create(reader.start(), allocator)) { - jsonRoot = VectorSchemaRoot.create(root.getSchema(), allocator); - VectorUnloader unloader = new VectorUnloader(root); - VectorLoader jsonLoader = new VectorLoader(jsonRoot); FlightClient.ClientStreamListener stream = client.startPut(descriptor, root, reader, new AsyncPutListener() { int counter = 0; @@ -131,9 +128,6 @@ public void onNext(PutResult val) { metadata.writeBytes(rawMetadata); // Transfers ownership of the buffer, so do not release it ourselves stream.putNext(metadata); - try (final ArrowRecordBatch arb = unloader.getRecordBatch()) { - jsonLoader.load(arb); - } root.clear(); counter++; } @@ -160,22 +154,39 @@ public void onNext(PutResult val) { try (FlightClient readClient = FlightClient.builder(allocator, location).build(); FlightStream stream = readClient.getStream(endpoint.getTicket()); VectorSchemaRoot root = stream.getRoot(); - VectorSchemaRoot downloadedRoot = VectorSchemaRoot.create(root.getSchema(), allocator)) { + VectorSchemaRoot downloadedRoot = VectorSchemaRoot.create(root.getSchema(), allocator); + JsonFileReader reader = new JsonFileReader(new File(inputPath), allocator)) { VectorLoader loader = new VectorLoader(downloadedRoot); VectorUnloader unloader = new VectorUnloader(root); - while (stream.next()) { - try (final ArrowRecordBatch arb = unloader.getRecordBatch()) { - loader.load(arb); + + Schema jsonSchema = reader.start(); + Validator.compareSchemas(root.getSchema(), jsonSchema); + try (VectorSchemaRoot jsonRoot = VectorSchemaRoot.create(jsonSchema, allocator)) { + + while (stream.next()) { + try (final ArrowRecordBatch arb = unloader.getRecordBatch()) { + loader.load(arb); + if (reader.read(jsonRoot)) { + + // 4. Validate the data. + Validator.compareVectorSchemaRoot(jsonRoot, downloadedRoot); + jsonRoot.clear(); + } else { + throw new RuntimeException("Flight stream has more batches than JSON"); + } + } } - } - // 4. Validate the data. - Validator.compareVectorSchemaRoot(jsonRoot, downloadedRoot); + // Verify no more batches with data in JSON + // NOTE: Currently the C++ Flight server skips empty batches at end of the stream + if (reader.read(jsonRoot) && jsonRoot.getRowCount() > 0) { + throw new RuntimeException("JSON has more batches with than Flight stream"); + } + } } catch (Exception e) { throw new RuntimeException(e); } } } - jsonRoot.close(); } }