From 017c5c842747f1bb5ebc21635e44c343de06bd56 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 8 Sep 2015 23:23:29 +0900 Subject: [PATCH 1/3] TAJO-1829: Fix DelimitedTextFileAppender NPE in negative tests. --- .../apache/tajo/storage/text/DelimitedTextFile.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index 650aa60b44..865eb96b0f 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -219,15 +219,19 @@ public long getOffset() throws IOException { @Override public void flush() throws IOException { - flushBuffer(); - outputStream.flush(); + if(inited) { + flushBuffer(); + outputStream.flush(); + } } @Override public void close() throws IOException { try { - serializer.release(); + if(serializer != null) { + serializer.release(); + } if(outputStream != null){ flush(); From d8c4bed3cf5c0e1c03c56fca536e3d37f5ac7d6e Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Wed, 9 Sep 2015 09:54:00 +0900 Subject: [PATCH 2/3] fix stat NPE --- .../org/apache/tajo/storage/text/DelimitedTextFile.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index 865eb96b0f..df0ee3a75d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -233,12 +233,10 @@ public void close() throws IOException { serializer.release(); } - if(outputStream != null){ - flush(); - } + flush(); // Statistical section - if (enabledStats) { + if (enabledStats && stats != null) { stats.setNumBytes(getOffset()); } From 8f1801e8731b52a4c0c194849ddecd30b3269af3 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Wed, 9 Sep 2015 10:54:44 +0900 Subject: [PATCH 3/3] TAJO-1829 --- .../tajo/storage/text/DelimitedTextFile.java | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index df0ee3a75d..bc1d7ba9ba 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -137,6 +137,22 @@ public void init() throws IOException { throw new FileNotFoundException(path.toString()); } + if (enabledStats) { + this.stats = new TableStatistics(this.schema); + } + + if(serializer != null) { + serializer.release(); + } + serializer = getLineSerde().createSerializer(schema, meta); + serializer.init(); + + bufferSize = conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE); + if (os == null) { + os = new NonSyncByteArrayOutputStream(bufferSize); + } + os.reset(); + if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) { String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC); codecFactory = new CompressionCodecFactory(conf); @@ -163,19 +179,6 @@ public void init() throws IOException { outputStream = new DataOutputStream(new BufferedOutputStream(fos)); } - if (enabledStats) { - this.stats = new TableStatistics(this.schema); - } - - serializer = getLineSerde().createSerializer(schema, meta); - serializer.init(); - - bufferSize = conf.getInt(WRITE_BUFFER_SIZE, DEFAULT_BUFFER_SIZE); - if (os == null) { - os = new NonSyncByteArrayOutputStream(bufferSize); - } - - os.reset(); pos = fos.getPos(); bufferedBytes = 0; super.init(); @@ -236,7 +239,7 @@ public void close() throws IOException { flush(); // Statistical section - if (enabledStats && stats != null) { + if (enabledStats) { stats.setNumBytes(getOffset()); }