From 697ce8cae14510bee11aecbefb5d0a1e01e00668 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 27 May 2016 17:18:18 +0900 Subject: [PATCH 1/6] TAJO-2164: SequenceFile print wrong values with TextSerializerDeserializer --- .../tajo/storage/sequencefile/SequenceFileScanner.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java index b7dc1eca9f..26a68c57ab 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java @@ -41,6 +41,7 @@ import org.apache.tajo.storage.text.DelimitedTextFile; import org.apache.tajo.storage.text.TextLineDeserializer; import org.apache.tajo.storage.text.TextLineParsingError; +import org.apache.tajo.util.KeyValueSet; import java.io.IOException; @@ -120,6 +121,11 @@ public void init() throws IOException { } outTuple = new VTuple(targets.length); + + KeyValueSet options = new KeyValueSet(); + options.set(StorageConstants.TEXT_DELIMITER, delim); + meta.setPropertySet(options); + deserializer = DelimitedTextFile.getLineSerde(meta).createDeserializer(schema, meta, targets); deserializer.init(); From cf6051d0d31cf2c82d008064ed9e55c5259bc879 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 27 May 2016 17:21:52 +0900 Subject: [PATCH 2/6] Add to get delimiter with text.delimiter --- .../apache/tajo/storage/sequencefile/SequenceFileScanner.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java index 26a68c57ab..b24ba541e9 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java @@ -106,6 +106,10 @@ public void init() throws IOException { } String delim = meta.getProperty(StorageConstants.SEQUENCEFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + if (delim == null || delim.isEmpty()) { + delim = meta.getProperty(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + } + this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0); this.start = fragment.getStartKey(); From 4f799020959fa4ab1d7e3560103345fbda3cd064 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 27 May 2016 17:54:49 +0900 Subject: [PATCH 3/6] Fix a unit test bug and add unit tests --- .../sequencefile/SequenceFileScanner.java | 7 +- .../org/apache/tajo/storage/TestStorages.java | 67 +++++++++++++++++++ 2 files changed, 68 insertions(+), 6 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java index b24ba541e9..8c49d2a00e 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java @@ -41,7 +41,6 @@ import org.apache.tajo.storage.text.DelimitedTextFile; import org.apache.tajo.storage.text.TextLineDeserializer; import org.apache.tajo.storage.text.TextLineParsingError; -import org.apache.tajo.util.KeyValueSet; import java.io.IOException; @@ -109,6 +108,7 @@ public void init() throws IOException { if (delim == null || delim.isEmpty()) { delim = meta.getProperty(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); } + meta.getPropertySet().set(StorageConstants.TEXT_DELIMITER, delim); this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0); @@ -125,11 +125,6 @@ public void init() throws IOException { } outTuple = new VTuple(targets.length); - - KeyValueSet options = new KeyValueSet(); - options.set(StorageConstants.TEXT_DELIMITER, delim); - meta.setPropertySet(options); - deserializer = DelimitedTextFile.getLineSerde(meta).createDeserializer(schema, meta, targets); deserializer.init(); diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index c1423d7899..ecd7ad518b 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -866,6 +866,73 @@ public void testSequenceFileTextSerializeDeserialize() throws IOException { assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); } + + @Test + public void testSequenceFileTextSerializeDeserializeWithDelimiter() throws IOException { + if(!dataFormat.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) return; + + Schema schema = SchemaBuilder.builder() + .add("col1", Type.BOOLEAN) + .add("col2", CatalogUtil.newDataTypeWithLen(Type.CHAR, 7)) + .add("col3", Type.INT2) + .add("col4", Type.INT4) + .add("col5", Type.INT8) + .add("col6", Type.FLOAT4) + .add("col7", Type.FLOAT8) + .add("col8", Type.TEXT) + .add("col9", Type.BLOB) + .add("col10", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())).build(); + + TableMeta meta = CatalogUtil.newTableMeta(dataFormat, conf); + meta.putProperty(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); + meta.putProperty(StorageConstants.SEQUENCEFILE_DELIMITER, "\u0001"); + + Path tablePath = new Path(testDir, "testSequenceFileTextSerializeDeserializeWithDelimiter.data"); + FileTablespace sm = TablespaceManager.getLocalFs(); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + + VTuple tuple = new VTuple(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + ProtobufDatumFactory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); + scanner.init(); + + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName()); + + Tuple retrieved; + while ((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.asDatum(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + @Test public void testSequenceFileBinarySerializeDeserialize() throws IOException { if(!dataFormat.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) return; From e6ca35de0ec88c7cd9d049b90c4588fafb579060 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Mon, 30 May 2016 15:04:53 +0900 Subject: [PATCH 4/6] Get SEQUENCEFILE_DELIMITER if TEXT_DELIMITER doesn't exist and SEQUENCEFILE_DELIMITER exists. --- .../storage/sequencefile/SequenceFileScanner.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java index 8c49d2a00e..5cfacfae8b 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java @@ -41,6 +41,7 @@ import org.apache.tajo.storage.text.DelimitedTextFile; import org.apache.tajo.storage.text.TextLineDeserializer; import org.apache.tajo.storage.text.TextLineParsingError; +import org.apache.tajo.util.KeyValueSet; import java.io.IOException; @@ -104,12 +105,15 @@ public void init() throws IOException { nullChars = nullCharacters.getBytes(); } - String delim = meta.getProperty(StorageConstants.SEQUENCEFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); - if (delim == null || delim.isEmpty()) { - delim = meta.getProperty(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + + // Set value of non-deprecated key for CSVLineSerDe::getFieldDelimiter. + KeyValueSet keyValueSet = meta.getPropertySet(); + if (!keyValueSet.containsKey(StorageConstants.TEXT_DELIMITER) + && keyValueSet.containsKey(StorageConstants.SEQUENCEFILE_DELIMITER)) { + keyValueSet.set(StorageConstants.TEXT_DELIMITER, meta.getProperty(StorageConstants.SEQUENCEFILE_DELIMITER)); } - meta.getPropertySet().set(StorageConstants.TEXT_DELIMITER, delim); + String delim = meta.getProperty(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0); this.start = fragment.getStartKey(); From 2891ac2d4c2807160762cc5b52b4b5815eed7982 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Mon, 30 May 2016 15:48:05 +0900 Subject: [PATCH 5/6] Modify descriptions --- .../apache/tajo/storage/sequencefile/SequenceFileScanner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java index 5cfacfae8b..8f8f4bc151 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java @@ -106,7 +106,7 @@ public void init() throws IOException { } - // Set value of non-deprecated key for CSVLineSerDe::getFieldDelimiter. + // Set value of non-deprecated key for compatibility. It will be used at CSVLineSerDe::getFieldDelimiter. KeyValueSet keyValueSet = meta.getPropertySet(); if (!keyValueSet.containsKey(StorageConstants.TEXT_DELIMITER) && keyValueSet.containsKey(StorageConstants.SEQUENCEFILE_DELIMITER)) { From 3367df42b8a521a8145f53b0ccfcab8cc5279418 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Tue, 31 May 2016 11:46:04 +0900 Subject: [PATCH 6/6] Add null character compatibility --- .../org/apache/tajo/catalog/CatalogUtil.java | 2 +- .../tajo/catalog/store/HiveCatalogStore.java | 14 ++-- .../catalog/store/TestHiveCatalogStore.java | 6 ++ .../planner/physical/PhysicalPlanUtil.java | 9 ++- .../sequencefile/SequenceFileAppender.java | 23 +++++- .../sequencefile/SequenceFileScanner.java | 45 +++++++----- .../tajo/storage/text/TextLineSerDe.java | 8 +- .../org/apache/tajo/storage/TestStorages.java | 73 ++++++++++++++++++- 8 files changed, 140 insertions(+), 40 deletions(-) diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index f15ce030a0..9bda01bce4 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -768,7 +768,7 @@ public static KeyValueSet newDefaultProperty(String dataFormat, TajoConf conf) { options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE); } else if (dataFormat.equalsIgnoreCase("SEQUENCEFILE")) { options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE); - options.set(StorageConstants.SEQUENCEFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + options.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); } else if (dataFormat.equalsIgnoreCase("PARQUET")) { options.set(BLOCK_SIZE, StorageConstants.PARQUET_DEFAULT_BLOCK_SIZE); options.set(PAGE_SIZE, StorageConstants.PARQUET_DEFAULT_PAGE_SIZE); diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java index b855c77733..1a154d6ebe 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java @@ -219,8 +219,8 @@ public final CatalogProtos.TableDescProto getTable(String databaseName, final St } } else if (BuiltinStorages.SEQUENCE_FILE.equals(dataFormat)) { - options.set(StorageConstants.SEQUENCEFILE_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter)); - options.set(StorageConstants.SEQUENCEFILE_NULL, StringEscapeUtils.escapeJava(nullFormat)); + options.set(StorageConstants.TEXT_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter)); + options.set(StorageConstants.TEXT_NULL, StringEscapeUtils.escapeJava(nullFormat)); String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB); if (LazyBinarySerDe.class.getName().equals(serde)) { options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE); @@ -527,7 +527,7 @@ public final void createTable(final CatalogProtos.TableDescProto tableDescProto) if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) { sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName()); - String fieldDelimiter = tableDesc.getMeta().getProperty(StorageConstants.SEQUENCEFILE_DELIMITER, + String fieldDelimiter = tableDesc.getMeta().getProperty(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); // User can use an unicode for filed delimiter such as \u0001, \001. @@ -539,15 +539,15 @@ public final void createTable(final CatalogProtos.TableDescProto tableDescProto) StringEscapeUtils.unescapeJava(fieldDelimiter)); sd.getSerdeInfo().putToParameters(serdeConstants.FIELD_DELIM, StringEscapeUtils.unescapeJava(fieldDelimiter)); - table.getParameters().remove(StorageConstants.SEQUENCEFILE_DELIMITER); + table.getParameters().remove(StorageConstants.TEXT_DELIMITER); } else { sd.getSerdeInfo().setSerializationLib(LazyBinarySerDe.class.getName()); } - if (tableDesc.getMeta().containsProperty(StorageConstants.SEQUENCEFILE_NULL)) { + if (tableDesc.getMeta().containsProperty(StorageConstants.TEXT_NULL)) { table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT, - StringEscapeUtils.unescapeJava(tableDesc.getMeta().getProperty(StorageConstants.SEQUENCEFILE_NULL))); - table.getParameters().remove(StorageConstants.SEQUENCEFILE_NULL); + StringEscapeUtils.unescapeJava(tableDesc.getMeta().getProperty(StorageConstants.TEXT_NULL))); + table.getParameters().remove(StorageConstants.TEXT_NULL); } } else if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.PARQUET)) { StorageFormatDescriptor descriptor = storageFormatFactory.get(IOConstants.PARQUET); diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java index 1260371c77..d1be35b8b6 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java @@ -36,6 +36,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos.PartitionKeyProto; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.NullDatum; import org.apache.tajo.schema.IdentifierUtil; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.CommonTestingUtil; @@ -595,6 +596,8 @@ public void testTableUsingSequenceFileWithBinarySerde() throws Exception { public void testTableUsingSequenceFileWithTextSerde() throws Exception { KeyValueSet options = new KeyValueSet(); options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE); + options.set(StorageConstants.TEXT_DELIMITER, "\u0001"); + options.set(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT); TableMeta meta = new TableMeta(BuiltinStorages.SEQUENCE_FILE, options); org.apache.tajo.catalog.Schema schema = SchemaBuilder.builder() @@ -622,6 +625,9 @@ public void testTableUsingSequenceFileWithTextSerde() throws Exception { } assertEquals(StorageConstants.DEFAULT_TEXT_SERDE, table1.getMeta().getProperty(StorageConstants.SEQUENCEFILE_SERDE)); + assertEquals("\u0001", StringEscapeUtils.unescapeJava(table1.getMeta().getProperty(StorageConstants + .TEXT_DELIMITER))); + assertEquals(NullDatum.DEFAULT_TEXT, table1.getMeta().getProperty(StorageConstants.TEXT_NULL)); store.dropTable(DB_NAME, REGION); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java index 074d0abed3..42ada8bb32 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java @@ -216,7 +216,7 @@ private static void setNullCharForTextSerializer(TableMeta meta, String nullChar } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.RCFILE)) { meta.putProperty(StorageConstants.RCFILE_NULL, nullChar); } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) { - meta.putProperty(StorageConstants.SEQUENCEFILE_NULL, nullChar); + meta.putProperty(StorageConstants.TEXT_NULL, nullChar); } } @@ -233,7 +233,12 @@ public static boolean containsNullChar(TableMeta meta) { } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.RCFILE)) { return meta.containsProperty(StorageConstants.RCFILE_NULL); } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) { - return meta.containsProperty(StorageConstants.SEQUENCEFILE_NULL); + if (!meta.containsProperty(StorageConstants.TEXT_NULL) + && meta.containsProperty(StorageConstants.SEQUENCEFILE_NULL)) { + return meta.containsProperty(StorageConstants.SEQUENCEFILE_NULL); + } else { + return meta.containsProperty(StorageConstants.TEXT_NULL); + } } else { return false; } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java index 2bb3e21961..5d8aa83e64 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java @@ -83,17 +83,34 @@ public void init() throws IOException { this.fs = path.getFileSystem(conf); - this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getProperty(StorageConstants.SEQUENCEFILE_DELIMITER, + // Set value of non-deprecated key for backward compatibility. + if (!meta.containsProperty(StorageConstants.TEXT_DELIMITER) + && meta.containsProperty(StorageConstants.SEQUENCEFILE_DELIMITER)) { + this.delimiter = StringEscapeUtils.unescapeJava(meta.getProperty(StorageConstants.SEQUENCEFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); - this.columnNum = schema.size(); - String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getProperty(StorageConstants.SEQUENCEFILE_NULL, + } else { + this.delimiter = StringEscapeUtils.unescapeJava(meta.getProperty(StorageConstants.TEXT_DELIMITER, + StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); + } + + String nullCharacters; + if (!meta.containsProperty(StorageConstants.TEXT_NULL) + && meta.containsProperty(StorageConstants.SEQUENCEFILE_NULL)) { + nullCharacters = StringEscapeUtils.unescapeJava(meta.getProperty(StorageConstants.SEQUENCEFILE_NULL, NullDatum.DEFAULT_TEXT)); + } else { + nullCharacters = StringEscapeUtils.unescapeJava(meta.getProperty(StorageConstants.TEXT_NULL, + NullDatum.DEFAULT_TEXT)); + } + if (StringUtils.isEmpty(nullCharacters)) { nullChars = NullDatum.get().asTextBytes(); } else { nullChars = nullCharacters.getBytes(); } + this.columnNum = schema.size(); + if(this.meta.containsProperty(StorageConstants.COMPRESSION_CODEC)) { String codecName = this.meta.getProperty(StorageConstants.COMPRESSION_CODEC); codecFactory = new CompressionCodecFactory(conf); diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java index 8f8f4bc151..491a9a897f 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java @@ -32,6 +32,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.exception.TajoRuntimeException; import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.plan.expr.EvalNode; @@ -41,7 +42,6 @@ import org.apache.tajo.storage.text.DelimitedTextFile; import org.apache.tajo.storage.text.TextLineDeserializer; import org.apache.tajo.storage.text.TextLineParsingError; -import org.apache.tajo.util.KeyValueSet; import java.io.IOException; @@ -97,7 +97,28 @@ public void init() throws IOException { reader = new SequenceFile.Reader(fs, fragment.getPath(), conf); - String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getProperty(StorageConstants.SEQUENCEFILE_NULL, + // Set value of non-deprecated key for backward compatibility. + TableMeta tableMeta; + try { + tableMeta = (TableMeta) meta.clone(); + + if (!tableMeta.containsProperty(StorageConstants.TEXT_DELIMITER)) { + tableMeta.putProperty(StorageConstants.TEXT_DELIMITER, tableMeta.getProperty(StorageConstants + .SEQUENCEFILE_DELIMITER)); + } + + if (!tableMeta.containsProperty(StorageConstants.TEXT_NULL) && tableMeta.containsProperty(StorageConstants + .SEQUENCEFILE_NULL)) { + tableMeta.putProperty(StorageConstants.TEXT_NULL, tableMeta.getProperty(StorageConstants.SEQUENCEFILE_NULL)); + } + } catch (CloneNotSupportedException e) { + throw new TajoInternalError(e); + } + + String delim = tableMeta.getProperty(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0); + + String nullCharacters = StringEscapeUtils.unescapeJava(tableMeta.getProperty(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT)); if (StringUtils.isEmpty(nullCharacters)) { nullChars = NullDatum.get().asTextBytes(); @@ -105,17 +126,6 @@ public void init() throws IOException { nullChars = nullCharacters.getBytes(); } - - // Set value of non-deprecated key for compatibility. It will be used at CSVLineSerDe::getFieldDelimiter. - KeyValueSet keyValueSet = meta.getPropertySet(); - if (!keyValueSet.containsKey(StorageConstants.TEXT_DELIMITER) - && keyValueSet.containsKey(StorageConstants.SEQUENCEFILE_DELIMITER)) { - keyValueSet.set(StorageConstants.TEXT_DELIMITER, meta.getProperty(StorageConstants.SEQUENCEFILE_DELIMITER)); - } - - String delim = meta.getProperty(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); - this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0); - this.start = fragment.getStartKey(); this.end = start + fragment.getLength(); @@ -129,9 +139,6 @@ public void init() throws IOException { } outTuple = new VTuple(targets.length); - deserializer = DelimitedTextFile.getLineSerde(meta).createDeserializer(schema, meta, targets); - deserializer.init(); - fieldIsNull = new boolean[schema.getRootColumns().size()]; fieldStart = new int[schema.getRootColumns().size()]; fieldLength = new int[schema.getRootColumns().size()]; @@ -139,12 +146,16 @@ public void init() throws IOException { prepareProjection(targets); try { - String serdeClass = this.meta.getProperty(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); + String serdeClass = tableMeta.getProperty(StorageConstants.SEQUENCEFILE_SERDE, + TextSerializerDeserializer.class.getName()); serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); serde.init(schema); if (serde instanceof BinarySerializerDeserializer) { hasBinarySerDe = true; + } else { + deserializer = DelimitedTextFile.getLineSerde(tableMeta).createDeserializer(schema, tableMeta, targets); + deserializer.init(); } Class keyClass = (Class)Class.forName(reader.getKeyClassName()); diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java index 94a0ba0d09..36183b7ad6 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java @@ -43,13 +43,7 @@ public TextLineSerDe() { public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta); public static ByteBuf getNullChars(TableMeta meta) { - byte[] nullCharByteArray; - if (meta.getDataFormat().equals(BuiltinStorages.SEQUENCE_FILE)) { - nullCharByteArray = getNullCharsAsBytes(meta, StorageConstants.SEQUENCEFILE_NULL, "\\"); - } else { - nullCharByteArray = getNullCharsAsBytes(meta); - } - + byte[] nullCharByteArray = getNullCharsAsBytes(meta); ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, nullCharByteArray.length); nullChars.writeBytes(nullCharByteArray); diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index ecd7ad518b..deb758d94d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -866,9 +866,8 @@ public void testSequenceFileTextSerializeDeserialize() throws IOException { assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); } - @Test - public void testSequenceFileTextSerializeDeserializeWithDelimiter() throws IOException { + public void testSequenceFileTextSerializeDeserializeWithDeprecatedProperties() throws IOException { if(!dataFormat.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) return; Schema schema = SchemaBuilder.builder() @@ -886,8 +885,76 @@ public void testSequenceFileTextSerializeDeserializeWithDelimiter() throws IOExc TableMeta meta = CatalogUtil.newTableMeta(dataFormat, conf); meta.putProperty(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); meta.putProperty(StorageConstants.SEQUENCEFILE_DELIMITER, "\u0001"); + meta.putProperty(StorageConstants.SEQUENCEFILE_NULL, "\\"); + + Path tablePath = new Path(testDir, "testSequenceFileTextSerializeDeserializeWithDeprecatedProperties.data"); + FileTablespace sm = TablespaceManager.getLocalFs(); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.enableStats(); + appender.init(); + + QueryId queryid = new QueryId("12345", 5); + + VTuple tuple = new VTuple(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createChar("jinho"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9f), + DatumFactory.createText("jinho"), + DatumFactory.createBlob("hyunsik babo".getBytes()), + ProtobufDatumFactory.createDatum(queryid.getProto()) + }); + appender.addTuple(tuple); + appender.flush(); + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen()); + + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); + scanner.init(); + + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName()); + + Tuple retrieved; + while ((retrieved=scanner.next()) != null) { + for (int i = 0; i < tuple.size(); i++) { + assertEquals(tuple.get(i), retrieved.asDatum(i)); + } + } + scanner.close(); + assertEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } + + @Test + public void testSequenceFileTextSerializeDeserializeWithNonDeprecatedProperties() throws IOException { + if(!dataFormat.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) return; + + Schema schema = SchemaBuilder.builder() + .add("col1", Type.BOOLEAN) + .add("col2", CatalogUtil.newDataTypeWithLen(Type.CHAR, 7)) + .add("col3", Type.INT2) + .add("col4", Type.INT4) + .add("col5", Type.INT8) + .add("col6", Type.FLOAT4) + .add("col7", Type.FLOAT8) + .add("col8", Type.TEXT) + .add("col9", Type.BLOB) + .add("col10", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())).build(); + + TableMeta meta = CatalogUtil.newTableMeta(dataFormat, conf); + meta.putProperty(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName()); + meta.putProperty(StorageConstants.TEXT_DELIMITER, "\u0001"); + meta.putProperty(StorageConstants.TEXT_NULL, "\\"); - Path tablePath = new Path(testDir, "testSequenceFileTextSerializeDeserializeWithDelimiter.data"); + Path tablePath = new Path(testDir, "testSequenceFileTextSerializeDeserializeWithNonDeprecatedProperties.data"); FileTablespace sm = TablespaceManager.getLocalFs(); Appender appender = sm.getAppender(meta, schema, tablePath); appender.enableStats();