From d1b23100f9a6e4e1f8b23afe9722c57cae1e9ecd Mon Sep 17 00:00:00 2001 From: Padma Penumarthy Date: Mon, 27 Feb 2017 18:32:24 -0800 Subject: [PATCH] DRILL-5304: Queries fail intermittently when there is skew in data distribution --- .../SoftAffinityFragmentParallelizer.java | 2 +- .../store/schedule/AssignmentCreator.java | 28 ++++++++++++------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java index 1ebed86ee8a..644263eeaa3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java @@ -117,7 +117,7 @@ private List findEndpoints(final Collection // Find the maximum number of slots which should go to endpoints with affinity (See DRILL-825 for details) int affinedSlots = - Math.max(1, (int) (parameters.getAffinityFactor() * width / activeEndpoints.size())) * sortedAffinityList.size(); + Math.max(1, (int) (Math.ceil((double)parameters.getAffinityFactor() * width / activeEndpoints.size()) * sortedAffinityList.size())); // Make sure affined slots is at least the number of mandatory nodes affinedSlots = Math.max(affinedSlots, numRequiredNodes); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java index aeaf4bf5c4b..198d1ac262a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java @@ -106,13 +106,16 @@ private ListMultimap getMappings() { LinkedList> unassignedWorkList; Map endpointIterators = getEndpointIterators(); - // Assign upto minCount per node based on locality. - unassignedWorkList = assign(workList, endpointIterators, true); // Assign upto maxCount per node based on locality. - unassignedWorkList = assign(unassignedWorkList, endpointIterators, false); + unassignedWorkList = assign(workList, endpointIterators, false); + // Assign upto minCount per node in a round robin fashion. assignLeftovers(unassignedWorkList, endpointIterators, true); - // Assign upto maxCount per node in a round robin fashion. + + // Assign upto maxCount + leftovers per node based on locality. + unassignedWorkList = assign(unassignedWorkList, endpointIterators, true); + + // Assign upto maxCount + leftovers per node in a round robin fashion. assignLeftovers(unassignedWorkList, endpointIterators, false); if (unassignedWorkList.size() != 0) { @@ -127,10 +130,12 @@ private ListMultimap getMappings() { * * @param workList the list of work units to assign * @param endpointIterators the endpointIterators to assign to - * @param assignMinimum whether to assign only up to the minimum required + * @param assignMaxLeftOvers whether to assign upto maximum including leftovers * @return a list of unassigned work units */ - private LinkedList> assign(List> workList, Map endpointIterators, boolean assignMinimum) { + private LinkedList> assign(List> workList, + Map endpointIterators, + boolean assignMaxLeftOvers) { LinkedList> currentUnassignedList = Lists.newLinkedList(); outer: for (WorkEndpointListPair workPair : workList) { List endpoints = workPair.sortedEndpoints; @@ -139,7 +144,7 @@ private LinkedList> assign(List> if (iteratorWrapper == null) { continue; } - if (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount : iteratorWrapper.maxCount)) { + if (iteratorWrapper.count < (assignMaxLeftOvers ? (iteratorWrapper.maxCount + iteratorWrapper.maxCountLeftOver) : iteratorWrapper.maxCount)) { Integer assignment = iteratorWrapper.iter.next(); iteratorWrapper.count++; mappings.put(assignment, workPair.work); @@ -157,9 +162,11 @@ private LinkedList> assign(List> * @param endpointIterators the endpointIterators to assign to * @param assignMinimum wheterh to assign the minimum amount */ - private void assignLeftovers(LinkedList> unassignedWorkList, Map endpointIterators, boolean assignMinimum) { + private void assignLeftovers(LinkedList> unassignedWorkList, + Map endpointIterators, + boolean assignMinimum) { outer: for (FragIteratorWrapper iteratorWrapper : endpointIterators.values()) { - while (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount : iteratorWrapper.maxCount)) { + while (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount : (iteratorWrapper.maxCount + iteratorWrapper.maxCountLeftOver))) { WorkEndpointListPair workPair = unassignedWorkList.poll(); if (workPair == null) { break outer; @@ -261,7 +268,7 @@ private Map getEndpointIterators() { while (totalMaxCount < units.size()) { for (Entry entry : map.entrySet()) { FragIteratorWrapper iteratorWrapper = entry.getValue(); - iteratorWrapper.maxCount++; + iteratorWrapper.maxCountLeftOver++; totalMaxCount++; if (totalMaxCount == units.size()) { break; @@ -279,6 +286,7 @@ private Map getEndpointIterators() { private static class FragIteratorWrapper { int count = 0; int maxCount; + int maxCountLeftOver; int minCount; Iterator iter; }