From fe48f9b5dff8a9e3e5f720fa6dc19f11337359a3 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 28 Nov 2014 02:56:59 +0900 Subject: [PATCH 1/5] TAJO-1095: Implement Json file scanner. --- .../org/apache/tajo/catalog/CatalogUtil.java | 9 +- .../org/apache/tajo/catalog/SchemaUtil.java | 8 + .../src/main/proto/CatalogProtos.proto | 1 + .../apache/tajo/storage/StorageConstants.java | 2 +- .../java/org/apache/tajo/storage/VTuple.java | 4 +- tajo-storage/pom.xml | 6 + .../storage/json/JsonLineDeserializer.java | 217 ++++++++++++++++++ .../tajo/storage/json/JsonLineSerDe.java | 37 +++ .../tajo/storage/json/JsonLineSerializer.java | 131 +++++++++++ .../storage/text/CSVLineDeserializer.java | 4 +- .../tajo/storage/text/CSVLineSerDe.java | 4 - .../tajo/storage/text/CSVLineSerializer.java | 15 +- .../tajo/storage/text/DelimitedTextFile.java | 8 +- .../text/TextFieldSerializerDeserializer.java | 2 +- .../storage/text/TextLineDeserializer.java | 4 +- .../src/main/resources/storage-default.xml | 16 +- .../org/apache/tajo/storage/TestStorages.java | 105 +++++---- .../tajo/storage/json/TestJsonSerDe.java | 104 +++++++++ .../TestJsonSerDe/testVariousType.json | 1 + .../src/test/resources/storage-default.xml | 16 +- .../src/test/resources/testVariousTypes.avsc | 19 +- 21 files changed, 632 insertions(+), 81 deletions(-) create mode 100644 tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java create mode 100644 tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java create mode 100644 tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java create mode 100644 tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java create mode 100644 tajo-storage/src/test/resources/dataset/TestJsonSerDe/testVariousType.json diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index 737c9ae94e..f2d9b9cad6 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -31,9 +31,11 @@ import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.exception.InvalidOperationException; import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.TUtil; +import org.mortbay.util.ajax.JSON; import java.sql.Connection; import java.sql.ResultSet; @@ -278,13 +280,16 @@ public static StoreType getStoreType(final String typeStr) { return StoreType.AVRO; } else if (typeStr.equalsIgnoreCase(StoreType.TEXTFILE.name())) { return StoreType.TEXTFILE; + } else if (typeStr.equalsIgnoreCase(StoreType.JSON.name())) { + return StoreType.JSON; } else { return null; } } public static TableMeta newTableMeta(StoreType type) { - return new TableMeta(type, new KeyValueSet()); + KeyValueSet defaultProperties = CatalogUtil.newPhysicalProperties(type); + return new TableMeta(type, defaultProperties); } public static TableMeta newTableMeta(StoreType type, KeyValueSet options) { @@ -821,6 +826,8 @@ public static KeyValueSet newPhysicalProperties(StoreType type) { KeyValueSet options = new KeyValueSet(); if (StoreType.CSV == type || StoreType.TEXTFILE == type) { options.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + } else if (StoreType.JSON == type) { + options.set(StorageConstants.TEXT_SERDE_CLASS, "org.apache.tajo.storage.json.JsonLineSerDe"); } else if (StoreType.RCFILE == type) { options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE); } else if (StoreType.SEQUENCEFILE == type) { diff --git 
a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java index ee670efa7b..23ebe1be88 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java @@ -100,4 +100,12 @@ public static T clone(Schema schema) { } return types; } + + public static String [] toSimpleNames(Schema schema) { + String [] names = new String[schema.size()]; + for (int i = 0; i < schema.size(); i++) { + names[i] = schema.getColumn(i).getSimpleName(); + } + return names; + } } diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index 99f594a52e..f29bc6c379 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -36,6 +36,7 @@ enum StoreType { SEQUENCEFILE = 8; AVRO = 9; TEXTFILE = 10; + JSON = 11; } enum OrderType { diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java index 3065d31eee..a3d8de0b98 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java @@ -33,7 +33,7 @@ public class StorageConstants { public static final String TEXT_DELIMITER = "text.delimiter"; public static final String TEXT_NULL = "text.null"; public static final String TEXT_SERDE_CLASS = "text.serde.class"; - public static final String DEFAULT_TEXT_SERDE_CLASS = "org.apache.tajo.storage.text.CSVLineSerde"; + public static final String DEFAULT_TEXT_SERDE_CLASS = "org.apache.tajo.storage.text.CSVLineSerDe"; @Deprecated public static final String SEQUENCEFILE_DELIMITER = 
"sequencefile.delimiter"; diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java index 6304734f56..5e839b7e58 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/VTuple.java @@ -56,7 +56,7 @@ public boolean contains(int fieldId) { @Override public boolean isNull(int fieldid) { - return values[fieldid].isNull(); + return values[fieldid] == null || values[fieldid].isNull(); } @Override @@ -93,7 +93,7 @@ public void put(int fieldId, Tuple tuple) { } public void put(Datum [] values) { - System.arraycopy(values, 0, this.values, 0, size()); + System.arraycopy(values, 0, this.values, 0, values.length); } ////////////////////////////////////////////////////// diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml index c6877c4652..ef26a32971 100644 --- a/tajo-storage/pom.xml +++ b/tajo-storage/pom.xml @@ -72,6 +72,7 @@ src/test/resources/testVariousTypes.avsc + src/test/resources/dataset/TestJsonSerDe/*.json @@ -313,6 +314,11 @@ io.netty netty-buffer + + net.minidev + json-smart + 2.0 + diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java new file mode 100644 index 0000000000..720c812d40 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.json; + + +import com.google.protobuf.Message; +import io.netty.buffer.ByteBuf; +import net.minidev.json.JSONArray; +import net.minidev.json.JSONObject; +import net.minidev.json.parser.JSONParser; +import net.minidev.json.parser.ParseException; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.common.exception.NotImplementedException; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.ProtobufDatumFactory; +import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.text.TextLineDeserializer; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Iterator; + +public class JsonLineDeserializer extends TextLineDeserializer { + private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + + private JSONParser parser; + private Type [] types; + private String [] columnNames; + + public JsonLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { + super(schema, meta, targetColumnIndexes); + } + + @Override + public void init() { + types = SchemaUtil.toTypes(schema); + columnNames = SchemaUtil.toSimpleNames(schema); + + parser = new 
JSONParser(JSONParser.MODE_JSON_SIMPLE); + } + + @Override + public void deserialize(ByteBuf buf, Tuple output) throws IOException { + byte [] line = new byte[buf.capacity()]; + buf.getBytes(0, line); + + try { + JSONObject object = (JSONObject) parser.parse(line); + + for (int i = 0; i < targetColumnIndexes.length; i++) { + int actualIdx = targetColumnIndexes[i]; + String fieldName = columnNames[actualIdx]; + + switch (types[actualIdx]) { + case BOOLEAN: + String boolStr = object.getAsString(fieldName); + if (boolStr != null) { + output.put(actualIdx, DatumFactory.createBool(boolStr.equals("true"))); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case CHAR: + String charStr = object.getAsString(fieldName); + if (charStr != null) { + output.put(actualIdx, DatumFactory.createChar(charStr)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case INT1: + case INT2: + Number int2Num = object.getAsNumber(fieldName); + if (int2Num != null) { + output.put(actualIdx, DatumFactory.createInt2(int2Num.shortValue())); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case INT4: + Number int4Num = object.getAsNumber(fieldName); + if (int4Num != null) { + output.put(actualIdx, DatumFactory.createInt4(int4Num.intValue())); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case INT8: + Number int8Num = object.getAsNumber(fieldName); + if (int8Num != null) { + output.put(actualIdx, DatumFactory.createInt8(int8Num.longValue())); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case FLOAT4: + Number float4Num = object.getAsNumber(fieldName); + if (float4Num != null) { + output.put(actualIdx, DatumFactory.createFloat4(float4Num.floatValue())); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case FLOAT8: + Number float8Num = object.getAsNumber(fieldName); + if (float8Num != null) { + output.put(actualIdx, DatumFactory.createFloat8(float8Num.doubleValue())); + } else { + 
output.put(actualIdx, NullDatum.get()); + } + break; + case TEXT: + String textStr = object.getAsString(fieldName); + if (textStr != null) { + output.put(actualIdx, DatumFactory.createText(textStr)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case TIMESTAMP: + String timestampStr = object.getAsString(fieldName); + if (timestampStr != null) { + output.put(actualIdx, DatumFactory.createTimestamp(timestampStr)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case TIME: + String timeStr = object.getAsString(fieldName); + if (timeStr != null) { + output.put(actualIdx, DatumFactory.createTime(timeStr)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case DATE: + String dateStr = object.getAsString(fieldName); + if (dateStr != null) { + output.put(actualIdx, DatumFactory.createDate(dateStr)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + case BIT: + case BINARY: + case VARBINARY: + case BLOB: { + JSONArray jsonArray = (JSONArray) object.get(fieldName); + if (jsonArray == null) { + output.put(actualIdx, NullDatum.get()); + break; + } + + byte[] bytes = new byte[jsonArray.size()]; + Iterator it = jsonArray.iterator(); + int arrayIdx = 0; + while (it.hasNext()) { + bytes[arrayIdx++] = ((Long) it.next()).byteValue(); + } + if (bytes.length > 0) { + output.put(actualIdx, DatumFactory.createBlob(bytes)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + } + case INET4: + String inetStr = object.getAsString(fieldName); + if (inetStr != null) { + output.put(actualIdx, DatumFactory.createInet4(inetStr)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; + + case NULL_TYPE: + output.put(actualIdx, NullDatum.get()); + break; + + default: + throw new NotImplementedException(types[actualIdx].name() + " is not supported."); + } + } + + } catch (Throwable e) { + throw new IOException(e); + } + } + + @Override + public void release() { + } +} diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java new file mode 100644 index 0000000000..6db2c2994a --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.json; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.text.TextLineDeserializer; +import org.apache.tajo.storage.text.TextLineSerDe; +import org.apache.tajo.storage.text.TextLineSerializer; + +public class JsonLineSerDe extends TextLineSerDe { + @Override + public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { + return new JsonLineDeserializer(schema, meta, targetColumnIndexes); + } + + @Override + public TextLineSerializer createSerializer(Schema schema, TableMeta meta) { + return new JsonLineSerializer(schema, meta); + } +} diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java new file mode 100644 index 0000000000..4276f4a967 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.json; + + +import net.minidev.json.JSONObject; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.common.exception.NotImplementedException; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.text.TextLineSerDe; +import org.apache.tajo.storage.text.TextLineSerializer; + +import java.io.IOException; +import java.io.OutputStream; + +public class JsonLineSerializer extends TextLineSerializer { + private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + + private Type [] types; + private String [] simpleNames; + private int columnNum; + + + public JsonLineSerializer(Schema schema, TableMeta meta) { + super(schema, meta); + } + + @Override + public void init() { + types = SchemaUtil.toTypes(schema); + simpleNames = SchemaUtil.toSimpleNames(schema); + columnNum = schema.size(); + } + + @Override + public int serialize(OutputStream out, Tuple input) throws IOException { + JSONObject jsonObject = new JSONObject(); + + for (int i = 0; i < columnNum; i++) { + if (input.isNull(i)) { + continue; + } + + String fieldName = simpleNames[i]; + Type type = types[i]; + + switch (type) { + + case BOOLEAN: + jsonObject.put(fieldName, input.getBool(i)); + break; + + case INT1: + case INT2: + jsonObject.put(fieldName, input.getInt2(i)); + break; + + case INT4: + jsonObject.put(fieldName, input.getInt4(i)); + break; + + case INT8: + jsonObject.put(fieldName, input.getInt8(i)); + break; + + case FLOAT4: + jsonObject.put(fieldName, input.getFloat4(i)); + break; + + case FLOAT8: + jsonObject.put(fieldName, input.getFloat8(i)); + break; + + case CHAR: + case TEXT: + case VARCHAR: + case INET4: + case TIMESTAMP: + case DATE: + case TIME: + case 
INTERVAL: + jsonObject.put(fieldName, input.getText(i)); + break; + + case BIT: + case BINARY: + case BLOB: + case VARBINARY: + jsonObject.put(fieldName, input.getBytes(i)); + break; + + case NULL_TYPE: + break; + + default: + throw new NotImplementedException(types[i].name() + " is not supported."); + } + } + + String jsonStr = jsonObject.toJSONString(); + byte [] jsonBytes = jsonStr.getBytes(); + out.write(jsonBytes); + return jsonBytes.length; + } + + @Override + public void release() { + + } +} diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java index f580da16cd..0e2dfb043a 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java @@ -48,7 +48,7 @@ public void init() { fieldSerDer = new TextFieldSerializerDeserializer(); } - public void deserialize(final ByteBuf lineBuf, Tuple tuple) throws IOException { + public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException { int[] projection = targetColumnIndexes; if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) { return; @@ -73,7 +73,7 @@ public void deserialize(final ByteBuf lineBuf, Tuple tuple) throws IOException { if (projection.length > currentTarget && currentIndex == projection[currentTarget]) { lineBuf.setIndex(start, start + fieldLength); Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars); - tuple.put(currentIndex, datum); + output.put(currentIndex, datum); currentTarget++; } diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java index e2686a6c06..2fe7f239fe 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java +++ 
b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java @@ -24,10 +24,6 @@ import org.apache.tajo.storage.StorageConstants; public class CSVLineSerDe extends TextLineSerDe { - - public CSVLineSerDe() { - } - @Override public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { return new CSVLineDeserializer(schema, meta, targetColumnIndexes); diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java index 684519c5ba..73970009fa 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java @@ -18,6 +18,7 @@ package org.apache.tajo.storage.text; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.datum.Datum; @@ -32,6 +33,7 @@ public class CSVLineSerializer extends TextLineSerializer { private byte [] nullChars; private char delimiter; + private int columnNum; public CSVLineSerializer(Schema schema, TableMeta meta) { super(schema, meta); @@ -41,25 +43,26 @@ public CSVLineSerializer(Schema schema, TableMeta meta) { public void init() { nullChars = TextLineSerDe.getNullCharsAsBytes(meta); delimiter = CSVLineSerDe.getFieldDelimiter(meta); + columnNum = schema.size(); serde = new TextFieldSerializerDeserializer(); } @Override public int serialize(OutputStream out, Tuple input) throws IOException { - int rowBytes = 0; + int writtenBytes = 0; - for (int i = 0; i < schema.size(); i++) { + for (int i = 0; i < columnNum; i++) { Datum datum = input.get(i); - rowBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars); + writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars); - if (schema.size() - 1 > i) { + if (columnNum - 1 > i) { out.write((byte) 
delimiter); - rowBytes += 1; + writtenBytes += 1; } } - return rowBytes; + return writtenBytes; } @Override diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index d15f394304..2218fae580 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -79,12 +79,12 @@ public static TextLineSerDe getLineSerde(TableMeta meta) { if (serdeClassCache.containsKey(serDeClassName)) { serdeClass = serdeClassCache.get(serDeClassName); } else { - serdeClass = (Class) Class.forName(CSVLineSerDe.class.getName()); + serdeClass = (Class) Class.forName(serDeClassName); serdeClassCache.put(serDeClassName, serdeClass); } lineSerder = (TextLineSerDe) ReflectionUtil.newInstance(serdeClass); } catch (Throwable e) { - throw new RuntimeException("TextLineSerde class cannot be initialized"); + throw new RuntimeException("TextLineSerde class cannot be initialized.", e); } return lineSerder; @@ -382,7 +382,9 @@ public void reset() throws IOException { @Override public void close() throws IOException { try { - deserializer.release(); + if (deserializer != null) { + deserializer.release(); + } if (tableStats != null && reader != null) { tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. 
(decompressed bytes + overhead) diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java index 9722959651..95d0407673 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java @@ -38,7 +38,7 @@ public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer { public static final byte[] trueBytes = "true".getBytes(); public static final byte[] falseBytes = "false".getBytes(); - private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8); private static boolean isNull(ByteBuf val, ByteBuf nullBytes) { diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java index 645d118343..b0d3c3af9a 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java @@ -48,10 +48,10 @@ public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIn * It fills a tuple with a read fields in a given line. 
* * @param buf Read line - * @param tuple Tuple to be filled with read fields + * @param output Tuple to be filled with read fields * @throws java.io.IOException */ - public abstract void deserialize(final ByteBuf buf, Tuple tuple) throws IOException; + public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException; /** * Release external resources diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml index 064f250f56..e861b7d077 100644 --- a/tajo-storage/src/main/resources/storage-default.xml +++ b/tajo-storage/src/main/resources/storage-default.xml @@ -35,7 +35,7 @@ tajo.storage.scanner-handler - textfile,csv,raw,rcfile,row,parquet,sequencefile,avro + textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro @@ -47,6 +47,10 @@ tajo.storage.fragment.csv.class org.apache.tajo.storage.fragment.FileFragment + + tajo.storage.fragment.json.class + org.apache.tajo.storage.fragment.FileFragment + tajo.storage.fragment.raw.class org.apache.tajo.storage.fragment.FileFragment @@ -83,6 +87,11 @@ org.apache.tajo.storage.CSVFile$CSVScanner + + tajo.storage.scanner-handler.json.class + org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner + + tajo.storage.scanner-handler.raw.class org.apache.tajo.storage.RawFile$RawFileScanner @@ -129,6 +138,11 @@ org.apache.tajo.storage.CSVFile$CSVAppender + + tajo.storage.appender-handler.json.class + org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender + + tajo.storage.appender-handler.raw.class org.apache.tajo.storage.RawFile$RawFileAppender diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java index 6e2bc3557d..c581926ec6 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -80,18 +80,17 @@ public class 
TestStorages { " \"name\": \"testNullHandlingTypes\",\n" + " \"fields\": [\n" + " { \"name\": \"col1\", \"type\": [\"null\", \"boolean\"] },\n" + - " { \"name\": \"col2\", \"type\": [\"null\", \"int\"] },\n" + - " { \"name\": \"col3\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col2\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col3\", \"type\": [\"null\", \"int\"] },\n" + " { \"name\": \"col4\", \"type\": [\"null\", \"int\"] },\n" + - " { \"name\": \"col5\", \"type\": [\"null\", \"int\"] },\n" + - " { \"name\": \"col6\", \"type\": [\"null\", \"long\"] },\n" + - " { \"name\": \"col7\", \"type\": [\"null\", \"float\"] },\n" + - " { \"name\": \"col8\", \"type\": [\"null\", \"double\"] },\n" + - " { \"name\": \"col9\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col5\", \"type\": [\"null\", \"long\"] },\n" + + " { \"name\": \"col6\", \"type\": [\"null\", \"float\"] },\n" + + " { \"name\": \"col7\", \"type\": [\"null\", \"double\"] },\n" + + " { \"name\": \"col8\", \"type\": [\"null\", \"string\"] },\n" + + " { \"name\": \"col9\", \"type\": [\"null\", \"bytes\"] },\n" + " { \"name\": \"col10\", \"type\": [\"null\", \"bytes\"] },\n" + - " { \"name\": \"col11\", \"type\": [\"null\", \"bytes\"] },\n" + - " { \"name\": \"col12\", \"type\": \"null\" },\n" + - " { \"name\": \"col13\", \"type\": [\"null\", \"bytes\"] }\n" + + " { \"name\": \"col11\", \"type\": \"null\" },\n" + + " { \"name\": \"col12\", \"type\": [\"null\", \"bytes\"] }\n" + " ]\n" + "}\n"; @@ -129,6 +128,7 @@ public static Collection generateParameters() { {StoreType.SEQUENCEFILE, true, true, false}, {StoreType.AVRO, false, false, false}, {StoreType.TEXTFILE, true, true, false}, + {StoreType.JSON, true, true, false}, }); } @@ -298,20 +298,23 @@ public void testProjection() throws IOException { @Test public void testVariousTypes() throws IOException { + boolean handleProtobuf = storeType != StoreType.JSON; + Schema schema = new Schema(); schema.addColumn("col1", 
Type.BOOLEAN); - schema.addColumn("col2", Type.BIT); - schema.addColumn("col3", Type.CHAR, 7); - schema.addColumn("col4", Type.INT2); - schema.addColumn("col5", Type.INT4); - schema.addColumn("col6", Type.INT8); - schema.addColumn("col7", Type.FLOAT4); - schema.addColumn("col8", Type.FLOAT8); - schema.addColumn("col9", Type.TEXT); - schema.addColumn("col10", Type.BLOB); - schema.addColumn("col11", Type.INET4); - schema.addColumn("col12", Type.NULL_TYPE); - schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + if (handleProtobuf) { + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + } KeyValueSet options = new KeyValueSet(); TableMeta meta = CatalogUtil.newTableMeta(storeType, options); @@ -328,10 +331,9 @@ public void testVariousTypes() throws IOException { QueryId queryid = new QueryId("12345", 5); ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); - Tuple tuple = new VTuple(13); + Tuple tuple = new VTuple(11 + (handleProtobuf ? 
1 : 0)); tuple.put(new Datum[] { DatumFactory.createBool(true), - DatumFactory.createBit((byte) 0x99), DatumFactory.createChar("hyunsik"), DatumFactory.createInt2((short) 17), DatumFactory.createInt4(59), @@ -341,9 +343,12 @@ public void testVariousTypes() throws IOException { DatumFactory.createText("hyunsik"), DatumFactory.createBlob("hyunsik".getBytes()), DatumFactory.createInet4("192.168.0.1"), - NullDatum.get(), - factory.createDatum(queryid.getProto()) + NullDatum.get() }); + if (handleProtobuf) { + tuple.put(11, factory.createDatum(queryid.getProto())); + } + appender.addTuple(tuple); appender.flush(); appender.close(); @@ -364,20 +369,24 @@ public void testVariousTypes() throws IOException { @Test public void testNullHandlingTypes() throws IOException { + boolean handleProtobuf = storeType != StoreType.JSON; + Schema schema = new Schema(); schema.addColumn("col1", Type.BOOLEAN); - schema.addColumn("col2", Type.BIT); - schema.addColumn("col3", Type.CHAR, 7); - schema.addColumn("col4", Type.INT2); - schema.addColumn("col5", Type.INT4); - schema.addColumn("col6", Type.INT8); - schema.addColumn("col7", Type.FLOAT4); - schema.addColumn("col8", Type.FLOAT8); - schema.addColumn("col9", Type.TEXT); - schema.addColumn("col10", Type.BLOB); - schema.addColumn("col11", Type.INET4); - schema.addColumn("col12", Type.NULL_TYPE); - schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + schema.addColumn("col11", Type.NULL_TYPE); + + if (handleProtobuf) { + schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName())); 
+ } KeyValueSet options = new KeyValueSet(); TableMeta meta = CatalogUtil.newTableMeta(storeType, options); @@ -397,11 +406,10 @@ public void testNullHandlingTypes() throws IOException { QueryId queryid = new QueryId("12345", 5); ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName()); - - Tuple seedTuple = new VTuple(13); + int columnNum = 11 + (handleProtobuf ? 1 : 0); + Tuple seedTuple = new VTuple(columnNum); seedTuple.put(new Datum[]{ DatumFactory.createBool(true), // 0 - DatumFactory.createBit((byte) 0x99), // 1 DatumFactory.createChar("hyunsik"), // 2 DatumFactory.createInt2((short) 17), // 3 DatumFactory.createInt4(59), // 4 @@ -412,14 +420,17 @@ public void testNullHandlingTypes() throws IOException { DatumFactory.createBlob("hyunsik".getBytes()),// 9 DatumFactory.createInet4("192.168.0.1"), // 10 NullDatum.get(), // 11 - factory.createDatum(queryid.getProto()) // 12 }); + if (handleProtobuf) { + seedTuple.put(11, factory.createDatum(queryid.getProto())); // 12 + } + // Making tuples with different null column positions Tuple tuple; - for (int i = 0; i < 13; i++) { - tuple = new VTuple(13); - for (int j = 0; j < 13; j++) { + for (int i = 0; i < columnNum; i++) { + tuple = new VTuple(columnNum); + for (int j = 0; j < columnNum; j++) { if (i == j) { // i'th column will have NULL value tuple.put(j, NullDatum.get()); } else { @@ -439,8 +450,8 @@ public void testNullHandlingTypes() throws IOException { Tuple retrieved; int i = 0; while ((retrieved = scanner.next()) != null) { - assertEquals(13, retrieved.size()); - for (int j = 0; j < 13; j++) { + assertEquals(columnNum, retrieved.size()); + for (int j = 0; j < columnNum; j++) { if (i == j) { assertEquals(NullDatum.get(), retrieved.get(j)); } else { diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java b/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java new file mode 100644 index 0000000000..19ee003071 --- 
/dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.json; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.Scanner; +import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.fragment.FileFragment; +import org.junit.Test; + +import java.io.IOException; +import java.net.URL; + +import static org.junit.Assert.*; + +public class TestJsonSerDe { + private static Schema schema = new Schema(); + + static { + schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN); + schema.addColumn("col2", 
TajoDataTypes.Type.CHAR, 7); + schema.addColumn("col3", TajoDataTypes.Type.INT2); + schema.addColumn("col4", TajoDataTypes.Type.INT4); + schema.addColumn("col5", TajoDataTypes.Type.INT8); + schema.addColumn("col6", TajoDataTypes.Type.FLOAT4); + schema.addColumn("col7", TajoDataTypes.Type.FLOAT8); + schema.addColumn("col8", TajoDataTypes.Type.TEXT); + schema.addColumn("col9", TajoDataTypes.Type.BLOB); + schema.addColumn("col10", TajoDataTypes.Type.INET4); + } + + public static Path getResourcePath(String path, String suffix) { + URL resultBaseURL = ClassLoader.getSystemResource(path); + return new Path(resultBaseURL.toString(), suffix); + } + + public static Path getResultPath(Class clazz, String fileName) { + return new Path (getResourcePath("results", clazz.getSimpleName()), fileName); + } + + @Test + public void testVarioutType() throws IOException { + TajoConf conf = new TajoConf(); + + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON); + Path tablePath = new Path(getResourcePath("dataset", "TestJsonSerDe"), "testVariousType.json"); + FileSystem fs = FileSystem.getLocal(conf); + FileStatus status = fs.getFileStatus(tablePath); + FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen()); + Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple tuple = scanner.next(); + assertNotNull(tuple); + assertNull(scanner.next()); + scanner.close(); + + Tuple baseTuple = new VTuple(10); + baseTuple.put(new Datum[] { + DatumFactory.createBool(true), + DatumFactory.createChar("hyunsik"), + DatumFactory.createInt2((short) 17), + DatumFactory.createInt4(59), + DatumFactory.createInt8(23l), + DatumFactory.createFloat4(77.9f), + DatumFactory.createFloat8(271.9d), + DatumFactory.createText("hyunsik"), + DatumFactory.createBlob("hyunsik".getBytes()), + DatumFactory.createInet4("192.168.0.1"), + NullDatum.get(), + }); + + assertEquals(baseTuple, tuple); + } +} diff --git 
a/tajo-storage/src/test/resources/dataset/TestJsonSerDe/testVariousType.json b/tajo-storage/src/test/resources/dataset/TestJsonSerDe/testVariousType.json new file mode 100644 index 0000000000..8ee3408f04 --- /dev/null +++ b/tajo-storage/src/test/resources/dataset/TestJsonSerDe/testVariousType.json @@ -0,0 +1 @@ +{"col1": "true", "col2": "hyunsik", "col3": 17, "col4": 59, "col5": 23, "col6": 77.9, "col7": 271.9, "col8": "hyunsik", "col9": "hyunsik", "col10": "192.168.0.1"} diff --git a/tajo-storage/src/test/resources/storage-default.xml b/tajo-storage/src/test/resources/storage-default.xml index 790d5a8ceb..f4c81c7bdb 100644 --- a/tajo-storage/src/test/resources/storage-default.xml +++ b/tajo-storage/src/test/resources/storage-default.xml @@ -28,7 +28,7 @@ tajo.storage.scanner-handler - textfile,csv,raw,rcfile,row,parquet,sequencefile,avro + textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro @@ -40,6 +40,10 @@ tajo.storage.fragment.csv.class org.apache.tajo.storage.fragment.FileFragment + + tajo.storage.fragment.json.class + org.apache.tajo.storage.fragment.FileFragment + tajo.storage.fragment.raw.class org.apache.tajo.storage.fragment.FileFragment @@ -76,6 +80,11 @@ org.apache.tajo.storage.CSVFile$CSVScanner + + tajo.storage.scanner-handler.json.class + org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner + + tajo.storage.scanner-handler.raw.class org.apache.tajo.storage.RawFile$RawFileScanner @@ -122,6 +131,11 @@ org.apache.tajo.storage.CSVFile$CSVAppender + + tajo.storage.appender-handler.json.class + org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender + + tajo.storage.appender-handler.raw.class org.apache.tajo.storage.RawFile$RawFileAppender diff --git a/tajo-storage/src/test/resources/testVariousTypes.avsc b/tajo-storage/src/test/resources/testVariousTypes.avsc index 611b97f9e7..d4250a94fc 100644 --- a/tajo-storage/src/test/resources/testVariousTypes.avsc +++ b/tajo-storage/src/test/resources/testVariousTypes.avsc 
@@ -4,18 +4,17 @@ "name": "testVariousTypes", "fields": [ { "name": "col1", "type": "boolean" }, - { "name": "col2", "type": "int" }, - { "name": "col3", "type": "string" }, + { "name": "col2", "type": "string" }, + { "name": "col3", "type": "int" }, { "name": "col4", "type": "int" }, - { "name": "col5", "type": "int" }, - { "name": "col6", "type": "long" }, - { "name": "col7", "type": "float" }, - { "name": "col8", "type": "double" }, - { "name": "col9", "type": "string" }, + { "name": "col5", "type": "long" }, + { "name": "col6", "type": "float" }, + { "name": "col7", "type": "double" }, + { "name": "col8", "type": "string" }, + { "name": "col9", "type": "bytes" }, { "name": "col10", "type": "bytes" }, - { "name": "col11", "type": "bytes" }, - { "name": "col12", "type": "null" }, - { "name": "col13", "type": "bytes" } + { "name": "col11", "type": "null" }, + { "name": "col12", "type": "bytes" } ] } From 789a2506e87e3fdcc1a3620a4c7796a3f74de0cb Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 28 Nov 2014 09:24:31 +0900 Subject: [PATCH 2/5] Fix unit test failures in TestJsonSerDe. 
--- .../storage/json/JsonLineDeserializer.java | 45 ++++++++++--------- .../tajo/storage/json/TestJsonSerDe.java | 29 ++++++------ 2 files changed, 38 insertions(+), 36 deletions(-) diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java index 720c812d40..e1316ec35e 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java @@ -19,29 +19,22 @@ package org.apache.tajo.storage.json; -import com.google.protobuf.Message; import io.netty.buffer.ByteBuf; import net.minidev.json.JSONArray; import net.minidev.json.JSONObject; import net.minidev.json.parser.JSONParser; -import net.minidev.json.parser.ParseException; -import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.common.exception.NotImplementedException; -import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.datum.ProtobufDatumFactory; import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.text.TextLineDeserializer; import java.io.IOException; -import java.io.OutputStream; import java.util.Iterator; public class JsonLineDeserializer extends TextLineDeserializer { @@ -75,6 +68,11 @@ public void deserialize(ByteBuf buf, Tuple output) throws IOException { int actualIdx = targetColumnIndexes[i]; String fieldName = columnNames[actualIdx]; + if (!object.containsKey(fieldName)) { + output.put(actualIdx, NullDatum.get()); + continue; + } + switch (types[actualIdx]) { case BOOLEAN: String boolStr = 
object.getAsString(fieldName); @@ -169,22 +167,29 @@ public void deserialize(ByteBuf buf, Tuple output) throws IOException { case BINARY: case VARBINARY: case BLOB: { - JSONArray jsonArray = (JSONArray) object.get(fieldName); - if (jsonArray == null) { + Object jsonObject = object.get(fieldName); + + if (jsonObject == null) { output.put(actualIdx, NullDatum.get()); break; - } - - byte[] bytes = new byte[jsonArray.size()]; - Iterator it = jsonArray.iterator(); - int arrayIdx = 0; - while (it.hasNext()) { - bytes[arrayIdx++] = ((Long) it.next()).byteValue(); - } - if (bytes.length > 0) { - output.put(actualIdx, DatumFactory.createBlob(bytes)); + } if (jsonObject instanceof String) { + output.put(actualIdx, DatumFactory.createBlob((String)jsonObject)); + } else if (jsonObject instanceof JSONArray) { + JSONArray jsonArray = (JSONArray) jsonObject; + byte[] bytes = new byte[jsonArray.size()]; + Iterator it = jsonArray.iterator(); + int arrayIdx = 0; + while (it.hasNext()) { + bytes[arrayIdx++] = ((Long) it.next()).byteValue(); + } + if (bytes.length > 0) { + output.put(actualIdx, DatumFactory.createBlob(bytes)); + } else { + output.put(actualIdx, NullDatum.get()); + } + break; } else { - output.put(actualIdx, NullDatum.get()); + throw new IOException("Unknown json object: " + object.getClass().getSimpleName()); } break; } diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java b/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java index 19ee003071..038bc17499 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java @@ -56,6 +56,7 @@ public class TestJsonSerDe { schema.addColumn("col8", TajoDataTypes.Type.TEXT); schema.addColumn("col9", TajoDataTypes.Type.BLOB); schema.addColumn("col10", TajoDataTypes.Type.INET4); + schema.addColumn("col11", TajoDataTypes.Type.NULL_TYPE); } public static Path 
getResourcePath(String path, String suffix) { @@ -63,10 +64,6 @@ public static Path getResourcePath(String path, String suffix) { return new Path(resultBaseURL.toString(), suffix); } - public static Path getResultPath(Class clazz, String fileName) { - return new Path (getResourcePath("results", clazz.getSimpleName()), fileName); - } - @Test public void testVarioutType() throws IOException { TajoConf conf = new TajoConf(); @@ -84,19 +81,19 @@ public void testVarioutType() throws IOException { assertNull(scanner.next()); scanner.close(); - Tuple baseTuple = new VTuple(10); + Tuple baseTuple = new VTuple(11); baseTuple.put(new Datum[] { - DatumFactory.createBool(true), - DatumFactory.createChar("hyunsik"), - DatumFactory.createInt2((short) 17), - DatumFactory.createInt4(59), - DatumFactory.createInt8(23l), - DatumFactory.createFloat4(77.9f), - DatumFactory.createFloat8(271.9d), - DatumFactory.createText("hyunsik"), - DatumFactory.createBlob("hyunsik".getBytes()), - DatumFactory.createInet4("192.168.0.1"), - NullDatum.get(), + DatumFactory.createBool(true), // 0 + DatumFactory.createChar("hyunsik"), // 1 + DatumFactory.createInt2((short) 17), // 2 + DatumFactory.createInt4(59), // 3 + DatumFactory.createInt8(23l), // 4 + DatumFactory.createFloat4(77.9f), // 5 + DatumFactory.createFloat8(271.9d), // 6 + DatumFactory.createText("hyunsik"), // 7 + DatumFactory.createBlob("hyunsik".getBytes()), // 8 + DatumFactory.createInet4("192.168.0.1"), // 9 + NullDatum.get(), // 10 }); assertEquals(baseTuple, tuple); From 8da63d6bc9f926c5eb66d4280992a853f48053c4 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 28 Nov 2014 10:48:13 +0900 Subject: [PATCH 3/5] Change the use of ByteBuf.capacity() to ByteBuf.readableBytes(); --- .../java/org/apache/tajo/storage/json/JsonLineDeserializer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java index e1316ec35e..e86cdc146d 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java @@ -58,7 +58,7 @@ public void init() { @Override public void deserialize(ByteBuf buf, Tuple output) throws IOException { - byte [] line = new byte[buf.capacity()]; + byte [] line = new byte[buf.readableBytes()]; buf.getBytes(0, line); try { From 6c65a02ef03dad25eec0d369417d32dbf1d9c260 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 28 Nov 2014 11:05:16 +0900 Subject: [PATCH 4/5] Change ByteBuf.getBytes to ByteBuf.readBytes. --- .../org/apache/tajo/storage/json/JsonLineDeserializer.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java index e86cdc146d..37cd9f3804 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java @@ -38,8 +38,6 @@ import java.util.Iterator; public class JsonLineDeserializer extends TextLineDeserializer { - private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); - private JSONParser parser; private Type [] types; private String [] columnNames; @@ -59,7 +57,7 @@ public void init() { @Override public void deserialize(ByteBuf buf, Tuple output) throws IOException { byte [] line = new byte[buf.readableBytes()]; - buf.getBytes(0, line); + buf.readBytes(line); try { JSONObject object = (JSONObject) parser.parse(line); From 5122d22c2f8b4e0ff7a77bfec1f2e922ce9b9129 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 28 Nov 2014 15:01:04 +0900 Subject: [PATCH 5/5] Add default charset to getBytes(). 
--- .../org/apache/tajo/storage/json/JsonLineSerializer.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java index 4276f4a967..c7007d8158 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java @@ -20,12 +20,14 @@ import net.minidev.json.JSONObject; +import org.apache.commons.lang.CharSet; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.common.exception.NotImplementedException; import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.datum.TextDatum; import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.text.TextLineSerDe; @@ -33,6 +35,7 @@ import java.io.IOException; import java.io.OutputStream; +import java.nio.charset.Charset; public class JsonLineSerializer extends TextLineSerializer { private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); @@ -119,7 +122,7 @@ public int serialize(OutputStream out, Tuple input) throws IOException { } String jsonStr = jsonObject.toJSONString(); - byte [] jsonBytes = jsonStr.getBytes(); + byte [] jsonBytes = jsonStr.getBytes(TextDatum.DEFAULT_CHARSET); out.write(jsonBytes); return jsonBytes.length; }