From 32d639a0d0c66cb06d439557e57b82b80eb6f952 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 15 Aug 2016 17:53:41 +0900 Subject: [PATCH 1/3] TAJO-2179: Add a regular expression scanner and appender. --- .../org/apache/tajo/catalog/CatalogUtil.java | 12 +- .../java/org/apache/tajo/BuiltinStorages.java | 1 + .../apache/tajo/storage/StorageConstants.java | 7 + .../src/main/resources/storage-default.xml | 14 +- .../storage/regex/RegexLineDeserializer.java | 167 ++++++++++++++ .../tajo/storage/regex/RegexLineSerDe.java | 60 ++++++ .../storage/regex/RegexLineSerializer.java | 142 ++++++++++++ .../tajo/storage/regex/TestRegexSerDe.java | 203 ++++++++++++++++++ .../dataset/TestRegexSerDe/access.log | 2 + .../src/test/resources/storage-default.xml | 12 +- 10 files changed, 612 insertions(+), 8 deletions(-) create mode 100644 tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineDeserializer.java create mode 100644 tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerDe.java create mode 100644 tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerializer.java create mode 100644 tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/regex/TestRegexSerDe.java create mode 100644 tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestRegexSerDe/access.log diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index 7d844fac92..055581b04e 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -742,14 +742,16 @@ public static KeyValueSet newDefaultProperty(String dataFormat, TajoConf conf) { if (dataFormat.equalsIgnoreCase(BuiltinStorages.TEXT)) { 
options.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); options.set(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT); - } else if (dataFormat.equalsIgnoreCase("JSON")) { - options.set(StorageConstants.TEXT_SERDE_CLASS, "org.apache.tajo.storage.json.JsonLineSerDe"); - } else if (dataFormat.equalsIgnoreCase("RCFILE")) { + } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.JSON)) { + options.set(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_JSON_SERDE_CLASS); + } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.REGEX)) { + options.set(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_REGEX_SERDE_CLASS); + } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.RCFILE)) { options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE); - } else if (dataFormat.equalsIgnoreCase("SEQUENCEFILE")) { + } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) { options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE); options.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); - } else if (dataFormat.equalsIgnoreCase("PARQUET")) { + } else if (dataFormat.equalsIgnoreCase(BuiltinStorages.PARQUET)) { options.set(BLOCK_SIZE, StorageConstants.PARQUET_DEFAULT_BLOCK_SIZE); options.set(PAGE_SIZE, StorageConstants.PARQUET_DEFAULT_PAGE_SIZE); options.set(COMPRESSION, StorageConstants.PARQUET_DEFAULT_COMPRESSION_CODEC_NAME); diff --git a/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java b/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java index aa7a9e73f5..b99f0b1649 100644 --- a/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java +++ b/tajo-common/src/main/java/org/apache/tajo/BuiltinStorages.java @@ -21,6 +21,7 @@ public class BuiltinStorages { public static final String TEXT = "TEXT"; public static final String JSON = "JSON"; + public static final String REGEX = "REGEX"; public static final 
String RAW = "RAW"; public static final String DRAW = "DRAW"; public static final String RCFILE = "RCFILE"; diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java index fc48baa741..a1df29ba41 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java @@ -50,6 +50,13 @@ public class StorageConstants { public static final String TEXT_SKIP_HEADER_LINE = "text.skip.headerlines"; + public static final String DEFAULT_JSON_SERDE_CLASS = "org.apache.tajo.storage.json.JsonLineSerDe"; + public static final String DEFAULT_REGEX_SERDE_CLASS = "org.apache.tajo.storage.regex.RegexLineSerDe"; + + public static final String TEXT_REGEX = "text.regex"; + public static final String TEXT_REGEX_CASE_INSENSITIVE = "text.regex.case.insensitive"; + public static final String TEXT_REGEX_OUTPUT_FORMAT_STRING = "text.regex.output.format.string"; + /** * It's the maximum number of parsing error torrence. 
* diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml index ce0ce85393..c7c1d0c7da 100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml @@ -39,7 +39,7 @@ tajo.storage.scanner-handler - text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json + text,json,regex,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase,ex_http_json @@ -87,6 +87,11 @@ org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner + + tajo.storage.scanner-handler.regex.class + org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner + + tajo.storage.scanner-handler.raw.class org.apache.tajo.storage.RawFile$RawFileScanner @@ -140,7 +145,7 @@ tajo.storage.appender-handler - text,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase + text,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase,regex @@ -153,6 +158,11 @@ org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender + + tajo.storage.appender-handler.regex.class + org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender + + tajo.storage.appender-handler.raw.class org.apache.tajo.storage.RawFile$RawFileAppender diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineDeserializer.java new file mode 100644 index 0000000000..b425a2d9d8 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineDeserializer.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.regex; + + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.exception.InvalidTablePropertyException; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.FieldSerializerDeserializer; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.text.TextFieldSerializerDeserializer; +import org.apache.tajo.storage.text.TextLineDeserializer; +import org.apache.tajo.storage.text.TextLineParsingError; +import org.apache.tajo.storage.text.TextLineSerDe; + +import java.io.IOException; +import java.nio.charset.CharsetDecoder; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Line deserializer that matches each text line against the regular expression + * given by the {@link StorageConstants#TEXT_REGEX} table property and maps the + * capture groups, in order, onto the columns of the table schema. + * + * Unmatched lines and unreadable groups are logged at an exponentially + * decreasing rate (1st, 100th, 10000th, ... occurrence). + */ +public class RegexLineDeserializer extends TextLineDeserializer { + private static final Log LOG = LogFactory.getLog(RegexLineDeserializer.class); + + private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8); + private FieldSerializerDeserializer fieldSerDer; + private ByteBuf nullChars; + + private int[] targetColumnIndexes; + private String inputRegex; + private Pattern inputPattern; + // Number of rows not matching the regex + private long unmatchedRows = 0; + private long nextUnmatchedRows = 1; + // Number of rows that match the
regex but have missing groups. + private long partialMatchedRows = 0; + private long nextPartialMatchedRows = 1; + + /** + * @param schema full schema of the table + * @param meta table meta holding the regex properties + * @param projected columns to be produced; resolved to schema indexes via PlannerUtil.getTargetIds + */ + public RegexLineDeserializer(Schema schema, TableMeta meta, Column[] projected) { + super(schema, meta); + targetColumnIndexes = PlannerUtil.getTargetIds(schema, projected); + } + + /** + * Creates the field deserializer, compiles the input regex and loads the null + * representation. + * + * @throws TajoRuntimeException wrapping InvalidTablePropertyException when the + * {@link StorageConstants#TEXT_REGEX} property is absent + */ + @Override + public void init() { + fieldSerDer = new TextFieldSerializerDeserializer(meta); + fieldSerDer.init(schema); + + // Read the configuration parameters + inputRegex = meta.getProperty(StorageConstants.TEXT_REGEX); + boolean inputRegexIgnoreCase = "true".equalsIgnoreCase( + meta.getProperty(StorageConstants.TEXT_REGEX_CASE_INSENSITIVE, "false")); + + // Parse the configuration parameters + if (inputRegex != null) { + /* Pattern flags are bit masks, so they are combined with a bitwise OR. */ + inputPattern = Pattern.compile(inputRegex, Pattern.DOTALL + | (inputRegexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0)); + } else { + throw new TajoRuntimeException(new InvalidTablePropertyException(StorageConstants.TEXT_REGEX, + "This table does not have serde property \"" + StorageConstants.TEXT_REGEX + "\"!")); + } + + if (nullChars != null) { + nullChars.release(); + } + nullChars = TextLineSerDe.getNullChars(meta); + } + + + /** + * Matches one text line against the configured regex and stores the projected + * capture groups into {@code output}. Capture group {@code i} (1-based) feeds + * schema column {@code i - 1}. If the line does not match, every projected + * column is set to NULL; a projected column whose group did not take part in + * the match, or whose bytes cannot be deserialized, is set to NULL as well. + * + * Note that the reader/writer indexes of {@code lineBuf} are moved while the + * groups are read. + * + * @param lineBuf UTF-8 encoded bytes of one line; nothing is done when it is null + * @param output tuple receiving one value per projected column + */ + @Override + public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError { + + if (lineBuf == null || targetColumnIndexes.length == 0) { + return; + } + + /* Remember where the line starts, because setIndex() below moves the reader index. */ + final int lineStart = lineBuf.readerIndex(); + final int lineBytes = lineBuf.readableBytes(); + String line = decoder.decode(lineBuf.nioBuffer(lineStart, lineBytes)).toString(); + /* Matcher offsets count chars; they equal byte offsets only when every char took one byte. */ + final boolean singleByte = line.length() == lineBytes; + int[] projection = targetColumnIndexes; + + // Projection + int currentTarget = 0; + int currentIndex = 0; + Matcher m = inputPattern.matcher(line); + + if (!m.matches()) { + unmatchedRows++; + if (unmatchedRows >= nextUnmatchedRows) { + nextUnmatchedRows *= 100; + // Report the row + LOG.warn("" + unmatchedRows + " unmatched rows are found: " + line); + } + } else { + + int groupCount = m.groupCount(); + int currentGroup = 1; + while (currentGroup <= groupCount) { + + if (projection.length > currentTarget && currentIndex == 
projection[currentTarget]) { + + Datum datum = null; + int charStart = m.start(currentGroup); + /* A negative start means this group did not take part in the match. */ + if (charStart >= 0) { + try { + int byteStart = lineStart + utf8Offset(line, charStart, singleByte); + int byteEnd = lineStart + utf8Offset(line, m.end(currentGroup), singleByte); + datum = fieldSerDer.deserialize( + currentIndex, lineBuf.setIndex(byteStart, byteEnd), nullChars); + } catch (Exception e) { + /* The field could not be read; it is counted and reported below. */ + datum = null; + } + } + + if (datum == null) { + partialMatchedRows++; + if (partialMatchedRows >= nextPartialMatchedRows) { + nextPartialMatchedRows *= 100; + // Report the row + LOG.warn("" + partialMatchedRows + " partially unmatched rows are found, " + + " cannot find group " + currentIndex + ": " + line); + } + datum = NullDatum.get(); + } + output.put(currentTarget, datum); + currentTarget++; + } + + if (projection.length == currentTarget) { + break; + } + + currentIndex++; + currentGroup++; + } + } + + /* If a text row is less than table schema size, tuple should set to NullDatum */ + if (projection.length > currentTarget) { + for (; currentTarget < projection.length; currentTarget++) { + output.put(currentTarget, NullDatum.get()); + } + } + } + + /** + * Translates a char offset within {@code line} into the number of UTF-8 bytes + * that precede it. + * + * @param line decoded text of the current line + * @param charIndex char offset as reported by the Matcher + * @param singleByte true when the line has as many chars as bytes, in which + * case both offsets coincide and no encoding is needed + * @return byte offset relative to the first byte of the line + */ + private static int utf8Offset(String line, int charIndex, boolean singleByte) { + return singleByte ? charIndex : line.substring(0, charIndex).getBytes(CharsetUtil.UTF_8).length; + } + + /** + * Releases the buffer holding the null representation, if any. + */ + @Override + public void release() { + if (nullChars != null) { + nullChars.release(); + nullChars = null; + } + } +} diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerDe.java new file mode 100644 index 0000000000..cda97e0e76 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerDe.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.regex; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.text.TextLineDeserializer; +import org.apache.tajo.storage.text.TextLineSerDe; +import org.apache.tajo.storage.text.TextLineSerializer; + + +/** + * This is an implementation copied from Hive's RegexSerDe. + * + * RegexSerDe uses a regular expression (regex) to serialize/deserialize. + * + * It can deserialize the data using the regex and extract groups as columns. It + * can also serialize the tuple using a format string. + * + * In the deserialization stage, if a row does not match the regex, then all columns + * in the row will be NULL. If a row matches the regex but has fewer groups than + * expected, the missing groups will be NULL. If a row matches the regex + * but has more groups than expected, the additional groups are just ignored. + * + * In the serialization stage, it uses the Java string formatter to format the columns + * into a row. If the output type of the column in a query is not a string, it + * will be automatically converted to String by tajo.
+ * + * For the syntax of the format string, please refer to + * <a href="http://java.sun.com/j2se/1.5.0/docs/api/java/util/Formatter.html#syntax">java.util.Formatter</a>. + */ +public class RegexLineSerDe extends TextLineSerDe { + + /** + * @return a new {@link RegexLineDeserializer} for the given schema, meta and projection + */ + @Override + public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, Column [] projected) { + return new RegexLineDeserializer(schema, meta, projected); + } + + /** + * @return a new {@link RegexLineSerializer} for the given schema and meta + */ + @Override + public TextLineSerializer createSerializer(Schema schema, TableMeta meta) { + return new RegexLineSerializer(schema, meta); + } +} diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerializer.java new file mode 100644 index 0000000000..af6a8b23c7 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/regex/RegexLineSerializer.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tajo.storage.regex; + + +import io.netty.util.CharsetUtil; +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.StringUtils; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.datum.TimestampDatum; +import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; +import org.apache.tajo.exception.InvalidTablePropertyException; +import org.apache.tajo.exception.TajoRuntimeException; +import org.apache.tajo.exception.ValueTooLongForTypeCharactersException; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.storage.StorageUtil; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.text.TextLineSerDe; +import org.apache.tajo.storage.text.TextLineSerializer; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.TimeZone; + +/** + * Line serializer that converts every column of a tuple to a String and renders + * the row with String.format, using the format string taken from the + * {@link StorageConstants#TEXT_REGEX_OUTPUT_FORMAT_STRING} table property. + */ +public class RegexLineSerializer extends TextLineSerializer { + private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + private String outputFormatString; + private TimeZone tableTimezone; + private int columnNum; + private String nullChars; + + public RegexLineSerializer(Schema schema, TableMeta meta) { + super(schema, meta); + } + + /** + * Reads the output format string, the table time zone (falling back to the + * system time zone of StorageUtil.TAJO_CONF) and the null representation. + * + * @throws TajoRuntimeException wrapping InvalidTablePropertyException when the + * output format string property is absent + */ + @Override + public void init() { + // Read the configuration parameters + outputFormatString = meta.getProperty(StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING); + + if (outputFormatString == null) { + throw new TajoRuntimeException(new InvalidTablePropertyException(StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING, + "Cannot write data into table because \"" + StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING + "\"" + + " is not specified in serde properties of the table.")); + } + + tableTimezone = TimeZone.getTimeZone(meta.getProperty(StorageConstants.TIMEZONE, + 
StorageUtil.TAJO_CONF.getSystemTimezone().getID())); + nullChars = new String(TextLineSerDe.getNullCharsAsBytes(meta), CharsetUtil.UTF_8); + columnNum = schema.size(); + } + + /** + * Formats all columns of {@code input} into one row and writes its UTF-8 bytes + * to {@code out}. No line terminator is written by this method. + * + * @return the number of bytes written + */ + @Override + public int serialize(OutputStream out, Tuple input) throws IOException { + + String[] values = new String[columnNum]; + + for (int i = 0; i < columnNum; i++) { + values[i] = convertToString(i, input, nullChars); + } + + /* The String[] is passed as the varargs array, so %1$s .. %N$s address the columns. */ + byte[] bytes = String.format(outputFormatString, values).getBytes(CharsetUtil.UTF_8); + out.write(bytes); + return bytes.length; + } + + + /** + * Returns the textual form of one column. Blank or null CHAR/TEXT values yield + * {@code nullChars}; blank or null values of any other type yield the empty string. + */ + private String convertToString(int columnIndex, Tuple tuple, String nullChars) + throws IOException { + + Column col = schema.getColumn(columnIndex); + TajoDataTypes.DataType dataType = col.getDataType(); + + if (tuple.isBlankOrNull(columnIndex)) { + switch (dataType.getType()) { + case CHAR: + case TEXT: + return nullChars; + default: + return StringUtils.EMPTY; + } + } + + switch (dataType.getType()) { + case BOOLEAN: + return tuple.getBool(columnIndex) ? 
"true" : "false"; + case CHAR: + int size = dataType.getLength() - tuple.size(columnIndex); + if (size < 0) { + throw new ValueTooLongForTypeCharactersException(dataType.getLength()); + } + + /* NOTE(review): size is the remaining width (declared length minus value size), yet it is + * passed to rightPad as the target width - presumably dataType.getLength() was intended; + * confirm the expected CHAR padding before changing this. */ + return StringUtils.rightPad(tuple.getText(columnIndex), size, ""); + case TEXT: + case BIT: + case INT2: + case INT4: + case INT8: + case FLOAT4: + case FLOAT8: + case DATE: + case INTERVAL: + case TIME: + return tuple.getText(columnIndex); + case TIMESTAMP: + // UTC to table timezone + return TimestampDatum.asChars(tuple.getTimeDate(columnIndex), tableTimezone, false); + case BLOB: + return Base64.encodeBase64String(tuple.getBytes(columnIndex)); + case PROTOBUF: + ProtobufDatum protobuf = (ProtobufDatum) tuple.getProtobufDatum(columnIndex); + return protobufJsonFormat.printToString(protobuf.get()); + case NULL_TYPE: + default: + return StringUtils.EMPTY; + } + } + + /** + * Nothing to release: this serializer holds no buffers. + */ + @Override + public void release() { + } +} diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/regex/TestRegexSerDe.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/regex/TestRegexSerDe.java new file mode 100644 index 0000000000..9cb7b03589 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/regex/TestRegexSerDe.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.regex; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.BuiltinStorages; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaBuilder; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.schema.Field; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.URL; + +import static org.apache.tajo.schema.QualifiedIdentifier.$; +import static org.apache.tajo.type.Type.Text; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +/** + * Tests the REGEX data format against a two-line Apache access log: a full scan, + * a projected scan, and a write followed by a read-back. + */ +public class TestRegexSerDe { + private Schema schema; + private Tuple[] rows; + private Path testDir; + private String apacheWeblogPattern; + + /** + * Builds the nine-column TEXT schema, the regex whose last two groups are + * optional, and the two expected rows (the first one has NULL referer and agent). + */ + @Before + public void setup() throws IOException { + apacheWeblogPattern = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\[[^\\]]*\\]) ([^ \"]*|\"[^\"]*\") " + + "(-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?"; + + Field f1 = 
Field.Field($("host"), Text); + Field f2 = Field.Field($("identity"), Text); + Field f3 = Field.Field($("user"), Text); + Field f4 = Field.Field($("time"), Text); + Field f5 = Field.Field($("request"), Text); + Field f6 = Field.Field($("status"), Text); + Field f7 = Field.Field($("size"), Text); + Field f8 = Field.Field($("referer"), Text); + Field f9 = Field.Field($("agent"), Text); + + schema = SchemaBuilder.builder().addAll2( + org.apache.tajo.schema.Schema.Schema(f1, f2, f3, f4, f5, f6, f7, f8, f9)).build(); + + rows = new VTuple[]{new VTuple(new Datum[]{ + DatumFactory.createText("127.0.0.1"), + DatumFactory.createText("-"), + DatumFactory.createText("frank"), + DatumFactory.createText("[10/Oct/2000:13:55:36 -0700]"), + DatumFactory.createText("\"GET /apache_pb.gif HTTP/1.0\""), + DatumFactory.createText("200"), + DatumFactory.createText("2326"), + NullDatum.get(), + NullDatum.get(), + }), new VTuple(new Datum[]{ + DatumFactory.createText("127.0.0.1"), + DatumFactory.createText("-"), + DatumFactory.createText("frank"), + DatumFactory.createText("[10/Oct/2000:13:55:36 -0700]"), + DatumFactory.createText("\"GET /apache_pb.gif HTTP/1.0\""), + DatumFactory.createText("200"), + DatumFactory.createText("2326"), + DatumFactory.createText("-"), + DatumFactory.createText("\"Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) AppleWebKit/525.19 " + + "(KHTML, like Gecko) Chrome/1.0.154.65 Safari/525.19\""), + })}; + + final String TEST_PATH = "target/test-data/TestStorages"; + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + } + + /** + * Deletes the test directory recursively from the local file system. + */ + @After + public void tearDown() throws IOException { + FileSystem.getLocal(new Configuration()).delete(testDir, true); + } + + /** + * Resolves {@code suffix} below the classpath resource {@code path}. + */ + public static Path getResourcePath(String path, String suffix) { + URL resultBaseURL = ClassLoader.getSystemResource(path); + return new Path(resultBaseURL.toString(), suffix); + } + + /** + * Scans access.log without a projection and expects the two rows built in setup(). + */ + @Test + public void testApacheAccessLogScanner() throws IOException { + TajoConf conf = new TajoConf(); + + TableMeta meta = 
CatalogUtil.newTableMeta(BuiltinStorages.REGEX, conf); + Path tablePath = new Path(getResourcePath("dataset", "TestRegexSerDe"), "access.log"); + FileSystem fs = FileSystem.getLocal(conf); + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, null); + meta.putProperty(StorageConstants.TEXT_REGEX, apacheWeblogPattern); + scanner.init(); + + Tuple tuple = scanner.next(); + assertEquals(rows[0], tuple); + + assertNotNull(tuple = scanner.next()); + assertEquals(rows[1], tuple); + + scanner.close(); + } + + /** + * Scans access.log projecting only the "time" and "status" columns and expects + * two-column tuples for both lines. + */ + @Test + public void testProjection() throws IOException { + Schema target = SchemaBuilder.builder() + .add("time", TajoDataTypes.Type.TEXT) + .add("status", TajoDataTypes.Type.TEXT) + .build(); + + Tuple[] rows = new VTuple[]{new VTuple(new Datum[]{ + DatumFactory.createText("[10/Oct/2000:13:55:36 -0700]"), + DatumFactory.createText("200") + }), new VTuple(new Datum[]{ + DatumFactory.createText("[10/Oct/2000:13:55:36 -0700]"), + DatumFactory.createText("200") + })}; + + TajoConf conf = new TajoConf(); + + TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.REGEX, conf); + Path tablePath = new Path(getResourcePath("dataset", "TestRegexSerDe"), "access.log"); + FileSystem fs = FileSystem.getLocal(conf); + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, target); + meta.putProperty(StorageConstants.TEXT_REGEX, apacheWeblogPattern); + scanner.init(); + + Tuple tuple = scanner.next(); + assertEquals(2, tuple.size()); + assertEquals(rows[0], tuple); + + assertNotNull(tuple = scanner.next()); + assertEquals(2, tuple.size()); + assertEquals(rows[1], tuple); + + scanner.close(); + } + + /** + * Writes both rows with a nine-field format string, then reads the file back + * with the regex and expects the same two rows followed by end of input. + */ + @Test + public void testSerializer() throws 
IOException { + TajoConf conf = new TajoConf(); + + TableMeta meta = CatalogUtil.newTableMeta(BuiltinStorages.REGEX, conf); + meta.putProperty(StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING, "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"); + + FileTablespace sm = TablespaceManager.getLocalFs(); + Path tablePath = new Path(testDir, "testSerializer.data"); + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + + appender.addTuple(rows[0]); + appender.addTuple(rows[1]); + appender.close(); + + FileStatus status = tablePath.getFileSystem(conf).getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + + meta = CatalogUtil.newTableMeta(BuiltinStorages.REGEX, conf); + meta.putProperty(StorageConstants.TEXT_REGEX, apacheWeblogPattern); + Scanner scanner = sm.getScanner(meta, schema, fragment, null); + scanner.init(); + + Tuple tuple = scanner.next(); + assertEquals(rows[0], tuple); + assertNotNull(tuple = scanner.next()); + assertEquals(rows[1], tuple); + assertNull(scanner.next()); + scanner.close(); + } +} diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestRegexSerDe/access.log b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestRegexSerDe/access.log new file mode 100644 index 0000000000..1e9106f624 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/dataset/TestRegexSerDe/access.log @@ -0,0 +1,2 @@ +127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 +127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 - "Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) AppleWebKit/525.19 (KHTML, like Gecko) Chrome/1.0.154.65 Safari/525.19" diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml index 7ae58aaf84..d2aefd9823 100644 --- 
a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml @@ -34,7 +34,7 @@ tajo.storage.scanner-handler - text,json,raw,draw,rcfile,row,parquet,orc,sequencefile,avro + text,json,regex,raw,draw,rcfile,row,parquet,orc,sequencefile,avro @@ -74,6 +74,11 @@ org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner + + tajo.storage.scanner-handler.regex.class + org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner + + tajo.storage.scanner-handler.raw.class org.apache.tajo.storage.RawFile$RawFileScanner @@ -130,6 +135,11 @@ org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender + + tajo.storage.appender-handler.regex.class + org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender + + tajo.storage.appender-handler.raw.class org.apache.tajo.storage.RawFile$RawFileAppender From 6f756e0cc2df8eb2770c2cede9add67240144ebe Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 15 Aug 2016 18:05:15 +0900 Subject: [PATCH 2/3] add missing properties --- .../tajo-storage-common/src/main/resources/storage-default.xml | 2 +- .../test/java/org/apache/tajo/storage/regex/TestRegexSerDe.java | 2 +- .../tajo-storage-hdfs/src/test/resources/storage-default.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml index c7c1d0c7da..c6b1d6ad54 100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml @@ -145,7 +145,7 @@ tajo.storage.appender-handler - text,raw,rcfile,row,parquet,orc,sequencefile,avro,hbase,regex + text,json,regex,raw,draw,rcfile,row,parquet,orc,sequencefile,avro,hbase diff --git 
a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/regex/TestRegexSerDe.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/regex/TestRegexSerDe.java index 9cb7b03589..a6b814ff11 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/regex/TestRegexSerDe.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/regex/TestRegexSerDe.java @@ -96,7 +96,7 @@ public void setup() throws IOException { "(KHTML, like Gecko) Chrome/1.0.154.65 Safari/525.19\""), })}; - final String TEST_PATH = "target/test-data/TestStorages"; + final String TEST_PATH = "target/test-data/TestRegexSerDe"; testDir = CommonTestingUtil.getTestDir(TEST_PATH); } diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml index d2aefd9823..fdf9ebce99 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml @@ -122,7 +122,7 @@ tajo.storage.appender-handler - text,raw,draw,rcfile,row,parquet,orc,sequencefile,avro + text,json,regex,raw,draw,rcfile,row,parquet,orc,sequencefile,avro From e87c852c54831bd5f30040700125a8bafb73a582 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 15 Aug 2016 18:55:54 +0900 Subject: [PATCH 3/3] add support hivemeta --- .../tajo/catalog/store/HiveCatalogStore.java | 26 +++++++++++++++ .../tajo/catalog/store/HiveCatalogUtil.java | 3 ++ .../catalog/store/TestHiveCatalogStore.java | 33 +++++++++++++++++++ 3 files changed, 62 insertions(+) diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java index 9d6b369c82..8c4dc9e33f 100644 --- 
a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor; import org.apache.hadoop.hive.ql.io.StorageFormatFactory; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.RegexSerDe; import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; @@ -228,6 +229,12 @@ public final CatalogProtos.TableDescProto getTable(String databaseName, final St options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE); } + } else if (BuiltinStorages.REGEX.equals(dataFormat)) { + options.set(StorageConstants.TEXT_REGEX, properties.getProperty(RegexSerDe.INPUT_REGEX)); + options.set(StorageConstants.TEXT_REGEX_CASE_INSENSITIVE, + properties.getProperty(RegexSerDe.INPUT_REGEX_CASE_SENSITIVE)); + options.set(StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING, + properties.getProperty("output.format.string")); } // set data size @@ -569,6 +576,25 @@ public final void createTable(final CatalogProtos.TableDescProto tableDescProto) table.putToParameters(OrcConf.COMPRESS.getAttribute(), tableDesc.getMeta().getProperty(OrcConf.COMPRESS.getAttribute())); } + } else if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.REGEX)) { + + sd.setInputFormat(TextInputFormat.class.getName()); + sd.setOutputFormat(HiveIgnoreKeyTextOutputFormat.class.getName()); + sd.getSerdeInfo().setSerializationLib(RegexSerDe.class.getName()); + + if (tableDesc.getMeta().containsProperty(StorageConstants.TEXT_NULL)) { + table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT, + 
StringEscapeUtils.unescapeJava(tableDesc.getMeta().getProperty(StorageConstants.TEXT_NULL))); + table.getParameters().remove(StorageConstants.TEXT_NULL); + } + + sd.getSerdeInfo().putToParameters(RegexSerDe.INPUT_REGEX, + tableDesc.getMeta().getProperty(StorageConstants.TEXT_REGEX)); + sd.getSerdeInfo().putToParameters(RegexSerDe.INPUT_REGEX_CASE_SENSITIVE, + tableDesc.getMeta().getProperty(StorageConstants.TEXT_REGEX_CASE_INSENSITIVE, "false")); + sd.getSerdeInfo().putToParameters("output.format.string", + tableDesc.getMeta().getProperty(StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING)); + } else { throw new UnsupportedException(tableDesc.getMeta().getDataFormat() + " in HivecatalogStore"); } diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java index faefd2808e..9cb665eeb5 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.RegexSerDe; import org.apache.hadoop.hive.serde2.avro.AvroSerDe; import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; @@ -142,6 +143,8 @@ public static String getDataFormat(StorageDescriptor descriptor) { return BuiltinStorages.AVRO; } else if (OrcSerde.class.getName().equals(serde)) { return BuiltinStorages.ORC; + } else if (RegexSerDe.class.getName().equals(serde)) { + return BuiltinStorages.REGEX; } else { throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat)); } 
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java index d1be35b8b6..5c83423ccf 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor; import org.apache.hadoop.hive.ql.io.StorageFormatFactory; +import org.apache.hadoop.hive.serde2.RegexSerDe; +import org.apache.hadoop.mapred.TextInputFormat; import org.apache.tajo.BuiltinStorages; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionDesc; @@ -705,4 +707,35 @@ public void testDataTypeCompatibility() throws Exception { table1.getMeta().getProperty(StorageConstants.TEXT_DELIMITER)); store.dropTable(DB_NAME, tableName); } + + @Test + public void testTableUsingRegex() throws Exception { + TableMeta meta = new TableMeta(BuiltinStorages.REGEX, new KeyValueSet()); + meta.putProperty(StorageConstants.TEXT_REGEX, "([^ ]*)"); + meta.putProperty(StorageConstants.TEXT_REGEX_OUTPUT_FORMAT_STRING, "%1$s"); + + org.apache.tajo.catalog.Schema schema = SchemaBuilder.builder() + .add("c_custkey", TajoDataTypes.Type.TEXT) + .build(); + + TableDesc table = new TableDesc(IdentifierUtil.buildFQName(DB_NAME, CUSTOMER), schema, meta, + new Path(warehousePath, new Path(DB_NAME, CUSTOMER)).toUri()); + store.createTable(table.getProto()); + assertTrue(store.existTable(DB_NAME, CUSTOMER)); + + org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, CUSTOMER); + assertEquals(TextInputFormat.class.getName(), hiveTable.getSd().getInputFormat()); + 
assertEquals(HiveIgnoreKeyTextOutputFormat.class.getName(), hiveTable.getSd().getOutputFormat()); + assertEquals(RegexSerDe.class.getName(), hiveTable.getSerializationLib()); + + TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER)); + assertEquals(table.getName(), table1.getName()); + assertEquals(table.getUri(), table1.getUri()); + assertEquals(table.getSchema().size(), table1.getSchema().size()); + for (int i = 0; i < table.getSchema().size(); i++) { + assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); + } + + store.dropTable(DB_NAME, CUSTOMER); + } }