Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/storm-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@
<artifactId>maven-checkstyle-plugin</artifactId>
<!--Note - the version would be inherited-->
<configuration>
<maxAllowedViolations>263</maxAllowedViolations>
<maxAllowedViolations>0</maxAllowedViolations>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
*
* @see <a href="http://storm.apache.org/documentation/Distributed-RPC.html">Distributed RPC</a>
*/
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class BasicDRPCTopology {
public static void main(String[] args) throws Exception {
Config conf = new Config();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class BlobStoreAPIWordCountTopology {
private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class);
private static ClientBlobStore store; // Client API to invoke blob store API functionality
Expand All @@ -70,10 +71,10 @@ public static void prepare() {
// storm blobstore create --file blacklist.txt --acl o::rwa key
private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file)
throws AuthorizationException, KeyAlreadyExistsException, IOException, KeyNotFoundException {
String stringBlobACL = "o::rwa";
AccessControl blobACL = BlobStoreAclHandler.parseAccessControl(stringBlobACL);
String stringBlobAcl = "o::rwa";
AccessControl blobAcl = BlobStoreAclHandler.parseAccessControl(stringBlobAcl);
List<AccessControl> acls = new LinkedList<AccessControl>();
acls.add(blobACL); // more ACLs can be added here
acls.add(blobAcl); // more ACLs can be added here
SettableBlobMeta settableBlobMeta = new SettableBlobMeta(acls);
AtomicOutputStream blobStream = clientBlobStore.createBlob(blobKey, settableBlobMeta);
blobStream.write(readFile(file).toString().getBytes());
Expand Down Expand Up @@ -214,17 +215,17 @@ public void buildAndLaunchWordCountTopology(String[] args) {

// Spout implementation
public static class RandomSentenceSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
SpoutOutputCollector collector;

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
this.collector = collector;
}

@Override
public void nextTuple() {
Utils.sleep(100);
_collector.emit(new Values(getRandomSentence()));
collector.emit(new Values(getRandomSentence()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ protected int run(String[] args) {
}

public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
OutputCollector collector;

@Override
public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
this.collector = collector;
}

@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
collector.ack(tuple);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* java. This can show how fast the word count can run.
*/
public class FastWordCountTopology {

public static void printMetrics(Nimbus.Iface client, String name) throws Exception {
ClusterSummary summary = client.getClusterInfo();
String id = null;
Expand Down Expand Up @@ -80,8 +81,10 @@ public static void printMetrics(Nimbus.Iface client, String name) throws Excepti
}
}
double avgLatency = weightedAvgTotal / acked;
System.out.println("uptime: " + uptime + " acked: " + acked + " avgLatency: " + avgLatency + " acked/sec: " +
(((double) acked) / uptime + " failed: " + failed));
System.out.println("uptime: " + uptime
+ " acked: " + acked
+ " avgLatency: " + avgLatency
+ " acked/sec: " + (((double) acked) / uptime + " failed: " + failed));
}

public static void kill(Nimbus.Iface client, String name) throws Exception {
Expand Down Expand Up @@ -130,19 +133,19 @@ public static class FastRandomSentenceSpout extends BaseRichSpout {
"this is a test of the emergency broadcast system this is only a test",
"peter piper picked a peck of pickeled peppers"
};
SpoutOutputCollector _collector;
Random _rand;
SpoutOutputCollector collector;
Random rand;

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
_rand = ThreadLocalRandom.current();
this.collector = collector;
rand = ThreadLocalRandom.current();
}

@Override
public void nextTuple() {
String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
_collector.emit(new Values(sentence), sentence);
String sentence = CHOICES[rand.nextInt(CHOICES.length)];
collector.emit(new Values(sentence), sentence);
}

@Override
Expand All @@ -152,7 +155,7 @@ public void ack(Object id) {

@Override
public void fail(Object id) {
_collector.emit(new Values(id), id);
collector.emit(new Values(id), id);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.storm.utils.Utils;

public class InOrderDeliveryTest {

public static void printMetrics(Nimbus.Iface client, String name) throws Exception {
ClusterSummary summary = client.getClusterInfo();
String id = null;
Expand Down Expand Up @@ -75,8 +76,10 @@ public static void printMetrics(Nimbus.Iface client, String name) throws Excepti
}
}
double avgLatency = weightedAvgTotal / acked;
System.out.println("uptime: " + uptime + " acked: " + acked + " avgLatency: " + avgLatency + " acked/sec: " +
(((double) acked) / uptime + " failed: " + failed));
System.out.println("uptime: " + uptime
+ " acked: " + acked
+ " avgLatency: " + avgLatency
+ " acked/sec: " + (((double) acked) / uptime + " failed: " + failed));
}

public static void kill(Nimbus.Iface client, String name) throws Exception {
Expand Down Expand Up @@ -116,21 +119,21 @@ public static void main(String[] args) throws Exception {
}

public static class InOrderSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
int _base = 0;
int _i = 0;
SpoutOutputCollector collector;
int base = 0;
int count = 0;

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
_base = context.getThisTaskIndex();
this.collector = collector;
base = context.getThisTaskIndex();
}

@Override
public void nextTuple() {
Values v = new Values(_base, _i);
_collector.emit(v, "ACK");
_i++;
Values v = new Values(base, count);
collector.emit(v, "ACK");
count++;
}

@Override
Expand All @@ -157,7 +160,9 @@ public void execute(Tuple tuple, BasicOutputCollector collector) {
Integer c1 = tuple.getInteger(0);
Integer c2 = tuple.getInteger(1);
Integer exp = expected.get(c1);
if (exp == null) exp = 0;
if (exp == null) {
exp = 0;
}
if (c2.intValue() != exp.intValue()) {
System.out.println(c1 + " " + c2 + " != " + exp);
throw new FailedException(c1 + " " + c2 + " != " + exp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,3 @@ protected int run(String[] args) throws Exception {
return submit("lambda-demo", conf, builder);
}
}

class Prefix implements Serializable {
private String str;

public Prefix(String str) {
this.str = str;
}

@Override
public String toString() {
return this.str;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.DRPCClient;

@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class ManualDRPC {

public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,37 +49,37 @@ public static void main(String[] args) throws Exception {
}

public static class ExclamationLoggingBolt extends BaseRichBolt {
OutputCollector _collector;
Logger _rootLogger = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
OutputCollector collector;
Logger rootLogger = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
// ensure the loggers are configured in the worker.xml before
// trying to use them here
Logger _logger = LoggerFactory.getLogger("com.myapp");
Logger _subLogger = LoggerFactory.getLogger("com.myapp.sub");
Logger logger = LoggerFactory.getLogger("com.myapp");
Logger subLogger = LoggerFactory.getLogger("com.myapp.sub");

@Override
public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
this.collector = collector;
}

@Override
public void execute(Tuple tuple) {
_rootLogger.debug("root: This is a DEBUG message");
_rootLogger.info("root: This is an INFO message");
_rootLogger.warn("root: This is a WARN message");
_rootLogger.error("root: This is an ERROR message");
rootLogger.debug("root: This is a DEBUG message");
rootLogger.info("root: This is an INFO message");
rootLogger.warn("root: This is a WARN message");
rootLogger.error("root: This is an ERROR message");

_logger.debug("myapp: This is a DEBUG message");
_logger.info("myapp: This is an INFO message");
_logger.warn("myapp: This is a WARN message");
_logger.error("myapp: This is an ERROR message");
logger.debug("myapp: This is a DEBUG message");
logger.info("myapp: This is an INFO message");
logger.warn("myapp: This is a WARN message");
logger.error("myapp: This is an ERROR message");

_subLogger.debug("myapp.sub: This is a DEBUG message");
_subLogger.info("myapp.sub: This is an INFO message");
_subLogger.warn("myapp.sub: This is a WARN message");
_subLogger.error("myapp.sub: This is an ERROR message");
subLogger.debug("myapp.sub: This is a DEBUG message");
subLogger.info("myapp.sub: This is an INFO message");
subLogger.warn("myapp.sub: This is a WARN message");
subLogger.error("myapp.sub: This is an ERROR message");

_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
collector.ack(tuple);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.storm.starter;

import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand All @@ -38,8 +40,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;

/**
* An example that demonstrates the usage of {@link org.apache.storm.topology.IStatefulWindowedBolt} with window persistence.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/

package org.apache.storm.starter;

import java.io.Serializable;

class Prefix implements Serializable {
private String str;

public Prefix(String str) {
this.str = str;
}

@Override
public String toString() {
return this.str;
}
}
Loading