From 01db4334775b3095385df6efc42d30591eafa443 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 25 Aug 2015 22:43:18 +0900 Subject: [PATCH 1/2] TAJO-1801 --- .../org/apache/tajo/querymaster/Query.java | 12 ++++------- .../tajo/querymaster/QueryMasterTask.java | 20 +++++++++++++------ .../tajo/querymaster/Repartitioner.java | 9 ++++++--- .../org/apache/tajo/querymaster/Stage.java | 17 ++++++++-------- 4 files changed, 33 insertions(+), 25 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index b09d5fd7f8..e79f4793c4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -34,26 +34,22 @@ import org.apache.tajo.SessionVars; import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; -import org.apache.tajo.catalog.CatalogService; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; import org.apache.tajo.engine.planner.global.ExecutionQueue; import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.exception.TajoException; -import org.apache.tajo.exception.TajoInternalError; -import org.apache.tajo.plan.logical.*; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.master.event.*; +import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.StorageConstants; -import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.Tablespace; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.QueryHistory; import org.apache.tajo.util.history.StageHistory; diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 52e0a96bae..6602a48459 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -91,7 +91,8 @@ public class QueryMasterTask extends CompositeService { private final long querySubmitTime; - private Map tableDescMap = new HashMap(); +// private Map tableDescMap = new HashMap(); + private final Map tableDescMap = new HashMap<>(); private TajoConf systemConf; @@ -332,7 +333,8 @@ public synchronized void startQuery() { if (scanNodes != null) { for (LogicalNode eachScanNode : scanNodes) { ScanNode scanNode = (ScanNode) eachScanNode; - tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); +// tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); + tableDescMap.put(scanNode.getPID(), scanNode.getTableDesc()); } } @@ -340,7 +342,8 @@ public synchronized void startQuery() { if (scanNodes != null) { for (LogicalNode eachScanNode : scanNodes) { ScanNode scanNode = (ScanNode) eachScanNode; - tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); +// tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); + tableDescMap.put(scanNode.getPID(), scanNode.getTableDesc()); } } @@ -348,7 +351,8 @@ public synchronized void startQuery() { if (scanNodes != null) { for (LogicalNode eachScanNode : scanNodes) { ScanNode scanNode = (ScanNode) eachScanNode; - tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); +// tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); + tableDescMap.put(scanNode.getPID(), scanNode.getTableDesc()); } } } @@ -533,8 +537,12 @@ public Stage getStage(ExecutionBlockId id) { return query.getStage(id); } - public Map getTableDescMap() { - return tableDescMap; +// public Map getTableDescMap() { +// return tableDescMap; +// } + + public TableDesc getTableDesc(ScanNode scanNode) { + return tableDescMap.get(scanNode.getPID()); } public float getProgress() { diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index 0d5880ef14..2537043543 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -88,7 +88,8 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC // initialize variables from the child operators for (int i = 0; i < scans.length; i++) { - TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName()); +// TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName()); + TableDesc tableDesc = masterContext.getTableDesc(scans[i]); if (tableDesc == null) { // if it is a real table stored on storage if (execBlock.getUnionScanMap() != null && !execBlock.getUnionScanMap().isEmpty()) { @@ -376,7 +377,8 @@ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMaster for (ScanNode eachScan: broadcastScans) { Path[] partitionScanPaths = null; - TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName()); +// TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName()); + TableDesc tableDesc = masterContext.getTableDesc(eachScan); Tablespace space = TablespaceManager.get(tableDesc.getUri()).get(); if (eachScan.getType() == NodeType.PARTITIONS_SCAN) { @@ -498,7 +500,8 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch List broadcastFragments = new ArrayList(); for (int i = 0; i < scans.length; i++) { ScanNode scan = scans[i]; - TableDesc desc = stage.getContext().getTableDescMap().get(scan.getCanonicalName()); +// TableDesc desc = stage.getContext().getTableDescMap().get(scan.getCanonicalName()); + TableDesc desc = stage.getContext().getTableDesc(scan); TableMeta meta = desc.getMeta(); Collection scanFragments; diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index cf5cdbded0..2fb0d2fda4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -31,7 +31,6 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.catalog.statistics.StatisticsUtil; @@ -44,12 +43,12 @@ import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.exception.TajoException; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; -import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.master.TaskState; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; +import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.querymaster.Task.IntermediateEntry; import org.apache.tajo.rpc.AsyncRpcClient; @@ -57,8 +56,8 @@ import org.apache.tajo.rpc.RpcClientManager; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.FileTablespace; -import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.Tablespace; +import org.apache.tajo.storage.TablespaceManager; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; @@ -76,9 +75,9 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static org.apache.tajo.ResourceProtos.*; import static org.apache.tajo.conf.TajoConf.ConfVars; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; -import static org.apache.tajo.ResourceProtos.*; /** @@ -1051,12 +1050,13 @@ public static int getNonLeafTaskNum(Stage stage) { public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock execBlock) { - Map tableMap = context.getTableDescMap(); +// Map tableMap = context.getTableDescMap(); if (masterPlan.isLeaf(execBlock)) { ScanNode[] outerScans = execBlock.getScanNodes(); long maxVolume = 0; for (ScanNode eachScanNode: outerScans) { - TableStats stat = tableMap.get(eachScanNode.getCanonicalName()).getStats(); +// TableStats stat = tableMap.get(eachScanNode.getCanonicalName()).getStats(); + TableStats stat = context.getTableDesc(eachScanNode).getStats(); if (stat.getNumBytes() > maxVolume) { maxVolume = stat.getNumBytes(); } @@ -1082,7 +1082,8 @@ private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOExceptio ScanNode[] scans = execBlock.getScanNodes(); Preconditions.checkArgument(scans.length == 1, "Must be Scan Query"); ScanNode scan = scans[0]; - TableDesc table = stage.context.getTableDescMap().get(scan.getCanonicalName()); +// TableDesc table = stage.context.getTableDescMap().get(scan.getCanonicalName()); + TableDesc table = stage.context.getTableDesc(scan); Collection fragments; Tablespace tablespace = TablespaceManager.get(scan.getTableDesc().getUri()).get(); From 972e93c788b0928073d76ce55cc48e4d51c00d32 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 26 Aug 2015 23:54:05 +0900 Subject: [PATCH 2/2] TAJO-1801 --- .../java/org/apache/tajo/querymaster/QueryMasterTask.java | 8 -------- .../java/org/apache/tajo/querymaster/Repartitioner.java | 3 --- .../src/main/java/org/apache/tajo/querymaster/Stage.java | 3 --- 3 files changed, 14 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 6602a48459..1313dade48 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -91,7 +91,6 @@ public class QueryMasterTask extends CompositeService { private final long querySubmitTime; -// private Map tableDescMap = new HashMap(); private final Map tableDescMap = new HashMap<>(); private TajoConf systemConf; @@ -333,7 +332,6 @@ public synchronized void startQuery() { if (scanNodes != null) { for (LogicalNode eachScanNode : scanNodes) { ScanNode scanNode = (ScanNode) eachScanNode; -// tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); tableDescMap.put(scanNode.getPID(), scanNode.getTableDesc()); } } @@ -342,7 +340,6 @@ public synchronized void startQuery() { if (scanNodes != null) { for (LogicalNode eachScanNode : scanNodes) { ScanNode scanNode = (ScanNode) eachScanNode; -// tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); tableDescMap.put(scanNode.getPID(), scanNode.getTableDesc()); } } @@ -351,7 +348,6 @@ public synchronized void startQuery() { if (scanNodes != null) { for (LogicalNode eachScanNode : scanNodes) { ScanNode scanNode = (ScanNode) eachScanNode; -// tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc()); tableDescMap.put(scanNode.getPID(), scanNode.getTableDesc()); } } @@ -537,10 +533,6 @@ public Stage getStage(ExecutionBlockId id) { return query.getStage(id); } -// public Map getTableDescMap() { -// return tableDescMap; -// } - public TableDesc getTableDesc(ScanNode scanNode) { return tableDescMap.get(scanNode.getPID()); } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index 2537043543..c4fc64514c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -88,7 +88,6 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC // initialize variables from the child operators for (int i = 0; i < scans.length; i++) { -// TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName()); TableDesc tableDesc = masterContext.getTableDesc(scans[i]); if (tableDesc == null) { // if it is a real table stored on storage @@ -377,7 +376,6 @@ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMaster for (ScanNode eachScan: broadcastScans) { Path[] partitionScanPaths = null; -// TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName()); TableDesc tableDesc = masterContext.getTableDesc(eachScan); Tablespace space = TablespaceManager.get(tableDesc.getUri()).get(); @@ -500,7 +498,6 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch List broadcastFragments = new ArrayList(); for (int i = 0; i < scans.length; i++) { ScanNode scan = scans[i]; -// TableDesc desc = stage.getContext().getTableDescMap().get(scan.getCanonicalName()); TableDesc desc = stage.getContext().getTableDesc(scan); TableMeta meta = desc.getMeta(); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index e6f9e4e21d..12e43664a5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -1057,12 +1057,10 @@ public static int getNonLeafTaskNum(Stage stage) { public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock execBlock) { -// Map tableMap = context.getTableDescMap(); if (masterPlan.isLeaf(execBlock)) { ScanNode[] outerScans = execBlock.getScanNodes(); long maxVolume = 0; for (ScanNode eachScanNode: outerScans) { -// TableStats stat = tableMap.get(eachScanNode.getCanonicalName()).getStats(); TableStats stat = context.getTableDesc(eachScanNode).getStats(); if (stat.getNumBytes() > maxVolume) { maxVolume = stat.getNumBytes(); @@ -1089,7 +1087,6 @@ private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOExceptio ScanNode[] scans = execBlock.getScanNodes(); Preconditions.checkArgument(scans.length == 1, "Must be Scan Query"); ScanNode scan = scans[0]; -// TableDesc table = stage.context.getTableDescMap().get(scan.getCanonicalName()); TableDesc table = stage.context.getTableDesc(scan); Collection fragments;