From 0c9b4bae33fe774472f4af8e9dc3e37c5f64b263 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Wed, 11 Mar 2015 08:49:31 +0900 Subject: [PATCH] TAJO-1381 Support multi-bytes delimiter for Text file --- .../tajo/engine/query/TestSelectQuery.java | 24 ++++++++++ .../multibytes_delimiter_table3_ddl.sql | 3 ++ .../multibytes_delimiter_table4_ddl.sql | 3 ++ .../testMultiBytesDelimiter3.sql | 1 + .../testMultiBytesDelimiter4.sql | 1 + .../testMultiBytesDelimiter3.result | 7 +++ .../testMultiBytesDelimiter4.result | 7 +++ .../storage/text/CSVLineDeserializer.java | 14 ++++-- .../tajo/storage/text/CSVLineSerDe.java | 4 +- .../tajo/storage/text/CSVLineSerializer.java | 8 ++-- .../tajo/storage/text/DelimitedTextFile.java | 2 +- .../storage/text/FieldSplitProcessor.java | 8 +--- .../text/MultiBytesFieldSplitProcessor.java | 45 +++++++++++++++++++ .../storage/text/TextLineDeserializer.java | 6 +-- .../tajo/storage/TestSplitProcessor.java | 38 ++++++++++++++-- 15 files changed, 149 insertions(+), 22 deletions(-) create mode 100644 tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql create mode 100644 tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql create mode 100644 tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql create mode 100644 tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql create mode 100644 tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result create mode 100644 tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result create mode 100644 tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java index dd93dd14df..f7b1382db7 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java @@ -659,4 +659,28 @@ public void testMultiBytesDelimiter2() throws Exception { executeString("DROP TABLE table2"); } } + + @Test + public void testMultiBytesDelimiter3() throws Exception { + executeDDL("multibytes_delimiter_table3_ddl.sql", "multibytes_delimiter1"); + try { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("DROP TABLE table1"); + } + } + + @Test + public void testMultiBytesDelimiter4() throws Exception { + executeDDL("multibytes_delimiter_table4_ddl.sql", "multibytes_delimiter2"); + try { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } finally { + executeString("DROP TABLE table2"); + } + } } \ No newline at end of file diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql new file mode 100644 index 0000000000..8309d119ef --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table3_ddl.sql @@ -0,0 +1,3 @@ +create external table table1 (id int, name text, score float, type text) using text +with ('text.delimiter'='||', 'text.null'='NULL') location ${table.path}; + diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql new file mode 100644 index 0000000000..2fb821aba5 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/multibytes_delimiter_table4_ddl.sql @@ -0,0 +1,3 @@ +create external table table2 (id int, name text, score float, type text) using text +with ('text.delimiter'='ㅎ', 'text.null'='NULL') location ${table.path}; + diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql new file mode 100644 index 0000000000..bd6b02daaf --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter3.sql @@ -0,0 +1 @@ +select * from table1; \ No newline at end of file diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql new file mode 100644 index 0000000000..66a69ec071 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testMultiBytesDelimiter4.sql @@ -0,0 +1 @@ +select * from table2; \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result new file mode 100644 index 0000000000..d8d43b1cd7 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter3.result @@ -0,0 +1,7 @@ +id,name,score,type +------------------------------- +1,ooo,1.1,a +2,ppp,2.3, +3,qqq,null, +4,,4.5, +null,xxx,5.6,e \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result new file mode 100644 index 0000000000..d8d43b1cd7 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestSelectQuery/testMultiBytesDelimiter4.result @@ -0,0 +1,7 @@ +id,name,score,type +------------------------------- +1,ooo,1.1,a +2,ppp,2.3, +3,qqq,null, +4,,4.5, +null,xxx,5.6,e \ No newline at end of file diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java index 1599f62ad4..6a8c7a9239 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage.text; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufProcessor; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.datum.Datum; @@ -28,9 +29,10 @@ import java.io.IOException; public class CSVLineDeserializer extends TextLineDeserializer { - private FieldSplitProcessor processor; + private ByteBufProcessor processor; private FieldSerializerDeserializer fieldSerDer; private ByteBuf nullChars; + private int delimiterCompensation; public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { super(schema, meta, targetColumnIndexes); @@ -38,7 +40,13 @@ public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnInde @Override public void init() { - this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta)); + byte[] delimiter = CSVLineSerDe.getFieldDelimiter(meta); + if (delimiter.length == 1) { + this.processor = new FieldSplitProcessor(delimiter[0]); + } else { + this.processor = new MultiBytesFieldSplitProcessor(delimiter); + } + this.delimiterCompensation = delimiter.length - 1; if (nullChars != null) { nullChars.release(); @@ -67,7 +75,7 @@ public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, if (end < 0) { fieldLength = rowLength - start; } else { - fieldLength = end - start; + fieldLength = end - start - delimiterCompensation; } if (projection.length > currentTarget && currentIndex == projection[currentTarget]) { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java index 2fe7f239fe..fa7d3372e7 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java @@ -34,8 +34,8 @@ public TextLineSerializer createSerializer(Schema schema, TableMeta meta) { return new CSVLineSerializer(schema, meta); } - public static char getFieldDelimiter(TableMeta meta) { + public static byte[] getFieldDelimiter(TableMeta meta) { return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER, - StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); + StorageConstants.DEFAULT_FIELD_DELIMITER)).getBytes(); } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java index 53a0ef3b37..9a2fe3745b 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java @@ -30,8 +30,8 @@ public class CSVLineSerializer extends TextLineSerializer { private FieldSerializerDeserializer serde; - private byte [] nullChars; - private char delimiter; + private byte[] nullChars; + private byte[] delimiter; private int columnNum; public CSVLineSerializer(Schema schema, TableMeta meta) { @@ -56,8 +56,8 @@ public int serialize(OutputStream out, Tuple input) throws IOException { writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars); if (columnNum - 1 > i) { - out.write((byte) delimiter); - writtenBytes += 1; + out.write(delimiter); + writtenBytes += delimiter.length; } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index ebf9608e1d..4c9234e7af 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -391,7 +391,7 @@ public Tuple next() throws IOException { try { deserializer.deserialize(buf, tuple); - // if a line is read normaly, it exists this loop. + // if a line is read normally, it exists this loop. break; } catch (TextLineParsingError tae) { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java index a5ac142f67..862b5ae155 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java @@ -21,9 +21,9 @@ import io.netty.buffer.ByteBufProcessor; public class FieldSplitProcessor implements ByteBufProcessor { - private char delimiter; //the ascii separate character + private byte delimiter; //the ascii separate character - public FieldSplitProcessor(char recordDelimiterByte) { + public FieldSplitProcessor(byte recordDelimiterByte) { this.delimiter = recordDelimiterByte; } @@ -31,8 +31,4 @@ public FieldSplitProcessor(char recordDelimiterByte) { public boolean process(byte value) throws Exception { return delimiter != value; } - - public char getDelimiter() { - return delimiter; - } } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java new file mode 100644 index 0000000000..b97d7c6259 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/MultiBytesFieldSplitProcessor.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.text; + +import io.netty.buffer.ByteBufProcessor; + +public class MultiBytesFieldSplitProcessor implements ByteBufProcessor { + + private int index; + private final byte[] delimiter; + + public MultiBytesFieldSplitProcessor(byte[] recordDelimiterByte) { + this.delimiter = recordDelimiterByte; + } + + @Override + public boolean process(byte value) throws Exception { + if (delimiter[index] != value) { + index = 0; + return true; + } + if (index != delimiter.length - 1) { + index++; + return true; + } + index = 0; + return false; + } +} diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java index 7ebfa79ce1..89a7de9032 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java @@ -29,9 +29,9 @@ * Reads a text line and fills a Tuple with values */ public abstract class TextLineDeserializer { - protected Schema schema; - protected TableMeta meta; - protected int [] targetColumnIndexes; + protected final Schema schema; + protected final TableMeta meta; + protected final int[] targetColumnIndexes; public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) { this.schema = schema; diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java index 12ea5510f2..2174d62396 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java @@ -19,10 +19,12 @@ package org.apache.tajo.storage; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufProcessor; import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; import org.apache.tajo.storage.text.FieldSplitProcessor; import org.apache.tajo.storage.text.LineSplitProcessor; +import org.apache.tajo.storage.text.MultiBytesFieldSplitProcessor; import org.junit.Test; import java.io.IOException; @@ -35,17 +37,47 @@ public class TestSplitProcessor { @Test public void testFieldSplitProcessor() throws IOException { - String data = "abc||de"; + String data = "abc||de|"; final ByteBuf buf = releaseLater( Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1)); final int len = buf.readableBytes(); - FieldSplitProcessor processor = new FieldSplitProcessor('|'); + FieldSplitProcessor processor = new FieldSplitProcessor((byte)'|'); assertEquals(3, buf.forEachByte(0, len, processor)); assertEquals(4, buf.forEachByte(4, len - 4, processor)); - assertEquals(-1, buf.forEachByte(5, len - 5, processor)); + assertEquals(7, buf.forEachByte(5, len - 5, processor)); + assertEquals(-1, buf.forEachByte(8, len - 8, processor)); + } + + @Test + public void testMultiCharFieldSplitProcessor1() throws IOException { + String data = "abc||||de||"; + final ByteBuf buf = releaseLater( + Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1)); + + final int len = buf.readableBytes(); + ByteBufProcessor processor = new MultiBytesFieldSplitProcessor("||".getBytes()); + + assertEquals(4, buf.forEachByte(0, len, processor)); + assertEquals(6, buf.forEachByte(5, len - 5, processor)); + assertEquals(10, buf.forEachByte(7, len - 7, processor)); + assertEquals(-1, buf.forEachByte(11, len - 11, processor)); + } + + @Test + public void testMultiCharFieldSplitProcessor2() throws IOException { + String data = "abcㅎㅎdeㅎ"; + final ByteBuf buf = releaseLater( + Unpooled.copiedBuffer(data, CharsetUtil.UTF_8)); + + final int len = buf.readableBytes(); + ByteBufProcessor processor = new MultiBytesFieldSplitProcessor("ㅎ".getBytes()); + assertEquals(5, buf.forEachByte(0, len, processor)); + assertEquals(8, buf.forEachByte(6, len - 6, processor)); + assertEquals(13, buf.forEachByte(9, len - 9, processor)); + assertEquals(-1, buf.forEachByte(14, len - 14, processor)); } @Test