From c3162413b2f6979595393f20d347e6e2057620fa Mon Sep 17 00:00:00 2001 From: FelixNeutatz Date: Wed, 14 Jan 2015 16:50:37 +0100 Subject: [PATCH 1/7] [FLINK-1398] Introduce extractSingleField() in DataSet --- .../org/apache/flink/api/java/DataSet.java | 27 +-- .../api/java/functions/SemanticPropUtil.java | 7 + .../api/java/operators/ExtractOperator.java | 225 ++++++++++++++++++ .../translation/PlanExtractOperator.java | 54 +++++ 4 files changed, 293 insertions(+), 20 deletions(-) create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/operators/ExtractOperator.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanExtractOperator.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index 82e37ac723437..0421145486c8e 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -43,30 +43,12 @@ import org.apache.flink.api.java.io.PrintingOutputFormat; import org.apache.flink.api.java.io.TextOutputFormat; import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter; -import org.apache.flink.api.java.operators.AggregateOperator; -import org.apache.flink.api.java.operators.CoGroupOperator; +import org.apache.flink.api.java.operators.*; import org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets; -import org.apache.flink.api.java.operators.CrossOperator; -import org.apache.flink.api.java.operators.CustomUnaryOperation; -import org.apache.flink.api.java.operators.DataSink; -import org.apache.flink.api.java.operators.DeltaIteration; -import org.apache.flink.api.java.operators.DistinctOperator; -import org.apache.flink.api.java.operators.FilterOperator; -import org.apache.flink.api.java.operators.ProjectOperator; import org.apache.flink.api.java.functions.FirstReducer; -import 
org.apache.flink.api.java.operators.FlatMapOperator; -import org.apache.flink.api.java.operators.GroupReduceOperator; -import org.apache.flink.api.java.operators.IterativeDataSet; import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets; -import org.apache.flink.api.java.operators.Keys; -import org.apache.flink.api.java.operators.MapOperator; -import org.apache.flink.api.java.operators.MapPartitionOperator; -import org.apache.flink.api.java.operators.PartitionOperator; import org.apache.flink.api.java.operators.ProjectOperator.Projection; -import org.apache.flink.api.java.operators.ReduceOperator; -import org.apache.flink.api.java.operators.SortedGrouping; -import org.apache.flink.api.java.operators.UnionOperator; -import org.apache.flink.api.java.operators.UnsortedGrouping; +import org.apache.flink.api.java.operators.ExtractOperator.Extraction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; @@ -295,6 +277,11 @@ public FilterOperator filter(FilterFunction filter) { public ProjectOperator project(int... 
fieldIndexes) { return new Projection(this, fieldIndexes).projectTupleX(); } + + public ExtractOperator extractSingleField(int fieldIndex, Class outputType) { + return new Extraction(this, fieldIndex, outputType).extractElementX(); + } + // -------------------------------------------------------------------------------------------- // Non-grouped aggregations diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java index 7c80bf87d4d6d..de2e6f45f114c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java @@ -62,6 +62,13 @@ public static SingleInputSemanticProperties createProjectionPropertiesSingle(int return ssp; } + public static SingleInputSemanticProperties createExtractionPropertiesSingle(int field) { + SingleInputSemanticProperties ssp = new SingleInputSemanticProperties(); + ssp.addForwardedField(field, 0); + return ssp; + } + + public static DualInputSemanticProperties createProjectionPropertiesDual(int[] fields, boolean[] isFromFirst) { DualInputSemanticProperties dsp = new DualInputSemanticProperties(); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ExtractOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ExtractOperator.java new file mode 100644 index 0000000000000..6887033d7cd98 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ExtractOperator.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.operators; + +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.SemanticPropUtil; +import org.apache.flink.api.java.operators.translation.PlanExtractOperator; +import org.apache.flink.api.java.operators.translation.PlanProjectOperator; +import org.apache.flink.api.java.tuple.*; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import java.util.Arrays; + +//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator +//CHECKSTYLE.ON: AvoidStarImport + +/** + * This operator represents the application of a projection operation on a data set, and the + * result data set produced by the function. + * + * @param The type of the data set projected by the operator. + * @param The type of data set that is the result of the projection. 
+ */ +public class ExtractOperator + extends SingleInputOperator> { + + protected final int field; + + private Extraction extr; + + public ExtractOperator(DataSet input, int field, TypeInformation returnType) { + super(input, returnType); + + this.field = field; + extr = null; + } + + public ExtractOperator(DataSet input, int field, TypeInformation returnType, Extraction extr) { + super(input, returnType); + + this.field = field; + this.extr = extr; + } + + @Override + protected org.apache.flink.api.common.operators.base.MapOperatorBase> translateToDataFlow(Operator input) { + String name = getName() != null ? getName() : "Extraction " + field; + // create operator + PlanExtractOperator ppo = new PlanExtractOperator(field, name, getInputType(), getResultType()); + // set input + ppo.setInput(input); + // set dop + ppo.setDegreeOfParallelism(this.getParallelism()); + + //TODO: check this + ppo.setSemanticProperties(SemanticPropUtil.createExtractionPropertiesSingle(field)); + + return ppo; + } + + /** + * Continues a Project transformation on a {@link org.apache.flink.api.java.tuple.Tuple} {@link org.apache.flink.api.java.DataSet}.
+ * Note: Only Tuple DataSets can be projected using field indexes.
+ * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.
+ * Additional fields can be added to the projection by calling {@link org.apache.flink.api.java.operators.ExtractOperator#project(int[])}. + * + * Note: With the current implementation, the Project transformation looses type information. + * + * @param fieldIndex The field indexes which are added to the Project transformation. + * The order of fields in the output tuple corresponds to the order of field indexes. + * @return A ProjectOperator that represents the projected DataSet. + * + * @see org.apache.flink.api.java.tuple.Tuple + * @see org.apache.flink.api.java.DataSet + * @see org.apache.flink.api.java.operators.ExtractOperator + */ + @SuppressWarnings("hiding") + public ExtractOperator extract(int fieldIndex) { + extr.acceptIndex(fieldIndex); + + return extr.extractElementX(); + } + + public ExtractOperator type(Class type) { + return extr.extractElementX(); + } + + + /** + * Deprecated method only kept for compatibility. + */ + @SuppressWarnings({ "unchecked", "hiding" }) + @Deprecated + public ExtractOperator types(Class... 
types) { + TupleTypeInfo typeInfo = (TupleTypeInfo)this.getResultType(); + + if(types.length != typeInfo.getArity()) { + throw new InvalidProgramException("Provided types do not match projection."); + } + for (int i=0; i typeClass = types[i]; + if (!typeClass.equals(typeInfo.getTypeAt(i).getTypeClass())) { + throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection"); + } + } + return (ExtractOperator) this; + } + + public static class Extraction { + + private final DataSet ds; + private int fieldIndex; + + public Extraction(DataSet ds, int fieldIndex, Class outputType) { + + if(!(ds.getType() instanceof TupleTypeInfo)) { + throw new UnsupportedOperationException("extract() can only be applied to DataSets of Tuples."); + } + + TupleTypeInfo tupleInfo = (TupleTypeInfo) ds.getType(); + + if(!tupleInfo.getTypeAt(fieldIndex).equals(TypeExtractor.createTypeInfo(outputType))) { + throw new IllegalArgumentException("The output class type has to be: " + tupleInfo.getTypeAt(fieldIndex).toString()); + } + + if(fieldIndex < 0) { + throw new IllegalArgumentException("The index of extract() has to be positive!"); + } + if(fieldIndex > Tuple.MAX_ARITY) { + throw new IllegalArgumentException("The index of extract() has to be smaller than the number of elements of the tuple!"); + } + + + this.ds = ds; + this.fieldIndex = fieldIndex; + } + + private void acceptIndex(int fieldIndex) { + + if(fieldIndex < 0) { + throw new IllegalArgumentException("The index of extract() has to be positive!"); + } + if(fieldIndex > Tuple.MAX_ARITY) { + throw new IllegalArgumentException("The index of extract() has to be smaller than the number of elements of the tuple!"); + } + + this.fieldIndex = fieldIndex; + } + + + + // -------------------------------------------------------------------------------------------- + // The following lines are generated. 
+ // -------------------------------------------------------------------------------------------- + // BEGIN_OF_TUPLE_DEPENDENT_CODE + // GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. + + /** + * Chooses a projectTupleX according to the length of + * {@link org.apache.flink.api.java.operators.ExtractOperator.Extraction#fieldIndex} + * + * @return The projected DataSet. + * + * @see org.apache.flink.api.java.operators.ExtractOperator.Extraction + */ + @SuppressWarnings("unchecked") + public ExtractOperator extractElementX() { + ExtractOperator projOperator = null; + + TupleTypeInfo tupleInfo = (TupleTypeInfo) ds.getType(); + TypeInformation tType = tupleInfo.getTypeAt(fieldIndex); + + return new ExtractOperator(this.ds, this.fieldIndex, tType, this); + } + + /** + * Projects a {@link org.apache.flink.api.java.tuple.Tuple} {@link org.apache.flink.api.java.DataSet} to the previously selected fields. + * + * @return The projected DataSet. + * + * @see org.apache.flink.api.java.tuple.Tuple + * @see org.apache.flink.api.java.DataSet + */ + /* + public ExtractOperator> projectTuple1() { + TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, ds.getType()); + TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); + + return new ExtractOperator>(this.ds, this.fieldIndexes, tType, this); + } + */ + + + + + private TypeInformation extractFieldType(int field, TypeInformation inType) { + + TupleTypeInfo inTupleType = (TupleTypeInfo) inType; + return inTupleType.getTypeAt(field); + } + + } +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanExtractOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanExtractOperator.java new file mode 100644 index 0000000000000..3f9ac769bf954 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanExtractOperator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.operators.translation; + +import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.UnaryOperatorInformation; +import org.apache.flink.api.common.operators.base.MapOperatorBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; + +public class PlanExtractOperator extends MapOperatorBase> { + + public PlanExtractOperator(int field, String name, TypeInformation inType, TypeInformation outType) { + super(new MapProjector(field, outType.createSerializer().createInstance()), new UnaryOperatorInformation(inType, outType), name); + } + + public static final class MapProjector + extends AbstractRichFunction + implements MapFunction + { + private static final long serialVersionUID = 1L; + + private final int field; + private final R out; + + private MapProjector(int field, R outInstance) { + this.field = field; + this.out = outInstance; + } + + // TODO We should use code generation for this. 
+ @Override + public R map(T inTuple) throws Exception { + return inTuple.getField(field); + } + } +} From cf3bcb25332486a95172079e6591b77770b19159 Mon Sep 17 00:00:00 2001 From: FelixNeutatz Date: Wed, 14 Jan 2015 18:58:15 +0100 Subject: [PATCH 2/7] [FLINK-1398] prototype which uses simple map function --- .../org/apache/flink/api/java/DataSet.java | 26 ++++++++++++++++++- .../api/java/operators/ExtractOperator.java | 7 ++--- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index 0421145486c8e..b414a4550779b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -46,9 +46,9 @@ import org.apache.flink.api.java.operators.*; import org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets; import org.apache.flink.api.java.functions.FirstReducer; +import org.apache.flink.api.java.operators.ExtractOperator.Extraction; import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets; import org.apache.flink.api.java.operators.ProjectOperator.Projection; -import org.apache.flink.api.java.operators.ExtractOperator.Extraction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; @@ -281,6 +281,30 @@ public ProjectOperator project(int... 
fieldIndexes) public ExtractOperator extractSingleField(int fieldIndex, Class outputType) { return new Extraction(this, fieldIndex, outputType).extractElementX(); } + + public SingleInputUdfOperator extractSingleFieldByMap(int fieldIndex, Class outputType) { + + TupleTypeInfo tupleInfo = (TupleTypeInfo) this.getType(); + if(!tupleInfo.getTypeAt(fieldIndex).equals(TypeExtractor.createTypeInfo(outputType))) { + throw new IllegalArgumentException("The output class type has to be: " + tupleInfo.getTypeAt(fieldIndex).toString()); + } + + return map(new ExtractElement(fieldIndex)).returns(tupleInfo.getTypeAt(fieldIndex)); + } + + public static final class ExtractElement implements MapFunction{ + private int id; + + public ExtractElement (int id){ + this.id = id; + } + + @Override + public R map(T value) { + return (R) value.getField(id); + + } + } // -------------------------------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ExtractOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ExtractOperator.java index 6887033d7cd98..085d1c7e7b2f7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ExtractOperator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ExtractOperator.java @@ -18,7 +18,7 @@ package org.apache.flink.api.java.operators; -import com.google.common.base.Preconditions; + import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.Operator; @@ -26,13 +26,10 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.functions.SemanticPropUtil; import org.apache.flink.api.java.operators.translation.PlanExtractOperator; -import org.apache.flink.api.java.operators.translation.PlanProjectOperator; -import org.apache.flink.api.java.tuple.*; +import org.apache.flink.api.java.tuple.Tuple; 
import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; -import java.util.Arrays; - //CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator //CHECKSTYLE.ON: AvoidStarImport From 2cdd7589825ff3a01247cd1614dc651be07ff173 Mon Sep 17 00:00:00 2001 From: FelixNeutatz Date: Wed, 14 Jan 2015 20:42:36 +0100 Subject: [PATCH 3/7] [FLINK-1398] add generics --- flink-java/src/main/java/org/apache/flink/api/java/DataSet.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index b414a4550779b..c3d46bf8ef74f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -282,7 +282,7 @@ public ExtractOperator extractSingleField(int fieldIndex, Class SingleInputUdfOperator> extractSingleFieldByMap(int fieldIndex, Class outputType) { TupleTypeInfo tupleInfo = (TupleTypeInfo) this.getType(); if(!tupleInfo.getTypeAt(fieldIndex).equals(TypeExtractor.createTypeInfo(outputType))) { From 7677636f71f1d75667e813c02bb6d54f619311ef Mon Sep 17 00:00:00 2001 From: FelixNeutatz Date: Wed, 14 Jan 2015 20:45:58 +0100 Subject: [PATCH 4/7] [FLINK-1398] modify generics --- .../src/main/java/org/apache/flink/api/java/DataSet.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index c3d46bf8ef74f..8633b8ef11980 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -282,9 +282,9 @@ public ExtractOperator extractSingleField(int fieldIndex, Class SingleInputUdfOperator> extractSingleFieldByMap(int fieldIndex, Class outputType) { + public SingleInputUdfOperator> 
extractSingleFieldByMap(int fieldIndex, Class outputType) { - TupleTypeInfo tupleInfo = (TupleTypeInfo) this.getType(); + TupleTypeInfo tupleInfo = (TupleTypeInfo) this.getType(); if(!tupleInfo.getTypeAt(fieldIndex).equals(TypeExtractor.createTypeInfo(outputType))) { throw new IllegalArgumentException("The output class type has to be: " + tupleInfo.getTypeAt(fieldIndex).toString()); } From 4c9f00d980c0b87c4ba56f00c15cd25175107f4d Mon Sep 17 00:00:00 2001 From: FelixNeutatz Date: Fri, 16 Jan 2015 01:03:15 +0100 Subject: [PATCH 5/7] [FLINK-1398] add as utility function --- .../org/apache/flink/api/java/DataSet.java | 365 +++++++++--------- .../api/java/functions/SemanticPropUtil.java | 13 +- .../api/java/operators/ExtractOperator.java | 222 ----------- .../translation/PlanExtractOperator.java | 54 --- .../flink/api/java/utilities/DataSetUtil.java | 85 ++++ .../api/java/utilities/DataSetUtilTest.java | 93 +++++ 6 files changed, 358 insertions(+), 474 deletions(-) delete mode 100644 flink-java/src/main/java/org/apache/flink/api/java/operators/ExtractOperator.java delete mode 100644 flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanExtractOperator.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/utilities/DataSetUtil.java create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/utilities/DataSetUtilTest.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index 8633b8ef11980..52c5966956635 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -43,12 +43,30 @@ import org.apache.flink.api.java.io.PrintingOutputFormat; import org.apache.flink.api.java.io.TextOutputFormat; import org.apache.flink.api.java.io.TextOutputFormat.TextFormatter; -import org.apache.flink.api.java.operators.*; +import 
org.apache.flink.api.java.operators.AggregateOperator; +import org.apache.flink.api.java.operators.CoGroupOperator; import org.apache.flink.api.java.operators.CoGroupOperator.CoGroupOperatorSets; +import org.apache.flink.api.java.operators.CrossOperator; +import org.apache.flink.api.java.operators.CustomUnaryOperation; +import org.apache.flink.api.java.operators.DataSink; +import org.apache.flink.api.java.operators.DeltaIteration; +import org.apache.flink.api.java.operators.DistinctOperator; +import org.apache.flink.api.java.operators.FilterOperator; +import org.apache.flink.api.java.operators.ProjectOperator; import org.apache.flink.api.java.functions.FirstReducer; -import org.apache.flink.api.java.operators.ExtractOperator.Extraction; +import org.apache.flink.api.java.operators.FlatMapOperator; +import org.apache.flink.api.java.operators.GroupReduceOperator; +import org.apache.flink.api.java.operators.IterativeDataSet; import org.apache.flink.api.java.operators.JoinOperator.JoinOperatorSets; +import org.apache.flink.api.java.operators.Keys; +import org.apache.flink.api.java.operators.MapOperator; +import org.apache.flink.api.java.operators.MapPartitionOperator; +import org.apache.flink.api.java.operators.PartitionOperator; import org.apache.flink.api.java.operators.ProjectOperator.Projection; +import org.apache.flink.api.java.operators.ReduceOperator; +import org.apache.flink.api.java.operators.SortedGrouping; +import org.apache.flink.api.java.operators.UnionOperator; +import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; @@ -73,15 +91,15 @@ * @param The type of the DataSet, i.e., the type of the elements of the DataSet. 
*/ public abstract class DataSet { - + private final ExecutionEnvironment context; - + // NOTE: the type must not be accessed directly, but only via getType() private TypeInformation type; - + private boolean typeUsed = false; - - + + protected DataSet(ExecutionEnvironment context, TypeInformation typeInfo) { if (context == null) { throw new NullPointerException("context is null"); @@ -96,27 +114,27 @@ protected DataSet(ExecutionEnvironment context, TypeInformation typeInfo) { /** * Returns the {@link ExecutionEnvironment} in which this DataSet is registered. - * + * * @return The ExecutionEnvironment in which this DataSet is registered. - * + * * @see ExecutionEnvironment */ public ExecutionEnvironment getExecutionEnvironment() { return this.context; } - + // -------------------------------------------------------------------------------------------- // Type Information handling // -------------------------------------------------------------------------------------------- - + /** * Tries to fill in the type information. Type information can be filled in later when the program uses * a type hint. This method checks whether the type information has ever been accessed before and does not * allow modifications if the type was accessed already. This ensures consistency by making sure different * parts of the operation do not assume different type information. - * + * * @param typeInfo The type information to fill in. - * + * * @throws IllegalStateException Thrown, if the type information has been accessed before. */ protected void fillInType(TypeInformation typeInfo) { @@ -127,12 +145,12 @@ protected void fillInType(TypeInformation typeInfo) { } this.type = typeInfo; } - + /** * Returns the {@link TypeInformation} for the type of this DataSet. - * + * * @return The TypeInformation for the type of this DataSet. 
- * + * * @see TypeInformation */ public TypeInformation getType() { @@ -159,15 +177,15 @@ public F clean(F f) { // -------------------------------------------------------------------------------------------- // Filter & Transformations // -------------------------------------------------------------------------------------------- - + /** * Applies a Map transformation on this DataSet.
* The transformation calls a {@link org.apache.flink.api.common.functions.MapFunction} for each element of the DataSet. * Each MapFunction call returns exactly one element. - * + * * @param mapper The MapFunction that is called for each element of the DataSet. * @return A MapOperator that represents the transformed DataSet. - * + * * @see org.apache.flink.api.common.functions.MapFunction * @see org.apache.flink.api.common.functions.RichMapFunction * @see MapOperator @@ -205,20 +223,20 @@ public MapPartitionOperator mapPartition(MapPartitionFunction ma if (mapPartition == null) { throw new NullPointerException("MapPartition function must not be null."); } - + String callLocation = Utils.getCallLocationName(); TypeInformation resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, getType(), callLocation, true); return new MapPartitionOperator(this, resultType, clean(mapPartition), callLocation); } - + /** * Applies a FlatMap transformation on a {@link DataSet}.
* The transformation calls a {@link org.apache.flink.api.common.functions.RichFlatMapFunction} for each element of the DataSet. * Each FlatMapFunction call can return any number of elements including none. - * + * * @param flatMapper The FlatMapFunction that is called for each element of the DataSet. * @return A FlatMapOperator that represents the transformed DataSet. - * + * * @see org.apache.flink.api.common.functions.RichFlatMapFunction * @see FlatMapOperator * @see DataSet @@ -232,16 +250,16 @@ public FlatMapOperator flatMap(FlatMapFunction flatMapper) { TypeInformation resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true); return new FlatMapOperator(this, resultType, clean(flatMapper), callLocation); } - + /** * Applies a Filter transformation on a {@link DataSet}.
* The transformation calls a {@link org.apache.flink.api.common.functions.RichFilterFunction} for each element of the DataSet * and retains only those element for which the function returns true. Elements for * which the function returns false are filtered. - * + * * @param filter The FilterFunction that is called for each element of the DataSet. * @return A FilterOperator that represents the filtered DataSet. - * + * * @see org.apache.flink.api.common.functions.RichFilterFunction * @see FilterOperator * @see DataSet @@ -253,11 +271,11 @@ public FilterOperator filter(FilterFunction filter) { return new FilterOperator(this, clean(filter), Utils.getCallLocationName()); } - + // -------------------------------------------------------------------------------------------- // Projections // -------------------------------------------------------------------------------------------- - + /** * Applies a Project transformation on a {@link Tuple} {@link DataSet}.
* Note: Only Tuple DataSets can be projected using field indexes.
@@ -278,50 +296,21 @@ public ProjectOperator project(int... fieldIndexes) return new Projection(this, fieldIndexes).projectTupleX(); } - public ExtractOperator extractSingleField(int fieldIndex, Class outputType) { - return new Extraction(this, fieldIndex, outputType).extractElementX(); - } - - public SingleInputUdfOperator> extractSingleFieldByMap(int fieldIndex, Class outputType) { - - TupleTypeInfo tupleInfo = (TupleTypeInfo) this.getType(); - if(!tupleInfo.getTypeAt(fieldIndex).equals(TypeExtractor.createTypeInfo(outputType))) { - throw new IllegalArgumentException("The output class type has to be: " + tupleInfo.getTypeAt(fieldIndex).toString()); - } - - return map(new ExtractElement(fieldIndex)).returns(tupleInfo.getTypeAt(fieldIndex)); - } - - public static final class ExtractElement implements MapFunction{ - private int id; - - public ExtractElement (int id){ - this.id = id; - } - - @Override - public R map(T value) { - return (R) value.getField(id); - - } - } - - // -------------------------------------------------------------------------------------------- // Non-grouped aggregations // -------------------------------------------------------------------------------------------- - + /** * Applies an Aggregate transformation on a non-grouped {@link Tuple} {@link DataSet}.
* Note: Only Tuple DataSets can be aggregated. * The transformation applies a built-in {@link Aggregations Aggregation} on a specified field * of a Tuple DataSet. Additional aggregation functions can be added to the resulting * {@link AggregateOperator} by calling {@link AggregateOperator#and(Aggregations, int)}. - * + * * @param agg The built-in aggregation function that is computed. * @param field The index of the Tuple field on which the aggregation function is applied. * @return An AggregateOperator that represents the aggregated DataSet. - * + * * @see Tuple * @see Aggregations * @see AggregateOperator @@ -375,16 +364,16 @@ public AggregateOperator max(int field) { public AggregateOperator min(int field) { return aggregate(Aggregations.MIN, field); } - + /** * Applies a Reduce transformation on a non-grouped {@link DataSet}.
* The transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction} * until only a single element remains which is the result of the transformation. * A ReduceFunction combines two elements into one new element of the same type. - * + * * @param reducer The ReduceFunction that is applied on the DataSet. * @return A ReduceOperator that represents the reduced DataSet. - * + * * @see org.apache.flink.api.common.functions.RichReduceFunction * @see ReduceOperator * @see DataSet @@ -395,16 +384,16 @@ public ReduceOperator reduce(ReduceFunction reducer) { } return new ReduceOperator(this, clean(reducer), Utils.getCallLocationName()); } - + /** * Applies a GroupReduce transformation on a non-grouped {@link DataSet}.
* The transformation calls a {@link org.apache.flink.api.common.functions.RichGroupReduceFunction} once with the full DataSet. * The GroupReduceFunction can iterate over all elements of the DataSet and emit any * number of output elements including none. - * + * * @param reducer The GroupReduceFunction that is applied on the DataSet. * @return A GroupReduceOperator that represents the reduced DataSet. - * + * * @see org.apache.flink.api.common.functions.RichGroupReduceFunction * @see org.apache.flink.api.java.operators.GroupReduceOperator * @see DataSet @@ -413,7 +402,7 @@ public GroupReduceOperator reduceGroup(GroupReduceFunction reduc if (reducer == null) { throw new NullPointerException("GroupReduce function must not be null."); } - + String callLocation = Utils.getCallLocationName(); TypeInformation resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, getType(), callLocation, true); return new GroupReduceOperator(this, resultType, clean(reducer), callLocation); @@ -454,7 +443,7 @@ public ReduceOperator minBy(int... fields) { return new ReduceOperator(this, new SelectByMinFunction( (TupleTypeInfo) getType(), fields), Utils.getCallLocationName()); } - + /** * Selects an element with maximum value. *

@@ -495,25 +484,25 @@ public ReduceOperator maxBy(int... fields) { * Returns a new set containing the first n elements in this {@link DataSet}.
* @param n The desired number of elements. * @return A ReduceGroupOperator that represents the DataSet containing the elements. - */ + */ public GroupReduceOperator first(int n) { if(n < 1) { throw new InvalidProgramException("Parameter n of first(n) must be at least 1."); } - + return reduceGroup(new FirstReducer(n)); } - + // -------------------------------------------------------------------------------------------- // distinct // -------------------------------------------------------------------------------------------- - + /** * Returns a distinct set of a {@link DataSet} using a {@link KeySelector} function. *

* The KeySelector function is called for each element of the DataSet and extracts a single key value on which the * decision is made if two items are distinct or not. - * + * * @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which the * distinction of the DataSet is decided. * @return A DistinctOperator that represents the distinct DataSet. @@ -522,7 +511,7 @@ public DistinctOperator distinct(KeySelector keyExtractor) { TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); return new DistinctOperator(this, new Keys.SelectorFunctionKeys(keyExtractor, getType(), keyType), Utils.getCallLocationName()); } - + /** * Returns a distinct set of a {@link Tuple} {@link DataSet} using field position keys. *

@@ -537,7 +526,7 @@ public DistinctOperator distinct(KeySelector keyExtractor) { public DistinctOperator distinct(int... fields) { return new DistinctOperator(this, new Keys.ExpressionKeys(fields, getType(), true), Utils.getCallLocationName()); } - + /** * Returns a distinct set of a {@link Tuple} {@link DataSet} using expression keys. *

@@ -551,18 +540,18 @@ public DistinctOperator distinct(int... fields) { public DistinctOperator distinct(String... fields) { return new DistinctOperator(this, new Keys.ExpressionKeys(fields, getType()), Utils.getCallLocationName()); } - + /** * Returns a distinct set of a {@link Tuple} {@link DataSet} using all fields of the tuple. *

* Note: This operator can only be applied to Tuple DataSets. - * + * * @return A DistinctOperator that represents the distinct DataSet. */ public DistinctOperator distinct() { return new DistinctOperator(this, null, Utils.getCallLocationName()); } - + // -------------------------------------------------------------------------------------------- // Grouping // -------------------------------------------------------------------------------------------- @@ -579,10 +568,10 @@ public DistinctOperator distinct() { *

  • {@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation. *
  • {@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation. * - * + * * @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which it is grouped. * @return An UnsortedGrouping on which a transformation needs to be applied to obtain a transformed DataSet. - * + * * @see KeySelector * @see UnsortedGrouping * @see AggregateOperator @@ -594,7 +583,7 @@ public UnsortedGrouping groupBy(KeySelector keyExtractor) { TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); return new UnsortedGrouping(this, new Keys.SelectorFunctionKeys(clean(keyExtractor), getType(), keyType)); } - + /** * Groups a {@link Tuple} {@link DataSet} using field position keys.
    * Note: Field position keys can only be specified for Tuple DataSets.
    @@ -607,10 +596,10 @@ public UnsortedGrouping groupBy(KeySelector keyExtractor) { *
  • {@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation. *
  • {@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation. * - * + * * @param fields One or more field positions on which the DataSet will be grouped. * @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet. - * + * * @see Tuple * @see UnsortedGrouping * @see AggregateOperator @@ -648,44 +637,44 @@ public UnsortedGrouping groupBy(int... fields) { public UnsortedGrouping groupBy(String... fields) { return new UnsortedGrouping(this, new Keys.ExpressionKeys(fields, getType())); } - + // -------------------------------------------------------------------------------------------- // Joining // -------------------------------------------------------------------------------------------- - + /** * Initiates a Join transformation.
    * A Join transformation joins the elements of two * {@link DataSet DataSets} on key equality and provides multiple ways to combine * joining elements into one DataSet.
    - * + * * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods * can be called to define the join key of the first joining (i.e., this) DataSet. - * + * * @param other The other DataSet with which this DataSet is joined. * @return A JoinOperatorSets to continue the definition of the Join transformation. - * + * * @see JoinOperatorSets * @see DataSet */ public JoinOperatorSets join(DataSet other) { return new JoinOperatorSets(this, other); } - + /** * Initiates a Join transformation.
    * A Join transformation joins the elements of two * {@link DataSet DataSets} on key equality and provides multiple ways to combine * joining elements into one DataSet.
    - * + * * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods * can be called to define the join key of the first joining (i.e., this) DataSet. - * + * * @param other The other DataSet with which this DataSet is joined. * @param strategy The strategy that should be used execute the join. If {@code null} is give, then the * optimizer will pick the join strategy. * @return A JoinOperatorSets to continue the definition of the Join transformation. - * + * * @see JoinOperatorSets * @see DataSet */ @@ -703,17 +692,17 @@ public JoinOperatorSets join(DataSet other, JoinHint strategy) { * This method returns a {@link JoinOperatorSets} on which * {@link JoinOperatorSets#where(String...)} needs to be called to define the join key of the first * joining (i.e., this) DataSet. - * + * * @param other The other DataSet with which this DataSet is joined. * @return A JoinOperatorSets to continue the definition of the Join transformation. - * + * * @see JoinOperatorSets * @see DataSet */ public JoinOperatorSets joinWithTiny(DataSet other) { return new JoinOperatorSets(this, other, JoinHint.BROADCAST_HASH_SECOND); } - + /** * Initiates a Join transformation.
    * A Join transformation joins the elements of two @@ -723,17 +712,17 @@ public JoinOperatorSets joinWithTiny(DataSet other) { * larger than the first one.
    * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods * can be called to define the join key of the first joining (i.e., this) DataSet. - * + * * @param other The other DataSet with which this DataSet is joined. * @return A JoinOperatorSet to continue the definition of the Join transformation. - * + * * @see JoinOperatorSets * @see DataSet */ public JoinOperatorSets joinWithHuge(DataSet other) { return new JoinOperatorSets(this, other, JoinHint.BROADCAST_HASH_FIRST); } - + // -------------------------------------------------------------------------------------------- // Co-Grouping // -------------------------------------------------------------------------------------------- @@ -749,10 +738,10 @@ public JoinOperatorSets joinWithHuge(DataSet other) { * of elements including none.
    * This method returns a {@link CoGroupOperatorSets} on which one of the {@code where} methods * can be called to define the join key of the first joining (i.e., this) DataSet. - * + * * @param other The other DataSet of the CoGroup transformation. * @return A CoGroupOperatorSets to continue the definition of the CoGroup transformation. - * + * * @see CoGroupOperatorSets * @see CoGroupOperator * @see DataSet @@ -769,34 +758,34 @@ public CoGroupOperator.CoGroupOperatorSets coGroup(DataSet other) { * Continues a Join transformation and defines the {@link Tuple} fields of the second join * {@link DataSet} that should be used as join keys.
    * Note: Fields can only be selected as join keys on Tuple DataSets.
    - * + * * The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with * the element of the first input being the first field of the tuple and the element of the * second input being the second field of the tuple. - * + * * @param fields The indexes of the Tuple fields of the second join DataSet that should be used as keys. * @return A DefaultJoin that represents the joined DataSet. */ - + /** * Initiates a Cross transformation.
    * A Cross transformation combines the elements of two * {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of * both DataSets, i.e., it builds a Cartesian product. - * + * *

    * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with * the element of the first input being the first field of the tuple and the element of the * second input being the second field of the tuple. - * + * *

    * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for * each pair of crossed elements. The CrossFunction returns exactly one element for each pair of input elements.
    - * + * * @param other The other DataSet with which this DataSet is crossed. * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements. - * + * * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross * @see org.apache.flink.api.common.functions.CrossFunction * @see DataSet @@ -805,7 +794,7 @@ public CoGroupOperator.CoGroupOperatorSets coGroup(DataSet other) { public CrossOperator.DefaultCross cross(DataSet other) { return new CrossOperator.DefaultCross(this, other, CrossHint.OPTIMIZER_CHOOSES, Utils.getCallLocationName()); } - + /** * Initiates a Cross transformation.
    * A Cross transformation combines the elements of two @@ -813,20 +802,20 @@ public CrossOperator.DefaultCross cross(DataSet other) { * both DataSets, i.e., it builds a Cartesian product. * This method also gives the hint to the optimizer that the second DataSet to cross is much * smaller than the first one. - * + * *

    * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with * the element of the first input being the first field of the tuple and the element of the * second input being the second field of the tuple. - * + * *

    * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for * each pair of crossed elements. The CrossFunction returns exactly one element for each pair of input elements.
    - * + * * @param other The other DataSet with which this DataSet is crossed. * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements. - * + * * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross * @see org.apache.flink.api.common.functions.CrossFunction * @see DataSet @@ -835,7 +824,7 @@ public CrossOperator.DefaultCross cross(DataSet other) { public CrossOperator.DefaultCross crossWithTiny(DataSet other) { return new CrossOperator.DefaultCross(this, other, CrossHint.SECOND_IS_SMALL, Utils.getCallLocationName()); } - + /** * Initiates a Cross transformation.
    * A Cross transformation combines the elements of two @@ -843,20 +832,20 @@ public CrossOperator.DefaultCross crossWithTiny(DataSet other) { * both DataSets, i.e., it builds a Cartesian product. * This method also gives the hint to the optimizer that the second DataSet to cross is much * larger than the first one. - * + * *

    * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with * the element of the first input being the first field of the tuple and the element of the * second input being the second field of the tuple. - * + * *

    * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for * each pair of crossed elements. The CrossFunction returns exactly one element for each pair of input elements.
    - * + * * @param other The other DataSet with which this DataSet is crossed. * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements. - * + * * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross * @see org.apache.flink.api.common.functions.CrossFunction * @see DataSet @@ -882,7 +871,7 @@ public CrossOperator.DefaultCross crossWithHuge(DataSet other) { *

     	 * {@code
     	 * DataSet input = ...;
    -	 * 
    +	 *
     	 * DataSet startOfIteration = input.iterate(10);
     	 * DataSet toBeFedBack = startOfIteration
     	 *                               .map(new MyMapper())
    @@ -893,17 +882,17 @@ public  CrossOperator.DefaultCross crossWithHuge(DataSet other) {
     	 * 

    * The iteration has a maximum number of times that it executes. A dynamic termination can be realized by using a * termination criterion (see {@link org.apache.flink.api.java.operators.IterativeDataSet#closeWith(DataSet, DataSet)}). - * + * * @param maxIterations The maximum number of times that the iteration is executed. * @return An IterativeDataSet that marks the start of the iterative part and needs to be closed by * {@link org.apache.flink.api.java.operators.IterativeDataSet#closeWith(DataSet)}. - * + * * @see org.apache.flink.api.java.operators.IterativeDataSet */ public IterativeDataSet iterate(int maxIterations) { return new IterativeDataSet(getExecutionEnvironment(), getType(), this, maxIterations); } - + /** * Initiates a delta iteration. A delta iteration is similar to a regular iteration (as started by {@link #iterate(int)}, * but maintains state across the individual iteration steps. The Solution set, which represents the current state @@ -930,30 +919,30 @@ public IterativeDataSet iterate(int maxIterations) { * {@code * DeltaIteration, Tuple2> iteration = * initialState.iterateDelta(initialFeedbakSet, 100, 0); - * + * * DataSet> delta = iteration.groupBy(0).aggregate(Aggregations.AVG, 1) * .join(iteration.getSolutionSet()).where(0).equalTo(0) * .flatMap(new ProjectAndFilter()); - * + * * DataSet> feedBack = delta.join(someOtherSet).where(...).equalTo(...).with(...); - * + * * // close the delta iteration (delta and new workset are identical) * DataSet> result = iteration.closeWith(delta, feedBack); * } *

    - * + * * @param workset The initial version of the data set that is fed back to the next iteration step (the workset). * @param maxIterations The maximum number of iteration steps, as a fall back safeguard. * @param keyPositions The position of the tuple fields that is used as the key of the solution set. - * + * * @return The DeltaIteration that marks the start of a delta iteration. - * + * * @see org.apache.flink.api.java.operators.DeltaIteration */ public DeltaIteration iterateDelta(DataSet workset, int maxIterations, int... keyPositions) { Preconditions.checkNotNull(workset); Preconditions.checkNotNull(keyPositions); - + Keys.ExpressionKeys keys = new Keys.ExpressionKeys(keyPositions, getType(), false); return new DeltaIteration(getExecutionEnvironment(), getType(), this, workset, keys, maxIterations); } @@ -961,12 +950,12 @@ public DeltaIteration iterateDelta(DataSet workset, int maxIteratio // -------------------------------------------------------------------------------------------- // Custom Operators // ------------------------------------------------------------------------------------------- - + /** * Runs a {@link CustomUnaryOperation} on the data set. Custom operations are typically complex * operators that are composed of multiple steps. - * + * * @param operation The operation to run. * @return The data set produced by the operation. */ @@ -975,70 +964,70 @@ public DataSet runOperation(CustomUnaryOperation operation) { operation.setInput(this); return operation.createResult(); } - + // -------------------------------------------------------------------------------------------- // Union // -------------------------------------------------------------------------------------------- /** * Creates a union of this DataSet with an other DataSet. The other DataSet must be of the same data type. - * + * * @param other The other DataSet which is unioned with the current DataSet. * @return The resulting DataSet. 
*/ public UnionOperator union(DataSet other){ return new UnionOperator(this, other, Utils.getCallLocationName()); } - + // -------------------------------------------------------------------------------------------- // Partitioning // -------------------------------------------------------------------------------------------- - + /** * Hash-partitions a DataSet on the specified key fields. *

    * Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. - * + * * @param fields The field indexes on which the DataSet is hash-partitioned. * @return The partitioned DataSet. */ public PartitionOperator partitionByHash(int... fields) { return new PartitionOperator(this, PartitionMethod.HASH, new Keys.ExpressionKeys(fields, getType(), false), Utils.getCallLocationName()); } - + /** * Hash-partitions a DataSet on the specified key fields. *

    * Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. - * + * * @param fields The field expressions on which the DataSet is hash-partitioned. * @return The partitioned DataSet. */ public PartitionOperator partitionByHash(String... fields) { return new PartitionOperator(this, PartitionMethod.HASH, new Keys.ExpressionKeys(fields, getType()), Utils.getCallLocationName()); } - + /** * Partitions a DataSet using the specified KeySelector. *

    * Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. - * + * * @param keyExtractor The KeyExtractor with which the DataSet is hash-partitioned. * @return The partitioned DataSet. - * + * * @see KeySelector */ public > PartitionOperator partitionByHash(KeySelector keyExtractor) { final TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); return new PartitionOperator(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys(clean(keyExtractor), this.getType(), keyType), Utils.getCallLocationName()); } - + /** * Partitions a tuple DataSet on the specified key fields using a custom partitioner. * This method takes the key position to partition on, and a partitioner that accepts the key type. *

    * Note: This method works only on single field keys. - * + * * @param partitioner The partitioner to assign partitions to keys. * @param field The field index on which the DataSet is to partitioned. * @return The partitioned DataSet. @@ -1046,13 +1035,13 @@ public > PartitionOperator partitionByHash(KeySelecto public PartitionOperator partitionCustom(Partitioner partitioner, int field) { return new PartitionOperator(this, new Keys.ExpressionKeys(new int[] {field}, getType(), false), clean(partitioner), Utils.getCallLocationName()); } - + /** * Partitions a POJO DataSet on the specified key fields using a custom partitioner. * This method takes the key expression to partition on, and a partitioner that accepts the key type. *

    * Note: This method works only on single field keys. - * + * * @param partitioner The partitioner to assign partitions to keys. * @param field The field index on which the DataSet is to partitioned. * @return The partitioned DataSet. @@ -1060,7 +1049,7 @@ public PartitionOperator partitionCustom(Partitioner partitioner, int public PartitionOperator partitionCustom(Partitioner partitioner, String field) { return new PartitionOperator(this, new Keys.ExpressionKeys(new String[] {field}, getType()), clean(partitioner), Utils.getCallLocationName()); } - + /** * Partitions a DataSet on the key returned by the selector, using a custom partitioner. * This method takes the key selector t get the key to partition on, and a partitioner that @@ -1068,59 +1057,59 @@ public PartitionOperator partitionCustom(Partitioner partitioner, Stri *

    * Note: This method works only on single field keys, i.e. the selector cannot return tuples * of fields. - * + * * @param partitioner The partitioner to assign partitions to keys. * @param keyExtractor The KeyExtractor with which the DataSet is hash-partitioned. * @return The partitioned DataSet. - * + * * @see KeySelector */ public > PartitionOperator partitionCustom(Partitioner partitioner, KeySelector keyExtractor) { final TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); return new PartitionOperator(this, new Keys.SelectorFunctionKeys(keyExtractor, getType(), keyType), clean(partitioner), Utils.getCallLocationName()); } - + /** * Enforces a re-balancing of the DataSet, i.e., the DataSet is evenly distributed over all parallel instances of the * following task. This can help to improve performance in case of heavy data skew and compute intensive operations. *

    * Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. - * + * * @return The re-balanced DataSet. */ public PartitionOperator rebalance() { return new PartitionOperator(this, PartitionMethod.REBALANCE, Utils.getCallLocationName()); } - + // -------------------------------------------------------------------------------------------- // Top-K // -------------------------------------------------------------------------------------------- - + // -------------------------------------------------------------------------------------------- // Result writing // -------------------------------------------------------------------------------------------- - + /** * Writes a DataSet as a text file to the specified location.
    * For each element of the DataSet the result of {@link Object#toString()} is written. - * + * * @param filePath The path pointing to the location the text file is written to. * @return The DataSink that writes the DataSet. - * + * * @see TextOutputFormat */ public DataSink writeAsText(String filePath) { return output(new TextOutputFormat(new Path(filePath))); } - + /** * Writes a DataSet as a text file to the specified location.
    * For each element of the DataSet the result of {@link Object#toString()} is written. - * + * * @param filePath The path pointing to the location the text file is written to. * @param writeMode Control the behavior for existing files. Options are NO_OVERWRITE and OVERWRITE. * @return The DataSink that writes the DataSet. - * + * * @see TextOutputFormat */ public DataSink writeAsText(String filePath, WriteMode writeMode) { @@ -1128,7 +1117,7 @@ public DataSink writeAsText(String filePath, WriteMode writeMode) { tof.setWriteMode(writeMode); return output(tof); } - + /** * Writes a DataSet as a text file to the specified location.
    * For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written. @@ -1157,51 +1146,51 @@ public DataSink writeAsFormattedText(String filePath, TextFormatter f public DataSink writeAsFormattedText(String filePath, WriteMode writeMode, final TextFormatter formatter) { return this.map(new FormattingMapper(clean(formatter))).writeAsText(filePath, writeMode); } - + /** * Writes a {@link Tuple} DataSet as a CSV file to the specified location.
    * Note: Only a Tuple DataSet can be written as a CSV file.
    * For each Tuple field the result of {@link Object#toString()} is written. * Tuple fields are separated by the default field delimiter {@code "comma" (,)}.
    * Tuples are are separated by the newline character ({@code \n}). - * + * * @param filePath The path pointing to the location the CSV file is written to. * @return The DataSink that writes the DataSet. - * + * * @see Tuple * @see CsvOutputFormat */ public DataSink writeAsCsv(String filePath) { return writeAsCsv(filePath, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER); } - + /** * Writes a {@link Tuple} DataSet as a CSV file to the specified location.
    * Note: Only a Tuple DataSet can be written as a CSV file.
    * For each Tuple field the result of {@link Object#toString()} is written. * Tuple fields are separated by the default field delimiter {@code "comma" (,)}.
    * Tuples are are separated by the newline character ({@code \n}). - * + * * @param filePath The path pointing to the location the CSV file is written to. * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE. * @return The DataSink that writes the DataSet. - * + * * @see Tuple * @see CsvOutputFormat */ public DataSink writeAsCsv(String filePath, WriteMode writeMode) { return internalWriteAsCsv(new Path(filePath),CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER, writeMode); } - + /** * Writes a {@link Tuple} DataSet as a CSV file to the specified location with the specified field and line delimiters.
    * Note: Only a Tuple DataSet can be written as a CSV file.
    * For each Tuple field the result of {@link Object#toString()} is written. - * + * * @param filePath The path pointing to the location the CSV file is written to. * @param rowDelimiter The row delimiter to separate Tuples. * @param fieldDelimiter The field delimiter to separate Tuple fields. - * + * * @see Tuple * @see CsvOutputFormat */ @@ -1213,19 +1202,19 @@ public DataSink writeAsCsv(String filePath, String rowDelimiter, String field * Writes a {@link Tuple} DataSet as a CSV file to the specified location with the specified field and line delimiters.
    * Note: Only a Tuple DataSet can be written as a CSV file.
    * For each Tuple field the result of {@link Object#toString()} is written. - * + * * @param filePath The path pointing to the location the CSV file is written to. * @param rowDelimiter The row delimiter to separate Tuples. * @param fieldDelimiter The field delimiter to separate Tuple fields. * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE. - * + * * @see Tuple * @see CsvOutputFormat */ public DataSink writeAsCsv(String filePath, String rowDelimiter, String fieldDelimiter, WriteMode writeMode) { return internalWriteAsCsv(new Path(filePath), rowDelimiter, fieldDelimiter, writeMode); } - + @SuppressWarnings("unchecked") private DataSink internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) { Validate.isTrue(getType().isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples."); @@ -1235,35 +1224,35 @@ private DataSink internalWriteAsCsv(Path filePath, String r } return output((OutputFormat) of); } - + /** * Writes a DataSet to the standard output stream (stdout).
    * For each element of the DataSet the result of {@link Object#toString()} is written. - * + * * @return The DataSink that writes the DataSet. */ public DataSink print() { return output(new PrintingOutputFormat(false)); } - + /** * Writes a DataSet to the standard error stream (stderr).
    * For each element of the DataSet the result of {@link Object#toString()} is written. - * + * * @return The DataSink that writes the DataSet. */ public DataSink printToErr() { return output(new PrintingOutputFormat(true)); } - + /** * Writes a DataSet using a {@link FileOutputFormat} to a specified location. * This method adds a data sink to the program. - * + * * @param outputFormat The FileOutputFormat to write the DataSet. * @param filePath The path to the location where the DataSet is written. * @return The DataSink that writes the DataSet. - * + * * @see FileOutputFormat */ public DataSink write(FileOutputFormat outputFormat, String filePath) { @@ -1273,16 +1262,16 @@ public DataSink write(FileOutputFormat outputFormat, String filePath) { outputFormat.setOutputFilePath(new Path(filePath)); return output(outputFormat); } - + /** * Writes a DataSet using a {@link FileOutputFormat} to a specified location. * This method adds a data sink to the program. - * + * * @param outputFormat The FileOutputFormat to write the DataSet. * @param filePath The path to the location where the DataSet is written. * @param writeMode The mode of writing, indicating whether to overwrite existing files. * @return The DataSink that writes the DataSet. - * + * * @see FileOutputFormat */ public DataSink write(FileOutputFormat outputFormat, String filePath, WriteMode writeMode) { @@ -1294,36 +1283,36 @@ public DataSink write(FileOutputFormat outputFormat, String filePath, Writ outputFormat.setWriteMode(writeMode); return output(outputFormat); } - + /** * Emits a DataSet using an {@link OutputFormat}. This method adds a data sink to the program. * Programs may have multiple data sinks. A DataSet may also have multiple consumers (data sinks * or transformations) at the same time. - * + * * @param outputFormat The OutputFormat to process the DataSet. * @return The DataSink that processes the DataSet. 
- * + * * @see OutputFormat * @see DataSink */ public DataSink output(OutputFormat outputFormat) { Validate.notNull(outputFormat); - + // configure the type if needed if (outputFormat instanceof InputTypeConfigurable) { ((InputTypeConfigurable) outputFormat).setInputType(getType()); } - + DataSink sink = new DataSink(this, outputFormat, getType()); this.context.registerDataSink(sink); return sink; } - - + + // -------------------------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------------------------- - + protected static void checkSameExecutionContext(DataSet set1, DataSet set2) { if (set1.context != set2.context) { throw new IllegalArgumentException("The two inputs have different execution contexts."); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java index de2e6f45f114c..adfd41cce9c58 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java @@ -62,13 +62,6 @@ public static SingleInputSemanticProperties createProjectionPropertiesSingle(int return ssp; } - public static SingleInputSemanticProperties createExtractionPropertiesSingle(int field) { - SingleInputSemanticProperties ssp = new SingleInputSemanticProperties(); - ssp.addForwardedField(field, 0); - return ssp; - } - - public static DualInputSemanticProperties createProjectionPropertiesDual(int[] fields, boolean[] isFromFirst) { DualInputSemanticProperties dsp = new DualInputSemanticProperties(); @@ -338,7 +331,7 @@ private static FieldSet readFieldSetFromString(String s, TypeInformation inTy if (!isValidField(inType, field)) { throw new IndexOutOfBoundsException("Annotation: Field " + field + " not available in the input tuple."); } - + fs = 
fs.addField(field); } return fs; @@ -365,8 +358,8 @@ public static SingleInputSemanticProperties getSemanticPropsSingleFromString(Str return result; } - public static void getSemanticPropsDualFromString(DualInputSemanticProperties target, String[] constantSetFirst, String[] constantSetSecond, String[] constantSetFirstExcept, - String[] constantSetSecondExcept, String[] readFieldsFirst, String[] readFieldsSecond, TypeInformation inType1, TypeInformation inType2, TypeInformation outType) + public static void getSemanticPropsDualFromString(DualInputSemanticProperties target, String[] constantSetFirst, String[] constantSetSecond, String[] constantSetFirstExcept, + String[] constantSetSecondExcept, String[] readFieldsFirst, String[] readFieldsSecond, TypeInformation inType1, TypeInformation inType2, TypeInformation outType) { parseConstantFieldsFirst(constantSetFirst, target, inType1, outType); parseConstantFieldsSecond(constantSetSecond, target, inType2, outType); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ExtractOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ExtractOperator.java deleted file mode 100644 index 085d1c7e7b2f7..0000000000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ExtractOperator.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.operators; - - -import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.functions.SemanticPropUtil; -import org.apache.flink.api.java.operators.translation.PlanExtractOperator; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; - -//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator -//CHECKSTYLE.ON: AvoidStarImport - -/** - * This operator represents the application of a projection operation on a data set, and the - * result data set produced by the function. - * - * @param The type of the data set projected by the operator. - * @param The type of data set that is the result of the projection. 
- */ -public class ExtractOperator - extends SingleInputOperator> { - - protected final int field; - - private Extraction extr; - - public ExtractOperator(DataSet input, int field, TypeInformation returnType) { - super(input, returnType); - - this.field = field; - extr = null; - } - - public ExtractOperator(DataSet input, int field, TypeInformation returnType, Extraction extr) { - super(input, returnType); - - this.field = field; - this.extr = extr; - } - - @Override - protected org.apache.flink.api.common.operators.base.MapOperatorBase> translateToDataFlow(Operator input) { - String name = getName() != null ? getName() : "Extraction " + field; - // create operator - PlanExtractOperator ppo = new PlanExtractOperator(field, name, getInputType(), getResultType()); - // set input - ppo.setInput(input); - // set dop - ppo.setDegreeOfParallelism(this.getParallelism()); - - //TODO: check this - ppo.setSemanticProperties(SemanticPropUtil.createExtractionPropertiesSingle(field)); - - return ppo; - } - - /** - * Continues a Project transformation on a {@link org.apache.flink.api.java.tuple.Tuple} {@link org.apache.flink.api.java.DataSet}.
    - * Note: Only Tuple DataSets can be projected using field indexes.
    - * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.
    - * Additional fields can be added to the projection by calling {@link org.apache.flink.api.java.operators.ExtractOperator#project(int[])}. - * - * Note: With the current implementation, the Project transformation looses type information. - * - * @param fieldIndex The field indexes which are added to the Project transformation. - * The order of fields in the output tuple corresponds to the order of field indexes. - * @return A ProjectOperator that represents the projected DataSet. - * - * @see org.apache.flink.api.java.tuple.Tuple - * @see org.apache.flink.api.java.DataSet - * @see org.apache.flink.api.java.operators.ExtractOperator - */ - @SuppressWarnings("hiding") - public ExtractOperator extract(int fieldIndex) { - extr.acceptIndex(fieldIndex); - - return extr.extractElementX(); - } - - public ExtractOperator type(Class type) { - return extr.extractElementX(); - } - - - /** - * Deprecated method only kept for compatibility. - */ - @SuppressWarnings({ "unchecked", "hiding" }) - @Deprecated - public ExtractOperator types(Class... 
types) { - TupleTypeInfo typeInfo = (TupleTypeInfo)this.getResultType(); - - if(types.length != typeInfo.getArity()) { - throw new InvalidProgramException("Provided types do not match projection."); - } - for (int i=0; i typeClass = types[i]; - if (!typeClass.equals(typeInfo.getTypeAt(i).getTypeClass())) { - throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection"); - } - } - return (ExtractOperator) this; - } - - public static class Extraction { - - private final DataSet ds; - private int fieldIndex; - - public Extraction(DataSet ds, int fieldIndex, Class outputType) { - - if(!(ds.getType() instanceof TupleTypeInfo)) { - throw new UnsupportedOperationException("extract() can only be applied to DataSets of Tuples."); - } - - TupleTypeInfo tupleInfo = (TupleTypeInfo) ds.getType(); - - if(!tupleInfo.getTypeAt(fieldIndex).equals(TypeExtractor.createTypeInfo(outputType))) { - throw new IllegalArgumentException("The output class type has to be: " + tupleInfo.getTypeAt(fieldIndex).toString()); - } - - if(fieldIndex < 0) { - throw new IllegalArgumentException("The index of extract() has to be positive!"); - } - if(fieldIndex > Tuple.MAX_ARITY) { - throw new IllegalArgumentException("The index of extract() has to be smaller than the number of elements of the tuple!"); - } - - - this.ds = ds; - this.fieldIndex = fieldIndex; - } - - private void acceptIndex(int fieldIndex) { - - if(fieldIndex < 0) { - throw new IllegalArgumentException("The index of extract() has to be positive!"); - } - if(fieldIndex > Tuple.MAX_ARITY) { - throw new IllegalArgumentException("The index of extract() has to be smaller than the number of elements of the tuple!"); - } - - this.fieldIndex = fieldIndex; - } - - - - // -------------------------------------------------------------------------------------------- - // The following lines are generated. 
- // -------------------------------------------------------------------------------------------- - // BEGIN_OF_TUPLE_DEPENDENT_CODE - // GENERATED FROM org.apache.flink.api.java.tuple.TupleGenerator. - - /** - * Chooses a projectTupleX according to the length of - * {@link org.apache.flink.api.java.operators.ExtractOperator.Extraction#fieldIndex} - * - * @return The projected DataSet. - * - * @see org.apache.flink.api.java.operators.ExtractOperator.Extraction - */ - @SuppressWarnings("unchecked") - public ExtractOperator extractElementX() { - ExtractOperator projOperator = null; - - TupleTypeInfo tupleInfo = (TupleTypeInfo) ds.getType(); - TypeInformation tType = tupleInfo.getTypeAt(fieldIndex); - - return new ExtractOperator(this.ds, this.fieldIndex, tType, this); - } - - /** - * Projects a {@link org.apache.flink.api.java.tuple.Tuple} {@link org.apache.flink.api.java.DataSet} to the previously selected fields. - * - * @return The projected DataSet. - * - * @see org.apache.flink.api.java.tuple.Tuple - * @see org.apache.flink.api.java.DataSet - */ - /* - public ExtractOperator> projectTuple1() { - TypeInformation[] fTypes = extractFieldTypes(fieldIndexes, ds.getType()); - TupleTypeInfo> tType = new TupleTypeInfo>(fTypes); - - return new ExtractOperator>(this.ds, this.fieldIndexes, tType, this); - } - */ - - - - - private TypeInformation extractFieldType(int field, TypeInformation inType) { - - TupleTypeInfo inTupleType = (TupleTypeInfo) inType; - return inTupleType.getTypeAt(field); - } - - } -} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanExtractOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanExtractOperator.java deleted file mode 100644 index 3f9ac769bf954..0000000000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/PlanExtractOperator.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * 
or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.operators.translation; - -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.common.operators.UnaryOperatorInformation; -import org.apache.flink.api.common.operators.base.MapOperatorBase; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple; - -public class PlanExtractOperator extends MapOperatorBase> { - - public PlanExtractOperator(int field, String name, TypeInformation inType, TypeInformation outType) { - super(new MapProjector(field, outType.createSerializer().createInstance()), new UnaryOperatorInformation(inType, outType), name); - } - - public static final class MapProjector - extends AbstractRichFunction - implements MapFunction - { - private static final long serialVersionUID = 1L; - - private final int field; - private final R out; - - private MapProjector(int field, R outInstance) { - this.field = field; - this.out = outInstance; - } - - // TODO We should use code generation for this. 
- @Override - public R map(T inTuple) throws Exception { - return inTuple.getField(field); - } - } -} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utilities/DataSetUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/utilities/DataSetUtil.java new file mode 100644 index 0000000000000..c0dde31bf8259 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/utilities/DataSetUtil.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.utilities; + + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.operators.MapOperator; +import org.apache.flink.api.java.operators.SingleInputUdfOperator; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; + + +public class DataSetUtil { + + + // -------------------------------------------------------------------------------------------- + // Extraction of a single field + // -------------------------------------------------------------------------------------------- + + /** + * Applies a single field extraction on a {@link Tuple} {@link DataSet}.
    + * Note: Can be only applied on Tuple DataSets using the corresponding field index.
    + * The transformation extracts of each Tuple of the DataSet a given field.
    + * + * + * @param ds The input DataSet. + * @param fieldIndex The field index of the input tuple which is extracted. + * @param outputType Class of the extracted field. + * @return A SingleInputUdfOperator that represents the extracted field. + * + * @see Tuple + * @see DataSet + * @see org.apache.flink.api.java.operators.SingleInputUdfOperator + */ + public static SingleInputUdfOperator> extractSingleField(DataSet ds, int fieldIndex, Class outputType) { + + if(!ds.getType().isTupleType()) { + throw new IllegalArgumentException("The DataSet has to contain a Tuple, not " + ds.getType().getTypeClass().getName()); + } + + TupleTypeInfo tupleInfo = (TupleTypeInfo) ds.getType(); + if(fieldIndex >= tupleInfo.getArity() || fieldIndex < 0) { + throw new IndexOutOfBoundsException("The field index has to be between 0 and " + (tupleInfo.getArity()-1)); + } + + if(!tupleInfo.getTypeAt(fieldIndex).equals(TypeExtractor.createTypeInfo(outputType))) { + throw new IllegalArgumentException("The output class type has to be: " + tupleInfo.getTypeAt(fieldIndex).toString()); + } + + return ds.map(new ExtractElement(fieldIndex)).returns(tupleInfo.getTypeAt(fieldIndex)); + } + + private static final class ExtractElement implements MapFunction { + private int id; + + public ExtractElement (int id){ + this.id = id; + } + + @Override + public R map(T value) { + return (R) value.getField(id); + + } + } + +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utilities/DataSetUtilTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utilities/DataSetUtilTest.java new file mode 100644 index 0000000000000..9d830d90d51c6 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/utilities/DataSetUtilTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.utilities; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class DataSetUtilTest { + + // TUPLE DATA + + private final List> emptyTupleData = + new ArrayList>(); + + private final TupleTypeInfo> tupleTypeInfo = new + TupleTypeInfo>( + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO + ); + + @Test + public void testSingleFieldExtraction() { + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo); + + // should work + try { + DataSetUtil.extractSingleField(tupleDs, 0, Integer.class); + } catch(Exception e) { + Assert.fail(); + } + + // should not work: index out of bounds of input tuple + try { + DataSetUtil.extractSingleField(tupleDs, 5, Integer.class); + Assert.fail(); + } catch(IndexOutOfBoundsException ioobe) { + // we're good here + } catch(Exception e) { + Assert.fail(); + } + + // should not work: index out of 
bounds of input tuple + try { + DataSetUtil.extractSingleField(tupleDs, -1, Integer.class); + Assert.fail(); + } catch(IndexOutOfBoundsException ioobe) { + // we're good here + } catch(Exception e) { + Assert.fail(); + } + + // should not work: wrong output class type + try { + DataSetUtil.extractSingleField(tupleDs, 0, Long.class); + Assert.fail(); + } catch(IllegalArgumentException iae) { + // we're good here + } catch(Exception e) { + Assert.fail(); + } + + } + +} From 8a8a37318e331a67fb85c1bac01a189c30b42867 Mon Sep 17 00:00:00 2001 From: FelixNeutatz Date: Fri, 16 Jan 2015 01:16:02 +0100 Subject: [PATCH 6/7] [FLINK-1398] clean up --- .../org/apache/flink/api/java/DataSet.java | 316 +++++++++--------- .../api/java/functions/SemanticPropUtil.java | 6 +- 2 files changed, 161 insertions(+), 161 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index 52c5966956635..82e37ac723437 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -91,15 +91,15 @@ * @param The type of the DataSet, i.e., the type of the elements of the DataSet. */ public abstract class DataSet { - + private final ExecutionEnvironment context; - + // NOTE: the type must not be accessed directly, but only via getType() private TypeInformation type; - + private boolean typeUsed = false; - - + + protected DataSet(ExecutionEnvironment context, TypeInformation typeInfo) { if (context == null) { throw new NullPointerException("context is null"); @@ -114,27 +114,27 @@ protected DataSet(ExecutionEnvironment context, TypeInformation typeInfo) { /** * Returns the {@link ExecutionEnvironment} in which this DataSet is registered. - * + * * @return The ExecutionEnvironment in which this DataSet is registered. 
- * + * * @see ExecutionEnvironment */ public ExecutionEnvironment getExecutionEnvironment() { return this.context; } - + // -------------------------------------------------------------------------------------------- // Type Information handling // -------------------------------------------------------------------------------------------- - + /** * Tries to fill in the type information. Type information can be filled in later when the program uses * a type hint. This method checks whether the type information has ever been accessed before and does not * allow modifications if the type was accessed already. This ensures consistency by making sure different * parts of the operation do not assume different type information. - * + * * @param typeInfo The type information to fill in. - * + * * @throws IllegalStateException Thrown, if the type information has been accessed before. */ protected void fillInType(TypeInformation typeInfo) { @@ -145,12 +145,12 @@ protected void fillInType(TypeInformation typeInfo) { } this.type = typeInfo; } - + /** * Returns the {@link TypeInformation} for the type of this DataSet. - * + * * @return The TypeInformation for the type of this DataSet. - * + * * @see TypeInformation */ public TypeInformation getType() { @@ -177,15 +177,15 @@ public F clean(F f) { // -------------------------------------------------------------------------------------------- // Filter & Transformations // -------------------------------------------------------------------------------------------- - + /** * Applies a Map transformation on this DataSet.
    * The transformation calls a {@link org.apache.flink.api.common.functions.MapFunction} for each element of the DataSet. * Each MapFunction call returns exactly one element. - * + * * @param mapper The MapFunction that is called for each element of the DataSet. * @return A MapOperator that represents the transformed DataSet. - * + * * @see org.apache.flink.api.common.functions.MapFunction * @see org.apache.flink.api.common.functions.RichMapFunction * @see MapOperator @@ -223,20 +223,20 @@ public MapPartitionOperator mapPartition(MapPartitionFunction ma if (mapPartition == null) { throw new NullPointerException("MapPartition function must not be null."); } - + String callLocation = Utils.getCallLocationName(); TypeInformation resultType = TypeExtractor.getMapPartitionReturnTypes(mapPartition, getType(), callLocation, true); return new MapPartitionOperator(this, resultType, clean(mapPartition), callLocation); } - + /** * Applies a FlatMap transformation on a {@link DataSet}.
    * The transformation calls a {@link org.apache.flink.api.common.functions.RichFlatMapFunction} for each element of the DataSet. * Each FlatMapFunction call can return any number of elements including none. - * + * * @param flatMapper The FlatMapFunction that is called for each element of the DataSet. * @return A FlatMapOperator that represents the transformed DataSet. - * + * * @see org.apache.flink.api.common.functions.RichFlatMapFunction * @see FlatMapOperator * @see DataSet @@ -250,16 +250,16 @@ public FlatMapOperator flatMap(FlatMapFunction flatMapper) { TypeInformation resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true); return new FlatMapOperator(this, resultType, clean(flatMapper), callLocation); } - + /** * Applies a Filter transformation on a {@link DataSet}.
    * The transformation calls a {@link org.apache.flink.api.common.functions.RichFilterFunction} for each element of the DataSet * and retains only those element for which the function returns true. Elements for * which the function returns false are filtered. - * + * * @param filter The FilterFunction that is called for each element of the DataSet. * @return A FilterOperator that represents the filtered DataSet. - * + * * @see org.apache.flink.api.common.functions.RichFilterFunction * @see FilterOperator * @see DataSet @@ -271,11 +271,11 @@ public FilterOperator filter(FilterFunction filter) { return new FilterOperator(this, clean(filter), Utils.getCallLocationName()); } - + // -------------------------------------------------------------------------------------------- // Projections // -------------------------------------------------------------------------------------------- - + /** * Applies a Project transformation on a {@link Tuple} {@link DataSet}.
    * Note: Only Tuple DataSets can be projected using field indexes.
    @@ -295,22 +295,22 @@ public FilterOperator filter(FilterFunction filter) { public ProjectOperator project(int... fieldIndexes) { return new Projection(this, fieldIndexes).projectTupleX(); } - + // -------------------------------------------------------------------------------------------- // Non-grouped aggregations // -------------------------------------------------------------------------------------------- - + /** * Applies an Aggregate transformation on a non-grouped {@link Tuple} {@link DataSet}.
    * Note: Only Tuple DataSets can be aggregated. * The transformation applies a built-in {@link Aggregations Aggregation} on a specified field * of a Tuple DataSet. Additional aggregation functions can be added to the resulting * {@link AggregateOperator} by calling {@link AggregateOperator#and(Aggregations, int)}. - * + * * @param agg The built-in aggregation function that is computed. * @param field The index of the Tuple field on which the aggregation function is applied. * @return An AggregateOperator that represents the aggregated DataSet. - * + * * @see Tuple * @see Aggregations * @see AggregateOperator @@ -364,16 +364,16 @@ public AggregateOperator max(int field) { public AggregateOperator min(int field) { return aggregate(Aggregations.MIN, field); } - + /** * Applies a Reduce transformation on a non-grouped {@link DataSet}.
    * The transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction} * until only a single element remains which is the result of the transformation. * A ReduceFunction combines two elements into one new element of the same type. - * + * * @param reducer The ReduceFunction that is applied on the DataSet. * @return A ReduceOperator that represents the reduced DataSet. - * + * * @see org.apache.flink.api.common.functions.RichReduceFunction * @see ReduceOperator * @see DataSet @@ -384,16 +384,16 @@ public ReduceOperator reduce(ReduceFunction reducer) { } return new ReduceOperator(this, clean(reducer), Utils.getCallLocationName()); } - + /** * Applies a GroupReduce transformation on a non-grouped {@link DataSet}.
    * The transformation calls a {@link org.apache.flink.api.common.functions.RichGroupReduceFunction} once with the full DataSet. * The GroupReduceFunction can iterate over all elements of the DataSet and emit any * number of output elements including none. - * + * * @param reducer The GroupReduceFunction that is applied on the DataSet. * @return A GroupReduceOperator that represents the reduced DataSet. - * + * * @see org.apache.flink.api.common.functions.RichGroupReduceFunction * @see org.apache.flink.api.java.operators.GroupReduceOperator * @see DataSet @@ -402,7 +402,7 @@ public GroupReduceOperator reduceGroup(GroupReduceFunction reduc if (reducer == null) { throw new NullPointerException("GroupReduce function must not be null."); } - + String callLocation = Utils.getCallLocationName(); TypeInformation resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, getType(), callLocation, true); return new GroupReduceOperator(this, resultType, clean(reducer), callLocation); @@ -443,7 +443,7 @@ public ReduceOperator minBy(int... fields) { return new ReduceOperator(this, new SelectByMinFunction( (TupleTypeInfo) getType(), fields), Utils.getCallLocationName()); } - + /** * Selects an element with maximum value. *

    @@ -484,25 +484,25 @@ public ReduceOperator maxBy(int... fields) { * Returns a new set containing the first n elements in this {@link DataSet}.
    * @param n The desired number of elements. * @return A ReduceGroupOperator that represents the DataSet containing the elements. - */ + */ public GroupReduceOperator first(int n) { if(n < 1) { throw new InvalidProgramException("Parameter n of first(n) must be at least 1."); } - + return reduceGroup(new FirstReducer(n)); } - + // -------------------------------------------------------------------------------------------- // distinct // -------------------------------------------------------------------------------------------- - + /** * Returns a distinct set of a {@link DataSet} using a {@link KeySelector} function. *

    * The KeySelector function is called for each element of the DataSet and extracts a single key value on which the * decision is made if two items are distinct or not. - * + * * @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which the * distinction of the DataSet is decided. * @return A DistinctOperator that represents the distinct DataSet. @@ -511,7 +511,7 @@ public DistinctOperator distinct(KeySelector keyExtractor) { TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); return new DistinctOperator(this, new Keys.SelectorFunctionKeys(keyExtractor, getType(), keyType), Utils.getCallLocationName()); } - + /** * Returns a distinct set of a {@link Tuple} {@link DataSet} using field position keys. *

    @@ -526,7 +526,7 @@ public DistinctOperator distinct(KeySelector keyExtractor) { public DistinctOperator distinct(int... fields) { return new DistinctOperator(this, new Keys.ExpressionKeys(fields, getType(), true), Utils.getCallLocationName()); } - + /** * Returns a distinct set of a {@link Tuple} {@link DataSet} using expression keys. *

    @@ -540,18 +540,18 @@ public DistinctOperator distinct(int... fields) { public DistinctOperator distinct(String... fields) { return new DistinctOperator(this, new Keys.ExpressionKeys(fields, getType()), Utils.getCallLocationName()); } - + /** * Returns a distinct set of a {@link Tuple} {@link DataSet} using all fields of the tuple. *

    * Note: This operator can only be applied to Tuple DataSets. - * + * * @return A DistinctOperator that represents the distinct DataSet. */ public DistinctOperator distinct() { return new DistinctOperator(this, null, Utils.getCallLocationName()); } - + // -------------------------------------------------------------------------------------------- // Grouping // -------------------------------------------------------------------------------------------- @@ -568,10 +568,10 @@ public DistinctOperator distinct() { *

  • {@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation. *
  • {@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation. * - * + * * @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which it is grouped. * @return An UnsortedGrouping on which a transformation needs to be applied to obtain a transformed DataSet. - * + * * @see KeySelector * @see UnsortedGrouping * @see AggregateOperator @@ -583,7 +583,7 @@ public UnsortedGrouping groupBy(KeySelector keyExtractor) { TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); return new UnsortedGrouping(this, new Keys.SelectorFunctionKeys(clean(keyExtractor), getType(), keyType)); } - + /** * Groups a {@link Tuple} {@link DataSet} using field position keys.
    * Note: Field position keys only be specified for Tuple DataSets.
    @@ -596,10 +596,10 @@ public UnsortedGrouping groupBy(KeySelector keyExtractor) { *
  • {@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation. *
  • {@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation. * - * + * * @param fields One or more field positions on which the DataSet will be grouped. * @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet. - * + * * @see Tuple * @see UnsortedGrouping * @see AggregateOperator @@ -637,44 +637,44 @@ public UnsortedGrouping groupBy(int... fields) { public UnsortedGrouping groupBy(String... fields) { return new UnsortedGrouping(this, new Keys.ExpressionKeys(fields, getType())); } - + // -------------------------------------------------------------------------------------------- // Joining // -------------------------------------------------------------------------------------------- - + /** * Initiates a Join transformation.
    * A Join transformation joins the elements of two * {@link DataSet DataSets} on key equality and provides multiple ways to combine * joining elements into one DataSet.
    - * + * * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods * can be called to define the join key of the first joining (i.e., this) DataSet. - * + * * @param other The other DataSet with which this DataSet is joined. * @return A JoinOperatorSets to continue the definition of the Join transformation. - * + * * @see JoinOperatorSets * @see DataSet */ public JoinOperatorSets join(DataSet other) { return new JoinOperatorSets(this, other); } - + /** * Initiates a Join transformation.
    * A Join transformation joins the elements of two * {@link DataSet DataSets} on key equality and provides multiple ways to combine * joining elements into one DataSet.
    - * + * * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods * can be called to define the join key of the first joining (i.e., this) DataSet. - * + * * @param other The other DataSet with which this DataSet is joined. * @param strategy The strategy that should be used execute the join. If {@code null} is give, then the * optimizer will pick the join strategy. * @return A JoinOperatorSets to continue the definition of the Join transformation. - * + * * @see JoinOperatorSets * @see DataSet */ @@ -692,17 +692,17 @@ public JoinOperatorSets join(DataSet other, JoinHint strategy) { * This method returns a {@link JoinOperatorSets} on which * {@link JoinOperatorSets#where(String...)} needs to be called to define the join key of the first * joining (i.e., this) DataSet. - * + * * @param other The other DataSet with which this DataSet is joined. * @return A JoinOperatorSets to continue the definition of the Join transformation. - * + * * @see JoinOperatorSets * @see DataSet */ public JoinOperatorSets joinWithTiny(DataSet other) { return new JoinOperatorSets(this, other, JoinHint.BROADCAST_HASH_SECOND); } - + /** * Initiates a Join transformation.
    * A Join transformation joins the elements of two @@ -712,17 +712,17 @@ public JoinOperatorSets joinWithTiny(DataSet other) { * larger than the first one.
    * This method returns a {@link JoinOperatorSets} on which one of the {@code where} methods * can be called to define the join key of the first joining (i.e., this) DataSet. - * + * * @param other The other DataSet with which this DataSet is joined. * @return A JoinOperatorSet to continue the definition of the Join transformation. - * + * * @see JoinOperatorSets * @see DataSet */ public JoinOperatorSets joinWithHuge(DataSet other) { return new JoinOperatorSets(this, other, JoinHint.BROADCAST_HASH_FIRST); } - + // -------------------------------------------------------------------------------------------- // Co-Grouping // -------------------------------------------------------------------------------------------- @@ -738,10 +738,10 @@ public JoinOperatorSets joinWithHuge(DataSet other) { * of elements including none.
    * This method returns a {@link CoGroupOperatorSets} on which one of the {@code where} methods * can be called to define the join key of the first joining (i.e., this) DataSet. - * + * * @param other The other DataSet of the CoGroup transformation. * @return A CoGroupOperatorSets to continue the definition of the CoGroup transformation. - * + * * @see CoGroupOperatorSets * @see CoGroupOperator * @see DataSet @@ -758,34 +758,34 @@ public CoGroupOperator.CoGroupOperatorSets coGroup(DataSet other) { * Continues a Join transformation and defines the {@link Tuple} fields of the second join * {@link DataSet} that should be used as join keys.
    * Note: Fields can only be selected as join keys on Tuple DataSets.
    - * + * * The resulting {@link DefaultJoin} wraps each pair of joining elements into a {@link Tuple2}, with * the element of the first input being the first field of the tuple and the element of the * second input being the second field of the tuple. - * + * * @param fields The indexes of the Tuple fields of the second join DataSet that should be used as keys. * @return A DefaultJoin that represents the joined DataSet. */ - + /** * Initiates a Cross transformation.
    * A Cross transformation combines the elements of two * {@link DataSet DataSets} into one DataSet. It builds all pair combinations of elements of * both DataSets, i.e., it builds a Cartesian product. - * + * *

    * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with * the element of the first input being the first field of the tuple and the element of the * second input being the second field of the tuple. - * + * *

    * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.
    - * + * * @param other The other DataSet with which this DataSet is crossed. * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements. - * + * * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross * @see org.apache.flink.api.common.functions.CrossFunction * @see DataSet @@ -794,7 +794,7 @@ public CoGroupOperator.CoGroupOperatorSets coGroup(DataSet other) { public CrossOperator.DefaultCross cross(DataSet other) { return new CrossOperator.DefaultCross(this, other, CrossHint.OPTIMIZER_CHOOSES, Utils.getCallLocationName()); } - + /** * Initiates a Cross transformation.
    * A Cross transformation combines the elements of two @@ -802,20 +802,20 @@ public CrossOperator.DefaultCross cross(DataSet other) { * both DataSets, i.e., it builds a Cartesian product. * This method also gives the hint to the optimizer that the second DataSet to cross is much * smaller than the first one. - * + * *

    * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with * the element of the first input being the first field of the tuple and the element of the * second input being the second field of the tuple. - * + * *

    * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.
    - * + * * @param other The other DataSet with which this DataSet is crossed. * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements. - * + * * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross * @see org.apache.flink.api.common.functions.CrossFunction * @see DataSet @@ -824,7 +824,7 @@ public CrossOperator.DefaultCross cross(DataSet other) { public CrossOperator.DefaultCross crossWithTiny(DataSet other) { return new CrossOperator.DefaultCross(this, other, CrossHint.SECOND_IS_SMALL, Utils.getCallLocationName()); } - + /** * Initiates a Cross transformation.
    * A Cross transformation combines the elements of two @@ -832,20 +832,20 @@ public CrossOperator.DefaultCross crossWithTiny(DataSet other) { * both DataSets, i.e., it builds a Cartesian product. * This method also gives the hint to the optimizer that the second DataSet to cross is much * larger than the first one. - * + * *

    * The resulting {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross} wraps each pair of crossed elements into a {@link Tuple2}, with * the element of the first input being the first field of the tuple and the element of the * second input being the second field of the tuple. - * + * *

    * Call {@link org.apache.flink.api.java.operators.CrossOperator.DefaultCross#with(org.apache.flink.api.common.functions.CrossFunction)} to define a * {@link org.apache.flink.api.common.functions.CrossFunction} which is called for * each pair of crossed elements. The CrossFunction returns a exactly one element for each pair of input elements.
    - * + * * @param other The other DataSet with which this DataSet is crossed. * @return A DefaultCross that returns a Tuple2 for each pair of crossed elements. - * + * * @see org.apache.flink.api.java.operators.CrossOperator.DefaultCross * @see org.apache.flink.api.common.functions.CrossFunction * @see DataSet @@ -871,7 +871,7 @@ public CrossOperator.DefaultCross crossWithHuge(DataSet other) { *

     	 * {@code
     	 * DataSet input = ...;
    -	 *
    +	 * 
     	 * DataSet startOfIteration = input.iterate(10);
     	 * DataSet toBeFedBack = startOfIteration
     	 *                               .map(new MyMapper())
    @@ -882,17 +882,17 @@ public  CrossOperator.DefaultCross crossWithHuge(DataSet other) {
     	 * 

    * The iteration has a maximum number of times that it executes. A dynamic termination can be realized by using a * termination criterion (see {@link org.apache.flink.api.java.operators.IterativeDataSet#closeWith(DataSet, DataSet)}). - * + * * @param maxIterations The maximum number of times that the iteration is executed. * @return An IterativeDataSet that marks the start of the iterative part and needs to be closed by * {@link org.apache.flink.api.java.operators.IterativeDataSet#closeWith(DataSet)}. - * + * * @see org.apache.flink.api.java.operators.IterativeDataSet */ public IterativeDataSet iterate(int maxIterations) { return new IterativeDataSet(getExecutionEnvironment(), getType(), this, maxIterations); } - + /** * Initiates a delta iteration. A delta iteration is similar to a regular iteration (as started by {@link #iterate(int)}, * but maintains state across the individual iteration steps. The Solution set, which represents the current state @@ -919,30 +919,30 @@ public IterativeDataSet iterate(int maxIterations) { * {@code * DeltaIteration, Tuple2> iteration = * initialState.iterateDelta(initialFeedbakSet, 100, 0); - * + * * DataSet> delta = iteration.groupBy(0).aggregate(Aggregations.AVG, 1) * .join(iteration.getSolutionSet()).where(0).equalTo(0) * .flatMap(new ProjectAndFilter()); - * + * * DataSet> feedBack = delta.join(someOtherSet).where(...).equalTo(...).with(...); - * + * * // close the delta iteration (delta and new workset are identical) * DataSet> result = iteration.closeWith(delta, feedBack); * } *

    - * + * * @param workset The initial version of the data set that is fed back to the next iteration step (the workset). * @param maxIterations The maximum number of iteration steps, as a fall back safeguard. * @param keyPositions The position of the tuple fields that is used as the key of the solution set. - * + * * @return The DeltaIteration that marks the start of a delta iteration. - * + * * @see org.apache.flink.api.java.operators.DeltaIteration */ public DeltaIteration iterateDelta(DataSet workset, int maxIterations, int... keyPositions) { Preconditions.checkNotNull(workset); Preconditions.checkNotNull(keyPositions); - + Keys.ExpressionKeys keys = new Keys.ExpressionKeys(keyPositions, getType(), false); return new DeltaIteration(getExecutionEnvironment(), getType(), this, workset, keys, maxIterations); } @@ -950,12 +950,12 @@ public DeltaIteration iterateDelta(DataSet workset, int maxIteratio // -------------------------------------------------------------------------------------------- // Custom Operators // ------------------------------------------------------------------------------------------- - + /** * Runs a {@link CustomUnaryOperation} on the data set. Custom operations are typically complex * operators that are composed of multiple steps. - * + * * @param operation The operation to run. * @return The data set produced by the operation. */ @@ -964,70 +964,70 @@ public DataSet runOperation(CustomUnaryOperation operation) { operation.setInput(this); return operation.createResult(); } - + // -------------------------------------------------------------------------------------------- // Union // -------------------------------------------------------------------------------------------- /** * Creates a union of this DataSet with an other DataSet. The other DataSet must be of the same data type. - * + * * @param other The other DataSet which is unioned with the current DataSet. * @return The resulting DataSet. 
*/ public UnionOperator union(DataSet other){ return new UnionOperator(this, other, Utils.getCallLocationName()); } - + // -------------------------------------------------------------------------------------------- // Partitioning // -------------------------------------------------------------------------------------------- - + /** * Hash-partitions a DataSet on the specified key fields. *

    * Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. - * + * * @param fields The field indexes on which the DataSet is hash-partitioned. * @return The partitioned DataSet. */ public PartitionOperator partitionByHash(int... fields) { return new PartitionOperator(this, PartitionMethod.HASH, new Keys.ExpressionKeys(fields, getType(), false), Utils.getCallLocationName()); } - + /** * Hash-partitions a DataSet on the specified key fields. *

    * Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. - * + * * @param fields The field expressions on which the DataSet is hash-partitioned. * @return The partitioned DataSet. */ public PartitionOperator partitionByHash(String... fields) { return new PartitionOperator(this, PartitionMethod.HASH, new Keys.ExpressionKeys(fields, getType()), Utils.getCallLocationName()); } - + /** * Partitions a DataSet using the specified KeySelector. *

    * Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. - * + * * @param keyExtractor The KeyExtractor with which the DataSet is hash-partitioned. * @return The partitioned DataSet. - * + * * @see KeySelector */ public > PartitionOperator partitionByHash(KeySelector keyExtractor) { final TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); return new PartitionOperator(this, PartitionMethod.HASH, new Keys.SelectorFunctionKeys(clean(keyExtractor), this.getType(), keyType), Utils.getCallLocationName()); } - + /** * Partitions a tuple DataSet on the specified key fields using a custom partitioner. * This method takes the key position to partition on, and a partitioner that accepts the key type. *

    * Note: This method works only on single field keys. - * + * * @param partitioner The partitioner to assign partitions to keys. * @param field The field index on which the DataSet is to partitioned. * @return The partitioned DataSet. @@ -1035,13 +1035,13 @@ public > PartitionOperator partitionByHash(KeySelecto public PartitionOperator partitionCustom(Partitioner partitioner, int field) { return new PartitionOperator(this, new Keys.ExpressionKeys(new int[] {field}, getType(), false), clean(partitioner), Utils.getCallLocationName()); } - + /** * Partitions a POJO DataSet on the specified key fields using a custom partitioner. * This method takes the key expression to partition on, and a partitioner that accepts the key type. *

    * Note: This method works only on single field keys. - * + * * @param partitioner The partitioner to assign partitions to keys. * @param field The field index on which the DataSet is to partitioned. * @return The partitioned DataSet. @@ -1049,7 +1049,7 @@ public PartitionOperator partitionCustom(Partitioner partitioner, int public PartitionOperator partitionCustom(Partitioner partitioner, String field) { return new PartitionOperator(this, new Keys.ExpressionKeys(new String[] {field}, getType()), clean(partitioner), Utils.getCallLocationName()); } - + /** * Partitions a DataSet on the key returned by the selector, using a custom partitioner. * This method takes the key selector t get the key to partition on, and a partitioner that @@ -1057,59 +1057,59 @@ public PartitionOperator partitionCustom(Partitioner partitioner, Stri *

    * Note: This method works only on single field keys, i.e. the selector cannot return tuples * of fields. - * + * * @param partitioner The partitioner to assign partitions to keys. * @param keyExtractor The KeyExtractor with which the DataSet is hash-partitioned. * @return The partitioned DataSet. - * + * * @see KeySelector */ public > PartitionOperator partitionCustom(Partitioner partitioner, KeySelector keyExtractor) { final TypeInformation keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, getType()); return new PartitionOperator(this, new Keys.SelectorFunctionKeys(keyExtractor, getType(), keyType), clean(partitioner), Utils.getCallLocationName()); } - + /** * Enforces a re-balancing of the DataSet, i.e., the DataSet is evenly distributed over all parallel instances of the * following task. This can help to improve performance in case of heavy data skew and compute intensive operations. *

    * Important:This operation shuffles the whole DataSet over the network and can take significant amount of time. - * + * * @return The re-balanced DataSet. */ public PartitionOperator rebalance() { return new PartitionOperator(this, PartitionMethod.REBALANCE, Utils.getCallLocationName()); } - + // -------------------------------------------------------------------------------------------- // Top-K // -------------------------------------------------------------------------------------------- - + // -------------------------------------------------------------------------------------------- // Result writing // -------------------------------------------------------------------------------------------- - + /** * Writes a DataSet as a text file to the specified location.
    * For each element of the DataSet the result of {@link Object#toString()} is written. - * + * * @param filePath The path pointing to the location the text file is written to. * @return The DataSink that writes the DataSet. - * + * * @see TextOutputFormat */ public DataSink writeAsText(String filePath) { return output(new TextOutputFormat(new Path(filePath))); } - + /** * Writes a DataSet as a text file to the specified location.
    * For each element of the DataSet the result of {@link Object#toString()} is written. - * + * * @param filePath The path pointing to the location the text file is written to. * @param writeMode Control the behavior for existing files. Options are NO_OVERWRITE and OVERWRITE. * @return The DataSink that writes the DataSet. - * + * * @see TextOutputFormat */ public DataSink writeAsText(String filePath, WriteMode writeMode) { @@ -1117,7 +1117,7 @@ public DataSink writeAsText(String filePath, WriteMode writeMode) { tof.setWriteMode(writeMode); return output(tof); } - + /** * Writes a DataSet as a text file to the specified location.
    * For each element of the DataSet the result of {@link TextFormatter#format(Object)} is written. @@ -1146,51 +1146,51 @@ public DataSink writeAsFormattedText(String filePath, TextFormatter f public DataSink writeAsFormattedText(String filePath, WriteMode writeMode, final TextFormatter formatter) { return this.map(new FormattingMapper(clean(formatter))).writeAsText(filePath, writeMode); } - + /** * Writes a {@link Tuple} DataSet as a CSV file to the specified location.
    * Note: Only a Tuple DataSet can written as a CSV file.
    * For each Tuple field the result of {@link Object#toString()} is written. * Tuple fields are separated by the default field delimiter {@code "comma" (,)}.
    * Tuples are are separated by the newline character ({@code \n}). - * + * * @param filePath The path pointing to the location the CSV file is written to. * @return The DataSink that writes the DataSet. - * + * * @see Tuple * @see CsvOutputFormat */ public DataSink writeAsCsv(String filePath) { return writeAsCsv(filePath, CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER); } - + /** * Writes a {@link Tuple} DataSet as a CSV file to the specified location.
    * Note: Only a Tuple DataSet can written as a CSV file.
    * For each Tuple field the result of {@link Object#toString()} is written. * Tuple fields are separated by the default field delimiter {@code "comma" (,)}.
    * Tuples are are separated by the newline character ({@code \n}). - * + * * @param filePath The path pointing to the location the CSV file is written to. * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE. * @return The DataSink that writes the DataSet. - * + * * @see Tuple * @see CsvOutputFormat */ public DataSink writeAsCsv(String filePath, WriteMode writeMode) { return internalWriteAsCsv(new Path(filePath),CsvOutputFormat.DEFAULT_LINE_DELIMITER, CsvOutputFormat.DEFAULT_FIELD_DELIMITER, writeMode); } - + /** * Writes a {@link Tuple} DataSet as a CSV file to the specified location with the specified field and line delimiters.
    * Note: Only a Tuple DataSet can written as a CSV file.
    * For each Tuple field the result of {@link Object#toString()} is written. - * + * * @param filePath The path pointing to the location the CSV file is written to. * @param rowDelimiter The row delimiter to separate Tuples. * @param fieldDelimiter The field delimiter to separate Tuple fields. - * + * * @see Tuple * @see CsvOutputFormat */ @@ -1202,19 +1202,19 @@ public DataSink writeAsCsv(String filePath, String rowDelimiter, String field * Writes a {@link Tuple} DataSet as a CSV file to the specified location with the specified field and line delimiters.
    * Note: Only a Tuple DataSet can written as a CSV file.
    * For each Tuple field the result of {@link Object#toString()} is written. - * + * * @param filePath The path pointing to the location the CSV file is written to. * @param rowDelimiter The row delimiter to separate Tuples. * @param fieldDelimiter The field delimiter to separate Tuple fields. * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE. - * + * * @see Tuple * @see CsvOutputFormat */ public DataSink writeAsCsv(String filePath, String rowDelimiter, String fieldDelimiter, WriteMode writeMode) { return internalWriteAsCsv(new Path(filePath), rowDelimiter, fieldDelimiter, writeMode); } - + @SuppressWarnings("unchecked") private DataSink internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) { Validate.isTrue(getType().isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples."); @@ -1224,35 +1224,35 @@ private DataSink internalWriteAsCsv(Path filePath, String r } return output((OutputFormat) of); } - + /** * Writes a DataSet to the standard output stream (stdout).
    * For each element of the DataSet the result of {@link Object#toString()} is written. - * + * * @return The DataSink that writes the DataSet. */ public DataSink print() { return output(new PrintingOutputFormat(false)); } - + /** * Writes a DataSet to the standard error stream (stderr).
    * For each element of the DataSet the result of {@link Object#toString()} is written. - * + * * @return The DataSink that writes the DataSet. */ public DataSink printToErr() { return output(new PrintingOutputFormat(true)); } - + /** * Writes a DataSet using a {@link FileOutputFormat} to a specified location. * This method adds a data sink to the program. - * + * * @param outputFormat The FileOutputFormat to write the DataSet. * @param filePath The path to the location where the DataSet is written. * @return The DataSink that writes the DataSet. - * + * * @see FileOutputFormat */ public DataSink write(FileOutputFormat outputFormat, String filePath) { @@ -1262,16 +1262,16 @@ public DataSink write(FileOutputFormat outputFormat, String filePath) { outputFormat.setOutputFilePath(new Path(filePath)); return output(outputFormat); } - + /** * Writes a DataSet using a {@link FileOutputFormat} to a specified location. * This method adds a data sink to the program. - * + * * @param outputFormat The FileOutputFormat to write the DataSet. * @param filePath The path to the location where the DataSet is written. * @param writeMode The mode of writing, indicating whether to overwrite existing files. * @return The DataSink that writes the DataSet. - * + * * @see FileOutputFormat */ public DataSink write(FileOutputFormat outputFormat, String filePath, WriteMode writeMode) { @@ -1283,36 +1283,36 @@ public DataSink write(FileOutputFormat outputFormat, String filePath, Writ outputFormat.setWriteMode(writeMode); return output(outputFormat); } - + /** * Emits a DataSet using an {@link OutputFormat}. This method adds a data sink to the program. * Programs may have multiple data sinks. A DataSet may also have multiple consumers (data sinks * or transformations) at the same time. - * + * * @param outputFormat The OutputFormat to process the DataSet. * @return The DataSink that processes the DataSet. 
- * + * * @see OutputFormat * @see DataSink */ public DataSink output(OutputFormat outputFormat) { Validate.notNull(outputFormat); - + // configure the type if needed if (outputFormat instanceof InputTypeConfigurable) { ((InputTypeConfigurable) outputFormat).setInputType(getType()); } - + DataSink sink = new DataSink(this, outputFormat, getType()); this.context.registerDataSink(sink); return sink; } - - + + // -------------------------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------------------------- - + protected static void checkSameExecutionContext(DataSet set1, DataSet set2) { if (set1.context != set2.context) { throw new IllegalArgumentException("The two inputs have different execution contexts."); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java index adfd41cce9c58..7c80bf87d4d6d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SemanticPropUtil.java @@ -331,7 +331,7 @@ private static FieldSet readFieldSetFromString(String s, TypeInformation inTy if (!isValidField(inType, field)) { throw new IndexOutOfBoundsException("Annotation: Field " + field + " not available in the input tuple."); } - + fs = fs.addField(field); } return fs; @@ -358,8 +358,8 @@ public static SingleInputSemanticProperties getSemanticPropsSingleFromString(Str return result; } - public static void getSemanticPropsDualFromString(DualInputSemanticProperties target, String[] constantSetFirst, String[] constantSetSecond, String[] constantSetFirstExcept, - String[] constantSetSecondExcept, String[] readFieldsFirst, String[] readFieldsSecond, TypeInformation inType1, TypeInformation inType2, TypeInformation outType) + public static void 
getSemanticPropsDualFromString(DualInputSemanticProperties target, String[] constantSetFirst, String[] constantSetSecond, String[] constantSetFirstExcept, + String[] constantSetSecondExcept, String[] readFieldsFirst, String[] readFieldsSecond, TypeInformation inType1, TypeInformation inType2, TypeInformation outType) { parseConstantFieldsFirst(constantSetFirst, target, inType1, outType); parseConstantFieldsSecond(constantSetSecond, target, inType2, outType); From 384816fc778be76a9f47556fca7089c3060f3452 Mon Sep 17 00:00:00 2001 From: FelixNeutatz Date: Sun, 18 Jan 2015 23:21:35 +0100 Subject: [PATCH 7/7] [FLINK-1398] change naming --- .../apache/flink/api/java/{utilities => lib}/DataSetUtil.java | 2 +- .../flink/api/java/{utilities => lib}/DataSetUtilTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename flink-java/src/main/java/org/apache/flink/api/java/{utilities => lib}/DataSetUtil.java (98%) rename flink-java/src/test/java/org/apache/flink/api/java/{utilities => lib}/DataSetUtilTest.java (98%) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utilities/DataSetUtil.java b/flink-java/src/main/java/org/apache/flink/api/java/lib/DataSetUtil.java similarity index 98% rename from flink-java/src/main/java/org/apache/flink/api/java/utilities/DataSetUtil.java rename to flink-java/src/main/java/org/apache/flink/api/java/lib/DataSetUtil.java index c0dde31bf8259..3cc8bf10a5261 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/utilities/DataSetUtil.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/lib/DataSetUtil.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.api.java.utilities; +package org.apache.flink.api.java.lib; import org.apache.flink.api.common.functions.MapFunction; diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utilities/DataSetUtilTest.java b/flink-java/src/test/java/org/apache/flink/api/java/lib/DataSetUtilTest.java similarity index 98% rename from flink-java/src/test/java/org/apache/flink/api/java/utilities/DataSetUtilTest.java rename to flink-java/src/test/java/org/apache/flink/api/java/lib/DataSetUtilTest.java index 9d830d90d51c6..be9dee68eb54a 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/utilities/DataSetUtilTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/lib/DataSetUtilTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.api.java.utilities; +package org.apache.flink.api.java.lib; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.DataSet;