Skip to content

Commit

Permalink
DRILL-3079: Move execution fragment json parsing from RPC message to …
Browse files Browse the repository at this point in the history
…fragment start.
  • Loading branch information
jacques-n committed May 14, 2015
1 parent 814f553 commit 4ad4261
Show file tree
Hide file tree
Showing 14 changed files with 1,870 additions and 305 deletions.
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.planner.fragment;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -26,22 +27,22 @@
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DrillStringUtils;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange.ParallelizationDependency;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
import org.apache.drill.exec.proto.BitControl.Collector;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
Expand All @@ -54,8 +55,12 @@
import org.apache.drill.exec.work.foreman.ForemanSetupException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;

/**
* The simple parallelizer determines the level of parallelization of a plan based on the cost of the underlying
Expand Down Expand Up @@ -367,6 +372,7 @@ private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint fore
.setMinorFragmentId(minorFragmentId) //
.setQueryId(queryId) //
.build();

PlanFragment fragment = PlanFragment.newBuilder() //
.setForeman(foremanNode) //
.setFragmentJson(plan) //
Expand All @@ -378,6 +384,7 @@ private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint fore
.setMemMax(wrapper.getMaxAllocation())
.setOptionsJson(optionsData)
.setCredentials(session.getCredentials())
.addAllCollector(CountRequiredFragments.getCollectors(root))
.build();

if (isRootNode) {
Expand All @@ -393,4 +400,44 @@ private QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint fore

return new QueryWorkUnit(rootOperator, rootFragment, fragments);
}

/**
* Designed to setup initial values for arriving fragment accounting.
*/
private static class CountRequiredFragments extends AbstractPhysicalVisitor<Void, List<Collector>, RuntimeException> {
private static final CountRequiredFragments INSTANCE = new CountRequiredFragments();

public static List<Collector> getCollectors(PhysicalOperator root) {
List<Collector> collectors = Lists.newArrayList();
root.accept(INSTANCE, collectors);
return collectors;
}

@Override
public Void visitReceiver(Receiver receiver, List<Collector> collectors) throws RuntimeException {
List<MinorFragmentEndpoint> endpoints = receiver.getProvidingEndpoints();
List<Integer> list = new ArrayList<>(endpoints.size());
for (MinorFragmentEndpoint ep : endpoints) {
list.add(ep.getId());
}


collectors.add(Collector.newBuilder()
.setIsSpooling(receiver.isSpooling())
.setOppositeMajorFragmentId(receiver.getOppositeMajorFragmentId())
.setSupportsOutOfOrder(receiver.supportsOutOfOrderExchange())
.addAllIncomingMinorFragment(list)
.build());
return null;
}

@Override
public Void visitOp(PhysicalOperator op, List<Collector> collectors) throws RuntimeException {
for (PhysicalOperator o : op) {
o.accept(this, collectors);
}
return null;
}

}
}
Expand Up @@ -18,30 +18,25 @@
package org.apache.drill.exec.work.batch;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.proto.BitControl.Collector;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.util.ArrayWrappedIntIntMap;

import com.google.common.base.Preconditions;
import org.apache.drill.exec.util.ArrayWrappedIntIntMap;

public abstract class AbstractDataCollector implements DataCollector{

private final List<MinorFragmentEndpoint> incoming;
// private final List<MinorFragmentEndpoint> incoming;
private final int oppositeMajorFragmentId;
private final AtomicIntegerArray remainders;
private final AtomicInteger remainingRequired;
private final AtomicInteger parentAccounter;

private final int incomingStreams;
protected final RawBatchBuffer[] buffers;
protected final ArrayWrappedIntIntMap fragmentMap;

Expand All @@ -52,37 +47,36 @@ public abstract class AbstractDataCollector implements DataCollector{
* @param bufferCapacity Capacity of each RawBatchBuffer.
* @param context
*/
public AbstractDataCollector(AtomicInteger parentAccounter, Receiver receiver,
final int numBuffers, final int bufferCapacity, FragmentContext context) {
Preconditions.checkNotNull(receiver);
public AbstractDataCollector(AtomicInteger parentAccounter,
final int numBuffers, Collector collector, final int bufferCapacity, FragmentContext context) {
Preconditions.checkNotNull(collector);
Preconditions.checkNotNull(parentAccounter);

this.incomingStreams = collector.getIncomingMinorFragmentCount();
this.parentAccounter = parentAccounter;
this.incoming = receiver.getProvidingEndpoints();
this.remainders = new AtomicIntegerArray(incoming.size());
this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();

this.remainders = new AtomicIntegerArray(incomingStreams);
this.oppositeMajorFragmentId = collector.getOppositeMajorFragmentId();
// Create fragmentId to index that is within the range [0, incoming.size()-1]
// We use this mapping to find objects belonging to the fragment in buffers and remainders arrays.
fragmentMap = new ArrayWrappedIntIntMap();
int index = 0;
for(MinorFragmentEndpoint endpoint : incoming) {
fragmentMap.put(endpoint.getId(), index);
for (Integer endpoint : collector.getIncomingMinorFragmentList()) {
fragmentMap.put(endpoint, index);
index++;
}

buffers = new RawBatchBuffer[numBuffers];
remainingRequired = new AtomicInteger(numBuffers);

final boolean spooling = receiver.isSpooling();
final boolean spooling = collector.getIsSpooling();

try {

for (int i = 0; i < numBuffers; i++) {
if (spooling) {
buffers[i] = new SpoolingRawBatchBuffer(context, bufferCapacity, receiver.getOppositeMajorFragmentId(), i);
buffers[i] = new SpoolingRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId(), i);
} else {
buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity, receiver.getOppositeMajorFragmentId());
buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId());
}
}
} catch (IOException | OutOfMemoryException e) {
Expand Down Expand Up @@ -129,7 +123,7 @@ public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws

@Override
public int getTotalIncomingFragments() {
return incoming.size();
return incomingStreams;
}

protected abstract RawBatchBuffer getBuffer(int minorFragmentId);
Expand Down
Expand Up @@ -21,7 +21,6 @@
import io.netty.buffer.ByteBuf;

import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.InitializeFragments;
Expand Down Expand Up @@ -132,9 +131,7 @@ private void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcE
drillbitContext.getFunctionImplementationRegistry());
final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
fragment.getFragmentJson());
final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
final FragmentExecutor fr = new FragmentExecutor(context, fragment, listener);
bee.addFragmentRunner(fr);
} else {
// isIntermediate, store for incoming data.
Expand Down
Expand Up @@ -24,9 +24,8 @@

import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.proto.BitControl.Collector;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.record.RawFragmentBatch;

import com.google.common.collect.ImmutableMap;
Expand All @@ -39,18 +38,24 @@ public class IncomingBuffers implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IncomingBuffers.class);

