From 2d1b880094f05bd0ab574817265f53125aa9f304 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Tue, 22 Jul 2014 16:59:57 +0900 Subject: [PATCH] TAJO-969: Distributed sort on a large data set may result in incorrect results. --- .../tajo/master/querymaster/QueryUnit.java | 10 +++++++- .../master/querymaster/Repartitioner.java | 10 ++++++-- .../org/apache/tajo/worker/FetchImpl.java | 24 ++++++++++++++++++- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java index 806c0f18d0..8c953bdef8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java @@ -626,7 +626,7 @@ public List getIntermediateData() { return this.intermediateData; } - public static class PullHost { + public static class PullHost implements Cloneable { String host; int port; public PullHost(String pullServerAddr, int pullServerPort){ @@ -659,6 +659,14 @@ public boolean equals(Object obj) { return false; } + + @Override + public PullHost clone() throws CloneNotSupportedException { + PullHost newPullHost = (PullHost) super.clone(); + newPullHost.host = host; + newPullHost.port = port; + return newPullHost; + } } public static class IntermediateEntry { diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index 055e9a2056..31c520f0cc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -636,8 +636,14 @@ public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerCo for (FetchImpl fetch: fetches) { String rangeParam = TupleUtil.rangeToQuery(ranges[i], ascendingFirstKey ? i == (ranges.length - 1) : i == 0, encoder); - fetch.setRangeParams(rangeParam); - fetchSet.add(fetch); + FetchImpl copy = null; + try { + copy = fetch.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + copy.setRangeParams(rangeParam); + fetchSet.add(copy); } map.put(ranges[i], fetchSet); } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java index 9d1f4286e7..869c1066d0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java @@ -20,6 +20,7 @@ import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.ipc.TajoWorkerProtocol; @@ -33,7 +34,7 @@ /** * FetchImpl information to indicate the locations of intermediate data. */ -public class FetchImpl implements ProtoObject { +public class FetchImpl implements ProtoObject, Cloneable { private TajoWorkerProtocol.FetchProto.Builder builder = null; private QueryUnit.PullHost host; // The pull server host information @@ -110,6 +111,7 @@ public TajoWorkerProtocol.FetchProto getProto() { builder.setPartitionId(partitionId); builder.setHasNext(hasNext); builder.setName(name); + if (rangeParams != null && !rangeParams.isEmpty()) { builder.setRangeParams(rangeParams); } @@ -198,4 +200,24 @@ public List getTaskIds() { public List getAttemptIds() { return attemptIds; } + + public FetchImpl clone() throws CloneNotSupportedException { + FetchImpl newFetchImpl = (FetchImpl) super.clone(); + + newFetchImpl.builder = TajoWorkerProtocol.FetchProto.newBuilder(); + newFetchImpl.host = host.clone(); + newFetchImpl.type = type; + newFetchImpl.executionBlockId = executionBlockId; + newFetchImpl.partitionId = partitionId; + newFetchImpl.name = name; + newFetchImpl.rangeParams = rangeParams; + newFetchImpl.hasNext = hasNext; + if (taskIds != null) { + newFetchImpl.taskIds = Lists.newArrayList(taskIds); + } + if (attemptIds != null) { + newFetchImpl.attemptIds = Lists.newArrayList(attemptIds); + } + return newFetchImpl; + } }