Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
<maxAllowedViolations>129</maxAllowedViolations>
<maxAllowedViolations>0</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,110 +17,110 @@

package org.apache.storm;

import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.st.topology.TestableTopology;
import org.apache.storm.st.utils.TimeUtil;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.st.topology.TestableTopology;
import org.apache.storm.st.utils.TimeUtil;
import org.apache.storm.topology.base.BaseRichSpout;

/**
* This is a basic example of a Storm topology.
*/
public class ExclamationTopology {

public static final String WORD = "word";
public static final String EXCLAIM_1 = "exclaim1";
public static final String EXCLAIM_2 = "exclaim2";
public static final int SPOUT_EXECUTORS = 10;
public static final int EXCLAIM_2_EXECUTORS = 2;
public static final String WORD = "word";
public static final String EXCLAIM_1 = "exclaim1";
public static final String EXCLAIM_2 = "exclaim2";
public static final int SPOUT_EXECUTORS = 10;
public static final int EXCLAIM_2_EXECUTORS = 2;

public static class ExclamationBolt extends BaseRichBolt {
public static class ExclamationBolt extends BaseRichBolt {

OutputCollector _collector;
OutputCollector collector;

@Override
public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void execute(Tuple tuple) {
collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
collector.ack(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
}

public static class FixedOrderWordSpout extends BaseRichSpout {

public static final List<String> WORDS = Collections.unmodifiableList(Arrays.asList("nathan", "mike", "jackson", "golda", "bertels"));

private SpoutOutputCollector collector;
private int currentIndex = 0;
private int numEmitted = 0;
public static class FixedOrderWordSpout extends BaseRichSpout {

public static final List<String> WORDS = Collections.unmodifiableList(Arrays.asList("nathan",
"mike",
"jackson",
"golda",
"bertels"));

private SpoutOutputCollector collector;
private int currentIndex = 0;
private int numEmitted = 0;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}

@Override
public void nextTuple() {
if (numEmitted >= TestableTopology.MAX_SPOUT_EMITS) {
//Stop emitting at a certain point, because log rolling breaks the tests.
return;
}
//Sleep a bit to avoid hogging the CPU.
TimeUtil.sleepMilliSec(1);
collector.emit(new Values(WORDS.get((currentIndex++) % WORDS.size())));
++numEmitted;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
public static void main(String[] args) throws Exception {
Config conf = new Config();
conf.setDebug(true);
String topoName = "test";
if (args.length > 0) {
topoName = args[0];
}

conf.setNumWorkers(3);
StormTopology topology = getStormTopology();
StormSubmitter.submitTopologyWithProgressBar(topoName, conf, topology);
}

@Override
public void nextTuple() {
if (numEmitted >= TestableTopology.MAX_SPOUT_EMITS) {
//Stop emitting at a certain point, because log rolling breaks the tests.
return;
}
//Sleep a bit to avoid hogging the CPU.
TimeUtil.sleepMilliSec(1);
collector.emit(new Values(WORDS.get((currentIndex++) % WORDS.size())));
++numEmitted;
public static StormTopology getStormTopology() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(WORD, new FixedOrderWordSpout(), SPOUT_EXECUTORS);
builder.setBolt(EXCLAIM_1, new ExclamationTopology.ExclamationBolt(), 3).shuffleGrouping(WORD);
builder.setBolt(EXCLAIM_2, new ExclamationTopology.ExclamationBolt(), EXCLAIM_2_EXECUTORS).shuffleGrouping(EXCLAIM_1);
return builder.createTopology();
}

}

public static void main(String[] args) throws Exception {
StormTopology topology = getStormTopology();

Config conf = new Config();
conf.setDebug(true);
String topoName = "test";
if (args.length > 0) {
topoName = args[0];
}

conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(topoName, conf, topology);
}

public static StormTopology getStormTopology() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(WORD, new FixedOrderWordSpout(), SPOUT_EXECUTORS);
builder.setBolt(EXCLAIM_1, new ExclamationTopology.ExclamationBolt(), 3).shuffleGrouping(WORD);
builder.setBolt(EXCLAIM_2, new ExclamationTopology.ExclamationBolt(), EXCLAIM_2_EXECUTORS).shuffleGrouping(EXCLAIM_1);
return builder.createTopology();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package org.apache.storm.debug;

import java.net.URL;
import java.net.URLClassLoader;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URL;
import java.net.URLClassLoader;

public class DebugHelper {
private static final Logger LOG = LoggerFactory.getLogger(DebugHelper.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@ public interface TestableTopology {
int TIMEDATA_SLEEP_BETWEEN_EMITS_MS = 20;
//Some tests rely on reading the worker log. If there are too many emits and too much is logged, the log might roll, breaking the test.
//Ensure the time based windowing tests can emit for 5 minutes
long MAX_SPOUT_EMITS = TimeUnit.MINUTES.toMillis(5)/TIMEDATA_SLEEP_BETWEEN_EMITS_MS;
long MAX_SPOUT_EMITS = TimeUnit.MINUTES.toMillis(5) / TIMEDATA_SLEEP_BETWEEN_EMITS_MS;

StormTopology newTopology();

String getBoltName();

int getBoltExecutors();

String getSpoutName();

int getSpoutExecutors();
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputC
@Override
public void nextTuple() {
if (currentNum >= TestableTopology.MAX_SPOUT_EMITS) {
//Stop emitting at a certain point, because log rolling breaks the tests.
return;
//Stop emitting at a certain point, because log rolling breaks the tests.
return;
}
//Sleep a bit to avoid hogging the CPU.
TimeUtil.sleepMilliSec(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,22 @@
package org.apache.storm.st.topology.window;

import com.google.common.collect.Lists;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.storm.generated.StormTopology;
import org.apache.storm.st.topology.TestableTopology;
import org.apache.storm.st.topology.window.data.TimeData;
import org.apache.storm.st.utils.StringDecorator;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Computes sliding window sum
* Computes sliding window sum.
*/
public class SlidingTimeCorrectness implements TestableTopology {
private static final Logger LOG = LoggerFactory.getLogger(SlidingTimeCorrectness.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@
package org.apache.storm.st.topology.window;

import com.google.common.collect.Lists;

import java.util.List;

import org.apache.storm.generated.StormTopology;
import org.apache.storm.st.topology.TestableTopology;
import org.apache.storm.st.utils.StringDecorator;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.st.topology.TestableTopology;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.storm.st.utils.StringDecorator;

import java.util.List;

/**
* Computes sliding window sum
* Computes sliding window sum.
*/
public class SlidingWindowCorrectness implements TestableTopology {
private static final Logger LOG = LoggerFactory.getLogger(SlidingWindowCorrectness.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,34 +32,34 @@
import org.slf4j.LoggerFactory;

public class TimeDataIncrementingSpout extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(TimeDataIncrementingSpout.class);
private SpoutOutputCollector collector;
private int currentNum;
private String componentId;
private static final Logger LOG = LoggerFactory.getLogger(TimeDataIncrementingSpout.class);
private SpoutOutputCollector collector;
private int currentNum;
private String componentId;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(TimeData.getFields());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(TimeData.getFields());
}

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
componentId = context.getThisComponentId();
this.collector = collector;
}
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
componentId = context.getThisComponentId();
this.collector = collector;
}

@Override
public void nextTuple() {
if (currentNum >= TestableTopology.MAX_SPOUT_EMITS) {
//Stop emitting at a certain point, because log rolling breaks the tests.
return;
}
//Sleep a bit between emits to ensure that we don't reach the cap too quickly, since this spout is used to test time based windows
TimeUtil.sleepMilliSec(TestableTopology.TIMEDATA_SLEEP_BETWEEN_EMITS_MS);
currentNum++;
TimeData data = TimeData.newData(currentNum);
final Values tuple = data.getValues();
collector.emit(tuple);
LOG.info(StringDecorator.decorate(componentId, data.toString()));
@Override
public void nextTuple() {
if (currentNum >= TestableTopology.MAX_SPOUT_EMITS) {
//Stop emitting at a certain point, because log rolling breaks the tests.
return;
}
//Sleep a bit between emits to ensure that we don't reach the cap too quickly, since this spout is used to test time based windows
TimeUtil.sleepMilliSec(TestableTopology.TIMEDATA_SLEEP_BETWEEN_EMITS_MS);
currentNum++;
TimeData data = TimeData.newData(currentNum);
final Values tuple = data.getValues();
collector.emit(tuple);
LOG.info(StringDecorator.decorate(componentId, data.toString()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,22 @@
package org.apache.storm.st.topology.window;

import com.google.common.collect.Lists;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.storm.generated.StormTopology;
import org.apache.storm.st.topology.TestableTopology;
import org.apache.storm.st.topology.window.data.TimeData;
import org.apache.storm.st.utils.StringDecorator;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.st.utils.StringDecorator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Computes sliding window sum
* Computes sliding window sum.
*/
public class TumblingTimeCorrectness implements TestableTopology {
private static final Logger LOG = LoggerFactory.getLogger(TumblingTimeCorrectness.class);
Expand Down
Loading