In [None]:
!python --version

# 1. Install and import libraries

In [None]:
!pip install streamsx==1.16.0b0

In [None]:
!pip install streamsx.endpoint

In [None]:
!pip show streamsx

In [None]:
pip install pandas

In [None]:
import random, time, os
from datetime import datetime, timedelta
import pandas as pd

import streamsx.endpoint as endpoint

from streamsx import rest
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit, ContextTypes
from streamsx.rest_primitives import Instance
from streamsx.topology import context

# 2. Create the application

In [None]:
# Define a callable source 
class SensorReadingsSource:
    """Callable that produces an endless stream of simulated sensor readings.

    Calling an instance returns a generator. Each yielded item is a dict with:
      - "sensor_id": "sensor_<n>" with n drawn uniformly from 1..100
      - "value":     a random float in [0, 3000)
      - "ts":        the current local time as whole seconds since the epoch
    """

    def __call__(self):
        # The data here is simulated. A real application could instead read
        # from a database, a data set, or a file at this point.
        while True:
            # Short pause between readings to throttle the generator.
            time.sleep(0.001)
            sensor_number = random.randint(1, 100)
            yield {
                "sensor_id": "sensor_" + str(sensor_number),
                "value": random.random() * 3000,
                "ts": int(datetime.now().timestamp()),
            }

In [None]:
def average_reading(items_in_window):
    """Compute the per-sensor average of the readings in a window.

    Args:
        items_in_window: iterable of reading dicts, each with the keys
            "sensor_id", "value" and "ts" (seconds since the epoch).

    Returns:
        A list with one dict per sensor present in the window, ordered by
        sensor id. Each dict has:
          - "average":    mean of that sensor's "value" entries
          - "sensor_id":  the sensor identifier
          - "period_end": ``time.ctime`` string of the largest "ts" in the
                          whole window (the same for every entry)
        An empty window yields an empty list.
    """
    df = pd.DataFrame(items_in_window)
    # An empty window has no "sensor_id" column; grouping would raise KeyError.
    if df.empty:
        return []

    averages = df.groupby("sensor_id")["value"].mean()
    # The end of the period is shared by all sensors, so format it only once.
    period_end = time.ctime(df["ts"].max())

    # Series.items() replaces iteritems(), which was removed in pandas 2.0.
    return [
        {"average": avg, "sensor_id": sensor_id, "period_end": period_end}
        for sensor_id, avg in averages.items()
    ]

In [None]:
# Returns the original tuple with a new `coords` attribute
# representing the latitude and longitude of the sensor
def enrich(tpl):
    """Attach simulated coordinates to a tuple and return it.

    The tuple is modified in place: a "coords" entry holding a
    (latitude, longitude) pair, each rounded to 4 decimal places, is added.
    The values are random offsets from a fixed base point; a real application
    could look them up elsewhere (for example in a database) instead.
    """
    # Draw the latitude offset first, then the longitude offset.
    lat_offset, lon_offset = random.random(), random.random()
    latitude = round(lat_offset + 39.8338515, 4)
    longitude = round(-74.871826 + lon_offset, 4)
    tpl["coords"] = (latitude, longitude)
    return tpl

# 3. Build and submit the application

In [None]:
def build_topo():
    """Assemble the "SensorAverages" topology and return it.

    Pipeline:
      1. "Readings": source stream fed by SensorReadingsSource.
      2. "ValidReadings": keeps only readings whose "value" exceeds 100.
      3. A 30 second window, triggered every second, aggregated with
         average_reading and flattened into one tuple per sensor.
      4. Each average is passed through enrich and converted to JSON.
      5. The most recent enriched tuple is exposed through
         endpoint.expose (context 'sensor-averages', name 'enriched').
    """
    topo = Topology(name="SensorAverages")

    # Source stream created from the callable with Topology.source
    raw_stream = topo.source(SensorReadingsSource(), name="Readings")

    # Discard readings that are not above the threshold
    filtered = raw_stream.filter(lambda reading: reading["value"] > 100,
                                 name="ValidReadings")

    # Rolling window: the last 30 seconds of data, re-evaluated every second
    window_span = timedelta(seconds=30)
    refresh_every = timedelta(seconds=1)
    rolling_window = filtered.last(size=window_span).trigger(when=refresh_every)

    # average_reading returns a list of per-sensor averages; flat_map turns
    # that list into individual tuples, one per sensor
    per_sensor_average = rolling_window.aggregate(average_reading).flat_map()

    # Add coordinates to each average with the map transform, then emit JSON
    enriched_json = per_sensor_average.map(enrich).as_json()

    # Expose a window holding only the latest tuple, refreshed on every tuple
    latest_tuple = enriched_json.last(1).trigger(1)
    endpoint.expose(window=latest_tuple,
                    context='sensor-averages',
                    name='enriched',
                    monitor=None)

    return topo

In [None]:
def submit_topology(topo):
    """Submit the given topology as a distributed Streams job.

    The connection settings below are exported as environment variables
    before the topology is submitted. Returns whatever context.submit returns.
    """
    # Set the following 4 values before running
    CP4D_URL = "..."
    STREAMS_INSTANCE_NAME = "..."
    STREAMS_USERNAME = "..."
    STREAMS_PASSWORD = "..."

    # Publish the connection settings through the process environment
    os.environ.update({
        "STREAMS_USERNAME": STREAMS_USERNAME,
        "STREAMS_PASSWORD": STREAMS_PASSWORD,
        "STREAMS_INSTANCE_ID": STREAMS_INSTANCE_NAME,
        "CP4D_URL": CP4D_URL,
    })

    # Submission config: SSL verification is switched off
    submit_config = {context.ConfigParams.SSL_VERIFY: False}

    # DISTRIBUTED specifies how the application will be deployed
    deployment_type = context.ContextTypes.DISTRIBUTED
    return context.submit(deployment_type, topo, config=submit_config)

In [None]:
# Build the application graph, then hand it to Streams
topo = build_topo()

print("Submitting Topology to Streams for execution..")
submission_result = submit_topology(topo)

# Report the outcome; streams_job is only defined when a job was created
if not submission_result.job:
    print("Submission failed: " + str(submission_result))
else:
    streams_job = submission_result.job
    print("JobId: ", streams_job.id, "\nJob name: ", streams_job.name)

# 4. Connect to endpoint

In [None]:
# Host name used to reach the job's endpoint; fill this in before running
hostname = '...'

# Assemble the URL of the tuples exposed by the submitted job
endpoint_url = ''.join([
    'https://',
    hostname,
    '/streams/jobs/',
    streams_job.id,
    '/sensor-averages/enriched/tuples',
])
print(endpoint_url)

### 4.1 Display the results in real time

In [None]:
import requests

# Poll the endpoint once per second and print the decoded JSON response.
# The loop runs until the kernel is interrupted.
try:
    while True:
        time.sleep(1)
        # verify=False skips TLS certificate verification. The timeout keeps
        # a stalled connection from blocking this cell indefinitely.
        source = requests.get(endpoint_url, verify=False, timeout=30).json()
        print(source)
except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop polling.
    print("Stopped polling.")