<h1 style="text-align:center;text-decoration: underline">Stream Analytics Tutorial</h1>
<h1>Overview</h1>
<p>Welcome to the stream analytics tutorial for EpiData Lite. In this tutorial we will perform near real-time stream analytics on sample weather data acquired from a simulated wireless sensor network.</p>

<h2>Package and Module Imports</h2>
<p>As a first step, we import the packages and modules required for this tutorial. The <i>EpiData Lite Context (ec)</i> and <i>EpiData Lite Streaming Context (esc)</i> are required to use the application, so we import and instantiate both. Sample functions for near real-time analytics are available in the <i>EpiData Analytics</i> package. Its import is commented out below, since the functions used in this tutorial are defined in the notebook itself. Other packages and modules, such as <i>datetime</i>, <i>pandas</i> and <i>matplotlib</i>, can also be imported at this time.</p>

In [1]:
import sys
# Make the local EpiData Python sources importable; adjust these paths to your own checkout.
sys.path.append("/Users/srinibadri/Documents/Repos/epidata/epidata-community-interns/ipython")
sys.path.append("/Users/srinibadri/Documents/Repos/epidata/epidata-community-interns/ipython/epidata")

from epidata.EpidataLiteContext import EpidataLiteContext
from epidata.EpidataLiteStreamingContext import EpidataLiteStreamingContext
#from epidata.analytics import *

%matplotlib inline
from datetime import datetime, timedelta
import pandas as pd
import time
import pylab as pl
from IPython import display
import json

ec = EpidataLiteContext()
esc = EpidataLiteStreamingContext()

gateway:  <py4j.java_gateway.JavaGateway object at 0x000002AC47ABEC10>
gg:  <py4j.java_gateway.JavaGateway object at 0x000002AC489E5F40>
jelc:  com.epidata.spark.EpidataLiteContext@1bc12f3
jesc:  com.epidata.spark.EpidataLiteStreamingContext@b409669


<h2>Stream Analysis</h2>
<h3>Function Definition</h3>
<p>EpiData supports development and deployment of custom algorithms via Jupyter Notebook. Below, we define Python functions for substituting extreme outliers and for computing summary statistics on measurements. These functions can be applied to both near real-time and historical data. In this tutorial, we apply them to near real-time data as it is ingested into the platform.</p>

In [2]:
import pandas as pd

def identity_demo(df, meas_names=None, params=None):
    """
    Returns input dataframe without changing it.
    """
    # None defaults avoid sharing one mutable list/dict across calls.
    return df


In [3]:
import pandas as pd
import numpy as np

def substitute_demo(df, meas_names, method="rolling", size=3):
    """
    Substitute extreme outliers (the value 250) and missing measurement values
    within a data frame, using the specified method.
    """
    # Debug: show the raw batch received from the stream.
    print(df, "\n")

    # The batch arrives as a Scala ListBuffer rendered as text, e.g.
    # "ListBuffer({k1=v1, k2=v2, ...}, {...})". Strip the "ListBuffer(" prefix and
    # the closing ")" and split the rest into one string per record.
    records = str(df)[len("ListBuffer("):-1].split("},")
    rows = []
    for record in records:
        record = record.replace("{", "").replace("}", "").strip()
        # Split each "key=value" pair on the first "=" only.
        rows.append(dict(pair.split("=", 1) for pair in record.split(", ")))
    df = pd.DataFrame.from_records(rows)

    # Parsed values are strings; convert them to numbers so outliers and NaNs can be detected.
    df["meas_value"] = pd.to_numeric(df["meas_value"], errors="coerce")
    # Treat the outlier value 250 as missing.
    df["meas_value"] = df["meas_value"].replace(250, np.nan)

    if method != "rolling":
        raise ValueError("Unsupported substitution method: " + repr(method))
    # A centered rolling window needs an odd size.
    if size % 2 == 0 and size != 0:
        size += 1

    for meas_name in meas_names:
        is_meas = df["meas_name"] == meas_name
        if not is_meas.any():
            continue
        # Rows of this measurement whose value is missing.
        indices = df.index[is_meas & df["meas_value"].isna()]
        # Centered rolling mean; min_periods=1 lets each window skip missing values.
        substitutes = df.loc[is_meas, "meas_value"].rolling(window=size, min_periods=1, center=True).mean()
        df.loc[indices, "meas_value"] = substitutes.loc[indices]
        df.loc[indices, "meas_flag"] = "substituted"
        df.loc[indices, "meas_method"] = "rolling average"

    return df
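
To see what the rolling-average step does in isolation, here is a minimal standalone sketch on a hand-made DataFrame (the values are illustrative, not sensor data). It follows the same idea as <i>substitute_demo</i>: the value 250 is treated as an outlier and replaced by NaN, then filled with a centered 3-point rolling mean that skips missing values.

```python
import numpy as np
import pandas as pd

# Toy batch: one Temperature series with an outlier (250) at position 2.
df = pd.DataFrame({
    "meas_name": ["Temperature"] * 5,
    "meas_value": [70.0, 71.0, 250.0, 73.0, 74.0],
})

# Step 1: treat the outlier value 250 as missing.
df["meas_value"] = df["meas_value"].replace(250, np.nan)
missing = df.index[df["meas_value"].isna()]

# Step 2: centered rolling mean over 3 points; min_periods=1 skips the NaN.
substitutes = df["meas_value"].rolling(window=3, min_periods=1, center=True).mean()

# Step 3: fill only the missing positions and flag them.
df.loc[missing, "meas_value"] = substitutes.loc[missing]
df.loc[missing, "meas_flag"] = "substituted"
print(df)
```

The outlier at position 2 is replaced by the mean of its neighbours, (71.0 + 73.0) / 2 = 72.0, while the other readings are left untouched.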


In [4]:
import pandas as pd
import numpy as np
import json

def subgroup_statistics(row):
    row['start_time'] = np.min(row["ts"])
    row["stop_time"] = np.max(row["ts"])
    row["meas_summary_name"] = "statistics"
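    # Cast NumPy scalars to built-in types so json.dumps can serialize them.
    row["meas_summary_value"] = json.dumps({'count': int(row["meas_value"].count()), 'mean': float(row["meas_value"].mean()),
                                            'std': float(row["meas_value"].std()), 'min': float(row["meas_value"].min()), 
                                            'max': float(row["meas_value"].max())})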
    row["meas_summary_description"] = "descriptive statistics"
    return row

def meas_statistics_demo(df, meas_names, method="standard"):
    """
    Compute statistics on measurement values within a data frame, using the specified method.
    """
    
    if (method == "standard"):
        df_grouped = df.loc[df["meas_name"].isin(meas_names)].groupby(["company", "site", "station", "sensor"], 
                            as_index=False)
        df_summary = df_grouped.apply(subgroup_statistics).loc[:, ["company", "site", "station", "sensor",
            "start_time", "stop_time", "event", "meas_name", "meas_summary_name", "meas_summary_value", 
            "meas_summary_description"]].drop_duplicates()
    else:
        raise ValueError("Unsupported summary method: ", repr(method))
                
    return df_summary
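
The summary format produced by <i>meas_statistics_demo</i> can be reproduced on a small example. The sketch below is a standalone variant that uses a plain loop over <i>groupby</i> groups instead of <i>groupby().apply()</i>, on illustrative values. It casts the statistics to built-in Python types, since <i>json.dumps</i> does not accept NumPy integer scalars such as the one returned by <i>Series.count()</i>.

```python
import json
import pandas as pd

# Toy data: two sensors with three readings each (illustrative values only).
df = pd.DataFrame({
    "sensor": ["Temperature_Probe"] * 3 + ["Anemometer"] * 3,
    "meas_name": ["Temperature"] * 3 + ["Wind_Speed"] * 3,
    "ts": [1, 2, 3, 1, 2, 3],
    "meas_value": [70.0, 72.0, 74.0, 6.0, 8.0, 10.0],
})

summaries = []
for (sensor, meas_name), group in df.groupby(["sensor", "meas_name"]):
    values = group["meas_value"]
    summaries.append({
        "sensor": sensor,
        "meas_name": meas_name,
        "start_time": int(group["ts"].min()),
        "stop_time": int(group["ts"].max()),
        # Built-in int/float so the dictionary is JSON-serializable.
        "meas_summary_value": json.dumps({
            "count": int(values.count()),
            "mean": float(values.mean()),
            "std": float(values.std()),  # sample standard deviation (ddof=1)
            "min": float(values.min()),
            "max": float(values.max()),
        }),
    })

df_summary = pd.DataFrame(summaries)
print(df_summary)
```

Each row holds one JSON string per sensor and measurement, which matches the <i>meas_summary_value</i> column written by the original function.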

<h3>Transformations and Streams</h3>
<p>The analytics algorithms are executed on near real-time data through transformations. A transformation, created with <i>esc.create_transformation(function, meas_names, params)</i>, specifies the function to apply, the measurement names it operates on, and its parameters. The destination of its output can be one of the pre-defined topics, namely <i>'measurements_cleansed'</i> or <i>'measurements_summary'</i>, or another custom topic.</p>
<p>Once the transformations are defined, each one is connected to a source topic and a destination topic via the <i>esc.create_stream(source_topic, destination_topic, transformation)</i> function call, and processing begins with <i>esc.start_streaming()</i>.</p>
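
To make the source topic → transformation → destination topic flow concrete, here is a toy in-memory model. It is not EpiData's implementation: the classes <i>Transformation</i> and <i>ToyStreams</i>, the <i>drop_outliers</i> function, and passing <i>params</i> as keyword arguments are all hypothetical, chosen only to mirror the shape of the calls used in this tutorial.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Transformation:
    """Hypothetical stand-in for the object returned by esc.create_transformation()."""
    func: Callable
    meas_names: List[str]
    params: Dict = field(default_factory=dict)

    def apply(self, batch):
        # Assumption: params are passed to the function as keyword arguments.
        return self.func(batch, self.meas_names, **self.params)


class ToyStreams:
    """Hypothetical in-memory topics; each stream reads one topic and writes another."""

    def __init__(self):
        self.topics = {}   # topic name -> list of published batches
        self.streams = []  # (source, destination, transformation) triples

    def create_stream(self, source, destination, transformation):
        self.streams.append((source, destination, transformation))

    def publish(self, topic, batch):
        self.topics.setdefault(topic, []).append(batch)
        # Forward the batch through every stream that reads from this topic.
        for source, destination, transformation in self.streams:
            if source == topic:
                self.publish(destination, transformation.apply(batch))


def drop_outliers(batch, meas_names, limit=200.0):
    """Toy transformation: drop readings of the selected measurements at or above limit."""
    return [r for r in batch
            if r["meas_name"] not in meas_names or r["meas_value"] < limit]


streams = ToyStreams()
op = Transformation(drop_outliers, ["Temperature"], {"limit": 200.0})
streams.create_stream("measurements_original", "measurements_cleansed", op)
streams.publish("measurements_original", [
    {"meas_name": "Temperature", "meas_value": 70.6},
    {"meas_name": "Temperature", "meas_value": 250.0},
])
print(streams.topics["measurements_cleansed"])
```

The toy transformation drops outliers rather than substituting them, to keep the example short; the point is only that the transformation holds the function and its parameters, while the stream decides where its input comes from and where its output goes.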

In [5]:
# Define transformations and stream operations
op1 = esc.create_transformation(substitute_demo, ["Temperature", "Wind_Speed"], {})
# Example parameters: {"method": "rolling", "size": 3}
# op1 = esc.create_transformation("Identity", ["Temperature", "Wind_Speed"], {})
# op1 = esc.create_transformation(identity_demo, ["Temperature", "Wind_Speed"], {})
esc.create_stream("measurements_original", "measurements_cleansed", op1)

#op2 = esc.create_transformation("identity", ["Temperature", "Wind_Speed"], {})
#esc.create_stream("measurements_substituted_1", "measurements_cleansed", op2)

#op3 = esc.create_transformation("meas_statistics", ["Temperature", "Wind_Speed"], {"method": "standard"})
#esc.create_stream("measurements_substituted_1", "measurements_summary", op3)

# Start near real-time processing
esc.start_streaming()

stream created
streams started
ListBuffer({meas_name=Temperature, meas_datatype=double, meas_status=PASS, meas_value=70.59, site=San_Francisco, meas_lower_limit=10.1, station=WSN-1, company=EpiData, sensor=Temperature_Probe, meas_upper_limit=120.4, event=none, meas_unit=deg F, ts=1661528109650}, {meas_name=Wind_Speed, meas_datatype=double, meas_status=PASS, meas_value=8.01, site=San_Francisco, meas_lower_limit=0.0, station=WSN-1, company=EpiData, sensor=Anemometer, meas_upper_limit=25.0, event=none, meas_unit=mph, ts=1661528109650}, {meas_value=57.85, meas_name=Relative_Humidity, site=San_Francisco, meas_datatype=double, station=WSN-1, company=EpiData, sensor=RH_Probe, meas_status=PASS, event=none, meas_unit=%, ts=1661528109650}, {meas_name=Temperature, meas_datatype=double, meas_status=PASS, meas_value=70.99000000000001, site=San_Francisco, meas_lower_limit=10.1, station=WSN-1, company=EpiData, sensor=Temperature_Probe, meas_upper_limit=120.4, event=none, meas_unit=deg F, ts=166152810

<h3>Data Ingestion</h3>
<p>We can now start data ingestion from the simulated wireless sensor network. To do so, download and run the <i>sensor_data_ingest_with_outliers.py</i> example shown in the image below.</p>
<img src="../static/jupyter_tree.png">
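
If you want to inspect what the ingested records look like without running the ingestion script, the sketch below generates records with the same field names as the stream output printed earlier. It is a hypothetical stand-in, not the contents of <i>sensor_data_ingest_with_outliers.py</i>: the function name, the 60 to 80 deg F range, and injecting outliers as the value 250 (the value <i>substitute_demo</i> treats as an outlier) are assumptions.

```python
import random
import time


def simulate_batch(n, outlier_rate=0.1, seed=None):
    """Hypothetical generator of Temperature records shaped like the stream output above."""
    rng = random.Random(seed)
    ts = int(time.time() * 1000)  # epoch milliseconds, as in the 'ts' field
    records = []
    for i in range(n):
        temperature = round(rng.uniform(60.0, 80.0), 2)
        if rng.random() < outlier_rate:
            temperature = 250.0  # assumed outlier value, replaced later by substitute_demo
        records.append({
            "company": "EpiData", "site": "San_Francisco", "station": "WSN-1",
            "sensor": "Temperature_Probe", "event": "none",
            "meas_name": "Temperature", "meas_value": temperature,
            "meas_unit": "deg F", "meas_datatype": "double",
            "meas_lower_limit": 10.1, "meas_upper_limit": 120.4,
            "ts": ts + i * 1000,  # one reading per second
        })
    return records


batch = simulate_batch(5, outlier_rate=0.5, seed=42)
for record in batch:
    print(record["ts"], record["meas_value"])
```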

<h3>Data Query and Visualization</h3>

<p>We will query and visualize the processed data using the <i>ec.query_measurements_cleansed()</i> and <i>ec.query_measurements_summary()</i> functions. For our example, we specify parameters that match the sample data set: a primary key (company, site, station and sensors) and a time range. We first query the cleansed measurements, then the aggregated values using the <i>ec.query_measurements_summary()</i> function call.</p>
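
The query results are in long format, with one row per measurement. A common step before plotting is to pivot them into one column per measurement, indexed by time. The helper below is a sketch that assumes the queried frame has <i>ts</i> (epoch milliseconds, as in the stream output), <i>meas_name</i> and <i>meas_value</i> columns; <i>to_wide</i> is a hypothetical name and the example values are illustrative.

```python
import pandas as pd


def to_wide(df_long, meas_names):
    """Pivot long-format measurements into one column per measurement, indexed by time."""
    subset = df_long[df_long["meas_name"].isin(meas_names)].copy()
    # Assumption: 'ts' holds epoch milliseconds.
    subset["time"] = pd.to_datetime(subset["ts"], unit="ms")
    return subset.pivot_table(index="time", columns="meas_name",
                              values="meas_value", aggfunc="mean")


# Illustrative data in the same long format as the query results.
df_example = pd.DataFrame({
    "ts": [1661528109650, 1661528109650, 1661528110650, 1661528110650],
    "meas_name": ["Temperature", "Wind_Speed", "Temperature", "Wind_Speed"],
    "meas_value": [70.5, 8.0, 71.0, 7.5],
})
wide = to_wide(df_example, ["Temperature", "Wind_Speed"])
print(wide)
```

With matplotlib available, as imported at the top of this notebook, <i>wide.plot(subplots=True)</i> draws one panel per measurement.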

In [6]:
# QUERY MEASUREMENTS_CLEANSED TABLE

# primary_key={"company": "Company-2", "site": "Site-1", "device_group":"1000", "tester": "Station-1"}
primary_key={"company": "EpiData", "site": "San_Francisco", "station":"WSN-1", "sensor": ["Temperature_Probe","Anemometer","RH_Probe"]}
start_time = datetime.strptime('1/1/2022 00:00:00', '%m/%d/%Y %H:%M:%S')
stop_time = datetime.strptime('1/1/2023 00:00:00', '%m/%d/%Y %H:%M:%S')

df_cleansed = ec.query_measurements_cleansed(primary_key, start_time, stop_time)

print(df_cleansed.tail(10))

   company event meas_datatype  meas_lower_limit meas_method  \
0  EpiData  none        double               0.0               
1  EpiData  none        double               0.0               
2  EpiData  none        double               0.0               
3  EpiData  none        double               NaN               
4  EpiData  none        double               NaN               
5  EpiData  none        double               NaN               
6  EpiData  none        double              10.1               
7  EpiData  none        double              10.1               
8  EpiData  none        double              10.1               
9  EpiData  none        double              10.1               

           meas_name meas_status meas_unit  meas_upper_limit  meas_value  \
0         Wind_Speed        PASS       mph              25.0        6.50   
1         Wind_Speed        PASS       mph              25.0        7.14   
2         Wind_Speed        PASS       mph              25.0       

In [7]:
# QUERY MEASUREMENTS_SUMMARY TABLE

# primary_key={"company": "Company-2", "site": "Site-1", "device_group":"1000", "tester": "Station-1"}
primary_key={"company": "EpiData", "site": "San_Francisco", "station":"WSN-1", "sensor": ["Temperature_Probe","Anemometer","RH_Probe"]}
start_time = datetime.strptime('1/1/2022 00:00:00', '%m/%d/%Y %H:%M:%S')
stop_time = datetime.strptime('1/1/2023 00:00:00', '%m/%d/%Y %H:%M:%S')

df_summary = ec.query_measurements_summary(primary_key, start_time, stop_time)
print(df_summary.tail(5))

Empty DataFrame
Columns: []
Index: []


<h3>Stop Stream Analytics</h3>
<p>The transformations can be stopped at any time via the <i>esc.stop_streaming()</i> function call.</p>
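
To make sure streaming is stopped even if a query or plotting step raises an error, start and stop can be wrapped in a context manager. This is a sketch that relies only on the <i>start_streaming()</i> and <i>stop_streaming()</i> methods used in this tutorial; the helper <i>streaming</i> and the <i>FakeStreamingContext</i> class used to demonstrate it are hypothetical.

```python
from contextlib import contextmanager


@contextmanager
def streaming(streaming_context):
    """Start streaming on entry and always stop it on exit, even if an error occurs."""
    streaming_context.start_streaming()
    try:
        yield streaming_context
    finally:
        streaming_context.stop_streaming()


class FakeStreamingContext:
    """Hypothetical stand-in exposing the same two methods, for illustration only."""

    def __init__(self):
        self.events = []

    def start_streaming(self):
        self.events.append("start")

    def stop_streaming(self):
        self.events.append("stop")


fake = FakeStreamingContext()
try:
    with streaming(fake):
        raise RuntimeError("simulated failure while querying")
except RuntimeError:
    pass
print(fake.events)  # ['start', 'stop']
```

With the real context, the streams would be created first and the analysis placed inside <i>with streaming(esc):</i>.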

In [7]:
#Stop current near real-time processing
esc.stop_streaming()

streams stopped


<h2>Next Steps</h2>
<p>Congratulations, you have successfully performed near real-time analytics on sample data acquired by a simulated wireless sensor network. The next step is to explore the various capabilities of EpiData by creating your own custom analytics application!</p>