Skip to content

Commit

Permalink
DRILL-4446: Support mandatory work assignment to endpoint requirement…
Browse files Browse the repository at this point in the history
…s of operators
  • Loading branch information
vkorukanti authored and vkorukanti committed Apr 13, 2016
1 parent 9f4fff8 commit 10afc70
Show file tree
Hide file tree
Showing 24 changed files with 937 additions and 150 deletions.
Expand Up @@ -31,6 +31,14 @@ public class EndpointAffinity {
private final DrillbitEndpoint endpoint;
private double affinity = 0.0d;

// Requires including this endpoint at least once? Default is not required.
private boolean mandatory;

/**
* Maximum allowed assignments for this endpoint. Default is {@link Integer#MAX_VALUE}
*/
private int maxWidth = Integer.MAX_VALUE;

/**
* Create EndpointAffinity instance for given Drillbit endpoint. Affinity is initialized to 0. Affinity can be added
* after EndpointAffinity object creation using {@link #addAffinity(double)}.
Expand All @@ -53,6 +61,22 @@ public EndpointAffinity(DrillbitEndpoint endpoint, double affinity) {
this.affinity = affinity;
}

/**
* Creates EndpointAffinity instance for given DrillbitEndpoint, affinity and mandatory assignment requirement flag.
* @param endpoint Drillbit endpoint
* @param affinity Initial affinity value
* @param mandatory Is this endpoint requires at least one mandatory assignment?
* @param maxWidth Maximum allowed assignments for this endpoint.
*/
public EndpointAffinity(final DrillbitEndpoint endpoint, final double affinity, final boolean mandatory,
final int maxWidth) {
Preconditions.checkArgument(maxWidth >= 1, "MaxWidth for given endpoint should be at least one.");
this.endpoint = endpoint;
this.affinity = affinity;
this.mandatory = mandatory;
this.maxWidth = maxWidth;
}

/**
* Return the Drillbit endpoint in this instance.
*
Expand Down Expand Up @@ -86,13 +110,36 @@ public void addAffinity(double f){
}
}

/**
* Set the endpoint requires at least one assignment.
*/
public void setAssignmentRequired() {
mandatory = true;
}

/**
* Is this endpoint required to be in fragment endpoint assignment list?
*
* @return Returns true for mandatory assignment, false otherwise.
*/
public boolean isAssignmentRequired() {
return Double.POSITIVE_INFINITY == affinity;
return mandatory || Double.POSITIVE_INFINITY == affinity;
}

/**
* @return Maximum allowed assignments for this endpoint.
*/
public int getMaxWidth() {
return maxWidth;
}

/**
* Set the new max width as the minimum of the the given value and current max width.
* @param maxWidth
*/
public void setMaxWidth(final int maxWidth) {
Preconditions.checkArgument(maxWidth >= 1, "MaxWidth for given endpoint should be at least one.");
this.maxWidth = Math.min(this.maxWidth, maxWidth);
}

@Override
Expand Down Expand Up @@ -128,11 +175,12 @@ public boolean equals(Object obj) {
} else if (!endpoint.equals(other.endpoint)) {
return false;
}
return true;
return mandatory == other.mandatory;
}

@Override
public String toString() {
return "EndpointAffinity [endpoint=" + TextFormat.shortDebugString(endpoint) + ", affinity=" + affinity + "]";
return "EndpointAffinity [endpoint=" + TextFormat.shortDebugString(endpoint) + ", affinity=" + affinity +
", mandatory=" + mandatory + ", maxWidth=" + maxWidth + "]";
}
}
Expand Up @@ -26,6 +26,7 @@

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.planner.fragment.DistributionAffinity;
import org.apache.drill.exec.planner.physical.PlannerSettings;

import com.fasterxml.jackson.annotation.JsonIgnore;
Expand Down Expand Up @@ -162,4 +163,8 @@ public Collection<String> getFiles() {
return null;
}

