In [None]:
import pandas as pd
import json
import os
import random
import time
from datetime import datetime, timedelta
import logging
from icpd_core import icpd_util
from streamsx.topology.topology import Topology
import streamsx.topology.context

In [None]:
# Define a callable source that generates the sensor data
class SensorReadingsSource(object):
    def __call__(self):
        # This is just an example using generated data
        while True:
            time.sleep(0.03)
            sensor_id = random.randint(1,10)
            reading = {}
            reading ["sensor_id"] = "sensor_" + str(sensor_id)
            reading ["value"] =  random.random() * 3000
            reading["ts"] = int((datetime.now().timestamp()))
            reading["unique_id"] = os.environ.get("UNIQUE_ID")
            yield reading

In [None]:
# Define an aggregation function
def average_reading(items_in_window):
    df = pd.DataFrame(items_in_window)
    readings_by_id = df.groupby("sensor_id")
    averages = readings_by_id["value"].mean()
    period_end = df["ts"].max()
    result = []
    for id, avg in averages.iteritems():
        result.append({"average": avg,
                "sensor_id": id,
                "period_end": time.ctime(period_end)})
    return result

In [None]:
# Create a Streams topology
topo = Topology(name="SensorAverages")

# Data source
readings = topo.source(SensorReadingsSource(), name="Readings")

# Filter
valid_readings = readings.filter(lambda x : x["value"] > 100,
                                 name="ValidReadings")

# Define window: e.g. a 30 second rolling average, updated every second
interval = timedelta(seconds=30)
window = valid_readings.last(size=interval).trigger(when=timedelta(seconds=1))

# Pass aggregation function to Window.aggregate
#     average_reading returns a list of the averages for each sensor,
#     use flat map to convert it to individual tuples, one per sensor
rolling_average = window.aggregate(average_reading).flat_map()
    
# Simply print results to the log on the edge
rolling_average.print()

#
# Build this application so it can be deployed to the edge
#
streams_instance_name = "sample-streams" ### PLEASE NOTE:  Change this value to the name of your Streams instance in Cloud Pak for Data
cfg=icpd_util.get_service_instance_details(name=streams_instance_name)
job_config = streamsx.topology.context.JobConfig()
job_config.raw_overlay = {'edgeConfig': {'imageName':'edge-sensorrollingaverage', 'pipPackages':['pandas']}}
job_config.add(cfg)
cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
submission_result = streamsx.topology.context.submit('EDGE', topo, cfg)
if submission_result.return_code == 0:
    print("Application image built successfully.")
    print("    Image:    %s" % (submission_result['image'],))