From f687029710f45adc5e46bf248a0b9e86aac3a5e5 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 17 Aug 2015 17:33:55 +0900 Subject: [PATCH] TAJO-1779: Remove "DFSInputStream has been closed already" messages in DelimitedLineReader --- .../java/org/apache/tajo/storage/ByteBufInputChannel.java | 2 +- .../java/org/apache/tajo/storage/FSDataInputChannel.java | 6 +++++- .../org/apache/tajo/storage/text/DelimitedLineReader.java | 2 +- .../test/java/org/apache/tajo/storage/TestLineReader.java | 8 ++++++-- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java index bdfec91962..0c289c360e 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java @@ -45,6 +45,6 @@ public int read(ByteBuffer dst) throws IOException { @Override protected void implCloseChannel() throws IOException { - IOUtils.cleanup(null, channel, inputStream); + IOUtils.cleanup(null, channel); } } diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java index 3f638c06eb..605516d545 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FSDataInputChannel.java @@ -74,6 +74,10 @@ public long size() throws IOException { @Override protected void implCloseChannel() throws IOException { - IOUtils.cleanup(null, channel, inputStream); + if (isDirectRead) { + IOUtils.cleanup(null, inputStream); + } else { + IOUtils.cleanup(null, channel); + } } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java index 5b93c5cd94..6d6cf72460 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java @@ -179,7 +179,7 @@ public boolean isCompressed() { @Override public void close() throws IOException { try { - IOUtils.cleanup(LOG, lineReader, is, fis); + IOUtils.cleanup(LOG, lineReader); fs = null; is = null; fis = null; diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java index 7410778085..21fff58b1a 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java @@ -96,7 +96,8 @@ public void testByteBufLineReader() throws IOException { if(buf == null) break; i++; } - IOUtils.cleanup(null, reader, channel, fs); + IOUtils.cleanup(null, reader, fs); + assertFalse(channel.isOpen()); assertEquals(tupleNum, i); assertEquals(status.getLen(), totalRead); assertEquals(status.getLen(), reader.readBytes()); @@ -229,6 +230,7 @@ public void testByteBufLineReaderWithoutTerminating() throws IOException { i++; } IOUtils.cleanup(null, reader); + assertFalse(channel.isOpen()); assertEquals(file.length(), totalRead); assertEquals(file.length(), reader.readBytes()); assertEquals(data.split("\n").length, i); @@ -261,6 +263,7 @@ public void testCRLFLine() throws IOException { i++; } IOUtils.cleanup(null, reader); + assertFalse(channel.isOpen()); assertEquals(status.getLen(), totalRead); assertEquals(status.getLen(), reader.readBytes()); } @@ -319,7 +322,8 @@ public void testSeekableByteBufLineReader() throws IOException { i++; } - IOUtils.cleanup(null, reader, channel, fs); + IOUtils.cleanup(null, reader, fs); + assertFalse(channel.isOpen()); assertEquals(tupleNum, i); assertEquals(status.getLen(), totalRead);