# Detect potentially malfunctioning sensors in real time using Streaming Analytics and Python


In this notebook, you will create an application that receives weather data from simulated weather stations and then detects whether any of those stations are malfunctioning. This is done by comparing the temperature from each station with the average temperature of all the stations in the same region. If a station's reading is an outlier, that is, further from the average than a set multiple of the standard deviation, the station is flagged as potentially malfunctioning.

The data is visualized on a map, and malfunctioning stations are shown as red, as in the image below. Note that the readings are updated in real time.
<img src="https://raw.githubusercontent.com/IBMStreams/samples/master/IoT/WeatherStationApp/img/dsx-weather-app.gif"/>


# Step 1: Setup


This notebook requires you to have the Streaming Analytics service and the Watson IoT Platform service. You can set up and connect the services manually or use the automatic option if it applies to you.


## Option 1: Automatically deploy and configure the services

You can use this option if:
- You have not yet created the services in Bluemix, or
- You have already created both services and would like to use automatic configuration. In that case, rename the services to match the names expected by the configuration script before starting the process:
   - Rename the Streaming Analytics service to *Streaming-Analytics*
   - Rename the Watson IoT Platform service to *Internet-of-Things-Platform*
   *These names must match exactly as indicated here*.

[![Deploy To Bluemix](https://bluemix.net/deploy/button.png)](https://bluemix.net/deploy?repository=https://github.com/IBMStreams/streamsx.iot.starterkit.git)
Click the "Deploy To Bluemix" button above to deploy the services automatically.

Once the deployment is finished, go to your Bluemix dashboard as shown below and click the link to go to the home page of the starter kit. This is where you'll access the needed credentials.

![Landing page](https://raw.githubusercontent.com/IBMStreams/samples/master/IoT/WeatherStationApp/img/bluemix-dashboard.png)


### Finish setup
#### Go to the home page of the starter kit:
1. Go to **Tools** and click **Submit IotPlatform Job**.
2. Download the `device.cfg` file: click **View All Credentials**, then under **Edgent Credentials**, select **Download Device.cfg**. Save this file locally.
3. Get the credentials for your Streaming Analytics service:
   - From the landing page, click **View All Credentials**.
   - In the **Streams credentials** tab, click **Get Credentials** and copy the contents of the cell.
   - Paste them in the cell titled *Specify Streams Credentials* below.



## Option 2: Manual deployment 

If you have not already done so, create instances of the [Streaming Analytics service](https://console.bluemix.net/catalog/services/streaming-analytics) and the [Watson IoT Platform](https://console.bluemix.net/catalog/services/internet-of-things-platform) service. 

Next, follow the [instructions in this post](https://developer.ibm.com/streamsdev/docs/setup-instructions-connecting-edgent-streams-applications-watson-iot-platform).
After completing the steps in the above article, you should have:
 - A registered device on the Watson IoT Platform and a `device.cfg` file with the information for the device.
 - The `IotPlatformBluemix` application running in your Streaming Analytics service.
 
Finally, **get your Streaming Analytics credentials**:
- Open the Streaming Analytics service dashboard and click **Service Credentials**. If no credentials are listed, click **New Credential** to create a set of credentials. Then click **View Credentials**, and finally click the Copy icon.
- Note your service name (`test1` in the screenshot below). You will set both the credentials and the service name in the *Specify Streams credentials* cell below.

<img src='https://github.com/orzade/streamsx-notebooks/blob/master/copyservicecredentials.png?raw=true' alt="Copy your service credentials" title="Streaming Analytics catalog - Copy your service credentials"></img>

 

## 1.2 Specify Streams credentials
- Set the `credentials` variable in the cell below to the Streaming Analytics credentials you copied, pasting them between the triple quotes.
- Change the `service_name` variable to your service name.
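`submit_to_service` wraps the pasted credentials in a `VCAP_SERVICES`-style dictionary keyed by `streaming-analytics`, and it calls `json.loads` on them, so the pasted text must be valid JSON. The sketch below reproduces only that wrapping step with the standard `json` module, as a way to check the pasted string before submitting. `build_vcap_services` and the sample credential field are hypothetical placeholders, not real service values.

```python
import json

def build_vcap_services(service_name, credentials_text):
    """Hypothetical helper mirroring the dictionary built in submit_to_service.

    Raises json.JSONDecodeError if the pasted credentials are not valid JSON,
    for example if the placeholder text was left in place.
    """
    credentials = json.loads(credentials_text)
    return {"streaming-analytics": [{"name": service_name, "credentials": credentials}]}

# Placeholder credentials for illustration only; real ones come from the service dashboard.
sample = '{"example_key": "example_value"}'
vcap = build_vcap_services("Streaming-Analytics", sample)
print(vcap["streaming-analytics"][0]["name"])  # prints: Streaming-Analytics

# The unedited placeholder is not JSON, so parsing fails before anything is submitted.
try:
    build_vcap_services("Streaming-Analytics", """ ## Paste retrieved credentials here""")
except json.JSONDecodeError:
    print("Credentials are not valid JSON yet")
```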

In [None]:
# Set up access to the Streaming Analytics service
import json
from streamsx.topology.context import submit, ConfigParams

def get_service_name():
    # If you chose manual deployment, change the service name here
    service_name = "Streaming-Analytics"
    return service_name

def get_credentials():
    # Paste the retrieved credentials (a JSON object) between the triple quotes
    credentials = """ ## Paste retrieved credentials here"""
    return credentials

def submit_to_service(topo):
    """Submit the topology to the Streaming Analytics service named above."""
    service_name = get_service_name()
    credentials = get_credentials()
    # Build a VCAP_SERVICES-style entry from the pasted credentials
    vs = {'streaming-analytics': [{'name': service_name, 'credentials': json.loads(credentials)}]}
    cfg = {}
    cfg[ConfigParams.VCAP_SERVICES] = vs
    cfg[ConfigParams.SERVICE_NAME] = service_name
    return submit('STREAMING_ANALYTICS_SERVICE', topo, cfg)


## 1.3 Start generating data
The data processed by this notebook is from an Edgent application that sends simulated weather data from different locations in Toronto and Markham.  
Download and unpack the [Weather Station simulator application](https://github.com/IBMStreams/samples/raw/WeatherStationApp/IoT/WeatherStationApp/WeatherStationSimulator/weather-station-simulator.jar).

Make sure you have also saved your `device.cfg` file locally.

Start generating data by running the application:

`java -Dcom.ibm.iotf.enableCustomFormat=false -jar weather-station-simulator.jar path/to/device.cfg`
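`parse_json` (defined in Step 2) expects each IoT event to carry a JSON string whose `"d"` key holds the reading, and later cells read the `id`, `location`, `temp`, `lat`, and `lon` fields from that reading. The event below is a hypothetical example of that shape: it is inferred from the fields the notebook's code uses, not captured from the simulator, and all values are invented. It shows how one event is decoded.

```python
import json

# Hypothetical event shaped like a tuple of the schema used in Step 2
# (typeId, deviceId, eventId, jsonString). All values are invented.
event = {
    "typeId": "weather-station",
    "deviceId": "device-1",
    "eventId": "weather",
    "jsonString": json.dumps({"d": {"id": "station_3", "location": "tor",
                                    "temp": 21.4, "lat": 43.685, "lon": -79.474}}),
}

def parse_json(tuple_):
    # Same logic as parse_json in Step 2: decode the string and keep the "d" payload
    return json.loads(tuple_["jsonString"])["d"]

reading = parse_json(event)
print(reading["location"], reading["temp"])  # prints: tor 21.4
```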


# Step 2: Create the Streams application

This application will ingest data from the sensors in different locations and show the live readings from each sensor on a map, updating in real time. It will also show any sensors that are detected to be outliers as red.

### 2.1 Define helper classes

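The `parse_json` function below extracts the reading from each IoT event. The `TagOutliers` class keeps the latest reading from each weather station in a location. Once every station in that location has reported, it computes the average and standard deviation of their temperatures, tags each station according to whether it could be malfunctioning, and starts a new window. The `MergeStreams` class then recombines the per-location results into a single list.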
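The outlier rule that `TagOutliers` applies can be checked by hand. The sketch below applies the same test (distance from the average greater than `threshold * stddev`, using `numpy.std`, which computes the population standard deviation by default) to hypothetical readings; `flag_outliers` is an illustrative helper, not part of the notebook.

```python
import numpy as np

def flag_outliers(readings, threshold):
    """Return one bool per reading: True if it lies more than
    threshold * stddev from the average, the rule TagOutliers uses."""
    avg = np.average(readings)
    stddev = np.std(readings)  # population standard deviation (ddof=0), as in TagOutliers
    return [bool(abs(r - avg) > threshold * stddev) for r in readings]

# Ten hypothetical stations in one location: nine read 20.0, one reads 35.0.
# avg = 21.5 and stddev = 4.5, so the cut-off distance is 2 * 4.5 = 9.0;
# only the 35.0 reading (13.5 from the average) exceeds it.
flags = flag_outliers([20.0] * 9 + [35.0], threshold=2)
print(flags)

# The same jump among only five stations is not flagged:
# avg = 23.0 and stddev = 6.0, and a distance of 12.0 is not greater than 2 * 6.0.
print(flag_outliers([20.0] * 4 + [35.0], threshold=2))
```

The second case reflects a general property of the population standard deviation: among n readings, no reading can be more than √(n − 1) standard deviations from the average. With `threshold = 2` and a strict `>` comparison, a window must therefore hold at least six readings for any station to be flagged.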

In [None]:
import json
import numpy as np

def parse_json(tuple_):
    """Extract the reading (the "d" payload) from an IoT event's JSON string."""
    js = tuple_["jsonString"]
    reading = json.loads(js)
    return reading["d"]

class TagOutliers():
    """
    A callable class that determines if a tuple is an outlier.
    It adds a key "outlier" ("True" or "False") to each tuple to indicate whether or not the value
    is an outlier, and a key "avg" holding the average of the window.
    An outlier is defined as more than (threshold * standard deviation) from the average.

    Args:
        threshold (number): number of standard deviations beyond which a reading is an outlier
        n (int): window size, the number of stations whose readings are used to compute the average
    """
    def __init__(self, threshold, n):
        self.threshold = threshold
        self.window = {}  # latest reading per station id
        self.num = n

    def window_isfull(self):
        return len(self.window) == self.num

    def is_outlier(self, value, average, stddev):
        distance_from_avg = abs(value - average)
        # If the distance from the average exceeds threshold * stddev, this is an outlier
        return distance_from_avg > (self.threshold * stddev)

    def __call__(self, tuple_):
        """
        Args:
            tuple_: a reading from a weather station.
        Returns:
            None if the window is not full (Streams does not submit a tuple when map returns None).
            When the window is full, a list of all the tuples in the window, flagging those which are outliers.
        """
        # Add or replace the entry for this station, keyed by its id
        self.window[tuple_["id"]] = tuple_

        # Calculate the average and stddev once every station has reported
        if self.window_isfull():
            readings = [x["temp"] for x in self.window.values()]
            # numpy.average and numpy.std compute the average and (population) standard deviation
            avg = float(np.average(readings))
            stddev = float(np.std(readings))
            # Determine which stations in the window have a value that is an outlier
            for station in self.window.values():
                temp = station["temp"]
                station["outlier"] = str(self.is_outlier(temp, avg, stddev))
                station["avg"] = avg

            values = list(self.window.values())
            # Start a fresh window for the next round of readings
            self.window = {}
            return values


class MergeStreams:
    """
    Receives the union of the two per-location streams of lists (Toronto and Markham),
    which were split earlier by location, and emits one combined list once the latest
    list from each location is available.
    """
    def __init__(self):
        self.last_tor = []
        self.last_mkm = []

    def __call__(self, tuple_):
        # Each incoming tuple is a list of readings from a single location
        if tuple_[0]["location"] == "tor":
            self.last_tor = tuple_
        else:
            self.last_mkm = tuple_
        # Emit only when both locations have reported, then reset
        if len(self.last_mkm) > 0 and len(self.last_tor) > 0:
            merged = self.last_mkm + self.last_tor
            self.last_mkm = []
            self.last_tor = []
            return merged
 



### 2.2 Define Streams `Topology` to tag outliers

Streams applications are directed graphs in which data moves from one operation to the next. A Streams application written in Python is called a `Topology`.
The `Topology` we are creating will:
- Subscribe to data from the Watson IoT Platform.
- Create two groups of readings, one per location, so that each sensor is compared with its nearest neighbours.
- Use numpy to compute the average and standard deviation of each group and check each sensor's value to see whether it is an outlier.
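To make the first stages of this data flow concrete, the sketch below is a hypothetical local stand-in written in plain Python, not Streams code. It applies the same filter on `eventId`, the same JSON parsing, and the same split by `location` to a small in-memory list of invented events.

```python
import json
from collections import defaultdict

def make_event(event_id, station_id, location, temp):
    # Hypothetical event in the shape of the subscribed schema; only the fields used here
    payload = {"d": {"id": station_id, "location": location, "temp": temp}}
    return {"eventId": event_id, "jsonString": json.dumps(payload)}

events = [
    make_event("weather", "s1", "tor", 20.0),
    make_event("status", "s2", "tor", 0.0),   # dropped: not a "weather" event
    make_event("weather", "s3", "mkm", 18.5),
    make_event("weather", "s4", "tor", 21.0),
]

# filter -> map -> split, mirroring the filter on eventId, map(parse_json),
# and the two location filters in the topology
weather = (e for e in events if e["eventId"] == "weather")
readings = (json.loads(e["jsonString"])["d"] for e in weather)
by_location = defaultdict(list)
for r in readings:
    by_location[r["location"]].append(r["id"])

print(dict(by_location))  # {'tor': ['s1', 's4'], 'mkm': ['s3']}
```

In Streams, each stage runs continuously on an unbounded stream, and the topology uses two separate `filter` calls for the split; the single grouping loop here is only for brevity.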




In [None]:
from streamsx.topology.topology import Topology
from streamsx.topology.context import *
from streamsx.topology import schema
import streamsx.spl.op as op
import json

# The window size is the number of stations in each location.
# If you change the Edgent application to add more stations, change NUM_STATIONS accordingly.
NUM_STATIONS = 20
NUM_STATIONS_PER_LOCATION = NUM_STATIONS // 2  # integer division, so the window size is a whole number
window_size = NUM_STATIONS_PER_LOCATION
threshold = 2

# Create the Topology (our application graph)
topo = Topology('TaggedWeatherStationData')
sch = schema.StreamSchema("tuple<rstring typeId, rstring deviceId, rstring eventId, rstring jsonString>")
# Read device events from the data source (published by the IotPlatform job started during setup)
raw_events = topo.subscribe(topic="streamsx/iot/device/events", schema=sch)

# Keep only events with id "weather"
json_data_from_iotp = raw_events.filter(lambda tuple: tuple["eventId"] == "weather", "RawEvents")

# Parse the JSON strings into Python objects
readings = json_data_from_iotp.map(parse_json, "WeatherEvents")

# Split by location so that averages are computed per location
tor_str = readings.filter(lambda tuple: tuple["location"] == "tor", "Toronto")
mkm_str = readings.filter(lambda tuple: tuple["location"] == "mkm", "Markham")

# Each output tuple is a list of the current readings for every station in one location;
# TagOutliers uses numpy to compute the average and standard deviation and flags outliers
tor_tagged = tor_str.map(TagOutliers(threshold, window_size))
mkm_tagged = mkm_str.map(TagOutliers(threshold, window_size))

# Merge the two output streams for easy graphing in Plotly
merged = tor_tagged.union({mkm_tagged}).map(MergeStreams())

merged.print()

# This view lets us fetch data from the merged stream for graphing
station_data_view = merged.view()

# Submit the application to the Streaming Analytics service on Bluemix
job_submission_result = submit_to_service(topo)

print("\nSubmitted job to Streaming Analytics service")


### 2.3 View the data in the Streams Console
Now that your application is running, you can view its printed output in the Streams Console.

- Go to the [Bluemix dashboard](https://console.bluemix.net/dashboard/).
- Click your Streaming Analytics service instance.
- Click **Launch** on the service management page to open the Streams Console.
- Click **Log Viewer**, look for the application with a name of the form `ipythoninput::TaggedWeatherStationData`, and open its **Console Log** as shown below to see the output of the application.
![ConsoleLog](https://raw.githubusercontent.com/IBMStreams/samples/master/IoT/WeatherStationApp/img/view_log.png)



# Step 3: Plot the Weather Station Data

We now have a Streams application running and ingesting data, and we've seen its output in the Streams console.  But this is just printed data. We would like to take the output of the Streams application and display it on a Plotly map so we can see which weather stations are malfunctioning. 


## 3.1 Get your Plotly credentials information

We want to create a map that shows readings from the weather stations in real time. The map is created using Plotly.

To create the visualization, [register with Plotly](https://plot.ly/accounts/login/?action=signup), then get the following information for your Plotly account:
- Username
- API key
- Streaming API tokens (stream IDs)

Paste that information in the cell below.

In [None]:
# Replace the placeholder values in this cell with your Plotly username, API key, and stream IDs

plotly_api_key = 'abcdsf'
plotly_stream_ids = ['ahighjam9', '12324xxf']
plotly_username = 'yourplotlyid'

## 3.2 Set up the Plotly map


### 3.2.1 Create Plotly stream objects using your credentials

`plotly_stream_id` is a dictionary that identifies the stream by its token and limits how many points the map keeps (`maxpoints`). We also create a Plotly stream link object called `plotly_data_stream`, which is used to update the map with streaming data. Both objects are built from the first stream token in the `plotly_stream_ids` list you set above.

In [None]:
from plotly.graph_objs import *
import plotly
import plotly.plotly as pty
import plotly.tools 
import plotly.graph_objs as graph_objs


def get_trace():
    # Initial marker positions: loc_1 is around Toronto, loc_2 around Markham
    loc_1_lats = [43.722, 43.685, 43.671, 43.677, 43.673]
    loc_1_lons = [-79.384, -79.474, -79.343, -79.421, -79.345]
    loc_2_lats = [43.870, 43.880, 43.844750, 43.8570, 43.830]
    loc_2_lons = [-79.271, -79.362, -79.330, -79.370, -79.310]
    lats = loc_1_lats + loc_2_lats
    lons = loc_1_lons + loc_2_lons
    # One id per marker, so the ids line up with the coordinates
    loc_ids = ["loc_" + str(x) for x in range(len(lats))]
    trace = Data([
        Scattermapbox(lat= lats,lon=lons,
            mode='markers+text',
            marker=Marker(
                size=40,symbol="circle",color="royalblue",
            ),ids=loc_ids,hovertext="", text=loc_ids,stream=plotly_stream_id, textfont=dict(size="12",color="white")
        )
    ])
    return trace

def get_layout():
    layout = Layout(autosize=True, hovermode='closest' ,title="Weather Stations",
        mapbox= dict(bearing=0,  pitch=0, zoom=10,
            center=dict(lat=43.760859080766361,
                        lon=-79.3474682425380699)),
    )
    return layout

# Configure Plotly to use our credentials and stream tokens
plotly.tools.set_credentials_file(username=plotly_username, api_key=plotly_api_key, stream_ids=plotly_stream_ids)

stream_tokens = plotly.tools.get_credentials_file()['stream_ids']
# We use only the first token. To graph multiple streams, you would need one token per stream.
token = stream_tokens[0]

plotly_stream_id = dict(token=token, maxpoints=100)
plotly_data_stream = pty.Stream(stream_id=token)


fig = dict(data=get_trace(), layout=get_layout())
map_url = pty.plot( fig, validate=False, filename='weather_stn_graph', auto_open=False, fileopt='overwrite')
        

## 3.3 Fetch and display the sensor data from Streams
Now we can start streaming data from Streams to our map. The `send_data_to_plotly()` function below retrieves the tagged weather station data from the Streams view and sends it to the Plotly map through the stream link object `plotly_data_stream`. Because the data stream is potentially unbounded, the `num_refreshes` parameter controls how many times the data is fetched before the function stops. We will call this function when we are ready to view the map.

If the map stops updating, re-run the cell that calls `send_data_to_plotly()`.
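Each refresh turns a tagged station reading into a marker color, a temperature label, and hover text. The sketch below isolates that per-station step as a pure function so its behavior is easy to see; `station_to_marker` is a hypothetical helper that mirrors the loop body of `send_data_to_plotly()`, and the sample stations are invented.

```python
def station_to_marker(station, tag=True):
    """Hypothetical helper mirroring the loop body of send_data_to_plotly:
    returns (color, label, hover_id) for one tagged station reading."""
    color = "royalblue"
    if tag:
        # TagOutliers stores the flag as the string "True" or "False"
        color = "red" if station.get("outlier", "False") == "True" else "seagreen"
    label = str(round(station["temp"], 1))
    hover_id = station["id"]
    if color == "red":
        # Outliers show their location's average temperature on hover
        hover_id = hover_id + "(Avg = " + str(station["avg"]) + ")"
    return color, label, hover_id

ok = {"id": "s1", "temp": 20.04, "outlier": "False", "avg": 21.5}
bad = {"id": "s2", "temp": 35.0, "outlier": "True", "avg": 21.5}
print(station_to_marker(ok))   # ('seagreen', '20.0', 's1')
print(station_to_marker(bad))  # ('red', '35.0', 's2(Avg = 21.5)')
```

Because the flag is compared as a string, a reading without an `"outlier"` key is treated as normal rather than raising an error.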


In [None]:
import sys
import time

def send_data_to_plotly(streams_view, plotly_map_stream, num_refreshes, tag=True):
    """Fetch tagged station readings from a Streams view and stream them to the Plotly map.

    Args:
        streams_view: the view on the merged stream (station_data_view)
        plotly_map_stream: the Plotly stream link object (plotly_data_stream)
        num_refreshes: how many batches of readings to fetch before stopping
        tag: if True, color outliers red and other stations green
    """
    plotly_map_stream.open()
    view_data = streams_view.start_data_fetch()

    for i in range(num_refreshes):
        try:
            # Each item from the view is the merged list of station readings
            stations = view_data.get()
            lats = []
            ids = []
            lons = []
            colors = []
            labels = []
            for station in stations:
                lats.append(station["lat"])
                lons.append(station["lon"])
                color = "royalblue"
                if tag:
                    # TagOutliers stores the outlier flag as the string "True" or "False"
                    color = "red" if station.get("outlier", "False") == "True" else "seagreen"
                colors.append(color)

                labels.append(str(round(station["temp"], 1)))
                s_id = station["id"]
                if color == "red":
                    # Show the location's average temperature in the hover text of outliers
                    s_id = s_id + "(Avg = " + str(station["avg"]) + ")"
                ids.append(s_id)

            n = len(stations)
            plotly_map_stream.write(dict(
                lon=lons, lat=lats,
                text=labels, hovertext=ids, type="Scattermapbox",
                marker={"opacity": [1.0] * n, "size": [30] * n, "color": colors, "symbol": ["circle"] * n}))
            time.sleep(0.5)
            if (i % 5) == 0:
                # Print a progress dot every 5 refreshes
                sys.stdout.write(".")
            plotly_map_stream.heartbeat()
        except Exception as e:
            print(e)
            break
    plotly_map_stream.close()
    streams_view.stop_data_fetch()
    print("\nDone refreshing map. Re-run this cell to send data to the map again")


### Embed the map in the notebook

Embed the map in the notebook. No data is displayed yet because we have not called the `send_data_to_plotly()` function.

In [None]:
plotly.tools.embed(map_url,width="85%",height=800)


### Send tagged station data from Streams to the map
Call the `send_data_to_plotly()` function, then scroll up to view the map. The cell fetches `num_refreshes` batches of readings and then stops; re-run it to continue updating the map.

In [None]:
print("Sending data to map application. Scroll up to view embedded map")
num_refreshes = 20
send_data_to_plotly(station_data_view, plotly_data_stream, num_refreshes, tag=True)


# Step 4: Shutdown
1. Kill the simulator process. By default, it will terminate after 30 minutes.
2. Terminate the Streams application by running the cell below.
3. Stop the `IotPlatform` job: log in to the Streams Console to cancel it or use the "Tools" page of the IoT starter kit.

In [None]:
job = job_submission_result.job
if job.cancel():
    print("Cancelled the Streams application. Run the cells in Step 2 to re-submit the application.")
else:
    print("The Streams application could not be cancelled. Cancel it from the Streams Console.")

# Next Steps

In this notebook, we created a Streaming Analytics application that analyzed data from IoT devices and visualized the results. The following are helpful resources to learn more:

- Learn more about developing Streaming Analytics applications in Python: 
  - [Consult the Streams Python API Developer Guide](http://ibmstreams.github.io/streamsx.documentation/docs/python/1.6/python-appapi-devguide/)
  - Take the [Streaming Analytics for Python developers course](https://developer.ibm.com/courses/all/streaming-analytics-basics-python-developers/) on developerWorks.
- For more on Edgent and the Watson IoT Platform, see the [Edgent Quickstart guide](http://edgent.incubator.apache.org/docs/quickstart.html).
- Visit [Streamsdev, the Streams developer community](https://developer.ibm.com/streamsdev/), for more useful resources about Streams.

Happy Streaming!

 
