From 066cd992a77391227f9659f8fe151941675285e5 Mon Sep 17 00:00:00 2001 From: jhkim Date: Wed, 8 Oct 2014 02:57:30 +0900 Subject: [PATCH] TAJO-1103: Insert clause of partitioned table loses some FetchImpls --- .../org/apache/tajo/worker/FetchImpl.java | 16 ++-- .../apache/tajo/master/TestRepartitioner.java | 85 +++++++++++++++++++ .../org/apache/tajo/worker/TestHistory.java | 9 +- 3 files changed, 99 insertions(+), 11 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java index 964da5da51..f4117936c8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java @@ -36,8 +36,6 @@ * FetchImpl information to indicate the locations of intermediate data. */ public class FetchImpl implements ProtoObject, Cloneable { - private TajoWorkerProtocol.FetchProto.Builder builder = null; - private QueryUnit.PullHost host; // The pull server host information private TajoWorkerProtocol.ShuffleType type; // hash or range partition method. private ExecutionBlockId executionBlockId; // The executionBlock id @@ -53,7 +51,6 @@ public class FetchImpl implements ProtoObject, Cl private long length = -1; public FetchImpl() { - builder = TajoWorkerProtocol.FetchProto.newBuilder(); taskIds = new ArrayList(); attemptIds = new ArrayList(); } @@ -108,14 +105,14 @@ public FetchImpl(QueryUnit.PullHost host, TajoWorkerProtocol.ShuffleType type, E @Override public int hashCode() { - return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams, hasNext, taskIds, attemptIds); + return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams, + hasNext, taskIds, attemptIds, offset, length); } @Override public TajoWorkerProtocol.FetchProto getProto() { - if (builder == null) { - builder = TajoWorkerProtocol.FetchProto.newBuilder(); - } + TajoWorkerProtocol.FetchProto.Builder builder = TajoWorkerProtocol.FetchProto.newBuilder(); + builder.setHost(host.getHost()); builder.setPort(host.getPort()); builder.setType(type); @@ -235,7 +232,6 @@ public void setLength(long length) { public FetchImpl clone() throws CloneNotSupportedException { FetchImpl newFetchImpl = (FetchImpl) super.clone(); - newFetchImpl.builder = TajoWorkerProtocol.FetchProto.newBuilder(); newFetchImpl.host = host.clone(); newFetchImpl.type = type; newFetchImpl.executionBlockId = executionBlockId; @@ -273,6 +269,8 @@ public boolean equals(Object o) { TUtil.checkEquals(name, fetch.name) && TUtil.checkEquals(rangeParams, fetch.rangeParams) && TUtil.checkEquals(taskIds, fetch.taskIds) && - TUtil.checkEquals(type, fetch.type); + TUtil.checkEquals(type, fetch.type) && + TUtil.checkEquals(offset, fetch.offset) && + TUtil.checkEquals(length, fetch.length); } } diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java index f969a081fb..afa330e9a7 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java @@ -29,6 +29,7 @@ import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; import org.apache.tajo.master.querymaster.Repartitioner; import org.apache.tajo.util.Pair; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.FetchImpl; import org.jboss.netty.handler.codec.http.QueryStringDecoder; import org.junit.Test; @@ -39,6 +40,7 @@ import static junit.framework.Assert.assertEquals; import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType; import static org.apache.tajo.master.querymaster.Repartitioner.FetchGroupMeta; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; public class TestRepartitioner { @@ -403,6 +405,89 @@ public void testSplitIntermediates2() { } } + @Test + public void testSplitIntermediatesWithUniqueHost() { + List intermediateEntries = new ArrayList(); + + int[] pageLengths = new int[20]; //195MB + for (int i = 0 ; i < pageLengths.length; i++) { + if (i < pageLengths.length - 1) { + pageLengths[i] = 10 * 1024 * 1024; + } else { + pageLengths[i] = 5 * 1024 * 1024; + } + } + + long expectedTotalLength = 0; + QueryUnit.PullHost pullHost = new QueryUnit.PullHost("host", 0); + + for (int i = 0; i < 20; i++) { + List> pages = new ArrayList>(); + long offset = 0; + for (int j = 0; j < pageLengths.length; j++) { + pages.add(new Pair(offset, pageLengths[j])); + offset += pageLengths[j]; + expectedTotalLength += pageLengths[j]; + } + IntermediateEntry interm = new IntermediateEntry(i, -1, 0, pullHost); + interm.setPages(pages); + interm.setVolume(offset); + intermediateEntries.add(interm); + } + + long splitVolume = 128 * 1024 * 1024; + List> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries, + splitVolume, 10 * 1024 * 1024); + assertEquals(32, fetches.size()); + + int expectedSize = 0; + Set fetchSet = TUtil.newHashSet(); + for(List list : fetches){ + expectedSize += list.size(); + fetchSet.addAll(list); + } + assertEquals(expectedSize, fetchSet.size()); + + + int index = 0; + int numZeroPosFetcher = 0; + long totalLength = 0; + Set uniqPullHost = new HashSet(); + + for (List eachFetchList: fetches) { + long length = 0; + for (FetchImpl eachFetch: eachFetchList) { + if (eachFetch.getOffset() == 0) { + numZeroPosFetcher++; + } + totalLength += eachFetch.getLength(); + length += eachFetch.getLength(); + uniqPullHost.add(eachFetch.getPullHost().toString()); + } + assertTrue(length + " should be smaller than splitVolume", length < splitVolume); + if (index < fetches.size() - 1) { + assertTrue(length + " should be great than 100MB" + fetches.size() + "," + index, length >= 100 * 1024 * 1024); + } + index++; + } + assertEquals(20, numZeroPosFetcher); + assertEquals(1, uniqPullHost.size()); + assertEquals(expectedTotalLength, totalLength); + } + + @Test + public void testFetchImpl() { + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + QueryUnit.PullHost pullHost = new QueryUnit.PullHost("localhost", 0); + + FetchImpl expected = new FetchImpl(pullHost, ShuffleType.SCATTERED_HASH_SHUFFLE, ebId, 1); + FetchImpl fetch2 = new FetchImpl(pullHost, ShuffleType.SCATTERED_HASH_SHUFFLE, ebId, 1); + assertEquals(expected, fetch2); + fetch2.setOffset(5); + fetch2.setLength(10); + assertNotEquals(expected, fetch2); + } + private static void assertFetchImpl(FetchImpl [] expected, Map>[] result) { Set expectedURLs = Sets.newHashSet(); diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java index 15ead84718..3a85c14d12 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java @@ -76,8 +76,13 @@ public final void testTaskRunnerHistory() throws IOException, ServiceException, TaskRunnerHistory history = histories.iterator().next(); assertEquals(Service.STATE.STOPPED, history.getState()); - - assertEquals(history, new TaskRunnerHistory(history.getProto())); + TaskRunnerHistory fromProto = new TaskRunnerHistory(history.getProto()); + assertEquals(history.getExecutionBlockId(), fromProto.getExecutionBlockId()); + assertEquals(history.getFinishTime(), fromProto.getFinishTime()); + assertEquals(history.getStartTime(), fromProto.getStartTime()); + assertEquals(history.getState(), fromProto.getState()); + assertEquals(history.getContainerId(), fromProto.getContainerId()); + assertEquals(history.getProto().getTaskHistoriesCount(), fromProto.getProto().getTaskHistoriesCount()); } @Test