@Override
public DistributionAffinity getDistributionAffinity() {
return DistributionAffinity.SOFT;
}
}
Expand Up @@ -18,7 +18,7 @@
package org.apache.drill.exec.physical.base;



import org.apache.drill.exec.planner.fragment.DistributionAffinity;

public abstract class AbstractStore extends AbstractSingle implements Store, Root{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStore.class);
Expand All @@ -33,4 +33,8 @@ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVis
}


@Override
public DistributionAffinity getDistributionAffinity() {
return DistributionAffinity.SOFT;
}
}
Expand Up @@ -22,6 +22,7 @@
import org.apache.drill.exec.physical.EndpointAffinity;

import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.exec.planner.fragment.DistributionAffinity;

/**
* Describes a physical operator that has affinity to particular nodes. Used for assignment decisions.
Expand All @@ -33,5 +34,11 @@ public interface HasAffinity extends PhysicalOperator {
* @return List of EndpointAffinity objects.
*/
@JsonIgnore
public List<EndpointAffinity> getOperatorAffinity();
List<EndpointAffinity> getOperatorAffinity();

/**
* Get distribution affinity which describes the parallelization strategy of the operator.
*/
@JsonIgnore
DistributionAffinity getDistributionAffinity();
}
Expand Up @@ -26,6 +26,7 @@
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.Store;
import org.apache.drill.exec.planner.fragment.DistributionAffinity;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;

Expand All @@ -48,7 +49,7 @@ public Screen(@JsonProperty("child") PhysicalOperator child, @JacksonInject Dril

@Override
public List<EndpointAffinity> getOperatorAffinity() {
return Collections.singletonList(new EndpointAffinity(endpoint, Double.POSITIVE_INFINITY));
return Collections.singletonList(new EndpointAffinity(endpoint, 1, true, /* maxWidth = */ 1));
}

@Override
Expand Down Expand Up @@ -102,4 +103,8 @@ public int getOperatorType() {
return CoreOperatorType.SCREEN_VALUE;
}

@Override
public DistributionAffinity getDistributionAffinity() {
return DistributionAffinity.HARD;
}
}
@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.fragment;

/**
* Describes an operator's endpoint assignment requirements. Ordering is from no assignment requirement to mandatory
* assignment requirements. Changes/new addition should keep the order of increasing restrictive assignment requirement.
*/
public enum DistributionAffinity {
/**
* No affinity to any endpoints. Operator can run on any endpoint.
*/
NONE(SoftAffinityFragmentParallelizer.INSTANCE),

/**
* Operator has soft distribution affinity to one or more endpoints. Operator performs better when fragments are
* assigned to the endpoints with affinity, but not a mandatory requirement.
*/
SOFT(SoftAffinityFragmentParallelizer.INSTANCE),

/**
* Hard distribution affinity to one or more endpoints. Fragments having the operator must be scheduled on the nodes
* with affinity.
*/
HARD(HardAffinityFragmentParallelizer.INSTANCE);

private final FragmentParallelizer fragmentParallelizer;

DistributionAffinity(final FragmentParallelizer fragmentParallelizer) {
this.fragmentParallelizer = fragmentParallelizer;
}

/**
* @return {@link FragmentParallelizer} implementation.
*/
public FragmentParallelizer getFragmentParallelizer() {
return fragmentParallelizer;
}

/**
* Is the current DistributionAffinity less restrictive than the given DistributionAffinity?
* @param distributionAffinity
* @return
*/
public boolean isLessRestrictiveThan(final DistributionAffinity distributionAffinity) {
return ordinal() < distributionAffinity.ordinal();
}
}
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.fragment;

import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;

import java.util.Collection;

/**
* Generic interface to provide different parallelization strategies for MajorFragments.
*/
public interface FragmentParallelizer {
/**
* Parallelize the given fragment.
*
* @param fragment
* @param parameters
* @param activeEndpoints
* @throws PhysicalOperatorSetupException
*/
void parallelizeFragment(final Wrapper fragment, final ParallelizationParameters parameters,
final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException;
}

0 comments on commit 10afc70

Please sign in to comment.