Skip to content

Commit

Permalink
ARROW-7899: [Integration][Java] Fix Flight integration test client to…
Browse files Browse the repository at this point in the history
… verify each batch

Previously the Java Flight integration test client would read a number of batches from the server, but only verify the last one is equal the last batch in the JSON file. This fixes the client to verify each batch is correct and that the number of batches sent is the same as in the JSON input file.

Closes #6476 from BryanCutler/integration-null-test-ARROW-7899 and squashes the following commits:

6d1cfc3 <Bryan Cutler> Change Java Flight test client to not error on trailing empty batches
fcc7390 <Bryan Cutler> Fix Flight integration test client to verify each batch

Authored-by: Bryan Cutler <cutlerb@gmail.com>
Signed-off-by: Bryan Cutler <cutlerb@gmail.com>
  • Loading branch information
BryanCutler committed Feb 24, 2020
1 parent 5cf9b21 commit 23d74c0
Showing 1 changed file with 26 additions and 15 deletions.
Expand Up @@ -40,6 +40,7 @@
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.JsonFileReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Validator;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
Expand Down Expand Up @@ -102,12 +103,8 @@ private static void testStream(BufferAllocator allocator, Location server, Fligh
throws IOException {
// 1. Read data from JSON and upload to server.
FlightDescriptor descriptor = FlightDescriptor.path(inputPath);
VectorSchemaRoot jsonRoot;
try (JsonFileReader reader = new JsonFileReader(new File(inputPath), allocator);
VectorSchemaRoot root = VectorSchemaRoot.create(reader.start(), allocator)) {
jsonRoot = VectorSchemaRoot.create(root.getSchema(), allocator);
VectorUnloader unloader = new VectorUnloader(root);
VectorLoader jsonLoader = new VectorLoader(jsonRoot);
FlightClient.ClientStreamListener stream = client.startPut(descriptor, root, reader,
new AsyncPutListener() {
int counter = 0;
Expand All @@ -131,9 +128,6 @@ public void onNext(PutResult val) {
metadata.writeBytes(rawMetadata);
// Transfers ownership of the buffer, so do not release it ourselves
stream.putNext(metadata);
try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
jsonLoader.load(arb);
}
root.clear();
counter++;
}
Expand All @@ -160,22 +154,39 @@ public void onNext(PutResult val) {
try (FlightClient readClient = FlightClient.builder(allocator, location).build();
FlightStream stream = readClient.getStream(endpoint.getTicket());
VectorSchemaRoot root = stream.getRoot();
VectorSchemaRoot downloadedRoot = VectorSchemaRoot.create(root.getSchema(), allocator)) {
VectorSchemaRoot downloadedRoot = VectorSchemaRoot.create(root.getSchema(), allocator);
JsonFileReader reader = new JsonFileReader(new File(inputPath), allocator)) {
VectorLoader loader = new VectorLoader(downloadedRoot);
VectorUnloader unloader = new VectorUnloader(root);
while (stream.next()) {
try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
loader.load(arb);

Schema jsonSchema = reader.start();
Validator.compareSchemas(root.getSchema(), jsonSchema);
try (VectorSchemaRoot jsonRoot = VectorSchemaRoot.create(jsonSchema, allocator)) {

while (stream.next()) {
try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
loader.load(arb);
if (reader.read(jsonRoot)) {

// 4. Validate the data.
Validator.compareVectorSchemaRoot(jsonRoot, downloadedRoot);
jsonRoot.clear();
} else {
throw new RuntimeException("Flight stream has more batches than JSON");
}
}
}
}

// 4. Validate the data.
Validator.compareVectorSchemaRoot(jsonRoot, downloadedRoot);
// Verify no more batches with data in JSON
// NOTE: Currently the C++ Flight server skips empty batches at end of the stream
if (reader.read(jsonRoot) && jsonRoot.getRowCount() > 0) {
throw new RuntimeException("JSON has more batches with than Flight stream");
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
jsonRoot.close();
}
}

0 comments on commit 23d74c0

Please sign in to comment.