private final AtomicInteger streamsRemaining = new AtomicInteger(0);
private final AtomicInteger remainingRequired = new AtomicInteger(0);
private final AtomicInteger remainingRequired;
private final Map<Integer, DataCollector> fragCounts;
private final FragmentContext context;

public IncomingBuffers(PhysicalOperator root, FragmentContext context) {
public IncomingBuffers(PlanFragment fragment, FragmentContext context) {
this.context = context;
Map<Integer, DataCollector> counts = Maps.newHashMap();
CountRequiredFragments reqFrags = new CountRequiredFragments();
root.accept(reqFrags, counts);
Map<Integer, DataCollector> collectors = Maps.newHashMap();
remainingRequired = new AtomicInteger(fragment.getCollectorCount());
for(int i =0; i < fragment.getCollectorCount(); i++){
Collector collector = fragment.getCollector(i);
DataCollector newCollector = collector.getSupportsOutOfOrder() ?
new MergingCollector(remainingRequired, collector, context) :
new PartitionedCollector(remainingRequired, collector, context);
collectors.put(collector.getOppositeMajorFragmentId(), newCollector);
}

logger.debug("Came up with a list of {} required fragments. Fragments {}", remainingRequired.get(), counts);
fragCounts = ImmutableMap.copyOf(counts);
logger.debug("Came up with a list of {} required fragments. Fragments {}", remainingRequired.get(), collectors);
fragCounts = ImmutableMap.copyOf(collectors);

// Determine the total number of incoming streams that will need to be completed before we are finished.
int totalStreams = 0;
Expand Down Expand Up @@ -98,34 +103,7 @@ public RawBatchBuffer[] getBuffers(int senderMajorFragmentId) {
}


/**
* Designed to setup initial values for arriving fragment accounting.
*/
public class CountRequiredFragments extends AbstractPhysicalVisitor<Void, Map<Integer, DataCollector>, RuntimeException> {

@Override
public Void visitReceiver(Receiver receiver, Map<Integer, DataCollector> counts) throws RuntimeException {
DataCollector set;
if (receiver.supportsOutOfOrderExchange()) {
set = new MergingCollector(remainingRequired, receiver, context);
} else {
set = new PartitionedCollector(remainingRequired, receiver, context);
}

counts.put(set.getOppositeMajorFragmentId(), set);
remainingRequired.incrementAndGet();
return null;
}

@Override
public Void visitOp(PhysicalOperator op, Map<Integer, DataCollector> value) throws RuntimeException {
for (PhysicalOperator o : op) {
o.accept(this, value);
}
return null;
}

}

public boolean isDone() {
return streamsRemaining.get() < 1;
Expand Down
Expand Up @@ -20,12 +20,12 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.proto.BitControl.Collector;

public class MergingCollector extends AbstractDataCollector{

public MergingCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
super(parentAccounter, receiver, 1, receiver.getProvidingEndpoints().size(), context);
public MergingCollector(AtomicInteger parentAccounter, Collector collector, FragmentContext context) {
super(parentAccounter, 1, collector, collector.getIncomingMinorFragmentCount(), context);
}

@Override
Expand Down
Expand Up @@ -20,12 +20,12 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.proto.BitControl.Collector;

public class PartitionedCollector extends AbstractDataCollector{

public PartitionedCollector(AtomicInteger parentAccounter, Receiver receiver, FragmentContext context) {
super(parentAccounter, receiver, receiver.getProvidingEndpoints().size(), 1, context);
public PartitionedCollector(AtomicInteger parentAccounter, Collector collector, FragmentContext context) {
super(parentAccounter, collector.getIncomingMinorFragmentCount(), collector, 1, context);
}

@Override
Expand Down
Expand Up @@ -929,15 +929,14 @@ private void setupRootFragment(final PlanFragment rootFragment, final FragmentRo
final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, queryContext,
initiatingClient, drillbitContext.getFunctionImplementationRegistry());
@SuppressWarnings("resource")
final IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
final IncomingBuffers buffers = new IncomingBuffers(rootFragment, rootContext);
rootContext.setBuffers(buffers);

queryManager.addFragmentStatusTracker(rootFragment, true);

rootRunner = new FragmentExecutor(rootContext, rootOperator,
queryManager.newRootStatusHandler(rootContext));
final RootFragmentManager fragmentManager =
new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
rootRunner = new FragmentExecutor(rootContext, rootFragment, queryManager.newRootStatusHandler(rootContext),
rootOperator);
final RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);

if (buffers.isDone()) {
// if we don't have to wait for any incoming data, start the fragment runner.
Expand Down

0 comments on commit 4ad4261

Please sign in to comment.