From dafda3964dfd8f492b61506adf0e06f8a50dd28e Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 3 Feb 2015 18:44:00 +0900 Subject: [PATCH 01/13] TAJO-1298 --- .../tajo/engine/query/QueryContext.java | 8 ++++ .../tajo/master/TajoMasterClientService.java | 3 +- .../tajo/master/exec/QueryExecutor.java | 2 + .../master/exec/prehook/CreateIndexHook.java | 37 +++++++++++++++++++ 4 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateIndexHook.java diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java index 7b3c00db30..c53e613072 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java @@ -166,4 +166,12 @@ public void setInsert() { public boolean isInsert() { return isCommandType(NodeType.INSERT.name()); } + + public void setCreateIndex() { + setCommandType(NodeType.CREATE_INDEX); + } + + public boolean isCreateIndex() { + return isCommandType(NodeType.CREATE_INDEX.name()); + } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 85d501dd5f..2f353cf1fa 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -460,7 +460,8 @@ public GetQueryStatusResponse getQueryStatus(RpcController controller, boolean isCreateTable = queryInfo.getQueryContext().isCreateTable(); boolean isInsert = queryInfo.getQueryContext().isInsert(); - builder.setHasResult(!(isCreateTable || isInsert)); + boolean isCreateIndex = queryInfo.getQueryContext().isCreateIndex(); + builder.setHasResult(!(isCreateTable || isInsert || isCreateIndex)); builder.setProgress(queryInfo.getProgress()); builder.setSubmitTime(queryInfo.getStartTime()); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index aeb346ff0c..045e492a8e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -45,6 +45,7 @@ import org.apache.tajo.ipc.ClientProtos.ResultCode; import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; import org.apache.tajo.master.*; +import org.apache.tajo.master.exec.prehook.CreateIndexHook; import org.apache.tajo.master.exec.prehook.CreateTableHook; import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager; import org.apache.tajo.master.exec.prehook.InsertIntoHook; @@ -82,6 +83,7 @@ public QueryExecutor(TajoMaster.MasterContext context, DDLExecutor ddlExecutor) this.hookManager = new DistributedQueryHookManager(); this.hookManager.addHook(new CreateTableHook()); this.hookManager.addHook(new InsertIntoHook()); + this.hookManager.addHook(new CreateIndexHook()); } public SubmitQueryResponse execute(QueryContext queryContext, Session session, String sql, String jsonExpr, diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateIndexHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateIndexHook.java new file mode 100644 index 0000000000..80d8516cda --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateIndexHook.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.exec.prehook; + +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.LogicalRootNode; +import org.apache.tajo.plan.logical.NodeType; + +public class CreateIndexHook implements DistributedQueryHook { + @Override + public boolean isEligible(QueryContext queryContext, LogicalPlan plan) { + LogicalRootNode rootNode = plan.getRootBlock().getRoot(); + return rootNode.getChild().getType() == NodeType.CREATE_INDEX; + } + + @Override + public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception { + queryContext.setCreateIndex(); + } +} From e981ff419172f9a368304e16738a7e545cdb0b25 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 9 Feb 2015 14:38:27 +0900 Subject: [PATCH 02/13] TAJO-1298 --- .../apache/tajo/master/exec/prehook/CreateIndexHook.java | 7 +++++++ .../java/org/apache/tajo/querymaster/QueryMasterTask.java | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateIndexHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateIndexHook.java index 80d8516cda..9136b64f45 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateIndexHook.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateIndexHook.java @@ -18,8 +18,11 @@ package org.apache.tajo.master.exec.prehook; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.CreateIndexNode; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.plan.logical.NodeType; @@ -32,6 +35,10 @@ public boolean isEligible(QueryContext queryContext, LogicalPlan plan) { @Override public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception { + CreateIndexNode createIndexNode = (CreateIndexNode) plan.getRootBlock().getRoot().getChild(0); + String indexName = CatalogUtil.splitFQTableName(createIndexNode.getIndexName())[1]; + queryContext.setOutputTable(indexName); + queryContext.setOutputPath(new Path(createIndexNode.getIndexPath())); queryContext.setCreateIndex(); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 7a5e7b41f4..f7de7f7eb6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -434,7 +434,7 @@ public static Path initStagingDir(TajoConf conf, String queryId, QueryContext co //////////////////////////////////////////// String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, ""); - if (context.isCreateTable() || context.isInsert()) { + if (context.isCreateTable() || context.isInsert() || context.isCreateIndex()) { if (outputPath == null || outputPath.isEmpty()) { // hbase stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId); From edf464783401264e854e9356de658ec4e4b3ecec Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 9 Feb 2015 20:24:57 +0900 Subject: [PATCH 03/13] TAJO-1298 --- .../tajo/engine/planner/PhysicalPlannerImpl.java | 3 ++- .../engine/planner/physical/StoreIndexExec.java | 3 ++- .../tajo/master/exec/prehook/CreateIndexHook.java | 4 +++- .../main/java/org/apache/tajo/worker/Task.java | 15 +++++++++++++-- 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 17a9bb18b9..ec3da11e41 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -91,7 +91,8 @@ public PhysicalExec createPlan(final TaskAttemptContext context, final LogicalNo if (execPlan instanceof StoreTableExec || execPlan instanceof RangeShuffleFileWriteExec || execPlan instanceof HashShuffleFileWriteExec - || execPlan instanceof ColPartitionStoreExec) { + || execPlan instanceof ColPartitionStoreExec + || execPlan instanceof StoreIndexExec) { return execPlan; } else if (context.getDataChannel() != null) { return buildOutputOperator(context, logicalPlan, execPlan); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java index f9db842534..ca47a45199 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java @@ -66,7 +66,8 @@ public void init() throws IOException { } TajoConf conf = context.getConf(); - Path indexPath = new Path(logicalPlan.getIndexPath().toString(), context.getUniqueKeyFromFragments()); +// Path indexPath = new Path(logicalPlan.getIndexPath().toString(), context.getUniqueKeyFromFragments()); + Path indexPath = context.getOutputPath(); // TODO: Create factory using reflection BSTIndex bst = new BSTIndex(conf); this.comparator = new BaseTupleComparator(keySchema, sortSpecs); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateIndexHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateIndexHook.java index 9136b64f45..113a35ec06 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateIndexHook.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateIndexHook.java @@ -36,7 +36,9 @@ public boolean isEligible(QueryContext queryContext, LogicalPlan plan) { @Override public void hook(QueryContext queryContext, LogicalPlan plan) throws Exception { CreateIndexNode createIndexNode = (CreateIndexNode) plan.getRootBlock().getRoot().getChild(0); - String indexName = CatalogUtil.splitFQTableName(createIndexNode.getIndexName())[1]; + String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName()); + String databaseName = splits[0]; + String indexName = splits[1]; queryContext.setOutputTable(indexName); queryContext.setOutputPath(new Path(createIndexNode.getIndexPath())); queryContext.setCreateIndex(); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 859b3aea71..46e919b99c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.tajo.TajoConstants; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TajoProtos; import org.apache.tajo.TajoProtos.TaskAttemptState; @@ -150,8 +151,13 @@ public void initPlan() throws IOException { this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); } } else { - Path outFilePath = ((FileStorageManager)StorageManager.getFileStorageManager(systemConf)) - .getAppenderFilePath(taskId, queryContext.getStagingDir()); + Path outFilePath; + if (queryContext.isCreateIndex()) { + outFilePath = getIndexStagingPath(queryContext, context); + } else { + outFilePath = ((FileStorageManager) StorageManager.getFileStorageManager(systemConf)) + .getAppenderFilePath(taskId, queryContext.getStagingDir()); + } LOG.info("Output File Path: " + outFilePath); context.setOutputPath(outFilePath); } @@ -844,4 +850,9 @@ public static Path getTaskAttemptDir(TaskAttemptId quid) { String.valueOf(quid.getId())); return workDir; } + + private static Path getIndexStagingPath(QueryContext queryContext, TaskAttemptContext context) { + return StorageUtil.concatPath(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME, + context.getUniqueKeyFromFragments()); + } } From aa45f842146eb81c271a60d6fba4c97d61dc4a85 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 9 Feb 2015 20:26:46 +0900 Subject: [PATCH 04/13] TAJO-1298 --- .../org/apache/tajo/engine/planner/physical/StoreIndexExec.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java index ca47a45199..7f28973b2f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java @@ -66,7 +66,6 @@ public void init() throws IOException { } TajoConf conf = context.getConf(); -// Path indexPath = new Path(logicalPlan.getIndexPath().toString(), context.getUniqueKeyFromFragments()); Path indexPath = context.getOutputPath(); // TODO: Create factory using reflection BSTIndex bst = new BSTIndex(conf); From 88507f575c18bd15f35be6eafc69bc30a80887e4 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 10 Feb 2015 10:35:40 +0900 Subject: [PATCH 05/13] TAJO-1298: fix bugs --- .../tajo/engine/planner/physical/TestPhysicalPlanner.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index 5aa33506ae..e2cc84d098 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -980,7 +980,7 @@ public final void testCreateIndex() throws IOException, PlanningException { FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateIndex"); - Path indexPath = StorageUtil.concatPath(TajoConf.getWarehouseDir(conf), "default/idx_employee"); + Path indexPath = StorageUtil.concatPath(workDir, "idx_employee"); if (sm.getFileSystem().exists(indexPath)) { sm.getFileSystem().delete(indexPath, true); } @@ -989,6 +989,7 @@ public final void testCreateIndex() throws IOException, PlanningException { LocalTajoTestingUtility.newTaskAttemptId(masterPlan), new FileFragment[] {frags[0]}, workDir); ctx.setEnforcer(new Enforcer()); + ctx.setOutputPath(indexPath); Expr context = analyzer.parse(createIndexStmt[0]); LogicalPlan plan = planner.createPlan(defaultContext, context); LogicalNode rootNode = optimizer.optimize(plan); @@ -1000,7 +1001,7 @@ public final void testCreateIndex() throws IOException, PlanningException { } exec.close(); - FileStatus[] list = sm.getFileSystem().listStatus(indexPath); + FileStatus[] list = sm.getFileSystem().listStatus(indexPath.getParent()); assertEquals(2, list.length); } From fcef0b72cba862394f13c83e624e22ac54c7eea2 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sun, 22 Feb 2015 16:35:26 +0900 Subject: [PATCH 06/13] TAJO-1298 --- .../org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java index 5cbe77b0b4..3de17f7d53 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java @@ -87,7 +87,7 @@ public void printResult(PrintWriter sout, InputStream sin, TableDesc tableDesc, } if (res == null) { - sout.println(getQuerySuccessMessage(tableDesc, responseTime, 0, "inserted", true)); + sout.println(getQuerySuccessMessage(tableDesc, responseTime, 0, "written", true)); return; } ResultSetMetaData rsmd = res.getMetaData(); From e90699928a60c6b595118c70072cfe697ce17784 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sun, 22 Feb 2015 17:32:51 +0900 Subject: [PATCH 07/13] TAJO-1298 --- .../org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java index df709c5d10..740d06909a 100644 --- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java +++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java @@ -125,7 +125,7 @@ public void testPrintResultInsertStatement() throws Exception { PrintWriter writer = new PrintWriter(stringWriter); outputFormatter.printResult(writer, null, tableDesc, responseTime, null); - String expectedOutput = "(" + numRows + " rows, " + responseTime + " sec, " + numBytes + " B inserted)\n"; + String expectedOutput = "(" + numRows + " rows, " + responseTime + " sec, " + numBytes + " B written)\n"; assertEquals(expectedOutput, stringWriter.toString()); } From 065fc8f90e96a14a46fc4e8ae5bb1f2da835c423 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sun, 1 Mar 2015 20:36:00 +0900 Subject: [PATCH 08/13] TAJO-1298 --- .../tsql/DefaultTajoCliOutputFormatter.java | 36 ++++++++++++ .../org/apache/tajo/cli/tsql/TajoCli.java | 18 +++--- .../tajo/cli/tsql/TajoCliOutputFormatter.java | 10 ++++ tajo-client/src/main/proto/ClientProtos.proto | 3 + tajo-common/pom.xml | 1 + tajo-common/src/main/proto/PlanTypes.proto | 57 +++++++++++++++++++ .../tajo/master/exec/QueryExecutor.java | 3 +- .../tsql/TestDefaultCliOutputFormatter.java | 7 ++- .../plan/serder/LogicalNodeSerializer.java | 9 +-- .../apache/tajo/plan/util/PlannerUtil.java | 16 +++++- tajo-plan/src/main/proto/Plan.proto | 38 +------------ 11 files changed, 144 insertions(+), 54 deletions(-) create mode 100644 tajo-common/src/main/proto/PlanTypes.proto diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java index 3de17f7d53..2683e008bb 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java @@ -25,6 +25,8 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.client.QueryStatus; +import org.apache.tajo.common.PlanTypesProto; +import org.apache.tajo.exception.UnsupportedException; import org.apache.tajo.util.FileUtil; import java.io.InputStream; @@ -192,6 +194,13 @@ public void printErrorMessage(PrintWriter sout, QueryStatus status) { sout.flush(); } + @Override + public void printQueryTypeMessage(PrintWriter sout, boolean isDDL, PlanTypesProto.PlanNodeType planNodeType) { + if (isDDL) { + sout.println(getPlanTypeString(planNodeType) + " OK"); + } + } + public static String parseErrorMessage(String message) { if (message == null) { return TajoCli.ERROR_PREFIX + "No error message"; @@ -208,4 +217,31 @@ public static String parseErrorMessage(String message) { return message; } + + private static String getPlanTypeString(PlanTypesProto.PlanNodeType type) { + switch (type) { + case INSERT: + return "INSERT"; + case CREATE_DATABASE: + return "CREATE DATABASE"; + case DROP_DATABASE: + return "DROP DATABASE"; + case CREATE_TABLE: + return "CREATE TABLE"; + case DROP_TABLE: + return "DROP TABLE"; + case ALTER_TABLESPACE: + return "ALTER TABLESPACE"; + case ALTER_TABLE: + return "ALTER TABLE"; + case TRUNCATE_TABLE: + return "TRUNCATE TABLE"; + case CREATE_INDEX: + return "CREATE INDEX"; + case DROP_INDEX: + return "DROP INDEX"; + default: + throw new UnsupportedException("Only DDLs are supported, but the plan type is " + type); + } + } } diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index 1d3682ca0e..b714c05a54 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -31,6 +31,7 @@ import org.apache.tajo.cli.tsql.SimpleParser.ParsingState; import org.apache.tajo.cli.tsql.commands.*; import org.apache.tajo.client.*; +import org.apache.tajo.common.PlanTypesProto; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.ipc.ClientProtos; @@ -499,10 +500,7 @@ private void executeJsonQuery(String json) throws ServiceException, IOException QueryId queryId = new QueryId(response.getQueryId()); waitForQueryCompleted(queryId); } else { - if (!response.hasTableDesc() && !response.hasResultSet()) { - displayFormatter.printMessage(sout, "OK"); - wasError = true; - } else { + if (response.hasTableDesc() || response.hasResultSet()) { localQueryCompleted(response, startTime); } } @@ -512,6 +510,10 @@ private void executeJsonQuery(String json) throws ServiceException, IOException wasError = true; } } + + if (!wasError) { + displayFormatter.printQueryTypeMessage(sout, response.hasPlanType(), response.getPlanType()); + } } private int executeQuery(String statement) throws ServiceException, IOException { @@ -536,9 +538,7 @@ private int executeQuery(String statement) throws ServiceException, IOException QueryId queryId = new QueryId(response.getQueryId()); waitForQueryCompleted(queryId); } else { - if (!response.hasTableDesc() && !response.hasResultSet()) { - displayFormatter.printMessage(sout, "OK"); - } else { + if (response.hasTableDesc() || response.hasResultSet()) { localQueryCompleted(response, startTime); } } @@ -549,6 +549,10 @@ private int executeQuery(String statement) throws ServiceException, IOException } } + if (!wasError) { + displayFormatter.printQueryTypeMessage(sout, response.hasPlanType(), response.getPlanType()); + } + return wasError ? -1 : 0; } diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCliOutputFormatter.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCliOutputFormatter.java index 13d9cb1f33..5a89d0d314 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCliOutputFormatter.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCliOutputFormatter.java @@ -21,6 +21,7 @@ import org.apache.tajo.QueryId; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.client.QueryStatus; +import org.apache.tajo.common.PlanTypesProto; import java.io.InputStream; import java.io.PrintWriter; @@ -93,5 +94,14 @@ public void printResult(PrintWriter sout, InputStream sin, TableDesc tableDesc, */ void printErrorMessage(PrintWriter sout, QueryStatus status); + /** + * Print the query type if the query is one of the DDL statements. + * + * @param sout + * @param isDDL + * @param planNodeType + */ + public void printQueryTypeMessage(PrintWriter sout, boolean isDDL, PlanTypesProto.PlanNodeType planNodeType); + void setScriptMode(); } diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index 7ccb2a41d3..e6ea861dd9 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -25,6 +25,7 @@ import "tajo_protos.proto"; import "TajoIdProtos.proto"; import "CatalogProtos.proto"; import "PrimitiveProtos.proto"; +import "PlanTypes.proto"; enum ResultCode { OK = 0; @@ -141,6 +142,8 @@ message SubmitQueryResponse { optional int32 maxRowNum = 9; optional KeyValueSetProto sessionVars = 10; + + optional PlanNodeType planType = 11; } message GetQueryStatusResponse { diff --git a/tajo-common/pom.xml b/tajo-common/pom.xml index cd6f0bebfe..7748d3c909 100644 --- a/tajo-common/pom.xml +++ b/tajo-common/pom.xml @@ -151,6 +151,7 @@ src/main/proto/DataTypes.proto src/main/proto/PrimitiveProtos.proto src/main/proto/tajo_protos.proto + src/main/proto/PlanTypes.proto diff --git a/tajo-common/src/main/proto/PlanTypes.proto b/tajo-common/src/main/proto/PlanTypes.proto new file mode 100644 index 0000000000..0dc3f148cc --- /dev/null +++ b/tajo-common/src/main/proto/PlanTypes.proto @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.tajo.common"; +option java_outer_classname = "PlanTypesProto"; +option java_generic_services = false; +option java_generate_equals_and_hash = true; + +enum PlanNodeType { + SET_SESSION = 0; + + ROOT = 1; + EXPRS = 2; + PROJECTION = 3; + LIMIT = 4; + WINDOW_AGG = 5; + SORT = 6; + HAVING = 7; + GROUP_BY = 8; + DISTINCT_GROUP_BY = 9; + SELECTION = 10; + JOIN = 11; + UNION = 12; + INTERSECT = 13; + EXCEPT = 14; + TABLE_SUBQUERY = 15; + SCAN = 16; + PARTITIONS_SCAN = 17; + INDEX_SCAN = 18; + STORE = 19; + INSERT = 20; + + CREATE_DATABASE = 21; + DROP_DATABASE = 22; + CREATE_TABLE = 23; + DROP_TABLE = 24; + ALTER_TABLESPACE = 25; + ALTER_TABLE = 26; + TRUNCATE_TABLE = 27; + CREATE_INDEX = 28; + DROP_INDEX = 29; +} \ No newline at end of file diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 5448906292..c77d5b3f1c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -103,7 +103,7 @@ public SubmitQueryResponse execute(QueryContext queryContext, Session session, S context.getSystemMetrics().counter("Query", "numDDLQuery").inc(); if (PlannerUtil.isDistExecDDL(rootNode)) { - if (rootNode.getChild().getType() == NodeType.CREATE_INDEX) { + if (PlannerUtil.extractPlanType(rootNode) == NodeType.CREATE_INDEX) { checkIndexExistence(queryContext, (CreateIndexNode) rootNode.getChild()); } executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response); @@ -111,6 +111,7 @@ public SubmitQueryResponse execute(QueryContext queryContext, Session session, S response.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); response.setResult(IPCUtil.buildOkRequestResult()); } + response.setPlanType(PlannerUtil.convertType(PlannerUtil.extractPlanType(rootNode))); ddlExecutor.execute(queryContext, plan); diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java index 740d06909a..48b33e6e33 100644 --- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java +++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java @@ -23,8 +23,7 @@ import org.apache.tajo.TpchTestBase; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.cli.tsql.DefaultTajoCliOutputFormatter; -import org.apache.tajo.cli.tsql.TajoCli; +import org.apache.tajo.common.PlanTypesProto; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Float8Datum; @@ -124,8 +123,9 @@ public void testPrintResultInsertStatement() throws Exception { StringWriter stringWriter = new StringWriter(); PrintWriter writer = new PrintWriter(stringWriter); outputFormatter.printResult(writer, null, tableDesc, responseTime, null); + outputFormatter.printQueryTypeMessage(writer, true, PlanTypesProto.PlanNodeType.INSERT); - String expectedOutput = "(" + numRows + " rows, " + responseTime + " sec, " + numBytes + " B written)\n"; + String expectedOutput = "(" + numRows + " rows, " + responseTime + " sec, " + numBytes + " B written)\nINSERT OK\n"; assertEquals(expectedOutput, stringWriter.toString()); } @@ -173,6 +173,7 @@ public void testPrintResultSelectStatement() throws Exception { StringWriter stringWriter = new StringWriter(); PrintWriter writer = new PrintWriter(stringWriter); outputFormatter.printResult(writer, null, tableDesc, responseTime, resultSet); + outputFormatter.printQueryTypeMessage(writer, false, PlanTypesProto.PlanNodeType.GROUP_BY); assertEquals(expectedOutput, stringWriter.toString()); } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java index 5ad79c0ef7..b7a89a82a2 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java @@ -23,7 +23,7 @@ import org.apache.tajo.algebra.JoinType; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto; +import org.apache.tajo.common.PlanTypesProto; import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.PlanningException; @@ -35,6 +35,7 @@ import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.RenameTable; import org.apache.tajo.plan.serder.PlanProto.AlterTablespaceNode.SetLocation; import org.apache.tajo.plan.serder.PlanProto.LogicalNodeTree; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.util.ProtoUtil; import org.apache.tajo.util.TUtil; @@ -89,7 +90,7 @@ private static PlanProto.LogicalNode.Builder createNodeBuilder(SerializeContext PlanProto.LogicalNode.Builder nodeBuilder = PlanProto.LogicalNode.newBuilder(); nodeBuilder.setVisitSeq(selfId); nodeBuilder.setNodeId(node.getPID()); - nodeBuilder.setType(convertType(node.getType())); + nodeBuilder.setType(PlannerUtil.convertType(node.getType())); // some DDL statements like DropTable or DropDatabase do not have in/out schemas if (node.getInSchema() != null) { @@ -744,10 +745,6 @@ public LogicalNode visitDropIndex(SerializeContext context, LogicalPlan plan, Lo return node; } - public static PlanProto.NodeType convertType(NodeType type) { - return PlanProto.NodeType.valueOf(type.name()); - } - public static PlanProto.JoinType convertJoinType(JoinType type) { switch (type) { case CROSS: diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java index ebd47defd9..335e025171 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java @@ -27,6 +27,7 @@ import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.common.PlanTypesProto; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.plan.*; import org.apache.tajo.plan.expr.*; @@ -59,6 +60,19 @@ public static boolean checkIfSetSession(LogicalNode node) { } + public static PlanTypesProto.PlanNodeType convertType(NodeType type) { + return PlanTypesProto.PlanNodeType.valueOf(type.name()); + } + + public static NodeType extractPlanType(LogicalNode node) { + LogicalNode baseNode = node; + if (node instanceof LogicalRootNode) { + baseNode = ((LogicalRootNode) node).getChild(); + } + + return baseNode.getType(); + } + public static boolean checkIfDDLPlan(LogicalNode node) { LogicalNode baseNode = node; if (node instanceof LogicalRootNode) { @@ -70,7 +84,7 @@ public static boolean checkIfDDLPlan(LogicalNode node) { return type == NodeType.CREATE_DATABASE || type == NodeType.DROP_DATABASE || - (type == NodeType.CREATE_TABLE && !((CreateTableNode) baseNode).hasSubQuery()) || + type == NodeType.CREATE_TABLE || type == NodeType.DROP_TABLE || type == NodeType.ALTER_TABLESPACE || type == NodeType.ALTER_TABLE || diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index dfee969e9e..d890ab3c0c 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -24,41 +24,7 @@ option java_generate_equals_and_hash = true; import "PrimitiveProtos.proto"; import "CatalogProtos.proto"; import "DataTypes.proto"; - -enum NodeType { - SET_SESSION = 0; - - ROOT = 1; - EXPRS = 2; - PROJECTION = 3; - LIMIT = 4; - WINDOW_AGG = 5; - SORT = 6; - HAVING = 7; - GROUP_BY = 8; - DISTINCT_GROUP_BY = 9; - SELECTION = 10; - JOIN = 11; - UNION = 12; - INTERSECT = 13; - EXCEPT = 14; - TABLE_SUBQUERY = 15; - SCAN = 16; - PARTITIONS_SCAN = 17; - INDEX_SCAN = 18; - STORE = 19; - INSERT = 20; - - CREATE_DATABASE = 21; - DROP_DATABASE = 22; - CREATE_TABLE = 23; - DROP_TABLE = 24; - ALTER_TABLESPACE = 25; - ALTER_TABLE = 26; - TRUNCATE_TABLE = 27; - CREATE_INDEX = 28; - DROP_INDEX = 29; -} +import "PlanTypes.proto"; message LogicalNodeTree { repeated LogicalNode nodes = 1; @@ -67,7 +33,7 @@ message LogicalNodeTree { message LogicalNode { required int32 visitSeq = 1; required int32 nodeId = 2; - required NodeType type = 3; + required PlanNodeType type = 3; optional SchemaProto in_schema = 4; optional SchemaProto out_schema = 5; From e0115b510f0ebb0bd5d01b0dd5cfc81b211e2b5f Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 24 Jun 2015 14:33:02 +0900 Subject: [PATCH 09/13] Cleanup commented out codes --- .../tajo/catalog/AbstractCatalogClient.java | 34 - .../apache/tajo/catalog/store/MemStore.java | 25 - .../apache/tajo/client/QueryClientImpl.java | 133 -- .../apache/tajo/client/SessionConnection.java | 68 - .../tajo/master/exec/QueryExecutor.java | 19 - .../java/org/apache/tajo/worker/Task.java | 1446 ----------------- 6 files changed, 1725 deletions(-) diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index c872f8b7d3..5c58169520 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -478,8 +478,6 @@ public boolean existIndexByColumns(final String databaseName, final String table @Override public boolean existIndexByColumnNames(final String databaseName, final String tableName, final String [] columnNames) { try { - -//<<<<<<< HEAD GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder(); builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); for (String colunName : columnNames) { @@ -499,14 +497,6 @@ public boolean existIndexesByTable(final String databaseName, final String table try { CatalogProtocolService.BlockingInterface stub = getStub(); return stub.existIndexesByTable(null, CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue(); -//======= -// GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); -// builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); -// builder.setColumnName(columnName); -// -// CatalogProtocolService.BlockingInterface stub = getStub(); -// return stub.existIndexByColumn(null, builder.build()).getValue(); -//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } catch (ServiceException e) { LOG.error(e.getMessage(), e); return false; @@ -548,11 +538,6 @@ public final IndexDesc getIndexByColumnNames(final String databaseName, final String tableName, final String [] columnNames) { try { -// GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder(); -// builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); -// builder.setColumnName(columnName); -// -//<<<<<<< HEAD GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder(); builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName)); for (String columnName : columnNames) { @@ -579,10 +564,6 @@ public final Collection getAllIndexesByTable(final String databaseNam indexDescs.add(new IndexDesc(descProto)); } return indexDescs; -//======= -// CatalogProtocolService.BlockingInterface stub = getStub(); -// return new IndexDesc(stub.getIndexByColumn(null, builder.build())); -//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } catch (ServiceException e) { LOG.error(e.getMessage(), e); return null; @@ -604,21 +585,6 @@ public boolean dropIndex(final String databaseName, return false; } } -//<<<<<<< HEAD -//======= -// -// @Override -// public List getAllIndexes() { -// try { -// CatalogProtocolService.BlockingInterface stub = getStub(); -// GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO); -// return response.getIndexList(); -// } catch (ServiceException e) { -// LOG.error(e.getMessage(), e); -// return new ArrayList(); -// } -// } -//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 @Override public final boolean createFunction(final FunctionDesc funcDesc) { diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java index f2fc2cb208..672bf4d48c 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java @@ -724,31 +724,6 @@ public boolean existIndexesByTable(String databaseName, String tableName) throws return false; } -// public List getAllIndexes() throws CatalogException { -// List indexList = new ArrayList(); -// Set databases = indexes.keySet(); -// -// for (String databaseName: databases) { -// Map indexMap = indexes.get(databaseName); -// -// for (Map.Entry entry: indexMap.entrySet()) { -// IndexDescProto indexDesc = entry.getValue(); -// IndexProto.Builder builder = IndexProto.newBuilder(); -// -// builder.setColumnName(indexDesc.getColumn().getName()); -// builder.setDataType(indexDesc.getColumn().getDataType().getType().toString()); -// builder.setIndexName(entry.getKey()); -// builder.setIndexType(indexDesc.getIndexMethod().toString()); -// builder.setIsAscending(indexDesc.hasIsAscending() && indexDesc.getIsAscending()); -// builder.setIsClustered(indexDesc.hasIsClustered() && indexDesc.getIsClustered()); -// builder.setIsUnique(indexDesc.hasIsUnique() && indexDesc.getIsUnique()); -// -// indexList.add(builder.build()); -// } -// } -// return false; -// } - @Override public List getAllIndexes() throws CatalogException { List indexDescProtos = TUtil.newList(); diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index 71e317941f..fa5d194922 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -166,27 +166,6 @@ public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws Se connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); } return response; - -//<<<<<<< HEAD -// connection.checkSessionAndGet(client); -// -// final QueryRequest.Builder builder = QueryRequest.newBuilder(); -// builder.setSessionId(connection.sessionId); -// builder.setQuery(sql); -// builder.setIsJson(false); -// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); -// -// -// -// } -// }.withRetries(); -//======= -// SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build()); -// if (response.getResultCode() == ResultCode.OK) { -// connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); -// } -// return response; -//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } @Override @@ -365,39 +344,6 @@ public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int throws ServiceException { try { -//<<<<<<< HEAD -// final ServerCallable callable = -// new ServerCallable(connection.manager, connection.getTajoMasterAddr(), -// TajoMasterClientProtocol.class, false) { -// -// public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException { -// -// connection.checkSessionAndGet(client); -// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); -// -// GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder(); -// builder.setSessionId(connection.sessionId); -// builder.setQueryId(queryId.getProto()); -// builder.setFetchRowNum(fetchRowNum); -// try { -// GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build()); -// if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) { -// abort(); -// throw new ServiceException(response.getResult().getErrorMessage()); -// } -// -// return response.getResultSet(); -// } catch (ServiceException e) { -// abort(); -// throw e; -// } catch (Throwable t) { -// throw new ServiceException(t.getMessage(), t); -// } -// } -// }; -// -// ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries(); -//======= NettyClientBase client = connection.getTajoMasterConnection(); connection.checkSessionAndGet(client); TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); @@ -413,7 +359,6 @@ public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int } ClientProtos.SerializedResultSet resultSet = response.getResultSet(); -//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 return new TajoMemoryResultSet(queryId, new Schema(resultSet.getSchema()), @@ -440,33 +385,12 @@ public boolean updateQuery(final String sql) throws ServiceException { builder.setIsJson(false); ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); -//<<<<<<< HEAD -// connection.checkSessionAndGet(client); -// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); -// -// QueryRequest.Builder builder = QueryRequest.newBuilder(); -// builder.setSessionId(connection.sessionId); -// builder.setQuery(sql); -// builder.setIsJson(false); -// ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); -// -// if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { -// connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); -// return true; -// } else { -// if (response.getResult().hasErrorMessage()) { -// System.err.println("ERROR: " + response.getResult().getErrorMessage()); -// } -// return false; -// } -//======= if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); return true; } else { if (response.getResult().hasErrorMessage()) { LOG.error("ERROR: " + response.getResult().getErrorMessage()); -//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } return false; } @@ -475,29 +399,6 @@ public boolean updateQuery(final String sql) throws ServiceException { @Override public boolean updateQueryWithJson(final String json) throws ServiceException { -//<<<<<<< HEAD -// return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), -// TajoMasterClientProtocol.class, false) { -// -// public Boolean call(NettyClientBase client) throws ServiceException { -// -// connection.checkSessionAndGet(client); -// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); -// -// QueryRequest.Builder builder = QueryRequest.newBuilder(); -// builder.setSessionId(connection.sessionId); -// builder.setQuery(json); -// builder.setIsJson(true); -// ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build()); -// if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) { -// return true; -// } else { -// if (response.getResult().hasErrorMessage()) { -// System.err.println("ERROR: " + response.getResult().getErrorMessage()); -// } -// return false; -// } -//======= NettyClientBase client = connection.getTajoMasterConnection(); connection.checkSessionAndGet(client); TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); @@ -512,7 +413,6 @@ public boolean updateQueryWithJson(final String json) throws ServiceException { } else { if (response.getResult().hasErrorMessage()) { LOG.error("ERROR: " + response.getResult().getErrorMessage()); -//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } return false; } @@ -607,27 +507,6 @@ public int getMaxRows() { } public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException { -//<<<<<<< HEAD -// return new ServerCallable(connection.manager, connection.getTajoMasterAddr(), -// TajoMasterClientProtocol.class, false) { -// public QueryInfoProto call(NettyClientBase client) throws ServiceException { -// connection.checkSessionAndGet(client); -// -// QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); -// builder.setSessionId(connection.sessionId); -// builder.setQueryId(queryId.getProto()); -// -// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); -// GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build()); -// if (res.getResult().getResultCode() == ResultCode.OK) { -// return res.getQueryInfo(); -// } else { -// abort(); -// throw new ServiceException(res.getResult().getErrorMessage()); -// } -// } -// }.withRetries(); -//======= NettyClientBase client = connection.getTajoMasterConnection(); connection.checkSessionAndGet(client); @@ -642,7 +521,6 @@ public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceExceptio } else { throw new ServiceException(res.getResult().getErrorMessage()); } -//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException { @@ -666,16 +544,6 @@ public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceEx try { connection.checkSessionAndGet(connection.getTajoMasterConnection()); -//<<<<<<< HEAD -// QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub(); -// GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build()); -// if (res.getResult().getResultCode() == ResultCode.OK) { -// return res.getQueryHistory(); -// } else { -// abort(); -// throw new ServiceException(res.getResult().getErrorMessage()); -// } -//======= QueryIdRequest.Builder builder = QueryIdRequest.newBuilder(); builder.setSessionId(connection.sessionId); builder.setQueryId(queryId.getProto()); @@ -686,7 +554,6 @@ public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceEx return res.getQueryHistory(); } else { throw new ServiceException(res.getResult().getErrorMessage()); -//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } } finally { queryMasterClient.close(); diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index fece670acd..61838b41b9 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -166,35 +166,8 @@ public String getCurrentDatabase() throws ServiceException { } public Map updateSessionVariables(final Map variables) throws ServiceException { -//<<<<<<< HEAD -// return new ServerCallable>(manager, getTajoMasterAddr(), -// TajoMasterClientProtocol.class, false) { -// -// public Map call(NettyClientBase client) throws ServiceException { -// checkSessionAndGet(client); -// -// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); -// KeyValueSet keyValueSet = new KeyValueSet(); -// keyValueSet.putAll(variables); -// ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() -// .setSessionId(sessionId) -// .setSessionVars(keyValueSet.getProto()).build(); -// -// SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request); -// -// if (response.getResult().getResultCode() == ResultCode.OK) { -// updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); -// return Collections.unmodifiableMap(sessionVarsCache); -// } else { -// throw new ServiceException(response.getResult().getErrorMessage()); -// } -// } -// }.withRetries(); -// } -//======= NettyClientBase client = getTajoMasterConnection(); checkSessionAndGet(client); -//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); KeyValueSet keyValueSet = new KeyValueSet(); @@ -217,16 +190,6 @@ public Map unsetSessionVariables(final List variables) t NettyClientBase client = getTajoMasterConnection(); checkSessionAndGet(client); -//<<<<<<< HEAD -// if (response.getResult().getResultCode() == ResultCode.OK) { -// updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars())); -// return Collections.unmodifiableMap(sessionVarsCache); -// } else { -// throw new ServiceException(response.getResult().getErrorMessage()); -// } -// } -// }.withRetries(); -//======= TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() .setSessionId(sessionId) @@ -240,7 +203,6 @@ public Map unsetSessionVariables(final List variables) t } else { throw new ServiceException(response.getResult().getErrorMessage()); } -//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } void updateSessionVarsCache(Map variables) { @@ -368,21 +330,12 @@ public boolean reconnect() throws Exception { NettyClientBase client = getTajoMasterConnection(); -//<<<<<<< HEAD -// // create new session -// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); -// CreateSessionResponse response = tajoMasterService.createSession(null, builder.build()); -// if (response.getResult().getResultCode() != ResultCode.OK) { -// return false; -// } -//======= // create new session TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub(); CreateSessionResponse response = tajoMasterService.createSession(null, builder.build()); if (response.getResult().getResultCode() != ResultCode.OK) { return false; } -//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 // Invalidate some session variables in client cache sessionId = response.getSessionId(); @@ -396,26 +349,6 @@ public boolean reconnect() throws Exception { } } -//<<<<<<< HEAD -// // Update the session variables in server side -// try { -// KeyValueSet keyValueSet = new KeyValueSet(); -// keyValueSet.putAll(sessionVarsCache); -// ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder() -// .setSessionId(sessionId) -// .setSessionVars(keyValueSet.getProto()).build(); -// -// if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) { -// tajoMasterService.removeSession(null, sessionId); -// return false; -// } -// LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName())); -// return true; -// } catch (ServiceException e) { -// tajoMasterService.removeSession(null, sessionId); -// return false; -// } -//======= // Update the session variables in server side try { KeyValueSet keyValueSet = new KeyValueSet(); @@ -427,7 +360,6 @@ public boolean reconnect() throws Exception { if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) { tajoMasterService.removeSession(null, sessionId); return false; -//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746 } LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName())); return true; diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 975b913057..f740d8a2dc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -335,25 +335,6 @@ public static void startScriptExecutors(QueryContext queryContext, EvalContext e } } } -//<<<<<<< HEAD -// boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT; -// if (isInsert) { -// InsertNode insertNode = rootNode.getChild(); -// insertNonFromQuery(queryContext, insertNode, responseBuilder); -// } else { -// Schema schema = PlannerUtil.targetToSchema(targets); -// RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema); -// byte[] serializedBytes = encoder.toBytes(outTuple); -// ClientProtos.SerializedResultSet.Builder serializedResBuilder = ClientProtos.SerializedResultSet.newBuilder(); -// serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes)); -// serializedResBuilder.setSchema(schema.getProto()); -// serializedResBuilder.setBytesNum(serializedBytes.length); -// -// responseBuilder.setResultSet(serializedResBuilder); -// responseBuilder.setMaxRowNum(1); -// responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); -// responseBuilder.setResult(IPCUtil.buildOkRequestResult()); -//======= } public static void stopScriptExecutors(EvalContext evalContext) { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 70bbc45485..c84994003e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -22,824 +22,6 @@ import java.io.IOException; -//<<<<<<< HEAD -//import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -//import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; -// -//public class Task { -// private static final Log LOG = LogFactory.getLog(Task.class); -// private static final float FETCHER_PROGRESS = 0.5f; -// -// private final TajoConf systemConf; -// private final QueryContext queryContext; -// private final ExecutionBlockContext executionBlockContext; -// private final TaskAttemptId taskId; -// private final String taskRunnerId; -// -// private final Path taskDir; -// private final TaskRequest request; -// private TaskAttemptContext context; -// private List fetcherRunners; -// private LogicalNode plan; -// private final Map descs = Maps.newHashMap(); -// private PhysicalExec executor; -// private boolean interQuery; -// private Path inputTableBaseDir; -// -// private long startTime; -// private long finishTime; -// -// private final TableStats inputStats; -// private List localChunks; -// -// // TODO - to be refactored -// private ShuffleType shuffleType = null; -// private Schema finalSchema = null; -// private TupleComparator sortComp = null; -// -// public Task(String taskRunnerId, -// Path baseDir, -// TaskAttemptId taskId, -// final ExecutionBlockContext executionBlockContext, -// final TaskRequest request) throws IOException { -// this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request); -// } -// -// public Task(String taskRunnerId, -// Path baseDir, -// TaskAttemptId taskId, -// TajoConf conf, -// final ExecutionBlockContext executionBlockContext, -// final TaskRequest request) throws IOException { -// this.taskRunnerId = taskRunnerId; -// this.request = request; -// this.taskId = taskId; -// -// this.systemConf = conf; -// this.queryContext = request.getQueryContext(systemConf); -// this.executionBlockContext = executionBlockContext; -// this.taskDir = StorageUtil.concatPath(baseDir, -// taskId.getTaskId().getId() + "_" + taskId.getId()); -// -// this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId, -// request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir); -// this.context.setDataChannel(request.getDataChannel()); -// this.context.setEnforcer(request.getEnforcer()); -// this.context.setState(TaskAttemptState.TA_PENDING); -// this.inputStats = new TableStats(); -// this.fetcherRunners = Lists.newArrayList(); -// } -// -// public void initPlan() throws IOException { -// plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan()); -// updateDescsForScanNodes(NodeType.SCAN); -// updateDescsForScanNodes(NodeType.PARTITIONS_SCAN); -// updateDescsForScanNodes(NodeType.INDEX_SCAN); -// LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); -// if (scanNode != null) { -// for (LogicalNode node : scanNode) { -// ScanNode scan = (ScanNode) node; -// descs.put(scan.getCanonicalName(), scan.getTableDesc()); -// } -// } -// -// LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN); -// if (partitionScanNode != null) { -// for (LogicalNode node : partitionScanNode) { -// PartitionedTableScanNode scan = (PartitionedTableScanNode) node; -// descs.put(scan.getCanonicalName(), scan.getTableDesc()); -// } -// } -// -// interQuery = request.getProto().getInterQuery(); -// if (interQuery) { -// context.setInterQuery(); -// this.shuffleType = context.getDataChannel().getShuffleType(); -// -// if (shuffleType == ShuffleType.RANGE_SHUFFLE) { -// SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT); -// this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()); -// this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); -// } -// } else { -// Path outFilePath; -// if (queryContext.isCreateIndex()) { -// outFilePath = getIndexStagingPath(queryContext, context); -// } else { -// outFilePath = ((FileStorageManager) TableSpaceManager.getFileStorageManager(systemConf)) -// .getAppenderFilePath(taskId, queryContext.getStagingDir()); -// } -////======= -//// Path outFilePath = ((FileStorageManager) TableSpaceManager.getFileStorageManager(systemConf)) -//// .getAppenderFilePath(taskId, queryContext.getStagingDir()); -////>>>>>>> 2cbc1b9c7dc1ec04c4f6c8d7c8f746225d02421d -// LOG.info("Output File Path: " + outFilePath); -// context.setOutputPath(outFilePath); -// } -// -// this.localChunks = Collections.synchronizedList(new ArrayList()); -// LOG.info("=================================="); -// LOG.info("* Stage " + request.getId() + " is initialized"); -// LOG.info("* InterQuery: " + interQuery -// + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") + -// ", Fragments (num: " + request.getFragments().size() + ")" + -// ", Fetches (total:" + request.getFetches().size() + ") :"); -// -// if(LOG.isDebugEnabled()) { -// for (FetchImpl f : request.getFetches()) { -// LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs()); -// } -// } -// LOG.info("* Local task dir: " + taskDir); -// if(LOG.isDebugEnabled()) { -// LOG.debug("* plan:\n"); -// LOG.debug(plan.toString()); -// } -// LOG.info("=================================="); -// } -// -// private void updateDescsForScanNodes(NodeType nodeType) { -// assert nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.INDEX_SCAN; -// LogicalNode[] scanNodes = PlannerUtil.findAllNodes(plan, nodeType); -// if (scanNodes != null) { -// for (LogicalNode node : scanNodes) { -// ScanNode scanNode = (ScanNode) node; -// descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); -// } -// } -// } -// -// private void startScriptExecutors() throws IOException { -// for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { -// executor.start(systemConf); -// } -// } -// -// private void stopScriptExecutors() { -// for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { -// executor.shutdown(); -// } -// } -// -// public void init() throws IOException { -// initPlan(); -// startScriptExecutors(); -// -// if (context.getState() == TaskAttemptState.TA_PENDING) { -// // initialize a task temporal dir -// FileSystem localFS = executionBlockContext.getLocalFS(); -// localFS.mkdirs(taskDir); -// -// if (request.getFetches().size() > 0) { -// inputTableBaseDir = localFS.makeQualified( -// executionBlockContext.getLocalDirAllocator().getLocalPathForWrite( -// getTaskAttemptDir(context.getTaskId()).toString(), systemConf)); -// localFS.mkdirs(inputTableBaseDir); -// Path tableDir; -// for (String inputTable : context.getInputTables()) { -// tableDir = new Path(inputTableBaseDir, inputTable); -// if (!localFS.exists(tableDir)) { -// LOG.info("the directory is created " + tableDir.toUri()); -// localFS.mkdirs(tableDir); -// } -// } -// } -// // for localizing the intermediate data -// fetcherRunners.addAll(getFetchRunners(context, request.getFetches())); -// } -// } -// -// public TaskAttemptId getTaskId() { -// return taskId; -// } -// -// public TaskAttemptId getId() { -// return context.getTaskId(); -// } -// -// public TaskAttemptState getStatus() { -// return context.getState(); -// } -// -// public String toString() { -// return "queryId: " + this.getId() + " status: " + this.getStatus(); -// } -// -// public void setState(TaskAttemptState status) { -// context.setState(status); -// } -// -// public TaskAttemptContext getContext() { -// return context; -// } -// -// public boolean hasFetchPhase() { -// return fetcherRunners.size() > 0; -// } -// -// public List getFetchers() { -// return new ArrayList(fetcherRunners); -// } -// -// public void fetch() { -// ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher(); -// for (Fetcher f : fetcherRunners) { -// executorService.submit(new FetchRunner(context, f)); -// } -// } -// -// public void kill() { -// stopScriptExecutors(); -// context.setState(TaskAttemptState.TA_KILLED); -// context.stop(); -// } -// -// public void abort() { -// stopScriptExecutors(); -// context.stop(); -// } -// -// public void cleanUp() { -// // remove itself from worker -// if (context.getState() == TaskAttemptState.TA_SUCCEEDED) { -// synchronized (executionBlockContext.getTasks()) { -// executionBlockContext.getTasks().remove(this.getId()); -// } -// } else { -// LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState()); -// } -// } -// -// public TaskStatusProto getReport() { -// TaskStatusProto.Builder builder = TaskStatusProto.newBuilder(); -// builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort()); -// builder.setId(context.getTaskId().getProto()) -// .setProgress(context.getProgress()) -// .setState(context.getState()); -// -// builder.setInputStats(reloadInputStats()); -// -// if (context.getResultStats() != null) { -// builder.setResultStats(context.getResultStats().getProto()); -// } -// return builder.build(); -// } -// -// public boolean isRunning(){ -// return context.getState() == TaskAttemptState.TA_RUNNING; -// } -// -// public boolean isProgressChanged() { -// return context.isProgressChanged(); -// } -// -// public void updateProgress() { -// if(context != null && context.isStopped()){ -// return; -// } -// -// if (executor != null && context.getProgress() < 1.0f) { -// context.setExecutorProgress(executor.getProgress()); -// } -// } -// -// private CatalogProtos.TableStatsProto reloadInputStats() { -// synchronized(inputStats) { -// if (this.executor == null) { -// return inputStats.getProto(); -// } -// -// TableStats executorInputStats = this.executor.getInputStats(); -// -// if (executorInputStats != null) { -// inputStats.setValues(executorInputStats); -// } -// return inputStats.getProto(); -// } -// } -// -// private TaskCompletionReport getTaskCompletionReport() { -// TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder(); -// builder.setId(context.getTaskId().getProto()); -// -// builder.setInputStats(reloadInputStats()); -// -// if (context.hasResultStats()) { -// builder.setResultStats(context.getResultStats().getProto()); -// } else { -// builder.setResultStats(new TableStats().getProto()); -// } -// -// Iterator> it = context.getShuffleFileOutputs(); -// if (it.hasNext()) { -// do { -// Entry entry = it.next(); -// ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder(); -// part.setPartId(entry.getKey()); -// -// // Set output volume -// if (context.getPartitionOutputVolume() != null) { -// for (Entry e : context.getPartitionOutputVolume().entrySet()) { -// if (entry.getKey().equals(e.getKey())) { -// part.setVolume(e.getValue().longValue()); -// break; -// } -// } -// } -// -// builder.addShuffleFileOutputs(part.build()); -// } while (it.hasNext()); -// } -// -// return builder.build(); -// } -// -// private void waitForFetch() throws InterruptedException, IOException { -// context.getFetchLatch().await(); -// LOG.info(context.getTaskId() + " All fetches are done!"); -// Collection inputs = Lists.newArrayList(context.getInputTables()); -// -// // Get all broadcasted tables -// Set broadcastTableNames = new HashSet(); -// List broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST); -// if (broadcasts != null) { -// for (EnforceProperty eachBroadcast : broadcasts) { -// broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName()); -// } -// } -// -// // localize the fetched data and skip the broadcast table -// for (String inputTable: inputs) { -// if (broadcastTableNames.contains(inputTable)) { -// continue; -// } -// File tableDir = new File(context.getFetchIn(), inputTable); -// FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta()); -// context.updateAssignedFragments(inputTable, frags); -// } -// } -// -// public void run() throws Exception { -// startTime = System.currentTimeMillis(); -// Throwable error = null; -// try { -// if(!context.isStopped()) { -// context.setState(TaskAttemptState.TA_RUNNING); -// if (context.hasFetchPhase()) { -// // If the fetch is still in progress, the query unit must wait for -// // complete. -// waitForFetch(); -// context.setFetcherProgress(FETCHER_PROGRESS); -// context.setProgressChanged(true); -// updateProgress(); -// } -// -// this.executor = executionBlockContext.getTQueryEngine(). -// createPlan(context, plan); -// this.executor.init(); -// -// while(!context.isStopped() && executor.next() != null) { -// } -// } -// } catch (Throwable e) { -// error = e ; -// LOG.error(e.getMessage(), e); -// stopScriptExecutors(); -// context.stop(); -// } finally { -// if (executor != null) { -// try { -// executor.close(); -// reloadInputStats(); -// } catch (IOException e) { -// LOG.error(e, e); -// } -// this.executor = null; -// } -// -// executionBlockContext.completedTasksNum.incrementAndGet(); -// context.getHashShuffleAppenderManager().finalizeTask(taskId); -// -// QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub(); -// if (context.isStopped()) { -// context.setExecutorProgress(0.0f); -// -// if (context.getState() == TaskAttemptState.TA_KILLED) { -// queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); -// executionBlockContext.killedTasksNum.incrementAndGet(); -// } else { -// context.setState(TaskAttemptState.TA_FAILED); -// TaskFatalErrorReport.Builder errorBuilder = -// TaskFatalErrorReport.newBuilder() -// .setId(getId().getProto()); -// if (error != null) { -// if (error.getMessage() == null) { -// errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); -// } else { -// errorBuilder.setErrorMessage(error.getMessage()); -// } -// errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); -// } -// -// queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); -// executionBlockContext.failedTasksNum.incrementAndGet(); -// } -// } else { -// // if successful -// context.setProgress(1.0f); -// context.setState(TaskAttemptState.TA_SUCCEEDED); -// executionBlockContext.succeededTasksNum.incrementAndGet(); -// -// TaskCompletionReport report = getTaskCompletionReport(); -// queryMasterStub.done(null, report, NullCallback.get()); -// } -// finishTime = System.currentTimeMillis(); -// LOG.info(context.getTaskId() + " completed. " + -// "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + -// ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() -// + ", killed: " + executionBlockContext.killedTasksNum.intValue() -// + ", failed: " + executionBlockContext.failedTasksNum.intValue()); -// cleanupTask(); -// } -// } -// -// public void cleanupTask() { -// TaskHistory taskHistory = createTaskHistory(); -// executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory); -// executionBlockContext.getTasks().remove(getId()); -// -// fetcherRunners.clear(); -// fetcherRunners = null; -// try { -// if(executor != null) { -// executor.close(); -// executor = null; -// } -// } catch (IOException e) { -// LOG.fatal(e.getMessage(), e); -// } -// -// executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory); -// stopScriptExecutors(); -// } -// -// public TaskHistory createTaskHistory() { -// TaskHistory taskHistory = null; -// try { -// taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(), -// startTime, finishTime, reloadInputStats()); -// -// if (context.getOutputPath() != null) { -// taskHistory.setOutputPath(context.getOutputPath().toString()); -// } -// -// if (context.getWorkDir() != null) { -// taskHistory.setWorkingPath(context.getWorkDir().toString()); -// } -// -// if (context.getResultStats() != null) { -// taskHistory.setOutputStats(context.getResultStats().getProto()); -// } -// -// if (hasFetchPhase()) { -// taskHistory.setTotalFetchCount(fetcherRunners.size()); -// int i = 0; -// FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder(); -// for (Fetcher fetcher : fetcherRunners) { -// // TODO store the fetcher histories -// if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { -// builder.setStartTime(fetcher.getStartTime()); -// builder.setFinishTime(fetcher.getFinishTime()); -// builder.setFileLength(fetcher.getFileLen()); -// builder.setMessageReceivedCount(fetcher.getMessageReceiveCount()); -// builder.setState(fetcher.getState()); -// -// taskHistory.addFetcherHistory(builder.build()); -// } -// if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++; -// } -// taskHistory.setFinishedFetchCount(i); -// } -// } catch (Exception e) { -// LOG.warn(e.getMessage(), e); -// } -// -// return taskHistory; -// } -// -// public int hashCode() { -// return context.hashCode(); -// } -// -// public boolean equals(Object obj) { -// if (obj instanceof Task) { -// Task other = (Task) obj; -// return this.context.equals(other.context); -// } -// return false; -// } -// -// private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta) -// throws IOException { -// Configuration c = new Configuration(systemConf); -// c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); -// FileSystem fs = FileSystem.get(c); -// Path tablePath = new Path(file.getAbsolutePath()); -// -// List listTablets = new ArrayList(); -// FileFragment tablet; -// -// FileStatus[] fileLists = fs.listStatus(tablePath); -// for (FileStatus f : fileLists) { -// if (f.getLen() == 0) { -// continue; -// } -// tablet = new FileFragment(name, f.getPath(), 0l, f.getLen()); -// listTablets.add(tablet); -// } -// -// // Special treatment for locally pseudo fetched chunks -// synchronized (localChunks) { -// for (FileChunk chunk : localChunks) { -// if (name.equals(chunk.getEbId())) { -// tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()); -// listTablets.add(tablet); -// LOG.info("One local chunk is added to listTablets"); -// } -// } -// } -// -// FileFragment[] tablets = new FileFragment[listTablets.size()]; -// listTablets.toArray(tablets); -// -// return tablets; -// } -// -// private class FetchRunner implements Runnable { -// private final TaskAttemptContext ctx; -// private final Fetcher fetcher; -// private int maxRetryNum; -// -// public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) { -// this.ctx = ctx; -// this.fetcher = fetcher; -// this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM); -// } -// -// @Override -// public void run() { -// int retryNum = 0; -// int retryWaitTime = 1000; //sec -// -// try { // for releasing fetch latch -// while(!context.isStopped() && retryNum < maxRetryNum) { -// if (retryNum > 0) { -// try { -// Thread.sleep(retryWaitTime); -// retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds -// } catch (InterruptedException e) { -// LOG.error(e); -// } -// LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")"); -// } -// try { -// FileChunk fetched = fetcher.get(); -// if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null -// && fetched.getFile() != null) { -// if (fetched.fromRemote() == false) { -// localChunks.add(fetched); -// LOG.info("Add a new FileChunk to local chunk list"); -// } -// break; -// } -// } catch (Throwable e) { -// LOG.error("Fetch failed: " + fetcher.getURI(), e); -// } -// retryNum++; -// } -// } finally { -// if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){ -// fetcherFinished(ctx); -// } else { -// if (retryNum == maxRetryNum) { -// LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); -// } -// stopScriptExecutors(); -// context.stop(); // retry task -// ctx.getFetchLatch().countDown(); -// } -// } -// } -// } -// -// @VisibleForTesting -// public static float adjustFetchProcess(int totalFetcher, int remainFetcher) { -// if (totalFetcher > 0) { -// return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS; -// } else { -// return 0.0f; -// } -// } -// -// private synchronized void fetcherFinished(TaskAttemptContext ctx) { -// int fetcherSize = fetcherRunners.size(); -// if(fetcherSize == 0) { -// return; -// } -// -// ctx.getFetchLatch().countDown(); -// -// int remainFetcher = (int) ctx.getFetchLatch().getCount(); -// if (remainFetcher == 0) { -// context.setFetcherProgress(FETCHER_PROGRESS); -// } else { -// context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher)); -// context.setProgressChanged(true); -// } -// } -// -// private List getFetchRunners(TaskAttemptContext ctx, -// List fetches) throws IOException { -// -// if (fetches.size() > 0) { -// Path inputDir = executionBlockContext.getLocalDirAllocator(). -// getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); -// -// int i = 0; -// File storeDir; -// File defaultStoreFile; -// FileChunk storeChunk = null; -// List runnerList = Lists.newArrayList(); -// -// for (FetchImpl f : fetches) { -// storeDir = new File(inputDir.toString(), f.getName()); -// if (!storeDir.exists()) { -// storeDir.mkdirs(); -// } -// -// for (URI uri : f.getURIs()) { -// defaultStoreFile = new File(storeDir, "in_" + i); -// InetAddress address = InetAddress.getByName(uri.getHost()); -// -// WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo(); -// if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) { -// boolean hasError = false; -// try { -// LOG.info("Try to get local file chunk at local host"); -// storeChunk = getLocalStoredFileChunk(uri, systemConf); -// } catch (Throwable t) { -// hasError = true; -// } -// -// // When a range request is out of range, storeChunk will be NULL. This case is normal state. -// // So, we should skip and don't need to create storeChunk. -// if (storeChunk == null && !hasError) { -// continue; -// } -// -// if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1 -// && hasError == false) { -// storeChunk.setFromRemote(false); -// } else { -// storeChunk = new FileChunk(defaultStoreFile, 0, -1); -// storeChunk.setFromRemote(true); -// } -// } else { -// storeChunk = new FileChunk(defaultStoreFile, 0, -1); -// storeChunk.setFromRemote(true); -// } -// -// // If we decide that intermediate data should be really fetched from a remote host, storeChunk -// // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it -// storeChunk.setEbId(f.getName()); -// Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk); -// LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString()); -// runnerList.add(fetcher); -// i++; -// } -// } -// ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString())); -// return runnerList; -// } else { -// return Lists.newArrayList(); -// } -// } -// -// private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { -// // Parse the URI -// LOG.info("getLocalStoredFileChunk starts"); -// final Map> params = new QueryStringDecoder(fetchURI.toString()).parameters(); -// final List types = params.get("type"); -// final List qids = params.get("qid"); -// final List taskIdList = params.get("ta"); -// final List stageIds = params.get("sid"); -// final List partIds = params.get("p"); -// final List offsetList = params.get("offset"); -// final List lengthList = params.get("length"); -// -// if (types == null || stageIds == null || qids == null || partIds == null) { -// LOG.error("Invalid URI - Required queryId, type, stage Id, and part id"); -// return null; -// } -// -// if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { -// LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id"); -// return null; -// } -// -// String queryId = qids.get(0); -// String shuffleType = types.get(0); -// String sid = stageIds.get(0); -// String partId = partIds.get(0); -// -// if (shuffleType.equals("r") && taskIdList == null) { -// LOG.error("Invalid URI - For range shuffle, taskId is required"); -// return null; -// } -// List taskIds = splitMaps(taskIdList); -// -// FileChunk chunk = null; -// long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; -// long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; -// -// LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId -// + ", taskIds=" + taskIdList); -// -// // The working directory of Tajo worker for each query, including stage -// String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; -// -// // If the stage requires a range shuffle -// if (shuffleType.equals("r")) { -// String ta = taskIds.get(0); -// if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { -// LOG.warn("Range shuffle - file not exist"); -// return null; -// } -// Path path = executionBlockContext.getLocalFS().makeQualified( -// executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf)); -// String startKey = params.get("start").get(0); -// String endKey = params.get("end").get(0); -// boolean last = params.get("final") != null; -// -// try { -// chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last); -// } catch (Throwable t) { -// LOG.error("getFileChunks() throws exception"); -// return null; -// } -// -// // If the stage requires a hash shuffle or a scattered hash shuffle -// } else if (shuffleType.equals("h") || shuffleType.equals("s")) { -// int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); -// String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; -// if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) { -// LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); -// return null; -// } -// Path path = executionBlockContext.getLocalFS().makeQualified( -// executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf)); -// File file = new File(path.toUri()); -// long startPos = (offset >= 0 && length >= 0) ? offset : 0; -// long readLen = (offset >= 0 && length >= 0) ? length : file.length(); -// -// if (startPos >= file.length()) { -// LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]"); -// return null; -// } -// chunk = new FileChunk(file, startPos, readLen); -// -// } else { -// LOG.error("Unknown shuffle type"); -// return null; -// } -// -// return chunk; -// } -// -// private List splitMaps(List mapq) { -// if (null == mapq) { -// return null; -// } -// final List ret = new ArrayList(); -// for (String s : mapq) { -// Collections.addAll(ret, s.split(",")); -// } -// return ret; -// } -// -// public static Path getTaskAttemptDir(TaskAttemptId quid) { -// Path workDir = -// StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()), -// String.valueOf(quid.getTaskId().getId()), -// String.valueOf(quid.getId())); -// return workDir; -// } -// -// private static Path getIndexStagingPath(QueryContext queryContext, TaskAttemptContext context) { -// return StorageUtil.concatPath(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME, -// context.getUniqueKeyFromFragments()); -// } -//======= public interface Task { void init() throws IOException; @@ -866,633 +48,5 @@ public interface Task { ExecutionBlockContext getExecutionBlockContext(); -//<<<<<<< HEAD -// -// public TaskAttemptId getTaskId() { -// return taskId; -// } -// -// public TaskAttemptId getId() { -// return context.getTaskId(); -// } -// -// public TaskAttemptState getStatus() { -// return context.getState(); -// } -// -// public String toString() { -// return "queryId: " + this.getId() + " status: " + this.getStatus(); -// } -// -// public void setState(TaskAttemptState status) { -// context.setState(status); -// } -// -// public TaskAttemptContext getContext() { -// return context; -// } -// -// public boolean hasFetchPhase() { -// return fetcherRunners.size() > 0; -// } -// -// public List getFetchers() { -// return new ArrayList(fetcherRunners); -// } -// -// public void fetch() { -// ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher(); -// for (Fetcher f : fetcherRunners) { -// executorService.submit(new FetchRunner(context, f)); -// } -// } -// -// public void kill() { -// stopScriptExecutors(); -// context.setState(TaskAttemptState.TA_KILLED); -// context.stop(); -// } -// -// public void abort() { -// stopScriptExecutors(); -// context.stop(); -// } -// -// public void cleanUp() { -// // remove itself from worker -// if (context.getState() == TaskAttemptState.TA_SUCCEEDED) { -// synchronized (executionBlockContext.getTasks()) { -// executionBlockContext.getTasks().remove(this.getId()); -// } -// } else { -// LOG.error("TaskAttemptId: " + context.getTaskId() + " status: " + context.getState()); -// } -// } -// -// public TaskStatusProto getReport() { -// TaskStatusProto.Builder builder = TaskStatusProto.newBuilder(); -// builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort()); -// builder.setId(context.getTaskId().getProto()) -// .setProgress(context.getProgress()) -// .setState(context.getState()); -// -// builder.setInputStats(reloadInputStats()); -// -// if (context.getResultStats() != null) { -// builder.setResultStats(context.getResultStats().getProto()); -// } -// return builder.build(); -// } -// -// public boolean isRunning(){ -// return context.getState() == TaskAttemptState.TA_RUNNING; -// } -// -// public boolean isProgressChanged() { -// return context.isProgressChanged(); -// } -// -// public void updateProgress() { -// if(context != null && context.isStopped()){ -// return; -// } -// -// if (executor != null && context.getProgress() < 1.0f) { -// context.setExecutorProgress(executor.getProgress()); -// } -// } -// -// private CatalogProtos.TableStatsProto reloadInputStats() { -// synchronized(inputStats) { -// if (this.executor == null) { -// return inputStats.getProto(); -// } -// -// TableStats executorInputStats = this.executor.getInputStats(); -// -// if (executorInputStats != null) { -// inputStats.setValues(executorInputStats); -// } -// return inputStats.getProto(); -// } -// } -// -// private TaskCompletionReport getTaskCompletionReport() { -// TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder(); -// builder.setId(context.getTaskId().getProto()); -// -// builder.setInputStats(reloadInputStats()); -// -// if (context.hasResultStats()) { -// builder.setResultStats(context.getResultStats().getProto()); -// } else { -// builder.setResultStats(new TableStats().getProto()); -// } -// -// Iterator> it = context.getShuffleFileOutputs(); -// if (it.hasNext()) { -// do { -// Entry entry = it.next(); -// ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder(); -// part.setPartId(entry.getKey()); -// -// // Set output volume -// if (context.getPartitionOutputVolume() != null) { -// for (Entry e : context.getPartitionOutputVolume().entrySet()) { -// if (entry.getKey().equals(e.getKey())) { -// part.setVolume(e.getValue().longValue()); -// break; -// } -// } -// } -// -// builder.addShuffleFileOutputs(part.build()); -// } while (it.hasNext()); -// } -// -// return builder.build(); -// } -// -// private void waitForFetch() throws InterruptedException, IOException { -// context.getFetchLatch().await(); -// LOG.info(context.getTaskId() + " All fetches are done!"); -// Collection inputs = Lists.newArrayList(context.getInputTables()); -// -// // Get all broadcasted tables -// Set broadcastTableNames = new HashSet(); -// List broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST); -// if (broadcasts != null) { -// for (EnforceProperty eachBroadcast : broadcasts) { -// broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName()); -// } -// } -// -// // localize the fetched data and skip the broadcast table -// for (String inputTable: inputs) { -// if (broadcastTableNames.contains(inputTable)) { -// continue; -// } -// File tableDir = new File(context.getFetchIn(), inputTable); -// FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta()); -// context.updateAssignedFragments(inputTable, frags); -// } -// } -// -// public void run() throws Exception { -// startTime = System.currentTimeMillis(); -// Throwable error = null; -// try { -// if(!context.isStopped()) { -// context.setState(TaskAttemptState.TA_RUNNING); -// if (context.hasFetchPhase()) { -// // If the fetch is still in progress, the query unit must wait for -// // complete. -// waitForFetch(); -// context.setFetcherProgress(FETCHER_PROGRESS); -// context.setProgressChanged(true); -// updateProgress(); -// } -// -// this.executor = executionBlockContext.getTQueryEngine(). -// createPlan(context, plan); -// this.executor.init(); -// -// while(!context.isStopped() && executor.next() != null) { -// } -// } -// } catch (Throwable e) { -// error = e ; -// LOG.error(e.getMessage(), e); -// stopScriptExecutors(); -// context.stop(); -// } finally { -// if (executor != null) { -// try { -// executor.close(); -// reloadInputStats(); -// } catch (IOException e) { -// LOG.error(e, e); -// } -// this.executor = null; -// } -// -// executionBlockContext.completedTasksNum.incrementAndGet(); -// context.getHashShuffleAppenderManager().finalizeTask(taskId); -// -// QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub(); -// if (context.isStopped()) { -// context.setExecutorProgress(0.0f); -// -// if (context.getState() == TaskAttemptState.TA_KILLED) { -// queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); -// executionBlockContext.killedTasksNum.incrementAndGet(); -// } else { -// context.setState(TaskAttemptState.TA_FAILED); -// TaskFatalErrorReport.Builder errorBuilder = -// TaskFatalErrorReport.newBuilder() -// .setId(getId().getProto()); -// if (error != null) { -// if (error.getMessage() == null) { -// errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); -// } else { -// errorBuilder.setErrorMessage(error.getMessage()); -// } -// errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); -// } -// -// queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); -// executionBlockContext.failedTasksNum.incrementAndGet(); -// } -// } else { -// // if successful -// context.setProgress(1.0f); -// context.setState(TaskAttemptState.TA_SUCCEEDED); -// executionBlockContext.succeededTasksNum.incrementAndGet(); -// -// TaskCompletionReport report = getTaskCompletionReport(); -// queryMasterStub.done(null, report, NullCallback.get()); -// } -// finishTime = System.currentTimeMillis(); -// LOG.info(context.getTaskId() + " completed. " + -// "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + -// ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() -// + ", killed: " + executionBlockContext.killedTasksNum.intValue() -// + ", failed: " + executionBlockContext.failedTasksNum.intValue()); -// cleanupTask(); -// } -// } -// -// public void cleanupTask() { -// TaskHistory taskHistory = createTaskHistory(); -// executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory); -// executionBlockContext.getTasks().remove(getId()); -// -// fetcherRunners.clear(); -// fetcherRunners = null; -// try { -// if(executor != null) { -// executor.close(); -// executor = null; -// } -// } catch (IOException e) { -// LOG.fatal(e.getMessage(), e); -// } -// -// executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory); -// stopScriptExecutors(); -// } -// -// public TaskHistory createTaskHistory() { -// TaskHistory taskHistory = null; -// try { -// taskHistory = new TaskHistory(getTaskId(), getStatus(), context.getProgress(), -// startTime, finishTime, reloadInputStats()); -// -// if (context.getOutputPath() != null) { -// taskHistory.setOutputPath(context.getOutputPath().toString()); -// } -// -// if (context.getWorkDir() != null) { -// taskHistory.setWorkingPath(context.getWorkDir().toString()); -// } -// -// if (context.getResultStats() != null) { -// taskHistory.setOutputStats(context.getResultStats().getProto()); -// } -// -// if (hasFetchPhase()) { -// taskHistory.setTotalFetchCount(fetcherRunners.size()); -// int i = 0; -// FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder(); -// for (Fetcher fetcher : fetcherRunners) { -// // TODO store the fetcher histories -// if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { -// builder.setStartTime(fetcher.getStartTime()); -// builder.setFinishTime(fetcher.getFinishTime()); -// builder.setFileLength(fetcher.getFileLen()); -// builder.setMessageReceivedCount(fetcher.getMessageReceiveCount()); -// builder.setState(fetcher.getState()); -// -// taskHistory.addFetcherHistory(builder.build()); -// } -// if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++; -// } -// taskHistory.setFinishedFetchCount(i); -// } -// } catch (Exception e) { -// LOG.warn(e.getMessage(), e); -// } -// -// return taskHistory; -// } -// -// public int hashCode() { -// return context.hashCode(); -// } -// -// public boolean equals(Object obj) { -// if (obj instanceof Task) { -// Task other = (Task) obj; -// return this.context.equals(other.context); -// } -// return false; -// } -// -// private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta) -// throws IOException { -// Configuration c = new Configuration(systemConf); -// c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); -// FileSystem fs = FileSystem.get(c); -// Path tablePath = new Path(file.getAbsolutePath()); -// -// List listTablets = new ArrayList(); -// FileFragment tablet; -// -// FileStatus[] fileLists = fs.listStatus(tablePath); -// for (FileStatus f : fileLists) { -// if (f.getLen() == 0) { -// continue; -// } -// tablet = new FileFragment(name, f.getPath(), 0l, f.getLen()); -// listTablets.add(tablet); -// } -// -// // Special treatment for locally pseudo fetched chunks -// synchronized (localChunks) { -// for (FileChunk chunk : localChunks) { -// if (name.equals(chunk.getEbId())) { -// tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()); -// listTablets.add(tablet); -// LOG.info("One local chunk is added to listTablets"); -// } -// } -// } -// -// FileFragment[] tablets = new FileFragment[listTablets.size()]; -// listTablets.toArray(tablets); -// -// return tablets; -// } -// -// private class FetchRunner implements Runnable { -// private final TaskAttemptContext ctx; -// private final Fetcher fetcher; -// private int maxRetryNum; -// -// public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) { -// this.ctx = ctx; -// this.fetcher = fetcher; -// this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM); -// } -// -// @Override -// public void run() { -// int retryNum = 0; -// int retryWaitTime = 1000; //sec -// -// try { // for releasing fetch latch -// while(!context.isStopped() && retryNum < maxRetryNum) { -// if (retryNum > 0) { -// try { -// Thread.sleep(retryWaitTime); -// retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds -// } catch (InterruptedException e) { -// LOG.error(e); -// } -// LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")"); -// } -// try { -// FileChunk fetched = fetcher.get(); -// if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null -// && fetched.getFile() != null) { -// if (fetched.fromRemote() == false) { -// localChunks.add(fetched); -// LOG.info("Add a new FileChunk to local chunk list"); -// } -// break; -// } -// } catch (Throwable e) { -// LOG.error("Fetch failed: " + fetcher.getURI(), e); -// } -// retryNum++; -// } -// } finally { -// if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){ -// fetcherFinished(ctx); -// } else { -// if (retryNum == maxRetryNum) { -// LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); -// } -// stopScriptExecutors(); -// context.stop(); // retry task -// ctx.getFetchLatch().countDown(); -// } -// } -// } -// } -// -// @VisibleForTesting -// public static float adjustFetchProcess(int totalFetcher, int remainFetcher) { -// if (totalFetcher > 0) { -// return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS; -// } else { -// return 0.0f; -// } -// } -// -// private synchronized void fetcherFinished(TaskAttemptContext ctx) { -// int fetcherSize = fetcherRunners.size(); -// if(fetcherSize == 0) { -// return; -// } -// -// ctx.getFetchLatch().countDown(); -// -// int remainFetcher = (int) ctx.getFetchLatch().getCount(); -// if (remainFetcher == 0) { -// context.setFetcherProgress(FETCHER_PROGRESS); -// } else { -// context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher)); -// context.setProgressChanged(true); -// } -// } -// -// private List getFetchRunners(TaskAttemptContext ctx, -// List fetches) throws IOException { -// -// if (fetches.size() > 0) { -// Path inputDir = executionBlockContext.getLocalDirAllocator(). -// getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); -// -// int i = 0; -// File storeDir; -// File defaultStoreFile; -// FileChunk storeChunk = null; -// List runnerList = Lists.newArrayList(); -// -// for (FetchImpl f : fetches) { -// storeDir = new File(inputDir.toString(), f.getName()); -// if (!storeDir.exists()) { -// storeDir.mkdirs(); -// } -// -// for (URI uri : f.getURIs()) { -// defaultStoreFile = new File(storeDir, "in_" + i); -// InetAddress address = InetAddress.getByName(uri.getHost()); -// -// WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo(); -// if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) { -// boolean hasError = false; -// try { -// LOG.info("Try to get local file chunk at local host"); -// storeChunk = getLocalStoredFileChunk(uri, systemConf); -// } catch (Throwable t) { -// hasError = true; -// } -// -// // When a range request is out of range, storeChunk will be NULL. This case is normal state. -// // So, we should skip and don't need to create storeChunk. -// if (storeChunk == null && !hasError) { -// continue; -// } -// -// if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1 -// && hasError == false) { -// storeChunk.setFromRemote(false); -// } else { -// storeChunk = new FileChunk(defaultStoreFile, 0, -1); -// storeChunk.setFromRemote(true); -// } -// } else { -// storeChunk = new FileChunk(defaultStoreFile, 0, -1); -// storeChunk.setFromRemote(true); -// } -// -// // If we decide that intermediate data should be really fetched from a remote host, storeChunk -// // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it -// storeChunk.setEbId(f.getName()); -// Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk); -// LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString()); -// runnerList.add(fetcher); -// i++; -// } -// } -// ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString())); -// return runnerList; -// } else { -// return Lists.newArrayList(); -// } -// } -// -// private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { -// // Parse the URI -// LOG.info("getLocalStoredFileChunk starts"); -// final Map> params = new QueryStringDecoder(fetchURI.toString()).parameters(); -// final List types = params.get("type"); -// final List qids = params.get("qid"); -// final List taskIdList = params.get("ta"); -// final List stageIds = params.get("sid"); -// final List partIds = params.get("p"); -// final List offsetList = params.get("offset"); -// final List lengthList = params.get("length"); -// -// if (types == null || stageIds == null || qids == null || partIds == null) { -// LOG.error("Invalid URI - Required queryId, type, stage Id, and part id"); -// return null; -// } -// -// if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { -// LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id"); -// return null; -// } -// -// String queryId = qids.get(0); -// String shuffleType = types.get(0); -// String sid = stageIds.get(0); -// String partId = partIds.get(0); -// -// if (shuffleType.equals("r") && taskIdList == null) { -// LOG.error("Invalid URI - For range shuffle, taskId is required"); -// return null; -// } -// List taskIds = splitMaps(taskIdList); -// -// FileChunk chunk = null; -// long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; -// long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; -// -// LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId -// + ", taskIds=" + taskIdList); -// -// // The working directory of Tajo worker for each query, including stage -// String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; -// -// // If the stage requires a range shuffle -// if (shuffleType.equals("r")) { -// String ta = taskIds.get(0); -// if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { -// LOG.warn("Range shuffle - file not exist"); -// return null; -// } -// Path path = executionBlockContext.getLocalFS().makeQualified( -// executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf)); -// String startKey = params.get("start").get(0); -// String endKey = params.get("end").get(0); -// boolean last = params.get("final") != null; -// -// try { -// chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last); -// } catch (Throwable t) { -// LOG.error("getFileChunks() throws exception"); -// return null; -// } -// -// // If the stage requires a hash shuffle or a scattered hash shuffle -// } else if (shuffleType.equals("h") || shuffleType.equals("s")) { -// int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); -// String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; -// if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) { -// LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); -// return null; -// } -// Path path = executionBlockContext.getLocalFS().makeQualified( -// executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf)); -// File file = new File(path.toUri()); -// long startPos = (offset >= 0 && length >= 0) ? offset : 0; -// long readLen = (offset >= 0 && length >= 0) ? length : file.length(); -// -// if (startPos >= file.length()) { -// LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]"); -// return null; -// } -// chunk = new FileChunk(file, startPos, readLen); -// -// } else { -// LOG.error("Unknown shuffle type"); -// return null; -// } -// -// return chunk; -// } -// -// private List splitMaps(List mapq) { -// if (null == mapq) { -// return null; -// } -// final List ret = new ArrayList(); -// for (String s : mapq) { -// Collections.addAll(ret, s.split(",")); -// } -// return ret; -// } -// -// public static Path getTaskAttemptDir(TaskAttemptId quid) { -// Path workDir = -// StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()), -// String.valueOf(quid.getTaskId().getId()), -// String.valueOf(quid.getId())); -// return workDir; -// } -//======= TajoWorkerProtocol.TaskStatusProto getReport(); } From ac837af71881282bf5ff26bf57265b8231c18d98 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 18 Jul 2015 12:08:33 +0900 Subject: [PATCH 10/13] Fix test failures --- .../cli/tsql/DefaultTajoCliOutputFormatter.java | 1 + .../java/org/apache/tajo/cli/tsql/TajoCli.java | 7 +++++-- .../testAlterTableAddDropPartition.result | 14 +++++++------- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java index eb2aeb9aed..90ae1782b6 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/DefaultTajoCliOutputFormatter.java @@ -198,6 +198,7 @@ public void printErrorMessage(PrintWriter sout, QueryStatus status) { public void printQueryTypeMessage(PrintWriter sout, boolean isDDL, PlanTypesProto.PlanNodeType planNodeType) { if (isDDL) { sout.println(getPlanTypeString(planNodeType) + " OK"); + sout.flush(); } } diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index 8429a7614b..f590d52a61 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -31,7 +31,6 @@ import org.apache.tajo.cli.tsql.SimpleParser.ParsingState; import org.apache.tajo.cli.tsql.commands.*; import org.apache.tajo.client.*; -import org.apache.tajo.common.PlanTypesProto; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.ipc.ClientProtos; @@ -42,9 +41,13 @@ import java.lang.reflect.Constructor; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.*; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; public class TajoCli { + public static final String ERROR_PREFIX = "ERROR: "; public static final String KILL_PREFIX = "KILL: "; diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result b/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result index c16c311dd6..1283b1fbf8 100644 --- a/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result +++ b/tajo-core/src/test/resources/results/TestTajoCli/testAlterTableAddDropPartition.result @@ -1,9 +1,9 @@ -OK +CREATE TABLE OK ERROR: "key2" column is not the partition key of "default.testaltertableaddpartition". -OK -OK +ALTER TABLE OK +ALTER TABLE OK ERROR: "key=0.1" is not the partition of "testaltertableaddpartition". -OK -OK -OK -OK \ No newline at end of file +DROP TABLE OK +CREATE TABLE OK +ALTER TABLE OK +ALTER TABLE OK \ No newline at end of file From cc72f490a124dace0f3e09fb119fc80f4a01805e Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 18 Jul 2015 13:18:20 +0900 Subject: [PATCH 11/13] trigger travis --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 7ad6d51ff9..a9332942eb 100644 --- a/CHANGES +++ b/CHANGES @@ -1,5 +1,6 @@ Tajo Change Log + Release 0.11.0 - unreleased NEW FEATURES From 7f163b3e3ee780f6ad2eefe25ab282ed12cebcd0 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sun, 19 Jul 2015 10:27:05 +0900 Subject: [PATCH 12/13] triger travis --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index a9332942eb..7ad6d51ff9 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,5 @@ Tajo Change Log - Release 0.11.0 - unreleased NEW FEATURES From 733488f170313b3827bccea80a317b800b9a0ae3 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 22 Jul 2015 22:58:54 +0900 Subject: [PATCH 13/13] trigger travis --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 8afa627337..62f972a488 100644 --- a/CHANGES +++ b/CHANGES @@ -1,5 +1,6 @@ Tajo Change Log + Release 0.11.0 - unreleased NEW FEATURES