Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.tajo.master.querymaster;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an unused imports.
Please remove it.

import com.google.common.primitives.Ints;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -48,6 +50,7 @@
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.worker.FetchImpl;
Expand Down Expand Up @@ -675,6 +678,31 @@ public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map<?, Collect
}
}

@VisibleForTesting
public static class FetchGroupMeta {
long totalVolume;
List<FetchImpl> fetchUrls;

public FetchGroupMeta(long volume, FetchImpl fetchUrls) {
this.totalVolume = volume;
this.fetchUrls = Lists.newArrayList(fetchUrls);
}

public FetchGroupMeta addFetche(FetchImpl fetches) {
this.fetchUrls.add(fetches);
return this;
}

public void increaseVolume(long volume) {
this.totalVolume += volume;
}

public long getVolume() {
return totalVolume;
}

}

public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
SubQuery subQuery, DataChannel channel,
int maxNum) {
Expand All @@ -689,7 +717,7 @@ public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerCon
SubQuery.scheduleFragments(subQuery, fragments);

Map<QueryUnit.PullHost, List<IntermediateEntry>> hashedByHost;
Map<Integer, Collection<FetchImpl>> finalFetches = new HashMap<Integer, Collection<FetchImpl>>();
Map<Integer, FetchGroupMeta> finalFetches = new HashMap<Integer, FetchGroupMeta>();
Map<ExecutionBlockId, List<IntermediateEntry>> intermediates = new HashMap<ExecutionBlockId,
List<IntermediateEntry>>();

Expand Down Expand Up @@ -717,10 +745,15 @@ public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerCon
FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(),
block.getId(), interm.getKey(), e.getValue());

long volumeSum = 0;
for (IntermediateEntry ie : e.getValue()) {
volumeSum += ie.getVolume();
}

if (finalFetches.containsKey(interm.getKey())) {
finalFetches.get(interm.getKey()).add(fetch);
finalFetches.get(interm.getKey()).addFetche(fetch).increaseVolume(volumeSum);
} else {
finalFetches.put(interm.getKey(), TUtil.newList(fetch));
finalFetches.put(interm.getKey(), new FetchGroupMeta(volumeSum, fetch));
}
}
}
Expand Down Expand Up @@ -756,12 +789,77 @@ public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerCon
scan.getTableName());
} else {
schedulerContext.setEstimatedTaskNum(determinedTaskNum);
// divide fetch uris into the the proper number of tasks in a round robin manner.
scheduleFetchesByRoundRobin(subQuery, finalFetches, scan.getTableName(), determinedTaskNum);
// divide fetch uris into the the proper number of tasks according to volumes
scheduleFetchesByEvenDistributedVolumes(subQuery, finalFetches, scan.getTableName(), determinedTaskNum);
LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum);
}
}

public static Pair<Long [], Map<String, List<FetchImpl>>[]> makeEvenDistributedFetchImpl(
Map<Integer, FetchGroupMeta> partitions, String tableName, int num) {

// Sort fetchGroupMeta in a descending order of data volumes.
List<FetchGroupMeta> fetchGroupMetaList = Lists.newArrayList(partitions.values());
Collections.sort(fetchGroupMetaList, new Comparator<FetchGroupMeta>() {
@Override
public int compare(FetchGroupMeta o1, FetchGroupMeta o2) {
return o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() > o2.getVolume() ? -1 : 0);
}
});

// Initialize containers
Map<String, List<FetchImpl>>[] fetchesArray = new Map[num];
Long [] assignedVolumes = new Long[num];
// initialization
for (int i = 0; i < num; i++) {
fetchesArray[i] = new HashMap<String, List<FetchImpl>>();
assignedVolumes[i] = 0l;
}

// This algorithm assignes bigger first manner by using a sorted iterator. It is a kind of greedy manner.
// Its complexity is O(n). Since FetchGroup can be more than tens of thousands, we should consider its complexity.
// In terms of this point, it will show reasonable performance and results. even though it is not an optimal
// algorithm.
Iterator<FetchGroupMeta> iterator = fetchGroupMetaList.iterator();

int p = 0;
while(iterator.hasNext()) {
while (p < num && iterator.hasNext()) {
FetchGroupMeta fetchGroupMeta = iterator.next();
assignedVolumes[p] += fetchGroupMeta.getVolume();

TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls);
p++;
}

