From 4e1b57d71f3a141b8206a62f12a9e102b7903fad Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 11:57:45 +0200 Subject: [PATCH 01/17] add some functions --- .../common/functions/GenericMapPartition.java | 17 +++++++++++++++++ .../example/java/wordcount/WordCount.java | 7 +++++-- .../java/functions/MapPartitionFunction.java | 4 ++++ 3 files changed, 26 insertions(+), 2 deletions(-) create mode 100644 stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java create mode 100644 stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java new file mode 100644 index 0000000000000..67a00e0f1d3b7 --- /dev/null +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java @@ -0,0 +1,17 @@ +package eu.stratosphere.api.common.functions; + +import eu.stratosphere.util.Collector; + +import java.util.Iterator; + +public interface GenericMapPartition extends Function { + + /** + * A user-implemented function that modifies or transforms an incoming object. + * + * @param records All records for the mapper + * @param out The collector to hand results to. + * @throws Exception + */ + void map(Iterator records, Collector out) throws Exception; +} \ No newline at end of file diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java index 95ab92f65abb5..69a3e0eb44015 100644 --- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java +++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java @@ -59,8 +59,11 @@ public static void main(String[] args) throws Exception { // get input data DataSet text = getTextDataSet(env); - - DataSet> counts = + + + + + DataSet> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java new file mode 100644 index 0000000000000..0fb9f5dd0f83d --- /dev/null +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java @@ -0,0 +1,4 @@ +package eu.stratosphere.api.java.functions; + +public class MapPartitionFunction { +} From b8eeb0c4875f8e914b4665ce750baf698a45d53f Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 12:22:43 +0200 Subject: [PATCH 02/17] MapPartitionDriver added --- .../common/functions/GenericMapPartition.java | 2 +- .../java/functions/MapPartitionFunction.java | 21 ++++- .../pact/runtime/task/MapPartitionDriver.java | 80 +++++++++++++++++++ 3 files changed, 101 insertions(+), 2 deletions(-) create mode 100644 stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java index 67a00e0f1d3b7..c96cf40ce8ab1 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java @@ -13,5 +13,5 @@ public interface GenericMapPartition extends Function { * @param out The collector to hand results to. * @throws Exception */ - void map(Iterator records, Collector out) throws Exception; + void mapPartition(Iterator records, Collector out) throws Exception; } \ No newline at end of file diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java index 0fb9f5dd0f83d..d83329f01b3ab 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java @@ -1,4 +1,23 @@ package eu.stratosphere.api.java.functions; -public class MapPartitionFunction { + +import eu.stratosphere.api.common.functions.AbstractFunction; +import eu.stratosphere.api.common.functions.GenericMapPartition; +import eu.stratosphere.util.Collector; + +import java.util.Iterator; + +public abstract class MapPartitionFunction extends AbstractFunction implements GenericMapPartition { + + private static final long serialVersionUID = 1L; + /** + * + * @param records All records for the mapper + * @param out The collector to hand results to. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. + */ + @Override + public abstract void mapPartition(Iterator records, Collector out) throws Exception; } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java new file mode 100644 index 0000000000000..e444d6712fd7c --- /dev/null +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java @@ -0,0 +1,80 @@ +package eu.stratosphere.pact.runtime.task; + +import eu.stratosphere.api.common.functions.GenericMap; +import eu.stratosphere.api.common.functions.GenericMapPartition; +import eu.stratosphere.pact.runtime.util.MutableToRegularIteratorWrapper; +import eu.stratosphere.util.Collector; +import eu.stratosphere.util.MutableObjectIterator; + + +/** + * MapPartition task which is executed by a Nephele task manager. The task has a single + * input and one or multiple outputs. It is provided with a MapFunction + * implementation. + *

+ * The MapPartitionTask creates an iterator over all key-value pairs of its input and hands that to the map_partition() method + * of the MapFunction. + * + * @see GenericCollectorMap + * + * @param The mapper's input data type. + * @param The mapper's output data type. + */ +public class MapPartitionDriver implements PactDriver, OT> { + + private PactTaskContext, OT> taskContext; + + private volatile boolean running; + + + @Override + public void setup(PactTaskContext, OT> context) { + this.taskContext = context; + this.running = true; + } + + @Override + public int getNumberOfInputs() { + return 1; + } + + @Override + public Class> getStubType() { + @SuppressWarnings("unchecked") + final Class> clazz = (Class>) (Class) GenericMapPartition.class; + return clazz; + } + + @Override + public boolean requiresComparatorOnInput() { + return false; + } + + @Override + public void prepare() { + // nothing, since a mapper does not need any preparation + } + + @Override + public void run() throws Exception { + // cache references on the stack + final MutableObjectIterator input = this.taskContext.getInput(0); + final GenericMapPartition function = this.taskContext.getStub(); + final Collector output = this.taskContext.getOutputCollector(); + + final MutableToRegularIteratorWrapper inIter = new MutableToRegularIteratorWrapper(input, this.taskContext.getInputSerializer(0).getSerializer() ); + IT record = this.taskContext.getInputSerializer(0).getSerializer().createInstance(); + + function.mapPartition(inIter, output); + } + + @Override + public void cleanup() { + // mappers need no cleanup, since no strategies are used. + } + + @Override + public void cancel() { + this.running = false; + } +} From f8eeb9944171ef30857560bf3e9ff36a3e2d4f82 Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 12:30:57 +0200 Subject: [PATCH 03/17] add map partition driver strategy --- .../eu/stratosphere/pact/runtime/task/DriverStrategy.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java index f96a55360ad65..23ea2041bfa36 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java @@ -37,6 +37,10 @@ public enum DriverStrategy { COLLECTOR_MAP(CollectorMapDriver.class, ChainedCollectorMapDriver.class, PIPELINED, false), // the proper mapper MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, false), + + // the proper map partition + MAP_PARTITION(MapPartitionDriver.class, null, PIPELINED, false), + // the flat mapper FLAT_MAP(FlatMapDriver.class, ChainedFlatMapDriver.class, PIPELINED, false), From 6bbb4cc043d86af0421a3d4228fea788f62dbe64 Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 12:38:48 +0200 Subject: [PATCH 04/17] add map partition node map partition descriptor --- .../compiler/dag/MapPartitionNode.java | 57 ++++++++++++++++++ .../operators/MapPartitionDescriptor.java | 60 +++++++++++++++++++ .../pact/runtime/task/MapPartitionDriver.java | 1 - 3 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java create mode 100644 stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java new file mode 100644 index 0000000000000..209b7158eaf45 --- /dev/null +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java @@ -0,0 +1,57 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.compiler.dag; + +import java.util.Collections; +import java.util.List; + +import eu.stratosphere.api.common.operators.SingleInputOperator; +import eu.stratosphere.compiler.DataStatistics; +import eu.stratosphere.compiler.operators.MapPartitionDescriptor; +import eu.stratosphere.compiler.operators.OperatorDescriptorSingle; + +/** + * The optimizer's internal representation of a Map operator node. + */ +public class MapPartitionNode extends SingleInputNode { + + /** + * Creates a new MapNode for the given contract. + * + * @param operator The map contract object. + */ + public MapPartitionNode(SingleInputOperator operator) { + super(operator); + } + + @Override + public String getName() { + return "Map"; + } + + @Override + protected List getPossibleProperties() { + return Collections.singletonList(new MapPartitionDescriptor()); + } + + /** + * Computes the estimates for the Map operator. + * We assume that by default, Map takes one value and transforms it into another value. + * The cardinality consequently stays the same. + */ + @Override + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { + this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); + } +} diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java new file mode 100644 index 0000000000000..8f80e5534b020 --- /dev/null +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java @@ -0,0 +1,60 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.compiler.operators; + +import java.util.Collections; +import java.util.List; + +import eu.stratosphere.compiler.dag.SingleInputNode; +import eu.stratosphere.compiler.dataproperties.GlobalProperties; +import eu.stratosphere.compiler.dataproperties.LocalProperties; +import eu.stratosphere.compiler.dataproperties.RequestedGlobalProperties; +import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties; +import eu.stratosphere.compiler.plan.Channel; +import eu.stratosphere.compiler.plan.SingleInputPlanNode; +import eu.stratosphere.pact.runtime.task.DriverStrategy; + + +public class MapPartitionDescriptor extends OperatorDescriptorSingle { + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.MAP; + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + return new SingleInputPlanNode(node, "MapPartition ("+node.getPactContract().getName()+")", in, DriverStrategy.MAP_PARTITION); + } + + @Override + protected List createPossibleGlobalProperties() { + return Collections.singletonList(new RequestedGlobalProperties()); + } + + @Override + protected List createPossibleLocalProperties() { + return Collections.singletonList(new RequestedLocalProperties()); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + return gProps; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { + return lProps; + } +} diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java index e444d6712fd7c..e4294bcfb5fe5 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java @@ -1,6 +1,5 @@ package eu.stratosphere.pact.runtime.task; -import eu.stratosphere.api.common.functions.GenericMap; import eu.stratosphere.api.common.functions.GenericMapPartition; import eu.stratosphere.pact.runtime.util.MutableToRegularIteratorWrapper; import eu.stratosphere.util.Collector; From dbfe953bbb3bfb0f2aafafb719599fea8b8d56e6 Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 14:40:36 +0200 Subject: [PATCH 05/17] final version integration --- .../compiler/dag/MapPartitionNode.java | 9 +- .../base/MapPartitionOperatorBase.java | 43 ++++ .../functions/MapPartitionFunction.java | 32 +++ .../operators/MapPartitionOperator.java | 190 ++++++++++++++++++ 4 files changed, 269 insertions(+), 5 deletions(-) create mode 100644 stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java create mode 100644 stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java create mode 100644 stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java index 209b7158eaf45..0d5e4ad1c4332 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java @@ -22,14 +22,14 @@ import eu.stratosphere.compiler.operators.OperatorDescriptorSingle; /** - * The optimizer's internal representation of a Map operator node. + * The optimizer's internal representation of a MapPartition operator node. */ public class MapPartitionNode extends SingleInputNode { /** * Creates a new MapNode for the given contract. * - * @param operator The map contract object. + * @param operator The map partition contract object. */ public MapPartitionNode(SingleInputOperator operator) { super(operator); @@ -37,7 +37,7 @@ public MapPartitionNode(SingleInputOperator operator) { @Override public String getName() { - return "Map"; + return "MapPartition"; } @Override @@ -46,12 +46,11 @@ protected List getPossibleProperties() { } /** - * Computes the estimates for the Map operator. + * Computes the estimates for the MapPartition operator. * We assume that by default, Map takes one value and transforms it into another value. * The cardinality consequently stays the same. */ @Override protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { - this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); } } diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java new file mode 100644 index 0000000000000..9e3552aa7da7e --- /dev/null +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/operators/base/MapPartitionOperatorBase.java @@ -0,0 +1,43 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.api.common.operators.base; + +import eu.stratosphere.api.common.functions.GenericMapPartition; +import eu.stratosphere.api.common.operators.SingleInputOperator; +import eu.stratosphere.api.common.operators.UnaryOperatorInformation; +import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper; +import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper; +import eu.stratosphere.api.common.operators.util.UserCodeWrapper; + + +/** + * + * @param The input type. + * @param The result type. + * @param The type of the user-defined function. + */ +public class MapPartitionOperatorBase> extends SingleInputOperator { + + public MapPartitionOperatorBase(UserCodeWrapper udf, UnaryOperatorInformation operatorInfo, String name) { + super(udf, operatorInfo, name); + } + + public MapPartitionOperatorBase(FT udf, UnaryOperatorInformation operatorInfo, String name) { + super(new UserCodeObjectWrapper(udf), operatorInfo, name); + } + + public MapPartitionOperatorBase(Class udf, UnaryOperatorInformation operatorInfo, String name) { + super(new UserCodeClassWrapper(udf), operatorInfo, name); + } +} diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java new file mode 100644 index 0000000000000..5e77a5a297f7a --- /dev/null +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java @@ -0,0 +1,32 @@ +package eu.stratosphere.api.java.record.functions; + +import eu.stratosphere.api.common.functions.AbstractFunction; +import eu.stratosphere.api.common.functions.GenericCollectorMap; +import eu.stratosphere.api.common.functions.GenericMapPartition; +import eu.stratosphere.types.Record; +import eu.stratosphere.util.Collector; + +import java.util.Iterator; + +/** + * The MapFunction must be extended to provide a mapper implementation + * By definition, the mapper is called for each individual input record. + */ +public abstract class MapPartitionFunction extends AbstractFunction implements GenericMapPartition { + + private static final long serialVersionUID = 1L; + + /** + * This method must be implemented to provide a user implementation of a mapper. + * It is called for each individual record. + * + * @param record The record to be mapped. + * @param out A collector that collects all output records. + * + * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the + * runtime catches an exception, it aborts the map task and lets the fail-over logic + * decide whether to retry the mapper execution. + */ + @Override + public abstract void mapPartition(Iterator records, Collector out) throws Exception; +} diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java new file mode 100644 index 0000000000000..a021a933cb522 --- /dev/null +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java @@ -0,0 +1,190 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.api.java.record.operators; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase; +import eu.stratosphere.api.java.record.functions.MapPartitionFunction; +import org.apache.commons.lang3.Validate; + + + +import eu.stratosphere.api.common.operators.Operator; +import eu.stratosphere.api.common.operators.RecordOperator; +import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper; +import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper; +import eu.stratosphere.api.common.operators.util.UserCodeWrapper; +import eu.stratosphere.api.java.record.functions.FunctionAnnotation; +import eu.stratosphere.types.Key; +import eu.stratosphere.types.Record; + +/** + * MapPartitionOperator that applies a {@link MapPartitionFunction} to each record independently. + * + * @see MapPartitionFunction + */ +public class MapPartitionOperator extends MapPartitionOperatorBase implements RecordOperator { + + private static String DEFAULT_NAME = ""; + + // -------------------------------------------------------------------------------------------- + + /** + * Creates a Builder with the provided {@link MapPartitionFunction} implementation. + * + * @param udf The {@link MapPartitionFunction} implementation for this Map operator. + */ + public static Builder builder(MapPartitionFunction udf) { + return new Builder(new UserCodeObjectWrapper(udf)); + } + + /** + * Creates a Builder with the provided {@link MapPartitionFunction} implementation. + * + * @param udf The {@link MapPartitionFunction} implementation for this Map operator. + */ + public static Builder builder(Class udf) { + return new Builder(new UserCodeClassWrapper(udf)); + } + + /** + * The private constructor that only gets invoked from the Builder. + * @param builder + */ + protected MapPartitionOperator(Builder builder) { + + super(builder.udf, OperatorInfoHelper.unary(), builder.name); + + if (builder.inputs != null && !builder.inputs.isEmpty()) { + setInput(Operator.createUnionCascade(builder.inputs)); + } + + setBroadcastVariables(builder.broadcastInputs); + setSemanticProperties(FunctionAnnotation.readSingleConstantAnnotations(builder.udf)); + } + + + @Override + public Class>[] getKeyClasses() { + return emptyClassArray(); + } + + // -------------------------------------------------------------------------------------------- + + /** + * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition). + */ + public static class Builder { + + /* The required parameters */ + private final UserCodeWrapper udf; + + /* The optional parameters */ + private List> inputs; + private Map> broadcastInputs; + private String name = DEFAULT_NAME; + + /** + * Creates a Builder with the provided {@link MapPartitionFunction} implementation. + * + * @param udf The {@link MapPartitionFunction} implementation for this Map operator. + */ + private Builder(UserCodeWrapper udf) { + this.udf = udf; + this.inputs = new ArrayList>(); + this.broadcastInputs = new HashMap>(); + } + + /** + * Sets the input. + * + * @param input The input. + */ + public Builder input(Operator input) { + Validate.notNull(input, "The input must not be null"); + + this.inputs.clear(); + this.inputs.add(input); + return this; + } + + /** + * Sets one or several inputs (union). + * + * @param inputs + */ + public Builder input(Operator...inputs) { + this.inputs.clear(); + for (Operator c : inputs) { + this.inputs.add(c); + } + return this; + } + + /** + * Sets the inputs. + * + * @param inputs + */ + public Builder inputs(List> inputs) { + this.inputs = inputs; + return this; + } + + /** + * Binds the result produced by a plan rooted at {@code root} to a + * variable used by the UDF wrapped in this operator. + */ + public Builder setBroadcastVariable(String name, Operator input) { + this.broadcastInputs.put(name, input); + return this; + } + + /** + * Binds multiple broadcast variables. + */ + public Builder setBroadcastVariables(Map> inputs) { + this.broadcastInputs.clear(); + this.broadcastInputs.putAll(inputs); + return this; + } + + /** + * Sets the name of this operator. + * + * @param name + */ + public Builder name(String name) { + this.name = name; + return this; + } + + /** + * Creates and returns a MapOperator from using the values given + * to the builder. + * + * @return The created operator + */ + public MapPartitionOperator build() { + if (name == null) { + name = udf.getUserCodeClass().getName(); + } + return new MapPartitionOperator(this); + } + } +} From d1df8307c641071812cc4d1f83ad2aef0eb06570 Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 14:46:48 +0200 Subject: [PATCH 06/17] cleanup --- .../java/record/functions/MapPartitionFunction.java | 11 +++++------ .../java/record/operators/MapPartitionOperator.java | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java index 5e77a5a297f7a..8589070b1ab5d 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java @@ -1,7 +1,6 @@ package eu.stratosphere.api.java.record.functions; import eu.stratosphere.api.common.functions.AbstractFunction; -import eu.stratosphere.api.common.functions.GenericCollectorMap; import eu.stratosphere.api.common.functions.GenericMapPartition; import eu.stratosphere.types.Record; import eu.stratosphere.util.Collector; @@ -9,18 +8,18 @@ import java.util.Iterator; /** - * The MapFunction must be extended to provide a mapper implementation - * By definition, the mapper is called for each individual input record. + * The MapPartitionFunction must be extended to provide a map partition implementation + * By definition, the map partition is called for a full input set. */ public abstract class MapPartitionFunction extends AbstractFunction implements GenericMapPartition { private static final long serialVersionUID = 1L; /** - * This method must be implemented to provide a user implementation of a mapper. - * It is called for each individual record. + * This method must be implemented to provide a user implementation of a mappartitioner. + * It is called for a full input set. * - * @param record The record to be mapped. + * @param records all input records * @param out A collector that collects all output records. * * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java index a021a933cb522..1ac132cd39199 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/operators/MapPartitionOperator.java @@ -40,7 +40,7 @@ */ public class MapPartitionOperator extends MapPartitionOperatorBase implements RecordOperator { - private static String DEFAULT_NAME = ""; + private static String DEFAULT_NAME = ""; // -------------------------------------------------------------------------------------------- From dba2ce299164019322d5c71f6d7cbdf8c73f9869 Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 15:19:44 +0200 Subject: [PATCH 07/17] cleanup --- .../stratosphere/compiler/PactCompiler.java | 42 +---- .../wordcount/WordCountWithMapPartition.java | 162 ++++++++++++++++++ .../eu/stratosphere/api/java/DataSet.java | 47 +++-- .../java/operators/MapPartitionOperator.java | 67 ++++++++ .../api/java/typeutils/TypeExtractor.java | 20 ++- 5 files changed, 270 insertions(+), 68 deletions(-) create mode 100644 stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java create mode 100644 stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java index bf3d6af9fea79..cfbafb7162872 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java @@ -23,52 +23,19 @@ import java.util.Map; import java.util.Set; +import eu.stratosphere.api.common.operators.base.*; +import eu.stratosphere.compiler.dag.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import eu.stratosphere.api.common.Plan; import eu.stratosphere.api.common.operators.Operator; import eu.stratosphere.api.common.operators.Union; -import eu.stratosphere.api.common.operators.base.BulkIterationBase; import eu.stratosphere.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder; -import eu.stratosphere.api.common.operators.base.CoGroupOperatorBase; -import eu.stratosphere.api.common.operators.base.CollectorMapOperatorBase; -import eu.stratosphere.api.common.operators.base.CrossOperatorBase; -import eu.stratosphere.api.common.operators.base.DeltaIterationBase; import eu.stratosphere.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder; import eu.stratosphere.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder; -import eu.stratosphere.api.common.operators.base.FilterOperatorBase; -import eu.stratosphere.api.common.operators.base.FlatMapOperatorBase; -import eu.stratosphere.api.common.operators.base.GenericDataSinkBase; -import eu.stratosphere.api.common.operators.base.GenericDataSourceBase; -import eu.stratosphere.api.common.operators.base.GroupReduceOperatorBase; -import eu.stratosphere.api.common.operators.base.JoinOperatorBase; -import eu.stratosphere.api.common.operators.base.MapOperatorBase; -import eu.stratosphere.api.common.operators.base.ReduceOperatorBase; import eu.stratosphere.compiler.costs.CostEstimator; import eu.stratosphere.compiler.costs.DefaultCostEstimator; -import eu.stratosphere.compiler.dag.BinaryUnionNode; -import eu.stratosphere.compiler.dag.BulkIterationNode; -import eu.stratosphere.compiler.dag.BulkPartialSolutionNode; -import eu.stratosphere.compiler.dag.CoGroupNode; -import eu.stratosphere.compiler.dag.CollectorMapNode; -import eu.stratosphere.compiler.dag.CrossNode; -import eu.stratosphere.compiler.dag.DataSinkNode; -import eu.stratosphere.compiler.dag.DataSourceNode; -import eu.stratosphere.compiler.dag.FilterNode; -import eu.stratosphere.compiler.dag.FlatMapNode; -import eu.stratosphere.compiler.dag.GroupReduceNode; -import eu.stratosphere.compiler.dag.IterationNode; -import eu.stratosphere.compiler.dag.MapNode; -import eu.stratosphere.compiler.dag.MatchNode; -import eu.stratosphere.compiler.dag.OptimizerNode; -import eu.stratosphere.compiler.dag.PactConnection; -import eu.stratosphere.compiler.dag.ReduceNode; -import eu.stratosphere.compiler.dag.SinkJoiner; -import eu.stratosphere.compiler.dag.SolutionSetNode; -import eu.stratosphere.compiler.dag.TempMode; -import eu.stratosphere.compiler.dag.WorksetIterationNode; -import eu.stratosphere.compiler.dag.WorksetNode; import eu.stratosphere.compiler.deadlockdetect.DeadlockPreventer; import eu.stratosphere.compiler.plan.BinaryUnionPlanNode; import eu.stratosphere.compiler.plan.BulkIterationPlanNode; @@ -663,8 +630,11 @@ else if (c instanceof GenericDataSourceBase) { n = dsn; } else if (c instanceof MapOperatorBase) { - n = new MapNode((MapOperatorBase) c); + n = new MapPartitionNode((MapOperatorBase) c); } + else if (c instanceof MapPartitionOperatorBase) { + n = new MapNode((MapPartitionOperatorBase) c); + } else if (c instanceof CollectorMapOperatorBase) { n = new CollectorMapNode((CollectorMapOperatorBase) c); } diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java new file mode 100644 index 0000000000000..0d7da8995023e --- /dev/null +++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java @@ -0,0 +1,162 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ +package eu.stratosphere.example.java.wordcount; + +import eu.stratosphere.api.java.DataSet; +import eu.stratosphere.api.java.ExecutionEnvironment; +import eu.stratosphere.api.java.functions.FlatMapFunction; +import eu.stratosphere.api.java.functions.MapPartitionFunction; +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.example.java.wordcount.util.WordCountData; +import eu.stratosphere.util.Collector; + +import java.util.Iterator; + +/** + * Implements the "WordCount" program that computes a simple word occurrence histogram + * over text files. + * + *

+ * The input is a plain text file with lines separated by newline characters. + * + *

+ * Usage: WordCount <text path> <result path>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}. + * + *

+ * This example shows how to: + *

    + *
  • write a simple Stratosphere program. + *
  • use Tuple data types. + *
  • write and use user-defined functions. + *
+ * + */ +@SuppressWarnings("serial") +public class WordCountWithMapPartition { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String[] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // get input data + DataSet text = getTextDataSet(env); + + + + + DataSet> counts = + // split up the lines in pairs (2-tuples) containing: (word,1) + text.mapPartition(new Tokenizer()) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0) + .sum(1); + + // emit result + if(fileOutput) { + counts.writeAsCsv(outputPath, "\n", " "); + } else { + counts.print(); + } + + // execute program + env.execute("WordCount Example"); + } + + // ************************************************************************* + // USER FUNCTIONS + // ************************************************************************* + + /** + * Implements the string tokenizer that splits sentences into words as a user-defined + * FlatMapFunction. The function takes a line (String) and splits it into + * multiple pairs in the form of "(word,1)" (Tuple2). + */ + public static final class Tokenizer extends MapPartitionFunction> { + @Override + public void mapPartition(Iterator records, Collector> out) throws Exception { + while(records.hasNext()){ + // normalize and split the line + String[] tokens = records.next().toLowerCase().split("\\W+"); + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2(token, 1)); + } + } + } + } + + /*@Override + public void flatMap(String value, Collector> out) { + // normalize and split the line + String[] tokens = value.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2(token, 1)); + } + } + }*/ + } + + // ************************************************************************* + // UTIL METHODS + // ************************************************************************* + + private static boolean fileOutput = false; + private static String textPath; + private static String outputPath; + + private static boolean parseParameters(String[] args) { + + if(args.length > 0) { + // parse input arguments + fileOutput = true; + if(args.length == 2) { + textPath = args[0]; + outputPath = args[1]; + } else { + System.err.println("Usage: WordCount "); + return false; + } + } else { + System.out.println("Executing WordCount example with built-in default data."); + System.out.println(" Provide parameters to read input data from a file."); + System.out.println(" Usage: WordCount "); + } + return true; + } + + private static DataSet getTextDataSet(ExecutionEnvironment env) { + if(fileOutput) { + // read the text file from given input path + return env.readTextFile(textPath); + } else { + // get default test text data + return WordCountData.getDefaultTextLineDataSet(env); + } + } +} diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java index 0770f24c84e91..2bcf02a0bc48e 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java @@ -14,43 +14,21 @@ **********************************************************************************************************************/ package eu.stratosphere.api.java; +import eu.stratosphere.api.java.functions.*; +import eu.stratosphere.api.java.operators.*; import org.apache.commons.lang3.Validate; import eu.stratosphere.api.common.io.FileOutputFormat; import eu.stratosphere.api.common.io.OutputFormat; import eu.stratosphere.api.java.aggregation.Aggregations; -import eu.stratosphere.api.java.functions.CoGroupFunction; -import eu.stratosphere.api.java.functions.FilterFunction; -import eu.stratosphere.api.java.functions.FlatMapFunction; -import eu.stratosphere.api.java.functions.GroupReduceFunction; -import eu.stratosphere.api.java.functions.KeySelector; -import eu.stratosphere.api.java.functions.MapFunction; -import eu.stratosphere.api.java.functions.ReduceFunction; import eu.stratosphere.api.java.io.CsvOutputFormat; import eu.stratosphere.api.java.io.PrintingOutputFormat; import eu.stratosphere.api.java.io.TextOutputFormat; -import eu.stratosphere.api.java.operators.AggregateOperator; -import eu.stratosphere.api.java.operators.CoGroupOperator; import eu.stratosphere.api.java.operators.CoGroupOperator.CoGroupOperatorSets; -import eu.stratosphere.api.java.operators.CrossOperator; import eu.stratosphere.api.java.operators.CrossOperator.DefaultCross; -import eu.stratosphere.api.java.operators.CustomUnaryOperation; -import eu.stratosphere.api.java.operators.DataSink; -import eu.stratosphere.api.java.operators.FilterOperator; -import eu.stratosphere.api.java.operators.FlatMapOperator; -import eu.stratosphere.api.java.operators.Grouping; -import eu.stratosphere.api.java.operators.JoinOperator; import eu.stratosphere.api.java.operators.JoinOperator.JoinHint; import eu.stratosphere.api.java.operators.JoinOperator.JoinOperatorSets; -import eu.stratosphere.api.java.operators.Keys; -import eu.stratosphere.api.java.operators.MapOperator; -import eu.stratosphere.api.java.operators.ProjectOperator; import eu.stratosphere.api.java.operators.ProjectOperator.Projection; -import eu.stratosphere.api.java.operators.ReduceGroupOperator; -import eu.stratosphere.api.java.operators.ReduceOperator; -import eu.stratosphere.api.java.operators.SortedGrouping; -import eu.stratosphere.api.java.operators.UnionOperator; -import eu.stratosphere.api.java.operators.UnsortedGrouping; import eu.stratosphere.api.java.record.functions.CrossFunction; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple2; @@ -135,6 +113,27 @@ public MapOperator map(MapFunction mapper) { } return new MapOperator(this, mapper); } + + + + /** + * Applies a Map transformation on a {@link DataSet} by using an iterator.
+ * The transformation calls a {@link MapPartitionFunction} for the full DataSet. + * Each MapPartitionFunction call returns elements. + * + * @param mapPartition The MapPartitionFunction that is called for the full DataSet. + * @return A MapPartitionOperator that represents the transformed DataSet. + * + * @see MapPartitionFunction + * @see MapPartitionOperator + * @see DataSet + */ + public MapPartitionOperator mapPartition(MapPartitionFunction mapPartition ){ + if (mapPartition == null) { + throw new NullPointerException("MapPartition function must not be null."); + } + return new MapPartitionOperator(this, mapPartition); + } /** * Applies a FlatMap transformation on a {@link DataSet}.
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java new file mode 100644 index 0000000000000..836b20504d01d --- /dev/null +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/MapPartitionOperator.java @@ -0,0 +1,67 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ +package eu.stratosphere.api.java.operators; + +import eu.stratosphere.api.common.functions.GenericMapPartition; +import eu.stratosphere.api.common.operators.Operator; +import eu.stratosphere.api.common.operators.UnaryOperatorInformation; +import eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase; +import eu.stratosphere.api.java.DataSet; +import eu.stratosphere.api.java.functions.MapFunction; +import eu.stratosphere.api.java.functions.MapPartitionFunction; +import eu.stratosphere.api.java.typeutils.TypeExtractor; + +/** + * This operator represents the application of a "mapPartition" function on a data set, and the + * result data set produced by the function. + * + * @param The type of the data set consumed by the operator. + * @param The type of the data set created by the operator. + * + * @see MapFunction + */ +public class MapPartitionOperator extends SingleInputUdfOperator> { + + protected final MapPartitionFunction function; + + + public MapPartitionOperator(DataSet input, MapPartitionFunction function) { + super(input, TypeExtractor.getMapPartitionReturnTypes(function, input.getType())); + + this.function = function; + extractSemanticAnnotationsFromUdf(function.getClass()); + } + + @Override + protected eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase> translateToDataFlow(Operator input) { + + String name = getName() != null ? getName() : function.getClass().getName(); + // create operator + MapPartitionOperatorBase> po = new MapPartitionOperatorBase>(function, new UnaryOperatorInformation(getInputType(), getResultType()), name); + // set input + po.setInput(input); + // set dop + if(this.getParallelism() > 0) { + // use specified dop + po.setDegreeOfParallelism(this.getParallelism()); + } else { + // if no dop has been specified, use dop of input operator to enable chaining + po.setDegreeOfParallelism(input.getDegreeOfParallelism()); + } + + return po; + } + +} diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/typeutils/TypeExtractor.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/typeutils/TypeExtractor.java index 26c11c313f2d1..950b231cf2ce5 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/typeutils/TypeExtractor.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/typeutils/TypeExtractor.java @@ -21,20 +21,13 @@ import java.lang.reflect.TypeVariable; import java.util.ArrayList; +import eu.stratosphere.api.java.functions.*; import eu.stratosphere.types.TypeInformation; import org.apache.commons.lang3.Validate; import org.apache.hadoop.io.Writable; import eu.stratosphere.api.common.io.InputFormat; -import eu.stratosphere.api.java.functions.CoGroupFunction; -import eu.stratosphere.api.java.functions.CrossFunction; -import eu.stratosphere.api.java.functions.FlatMapFunction; -import eu.stratosphere.api.java.functions.GroupReduceFunction; -import eu.stratosphere.api.java.functions.InvalidTypesException; -import eu.stratosphere.api.java.functions.JoinFunction; -import eu.stratosphere.api.java.functions.KeySelector; -import eu.stratosphere.api.java.functions.MapFunction; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.types.Value; @@ -48,6 +41,17 @@ public static TypeInformation getMapReturnTypes(MapFunction TypeInformation getMapPartitionReturnTypes(MapPartitionFunction mapPartitionFunction, TypeInformation inType) { + validateInputType(MapPartitionFunction.class, mapPartitionFunction.getClass(), 0, inType); + if(mapPartitionFunction instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable) mapPartitionFunction).getProducedType(); + } + return createTypeInfo(MapPartitionFunction.class, mapPartitionFunction.getClass(), 1, inType, null); + } + @SuppressWarnings("unchecked") public static TypeInformation getFlatMapReturnTypes(FlatMapFunction flatMapFunction, TypeInformation inType) { From 1f8e5a9aff5d1a45004ecb4694674e140ecc4cd5 Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 15:20:55 +0200 Subject: [PATCH 08/17] cleanup --- .../eu/stratosphere/pact/runtime/task/MapPartitionDriver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java index e4294bcfb5fe5..0753e6e4f3772 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java @@ -14,7 +14,7 @@ * The MapPartitionTask creates an iterator over all key-value pairs of its input and hands that to the map_partition() method * of the MapFunction. * - * @see GenericCollectorMap + * @see GenericMapPartition * * @param The mapper's input data type. * @param The mapper's output data type. From 8dc6d70678de827882363f3616a70f0f4c5cff85 Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 15:37:30 +0200 Subject: [PATCH 09/17] add estimation for map partition operator --- .../src/main/java/eu/stratosphere/compiler/PactCompiler.java | 4 ++-- .../java/eu/stratosphere/compiler/costs/CostEstimator.java | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java index cfbafb7162872..948869a0e87ab 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java @@ -630,10 +630,10 @@ else if (c instanceof GenericDataSourceBase) { n = dsn; } else if (c instanceof MapOperatorBase) { - n = new MapPartitionNode((MapOperatorBase) c); + n = new MapNode((MapOperatorBase) c); } else if (c instanceof MapPartitionOperatorBase) { - n = new MapNode((MapPartitionOperatorBase) c); + n = new MapPartitionNode((MapPartitionOperatorBase) c); } else if (c instanceof CollectorMapOperatorBase) { n = new CollectorMapNode((CollectorMapOperatorBase) c); diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java index 3e1d94a2e994d..ccecc0e42c3d7 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java @@ -154,6 +154,7 @@ public void costOperator(PlanNode n) { case BINARY_NO_OP: case COLLECTOR_MAP: case MAP: + case MAP_PARTITION: case FLAT_MAP: case ALL_GROUP_REDUCE: From 968a46e7e164130b864bf7e186c2e461168e118f Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 15:44:23 +0200 Subject: [PATCH 10/17] add map partition test --- .../test/operators/MapPartitionITCase.java | 131 ++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java new file mode 100644 index 0000000000000..6dbb944999218 --- /dev/null +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java @@ -0,0 +1,131 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.test.operators; + +import eu.stratosphere.api.common.Plan; +import eu.stratosphere.api.java.record.operators.FileDataSink; +import eu.stratosphere.api.java.record.operators.FileDataSource; +import eu.stratosphere.api.java.record.functions.MapPartitionFunction; +import eu.stratosphere.api.java.record.io.DelimitedInputFormat; +import eu.stratosphere.api.java.record.operators.MapOperator; +import eu.stratosphere.api.java.record.operators.MapPartitionOperator; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat; +import eu.stratosphere.test.operators.io.ContractITCaseIOFormats.ContractITCaseOutputFormat; +import eu.stratosphere.test.util.RecordAPITestBase; +import eu.stratosphere.types.IntValue; +import eu.stratosphere.types.Record; +import eu.stratosphere.types.StringValue; +import eu.stratosphere.util.Collector; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; + +@RunWith(Parameterized.class) +public class MapPartitionITCase extends RecordAPITestBase { + + private static final Log LOG = LogFactory.getLog(MapITCase.class); + + String inPath = null; + String resultPath = null; + + public MapPartitionITCase(Configuration testConfig) { + super(testConfig); + } + + private static final String IN = "1 1\n2 2\n2 8\n4 4\n4 4\n6 6\n7 7\n8 8\n" + + "1 1\n2 2\n2 2\n4 4\n4 4\n6 3\n5 9\n8 8\n1 1\n2 2\n2 2\n3 0\n4 4\n" + + "5 9\n7 7\n8 8\n1 1\n9 1\n5 9\n4 4\n4 4\n6 6\n7 7\n8 8\n"; + + private static final String RESULT = "1 11\n2 12\n4 14\n4 14\n1 11\n2 12\n2 12\n4 14\n4 14\n3 16\n1 11\n2 12\n2 12\n0 13\n4 14\n1 11\n4 14\n4 14\n"; + + @Override + protected void preSubmit() throws Exception { + inPath = createTempFile("in.txt", IN); + resultPath = getTempDirPath("result"); + } + + public static class TestMapPartition extends MapPartitionFunction implements Serializable { + private static final long serialVersionUID = 1L; + + private StringValue keyString = new StringValue(); + private StringValue valueString = new StringValue(); + + + @Override + public void mapPartition(Iterator records, Collector out) throws Exception { + while(records.hasNext() ){ + Record record = records.next(); + keyString = record.getField(0, keyString); + valueString = record.getField(1, valueString); + + LOG.debug("Processed: [" + keyString.toString() + "," + valueString.getValue() + "]"); + + if (Integer.parseInt(keyString.toString()) + Integer.parseInt(valueString.toString()) < 10) { + + record.setField(0, valueString); + record.setField(1, new IntValue(Integer.parseInt(keyString.toString()) + 10)); + + out.collect(record); + } + } + } + } + + @Override + protected Plan getTestJob() { + FileDataSource input = new FileDataSource( + new ContractITCaseInputFormat(), inPath); + DelimitedInputFormat.configureDelimitedFormat(input) + .recordDelimiter('\n'); + input.setDegreeOfParallelism(config.getInteger("MapPartitionTest#NoSubtasks", 1)); + + MapPartitionOperator testMapper = MapPartitionOperator.builder(new TestMapPartition()).build(); + testMapper.setDegreeOfParallelism(config.getInteger("TestMapPartition#NoSubtasks", 1)); + + FileDataSink output = new FileDataSink( + new ContractITCaseOutputFormat(), resultPath); + output.setDegreeOfParallelism(1); + + output.setInput(testMapper); + testMapper.setInput(input); + + return new Plan(output); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(RESULT, resultPath); + } + + @Parameters + public static Collection getConfigurations() throws FileNotFoundException, IOException { + LinkedList testConfigs = new LinkedList(); + + Configuration config = new Configuration(); + config.setInteger("MapPartitionTest#NoSubtasks", 4); + testConfigs.add(config); + + return toParameterList(testConfigs); + } +} From b5eb547df0e7a8a7afbaef7ff7e6fa80732d1988 Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 15:46:52 +0200 Subject: [PATCH 11/17] cleanup --- .../java/eu/stratosphere/test/operators/MapPartitionITCase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java index 6dbb944999218..5da0f999bb3da 100644 --- a/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java +++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/operators/MapPartitionITCase.java @@ -18,7 +18,6 @@ import eu.stratosphere.api.java.record.operators.FileDataSource; import eu.stratosphere.api.java.record.functions.MapPartitionFunction; import eu.stratosphere.api.java.record.io.DelimitedInputFormat; -import eu.stratosphere.api.java.record.operators.MapOperator; import eu.stratosphere.api.java.record.operators.MapPartitionOperator; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.test.operators.io.ContractITCaseIOFormats.ContractITCaseInputFormat; From 82b89ac8a9d9d027f199698f5a1d00238207718f Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 16:06:38 +0200 Subject: [PATCH 12/17] cleanup --- .../client/testjar/WordCount.java | 2 +- .../stratosphere/compiler/PactCompiler.java | 46 +++++++++++++++++-- .../wordcount/WordCountWithMapPartition.java | 38 +++++---------- 3 files changed, 53 insertions(+), 33 deletions(-) diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java index 5218dc22fb9a1..27e246d065e89 100644 --- a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java +++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java @@ -42,7 +42,7 @@ public static void main(String[] args) throws Exception { // get input data DataSet text = getTextDataSet(env); - + DataSet> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java index 948869a0e87ab..edb550c7f259e 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java @@ -13,6 +13,7 @@ package eu.stratosphere.compiler; + import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; @@ -23,19 +24,54 @@ import java.util.Map; import java.util.Set; -import eu.stratosphere.api.common.operators.base.*; -import eu.stratosphere.compiler.dag.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import eu.stratosphere.api.common.Plan; import eu.stratosphere.api.common.operators.Operator; import eu.stratosphere.api.common.operators.Union; +import eu.stratosphere.api.common.operators.base.BulkIterationBase; import eu.stratosphere.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder; +import eu.stratosphere.api.common.operators.base.CoGroupOperatorBase; +import eu.stratosphere.api.common.operators.base.CollectorMapOperatorBase; +import eu.stratosphere.api.common.operators.base.CrossOperatorBase; +import eu.stratosphere.api.common.operators.base.DeltaIterationBase; import eu.stratosphere.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder; import eu.stratosphere.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder; +import eu.stratosphere.api.common.operators.base.FilterOperatorBase; +import eu.stratosphere.api.common.operators.base.FlatMapOperatorBase; +import eu.stratosphere.api.common.operators.base.GenericDataSinkBase; +import eu.stratosphere.api.common.operators.base.GenericDataSourceBase; +import eu.stratosphere.api.common.operators.base.GroupReduceOperatorBase; +import eu.stratosphere.api.common.operators.base.JoinOperatorBase; +import eu.stratosphere.api.common.operators.base.MapOperatorBase; +import eu.stratosphere.api.common.operators.base.MapPartitionOperatorBase; +import eu.stratosphere.api.common.operators.base.ReduceOperatorBase; import eu.stratosphere.compiler.costs.CostEstimator; import eu.stratosphere.compiler.costs.DefaultCostEstimator; +import eu.stratosphere.compiler.dag.BinaryUnionNode; +import eu.stratosphere.compiler.dag.BulkIterationNode; +import eu.stratosphere.compiler.dag.BulkPartialSolutionNode; +import eu.stratosphere.compiler.dag.CoGroupNode; +import eu.stratosphere.compiler.dag.CollectorMapNode; +import eu.stratosphere.compiler.dag.CrossNode; +import eu.stratosphere.compiler.dag.DataSinkNode; +import eu.stratosphere.compiler.dag.DataSourceNode; +import eu.stratosphere.compiler.dag.FilterNode; +import eu.stratosphere.compiler.dag.FlatMapNode; +import eu.stratosphere.compiler.dag.GroupReduceNode; +import eu.stratosphere.compiler.dag.IterationNode; +import eu.stratosphere.compiler.dag.MapNode; +import eu.stratosphere.compiler.dag.MapPartitionNode; +import eu.stratosphere.compiler.dag.MatchNode; +import eu.stratosphere.compiler.dag.OptimizerNode; +import eu.stratosphere.compiler.dag.PactConnection; +import eu.stratosphere.compiler.dag.ReduceNode; +import eu.stratosphere.compiler.dag.SinkJoiner; +import eu.stratosphere.compiler.dag.SolutionSetNode; +import eu.stratosphere.compiler.dag.TempMode; +import eu.stratosphere.compiler.dag.WorksetIterationNode; +import eu.stratosphere.compiler.dag.WorksetNode; import eu.stratosphere.compiler.deadlockdetect.DeadlockPreventer; import eu.stratosphere.compiler.plan.BinaryUnionPlanNode; import eu.stratosphere.compiler.plan.BulkIterationPlanNode; @@ -632,9 +668,9 @@ else if (c instanceof GenericDataSourceBase) { else if (c instanceof MapOperatorBase) { n = new MapNode((MapOperatorBase) c); } - else if (c instanceof MapPartitionOperatorBase) { - n = new MapPartitionNode((MapPartitionOperatorBase) c); - } + else if (c instanceof MapPartitionOperatorBase) { + n = new MapPartitionNode((MapPartitionOperatorBase) c); + } else if (c instanceof CollectorMapOperatorBase) { n = new CollectorMapNode((CollectorMapOperatorBase) c); } diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java index 0d7da8995023e..6c67fb5461bd6 100644 --- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java +++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java @@ -63,9 +63,6 @@ public static void main(String[] args) throws Exception { // get input data DataSet text = getTextDataSet(env); - - - DataSet> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.mapPartition(new Tokenizer()) @@ -94,32 +91,19 @@ public static void main(String[] args) throws Exception { * multiple pairs in the form of "(word,1)" (Tuple2). */ public static final class Tokenizer extends MapPartitionFunction> { - @Override - public void mapPartition(Iterator records, Collector> out) throws Exception { - while(records.hasNext()){ - // normalize and split the line - String[] tokens = records.next().toLowerCase().split("\\W+"); - // emit the pairs - for (String token : tokens) { - if (token.length() > 0) { - out.collect(new Tuple2(token, 1)); - } - } - } - } - - /*@Override - public void flatMap(String value, Collector> out) { - // normalize and split the line - String[] tokens = value.toLowerCase().split("\\W+"); - - // emit the pairs - for (String token : tokens) { - if (token.length() > 0) { - out.collect(new Tuple2(token, 1)); + @Override + public void mapPartition(Iterator records, Collector> out) throws Exception { + while(records.hasNext()){ + // normalize and split the line + String[] tokens = records.next().toLowerCase().split("\\W+"); + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2(token, 1)); + } } } - }*/ + } } // ************************************************************************* From f427d4cbf8fc7e239117ceade7f58d2677b965b7 Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 16:07:33 +0200 Subject: [PATCH 13/17] cleanup --- .../common/functions/GenericMapPartition.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java index c96cf40ce8ab1..accf7314d895b 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/api/common/functions/GenericMapPartition.java @@ -5,13 +5,12 @@ import java.util.Iterator; public interface GenericMapPartition extends Function { - - /** - * A user-implemented function that modifies or transforms an incoming object. - * - * @param records All records for the mapper - * @param out The collector to hand results to. - * @throws Exception - */ - void mapPartition(Iterator records, Collector out) throws Exception; + /** + * A user-implemented function that modifies or transforms an incoming object. + * + * @param records All records for the mapper + * @param out The collector to hand results to. + * @throws Exception + */ + void mapPartition(Iterator records, Collector out) throws Exception; } \ No newline at end of file From 7a57a03a3111ea227b77187ad969cfc337e4bdc4 Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 16:09:29 +0200 Subject: [PATCH 14/17] clean up --- .../eu/stratosphere/api/java/DataSet.java | 12 +++++----- .../java/functions/MapPartitionFunction.java | 22 +++++++++---------- .../api/java/typeutils/TypeExtractor.java | 14 ++++++------ 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java index 2bcf02a0bc48e..592e8d09cee3f 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java @@ -128,12 +128,12 @@ public MapOperator map(MapFunction mapper) { * @see MapPartitionOperator * @see DataSet */ - public MapPartitionOperator mapPartition(MapPartitionFunction mapPartition ){ - if (mapPartition == null) { - throw new NullPointerException("MapPartition function must not be null."); - } - return new MapPartitionOperator(this, mapPartition); - } + public MapPartitionOperator mapPartition(MapPartitionFunction mapPartition ){ + if (mapPartition == null) { + throw new NullPointerException("MapPartition function must not be null."); + } + return new MapPartitionOperator(this, mapPartition); + } /** * Applies a FlatMap transformation on a {@link DataSet}.
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java index d83329f01b3ab..97629140a991e 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java @@ -9,15 +9,15 @@ public abstract class MapPartitionFunction extends AbstractFunction implements GenericMapPartition { - private static final long serialVersionUID = 1L; - /** - * - * @param records All records for the mapper - * @param out The collector to hand results to. - * - * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation - * to fail and may trigger recovery. - */ - @Override - public abstract void mapPartition(Iterator records, Collector out) throws Exception; + private static final long serialVersionUID = 1L; + /** + * + * @param records All records for the mapper + * @param out The collector to hand results to. + * + * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation + * to fail and may trigger recovery. + */ + @Override + public abstract void mapPartition(Iterator records, Collector out) throws Exception; } diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/typeutils/TypeExtractor.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/typeutils/TypeExtractor.java index 950b231cf2ce5..7f922fd60aa8b 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/typeutils/TypeExtractor.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/typeutils/TypeExtractor.java @@ -44,13 +44,13 @@ public static TypeInformation getMapReturnTypes(MapFunction TypeInformation getMapPartitionReturnTypes(MapPartitionFunction mapPartitionFunction, TypeInformation inType) { - validateInputType(MapPartitionFunction.class, mapPartitionFunction.getClass(), 0, inType); - if(mapPartitionFunction instanceof ResultTypeQueryable) { - return ((ResultTypeQueryable) mapPartitionFunction).getProducedType(); - } - return createTypeInfo(MapPartitionFunction.class, mapPartitionFunction.getClass(), 1, inType, null); - } + public static TypeInformation getMapPartitionReturnTypes(MapPartitionFunction mapPartitionFunction, TypeInformation inType) { + validateInputType(MapPartitionFunction.class, mapPartitionFunction.getClass(), 0, inType); + if(mapPartitionFunction instanceof ResultTypeQueryable) { + return ((ResultTypeQueryable) mapPartitionFunction).getProducedType(); + } + return createTypeInfo(MapPartitionFunction.class, mapPartitionFunction.getClass(), 1, inType, null); + } @SuppressWarnings("unchecked") From fa4f23377915bd5d85d268b1b9c4a16f945933f8 Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 25 Jun 2014 16:22:14 +0200 Subject: [PATCH 15/17] clean up codestyle --- .../compiler/costs/CostEstimator.java | 2 +- .../wordcount/WordCountWithMapPartition.java | 3 +- .../eu/stratosphere/api/java/DataSet.java | 30 ++++- .../functions/MapPartitionFunction.java | 4 +- .../api/java/typeutils/TypeExtractor.java | 15 ++- .../pact/runtime/task/DriverStrategy.java | 2 +- .../pact/runtime/task/MapPartitionDriver.java | 110 +++++++++--------- 7 files changed, 99 insertions(+), 67 deletions(-) diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java index ccecc0e42c3d7..1c2bd3deeae9d 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/CostEstimator.java @@ -154,7 +154,7 @@ public void costOperator(PlanNode n) { case BINARY_NO_OP: case COLLECTOR_MAP: case MAP: - case MAP_PARTITION: + case MAP_PARTITION: case FLAT_MAP: case ALL_GROUP_REDUCE: diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java index 6c67fb5461bd6..1d73bcee61d01 100644 --- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java +++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java @@ -16,7 +16,6 @@ import eu.stratosphere.api.java.DataSet; import eu.stratosphere.api.java.ExecutionEnvironment; -import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.MapPartitionFunction; import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.example.java.wordcount.util.WordCountData; @@ -103,7 +102,7 @@ public void mapPartition(Iterator records, Collector * A DataSet can be transformed into another DataSet by applying a transformation as for example diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java index 8589070b1ab5d..10adf0b3a6852 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java @@ -13,7 +13,7 @@ */ public abstract class MapPartitionFunction extends AbstractFunction implements GenericMapPartition { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; /** * This method must be implemented to provide a user implementation of a mappartitioner. @@ -27,5 +27,5 @@ public abstract class MapPartitionFunction extends AbstractFunction implements G * decide whether to retry the mapper execution. */ @Override - public abstract void mapPartition(Iterator records, Collector out) throws Exception; + public abstract void mapPartition(Iterator records, Collector out) throws Exception; } diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/typeutils/TypeExtractor.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/typeutils/TypeExtractor.java index 7f922fd60aa8b..4f068bc4e25a0 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/typeutils/TypeExtractor.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/typeutils/TypeExtractor.java @@ -21,16 +21,25 @@ import java.lang.reflect.TypeVariable; import java.util.ArrayList; -import eu.stratosphere.api.java.functions.*; import eu.stratosphere.types.TypeInformation; import org.apache.commons.lang3.Validate; import org.apache.hadoop.io.Writable; import eu.stratosphere.api.common.io.InputFormat; +import eu.stratosphere.api.java.functions.CoGroupFunction; +import eu.stratosphere.api.java.functions.CrossFunction; +import eu.stratosphere.api.java.functions.FlatMapFunction; +import eu.stratosphere.api.java.functions.GroupReduceFunction; +import eu.stratosphere.api.java.functions.InvalidTypesException; +import eu.stratosphere.api.java.functions.JoinFunction; +import eu.stratosphere.api.java.functions.KeySelector; +import eu.stratosphere.api.java.functions.MapFunction; +import eu.stratosphere.api.java.functions.MapPartitionFunction; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.types.Value; + public class TypeExtractor { @SuppressWarnings("unchecked") @@ -42,8 +51,7 @@ public static TypeInformation getMapReturnTypes(MapFunction TypeInformation getMapPartitionReturnTypes(MapPartitionFunction mapPartitionFunction, TypeInformation inType) { validateInputType(MapPartitionFunction.class, mapPartitionFunction.getClass(), 0, inType); if(mapPartitionFunction instanceof ResultTypeQueryable) { @@ -52,7 +60,6 @@ public static TypeInformation getMapPartitionReturnTypes(MapParti return createTypeInfo(MapPartitionFunction.class, mapPartitionFunction.getClass(), 1, inType, null); } - @SuppressWarnings("unchecked") public static TypeInformation getFlatMapReturnTypes(FlatMapFunction flatMapFunction, TypeInformation inType) { validateInputType(FlatMapFunction.class, flatMapFunction.getClass(), 0, inType); diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java index 23ea2041bfa36..730dc794ac960 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java @@ -39,7 +39,7 @@ public enum DriverStrategy { MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, false), // the proper map partition - MAP_PARTITION(MapPartitionDriver.class, null, PIPELINED, false), + MAP_PARTITION(MapPartitionDriver.class, null, PIPELINED, false), // the flat mapper FLAT_MAP(FlatMapDriver.class, ChainedFlatMapDriver.class, PIPELINED, false), diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java index 0753e6e4f3772..4f07fe5ce92c3 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java @@ -21,59 +21,59 @@ */ public class MapPartitionDriver implements PactDriver, OT> { - private PactTaskContext, OT> taskContext; - - private volatile boolean running; - - - @Override - public void setup(PactTaskContext, OT> context) { - this.taskContext = context; - this.running = true; - } - - @Override - public int getNumberOfInputs() { - return 1; - } - - @Override - public Class> getStubType() { - @SuppressWarnings("unchecked") - final Class> clazz = (Class>) (Class) GenericMapPartition.class; - return clazz; - } - - @Override - public boolean requiresComparatorOnInput() { - return false; - } - - @Override - public void prepare() { - // nothing, since a mapper does not need any preparation - } - - @Override - public void run() throws Exception { - // cache references on the stack - final MutableObjectIterator input = this.taskContext.getInput(0); - final GenericMapPartition function = this.taskContext.getStub(); - final Collector output = this.taskContext.getOutputCollector(); - - final MutableToRegularIteratorWrapper inIter = new MutableToRegularIteratorWrapper(input, this.taskContext.getInputSerializer(0).getSerializer() ); - IT record = this.taskContext.getInputSerializer(0).getSerializer().createInstance(); - - function.mapPartition(inIter, output); - } - - @Override - public void cleanup() { - // mappers need no cleanup, since no strategies are used. - } - - @Override - public void cancel() { - this.running = false; - } + private PactTaskContext, OT> taskContext; + + private volatile boolean running; + + + @Override + public void setup(PactTaskContext, OT> context) { + this.taskContext = context; + this.running = true; + } + + @Override + public int getNumberOfInputs() { + return 1; + } + + @Override + public Class> getStubType() { + @SuppressWarnings("unchecked") + final Class> clazz = (Class>) (Class) GenericMapPartition.class; + return clazz; + } + + @Override + public boolean requiresComparatorOnInput() { + return false; + } + + @Override + public void prepare() { + // nothing, since a mapper does not need any preparation + } + + @Override + public void run() throws Exception { + // cache references on the stack + final MutableObjectIterator input = this.taskContext.getInput(0); + final GenericMapPartition function = this.taskContext.getStub(); + final Collector output = this.taskContext.getOutputCollector(); + + final MutableToRegularIteratorWrapper inIter = new MutableToRegularIteratorWrapper(input, this.taskContext.getInputSerializer(0).getSerializer() ); + IT record = this.taskContext.getInputSerializer(0).getSerializer().createInstance(); + + function.mapPartition(inIter, output); + } + + @Override + public void cleanup() { + // mappers need no cleanup, since no strategies are used. + } + + @Override + public void cancel() { + this.running = false; + } } From e015e0d6c7a2f657ed793103795334881f401309 Mon Sep 17 00:00:00 2001 From: kay Date: Fri, 18 Jul 2014 17:31:17 +0200 Subject: [PATCH 16/17] cleanup for merge request --- .../client/testjar/WordCount.java | 2 +- .../operators/MapPartitionDescriptor.java | 2 +- .../wordcount/WordCountWithMapPartition.java | 145 ------------------ .../eu/stratosphere/api/java/DataSet.java | 16 +- .../java/functions/MapPartitionFunction.java | 13 ++ .../functions/MapPartitionFunction.java | 13 ++ .../pact/runtime/task/DriverStrategy.java | 2 +- .../pact/runtime/task/MapPartitionDriver.java | 13 ++ 8 files changed, 53 insertions(+), 153 deletions(-) delete mode 100644 stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java index 27e246d065e89..5218dc22fb9a1 100644 --- a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java +++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java @@ -42,7 +42,7 @@ public static void main(String[] args) throws Exception { // get input data DataSet text = getTextDataSet(env); - + DataSet> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java index 8f80e5534b020..41b707df84aee 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/MapPartitionDescriptor.java @@ -30,7 +30,7 @@ public class MapPartitionDescriptor extends OperatorDescriptorSingle { @Override public DriverStrategy getStrategy() { - return DriverStrategy.MAP; + return DriverStrategy.MAP_PARTITION; } @Override diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java deleted file mode 100644 index 1d73bcee61d01..0000000000000 --- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java +++ /dev/null @@ -1,145 +0,0 @@ -/*********************************************************************************************************************** - * - * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - **********************************************************************************************************************/ -package eu.stratosphere.example.java.wordcount; - -import eu.stratosphere.api.java.DataSet; -import eu.stratosphere.api.java.ExecutionEnvironment; -import eu.stratosphere.api.java.functions.MapPartitionFunction; -import eu.stratosphere.api.java.tuple.Tuple2; -import eu.stratosphere.example.java.wordcount.util.WordCountData; -import eu.stratosphere.util.Collector; - -import java.util.Iterator; - -/** - * Implements the "WordCount" program that computes a simple word occurrence histogram - * over text files. - * - *

- * The input is a plain text file with lines separated by newline characters. - * - *

- * Usage: WordCount <text path> <result path>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}. - * - *

- * This example shows how to: - *

    - *
  • write a simple Stratosphere program. - *
  • use Tuple data types. - *
  • write and use user-defined functions. - *
- * - */ -@SuppressWarnings("serial") -public class WordCountWithMapPartition { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - // set up the execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // get input data - DataSet text = getTextDataSet(env); - - DataSet> counts = - // split up the lines in pairs (2-tuples) containing: (word,1) - text.mapPartition(new Tokenizer()) - // group by the tuple field "0" and sum up tuple field "1" - .groupBy(0) - .sum(1); - - // emit result - if(fileOutput) { - counts.writeAsCsv(outputPath, "\n", " "); - } else { - counts.print(); - } - - // execute program - env.execute("WordCount Example"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * Implements the string tokenizer that splits sentences into words as a user-defined - * FlatMapFunction. The function takes a line (String) and splits it into - * multiple pairs in the form of "(word,1)" (Tuple2). - */ - public static final class Tokenizer extends MapPartitionFunction> { - @Override - public void mapPartition(Iterator records, Collector> out) throws Exception { - while(records.hasNext()){ - // normalize and split the line - String[] tokens = records.next().toLowerCase().split("\\W+"); - // emit the pairs - for (String token : tokens) { - if (token.length() > 0) { - out.collect(new Tuple2(token, 1)); - } - } - } - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String textPath; - private static String outputPath; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - // parse input arguments - fileOutput = true; - if(args.length == 2) { - textPath = args[0]; - outputPath = args[1]; - } else { - System.err.println("Usage: WordCount "); - return false; - } - } else { - System.out.println("Executing WordCount example with built-in default data."); - System.out.println(" Provide parameters to read input data from a file."); - System.out.println(" Usage: WordCount "); - } - return true; - } - - private static DataSet getTextDataSet(ExecutionEnvironment env) { - if(fileOutput) { - // read the text file from given input path - return env.readTextFile(textPath); - } else { - // get default test text data - return WordCountData.getDefaultTextLineDataSet(env); - } - } -} diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java index 8aab62604cdd3..5c58f69bc1503 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java @@ -143,11 +143,17 @@ public MapOperator map(MapFunction mapper) { /** - * Applies a Map transformation on a {@link DataSet} by using an iterator.
- * The transformation calls a {@link MapPartitionFunction} for the full DataSet. - * Each MapPartitionFunction call returns elements. - * - * @param mapPartition The MapPartitionFunction that is called for the full DataSet. + * Applies a Map operation to the entire partition of the data. + * The function is called once per parallel partition of the data, + * and the entire partition is available through the given Iterator. + * The number of elements that each instance of the MapPartition function + * sees is non deterministic and depends on the degree of parallelism of the operation. + * + * This function is intended for operations that cannot transform individual elements, + * requires no grouping of elements. To transform individual elements, + * the use of {@code map()} and {@code flatMap()} is preferable. + * + * @param mapPartition The MapPartitionFunction that is called for the full DataSet. * @return A MapPartitionOperator that represents the transformed DataSet. * * @see MapPartitionFunction diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java index 97629140a991e..4c0155fc2f262 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/functions/MapPartitionFunction.java @@ -1,3 +1,16 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + package eu.stratosphere.api.java.functions; diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java index 10adf0b3a6852..5284d06e9e180 100644 --- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java +++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/record/functions/MapPartitionFunction.java @@ -1,3 +1,16 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + package eu.stratosphere.api.java.record.functions; import eu.stratosphere.api.common.functions.AbstractFunction; diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java index 730dc794ac960..5a0013ea8c7d6 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DriverStrategy.java @@ -38,7 +38,7 @@ public enum DriverStrategy { // the proper mapper MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, false), - // the proper map partition + // the proper map partition MAP_PARTITION(MapPartitionDriver.class, null, PIPELINED, false), // the flat mapper diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java index 4f07fe5ce92c3..0ec5f15c20ed4 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/MapPartitionDriver.java @@ -1,3 +1,16 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + package eu.stratosphere.pact.runtime.task; import eu.stratosphere.api.common.functions.GenericMapPartition; From c0bba330708d86fe0bbd2c4865fb594bbecd9398 Mon Sep 17 00:00:00 2001 From: kay Date: Fri, 18 Jul 2014 17:34:45 +0200 Subject: [PATCH 17/17] cleanup --- .../eu/stratosphere/example/java/wordcount/WordCount.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java index 69a3e0eb44015..95ab92f65abb5 100644 --- a/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java +++ b/stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java @@ -59,11 +59,8 @@ public static void main(String[] args) throws Exception { // get input data DataSet text = getTextDataSet(env); - - - - - DataSet> counts = + + DataSet> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1"