
Storm Dev

Requirements

  • At least 4 EC2 instances (see AWS Intro)
  • Passwordless SSH on each node as described in the Hadoop Intro Dev
  • Zookeeper

Dependencies

Storm requires Java. If Java is not installed on all the machines, install it with the following commands

all-nodes:~$ sudo apt-get update
all-nodes:~$ sudo apt-get install openjdk-7-jdk

You can check if the installation was successful by typing the following

all-nodes:~$ which java
/usr/bin/java

all-nodes:~$ java -version
java version "1.7.0_79"
OpenJDK Runtime Environment (IcedTea 2.5.5) (7u79-2.5.5-0ubuntu0.14.04.2)
OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode)

Install Storm on all nodes

This installation process must be executed on the “master” node (which Storm calls Nimbus) AND all the workers.

We will grab the Storm 0.9.5 release and save it to a Downloads folder. Next we will extract it into the /usr/local directory and rename the folder to simply ‘storm’

node:~$ wget http://mirrors.gigenet.com/apache/storm/apache-storm-0.9.5/apache-storm-0.9.5.tar.gz -P ~/Downloads
node:~$ sudo tar zxvf ~/Downloads/apache-storm*.gz -C /usr/local
node:~$ sudo mv /usr/local/apache-storm* /usr/local/storm

Add the following environment variables to ~/.profile

export STORM_HOME=/usr/local/storm
export PATH=$PATH:$STORM_HOME/bin

Be sure to source the profile

node:~$ . ~/.profile
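
Optionally, you can confirm that the storm command is now on your PATH before moving on. This is just a quick sanity check (storm version should report the release you installed, e.g. 0.9.5):

node:~$ which storm
/usr/local/storm/bin/storm

node:~$ storm version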

Configure Storm on all Nodes

Create a local directory on each node for storing state and give it the proper permissions

node:~$ sudo mkdir /usr/local/storm/local_state
node:~$ sudo chown ubuntu /usr/local/storm/local_state

Edit the /usr/local/storm/conf/storm.yaml file on all the nodes (don’t forget to remove the #’s on the left of the relevant lines)

node:~$ sudo nano /usr/local/storm/conf/storm.yaml

Find, uncomment, and change the placeholder values (the text in angle brackets below) to match your cluster

 ########### These MUST be filled in for a storm configuration
 # place the public dns for all the nodes with zookeeper installed
storm.zookeeper.servers:
     - "<zookeeper-public-dns-1>"
     - "<zookeeper-public-dns-2>"
     - "<zookeeper-public-dns-3>"
     - "<zookeeper-public-dns-4>"
#
nimbus.host: "<master-public-dns>"

Add the following lines below the previous lines

storm.local.dir: "/usr/local/storm/local_state"
#
ui.port: 8090
#
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
#

This last set of options tells the supervisor which ports to use for worker processes. By default, up to 4 worker processes can run on each worker node, using ports 6700-6703 to communicate. Adding more ports (e.g. 6704) would allow more worker processes to be launched on that worker machine.

Note: The port of the Storm UI (e.g. 8090) is chosen to avoid collisions with other UIs, like Spark.

Start Storm on the Cluster

Prior to starting Storm, make sure that Zookeeper is running correctly as described in the Zookeeper Dev, which you can check with:

all-nodes:~$ echo srvr | nc localhost 2181

If you don’t see an output, start Zookeeper with:

all-nodes:~$ sudo /usr/local/zookeeper/bin/zkServer.sh start

In the more advanced Dev, we’ll run Storm with supervision, but it’s good to start it manually to ensure that it’s working properly.

Starting the Storm Daemons

Storm uses three types of daemons (background processes) to run networks of computations (known as topologies):

  • Nimbus, which coordinates work from the “master” node
  • Supervisors, which process the data on the “worker” nodes
  • The UI, which serves the WebUI on the “master” node

Start Nimbus on the master node as a background process (or use tmux as shown in the Advanced Kafka Dev)

master_node:~$ sudo $STORM_HOME/bin/storm nimbus &

Start the Storm Supervisor on each worker node as a background process (or use tmux)

worker_node:~$ sudo $STORM_HOME/bin/storm supervisor &

Start the WebUI on the master node as a background process (or use tmux)

master_node:~$ sudo $STORM_HOME/bin/storm ui &

You can go to <master-public-dns>:8090 in your browser to check that it’s working
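
If you can’t reach that port from your browser (for example, because of security group rules), you can also sanity-check the UI directly from the master node’s shell. This is just an illustrative check, assuming the ui.port of 8090 configured above; a response starting with HTTP/1.1 200 OK means the UI is serving pages:

master_node:~$ curl -sI http://localhost:8090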

When you’re done, you can bring these processes to the foreground with the fg command, and exit them with CTRL+c
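
If you’d rather not keep the daemons attached to your shell at all, a common alternative (besides tmux) is to launch each one under nohup so it keeps running after you log out. A minimal sketch, with the log file names chosen purely for illustration:

master_node:~$ sudo nohup $STORM_HOME/bin/storm nimbus > ~/nimbus.log 2>&1 &
master_node:~$ sudo nohup $STORM_HOME/bin/storm ui > ~/ui.log 2>&1 &
worker_node:~$ sudo nohup $STORM_HOME/bin/storm supervisor > ~/supervisor.log 2>&1 &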

Submitting a Storm Topology

The typical way to develop Storm programs (and many other distributed technologies) is to first build a topology locally on a single machine, then deploy that topology to the cluster. This can be done with a variety of tools, but the standard way is to use the Java build tool Maven (even if your topology is written in another language like Python). There are also tools, such as Parse.ly’s streamparse and Yelp’s Pyleus, that build and deploy topologies entirely in Python.

Submitting Topologies with Maven

First download the latest version of Maven to a single machine - this could be your local machine or one of the nodes (for this Dev, the directories will correspond to an Ubuntu system). You can get the latest download link from here, but as of this writing it can be downloaded with:

single-machine:~$ wget http://www.gtlib.gatech.edu/pub/apache/maven/maven-3/3.3.3/binaries/apache-maven-3.3.3-bin.tar.gz -P ~/Downloads
single-machine:~$ sudo tar zxvf ~/Downloads/apache-maven*.gz -C /usr/local
single-machine:~$ sudo mv /usr/local/apache-maven* /usr/local/maven

Add the bin of this directory to your PATH by opening your profile file (e.g. ~/.profile on Ubuntu, ~/.bash_profile on OS X) and adding the lines

export MAVEN_HOME=/usr/local/maven
export PATH=$PATH:$MAVEN_HOME/bin

Be sure to source the appropriate profile file

single-machine:~$ . ~/.profile

Check your installation of Maven with:

single-machine:~$ mvn -v

You should see something like the output below (but specific to your system)

Apache Maven 3.2.5 (12a6b3acb947671f09b81f49094c53f426d8cea1; 2014-12-14T12:29:23-05:00)
Maven home: /usr/local/Cellar/maven/3.2.5/libexec
Java version: 1.7.0_71, vendor: Oracle Corporation
Java home: /Library/Java/JavaVirtualMachines/jdk1.7.0_71.jdk/Contents/Home/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "10.10.5", arch: "x86_64", family: "mac"

The Storm source code comes with a nice “storm-starter” project to run some simple topology examples. Go to the Storm release page and download the source code for the version that matches what’s installed on your cluster. Warning: the latest release on GitHub is often a newer beta version of Storm; make sure you get the matching, stable version (e.g. 0.9.5)!

single-machine:~$ wget https://github.com/apache/storm/archive/v0.9.5.tar.gz -P ~/Downloads
single-machine:~$ tar zxvf ~/Downloads/v0.9.5.tar.gz -C ~/

Maven is pretty simple to use - you define all your program’s dependencies in a Project Object Model XML file (pom.xml), and Maven will automatically download those dependencies when you build your project. Now build Storm from source on your local machine by running Maven within the Storm top-level directory (this may take a few minutes depending on your internet speed, but you should see a BUILD SUCCESS at the end).

single-machine:~$ cd storm-0.9.5
single-machine:~$ mvn clean install -DskipTests=true

This command cleans any conflicting dependency JARs from the ~/.m2 Maven directory, then installs Storm. As described here, the -D option sets a system property so Storm is built without running tests. You can see the specific dependencies that were built by looking at the pom.xml file.
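
If you want to confirm that the matching Storm artifacts were installed into your local Maven repository, you can list the storm-core versions under ~/.m2 (this is the standard Maven repository layout; you should see a 0.9.5 directory):

single-machine:~$ ls ~/.m2/repository/org/apache/storm/storm-core/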

Now compile and execute a specific topology, the WordCountTopology, using Maven in the storm-starter directory. (Note: on Windows you’ll need quotes, i.e. “-Dstorm...WordCountTopology”.)

single-machine:~$ cd examples/storm-starter/
single-machine:~$ mvn compile exec:java -Dstorm.topology=storm.starter.WordCountTopology

This runs WordCount locally with randomly generated words for 10 seconds before shutting down the topology.

To deploy this on your cluster, package up the storm-starter and all the dependencies into an “uberjar” (also known as a “fat jar”).

single-machine:~$ mvn package

This places the compiled jar into the target directory - use scp to transfer this to your nimbus (master) node.

single-machine:~$ scp -i ~/.ssh/<your-keypair>.pem ~/storm-*/examples/storm-starter/target/storm-starter-*-jar-with-dependencies.jar ubuntu@<master-public-dns>:~/

You can SSH into your master node and submit an example topology named test-word-count-topology in distributed mode (which Storm calls “remote”). Make sure all the Storm daemons (nimbus, supervisor, and ui) are still running from above, or restart them if necessary.

master:~$ sudo $STORM_HOME/bin/storm jar ~/storm-starter-*.jar storm.starter.WordCountTopology test-word-count-topology remote

If you now visit the Storm WebUI, you’ll see this Topology and can click on it to see various metrics, as well as deactivate, kill, or rebalance it. You can also see the Topology Visualization, with the rough latency of each bolt. See the documentation or Ch. 5.5 (pg. 116) of the Storm Applied book for more details on the WebUI.

You can see that Word Count is done in under 2ms!
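
You can also check on running topologies from the master’s command line; storm list prints each topology’s name, status, uptime, and worker/task counts:

master:~$ sudo $STORM_HOME/bin/storm list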

Understanding the WordCount Topology

You can see the source code for this Topology here. There are many details, which can be learned by looking into the core concepts and/or reading the Storm Applied book in the Dropbox, but the most salient code is the TopologyBuilder part, where the RandomSentenceSpout is connected to the SplitSentence bolt, and then finally connected to the WordCount bolt.

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

With the shuffle grouping, the random sentences are distributed randomly across the SplitSentence bolt’s tasks, since splitting a sentence can be done by any node. However, the actual word count is kept in an in-memory hash map, so it’s important that a given word always goes to the same task (e.g. the word “rain” always goes to the same place so it can be counted correctly). This is done using a fields grouping, where the field that determines where a tuple is sent is “word”.

The code for the WordCount bolt, which extends the built-in BaseBasicBolt, is pretty simple

public static class WordCount extends BaseBasicBolt {
  Map<String, Integer> counts = new HashMap<String, Integer>();

  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    String word = tuple.getString(0);
    Integer count = counts.get(word);
    if (count == null)
      count = 0;
    count++;
    counts.put(word, count);
    collector.emit(new Values(word, count));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
  }
}

Interestingly enough, the SplitSentence bolt is actually written in Python using the multi-lang feature of Storm. It’s called from the Java code with

public static class SplitSentence extends ShellBolt implements IRichBolt {
  public SplitSentence() {
    super("python", "splitsentence.py");
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

  @Override
  public Map<String, Object> getComponentConfiguration() {
    return null;
  }
}

and the logic for splitting on whitespace is contained in splitsentence.py:

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])

SplitSentenceBolt().run()

Using this technique, any spout or bolt can be written in almost any language!

For this topology, there is no output bolt so you can’t see the results, but there are pre-written bolts that interact with most major databases. Also, there are pre-written spouts that interact with tools like RabbitMQ and Kafka.

Killing the Topology

You can kill this topology from the UI or with:

master:~$ sudo $STORM_HOME/bin/storm kill test-word-count-topology
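
By default, killing a topology first deactivates its spouts and then waits for the topology’s message timeout before shutting down the workers, so the command may take a little while to finish. If you want a shorter wait, you can pass an explicit wait time in seconds (10 seconds here is an arbitrary example):

master:~$ sudo $STORM_HOME/bin/storm kill test-word-count-topology -w 10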