p = num - 1;
while (p > 0 && iterator.hasNext()) {
FetchGroupMeta fetchGroupMeta = iterator.next();
assignedVolumes[p] += fetchGroupMeta.getVolume();

// While the current one is smaller than next one, it adds additional fetches to current one.
while(iterator.hasNext() && assignedVolumes[p - 1] > assignedVolumes[p]) {
FetchGroupMeta additionalFetchGroup = iterator.next();
assignedVolumes[p] += additionalFetchGroup.getVolume();
TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.fetchUrls);
}

p--;
}
}

return new Pair<Long[], Map<String, List<FetchImpl>>[]>(assignedVolumes, fetchesArray);
}

public static void scheduleFetchesByEvenDistributedVolumes(SubQuery subQuery, Map<Integer, FetchGroupMeta> partitions,
String tableName, int num) {
Map<String, List<FetchImpl>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond();
// Schedule FetchImpls
for (Map<String, List<FetchImpl>> eachFetches : fetchsArray) {
SubQuery.scheduleFetches(subQuery, eachFetches);
}
}

// Scattered hash shuffle hashes the key columns and groups the hash keys associated with
// the same hash key. Then, if the volume of a group is larger
// than DIST_QUERY_TABLE_PARTITION_VOLUME, it divides the group into more than two sub groups
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,21 @@

package org.apache.tajo.master;

import com.google.common.collect.Maps;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.TestTajoIds;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.querymaster.QueryMaster;
import org.apache.tajo.master.querymaster.QueryUnit;
import org.apache.tajo.master.querymaster.Repartitioner;
import org.apache.tajo.master.querymaster.SubQuery;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.FetchImpl;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.junit.Test;
import org.mockito.Mockito;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some unused imports.
Please remove them.


import java.net.URI;
import java.util.ArrayList;
Expand All @@ -35,6 +41,8 @@
import java.util.Map;

import static junit.framework.Assert.assertEquals;
import static org.apache.tajo.master.querymaster.Repartitioner.FetchGroupMeta;
import static org.junit.Assert.assertTrue;

public class TestRepartitioner {
@Test
Expand Down Expand Up @@ -83,4 +91,52 @@ private List<String> splitMaps(List<String> mapq) {
}
return ret;
}

@Test
public void testScheduleFetchesByEvenDistributedVolumes() {
Map<Integer, FetchGroupMeta> fetchGroups = Maps.newHashMap();
String tableName = "test1";


fetchGroups.put(0, new FetchGroupMeta(100, new FetchImpl()));
fetchGroups.put(1, new FetchGroupMeta(80, new FetchImpl()));
fetchGroups.put(2, new FetchGroupMeta(70, new FetchImpl()));
fetchGroups.put(3, new FetchGroupMeta(30, new FetchImpl()));
fetchGroups.put(4, new FetchGroupMeta(10, new FetchImpl()));
fetchGroups.put(5, new FetchGroupMeta(5, new FetchImpl()));

Pair<Long [], Map<String, List<FetchImpl>>[]> results;

results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 1);
long expected [] = {100 + 80 + 70 + 30 + 10 + 5};
assertFetchVolumes(expected, results.getFirst());

results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 2);
long expected0 [] = {130, 165};
assertFetchVolumes(expected0, results.getFirst());

results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 3);
long expected1 [] = {100, 95, 100};
assertFetchVolumes(expected1, results.getFirst());

results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 4);
long expected2 [] = {100, 80, 70, 45};
assertFetchVolumes(expected2, results.getFirst());

results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 5);
long expected3 [] = {100, 80, 70, 30, 15};
assertFetchVolumes(expected3, results.getFirst());

results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 6);
long expected4 [] = {100, 80, 70, 30, 10, 5};
assertFetchVolumes(expected4, results.getFirst());
}

private static void assertFetchVolumes(long [] expected, Long [] results) {
assertEquals("the lengths of volumes are mismatch", expected.length, results.length);

for (int i = 0; i < expected.length; i++) {
assertTrue(expected[i] + " is expected, but " + results[i], expected[i] == results[i]);
}
}
}