diff --git a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
index cf197b8d79..a8e8a31f39 100644
--- a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
+++ b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java
@@ -15,6 +15,15 @@
*/
package com.datatorrent.demos.distributeddistinct;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.common.util.DTThrowable;
+import com.datatorrent.lib.algo.UniqueValueCount;
+import com.datatorrent.lib.algo.UniqueValueCount.InternalCountOutput;
+import com.datatorrent.lib.db.jdbc.JDBCLookupCacheBackedOperator;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -22,24 +31,11 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;
-
import javax.annotation.Nonnull;
-
+import javax.validation.constraints.Min;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DefaultPartition;
-import com.datatorrent.api.Partitioner;
-
-import com.datatorrent.common.util.DTThrowable;
-import com.datatorrent.lib.algo.UniqueValueCount;
-import com.datatorrent.lib.algo.UniqueValueCount.InternalCountOutput;
-import com.datatorrent.lib.db.jdbc.JDBCLookupCacheBackedOperator;
-
/**
*
* This operator supplements the {@link UniqueValueCount} operator by making it stateful.
@@ -58,11 +54,12 @@
*/
public abstract class UniqueValueCountAppender<V> extends JDBCLookupCacheBackedOperator<InternalCountOutput<V>> implements Partitioner<UniqueValueCountAppender<V>>
{
-
protected Set<Integer> partitionKeys;
protected int partitionMask;
protected transient long windowID;
protected transient boolean batch;
+ @Min(1)
+ private int partitionCount = 1;
public UniqueValueCountAppender()
@@ -71,6 +68,16 @@ public UniqueValueCountAppender()
partitionMask = 0;
}
+ public void setPartitionCount(int partitionCount)
+ {
+ this.partitionCount = partitionCount;
+ }
+
+ public int getPartitionCount()
+ {
+ return partitionCount;
+ }
+
@Override
public void setup(Context.OperatorContext context)
{
@@ -181,13 +188,19 @@ public void remove(Object key)
* rollback, each partition will only clear the data that it is responsible for.
*/
@Override
- public Collection<Partition<UniqueValueCountAppender<V>>> definePartitions(Collection<Partition<UniqueValueCountAppender<V>>> partitions, int incrementalCapacity)
+ public Collection<Partition<UniqueValueCountAppender<V>>> definePartitions(Collection<Partition<UniqueValueCountAppender<V>>> partitions, int partitionCnt)
{
- if (incrementalCapacity == 0) {
- return partitions;
+ final int finalCapacity;
+
+ //In the case of parallel partitioning
+ if(partitionCnt != 0) {
+ finalCapacity = partitionCnt;
+ }
+ //Do normal partitioning
+ else {
+ finalCapacity = partitionCount;
}
- final int finalCapacity = partitions.size() + incrementalCapacity;
UniqueValueCountAppender<V> anOldOperator = partitions.iterator().next().getPartitionedInstance();
partitions.clear();
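The hunk above establishes the convention used throughout this patch: definePartitions() now receives a partitionCnt hint that is non-zero only when the engine dictates the count (parallel partitioning) and zero otherwise, in which case the operator falls back to its own @Min(1) partitionCount property. A minimal self-contained sketch of that contract, using only API that appears in this patch (the class name CloneCountPartitioner is hypothetical):

import java.util.Collection;
import java.util.Map;
import javax.validation.constraints.Min;

import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Partitioner.Partition;
import com.google.common.collect.Lists;

/** Hypothetical partitioner illustrating the revised definePartitions contract. */
public class CloneCountPartitioner<T extends Operator> implements Partitioner<T>
{
  @Min(1)
  private int partitionCount = 1;

  @Override
  public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, int partitionCnt)
  {
    //Non-zero partitionCnt: parallel partitioning, the count is dictated upstream.
    //Zero: normal partitioning, fall back to the configured property.
    final int finalCapacity = (partitionCnt != 0) ? partitionCnt : partitionCount;
    T operator = partitions.iterator().next().getPartitionedInstance();
    Collection<Partition<T>> newPartitions = Lists.newArrayListWithCapacity(finalCapacity);
    for (int i = 0; i < finalCapacity; i++) {
      newPartitions.add(new DefaultPartition<T>(operator));
    }
    return newPartitions;
  }

  @Override
  public void partitioned(Map<Integer, Partition<T>> partitions)
  {
  }
}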
diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
index 5a2bf781d2..5cbf5d1ce6 100644
--- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
+++ b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/Application.java
@@ -32,7 +32,7 @@
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.lib.algo.StatelessThroughputBasedPartitioner;
+import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.PubSubWebSocketInputOperator;
import com.datatorrent.lib.io.PubSubWebSocketOutputOperator;
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
index 89f2451e92..0db6f590c2 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapOperator.java
@@ -15,6 +15,13 @@
*/
package com.datatorrent.demos.mroperator;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.demos.mroperator.ReporterImpl.ReporterType;
+import com.datatorrent.lib.io.fs.AbstractHDFSInputOperator;
+import com.datatorrent.lib.util.KeyHashValPair;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -25,9 +32,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import javax.validation.constraints.Min;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
@@ -46,14 +51,8 @@
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
-
-import com.datatorrent.lib.io.fs.AbstractHDFSInputOperator;
-import com.datatorrent.lib.util.KeyHashValPair;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.DefaultPartition;
-import com.datatorrent.api.Partitioner;
-import com.datatorrent.demos.mroperator.ReporterImpl.ReporterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*
@@ -83,6 +82,8 @@ public class MapOperator extends AbstractHDFSInputOperator imple
public final transient DefaultOutputPort<KeyHashValPair<Integer, Integer>> outputCount = new DefaultOutputPort<KeyHashValPair<Integer, Integer>>();
public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>();
private transient JobConf jobConf;
+ @Min(1)
+ private int partitionCount = 1;
public Class<? extends InputSplit> getInputSplitClass()
{
@@ -115,6 +116,16 @@ public void setDirName(String dirName)
super.setFilePath(dirName);
}
+ public int getPartitionCount()
+ {
+ return partitionCount;
+ }
+
+ public void setPartitionCount(int partitionCount)
+ {
+ this.partitionCount = partitionCount;
+ }
+
@Override
public void beginWindow(long windowId)
{
@@ -288,8 +299,10 @@ public void partitioned(Map>> par
@SuppressWarnings("rawtypes")
@Override
- public Collection<Partition<MapOperator<K1, V1, K2, V2>>> definePartitions(Collection<Partition<MapOperator<K1, V1, K2, V2>>> partitions, int incrementalCapacity)
+ public Collection<Partition<MapOperator<K1, V1, K2, V2>>> definePartitions(Collection<Partition<MapOperator<K1, V1, K2, V2>>> partitions, int partitionCnt)
{
+ int tempPartitionCount = partitionCount;
+
Collection c = partitions;
Collection<Partition<MapOperator<K1, V1, K2, V2>>> operatorPartitions = c;
Partition<MapOperator<K1, V1, K2, V2>> template;
@@ -300,7 +313,7 @@ public Collection>> definePartitions(Colle
if (outstream.size() == 0) {
InputSplit[] splits;
try {
- splits = getSplits(new JobConf(conf), incrementalCapacity + 1, template.getPartitionedInstance().getDirName());
+ splits = getSplits(new JobConf(conf), tempPartitionCount, template.getPartitionedInstance().getDirName());
}
catch (Exception e1) {
logger.info(" can't get splits {}", e1.getMessage());
diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapReduceApplication.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapReduceApplication.java
index 64035db255..c8dbc916e6 100644
--- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapReduceApplication.java
+++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/MapReduceApplication.java
@@ -16,8 +16,8 @@
package com.datatorrent.demos.mroperator;
import com.datatorrent.api.*;
-import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.partitioner.StatelessPartitioner;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -99,7 +99,7 @@ public void populateDAG(DAG dag, Configuration conf)
MapOperator inputOperator = dag.addOperator("map", new MapOperator());
inputOperator.setInputFormatClass(inputFormat);
inputOperator.setDirName(dirName);
- dag.setAttribute(inputOperator, OperatorContext.INITIAL_PARTITION_COUNT, numberOfMaps);
+ dag.setAttribute(inputOperator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<MapOperator<K1, V1, K2, V2>>(numberOfMaps));
String configFileName = null;
if (configurationfilePath != null && !configurationfilePath.isEmpty()) {
@@ -117,7 +117,9 @@ public void populateDAG(DAG dag, Configuration conf)
ReduceOperator reduceOpr = dag.addOperator("reduce", new ReduceOperator());
reduceOpr.setReduceClass(reduceClass);
reduceOpr.setConfigFile(configFileName);
- dag.setAttribute(reduceOpr, Context.OperatorContext.INITIAL_PARTITION_COUNT, numberOfReducers);
+ dag.setAttribute(reduceOpr,
+ Context.OperatorContext.PARTITIONER,
+ new StatelessPartitioner<ReduceOperator<K2, V2, K2, V2>>(numberOfReducers));
HdfsKeyValOutputOperator console = dag.addOperator("console", new HdfsKeyValOutputOperator());
console.setFilePath(outputDirName);
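The PARTITIONER attribute set above in code can equally be supplied from configuration; the new StatelessPartitionerTest later in this patch exercises exactly that path via the String constructor. A sketch, where the operator name "map" and the count 4 are illustrative:

import org.apache.hadoop.conf.Configuration;

public class PartitionerFromConfig
{
  public static Configuration build()
  {
    //The "<class name>:<count>" form is parsed by StatelessPartitioner(String).
    Configuration conf = new Configuration(false);
    conf.set("dt.operator.map.attr.PARTITIONER",
             "com.datatorrent.lib.partitioner.StatelessPartitioner:4");
    return conf;
  }
}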
diff --git a/demos/scalability/src/main/java/com/datatorrent/demos/scalability/ScalableAdsApp.java b/demos/scalability/src/main/java/com/datatorrent/demos/scalability/ScalableAdsApp.java
index 669e09f183..97a4c52006 100644
--- a/demos/scalability/src/main/java/com/datatorrent/demos/scalability/ScalableAdsApp.java
+++ b/demos/scalability/src/main/java/com/datatorrent/demos/scalability/ScalableAdsApp.java
@@ -15,18 +15,15 @@
*/
package com.datatorrent.demos.scalability;
+import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.partitioner.StatelessPartitioner;
import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.MapMultiConsoleOutputOperator;
-
-import java.util.Map;
-
-import org.apache.commons.lang.mutable.MutableDouble;
import org.apache.hadoop.conf.Configuration;
/**
@@ -59,7 +56,7 @@ public void populateDAG(DAG dag, Configuration conf)
InputItemGenerator input = dag.addOperator("input", InputItemGenerator.class);
dag.setOutputPortAttribute(input.outputPort, PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
- dag.setAttribute(input, OperatorContext.INITIAL_PARTITION_COUNT, partitions);
+ dag.setAttribute(input, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<InputItemGenerator>(partitions));
InputDimensionGenerator inputDimension = dag.addOperator("inputDimension", InputDimensionGenerator.class);
dag.setInputPortAttribute(inputDimension.inputPort, PortContext.PARTITION_PARALLEL, true);
@@ -77,7 +74,9 @@ public void populateDAG(DAG dag, Configuration conf)
//MapMultiConsoleOutputOperator console = dag.addOperator("console", MapMultiConsoleOutputOperator.class);
ConsoleOutputOperator console = dag.addOperator("console", ConsoleOutputOperator.class);
- dag.setAttribute(console, OperatorContext.INITIAL_PARTITION_COUNT, partitions_agg);
+ dag.setAttribute(console,
+ Context.OperatorContext.PARTITIONER,
+ new StatelessPartitioner<ConsoleOutputOperator>(partitions_agg));
console.silent= true;
console.setDebug(false);
dag.setInputPortAttribute(console.input, PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
index cb0dea43f2..3365dd2a4f 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java
@@ -20,6 +20,7 @@
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.algo.PartitionableUniqueCount;
+import com.datatorrent.lib.partitioner.StatelessPartitioner;
import com.datatorrent.lib.algo.UniqueCounterValue;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.stream.StreamDuplicater;
@@ -51,7 +52,7 @@ public void populateDAG(DAG dag, Configuration entries)
PartitionableUniqueCount<Integer> uniqCount = dag.addOperator("uniqevalue", new PartitionableUniqueCount<Integer>());
uniqCount.setCumulative(false);
- dag.setAttribute(uniqCount, Context.OperatorContext.INITIAL_PARTITION_COUNT, 3);
+ dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<PartitionableUniqueCount<Integer>>(3));
CountVerifier verifier = dag.addOperator("verifier", new CountVerifier());
StreamDuplicater> dup = dag.addOperator("dup", new StreamDuplicater>());
diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
index b9f5784e9c..fcf8e75013 100644
--- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
+++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java
@@ -20,6 +20,7 @@
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.algo.PartitionableUniqueCount;
+import com.datatorrent.lib.partitioner.StatelessPartitioner;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.util.KeyValPair;
import org.apache.hadoop.conf.Configuration;
@@ -43,7 +44,8 @@ public void populateDAG(DAG dag, Configuration entries)
PartitionableUniqueCount<KeyValPair<String, Object>> uniqCount =
dag.addOperator("uniqevalue", new PartitionableUniqueCount<KeyValPair<String, Object>>());
uniqCount.setCumulative(false);
- dag.setAttribute(uniqCount, Context.OperatorContext.INITIAL_PARTITION_COUNT, 3);
+ dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<PartitionableUniqueCount<KeyValPair<String, Object>>>(3));
+
ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator());
dag.addStream("datain", randGen.outPort, uniqCount.data);
diff --git a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java
index 0d9c5973bf..bae2199777 100644
--- a/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java
+++ b/demos/yahoofinance/src/main/java/com/datatorrent/demos/yahoofinance/YahooFinanceApplication.java
@@ -95,8 +95,8 @@
*
* Scaling Options :
*
- * - Volume operator can be replicated using 'INITIAL_PARTITION_COUNT' options
- * on operator.
+ * - Volume operator can be replicated using a {@link StatelessPartitioner}
+ * on an operator.
* - Range value operator can be replicated using a proper unifier
* operator (read App Dev Guide).
* - Sliding window operator can be replicated with a proper unifier operator.
diff --git a/library/src/main/java/com/datatorrent/lib/bucket/BucketManager.java b/library/src/main/java/com/datatorrent/lib/bucket/BucketManager.java
index 4d40003b7e..ff5b2e6551 100644
--- a/library/src/main/java/com/datatorrent/lib/bucket/BucketManager.java
+++ b/library/src/main/java/com/datatorrent/lib/bucket/BucketManager.java
@@ -154,8 +154,7 @@ public interface BucketManager
* @param partitionKeysToManagers mapping of partition keys to {@link BucketManager}s of new partitions.
* @param partitionMask partition mask to find which partition an event belongs to.
*/
- void definePartitions(List<BucketManager<T>> oldManagers, Map<Integer, BucketManager<T>> partitionKeysToManagers,
- int partitionMask);
+ void definePartitions(List<BucketManager<T>> oldManagers, Map<Integer, BucketManager<T>> partitionKeysToManagers, int partitionMask);
/**
* Callback interface for {@link BucketManager} for load and off-load operations.
diff --git a/library/src/main/java/com/datatorrent/lib/dedup/Deduper.java b/library/src/main/java/com/datatorrent/lib/dedup/Deduper.java
index 75a612c801..aee475c889 100644
--- a/library/src/main/java/com/datatorrent/lib/dedup/Deduper.java
+++ b/library/src/main/java/com/datatorrent/lib/dedup/Deduper.java
@@ -19,7 +19,7 @@
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-
+import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang.mutable.MutableLong;
@@ -145,6 +145,8 @@ public final void process(INPUT tuple)
private transient OperatorContext context;
protected BasicCounters<MutableLong> counters;
private transient long currentWindow;
+ @Min(1)
+ private int partitionCount = 1;
public Deduper()
{
@@ -156,6 +158,16 @@ public Deduper()
counters = new BasicCounters<MutableLong>(MutableLong.class);
}
+ public void setPartitionCount(int partitionCount)
+ {
+ this.partitionCount = partitionCount;
+ }
+
+ public int getPartitionCount()
+ {
+ return partitionCount;
+ }
+
@Override
public void setup(OperatorContext context)
{
@@ -252,12 +264,6 @@ public void bucketOffLoaded(long bucketKey)
{
}
- public Collection<Partition<Deduper<INPUT, OUTPUT>>> rebalancePartitions(Collection<Partition<Deduper<INPUT, OUTPUT>>> partitions)
- {
- /* we do not re-balance since we do not know how to do it */
- return partitions;
- }
-
@Override
public void partitioned(Map<Integer, Partition<Deduper<INPUT, OUTPUT>>> partitions)
{
@@ -265,10 +271,17 @@ public void partitioned(Map>> partitio
@Override
@SuppressWarnings({"BroadCatchBlock", "TooBroadCatch", "UseSpecificCatch"})
- public Collection<Partition<Deduper<INPUT, OUTPUT>>> definePartitions(Collection<Partition<Deduper<INPUT, OUTPUT>>> partitions, int incrementalCapacity)
+ public Collection<Partition<Deduper<INPUT, OUTPUT>>> definePartitions(Collection<Partition<Deduper<INPUT, OUTPUT>>> partitions, int partitionCnt)
{
- if (incrementalCapacity == 0) {
- return rebalancePartitions(partitions);
+ final int finalCapacity;
+
+ //Do parallel partitioning
+ if(partitionCnt != 0) {
+ finalCapacity = partitionCnt;
+ }
+ //Do normal partitioning
+ else {
+ finalCapacity = partitionCount;
}
//Collect the state here
@@ -294,7 +307,6 @@ public Collection>> definePartitions(Collection
partition.getPartitionedInstance().waitingEvents.clear();
}
- final int finalCapacity = partitions.size() + incrementalCapacity;
partitions.clear();
Collection<Partition<Deduper<INPUT, OUTPUT>>> newPartitions = Lists.newArrayListWithCapacity(finalCapacity);
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractBlockReader.java
index 7238c85b82..8f84d217d0 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractBlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractBlockReader.java
@@ -268,9 +268,13 @@ protected void closeCurrentReader() throws IOException
}
}
+ /**
+ * Note: This partitioner does not support parallel partitioning.
+ * {@inheritDoc}
+ */
@SuppressWarnings("unchecked")
@Override
- public Collection<Partition<AbstractBlockReader<R>>> definePartitions(Collection<Partition<AbstractBlockReader<R>>> partitions, int incrementalCapacity)
+ public Collection<Partition<AbstractBlockReader<R>>> definePartitions(Collection<Partition<AbstractBlockReader<R>>> partitions, int partitionCnt)
{
if (partitions.iterator().next().getStats() == null) {
//First time when define partitions is called
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
index e440cafe81..e1d79561a4 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFSDirectoryInputOperator.java
@@ -658,11 +658,11 @@ protected void closeFile(InputStream is) throws IOException
}
@Override
- public Collection<Partition<AbstractFSDirectoryInputOperator<T>>> definePartitions(Collection<Partition<AbstractFSDirectoryInputOperator<T>>> partitions, int incrementalCapacity)
+ public Collection<Partition<AbstractFSDirectoryInputOperator<T>>> definePartitions(Collection<Partition<AbstractFSDirectoryInputOperator<T>>> partitions, int partitionCnt)
{
lastRepartition = System.currentTimeMillis();
- int totalCount = computedNewPartitionCount(partitions, incrementalCapacity);
+ int totalCount = computedNewPartitionCount(partitions, partitionCnt);
LOG.debug("Computed new partitions: {}", totalCount);
diff --git a/library/src/main/java/com/datatorrent/lib/partitioner/StatelessPartitioner.java b/library/src/main/java/com/datatorrent/lib/partitioner/StatelessPartitioner.java
new file mode 100644
index 0000000000..c4aaef7ddf
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/partitioner/StatelessPartitioner.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.partitioner;
+
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.InputPort;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.Partitioner.Partition;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.validation.constraints.Min;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a simple partitioner which creates partitionCount number of clones of an operator.
+ * @param <T> The type of the operator
+ */
+public class StatelessPartitioner<T extends Operator> implements Partitioner<T>, Serializable
+{
+ private static final Logger logger = LoggerFactory.getLogger(StatelessPartitioner.class);
+ private static final long serialVersionUID = 201411071710L;
+ /**
+ * The number of partitions for the default partitioner to create.
+ */
+ @Min(1)
+ private int partitionCount = 1;
+
+ /**
+ * This creates a partitioner which creates only one partition.
+ */
+ public StatelessPartitioner()
+ {
+ }
+
+ /**
+ * This constructor is used to create the partitioner from a property.
+ * @param value A string which is an integer of the number of partitions to create
+ */
+ public StatelessPartitioner(String value)
+ {
+ this(Integer.parseInt(value));
+ }
+
+ /**
+ * This creates a partitioner which creates partitionCount partitions.
+ * @param partitionCount The number of partitions to create.
+ */
+ public StatelessPartitioner(int partitionCount)
+ {
+ this.partitionCount = partitionCount;
+ }
+
+ /**
+ * This method sets the number of partitions for the StatelessPartitioner to create.
+ * @param partitionCount The number of partitions to create.
+ */
+ public void setPartitionCount(int partitionCount)
+ {
+ this.partitionCount = partitionCount;
+ }
+
+ /**
+ * This method gets the number of partitions for the StatelessPartitioner to create.
+ * @return The number of partitions to create.
+ */
+ public int getPartitionCount()
+ {
+ return partitionCount;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, int partitionCnt)
+ {
+ int tempPartitionCount;
+
+ //Do parallel partitioning
+ if(partitionCnt != 0) {
+ tempPartitionCount = partitionCnt;
+ }
+ //Do normal partitioning
+ else {
+ tempPartitionCount = partitionCount;
+ }
+
+ //Get a partition
+ DefaultPartition<T> partition = (DefaultPartition<T>)partitions.iterator().next();
+
+ logger.debug("entering definePartitions, tempPartitionCount {} partitionCnt {}",
+ tempPartitionCount,
+ partitionCnt);
+ T operator = partitions.iterator().next().getPartitionedInstance();
+ Collection<Partition<T>> newPartitions = null;
+
+ //first call to define partitions
+ if(partitions.iterator().next().getStats() == null) {
+ logger.debug("first call to define partitions");
+ newPartitions = Lists.newArrayList();
+
+ for (int partitionCounter = 0;
+ partitionCounter < tempPartitionCount;
+ partitionCounter++) {
+ newPartitions.add(new DefaultPartition<T>(operator));
+ }
+
+ // partition the stream that was first connected in the DAG and send full data to remaining input ports
+ // this gives control over which stream to partition under default partitioning to the DAG writer
+ List<InputPort<?>> inputPortList = partition.getInputPortList();
+ //assign partition keys
+ if (!inputPortList.isEmpty()) {
+ DefaultPartition.assignPartitionKeys(newPartitions, inputPortList.iterator().next());
+ }
+ }
+ else {
+ logger.debug("subsequent call to define partitions");
+ //define partitions is being called again
+
+ //Repartitioning input operator
+ if(partition.getPartitionKeys().isEmpty()) {
+ newPartitions = repartitionInputOperator(partitions);
+ }
+ else {
+ newPartitions = repartition(partitions);
+ }
+ }
+
+ logger.debug("new partition size {}", newPartitions.size());
+
+ return newPartitions;
+ }
+
+ @Override
+ public void partitioned(Map<Integer, Partition<T>> partitions)
+ {
+ //Do nothing
+ }
+
+ /**
+ * Change existing partitioning based on runtime state (load). Unlike
+ * implementations of {@link Partitioner}, decisions are made
+ * solely based on the load indicator; operator state is not
+ * considered in the event of a partition split or merge.
+ *
+ * @param partitions
+ * the current set of partitions
+ * @return the new set of partitions
+ */
+ public static <T extends Operator> Collection<Partition<T>> repartition(Collection<Partition<T>> partitions)
+ {
+ List<Partition<T>> newPartitions = new ArrayList<Partition<T>>();
+ HashMap<Integer, Partition<T>> lowLoadPartitions = new HashMap<Integer, Partition<T>>();
+ for (Partition<T> p: partitions) {
+ int load = p.getLoad();
+ if (load < 0) {
+ // combine neighboring underutilized partitions
+ PartitionKeys pks = p.getPartitionKeys().values().iterator().next(); // one port partitioned
+ for (int partitionKey: pks.partitions) {
+ // look for the sibling partition by excluding leading bit
+ int reducedMask = pks.mask >>> 1;
+ String lookupKey = Integer.valueOf(reducedMask) + "-" + Integer.valueOf(partitionKey & reducedMask);
+ Partition<T> siblingPartition = lowLoadPartitions.remove(partitionKey & reducedMask);
+ if (siblingPartition == null) {
+ lowLoadPartitions.put(partitionKey & reducedMask, p);
+ }
+ else {
+ // both of the partitions are low load, combine
+ PartitionKeys newPks = new PartitionKeys(reducedMask, Sets.newHashSet(partitionKey & reducedMask));
+ // put new value so the map gets marked as modified
+ InputPort<?> port = siblingPartition.getPartitionKeys().keySet().iterator().next();
+ siblingPartition.getPartitionKeys().put(port, newPks);
+ // add as new partition
+ newPartitions.add(siblingPartition);
+ //LOG.debug("partition keys after merge {}", siblingPartition.getPartitionKeys());
+ }
+ }
+ }
+ else if (load > 0) {
+ // split bottlenecks
+ Map<InputPort<?>, PartitionKeys> keys = p.getPartitionKeys();
+ Map.Entry<InputPort<?>, PartitionKeys> e = keys.entrySet().iterator().next();
+
+ final int newMask;
+ final Set<Integer> newKeys;
+
+ if (e.getValue().partitions.size() == 1) {
+ // split single key
+ newMask = (e.getValue().mask << 1) | 1;
+ int key = e.getValue().partitions.iterator().next();
+ int key2 = (newMask ^ e.getValue().mask) | key;
+ newKeys = Sets.newHashSet(key, key2);
+ }
+ else {
+ // assign keys to separate partitions
+ newMask = e.getValue().mask;
+ newKeys = e.getValue().partitions;
+ }
+
+ for (int key: newKeys) {
+ Partition<T> newPartition = new DefaultPartition<T>(p.getPartitionedInstance());
+ newPartition.getPartitionKeys().put(e.getKey(), new PartitionKeys(newMask, Sets.newHashSet(key)));
+ newPartitions.add(newPartition);
+ }
+ }
+ else {
+ // leave unchanged
+ newPartitions.add(p);
+ }
+ }
+ // put back low load partitions that could not be combined
+ newPartitions.addAll(lowLoadPartitions.values());
+ return newPartitions;
+ }
+
+ /**
+ * Adjust the partitions of an input operator (operator with no connected input stream).
+ *
+ * @param <T> The operator type
+ * @param partitions
+ * the current set of partitions
+ * @return the new set of partitions
+ */
+ public static <T extends Operator> Collection<Partition<T>> repartitionInputOperator(Collection<Partition<T>> partitions)
+ {
+ List<Partition<T>> newPartitions = new ArrayList<Partition<T>>();
+ List<Partition<T>> lowLoadPartitions = new ArrayList<Partition<T>>();
+ for (Partition<T> p: partitions) {
+ int load = p.getLoad();
+ if (load < 0) {
+ if (!lowLoadPartitions.isEmpty()) {
+ newPartitions.add(lowLoadPartitions.remove(0));
+ }
+ else {
+ lowLoadPartitions.add(p);
+ }
+ }
+ else if (load > 0) {
+ newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
+ newPartitions.add(new DefaultPartition<T>(p.getPartitionedInstance()));
+ }
+ else {
+ newPartitions.add(p);
+ }
+ }
+ newPartitions.addAll(lowLoadPartitions);
+ return newPartitions;
+ }
+}
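The split and merge decisions in repartition() are pure mask arithmetic over partition keys. A standalone sketch tracing one split and the matching merge (not part of the patch; class name hypothetical):

public class PartitionMaskDemo
{
  public static void main(String[] args)
  {
    //An overloaded partition owning key 1 under mask 1: widen the mask by one bit
    //and derive the sibling key that differs in the new leading bit.
    int mask = 1;
    int key = 1;
    int newMask = (mask << 1) | 1;      //binary 11 = 3
    int key2 = (newMask ^ mask) | key;  //binary 11 = 3
    System.out.println("split: mask " + newMask + ", keys {" + key + ", " + key2 + "}");

    //Two underloaded siblings under mask 3 merge by dropping the leading bit again.
    int reducedMask = newMask >>> 1;    //back to 1
    System.out.println("merge: keys {" + key + ", " + key2 + "} collapse to "
        + (key & reducedMask) + " under mask " + reducedMask);
  }
}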
diff --git a/library/src/main/java/com/datatorrent/lib/algo/StatelessThroughputBasedPartitioner.java b/library/src/main/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitioner.java
similarity index 85%
rename from library/src/main/java/com/datatorrent/lib/algo/StatelessThroughputBasedPartitioner.java
rename to library/src/main/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitioner.java
index 88ee9ace29..b75e267539 100644
--- a/library/src/main/java/com/datatorrent/lib/algo/StatelessThroughputBasedPartitioner.java
+++ b/library/src/main/java/com/datatorrent/lib/partitioner/StatelessThroughputBasedPartitioner.java
@@ -13,21 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.datatorrent.lib.algo;
+package com.datatorrent.lib.partitioner;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.StatsListener;
+import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.*;
+import javax.validation.constraints.Min;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
-
-import com.datatorrent.api.DefaultPartition;
-import com.datatorrent.api.Operator;
-import com.datatorrent.api.Partitioner;
-import com.datatorrent.api.StatsListener;
-
/**
*
* This does the partition of the operator based on the throughput. This partitioner doesn't copy the state during the repartitioning
@@ -48,6 +47,8 @@
public class StatelessThroughputBasedPartitioner<T extends Operator> implements StatsListener, Partitioner<T>, Serializable
{
private static final Logger logger = LoggerFactory.getLogger(StatelessThroughputBasedPartitioner.class);
+ private static final long serialVersionUID = 201412021109L;
+
private long maximumEvents;
private long minimumEvents;
private long cooldownMillis = 2000;
@@ -55,6 +56,43 @@ public class StatelessThroughputBasedPartitioner implements
private long partitionNextMillis;
private boolean repartition;
private transient HashMap<Integer, BatchedOperatorStats> partitionedInstanceStatus = new HashMap<Integer, BatchedOperatorStats>();
+ @Min(1)
+ private int initialPartitionCount = 1;
+
+ /**
+ * This creates a partitioner which begins with only one partition.
+ */
+ public StatelessThroughputBasedPartitioner()
+ {
+ }
+
+ /**
+ * This constructor is used to create the partitioner from a property.
+ * @param value A string which is an integer of the number of partitions to begin with
+ */
+ public StatelessThroughputBasedPartitioner(String value)
+ {
+ this(Integer.parseInt(value));
+ }
+
+ /**
+ * This creates a partitioner which begins with initialPartitionCount partitions.
+ * @param initialPartitionCount The number of partitions to begin with.
+ */
+ public StatelessThroughputBasedPartitioner(int initialPartitionCount)
+ {
+ this.initialPartitionCount = initialPartitionCount;
+ }
+
+ public void setInitialPartitionCount(int initialPartitionCount)
+ {
+ this.initialPartitionCount = initialPartitionCount;
+ }
+
+ public int getInitialPartitionCount()
+ {
+ return initialPartitionCount;
+ }
@Override
public Response processStats(BatchedOperatorStats stats)
@@ -84,7 +122,7 @@ else if (!repartition) {
}
@Override
- public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, int incrementalCapacity)
+ public Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, int partitionCnt)
{
if (partitionedInstanceStatus == null || partitionedInstanceStatus.isEmpty()) {
// first call
@@ -94,7 +132,8 @@ public Collection> definePartitions(Collection> partit
}
partitionNextMillis = System.currentTimeMillis() + 2 * cooldownMillis;
nextMillis = partitionNextMillis;
- return null;
+ //Returning null is no longer supported; delegate the initial partitioning to StatelessPartitioner.
+ return new StatelessPartitioner<T>(initialPartitionCount).definePartitions(partitions, partitionCnt);
}
else {
// repartition call
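Because the partitioner is also a StatsListener, setting PARTITIONER alone is not enough for dynamic scaling; it must be registered for stats callbacks too. A sketch of the wiring, reusing RandomEventGenerator from the testbench; the STATS_LISTENERS attribute name is assumed from the engine API, and the application class is hypothetical:

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner;
import com.datatorrent.lib.testbench.RandomEventGenerator;

/** Hypothetical app wiring in the relocated throughput-based partitioner. */
public class ThroughputPartitionApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RandomEventGenerator rand = dag.addOperator("rand", RandomEventGenerator.class);

    //Start with two partitions; processStats() later triggers repartitioning
    //when throughput leaves the [minimumEvents, maximumEvents] band.
    StatelessThroughputBasedPartitioner<RandomEventGenerator> partitioner =
        new StatelessThroughputBasedPartitioner<RandomEventGenerator>(2);

    dag.setAttribute(rand, Context.OperatorContext.PARTITIONER, partitioner);
    //Assumed attribute: registers the partitioner so processStats() is invoked.
    dag.setAttribute(rand, Context.OperatorContext.STATS_LISTENERS,
        Arrays.<StatsListener>asList(partitioner));
  }
}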
diff --git a/library/src/main/java/com/datatorrent/lib/statistics/DimensionsComputation.java b/library/src/main/java/com/datatorrent/lib/statistics/DimensionsComputation.java
index 57f38ec04f..9c89b00562 100644
--- a/library/src/main/java/com/datatorrent/lib/statistics/DimensionsComputation.java
+++ b/library/src/main/java/com/datatorrent/lib/statistics/DimensionsComputation.java
@@ -15,32 +15,30 @@
*/
package com.datatorrent.lib.statistics;
+import com.datatorrent.api.*;
+import com.datatorrent.api.Context.OperatorContext;
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import gnu.trove.map.hash.TCustomHashMap;
+import gnu.trove.strategy.HashingStrategy;
import java.io.*;
import java.lang.reflect.Array;
import java.util.*;
import java.util.Map.Entry;
-
-import gnu.trove.map.hash.TCustomHashMap;
-import gnu.trove.strategy.HashingStrategy;
+import javax.validation.constraints.Min;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.esotericsoftware.kryo.DefaultSerializer;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.*;
-
/**
* An implementation of an operator that computes dimensions of events.
*
* @displayName Dimension Computation
* @category Statistics
* @tags event, dimension, aggregation, computation
- *
+ *
* @param - Type of the tuple whose attributes are used to define dimensions.
* @since 1.0.2
*/
@@ -52,7 +50,7 @@ public void setUnifier(Unifier unifier)
{
this.unifier = unifier;
}
-
+
/**
* Output port that emits an aggregate of events.
*/
@@ -208,19 +206,37 @@ public void transferDimension(Aggregator aggregator, Dimension
public static class PartitionerImpl<EVENT> implements Partitioner<DimensionsComputation<EVENT>>
{
+ @Min(1)
+ private int partitionCount = 1;
+
+ public void setPartitionCount(int partitionCount)
+ {
+ this.partitionCount = partitionCount;
+ }
+
+ public int getPartitionCount()
+ {
+ return partitionCount;
+ }
+
@Override
public void partitioned(Map<Integer, Partition<DimensionsComputation<EVENT>>> partitions)
{
}
@Override
- public Collection<Partition<DimensionsComputation<EVENT>>> definePartitions(Collection<Partition<DimensionsComputation<EVENT>>> partitions, int incrementalCapacity)
+ public Collection<Partition<DimensionsComputation<EVENT>>> definePartitions(Collection<Partition<DimensionsComputation<EVENT>>> partitions, int partitionCnt)
{
- if (incrementalCapacity == 0) {
- return partitions;
- }
+ int newPartitionsCount;
- int newPartitionsCount = partitions.size() + incrementalCapacity;
+ //Do parallel partitioning
+ if(partitionCnt != 0) {
+ newPartitionsCount = partitionCnt;
+ }
+ //Do normal partitioning
+ else {
+ newPartitionsCount = partitionCount;
+ }
LinkedHashMap<Aggregator<EVENT>, DimensionsComputation<EVENT>> map = new LinkedHashMap<Aggregator<EVENT>, DimensionsComputation<EVENT>>(newPartitionsCount);
for (Partition<DimensionsComputation<EVENT>> partition : partitions) {
diff --git a/library/src/main/java/com/datatorrent/lib/util/AlertEscalationOperator.java b/library/src/main/java/com/datatorrent/lib/util/AlertEscalationOperator.java
index 546ee03700..508b175536 100644
--- a/library/src/main/java/com/datatorrent/lib/util/AlertEscalationOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/util/AlertEscalationOperator.java
@@ -128,8 +128,12 @@ public void partitioned(Map> partiti
{
}
+ /**
+ * Note: This partitioner does not support parallel partitioning.
+ * {@inheritDoc}
+ */
@Override
- public Collection<Partition<AlertEscalationOperator>> definePartitions(Collection<Partition<AlertEscalationOperator>> partitions, int incrementalCapacity)
+ public Collection<Partition<AlertEscalationOperator>> definePartitions(Collection<Partition<AlertEscalationOperator>> partitions, int partitionCnt)
{
// prevent partitioning
List<Partition<AlertEscalationOperator>> newPartitions = new ArrayList<Partition<AlertEscalationOperator>>(1);
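AlertEscalationOperator opts out of scaling by ignoring the requested capacity, the same single-partition pattern noted on AbstractBlockReader above. Sketched as a standalone partitioner (class name hypothetical):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import com.datatorrent.api.Partitioner;
import com.datatorrent.lib.util.AlertEscalationOperator;

/** Hypothetical partitioner that always collapses to one partition. */
public class SinglePartitionOnly implements Partitioner<AlertEscalationOperator>
{
  @Override
  public Collection<Partition<AlertEscalationOperator>> definePartitions(Collection<Partition<AlertEscalationOperator>> partitions, int partitionCnt)
  {
    //Ignore partitionCnt and keep only the first existing instance.
    List<Partition<AlertEscalationOperator>> single = new ArrayList<Partition<AlertEscalationOperator>>(1);
    single.add(partitions.iterator().next());
    return single;
  }

  @Override
  public void partitioned(Map<Integer, Partition<AlertEscalationOperator>> partitions)
  {
  }
}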
diff --git a/library/src/test/java/com/datatorrent/lib/algo/StatelessPartitionerTest.java b/library/src/test/java/com/datatorrent/lib/algo/StatelessPartitionerTest.java
new file mode 100644
index 0000000000..3ae245dd25
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/algo/StatelessPartitionerTest.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.algo;
+
+import com.datatorrent.lib.partitioner.StatelessPartitioner;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Partitioner.Partition;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.StringCodec.Object2String;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.stream.DevNull;
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StatelessPartitionerTest
+{
+ @ApplicationAnnotation(name="TestApp")
+ public static class DummyApp implements StreamingApplication
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ DummyOperator output = dag.addOperator("DummyOutput", DummyOperator.class);
+ DevNull sink = dag.addOperator("Sink", new DevNull());
+ dag.addStream("OutputToSink", output.output, sink.data);
+ }
+ }
+
+ public static class DummyOperator implements Operator
+ {
+ public final DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>();
+
+ private Integer value;
+
+ public DummyOperator()
+ {
+ }
+
+ public DummyOperator(Integer value)
+ {
+ this.value = value;
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ //Do nothing
+ }
+
+ @Override
+ public void endWindow()
+ {
+ //Do nothing
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ //Do nothing
+ }
+
+ @Override
+ public void teardown()
+ {
+ //Do nothing
+ }
+
+ public void setValue(int value)
+ {
+ this.value = value;
+ }
+
+ public int getValue()
+ {
+ return value;
+ }
+ }
+
+ @Test
+ public void partition1Test()
+ {
+ DummyOperator dummyOperator = new DummyOperator(5);
+ StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>();
+
+ Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
+ DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator);
+ partitions.add(defaultPartition);
+
+ Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, 0);
+ Assert.assertEquals("Incorrect number of partitions", 1, newPartitions.size());
+
+ for(Partition<DummyOperator> partition: newPartitions) {
+ Assert.assertEquals("Incorrect cloned value", 5, partition.getPartitionedInstance().getValue());
+ }
+ }
+
+ @Test
+ public void partition5Test()
+ {
+ DummyOperator dummyOperator = new DummyOperator(5);
+ StatelessPartitioner<DummyOperator> statelessPartitioner = new StatelessPartitioner<DummyOperator>(5);
+
+ Collection<Partition<DummyOperator>> partitions = Lists.newArrayList();
+ DefaultPartition<DummyOperator> defaultPartition = new DefaultPartition<DummyOperator>(dummyOperator);
+ partitions.add(defaultPartition);
+
+ Collection<Partition<DummyOperator>> newPartitions = statelessPartitioner.definePartitions(partitions, 0);
+ Assert.assertEquals("Incorrect number of partitions", 5, newPartitions.size());
+
+ for(Partition<DummyOperator> partition: newPartitions) {
+ Assert.assertEquals("Incorrect cloned value", 5, partition.getPartitionedInstance().getValue());
+ }
+ }
+
+ @Test
+ public void objectPropertyTest()
+ {
+ Object2String<StatelessPartitioner<DummyOperator>> propertyReader = new Object2String<StatelessPartitioner<DummyOperator>>();
+ StatelessPartitioner<DummyOperator> partitioner = propertyReader.fromString("com.datatorrent.lib.partitioner.StatelessPartitioner:3");
+ Assert.assertEquals(3, partitioner.getPartitionCount());
+ }
+
+ @Test
+ public void launchPartitionTestApp()
+ {
+ Configuration conf = new Configuration(false);
+ conf.set("dt.operator.DummyOutput.attr.PARTITIONER", "com.datatorrent.lib.algo.StatelessPartitioner:3");
+
+ LocalMode lma = LocalMode.newInstance();
+
+ try {
+ lma.prepareDAG(new DummyApp(), conf);
+ LocalMode.Controller lc = lma.getController();
+ lc.run(1);
+ }
+ catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+}
diff --git a/samples/src/main/java/com/datatorrent/samples/lib/math/PartitionMathSumSample.java b/samples/src/main/java/com/datatorrent/samples/lib/math/PartitionMathSumSample.java
index 2631efbea5..27004ac7d2 100644
--- a/samples/src/main/java/com/datatorrent/samples/lib/math/PartitionMathSumSample.java
+++ b/samples/src/main/java/com/datatorrent/samples/lib/math/PartitionMathSumSample.java
@@ -16,15 +16,14 @@
package com.datatorrent.samples.lib.math;
-import org.apache.hadoop.conf.Configuration;
-
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
-
+import com.datatorrent.lib.partitioner.StatelessPartitioner;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.math.Sum;
import com.datatorrent.lib.testbench.RandomEventGenerator;
+import org.apache.hadoop.conf.Configuration;
/**
* * This sample application code for showing sample usage of malhar operator(s).
@@ -53,8 +52,7 @@ public void populateDAG(DAG dag, Configuration conf)
Sum sum = dag.addOperator("sum", Sum.class);
dag.addStream("stream1", rand.integer_data, sum.data);
- dag.getMeta(sum).getAttributes()
- .put(OperatorContext.INITIAL_PARTITION_COUNT, 4);
+ dag.getMeta(sum).getAttributes().put(OperatorContext.PARTITIONER, new StatelessPartitioner<Sum<Integer>>(4));
dag.getMeta(sum).getAttributes()
.put(OperatorContext.APPLICATION_WINDOW_COUNT, 20);
diff --git a/samples/src/test/java/com/datatorrent/samples/lib/math/PartitionMathSumSampleTest.java b/samples/src/test/java/com/datatorrent/samples/lib/math/PartitionMathSumSampleTest.java
new file mode 100644
index 0000000000..cc1b142388
--- /dev/null
+++ b/samples/src/test/java/com/datatorrent/samples/lib/math/PartitionMathSumSampleTest.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.samples.lib.math;
+
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.samples.lib.math.PartitionMathSumSample;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class PartitionMathSumSampleTest
+{
+ @Test
+ public void testApp()
+ {
+ LocalMode lma = LocalMode.newInstance();
+ Configuration conf = new Configuration();
+
+ try {
+ lma.prepareDAG(new PartitionMathSumSample(), conf);
+ }
+ catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+
+ LocalMode.Controller lc = lma.getController();
+ lc.setHeartbeatMonitoringEnabled(false);
+ lc.run(50);
+ }
+}