A library to process sensor data events in real time using Storm, for storm-contrib

commit e637168c859da7fba2e975836d86a6251635e552 1 parent b7b5cf9
@surajwaghulde authored
examples/movingAverageWithSpikeDetection/README
@@ -0,0 +1,48 @@
+Technical Details:
+
+This is a fun project I created to leverage the newest real-time computing platforms to process data generated by sensor devices. The project is a library that continuously listens to and analyzes streams of data generated by various sensor devices. It is built on Storm, a distributed stream computing platform (http://www.slideshare.net/nathanmarz/storm-distributed-and-faulttolerant-realtime-computation). The basic architecture of the platform is that each stage continuously processes a stream of data and produces another stream of results, so you can chain such stages into an arbitrarily long processing pipeline.
+
+I have written a library that computes a moving average and detects spikes over a continuous stream of data, which can be applied to finance or any other stream. The goal is to build up a library of reusable bolts that perform common operations, so that each new topology does not have to start from scratch.
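+The core operation the library performs can be sketched independently of Storm. The following is a minimal, self-contained illustration of a windowed moving average over a stream; the class name and window size here are illustrative choices, not part of the library:

```java
import java.util.LinkedList;

// Minimal sketch of a windowed moving average over a stream.
// A running sum is kept so each update is O(1).
public class MovingAverageSketch {
    private final LinkedList<Double> window = new LinkedList<Double>();
    private final int size;
    private double sum = 0.0;

    public MovingAverageSketch(int size) {
        this.size = size;
    }

    // Feed one reading; returns the average of the last `size` readings
    // (or of all readings seen so far, while the window is still filling).
    public double next(double value) {
        window.addLast(value);
        sum += value;
        if (window.size() > size) {
            sum -= window.removeFirst(); // evict the oldest reading
        }
        return sum / window.size();
    }

    public static void main(String[] args) {
        MovingAverageSketch avg = new MovingAverageSketch(3);
        System.out.println(avg.next(1.0)); // 1.0
        System.out.println(avg.next(2.0)); // 1.5
        System.out.println(avg.next(3.0)); // 2.0
        System.out.println(avg.next(4.0)); // 3.0 (window now holds 2, 3, 4)
    }
}
```

+MovingAverageBolt below applies the same idea, additionally keyed by deviceID.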
+
+Let us start by deploying the project on a single node, which is very simple. I benchmarked this project at 96,000 sensor values per second on a cluster of 3 machines, and it detects a spike within a few milliseconds.
+
+Hardware Requirements: (optional, since you can generate sensor-like input using InputStreamSpout)
+
+1. An Arduino kit with a photoresistor circuit.
+2. Interface the kit with your laptop over a serial port and run the light program (submitted with this project) to generate light-intensity events.
+3. The program will print the serial port being used on the machine.
+
+Software Requirements:
+
+1. Download Storm version 0.6.2 from https://github.com/nathanmarz/storm/downloads
+2. Install Maven. This assumes Java 1.6 is already installed.
+
+
+Running my project:
+1. Download the storm-starter project from https://github.com/nathanmarz/storm-starter/downloads, unzip it, and rename m2-pom.xml to pom.xml.
+2. Copy my project archive movingAverageWithSpikeDetection.tar.gz to the storm-starter/src/jvm directory and unpack it with "tar -zxf movingAverageWithSpikeDetection.tar.gz".
+3. Build the project with Maven by running "mvn clean install" in the storm-starter folder (this installs the libraries for serialization and the other distributed-system dependencies).
+4. Run "mvn eclipse:eclipse" to create an Eclipse .project file, for convenience.
+5. Open Eclipse and import the movingAverageWithSpikeDetection project.
+6. Open LightEventSpout.java and change the PORT_NAMES[] entry to the serial port the Arduino kit is using on your machine. The baud rate is defined as 9600 in LightEventSpout.java, so if you change it for an experiment, make sure both baud rates match: the one on the device and the one in the program.
+7. Upload the light program to the Arduino kit.
+8. Run SpikeDetectionTopology.java. If you get a PortInUse exception, create the folder /var/lock/ and give it 775 permissions; this is a known problem with the Arduino-to-Java serial interface. (This will automatically start ZooKeeper-based cluster management and run the program on a single node.)
+
+
+Creating a cluster of machines and running my project on the distributed cluster:
+
+1. Download ZooKeeper from http://download.filehat.com/apache/zookeeper/zookeeper-3.3.3/
+2. Unzip ZooKeeper and rename the zoo_sample.cfg file in the conf folder to zoo.cfg.
+3. Go to the bin folder inside ZooKeeper and start a ZooKeeper instance with "zkServer.sh start".
+4. Unzip the storm-0.6.2 folder.
+5. Copy storm.yaml.example to storm.yaml and add all the machine names (IP addresses work too) to storm.zookeeper.servers; this indicates that ZooKeeper runs on every machine, for coordination in the distributed system.
+6. Add the master machine name (the one interfaced with the Arduino) as nimbus.host.
+(Remember: all these steps need to be done on every machine.)
+
+
+Now your cluster is set up. Do the following to run the above project on the cluster of machines:
+
+1. On every machine, go to the storm-0.6.2 folder, then to the bin directory inside it, and run "./storm supervisor".
+2. On the master machine, run "./storm nimbus" to start the master process, called nimbus, which distributes the runnable programs over the cluster dynamically.
+3. Now run the project from the master node with "./storm jar movingAverageSpikeDetection.jar movingAverage.SpikeDetectionTopology". If you get a PortInUse exception, create the folder /var/lock/ and give it 775 permissions; this is a known problem with the Arduino-to-Java serial interface.
+
examples/movingAverageWithSpikeDetection/arduino/light
@@ -0,0 +1,38 @@
+/*
+ * A simple program that changes the intensity of an LED based on the
+ * amount of light incident on the photoresistor, and prints each
+ * reading over the serial port.
+ */
+
+// PhotoResistor pin: the analog pin the photoresistor is connected to.
+// The photoresistor is not calibrated to any units, so this is simply
+// a raw sensor value (relative light).
+int lightPin = 0;
+
+// LED pin: we are controlling brightness, so we use one of the
+// PWM (pulse width modulation) pins.
+int ledPin = 9;
+
+void setup()
+{
+  Serial.begin(9600);      // must match DATA_RATE in LightEventSpout.java
+  pinMode(ledPin, OUTPUT); // set the LED pin to output
+}
+
+/*
+ * loop() starts after setup() finishes and then repeats.
+ */
+void loop()
+{
+  int lightLevel = analogRead(lightPin);          // read the light level
+  // Map the 0-900 input range to 1000-9999 so that every reading is a
+  // four-digit number, which the parser in LightEventSpout.java expects.
+  lightLevel = map(lightLevel, 0, 900, 1000, 9999);
+  lightLevel = constrain(lightLevel, 1000, 9999); // clamp to 1000-9999
+  // analogWrite expects 0-255, so scale the value back down for the LED.
+  analogWrite(ledPin, map(lightLevel, 1000, 9999, 0, 255));
+  Serial.println(lightLevel);
+}
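+The sketch above always prints a four-digit reading (1000-9999) terminated by CR/LF, and LightEventSpout's parser relies on that framing. As a standalone illustration (the class and method names here are mine, not part of the project; the real parsing lives in LightEventSpout.serialEvent), the per-line protocol can be decoded like this:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of decoding the Arduino serial protocol:
// ASCII four-digit readings separated by CR/LF bytes.
public class SerialProtocolSketch {
    public static List<Integer> decode(byte[] chunk) {
        List<Integer> readings = new ArrayList<Integer>();
        StringBuilder digits = new StringBuilder();
        for (byte b : chunk) {
            if (b == '\n' || b == '\r') {
                if (digits.length() == 4) {
                    // a complete four-digit reading
                    readings.add(Integer.parseInt(digits.toString()));
                }
                digits.setLength(0); // discard partial/empty fragments
            } else {
                digits.append((char) b);
            }
        }
        return readings;
    }

    public static void main(String[] args) {
        byte[] chunk = "1234\r\n5678\r\n".getBytes();
        System.out.println(decode(chunk)); // [1234, 5678]
    }
}
```

+Framing by line terminator is what lets the spout recover if a serial chunk starts in the middle of a reading.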
examples/movingAverageWithSpikeDetection/jvm/movingAverageWithSpikeDetection/InputStreamSpout.java
@@ -0,0 +1,64 @@
+package movingAverage;
+
+import java.util.Map;
+import java.util.Random;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+public class InputStreamSpout implements IRichSpout {
+ private static final long serialVersionUID = 1L;
+
+ private SpoutOutputCollector collector;
+ private int count = 1000000;
+ private String deviceID = "Arduino";
+
+ private final Random random = new Random();
+
+ @Override
+ public boolean isDistributed() {
+ return true;
+ }
+
+ @Override
+ public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context,
+ final SpoutOutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+	public void nextTuple() {
+		if (count > 0) {
+			count--;
+			// emit a simulated sensor reading in the range [50, 60)
+			collector.emit(new Values(deviceID, (random.nextDouble() * 10) + 50));
+		} else if (count == 0) {
+			count--;
+			// emit -1.0 exactly once as an end-of-stream sentinel
+			collector.emit(new Values(deviceID, -1.0));
+		}
+		// Uncomment to throttle emission:
+		// try {
+		//     Thread.sleep(20);
+		// } catch (InterruptedException e) {
+		//     e.printStackTrace();
+		// }
+	}
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void ack(final Object id) {
+ }
+
+ @Override
+ public void fail(final Object id) {
+ }
+
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("string","double"));
+ }
+
+}
examples/movingAverageWithSpikeDetection/jvm/movingAverageWithSpikeDetection/LightEventSpout.java
@@ -0,0 +1,174 @@
+package movingAverage;
+
+import java.util.Map;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import gnu.io.CommPortIdentifier;
+import gnu.io.SerialPort;
+import gnu.io.SerialPortEvent;
+import gnu.io.SerialPortEventListener;
+import java.util.Enumeration;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class LightEventSpout implements IRichSpout, SerialPortEventListener {
+
+ private static final long serialVersionUID = 1L;
+
+ SerialPort serialPort;
+ /** The port we're normally going to use. */
+ private static final String PORT_NAMES[] = {
+ "/dev/tty.usbmodemfa131", // Mac OS X
+ };
+ /** Buffered input stream from the port */
+ private InputStream input;
+ /** The output stream to the port */
+ private OutputStream output;
+ /** Milliseconds to block while waiting for port open */
+ private static final int TIME_OUT = 2000;
+ /** Default bits per second for COM port. */
+ private static final int DATA_RATE = 9600;
+
+ private SpoutOutputCollector collector;
+ private String deviceID = "Arduino";
+ private BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>();
+
+ @Override
+ public boolean isDistributed() {
+ return true;
+ }
+
+ @Override
+ public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context,
+ final SpoutOutputCollector collector) {
+ this.collector = collector;
+
+ CommPortIdentifier portId = null;
+ Enumeration portEnum = CommPortIdentifier.getPortIdentifiers();
+
+ // iterate through, looking for the port
+ while (portEnum.hasMoreElements()) {
+ CommPortIdentifier currPortId = (CommPortIdentifier) portEnum.nextElement();
+ for (String portName : PORT_NAMES) {
+ if (currPortId.getName().equals(portName)) {
+ portId = currPortId;
+ System.out.println(portName);
+ System.out.println(portId);
+ break;
+ }
+ }
+ }
+
+ if (portId == null) {
+ System.out.println("Could not find COM port.");
+ return;
+ }
+
+ try {
+ // open serial port, and use class name for the appName.
+ System.out.println("serial port : " + serialPort);
+ serialPort = (SerialPort) portId.open(this.getClass().getName(),
+ TIME_OUT);
+ System.out.println("serial port : " + serialPort);
+
+ // set port parameters
+ serialPort.setSerialPortParams(DATA_RATE,
+ SerialPort.DATABITS_8,
+ SerialPort.STOPBITS_1,
+ SerialPort.PARITY_NONE);
+
+ // open the streams
+ input = serialPort.getInputStream();
+ output = serialPort.getOutputStream();
+
+ // add event listeners
+ serialPort.addEventListener(this);
+ serialPort.notifyOnDataAvailable(true);
+ } catch (Exception e) {
+ System.err.println(e.toString());
+ }
+ }
+
+ /**
+ * This should be called when you stop using the port.
+ * This will prevent port locking on platforms like Linux.
+ */
+ public synchronized void closeSerial() {
+ if (serialPort != null) {
+ serialPort.removeEventListener();
+ serialPort.close();
+ }
+ }
+
+ @Override
+ public void nextTuple() {
+ try {
+ collector.emit(new Values(deviceID, blockingQueue.take()));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void ack(final Object id) {
+ }
+
+ @Override
+ public void fail(final Object id) {
+ }
+
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("string","double"));
+ }
+
+ @Override
+ public void serialEvent(SerialPortEvent oEvent) {
+ if (oEvent.getEventType() == SerialPortEvent.DATA_AVAILABLE) {
+ try {
+ int available = input.available();
+ byte chunk[] = new byte[available];
+ input.read(chunk, 0, available);
+ int j = 0;
+// System.out.println("chunk length " + chunk.length);
+ while(j < chunk.length && chunk[j] != '\n') {
+ j++;
+ }
+ j++;
+				// Each reading is a four-digit number terminated by CR/LF.
+				byte number[] = new byte[4];
+				int count = 0;
+				while (j < chunk.length - 1) {
+					// check the index before reading the array element
+					while (j < chunk.length - 1 && chunk[j] != 10 && chunk[j] != 13) {
+						number[count++] = chunk[j];
+						j++;
+					}
+					j++;
+					System.out.println(new String(number));
+					count = 0;
+					blockingQueue.add(Integer.parseInt(new String(number)));
+				}
+
+ } catch (Exception e) {
+ System.out.println("Error");
+ System.err.println(e.toString());
+ }
+ }
+		// Other event types are ignored here; a production spout should handle them too.
+
+ }
+
+}
examples/movingAverageWithSpikeDetection/jvm/movingAverageWithSpikeDetection/MovingAverageBolt.java
@@ -0,0 +1,74 @@
+package movingAverage;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class MovingAverageBolt implements IBasicBolt {
+ private static final long serialVersionUID = 1L;
+ private int movingAverageWindow = 1000;
+ private Map<String, LinkedList<Double>> deviceIDtoStreamMap = new HashMap<String, LinkedList<Double>>();
+ private Map<String, Double> deviceIDtoSumOfEvents = new HashMap<String, Double>();
+
+ public MovingAverageBolt() {
+
+ }
+
+ public MovingAverageBolt(int movingAverageWindow) {
+ this.movingAverageWindow = movingAverageWindow;
+ }
+
+ @Override
+ public void prepare(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context) {
+ }
+
+ @Override
+	public void execute(final Tuple tuple, final BasicOutputCollector collector) {
+		final String deviceID = tuple.getString(0);
+		// LightEventSpout emits Integer readings while InputStreamSpout emits
+		// Double ones, so convert through Number to accept either spout.
+		final double nextDouble = ((Number) tuple.getValue(1)).doubleValue();
+		double movingAverageInstant = movingAverage(deviceID, nextDouble);
+		System.out.println(movingAverageInstant + " : " + nextDouble);
+		collector.emit(new Values(deviceID, movingAverageInstant, nextDouble));
+	}
+
+ public double movingAverage(String deviceID, double nextDouble) {
+ LinkedList<Double> valueList = new LinkedList<Double>();
+ double sum = 0.0;
+ if (deviceIDtoStreamMap.containsKey(deviceID)) {
+ valueList = deviceIDtoStreamMap.get(deviceID);
+ sum = deviceIDtoSumOfEvents.get(deviceID);
+ if (valueList.size() > movingAverageWindow-1) {
+ double valueToRemove = valueList.removeFirst();
+ sum -= valueToRemove;
+ }
+ valueList.addLast(nextDouble);
+ sum += nextDouble;
+ deviceIDtoSumOfEvents.put(deviceID, sum);
+ deviceIDtoStreamMap.put(deviceID, valueList);
+ return sum/valueList.size();
+ }
+ else {
+ valueList.add(nextDouble);
+ deviceIDtoStreamMap.put(deviceID, valueList);
+ deviceIDtoSumOfEvents.put(deviceID, nextDouble);
+ return nextDouble;
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("string", "double", "double"));
+ }
+}
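+The bolt above keeps an independent window and running sum per deviceID in two HashMaps, so readings from different devices never mix. A Storm-free sketch of that keyed bookkeeping (the class name and the window size of 2 are illustrative assumptions, not from the project):

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Sketch of per-device moving-average state: one window and one
// running sum per deviceID, mirroring MovingAverageBolt's maps.
public class PerDeviceAverageSketch {
    private final int window;
    private final Map<String, LinkedList<Double>> values =
            new HashMap<String, LinkedList<Double>>();
    private final Map<String, Double> sums = new HashMap<String, Double>();

    public PerDeviceAverageSketch(int window) {
        this.window = window;
    }

    // Feed one reading for a device; returns that device's current average.
    public double update(String deviceID, double reading) {
        LinkedList<Double> list = values.get(deviceID);
        if (list == null) { // first reading from this device
            list = new LinkedList<Double>();
            values.put(deviceID, list);
            sums.put(deviceID, 0.0);
        }
        double sum = sums.get(deviceID) + reading;
        list.addLast(reading);
        if (list.size() > window) {
            sum -= list.removeFirst(); // evict this device's oldest reading
        }
        sums.put(deviceID, sum);
        return sum / list.size();
    }

    public static void main(String[] args) {
        PerDeviceAverageSketch avg = new PerDeviceAverageSketch(2);
        System.out.println(avg.update("a", 10.0));  // 10.0
        System.out.println(avg.update("b", 100.0)); // 100.0 -- streams are independent
        System.out.println(avg.update("a", 20.0));  // 15.0
        System.out.println(avg.update("a", 30.0));  // 25.0 (window holds 20, 30)
    }
}
```

+Because the state is keyed, a fieldsGrouping on deviceID would also let multiple bolt instances split the devices between them.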
examples/movingAverageWithSpikeDetection/jvm/movingAverageWithSpikeDetection/SpikeDetectionBolt.java
@@ -0,0 +1,50 @@
+package movingAverage;
+
+
+import java.util.Map;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class SpikeDetectionBolt implements IBasicBolt {
+
+ private static final long serialVersionUID = 1L;
+
+ private float spikeThreshold = 0.03f;
+
+ public SpikeDetectionBolt() {
+ }
+
+ public SpikeDetectionBolt(float spikeThreshold) {
+ this.spikeThreshold = spikeThreshold;
+ }
+
+ @Override
+ public void prepare(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context) {
+ }
+
+ @Override
+ public void execute(final Tuple tuple, final BasicOutputCollector collector) {
+ final String deviceID = tuple.getString(0);
+ final double movingAverageInstant = tuple.getDouble(1);
+ final double nextDouble = tuple.getDouble(2);
+ if (Math.abs(nextDouble - movingAverageInstant) > spikeThreshold * movingAverageInstant) {
+ collector.emit(new Values(deviceID, movingAverageInstant, nextDouble, "spike detected"));
+ System.out.println(deviceID + " " + movingAverageInstant + " " + nextDouble + " spike detected");
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ @Override
+ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("string", "double", "double", "string"));
+ }
+}
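+The spike test above is a relative-deviation check: a reading x is a spike when |x - avg| > spikeThreshold * avg, i.e. when it strays from the moving average by more than a given fraction of that average. A minimal standalone sketch of the same predicate (class and method names are illustrative):

```java
// Standalone sketch of SpikeDetectionBolt's predicate: flag a reading
// that deviates from the moving average by more than a fraction
// (threshold) of that average.
public class SpikeCheckSketch {
    public static boolean isSpike(double avg, double reading, double threshold) {
        return Math.abs(reading - avg) > threshold * avg;
    }

    public static void main(String[] args) {
        // With threshold 0.10, as SpikeDetectionTopology configures it:
        System.out.println(isSpike(50.0, 54.0, 0.10)); // false: deviation 4 <= 5
        System.out.println(isSpike(50.0, 56.0, 0.10)); // true:  deviation 6 >  5
    }
}
```

+Note the check is relative, so the same threshold adapts to devices whose baselines differ by orders of magnitude.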
examples/movingAverageWithSpikeDetection/jvm/movingAverageWithSpikeDetection/SpikeDetectionTopology.java
@@ -0,0 +1,48 @@
+package movingAverage;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+
+/**
+ * This is a basic example of a Storm topology.
+ */
+public class SpikeDetectionTopology {
+
+ public static void main(String[] args) throws Exception {
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout("string", new LightEventSpout(), 2);
+ builder.setBolt("movingAverage", new MovingAverageBolt(10), 2)
+ .shuffleGrouping("string");
+ builder.setBolt("spikes", new SpikeDetectionBolt(0.10f), 2)
+ .shuffleGrouping("movingAverage");
+
+ Config conf = new Config();
+// conf.setDebug(true);
+
+
+ if(args!=null && args.length > 0) {
+ conf.setNumWorkers(3);
+
+ StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
+ } else {
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("spike", conf, builder.createTopology());
+ Utils.sleep(600000);
+ cluster.killTopology("spike");
+ }
+ }
+}