* Note: This should be committed with SPOI-3346
* Removed INITIAL_PARTITION_COUNT
* Updated partitioners which depended on INITIAL_PARTITION_COUNT to use a property that defines the number of partitions
* Added a comment to partitioners which don't support parallel partitioning
* Added tests for some apps
* Made the benchmark and contrib pom.xml files copy dependencies to target/deps, to make launching apps easier
* Tested with all functional apps
ilooner committed Dec 2, 2014
1 parent 79157f5 commit 33ae3cf
Showing 42 changed files with 1,003 additions and 263 deletions.
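For applications being ported, the change follows one pattern: instead of requesting a partition count through the removed INITIAL_PARTITION_COUNT attribute, a StatelessPartitioner configured with that count is attached via the PARTITIONER attribute. Below is a minimal sketch of the migration, mirroring the UniqueValueCountBenchmarkApplication change later in this diff; MyOperator is a hypothetical stand-in for any operator that tolerates stateless partitioning.

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.partitioner.StatelessPartitioner;

public void populateDAG(DAG dag)
{
  MyOperator op = dag.addOperator("myOperator", new MyOperator());

  // Before this commit (attribute now removed):
  // dag.setAttribute(op, Context.OperatorContext.INITIAL_PARTITION_COUNT, 3);

  // After this commit: the partitioner instance owns the partition count.
  dag.setAttribute(op, Context.OperatorContext.PARTITIONER,
      new StatelessPartitioner<MyOperator>(3));
}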
1 change: 1 addition & 0 deletions .gitignore
@@ -9,3 +9,4 @@ target/
*.iml
npm-debug.log
nb-configuration.xml
hadoop.log
@@ -583,11 +583,6 @@ public void populateDAG(DAG dag, Configuration conf)
RabbitMQLogsInputOperator syslogLogInput = dag.addOperator("SyslogLogInput", RabbitMQLogsInputOperator.class);
RabbitMQLogsInputOperator systemLogInput = dag.addOperator("SystemLogInput", RabbitMQLogsInputOperator.class);

// dynamically partition based on number of incoming tuples from the queue
//dag.setAttribute(apacheLogInput, OperatorContext.INITIAL_PARTITION_COUNT, 2);
//dag.setAttribute(apacheLogInput, OperatorContext.PARTITION_TPS_MIN, 1000);
//dag.setAttribute(apacheLogInput, OperatorContext.PARTITION_TPS_MAX, 3000);

/*
* Convert incoming JSON structures to flattened map objects
*/
@@ -460,8 +460,12 @@ else if (split[0].toLowerCase().equals("dimensions")) {
}
}

/**
* <b>Note:</b> This partitioner does not support parallel partitioning.<br/><br/>
* {@inheritDoc}
*/
@Override
public Collection<Partition<DimensionOperator>> definePartitions(Collection<Partition<DimensionOperator>> partitions, int incrementalCapacity)
public Collection<Partition<DimensionOperator>> definePartitions(Collection<Partition<DimensionOperator>> partitions, int partitionCnt)
{
ArrayList<Partition<DimensionOperator>> newPartitions = new ArrayList<Partition<DimensionOperator>>();
String[] filters = registry.list(LogstreamUtil.FILTER);
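The note added above matters because these Logstream partitioners size themselves from application state rather than from the requested capacity. An illustrative reconstruction of the pattern, based on the visible lines (the diff truncates the real method body):

@Override
public Collection<Partition<DimensionOperator>> definePartitions(Collection<Partition<DimensionOperator>> partitions, int partitionCnt)
{
  // partitionCnt is deliberately ignored: one partition is created per
  // registered filter, which is why parallel partitioning is unsupported.
  ArrayList<Partition<DimensionOperator>> newPartitions = new ArrayList<Partition<DimensionOperator>>();
  String[] filters = registry.list(LogstreamUtil.FILTER);
  for (String filter : filters) {
    DimensionOperator oper = new DimensionOperator();
    // ... per-filter configuration omitted in this sketch ...
    newPartitions.add(new DefaultPartition<DimensionOperator>(oper));
  }
  return newPartitions;
}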
@@ -103,8 +103,12 @@ protected StreamCodec<Map<String, DimensionObject<String>>> getStreamCodec()
return new LogstreamTopNStreamCodec();
}

/**
* <b>Note:</b> This partitioner does not support parallel partitioning.<br/><br/>
* {@inheritDoc}
*/
@Override
public Collection<Partition<LogstreamTopN>> definePartitions(Collection<Partition<LogstreamTopN>> partitions, int incrementalCapacity)
public Collection<Partition<LogstreamTopN>> definePartitions(Collection<Partition<LogstreamTopN>> partitions, int partitionCnt)
{
ArrayList<Partition<LogstreamTopN>> newPartitions = new ArrayList<Partition<LogstreamTopN>>();
String[] filters = registry.list(LogstreamUtil.FILTER);
@@ -118,6 +118,7 @@ public void setRegistry(LogstreamPropertyRegistry registry)
}

/**
* <b>Note:</b> This partitioner does not support parallel partitioning.<br/><br/>
* Partitions count will be the number of input routing keys.
* Each partition receives tuples from its routing key.
*
@@ -126,7 +127,7 @@ public void setRegistry(LogstreamPropertyRegistry registry)
* @return
*/
@Override
public Collection<Partition<RabbitMQLogsInputOperator>> definePartitions(Collection<Partition<RabbitMQLogsInputOperator>> partitions, int incrementalCapacity)
public Collection<Partition<RabbitMQLogsInputOperator>> definePartitions(Collection<Partition<RabbitMQLogsInputOperator>> partitions, int partitionCnt)
{
if (routingKeys == null || routingKeys.length == 0) {
return partitions;
38 changes: 38 additions & 0 deletions benchmark/pom.xml
@@ -18,6 +18,44 @@
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.9</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/deps</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
<execution>
<id>deploy-to-local-directory</id>
<phase>install</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
<outputDirectory>${project.build.directory}/deps</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
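With the copy-dependencies and copy executions above bound to the package and install phases, mvn install leaves the module artifact and all of its runtime dependencies together in target/deps, so an app can be launched with that single directory on the classpath instead of a hand-assembled dependency list (the exact launch command depends on the app and the cluster setup).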
@@ -20,6 +20,7 @@
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.algo.PartitionableUniqueCount;
import com.datatorrent.lib.partitioner.StatelessPartitioner;
import com.datatorrent.lib.algo.UniqueCounterValue;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.testbench.RandomEventGenerator;
@@ -41,7 +42,6 @@ public class UniqueValueCountBenchmarkApplication implements StreamingApplicatio
@Override
public void populateDAG(DAG dag, Configuration entries)
{

dag.setAttribute(dag.APPLICATION_NAME, "UniqueValueCountDemo");
dag.setAttribute(dag.DEBUG, true);

@@ -50,12 +50,11 @@ public void populateDAG(DAG dag, Configuration entries)
RandomEventGenerator randGen = dag.addOperator("randomgen", new RandomEventGenerator());
randGen.setMaxvalue(999999);
randGen.setTuplesBlastIntervalMillis(50);
dag.setAttribute(randGen, Context.OperatorContext.INITIAL_PARTITION_COUNT, 3);
dag.setAttribute(randGen, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<RandomEventGenerator>(3));

/* Initialize with three partitions to start with */
// UniqueCount1 uniqCount = dag.addOperator("uniqevalue", new UniqueCount1());
PartitionableUniqueCount<Integer> uniqCount = dag.addOperator("uniqevalue", new PartitionableUniqueCount<Integer>());
dag.setAttribute(uniqCount, Context.OperatorContext.INITIAL_PARTITION_COUNT, 3);
dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<PartitionableUniqueCount<Integer>>(3));
dag.setInputPortAttribute(uniqCount.data, Context.PortContext.PARTITION_PARALLEL, true);
uniqCount.setCumulative(false);

@@ -66,5 +65,4 @@ public void populateDAG(DAG dag, Configuration entries)
dag.addStream("consoutput", uniqCount.count, counter.data);
dag.addStream("final", counter.count, output.input);
}

}
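Note how the two settings in this app interact: randGen is given three static partitions through its StatelessPartitioner, and setting PARTITION_PARALLEL on uniqCount.data partitions the unique-count operator in lock-step with its upstream, so each generator partition feeds its own counter instance.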
@@ -16,21 +16,22 @@
package com.datatorrent.benchmark.kafka;


import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.Partitioner;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Properties;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.validation.constraints.Min;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.Partitioner;

/**
* This operator keeps sending constant messages (1 KB each) in {@link #threadNum} threads.&nbsp;
@@ -47,7 +48,8 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be

private String topic = "benchmark";

private int partitionNum = 5;
@Min(1)
private int partitionCount = 5;

private String brokerList = "localhost:9092";

@@ -134,15 +136,19 @@ public void partitioned(Map<Integer, Partition<BenchmarkPartitionableKafkaOutput
{
}

/**
* <b>Note:</b> This partitioner does not support parallel partitioning.<br/><br/>
* {@inheritDoc}
*/
@Override
public Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> definePartitions(Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> partitions, int pNum)
public Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> definePartitions(Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> partitions, int partitionCnt)
{

ArrayList<Partition<BenchmarkPartitionableKafkaOutputOperator>> newPartitions = new ArrayList<Partitioner.Partition<BenchmarkPartitionableKafkaOutputOperator>>(partitionNum);
ArrayList<Partition<BenchmarkPartitionableKafkaOutputOperator>> newPartitions = new ArrayList<Partitioner.Partition<BenchmarkPartitionableKafkaOutputOperator>>(partitionCount);

for (int i = 0; i < partitionNum; i++) {
for (int i = 0; i < partitionCount; i++) {
BenchmarkPartitionableKafkaOutputOperator bpkoo = new BenchmarkPartitionableKafkaOutputOperator();
bpkoo.setPartitionNum(partitionNum);
bpkoo.setPartitionCount(partitionCount);
bpkoo.setTopic(topic);
bpkoo.setBrokerList(brokerList);
bpkoo.setStickyKey(i);
Expand Down Expand Up @@ -190,14 +196,14 @@ public void setTopic(String topic)
this.topic = topic;
}

public int getPartitionNum()
public int getPartitionCount()
{
return partitionNum;
return partitionCount;
}

public void setPartitionNum(int partitionNum)
public void setPartitionCount(int partitionCount)
{
this.partitionNum = partitionNum;
this.partitionCount = partitionCount;
}

public String getBrokerList()
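One detail worth flagging in the rename from partitionNum to partitionCount above: the new @Min(1) constraint lets bean validation reject a non-positive partition count when the application is launched, instead of the mistake surfacing later as an empty set of partitions.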
@@ -32,6 +32,7 @@
import com.datatorrent.contrib.kafka.HighlevelKafkaConsumer;
import com.datatorrent.contrib.kafka.KafkaConsumer;
import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
import com.google.common.collect.Sets;

/**
* The stream app to benchmark Kafka input
@@ -65,7 +66,7 @@ public void populateDAG(DAG dag, Configuration conf)
BenchmarkPartitionableKafkaInputOperator bpkio = new BenchmarkPartitionableKafkaInputOperator();


String type = conf.get("kafka.consumertype");
String type = conf.get("kafka.consumertype", "simple");

KafkaConsumer consumer = null;

@@ -83,15 +84,15 @@ public void populateDAG(DAG dag, Configuration conf)
consumer = new SimpleKafkaConsumer(null, 10000, 100000, "test_kafka_autop_client", new HashSet<Integer>());
}


consumer.setBrokerSet(Sets.newHashSet(conf.get("kafka.brokerlist").split("\\s*,\\s*")));
bpkio.setInitialPartitionCount(1);
bpkio.setTuplesBlast(1024 * 1024);
bpkio.setConsumer(consumer);
bpkio = dag.addOperator("KafkaBenchmarkConsumer", bpkio);

CollectorModule cm = dag.addOperator("DataBlackhole", CollectorModule.class);
dag.addStream("end", bpkio.oport, cm.inputPort).setLocality(Locality.CONTAINER_LOCAL);
dag.setInputPortAttribute(cm.inputPort, PortContext.PARTITION_PARALLEL, true);
dag.setAttribute(bpkio, OperatorContext.INITIAL_PARTITION_COUNT, 1);
dag.setAttribute(bpkio, OperatorContext.COUNTERS_AGGREGATOR, new KafkaConsumer.KafkaMeterStatsAggregator());
// dag.setAttribute(bpkio, OperatorContext.STATS_LISTENER, KafkaMeterStatsListener.class);

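Two small hardenings appear in the hunk above: kafka.consumertype now defaults to "simple" when unset, and the broker set is parsed from kafka.brokerlist with a whitespace-tolerant split. A self-contained illustration of that split (the regex is copied from the hunk; the sample broker list is made up):

import java.util.HashSet;
import java.util.Set;

public class BrokerListSplitDemo
{
  public static void main(String[] args)
  {
    String brokerList = "localhost:9092, otherhost:9092 ,third:9092";
    Set<String> brokers = new HashSet<String>();
    for (String broker : brokerList.split("\\s*,\\s*")) {
      brokers.add(broker);  // surrounding whitespace is already stripped
    }
    System.out.println(brokers.size());  // prints 3
  }
}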
@@ -19,7 +19,6 @@

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.annotation.ApplicationAnnotation;

/**
@@ -37,7 +36,7 @@ public void populateDAG(DAG dag, Configuration conf)
dag.setAttribute(DAG.APPLICATION_NAME, "KafkaOutputBenchmark");
BenchmarkPartitionableKafkaOutputOperator bpkoo = dag.addOperator("KafkaBenchmarkProducer", BenchmarkPartitionableKafkaOutputOperator.class);
bpkoo.setBrokerList(conf.get("kafka.brokerlist"));
dag.setAttribute(bpkoo, OperatorContext.INITIAL_PARTITION_COUNT, 2);
bpkoo.setPartitionCount(2);
}

}
43 changes: 43 additions & 0 deletions benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.benchmark.kafka;

import com.datatorrent.api.LocalMode;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;

public class KafkaInputBenchmarkTest
{
@Test
public void testBenchmark()
{
Configuration conf = new Configuration();
InputStream inputStream = getClass().getResourceAsStream("/dt-site-kafka.xml");
conf.addResource(inputStream);

LocalMode lma = LocalMode.newInstance();

try {
lma.prepareDAG(new KafkaInputBenchmark(), conf);
LocalMode.Controller lc = lma.getController();
lc.run(30000);
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
43 changes: 43 additions & 0 deletions benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.benchmark.kafka;

import com.datatorrent.api.LocalMode;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;

public class KafkaOutputBenchmarkTest
{
@Test
public void testBenchmark()
{
Configuration conf = new Configuration();
InputStream inputStream = getClass().getResourceAsStream("/dt-site-kafka.xml");
conf.addResource(inputStream);

LocalMode lma = LocalMode.newInstance();

try {
lma.prepareDAG(new KafkaOutputBenchmark(), conf);
LocalMode.Controller lc = lma.getController();
lc.run(30000);
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
21 changes: 21 additions & 0 deletions benchmark/src/test/resources/dt-site-kafka.xml
@@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

<property>
<name>dt.operator.KafkaBenchmarkConsumer.consumer.topic</name>
<value>kafkaInputConsumerBenchmark</value>
</property>

<property>
<name>dt.operator.KafkaBenchmarkProducer.consumer.topic</name>
<value>kafkaInputConsumerBenchmark</value>
</property>

<property>
<name>kafka.brokerlist</name>
<value>localhost:9092</value>
</property>

</configuration>
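The dt.operator.<name>.* keys above are resolved against properties of the named operators when the DAG is built, supplying the Kafka topic to both benchmark operators, while kafka.brokerlist is read explicitly by the applications through conf.get("kafka.brokerlist").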
