From 5faaeccdf3e1ccffa445107ac65ed6a0cef11f54 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Thu, 30 Jul 2015 20:00:12 +0900 Subject: [PATCH 1/6] initial work * Improve SQL parser to recognize nested target fields in INSERT INTO statement. * Add workaround for hanging TaskContainer. * Improve logical planner to recognize nested target fields in INSERT INTO statement. --- .../java/org/apache/tajo/algebra/Insert.java | 6 +- .../apache/tajo/catalog/NestedPathUtil.java | 55 ++++++++++++------- .../apache/tajo/engine/parser/SQLParser.g4 | 2 +- .../tajo/engine/parser/SQLAnalyzer.java | 8 ++- .../org/apache/tajo/worker/TaskContainer.java | 2 +- .../java/org/apache/tajo/worker/TaskImpl.java | 4 ++ .../engine/query/TestSelectNestedRecord.java | 26 ++++++++- .../TestSelectNestedRecord/sample0_ddl.sql | 1 + .../TestSelectNestedRecord/testInsert.result | 0 .../org/apache/tajo/plan/LogicalPlanner.java | 16 ++++-- .../storage/json/JsonLineDeserializer.java | 27 +++------ .../tajo/storage/json/JsonLineSerializer.java | 2 +- 12 files changed, 92 insertions(+), 57 deletions(-) create mode 100644 tajo-core/src/test/resources/queries/TestSelectNestedRecord/sample0_ddl.sql create mode 100644 tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsert.result diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/Insert.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/Insert.java index f0cd5f9d38..299a480276 100644 --- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/Insert.java +++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/Insert.java @@ -32,7 +32,7 @@ public class Insert extends Expr { @Expose @SerializedName("TableName") private String tableName; @Expose @SerializedName("TargetColumns") - private String [] targetColumns; + private ColumnReferenceExpr [] targetColumns; @Expose @SerializedName("StorageType") private String storageType; @Expose @SerializedName("Location") @@ -70,11 +70,11 @@ public boolean hasTargetColumns() { return targetColumns != null; } - public String [] getTargetColumns() { + public ColumnReferenceExpr [] getTargetColumns() { return targetColumns; } - public void setTargetColumns(String [] targets) { + public void setTargetColumns(ColumnReferenceExpr [] targets) { this.targetColumns = targets; } diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java index 58b4f26a26..e0e126dc7a 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java @@ -19,11 +19,12 @@ package org.apache.tajo.catalog; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.util.TUtil; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; /** * Utility methods for nested field @@ -65,24 +66,6 @@ public static String makePath(String[] parts, int startIndex, int depth) { return sb.toString(); } - /** - * Lookup the actual column corresponding to a given path. - * We assume that a path starts with the slash '/' and it - * does not include the root field. - * - * @param nestedField Nested column - * @param path Path which starts with '/'; - * @return Column corresponding to the path - */ - public static Column lookupPath(Column nestedField, String path) { - Preconditions.checkArgument(path.charAt(0) == PATH_DELIMITER.charAt(0), - "A nested field path must start with slash '/'."); - - // We assume that path starts with '/', causing an empty string "" at 0 in the path splits. - // So, we should start the index from 1 instead of 0. - return lookupPath(nestedField, path.split(PATH_DELIMITER)); - } - public static Column lookupPath(Column nestedField, String [] paths) { // We assume that path starts with '/', causing an empty string "" at 0 in the path splits. // So, we should start the index from 1 instead of 0. @@ -106,4 +89,34 @@ private static Column lookupColumnInternal(Column currentColumn, String [] paths throw new NoSuchFieldError(makePath(paths)); } } + + public static String [] convertColumnsToPaths(Column [] columns) { + String [] paths = new String[columns.length]; + + for (int i = 0; i < columns.length; i++) { + paths[i] = columns[i].getSimpleName(); + } + + return paths; + } + + public static ImmutableMap buildTypeMap(Iterable schema, String [] targetPaths) { + + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (Column column : schema) { + + // Keep types which only belong to projected paths + // For example, assume that a projected path is 'name/first_name', where name is RECORD and first_name is TEXT. + // In this case, we should keep two types: + // * name - RECORD + // * name/first_name TEXT + for (String p : targetPaths) { + if (p.startsWith(column.getSimpleName())) { + builder.put(column.getSimpleName(), column.getDataType().getType()); + } + } + } + + return builder.build(); + } } diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 index b07fb8f28d..4e7636b8c0 100644 --- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 +++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 @@ -1593,7 +1593,7 @@ null_ordering */ insert_statement - : INSERT (OVERWRITE)? INTO table_name (LEFT_PAREN column_name_list RIGHT_PAREN)? query_expression + : INSERT (OVERWRITE)? INTO table_name (LEFT_PAREN column_reference_list RIGHT_PAREN)? query_expression | INSERT (OVERWRITE)? INTO LOCATION path=Character_String_Literal (USING storage_type=identifier (param_clause)?)? query_expression ; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java index c50d5be9b8..fa414b938e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java @@ -1621,10 +1621,12 @@ public Expr visitInsert_statement(SQLParser.Insert_statementContext ctx) { if (ctx.table_name() != null) { insertExpr.setTableName(ctx.table_name().getText()); - if (ctx.column_name_list() != null) { - String[] targetColumns = new String[ctx.column_name_list().identifier().size()]; + if (ctx.column_reference_list() != null) { + ColumnReferenceExpr [] targetColumns = + new ColumnReferenceExpr[ctx.column_reference_list().column_reference().size()]; + for (int i = 0; i < targetColumns.length; i++) { - targetColumns[i] = ctx.column_name_list().identifier().get(i).getText(); + targetColumns[i] = visitColumn_reference(ctx.column_reference_list().column_reference(i)); } insertExpr.setTargetColumns(targetColumns); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java index bd28bb73f0..dfb057315d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java @@ -66,7 +66,7 @@ public void run() { } task.cleanup(); - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); if (task != null) { try { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 92c682c117..56e0e2de09 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -43,6 +43,7 @@ import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.TaskRequest; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.serder.PlanProto.ShuffleType; @@ -404,6 +405,9 @@ public void run() throws Exception { LOG.error(e.getMessage(), e); stopScriptExecutors(); context.stop(); + + throw new TajoInternalError(e); + } finally { if (executor != null) { try { diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectNestedRecord.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectNestedRecord.java index 9f8a5fdbd7..880a105803 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectNestedRecord.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectNestedRecord.java @@ -19,6 +19,7 @@ package org.apache.tajo.engine.query; import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.client.ResultSetUtil; import org.apache.tajo.util.TUtil; import org.junit.Test; @@ -30,7 +31,7 @@ public class TestSelectNestedRecord extends QueryTestCaseBase { @Test - public final void testSelect1() throws Exception { + public final void testSelect0() throws Exception { List tables = executeDDL("sample1_ddl.sql", "sample1", "sample1"); assertEquals(TUtil.newList("sample1"), tables); @@ -39,6 +40,16 @@ public final void testSelect1() throws Exception { cleanupQuery(res); } + @Test + public final void testSelect1() throws Exception { + List tables = executeDDL("sample1_ddl.sql", "sample1", "sample2"); + assertEquals(TUtil.newList("sample2"), tables); + + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } + @Test public final void testSelect2() throws Exception { List tables = executeDDL("tweets_ddl.sql", "tweets", "tweets"); @@ -68,4 +79,17 @@ public final void testNestedFieldAsJoinKey1() throws Exception { assertResultSet(res); cleanupQuery(res); } + + @Test + public final void testInsert() throws Exception { + List tables = executeDDL("sample1_ddl.sql", "sample1", "sample3"); + assertEquals(TUtil.newList("sample3"), tables); + + executeString("CREATE TABLE clone (title TEXT, name RECORD (first_name TEXT, last_name TEXT)) USING JSON;").close(); + + executeString("INSERT INTO clone (title, name.first_name, name.last_name) SELECT title, name.first_name, name.last_name from sample3").close(); + ResultSet res = executeString("select title, name.first_name, name.last_name from clone"); + assertResultSet(res); + res.close(); + } } diff --git a/tajo-core/src/test/resources/queries/TestSelectNestedRecord/sample0_ddl.sql b/tajo-core/src/test/resources/queries/TestSelectNestedRecord/sample0_ddl.sql new file mode 100644 index 0000000000..ed6aee18a0 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectNestedRecord/sample0_ddl.sql @@ -0,0 +1 @@ +CREATE TABLE clone (title TEXT, name RECORD (first_name TEXT, last_name TEXT)) USING JSON; \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsert.result b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsert.result new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java index c9e101b399..1311dca562 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java @@ -56,7 +56,6 @@ import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule; import org.apache.tajo.plan.util.ExprFinder; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.verifier.VerifyException; import org.apache.tajo.storage.StorageService; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.Pair; @@ -1656,10 +1655,10 @@ private InsertNode buildInsertIntoTablePlan(PlanContext context, InsertNode inse // See PreLogicalPlanVerifier.visitInsert. // It guarantees that the equivalence between the numbers of target and projected columns. - String [] targets = expr.getTargetColumns(); + ColumnReferenceExpr [] targets = expr.getTargetColumns(); Schema targetColumns = new Schema(); for (int i = 0; i < targets.length; i++) { - Column targetColumn = desc.getLogicalSchema().getColumn(targets[i]); + Column targetColumn = desc.getLogicalSchema().getColumn(targets[i].getCanonicalName().replace(".", "/")); if (targetColumn == null) { throw makeSyntaxError("column '" + targets[i] + "' of relation '" + desc.getName() + "' does not exist"); @@ -1702,15 +1701,20 @@ private void buildProjectedInsert(PlanContext context, InsertNode insertNode) { } if (child instanceof Projectable) { - Projectable projectionNode = (Projectable)insertNode.getChild(); + Projectable projectionNode = insertNode.getChild(); // Modifying projected columns by adding NULL constants // It is because that table appender does not support target columns to be written. List targets = TUtil.newList(); - for (int i = 0; i < tableSchema.size(); i++) { - Column column = tableSchema.getColumn(i); + for (Column column : tableSchema.getAllColumns()) { int idxInProjectionNode = targetColumns.getIndex(column); + + // record type itself cannot be projected yet. + if (column.getDataType().getType() == TajoDataTypes.Type.RECORD) { + continue; + } + if (idxInProjectionNode >= 0 && idxInProjectionNode < projectionNode.getTargets().length) { targets.add(projectionNode.getTargets()[idxInProjectionNode]); } else { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java index 73b7592182..13b8d3056b 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java @@ -19,6 +19,8 @@ package org.apache.tajo.storage.json; +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; import net.minidev.json.JSONObject; import net.minidev.json.parser.JSONParser; @@ -42,35 +44,20 @@ public class JsonLineDeserializer extends TextLineDeserializer { private JSONParser parser; + // Full Path -> Type - private Map types; - private String [] projectedPaths; + private final Map types; + private final String [] projectedPaths; public JsonLineDeserializer(Schema schema, TableMeta meta, Column [] projected) { super(schema, meta); - projectedPaths = new String[projected.length]; - for (int i = 0; i < projected.length; i++) { - this.projectedPaths[i] = projected[i].getSimpleName(); - } + projectedPaths = NestedPathUtil.convertColumnsToPaths(projected); + types = NestedPathUtil.buildTypeMap(schema.getAllColumns(), projectedPaths); } @Override public void init() { - types = TUtil.newHashMap(); - for (Column column : schema.getAllColumns()) { - - // Keep types which only belong to projected paths - // For example, assume that a projected path is 'name/first_name', where name is RECORD and first_name is TEXT. - // In this case, we should keep two types: - // * name - RECORD - // * name/first_name TEXT - for (String p :projectedPaths) { - if (p.startsWith(column.getSimpleName())) { - types.put(column.getSimpleName(), column.getDataType().getType()); - } - } - } parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR); } diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java index 4d5d2e6bf0..06d5f072cf 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java @@ -111,7 +111,7 @@ public int serialize(OutputStream out, Tuple input) throws IOException { break; default: - throw new UnimplementedException(types[i].name() + " is not supported."); + throw new UnimplementedException(fieldName + "(" + types[i].name() + ") is not supported."); } } From ef499b3dffa846d31462c5a4be9042d83eec9ed1 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Thu, 30 Jul 2015 21:21:33 +0900 Subject: [PATCH 2/6] TAJO-1723: INSERT INTO statement should allow nested fields as target columns. --- .../apache/tajo/catalog/NestedPathUtil.java | 18 ++- .../engine/query/TestSelectNestedRecord.java | 17 +- .../TestSelectNestedRecord/testSelect0.sql | 1 + .../TestSelectNestedRecord/testInsert.result | 0 .../testInsertType1.result | 5 + .../testInsertType2.result | 5 + .../TestSelectNestedRecord/testSelect0.result | 5 + .../storage/json/JsonLineDeserializer.java | 17 +- .../tajo/storage/json/JsonLineSerializer.java | 145 ++++++++++-------- 9 files changed, 126 insertions(+), 87 deletions(-) create mode 100644 tajo-core/src/test/resources/queries/TestSelectNestedRecord/testSelect0.sql delete mode 100644 tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsert.result create mode 100644 tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsertType1.result create mode 100644 tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsertType2.result create mode 100644 tajo-core/src/test/resources/results/TestSelectNestedRecord/testSelect0.result diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java index e0e126dc7a..d01349268c 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.util.TUtil; @@ -90,19 +91,22 @@ private static Column lookupColumnInternal(Column currentColumn, String [] paths } } - public static String [] convertColumnsToPaths(Column [] columns) { - String [] paths = new String[columns.length]; + public static String [] convertColumnsToPaths(Iterable columns, boolean onlyLeaves) { + List paths = Lists.newArrayList(); - for (int i = 0; i < columns.length; i++) { - paths[i] = columns[i].getSimpleName(); + for (Column c : columns) { + if (onlyLeaves && c.getDataType().getType() == Type.RECORD) { + continue; + } + paths.add(c.getSimpleName()); } - return paths; + return paths.toArray(new String [paths.size()]); } public static ImmutableMap buildTypeMap(Iterable schema, String [] targetPaths) { - ImmutableMap.Builder builder = ImmutableMap.builder(); + HashMap builder = new HashMap(); for (Column column : schema) { // Keep types which only belong to projected paths @@ -117,6 +121,6 @@ public static ImmutableMap buildTypeMap(Iterable schema, S } } - return builder.build(); + return ImmutableMap.copyOf(builder); } } diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectNestedRecord.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectNestedRecord.java index 880a105803..458fed6165 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectNestedRecord.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectNestedRecord.java @@ -81,7 +81,8 @@ public final void testNestedFieldAsJoinKey1() throws Exception { } @Test - public final void testInsert() throws Exception { + public final void testInsertType1() throws Exception { + // all columns List tables = executeDDL("sample1_ddl.sql", "sample1", "sample3"); assertEquals(TUtil.newList("sample3"), tables); @@ -92,4 +93,18 @@ public final void testInsert() throws Exception { assertResultSet(res); res.close(); } + + @Test + public final void testInsertType2() throws Exception { + // some columns + List tables = executeDDL("sample1_ddl.sql", "sample1", "sample4"); + assertEquals(TUtil.newList("sample4"), tables); + + executeString("CREATE TABLE clone2 (title TEXT, name RECORD (first_name TEXT, last_name TEXT)) USING JSON;").close(); + + executeString("INSERT INTO clone2 (title, name.last_name) SELECT title, name.last_name from sample4").close(); + ResultSet res = executeString("select title, name.first_name, name.last_name from clone2"); + assertResultSet(res); + res.close(); + } } diff --git a/tajo-core/src/test/resources/queries/TestSelectNestedRecord/testSelect0.sql b/tajo-core/src/test/resources/queries/TestSelectNestedRecord/testSelect0.sql new file mode 100644 index 0000000000..a594bcfbce --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectNestedRecord/testSelect0.sql @@ -0,0 +1 @@ +SELECT title, name.first_name, name.last_name FROM sample1; \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsert.result b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsert.result deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsertType1.result b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsertType1.result new file mode 100644 index 0000000000..ec49cd5e7a --- /dev/null +++ b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsertType1.result @@ -0,0 +1,5 @@ +title,name/first_name,name/last_name +------------------------------- +Hand of the King,Eddard,Stark +Assassin,Arya,Stark +Dancing Master,Syrio,Forel \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsertType2.result b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsertType2.result new file mode 100644 index 0000000000..34fd9a0f79 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsertType2.result @@ -0,0 +1,5 @@ +title,name/first_name,name/last_name +------------------------------- +Hand of the King,,Stark +Assassin,,Stark +Dancing Master,,Forel \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestSelectNestedRecord/testSelect0.result b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testSelect0.result new file mode 100644 index 0000000000..ec49cd5e7a --- /dev/null +++ b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testSelect0.result @@ -0,0 +1,5 @@ +title,name/first_name,name/last_name +------------------------------- +Hand of the King,Eddard,Stark +Assassin,Arya,Stark +Dancing Master,Syrio,Forel \ No newline at end of file diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java index 13b8d3056b..1c4b4535f8 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage.json; +import com.facebook.presto.hive.shaded.com.google.common.collect.Lists; import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; @@ -40,6 +41,8 @@ import org.apache.tajo.util.TUtil; import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.Map; public class JsonLineDeserializer extends TextLineDeserializer { @@ -52,7 +55,7 @@ public class JsonLineDeserializer extends TextLineDeserializer { public JsonLineDeserializer(Schema schema, TableMeta meta, Column [] projected) { super(schema, meta); - projectedPaths = NestedPathUtil.convertColumnsToPaths(projected); + projectedPaths = NestedPathUtil.convertColumnsToPaths(Lists.newArrayList(projected), true); types = NestedPathUtil.buildTypeMap(schema.getAllColumns(), projectedPaths); } @@ -61,18 +64,6 @@ public void init() { parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR); } - private static String makePath(String [] path, int depth) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i <= depth; i++) { - sb.append(path[i]); - if (i < depth) { - sb.append(NestedPathUtil.PATH_DELIMITER); - } - } - - return sb.toString(); - } - /** * * diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java index 06d5f072cf..df37fa7c90 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java @@ -21,6 +21,7 @@ import net.minidev.json.JSONObject; import org.apache.commons.net.util.Base64; +import org.apache.tajo.catalog.NestedPathUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableMeta; @@ -29,90 +30,103 @@ import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.text.TextLineSerializer; +import org.codehaus.jettison.json.JSONWriter; import java.io.IOException; import java.io.OutputStream; +import java.util.Map; public class JsonLineSerializer extends TextLineSerializer { - private Type [] types; - private String [] simpleNames; - private int columnNum; + // Full Path -> Type + private final Map types; + private final String [] projectedPaths; public JsonLineSerializer(Schema schema, TableMeta meta) { super(schema, meta); + + projectedPaths = NestedPathUtil.convertColumnsToPaths(schema.getAllColumns(), true); + types = NestedPathUtil.buildTypeMap(schema.getAllColumns(), projectedPaths); } @Override public void init() { - types = SchemaUtil.toTypes(schema); - simpleNames = SchemaUtil.toSimpleNames(schema); - columnNum = schema.size(); + } + + private void putValue(JSONObject json, + String fullPath, + String [] pathElements, + int depth, + int fieldIndex, + Tuple input) throws IOException { + String fieldName = pathElements[depth]; + + switch (types.get(fullPath)) { + + case BOOLEAN: + json.put(fieldName, input.getBool(fieldIndex)); + break; + + case INT1: + case INT2: + json.put(fieldName, input.getInt2(fieldIndex)); + break; + + case INT4: + json.put(fieldName, input.getInt4(fieldIndex)); + break; + + case INT8: + json.put(fieldName, input.getInt8(fieldIndex)); + break; + + case FLOAT4: + json.put(fieldName, input.getFloat4(fieldIndex)); + break; + + case FLOAT8: + json.put(fieldName, input.getFloat8(fieldIndex)); + break; + + case CHAR: + case TEXT: + case VARCHAR: + case INET4: + case TIMESTAMP: + case DATE: + case TIME: + case INTERVAL: + json.put(fieldName, input.getText(fieldIndex)); + break; + + case BIT: + case BINARY: + case BLOB: + case VARBINARY: + json.put(fieldName, Base64.encodeBase64String(input.getBytes(fieldIndex))); + break; + + case NULL_TYPE: + break; + + case RECORD: + JSONObject record = json.containsKey(fieldName) ? (JSONObject) json.get(fieldName) : new JSONObject(); + json.put(fieldName, record); + putValue(record, fullPath + "/" + pathElements[depth + 1], pathElements, depth + 1, fieldIndex, input); + break; + + default: + throw new UnimplementedException(fieldName + "(" + types.get(fullPath).name() + ") is not supported."); + } } @Override public int serialize(OutputStream out, Tuple input) throws IOException { JSONObject jsonObject = new JSONObject(); - for (int i = 0; i < columnNum; i++) { - if (input.isBlankOrNull(i)) { - continue; - } - - String fieldName = simpleNames[i]; - Type type = types[i]; - - switch (type) { - - case BOOLEAN: - jsonObject.put(fieldName, input.getBool(i)); - break; - - case INT1: - case INT2: - jsonObject.put(fieldName, input.getInt2(i)); - break; - - case INT4: - jsonObject.put(fieldName, input.getInt4(i)); - break; - - case INT8: - jsonObject.put(fieldName, input.getInt8(i)); - break; - - case FLOAT4: - jsonObject.put(fieldName, input.getFloat4(i)); - break; - - case FLOAT8: - jsonObject.put(fieldName, input.getFloat8(i)); - break; - - case CHAR: - case TEXT: - case VARCHAR: - case INET4: - case TIMESTAMP: - case DATE: - case TIME: - case INTERVAL: - jsonObject.put(fieldName, input.getText(i)); - break; - - case BIT: - case BINARY: - case BLOB: - case VARBINARY: - jsonObject.put(fieldName, Base64.encodeBase64String(input.getBytes(i))); - break; - - case NULL_TYPE: - break; - - default: - throw new UnimplementedException(fieldName + "(" + types[i].name() + ") is not supported."); - } + for (int i = 0; i < projectedPaths.length; i++) { + String [] paths = projectedPaths[i].split(NestedPathUtil.PATH_DELIMITER); + putValue(jsonObject, paths[0], paths, 0, i, input); } String jsonStr = jsonObject.toJSONString(); @@ -123,6 +137,5 @@ public int serialize(OutputStream out, Tuple input) throws IOException { @Override public void release() { - } } From 334db6d0cf471fd1f00a41ee44133ac70d1153e6 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Thu, 30 Jul 2015 21:23:43 +0900 Subject: [PATCH 3/6] Refine changes. --- .../src/main/java/org/apache/tajo/catalog/NestedPathUtil.java | 2 -- .../src/main/java/org/apache/tajo/worker/TaskContainer.java | 2 +- tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java | 4 ---- .../org/apache/tajo/engine/query/TestSelectNestedRecord.java | 1 - .../org/apache/tajo/storage/json/JsonLineDeserializer.java | 4 ---- .../java/org/apache/tajo/storage/json/JsonLineSerializer.java | 2 -- 6 files changed, 1 insertion(+), 14 deletions(-) diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java index d01349268c..2bfd4b67dd 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java @@ -21,9 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.util.TUtil; import java.util.*; diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java index dfb057315d..bd28bb73f0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java @@ -66,7 +66,7 @@ public void run() { } task.cleanup(); - } catch (Throwable e) { + } catch (Exception e) { LOG.error(e.getMessage(), e); if (task != null) { try { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 56e0e2de09..92c682c117 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -43,7 +43,6 @@ import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.TaskRequest; -import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.serder.PlanProto.ShuffleType; @@ -405,9 +404,6 @@ public void run() throws Exception { LOG.error(e.getMessage(), e); stopScriptExecutors(); context.stop(); - - throw new TajoInternalError(e); - } finally { if (executor != null) { try { diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectNestedRecord.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectNestedRecord.java index 458fed6165..ffd6f08ebb 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectNestedRecord.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectNestedRecord.java @@ -19,7 +19,6 @@ package org.apache.tajo.engine.query; import org.apache.tajo.QueryTestCaseBase; -import org.apache.tajo.client.ResultSetUtil; import org.apache.tajo.util.TUtil; import org.junit.Test; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java index 1c4b4535f8..343eaaec19 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java @@ -20,8 +20,6 @@ import com.facebook.presto.hive.shaded.com.google.common.collect.Lists; -import com.google.common.collect.ImmutableCollection; -import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; import net.minidev.json.JSONObject; import net.minidev.json.parser.JSONParser; @@ -41,8 +39,6 @@ import org.apache.tajo.util.TUtil; import java.io.IOException; -import java.util.Collection; -import java.util.Collections; import java.util.Map; public class JsonLineDeserializer extends TextLineDeserializer { diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java index df37fa7c90..782195f8ed 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java @@ -23,14 +23,12 @@ import org.apache.commons.net.util.Base64; import org.apache.tajo.catalog.NestedPathUtil; import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.datum.TextDatum; import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.text.TextLineSerializer; -import org.codehaus.jettison.json.JSONWriter; import java.io.IOException; import java.io.OutputStream; From e9acd683a1e075c20bc78ca26058a84b1305b078 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Thu, 30 Jul 2015 21:24:57 +0900 Subject: [PATCH 4/6] Moved some methods to SchemaUtil. --- .../apache/tajo/catalog/NestedPathUtil.java | 32 --------------- .../org/apache/tajo/catalog/SchemaUtil.java | 40 ++++++++++++++++++- .../storage/json/JsonLineDeserializer.java | 5 +-- .../tajo/storage/json/JsonLineSerializer.java | 5 ++- 4 files changed, 43 insertions(+), 39 deletions(-) diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java index 2bfd4b67dd..dec9a55dcb 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/NestedPathUtil.java @@ -89,36 +89,4 @@ private static Column lookupColumnInternal(Column currentColumn, String [] paths } } - public static String [] convertColumnsToPaths(Iterable columns, boolean onlyLeaves) { - List paths = Lists.newArrayList(); - - for (Column c : columns) { - if (onlyLeaves && c.getDataType().getType() == Type.RECORD) { - continue; - } - paths.add(c.getSimpleName()); - } - - return paths.toArray(new String [paths.size()]); - } - - public static ImmutableMap buildTypeMap(Iterable schema, String [] targetPaths) { - - HashMap builder = new HashMap(); - for (Column column : schema) { - - // Keep types which only belong to projected paths - // For example, assume that a projected path is 'name/first_name', where name is RECORD and first_name is TEXT. - // In this case, we should keep two types: - // * name - RECORD - // * name/first_name TEXT - for (String p : targetPaths) { - if (p.startsWith(column.getSimpleName())) { - builder.put(column.getSimpleName(), column.getDataType().getType()); - } - } - } - - return ImmutableMap.copyOf(builder); - } } diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java index 44973dbbc1..09a2e456a2 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SchemaUtil.java @@ -18,8 +18,11 @@ package org.apache.tajo.catalog; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import org.apache.tajo.util.TUtil; +import java.util.HashMap; import java.util.List; import static org.apache.tajo.common.TajoDataTypes.DataType; @@ -113,11 +116,44 @@ public static T clone(Schema schema) { return names; } + public static String [] convertColumnsToPaths(Iterable columns, boolean onlyLeaves) { + List paths = Lists.newArrayList(); + + for (Column c : columns) { + if (onlyLeaves && c.getDataType().getType() == Type.RECORD) { + continue; + } + paths.add(c.getSimpleName()); + } + + return paths.toArray(new String [paths.size()]); + } + + public static ImmutableMap buildTypeMap(Iterable schema, String [] targetPaths) { + + HashMap builder = new HashMap(); + for (Column column : schema) { + + // Keep types which only belong to projected paths + // For example, assume that a projected path is 'name/first_name', where name is RECORD and first_name is TEXT. + // In this case, we should keep two types: + // * name - RECORD + // * name/first_name TEXT + for (String p : targetPaths) { + if (p.startsWith(column.getSimpleName())) { + builder.put(column.getSimpleName(), column.getDataType().getType()); + } + } + } + + return ImmutableMap.copyOf(builder); + } + /** * Column visitor interface */ - public static interface ColumnVisitor { - public void visit(int depth, List path, Column column); + public interface ColumnVisitor { + void visit(int depth, List path, Column column); } /** diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java index 343eaaec19..7ef483c6ae 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java @@ -36,7 +36,6 @@ import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.text.TextLineDeserializer; import org.apache.tajo.storage.text.TextLineParsingError; -import org.apache.tajo.util.TUtil; import java.io.IOException; import java.util.Map; @@ -51,8 +50,8 @@ public class JsonLineDeserializer extends TextLineDeserializer { public JsonLineDeserializer(Schema schema, TableMeta meta, Column [] projected) { super(schema, meta); - projectedPaths = NestedPathUtil.convertColumnsToPaths(Lists.newArrayList(projected), true); - types = NestedPathUtil.buildTypeMap(schema.getAllColumns(), projectedPaths); + projectedPaths = SchemaUtil.convertColumnsToPaths(Lists.newArrayList(projected), true); + types = SchemaUtil.buildTypeMap(schema.getAllColumns(), projectedPaths); } @Override diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java index 782195f8ed..7c2346ec37 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java @@ -23,6 +23,7 @@ import org.apache.commons.net.util.Base64; import org.apache.tajo.catalog.NestedPathUtil; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.datum.TextDatum; @@ -43,8 +44,8 @@ public class JsonLineSerializer extends TextLineSerializer { public JsonLineSerializer(Schema schema, TableMeta meta) { super(schema, meta); - projectedPaths = NestedPathUtil.convertColumnsToPaths(schema.getAllColumns(), true); - types = NestedPathUtil.buildTypeMap(schema.getAllColumns(), projectedPaths); + projectedPaths = SchemaUtil.convertColumnsToPaths(schema.getAllColumns(), true); + types = SchemaUtil.buildTypeMap(schema.getAllColumns(), projectedPaths); } @Override From a6c05d267e9fb17398cf54b7f09a5646ba82db65 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Thu, 30 Jul 2015 22:23:47 +0900 Subject: [PATCH 5/6] Fixed null handling in json. --- .../java/org/apache/tajo/storage/json/JsonLineSerializer.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java index 7c2346ec37..99f81a26ee 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java @@ -60,6 +60,10 @@ private void putValue(JSONObject json, Tuple input) throws IOException { String fieldName = pathElements[depth]; + if (input.isBlankOrNull(fieldIndex)) { + return; + } + switch (types.get(fullPath)) { case BOOLEAN: From 04d5284164f6a9f82b152ae7dcef94f361650081 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 31 Jul 2015 00:30:27 +0900 Subject: [PATCH 6/6] Fixed test results. --- .../results/TestSelectNestedRecord/testInsertType2.result | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsertType2.result b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsertType2.result index 34fd9a0f79..a80edfe19a 100644 --- a/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsertType2.result +++ b/tajo-core/src/test/resources/results/TestSelectNestedRecord/testInsertType2.result @@ -1,5 +1,5 @@ title,name/first_name,name/last_name ------------------------------- -Hand of the King,,Stark -Assassin,,Stark -Dancing Master,,Forel \ No newline at end of file +Hand of the King,null,Stark +Assassin,null,Stark +Dancing Master,null,Forel \ No newline at end of file