From f2413122ef724db0367e8dcf51a98def29d0f8b6 Mon Sep 17 00:00:00 2001 From: jhkim Date: Wed, 21 Jan 2015 11:49:23 +0900 Subject: [PATCH 1/3] TAJO-1309: Add missing break point in physical operator. --- .../engine/planner/physical/BNLJoinExec.java | 3 ++- .../planner/physical/ExternalSortExec.java | 2 +- .../planner/physical/HashAggregateExec.java | 2 +- .../HashBasedColPartitionStoreExec.java | 2 +- .../physical/HashFullOuterJoinExec.java | 4 ++-- .../engine/planner/physical/HashJoinExec.java | 4 ++-- .../physical/HashLeftAntiJoinExec.java | 4 ++-- .../physical/HashLeftOuterJoinExec.java | 4 ++-- .../physical/HashLeftSemiJoinExec.java | 2 +- .../physical/HashShuffleFileWriteExec.java | 2 +- .../engine/planner/physical/HavingExec.java | 2 +- .../engine/planner/physical/MemSortExec.java | 2 +- .../physical/MergeFullOuterJoinExec.java | 3 ++- .../planner/physical/MergeJoinExec.java | 3 ++- .../engine/planner/physical/NLJoinExec.java | 3 ++- .../planner/physical/NLLeftOuterJoinExec.java | 3 ++- .../physical/PartitionMergeScanExec.java | 5 ++--- .../physical/RangeShuffleFileWriteExec.java | 2 +- .../physical/RightOuterMergeJoinExec.java | 3 ++- .../planner/physical/SelectionExec.java | 2 +- .../engine/planner/physical/SeqScanExec.java | 2 +- .../SortBasedColPartitionStoreExec.java | 2 +- .../planner/physical/StoreTableExec.java | 2 +- .../apache/tajo/master/QueryInProgress.java | 8 +++++-- .../tajo/master/TajoContainerProxy.java | 6 ++--- .../tajo/worker/TajoResourceAllocator.java | 6 ++--- .../java/org/apache/tajo/worker/Task.java | 22 ++++++++----------- .../tajo/worker/TaskAttemptContext.java | 2 +- 28 files changed, 56 insertions(+), 51 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java index 117b04c4b8..14cf567e0b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java @@ -128,7 +128,7 @@ public Tuple next() throws IOException { rightEnd = true; } - while (true) { + while (!context.isStopped()) { if (!rightIterator.hasNext()) { // if leftIterator ended if (leftIterator.hasNext()) { // if rightTupleslot remains leftTuple = leftIterator.next(); @@ -201,6 +201,7 @@ public Tuple next() throws IOException { return outputTuple; } } + return null; } @Override diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 4e19114e5a..c3f9d3df8f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -203,7 +203,7 @@ private List sortAndStoreAllChunks() throws IOException { int chunkId = 0; long runStartTime = System.currentTimeMillis(); - while ((tuple = child.next()) != null) { // partition sort start + while (!context.isStopped() && (tuple = child.next()) != null) { // partition sort start Tuple vtuple = new VTuple(tuple); inMemoryTable.add(vtuple); memoryConsumption += MemoryUtil.calculateMemorySize(vtuple); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java index 80bba2b6b8..0d1bf3dceb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java @@ -48,7 +48,7 @@ public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode plan, PhysicalExec private void compute() throws IOException { Tuple tuple; Tuple keyTuple; - while((tuple = child.next()) != null && !context.isStopped()) { + while(!context.isStopped() && (tuple = child.next()) != null) { keyTuple = new VTuple(groupingKeyIds.length); // build one key tuple for(int i = 0; i < groupingKeyIds.length; i++) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java index c28a5cd676..e94bc262f5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java @@ -67,7 +67,7 @@ private Appender getAppender(String partition) throws IOException { public Tuple next() throws IOException { Tuple tuple; StringBuilder sb = new StringBuilder(); - while((tuple = child.next()) != null) { + while(!context.isStopped() && (tuple = child.next()) != null) { // set subpartition directory name sb.delete(0, sb.length()); if (keyIds != null) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java index 28d9a3e348..9cd13fb5f5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java @@ -144,7 +144,7 @@ public Tuple next() throws IOException { Tuple rightTuple; boolean found = false; - while(!finished) { + while(!context.isStopped() && !finished) { if (shouldGetLeftTuple) { // initially, it is true. // getting new outer leftTuple = leftChild.next(); // it comes from a disk @@ -208,7 +208,7 @@ protected void loadRightToHashTable() throws IOException { Tuple tuple; Tuple keyTuple; - while ((tuple = rightChild.next()) != null) { + while (!context.isStopped() && (tuple = rightChild.next()) != null) { keyTuple = new VTuple(joinKeyPairs.size()); for (int i = 0; i < rightKeyList.length; i++) { keyTuple.put(i, tuple.get(rightKeyList[i])); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java index 701297f4e0..38728b5338 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java @@ -111,7 +111,7 @@ public Tuple next() throws IOException { Tuple rightTuple; boolean found = false; - while(!finished) { + while(!context.isStopped() && !finished) { if (shouldGetLeftTuple) { // initially, it is true. // getting new outer leftTuple = leftChild.next(); // it comes from a disk @@ -156,7 +156,7 @@ protected void loadRightToHashTable() throws IOException { Tuple tuple; Tuple keyTuple; - while ((tuple = rightChild.next()) != null) { + while (!context.isStopped() && (tuple = rightChild.next()) != null) { keyTuple = new VTuple(joinKeyPairs.size()); for (int i = 0; i < rightKeyList.length; i++) { keyTuple.put(i, tuple.get(rightKeyList[i])); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java index 236f5e3644..cceed3e8d0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java @@ -64,7 +64,7 @@ public Tuple next() throws IOException { Tuple rightTuple; boolean notFound; - while(!finished) { + while(!context.isStopped() && !finished) { // getting new outer leftTuple = leftChild.next(); // it comes from a disk @@ -89,7 +89,7 @@ public Tuple next() throws IOException { // Reach here only when a hash bucket is found. Then, it checks all tuples in the found bucket. // If it finds a matched tuple, it escapes the loop for all tuples in the hash bucket. notFound = true; - while (notFound && iterator.hasNext()) { + while (!context.isStopped() && notFound && iterator.hasNext()) { rightTuple = iterator.next(); frameTuple.set(leftTuple, rightTuple); if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java index c1b6522da0..233ef9270b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java @@ -138,7 +138,7 @@ public Tuple next() throws IOException { Tuple rightTuple; boolean found = false; - while(!finished) { + while(!context.isStopped() && !finished) { if (shouldGetLeftTuple) { // initially, it is true. // getting new outer @@ -204,7 +204,7 @@ protected void loadRightToHashTable() throws IOException { Tuple tuple; Tuple keyTuple; - while ((tuple = rightChild.next()) != null) { + while (!context.isStopped() && (tuple = rightChild.next()) != null) { keyTuple = new VTuple(joinKeyPairs.size()); for (int i = 0; i < rightKeyList.length; i++) { keyTuple.put(i, tuple.get(rightKeyList[i])); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java index 5196a63c0d..37c6d0e196 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java @@ -70,7 +70,7 @@ public Tuple next() throws IOException { Tuple rightTuple; boolean notFound; - while(!finished) { + while(!context.isStopped() && !finished) { // getting new outer leftTuple = leftChild.next(); // it comes from a disk diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index 3c4949f2a3..28974f911e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -103,7 +103,7 @@ public Tuple next() throws IOException { int partId; int tupleCount = 0; long numRows = 0; - while ((tuple = child.next()) != null) { + while (!context.isStopped() && (tuple = child.next()) != null) { tupleCount++; numRows++; diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java index f9f4351004..e9a7c03da7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java @@ -39,7 +39,7 @@ public HavingExec(TaskAttemptContext context, @Override public Tuple next() throws IOException { Tuple tuple; - while ((tuple = child.next()) != null) { + while (!context.isStopped() && (tuple = child.next()) != null) { if (qual.eval(inSchema, tuple).isTrue()) { return tuple; } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java index 13fec7b0a2..c77313ea68 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java @@ -51,7 +51,7 @@ public Tuple next() throws IOException { if (!sorted) { Tuple tuple; - while ((tuple = child.next()) != null) { + while (!context.isStopped() && (tuple = child.next()) != null) { tupleSlots.add(new VTuple(tuple)); } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java index cb2552b6b2..3f2e431d74 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java @@ -109,7 +109,7 @@ public JoinNode getPlan(){ public Tuple next() throws IOException { Tuple previous; - for (;;) { + while (!context.isStopped()) { boolean newRound = false; if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) { newRound = true; @@ -313,6 +313,7 @@ public Tuple next() throws IOException { } } // the second if end false } // for + return null; } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java index 13104ee1e3..63f48ac0a2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java @@ -102,7 +102,7 @@ public JoinNode getPlan(){ public Tuple next() throws IOException { Tuple previous; - for (;;) { + while (!context.isStopped()) { if (!outerIterator.hasNext() && !innerIterator.hasNext()) { if(end){ return null; @@ -170,6 +170,7 @@ public Tuple next() throws IOException { return outTuple; } } + return null; } @Override diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java index b5c6244f23..5e7ab983f4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java @@ -67,7 +67,7 @@ public JoinNode getPlan() { } public Tuple next() throws IOException { - for (;;) { + while (!context.isStopped()) { if (needNewOuter) { outerTuple = leftChild.next(); if (outerTuple == null) { @@ -94,6 +94,7 @@ public Tuple next() throws IOException { return outTuple; } } + return null; } @Override diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java index 8ff757064c..7959d4784c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java @@ -73,7 +73,7 @@ public JoinNode getPlan() { } public Tuple next() throws IOException { - for (;;) { + while (!context.isStopped()) { if (needNextRightTuple) { leftTuple = leftChild.next(); if (leftTuple == null) { @@ -112,6 +112,7 @@ public Tuple next() throws IOException { return outTuple; } } + return null; } @Override diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java index 5297e2c9f8..5692308364 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java @@ -21,9 +21,8 @@ import com.google.common.collect.Lists; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.Tuple; import org.apache.tajo.worker.TaskAttemptContext; @@ -70,7 +69,7 @@ public void init() throws IOException { @Override public Tuple next() throws IOException { Tuple tuple; - while (currentScanner != null) { + while (!context.isStopped() && currentScanner != null) { tuple = currentScanner.next(); if (tuple != null) { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index 119f0532d5..8da1a03638 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -96,7 +96,7 @@ public Tuple next() throws IOException { long offset; - while((tuple = child.next()) != null) { + while(!context.isStopped() && (tuple = child.next()) != null) { offset = appender.getOffset(); appender.addTuple(tuple); keyTuple = new VTuple(keySchema.size()); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java index a02d00b6ee..5e80b8f230 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java @@ -129,7 +129,7 @@ private Tuple createNullPaddedTuple(int columnNum){ public Tuple next() throws IOException { Tuple previous; - for (;;) { + while (!context.isStopped()) { boolean newRound = false; if((posRightTupleSlots == -1) && (posLeftTupleSlots == -1)) { newRound = true; @@ -339,6 +339,7 @@ public Tuple next() throws IOException { } } // the second if end false } // for + return null; } @Override diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java index 9e84462bf5..b9273faa9d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java @@ -44,7 +44,7 @@ public void compile() throws CompilationError { @Override public Tuple next() throws IOException { Tuple tuple; - while ((tuple = child.next()) != null) { + while (!context.isStopped() && (tuple = child.next()) != null) { if (qual.eval(inSchema, tuple).isTrue()) { return tuple; } diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 94cd4edd0c..15f17fdb68 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -251,7 +251,7 @@ private void scanAndAddCache(Schema projected) throws IOException { initScanner(projected); List broadcastTupleCacheList = new ArrayList(); - while (true) { + while (!context.isStopped()) { Tuple tuple = next(); if (tuple != null) { broadcastTupleCacheList.add(tuple); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index f7c20fc870..ca90b0e78e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -73,7 +73,7 @@ private String getSubdirectory(Tuple keyTuple) { @Override public Tuple next() throws IOException { Tuple tuple; - while((tuple = child.next()) != null) { + while(!context.isStopped() && (tuple = child.next()) != null) { fillKeyTuple(tuple, currentKey); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index 3d3da5c7cf..562269918e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -115,7 +115,7 @@ public void openNewFile(int suffixId) throws IOException { */ @Override public Tuple next() throws IOException { - while((tuple = child.next()) != null) { + while(!context.isStopped() && (tuple = child.next()) != null) { appender.addTuple(tuple); if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) { diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index 75875431f5..23ca559091 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -80,8 +80,12 @@ public QueryInProgress( public synchronized void kill() { getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); - if(queryMasterRpcClient != null){ - queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); + if (queryMasterRpcClient != null) { + try { + queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); + } catch (Throwable e) { + LOG.fatal(e.getMessage(), e); + } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 588b7eeeb3..5659437d4e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -82,7 +82,7 @@ public void killTaskAttempt(TaskAttemptId taskAttemptId) { tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get()); - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); } finally { RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); @@ -111,7 +111,7 @@ private void assignExecutionBlock(ExecutionBlockId executionBlockId, TajoContain .build(); tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get()); - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); } finally { RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); @@ -198,7 +198,7 @@ public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext .addAllContainerIds(containerIdProtos) .build(), NullCallback.get()); - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); } finally { connPool.releaseConnection(tmClient); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 7278317cfe..dd408c9ce2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -128,7 +128,7 @@ public synchronized void stop() { for(ContainerProxy eachProxy: list) { try { eachProxy.stopContainer(); - } catch (Exception e) { + } catch (Throwable e) { LOG.warn(e.getMessage()); } } @@ -301,7 +301,7 @@ public void run() { QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.allocateWorkerResources(null, request, callBack); - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); } finally { connPool.releaseConnection(tmClient); @@ -363,7 +363,7 @@ public void run() { containerIds.add(eachContainer.getId()); } TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds); - } catch (Exception e) { + } catch (Throwable e) { LOG.error(e.getMessage(), e); } return; diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 5f9c6ace1c..e9ad8386f0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -88,8 +88,6 @@ public class Task { private final Map descs = Maps.newHashMap(); private PhysicalExec executor; private boolean interQuery; - private boolean killed = false; - private boolean aborted = false; private Path inputTableBaseDir; private long startTime; @@ -254,13 +252,11 @@ public void fetch() { } public void kill() { - killed = true; - context.stop(); context.setState(TaskAttemptState.TA_KILLED); + context.stop(); } public void abort() { - aborted = true; context.stop(); } @@ -299,7 +295,7 @@ public boolean isProgressChanged() { } public void updateProgress() { - if(killed || aborted){ + if(context != null && context.isStopped()){ return; } @@ -403,12 +399,12 @@ public void run() throws Exception { createPlan(context, plan); this.executor.init(); - while(!killed && !aborted && executor.next() != null) { + while(!context.isStopped() && executor.next() != null) { } } catch (Throwable e) { error = e ; LOG.error(e.getMessage(), e); - aborted = true; + context.stop(); } finally { if (executor != null) { try { @@ -423,10 +419,10 @@ public void run() throws Exception { executionBlockContext.completedTasksNum.incrementAndGet(); context.getHashShuffleAppenderManager().finalizeTask(taskId); QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getQueryMasterStub(); - if (killed || aborted) { + if (context.isStopped()) { context.setExecutorProgress(0.0f); - if(killed) { - context.setState(TaskAttemptState.TA_KILLED); + + if(context.getState() == TaskAttemptState.TA_KILLED) { queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); executionBlockContext.killedTasksNum.incrementAndGet(); } else { @@ -593,7 +589,7 @@ public void run() { int retryWaitTime = 1000; //sec try { // for releasing fetch latch - while(!killed && retryNum < maxRetryNum) { + while(!context.isStopped() && retryNum < maxRetryNum) { if (retryNum > 0) { try { Thread.sleep(retryWaitTime); @@ -625,7 +621,7 @@ public void run() { if (retryNum == maxRetryNum) { LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); } - aborted = true; // retry task + context.stop(); // retry task ctx.getFetchLatch().countDown(); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 3092c47050..1f2c325368 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -70,7 +70,7 @@ public class TaskAttemptContext { /** a map of shuffled file outputs */ private Map shuffleFileOutputs; private File fetchIn; - private boolean stopped = false; + private volatile boolean stopped = false; private boolean interQuery = false; private Path outputPath; private DataChannel dataChannel; From 5be85f3affa8267b63a155b6c777a2b231eef3b5 Mon Sep 17 00:00:00 2001 From: jhkim Date: Wed, 21 Jan 2015 11:55:52 +0900 Subject: [PATCH 2/3] handle rpc failure case --- .../src/main/java/org/apache/tajo/master/QueryInProgress.java | 4 ++-- .../main/java/org/apache/tajo/master/TajoContainerProxy.java | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index 23ca559091..e5c8182025 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -84,7 +84,7 @@ public synchronized void kill() { try { queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); } catch (Throwable e) { - LOG.fatal(e.getMessage(), e); + catchException(e); } } } @@ -169,7 +169,7 @@ public synchronized void submmitQueryToMaster() { } } - public void catchException(Exception e) { + public void catchException(Throwable e) { LOG.error(e.getMessage(), e); queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED); queryInfo.setLastMessage(StringUtils.stringifyException(e)); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 5659437d4e..42ffd87213 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -31,6 +31,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.container.TajoContainer; import org.apache.tajo.master.container.TajoContainerId; +import org.apache.tajo.master.event.TaskFatalErrorEvent; import org.apache.tajo.master.rm.TajoWorkerContainer; import org.apache.tajo.master.rm.TajoWorkerContainerId; import org.apache.tajo.querymaster.QueryMasterTask; @@ -83,7 +84,8 @@ public void killTaskAttempt(TaskAttemptId taskAttemptId) { TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get()); } catch (Throwable e) { - LOG.error(e.getMessage(), e); + /* Worker RPC failure */ + context.getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e.getMessage())); } finally { RpcConnectionPool.getPool().releaseConnection(tajoWorkerRpc); } From 100e1a3cfabc812fe0a8b6627c880bb8450969e2 Mon Sep 17 00:00:00 2001 From: jhkim Date: Thu, 22 Jan 2015 18:00:59 +0900 Subject: [PATCH 3/3] fix typo --- .../java/org/apache/tajo/cli/tsql/commands/HelpCommand.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java index 5d41e41a4c..ce56d12392 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/commands/HelpCommand.java @@ -18,11 +18,11 @@ package org.apache.tajo.cli.tsql.commands; -import java.io.PrintWriter; - import org.apache.tajo.cli.tsql.TajoCli; import org.apache.tajo.util.VersionInfo; +import java.io.PrintWriter; + public class HelpCommand extends TajoShellCommand { private String targetDocVersion = ""; @@ -79,7 +79,7 @@ public void invoke(String[] cmd) throws Exception { sout.println(); sout.println("Variables"); - sout.println(" \\set [[NAME] [VALUE] set session variable or list session variables"); + sout.println(" \\set [NAME] [VALUE] set session variable or list session variables"); sout.println(" \\unset NAME unset session variable"); sout.println(); sout.println();