DataDistribution.java
@@ -22,6 +22,7 @@
import java.io.Serializable;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.io.IOReadableWritable;

@PublicEvolving
@@ -57,4 +58,10 @@ public interface DataDistribution extends IOReadableWritable, Serializable {
* @return The number of fields in the (composite) key.
*/
int getNumberOfFields();

/**
* Gets the type of the key by which the data set is partitioned.
* @return The type of the key by which the data set is partitioned.
*/
TypeInformation[] getKeyTypes();
}
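
For illustration, below is a minimal sketch of a user-defined distribution against the extended interface. It assumes the pre-existing getBucketBoundary(int, int) method of DataDistribution, which this diff does not touch; the class name and the fixed key range are hypothetical. Reporting the key types is what lets the PartitionOperator constructor further down validate a supplied distribution against the partition keys.

import java.io.IOException;

import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public class UniformIntDistribution implements DataDistribution {

	// Hypothetical fixed key space [0, max) split into equally sized buckets.
	private int max = 1000;

	@Override
	public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
		// Upper boundary of bucket `bucketNum` for an even split of the key space.
		return new Object[] { (bucketNum + 1) * (max / totalNumBuckets) };
	}

	@Override
	public int getNumberOfFields() {
		return 1; // a single, non-composite key field
	}

	@Override
	public TypeInformation[] getKeyTypes() {
		// The method added by this PR: report the type of each key field.
		return new TypeInformation[] { BasicTypeInfo.INT_TYPE_INFO };
	}

	@Override
	public void write(DataOutputView out) throws IOException {
		out.writeInt(max);
	}

	@Override
	public void read(DataInputView in) throws IOException {
		max = in.readInt();
	}
}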
Keys.java
@@ -42,7 +42,7 @@ public abstract class Keys<T> {

public abstract int[] computeLogicalKeyPositions();

-protected abstract TypeInformation<?>[] getKeyFieldTypes();
+public abstract TypeInformation<?>[] getKeyFieldTypes();

public abstract <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInformation<E> typeInfo);

@@ -134,7 +134,7 @@ public int[] computeLogicalKeyPositions() {
}

@Override
-protected TypeInformation<?>[] getKeyFieldTypes() {
+public TypeInformation<?>[] getKeyFieldTypes() {
TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()];
for (int i = 0; i < keyFields.size(); i++) {
fieldTypes[i] = keyFields.get(i).getType();
@@ -337,7 +337,7 @@ public int[] computeLogicalKeyPositions() {
}

@Override
-protected TypeInformation<?>[] getKeyFieldTypes() {
+public TypeInformation<?>[] getKeyFieldTypes() {
TypeInformation<?>[] fieldTypes = new TypeInformation[keyFields.size()];
for (int i = 0; i < keyFields.size(); i++) {
fieldTypes[i] = keyFields.get(i).getType();
PartitionOperatorBase.java
@@ -22,6 +22,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.NoOpFunction;
@@ -49,6 +50,8 @@ public static enum PartitionMethod {

private Partitioner<?> customPartitioner;

private DataDistribution distribution;


public PartitionOperatorBase(UnaryOperatorInformation<IN, IN> operatorInfo, PartitionMethod pMethod, int[] keys, String name) {
super(new UserCodeObjectWrapper<NoOpFunction>(new NoOpFunction()), operatorInfo, keys, name);
@@ -70,6 +73,14 @@ public Partitioner<?> getCustomPartitioner() {
return customPartitioner;
}

public DataDistribution getDistribution() {
return this.distribution;
}

public void setDistribution(DataDistribution distribution) {
this.distribution = distribution;
}

public void setCustomPartitioner(Partitioner<?> customPartitioner) {
if (customPartitioner != null) {
int[] keys = getKeyColumns(0);
PartitionOperator.java
@@ -22,6 +22,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.Operator;
@@ -33,6 +34,8 @@
import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.Arrays;

/**
* This operator represents a partitioning.
*
@@ -45,35 +48,46 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOperator<T>> {
private final PartitionMethod pMethod;
private final String partitionLocationName;
private final Partitioner<?> customPartitioner;


private final DataDistribution distribution;


public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, String partitionLocationName) {
-this(input, pMethod, pKeys, null, null, partitionLocationName);
+this(input, pMethod, pKeys, null, null, null, partitionLocationName);
}


public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, DataDistribution distribution, String partitionLocationName) {
this(input, pMethod, pKeys, null, null, distribution, partitionLocationName);
}

public PartitionOperator(DataSet<T> input, PartitionMethod pMethod, String partitionLocationName) {
-this(input, pMethod, null, null, null, partitionLocationName);
+this(input, pMethod, null, null, null, null, partitionLocationName);
}

public PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<?> customPartitioner, String partitionLocationName) {
-this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, null, partitionLocationName);
+this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, null, null, partitionLocationName);
}

public <P> PartitionOperator(DataSet<T> input, Keys<T> pKeys, Partitioner<P> customPartitioner,
TypeInformation<P> partitionerTypeInfo, String partitionLocationName)
{
-this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, partitionerTypeInfo, partitionLocationName);
+this(input, PartitionMethod.CUSTOM, pKeys, customPartitioner, partitionerTypeInfo, null, partitionLocationName);
}

-private <P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner,
-		TypeInformation<P> partitionerTypeInfo, String partitionLocationName)
+private <P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T> pKeys, Partitioner<P> customPartitioner,
+		TypeInformation<P> partitionerTypeInfo, DataDistribution distribution, String partitionLocationName)
{
super(input, input.getType());

Preconditions.checkNotNull(pMethod);
Preconditions.checkArgument(pKeys != null || pMethod == PartitionMethod.REBALANCE, "Partitioning requires keys");
Preconditions.checkArgument(pMethod != PartitionMethod.CUSTOM || customPartitioner != null, "Custom partitioning requires a partitioner.");

Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only necessary for range partition.");

if (distribution != null) {
Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same.");
Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), pKeys.getKeyFieldTypes()), "The key types of the distribution and the range partitioner are not equal.");
}

if (customPartitioner != null) {
pKeys.validateCustomPartitioner(customPartitioner, partitionerTypeInfo);
}
@@ -82,6 +96,7 @@ private <P> PartitionOperator(DataSet<T> input, PartitionMethod pMethod, Keys<T>
this.pKeys = pKeys;
this.partitionLocationName = partitionLocationName;
this.customPartitioner = customPartitioner;
this.distribution = distribution;

Contributor: Please add a check that a distribution is only set if pMethod == PartitionMethod.RANGE.

Contributor (Author): Other pMethods like hashPartition and customPartition don't need the distribution, and the distribution keeps its default value of null; I think having no check here can also make sense.

Contributor: The check would prevent an invalid parameter combination and would also show readers of the code that such a combination is not valid. Hence, I think the check should be added.

Contributor: We should check here that the key types of distribution.getKeyTypes() are equal to pKeys.getKeyFieldTypes().

}

// --------------------------------------------------------------------------------------------
@@ -125,6 +140,7 @@ else if (pMethod == PartitionMethod.HASH || pMethod == PartitionMethod.CUSTOM || pMethod == PartitionMethod.RANGE) {
PartitionOperatorBase<T> partitionedInput = new PartitionOperatorBase<>(operatorInfo, pMethod, logicalKeyPositions, name);
partitionedInput.setInput(input);
partitionedInput.setParallelism(getParallelism());
partitionedInput.setDistribution(distribution);
partitionedInput.setCustomPartitioner(customPartitioner);

return partitionedInput;
DataSetUtils.java
@@ -21,16 +21,23 @@
import com.google.common.collect.Lists;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.SampleInCoordinator;
import org.apache.flink.api.java.functions.SampleInPartition;
import org.apache.flink.api.java.functions.SampleWithFraction;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
@@ -250,6 +257,32 @@ public static <T> DataSet<T> sampleWithSize(
return new GroupReduceOperator<>(mapPartitionOperator, input.getType(), sampleInCoordinator, callLocation);
}

// --------------------------------------------------------------------------------------------
// Partition
// --------------------------------------------------------------------------------------------

/**
* Range-partitions a DataSet on the specified tuple field positions.
*/
public static <T> PartitionOperator<T> partitionByRange(DataSet<T> input, DataDistribution distribution, int... fields) {
return new PartitionOperator<>(input, PartitionOperatorBase.PartitionMethod.RANGE, new Keys.ExpressionKeys<>(fields, input.getType(), false), distribution, Utils.getCallLocationName());
}

/**
* Range-partitions a DataSet on the specified fields.
*/
public static <T> PartitionOperator<T> partitionByRange(DataSet<T> input, DataDistribution distribution, String... fields) {
return new PartitionOperator<>(input, PartitionOperatorBase.PartitionMethod.RANGE, new Keys.ExpressionKeys<>(fields, input.getType()), distribution, Utils.getCallLocationName());
}

/**
* Range-partitions a DataSet using the specified key selector function.
*/
public static <T, K extends Comparable<K>> PartitionOperator<T> partitionByRange(DataSet<T> input, DataDistribution distribution, KeySelector<T, K> keyExtractor) {
final TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, input.getType());
return new PartitionOperator<>(input, PartitionOperatorBase.PartitionMethod.RANGE, new Keys.SelectorFunctionKeys<>(input.clean(keyExtractor), input.getType(), keyType), distribution, Utils.getCallLocationName());
}
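
As a hedged usage sketch of the new overloads (UniformIntDistribution is the hypothetical class sketched earlier; the input data is made up): supplying a DataDistribution fixes the range boundaries up front, so the optimizer does not have to inject a sampling pass to estimate them.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;

public class RangePartitionExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<Integer, String>> data = env.fromElements(
				new Tuple2<>(1, "a"), new Tuple2<>(500, "b"), new Tuple2<>(999, "c"));

		// Range-partition on tuple field 0; the boundaries come from the supplied
		// distribution rather than from a sampling step injected by the optimizer.
		DataSet<Tuple2<Integer, String>> partitioned =
				DataSetUtils.partitionByRange(data, new UniformIntDistribution(), 0);

		partitioned.print();
	}
}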

// --------------------------------------------------------------------------------------------
// Checksum
// --------------------------------------------------------------------------------------------
PartitionNode.java
@@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
@@ -51,7 +52,7 @@ public PartitionNode(PartitionOperatorBase<?> operator) {
super(operator);

OperatorDescriptorSingle descr = new PartitionDescriptor(
-		this.getOperator().getPartitionMethod(), this.keys, operator.getCustomPartitioner());
+		this.getOperator().getPartitionMethod(), this.keys, operator.getCustomPartitioner(), operator.getDistribution());
this.possibleProperties = Collections.singletonList(descr);
}

@@ -88,12 +89,14 @@ public static class PartitionDescriptor extends OperatorDescriptorSingle {

private final PartitionMethod pMethod;
private final Partitioner<?> customPartitioner;
private final DataDistribution distribution;

-public PartitionDescriptor(PartitionMethod pMethod, FieldSet pKeys, Partitioner<?> customPartitioner) {
+public PartitionDescriptor(PartitionMethod pMethod, FieldSet pKeys, Partitioner<?> customPartitioner, DataDistribution distribution) {
super(pKeys);

this.pMethod = pMethod;
this.customPartitioner = customPartitioner;
this.distribution = distribution;
}

@Override
@@ -127,7 +130,7 @@ protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
for (int field : this.keys) {
ordering.appendOrdering(field, null, Order.ASCENDING);
}
-rgps.setRangePartitioned(ordering);
+rgps.setRangePartitioned(ordering, distribution);
break;
default:
throw new IllegalArgumentException("Invalid partition method");
GlobalProperties.java
@@ -22,6 +22,7 @@
import java.util.Set;

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.Ordering;
@@ -55,6 +56,8 @@ public class GlobalProperties implements Cloneable {

private Partitioner<?> customPartitioner;

private DataDistribution distribution;

// --------------------------------------------------------------------------------------------

/**
@@ -80,16 +83,38 @@ public void setHashPartitioned(FieldList partitionedFields) {
this.partitioningFields = partitionedFields;
this.ordering = null;
}


/**
* Set the parameters for range partition.
*
* @param ordering Order of the partitioned fields
*/
public void setRangePartitioned(Ordering ordering) {
if (ordering == null) {
throw new NullPointerException();
}

this.partitioning = PartitioningProperty.RANGE_PARTITIONED;
this.ordering = ordering;
this.partitioningFields = ordering.getInvolvedIndexes();
}

/**
* Set the parameters for range partition.
*
* @param ordering Order of the partitioned fields
* @param distribution The data distribution for the range partition. Users can supply a customized
*                     data distribution; it may also be null.
*/
public void setRangePartitioned(Ordering ordering, DataDistribution distribution) {
Contributor: I know you've changed this part before, but could you add a setRangePartitioned(Ordering ordering) method which can be used if no DataDistribution is provided? Would be nicer than passing null everywhere.

if (ordering == null) {
throw new NullPointerException();
}

this.partitioning = PartitioningProperty.RANGE_PARTITIONED;
this.ordering = ordering;
this.partitioningFields = ordering.getInvolvedIndexes();
this.distribution = distribution;
}

public void setAnyPartitioning(FieldList partitionedFields) {
Expand Down Expand Up @@ -167,6 +192,10 @@ public Partitioner<?> getCustomPartitioner() {
return this.customPartitioner;
}

public DataDistribution getDataDistribution() {
return this.distribution;
}

// --------------------------------------------------------------------------------------------

public boolean isPartitionedOnFields(FieldSet fields) {
Channel.java
@@ -427,7 +427,7 @@ public GlobalProperties getGlobalProperties() {
this.globalProps.setHashPartitioned(this.shipKeys);
break;
case PARTITION_RANGE:
-				this.globalProps.setRangePartitioned(Utils.createOrdering(this.shipKeys, this.shipSortOrder));
+				this.globalProps.setRangePartitioned(Utils.createOrdering(this.shipKeys, this.shipSortOrder), this.dataDistribution);
break;
case FORWARD:
break;
RangePartitionRewriter.java
@@ -109,14 +109,16 @@ public void postVisit(PlanNode node) {
// Make sure we only optimize the DAG for range partition, and do not optimize it multiple times.
if (shipStrategy == ShipStrategyType.PARTITION_RANGE) {

-if(node.isOnDynamicPath()) {
-	throw new InvalidProgramException("Range Partitioning not supported within iterations.");
-}
-
-PlanNode channelSource = channel.getSource();
-List<Channel> newSourceOutputChannels = rewriteRangePartitionChannel(channel);
-channelSource.getOutgoingChannels().remove(channel);
-channelSource.getOutgoingChannels().addAll(newSourceOutputChannels);
+if(channel.getDataDistribution() == null) {
+	if (node.isOnDynamicPath()) {
+		throw new InvalidProgramException("Range Partitioning not supported within iterations if users do not supply the data distribution.");
+	}
+
+	PlanNode channelSource = channel.getSource();
+	List<Channel> newSourceOutputChannels = rewriteRangePartitionChannel(channel);
+	channelSource.getOutgoingChannels().remove(channel);
+	channelSource.getOutgoingChannels().addAll(newSourceOutputChannels);
+}
}
}
}
MockDistribution.java
@@ -19,6 +19,8 @@
package org.apache.flink.optimizer.dataproperties;

import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

@@ -37,6 +39,11 @@ public int getNumberOfFields() {
return 0;
}

@Override
public TypeInformation[] getKeyTypes() {
return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO};
}

@Override
public void write(DataOutputView out) throws IOException {

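
Given this mock (its remaining methods are elided above), a hypothetical test, not part of this PR, could exercise the new preconditions in the PartitionOperator constructor: the mock reports zero key fields, so range-partitioning on one key field should be rejected before the plan is even translated.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.junit.Test;

public class RangePartitionDistributionCheckTest {

	// MockDistribution.getNumberOfFields() returns 0, while one key field is used
	// below, so the field-count precondition should fail with an exception.
	@Test(expected = IllegalArgumentException.class)
	public void testFieldCountMismatchIsRejected() {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Tuple2<Integer, String>> data = env.fromElements(new Tuple2<>(1, "a"));

		DataSetUtils.partitionByRange(data, new MockDistribution(), 0);
	}
}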