From b735135f24b5ceea707312f488044d9a9aad7c07 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Tue, 26 Sep 2017 12:32:02 -0700 Subject: [PATCH 1/4] Removed closing of ReadChannel from ArrowReader.close() and updated usage in tests to close things correctly --- .../src/main/java/org/apache/arrow/tools/EchoServer.java | 4 ++-- .../src/main/java/org/apache/arrow/tools/FileRoundtrip.java | 5 +++-- .../src/main/java/org/apache/arrow/tools/FileToStream.java | 3 +++ .../src/test/java/org/apache/arrow/tools/EchoServerTest.java | 4 ++-- .../main/java/org/apache/arrow/vector/file/ArrowReader.java | 1 - .../org/apache/arrow/vector/file/TestArrowReaderWriter.java | 4 ++-- .../org/apache/arrow/vector/file/TestArrowStreamPipe.java | 1 + 7 files changed, 13 insertions(+), 9 deletions(-) diff --git a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java index c53f0ea8693..3091bc4dab1 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/EchoServer.java @@ -95,9 +95,9 @@ public ClientConnection(Socket socket) { } public void run() throws IOException { - BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); // Read the entire input stream and write it back - try (ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), allocator)) { VectorSchemaRoot root = reader.getVectorSchemaRoot(); // load the first batch before instantiating the writer so that we have any dictionaries reader.loadNextBatch(); diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java index 7d71b0b8f9d..ab8fa6e45ce 100644 --- 
a/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/FileRoundtrip.java @@ -79,8 +79,9 @@ int run(String[] args) { File inFile = validateFile("input", inFileName); File outFile = validateFile("output", outFileName); - BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // TODO: close - try (FileInputStream fileInputStream = new FileInputStream(inFile); + + try (BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(inFile); ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), allocator)) { diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java index 6722b30fa7f..8f7806ef01c 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java @@ -68,5 +68,8 @@ public static void main(String[] args) throws IOException { System.out : new FileOutputStream(new File(args[1])); convert(in, out); + + in.close(); + out.close(); } } diff --git a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java index 467965aff95..ecac6d6e530 100644 --- a/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java +++ b/java/tools/src/test/java/org/apache/arrow/tools/EchoServerTest.java @@ -95,9 +95,9 @@ private void testEchoServer(int serverPort, NullableTinyIntVector vector, int batches) throws UnknownHostException, IOException { - BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); VectorSchemaRoot root = new VectorSchemaRoot(asList(field), asList((FieldVector) vector), 0); - try (Socket socket = new Socket("localhost", serverPort); + try (BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE); + Socket socket = new Socket("localhost", 
serverPort); ArrowStreamWriter writer = new ArrowStreamWriter(root, null, socket.getOutputStream()); ArrowStreamReader reader = new ArrowStreamReader(socket.getInputStream(), alloc)) { writer.start(); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java index 646d6feeef0..e4ae3a506b6 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java @@ -141,7 +141,6 @@ public void close() throws IOException { dictionary.getVector().close(); } } - in.close(); } protected abstract Schema readSchema(T in) throws IOException; diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java index 65332aa2c7d..3ce01a26835 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowReaderWriter.java @@ -92,8 +92,8 @@ public void test() throws IOException { byte[] byteArray = out.toByteArray(); - SeekableReadChannel channel = new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(byteArray)); - try (ArrowFileReader reader = new ArrowFileReader(channel, allocator)) { + try (SeekableReadChannel channel = new SeekableReadChannel(new ByteArrayReadableSeekableByteChannel(byteArray)); + ArrowFileReader reader = new ArrowFileReader(channel, allocator)) { Schema readSchema = reader.getVectorSchemaRoot().getSchema(); assertEquals(schema, readSchema); assertTrue(readSchema.getFields().get(0).getTypeLayout().getVectorTypes().toString(), readSchema.getFields().get(0).getTypeLayout().getVectors().size() > 0); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java 
b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java index a19c3795fd5..40716942f02 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowStreamPipe.java @@ -141,6 +141,7 @@ public void run() { while (!done) { assertTrue(reader.loadNextBatch()); } + reader.close(); } catch (IOException e) { e.printStackTrace(); Assert.fail(e.toString()); // have to explicitly fail since we're in a separate thread From 103a4192b97de0c0bb9b775d2230ca728e44b1c9 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 4 Oct 2017 10:57:36 -0700 Subject: [PATCH 2/4] changed to alternate close --- .../java/org/apache/arrow/vector/file/ArrowReader.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java index e4ae3a506b6..cc485e5f484 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java @@ -135,12 +135,20 @@ public long bytesRead() { @Override public void close() throws IOException { + close(true); + } + + public void close(boolean closeReadChannel) throws IOException { if (initialized) { root.close(); for (Dictionary dictionary : dictionaries.values()) { dictionary.getVector().close(); } } + + if (closeReadChannel) { + in.close(); + } } protected abstract Schema readSchema(T in) throws IOException; From a9125dda09c8293a28aff217fad4784bdf43a396 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 4 Oct 2017 11:11:45 -0700 Subject: [PATCH 3/4] revert test that manually closed ReadChannel --- .../src/main/java/org/apache/arrow/tools/FileToStream.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java 
b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java index 8f7806ef01c..6722b30fa7f 100644 --- a/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java +++ b/java/tools/src/main/java/org/apache/arrow/tools/FileToStream.java @@ -68,8 +68,5 @@ public static void main(String[] args) throws IOException { System.out : new FileOutputStream(new File(args[1])); convert(in, out); - - in.close(); - out.close(); } } From 028f2cd57e347e49480cc9bc5ec949bd285781ff Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Mon, 16 Oct 2017 09:46:06 -0700 Subject: [PATCH 4/4] Added docs to ArrowReader --- .../apache/arrow/vector/file/ArrowReader.java | 42 +++++++++++++++++-- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java index cc485e5f484..21fb2207eb0 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/ArrowReader.java @@ -41,6 +41,11 @@ import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.DictionaryUtility; +/** + * Abstract class to read ArrowRecordBatches from a ReadChannel. + * + * @param <T> Type of ReadChannel to use + */ public abstract class ArrowReader<T extends ReadChannel> implements DictionaryProvider, AutoCloseable { private final T in; @@ -58,7 +63,7 @@ protected ArrowReader(T in, BufferAllocator allocator) { } /** - * Returns the vector schema root. This will be loaded with new values on every call to loadNextBatch + * Returns the vector schema root. This will be loaded with new values on every call to loadNextBatch. * * @return the vector schema root * @throws IOException if reading of schema fails @@ -69,9 +74,9 @@ public VectorSchemaRoot getVectorSchemaRoot() throws IOException { } /** - * Returns any dictionaries + * Returns any dictionaries that were loaded along with ArrowRecordBatches.
* - * @return dictionaries, if any + * @return Map of dictionary id to dictionary, empty if no dictionaries loaded * @throws IOException if reading of schema fails */ public Map<Long, Dictionary> getDictionaryVectors() throws IOException { @@ -79,6 +84,12 @@ public Map<Long, Dictionary> getDictionaryVectors() throws IOException { return dictionaries; } + /** + * Look up a dictionary that has been loaded using the dictionary id. + * + * @param id Unique identifier for a dictionary + * @return the requested dictionary or null if not found + */ @Override public Dictionary lookup(long id) { if (!initialized) { @@ -88,7 +99,12 @@ public Dictionary lookup(long id) { return dictionaries.get(id); } - // Returns true if a batch was read, false on EOS + /** + * Load the next ArrowRecordBatch to the vector schema root if available. + * + * @return true if a batch was read, false on EOS + * @throws IOException + */ public boolean loadNextBatch() throws IOException { ensureInitialized(); // read in all dictionary batches, then stop after our first record batch @@ -129,15 +145,33 @@ public Boolean visit(ArrowRecordBatch message) { return readBatch; } + /** + * Return the number of bytes read from the ReadChannel. + * + * @return number of bytes read + */ public long bytesRead() { return in.bytesRead(); } + /** + * Close resources, including vector schema root and dictionary vectors, and the + * underlying ReadChannel. + * + * @throws IOException + */ @Override public void close() throws IOException { close(true); } + /** + * Close resources, including vector schema root and dictionary vectors. If the flag + * closeReadChannel is true then close the underlying ReadChannel, otherwise leave it open. + * + * @param closeReadChannel Flag to control whether the underlying ReadChannel is closed + * @throws IOException + */ public void close(boolean closeReadChannel) throws IOException { if (initialized) { root.close();