# IBM Streams sample application

This sample demonstrates creating a Streams Python application to perform some analytics, and viewing the results.

Familiarity with [Python](https://www.python.org/about/gettingstarted/) is recommended.


In this notebook, you'll see examples of how to:
 1. [Set up a connection to the Streams instance](#setup)
 2. [Create the application](#create)
 3. [Submit the application](#launch)
 4. [Connect to the running application to view data](#view)
 5. [Stop the application](#cancel)

# Overview

**About the sample**

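This application simulates a data hub that receives clickstream data from online shoppers. It replays customer click events from a CSV file. Every second, it uses [Pandas](https://pandas.pydata.org/) to compute the total price of the items each customer added to their cart during the last 30 seconds.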

**How it works**
   
A Streams Python application processes a continuous and potentially infinite stream of data. The data is processed in memory and is not stored in a database first.

The Python application created in this notebook is submitted to the IBM Streams service for execution. Once the application is running in the service, you can connect to it from the notebook to continuously retrieve the results.

<img src="https://developer.ibm.com/streamsdev/wp-content/uploads/sites/15/2019/04/how-it-works.jpg" alt="How it works">
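Here is a rough analogy in plain Python. It does not use the Streams runtime. Chained generators process a potentially endless sequence one item at a time, in memory, without first collecting the whole data set. The `click_source` generator, the event types other than `add_to_cart`, and the price range are hypothetical and only illustrate the idea.

```python
import itertools
import random


def click_source(seed=0):
    """Hypothetical unbounded source that yields simulated click events forever."""
    rng = random.Random(seed)
    event_types = ["view", "add_to_cart", "checkout"]  # hypothetical event types
    while True:
        yield {"customerId": str(rng.randint(10000, 10005)),
               "clickEventType": rng.choice(event_types),
               "productPrice": round(rng.uniform(1, 100), 2)}


def only_cart_adds(events):
    """Lazily keep only the events that add an item to a cart."""
    return (e for e in events if e["clickEventType"] == "add_to_cart")


# Nothing is collected up front: each event flows through as soon as it is produced,
# and islice takes a finite prefix of the otherwise endless pipeline.
first_five = list(itertools.islice(only_cart_adds(click_source()), 5))
for event in first_five:
    print(event)
```

In a real Streams application, the pipeline runs distributed in the Streams service instead of in a single Python process.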


### Documentation

- [Streams Python development guide](https://ibmstreams.github.io/streamsx.documentation/docs/latest/python/)
- [Streams Python API](https://streamsxtopology.readthedocs.io/)




# Prerequisites

This notebook can be used as-is from within an IBM Cloud Pak for Data project. 

<a name="setup"></a>

# 1. Set up a connection to the Streams instance


To submit the application for execution, you have to connect to the Streams instance.

<a name="cpd"></a>
### 1.1 Connect to a Streams instance from an IBM Cloud Pak for Data project

In order to submit a Streams application you need to provide the name of the Streams instance.

1. From the navigation menu, click **My instances**.
2. Click the **Provisioned Instances** tab.
3. Update the value of `streams_instance_name` in the cell below to match your Streams instance name.
4. Run the cell, then continue to section 1.2.

The cell below defines a function called `submit_topology` that will be used later on to submit the `Topology` once it is defined.

```python
# Connect to a Streams instance from an IBM Cloud Pak for Data project

from icpd_core import icpd_util
from streamsx.topology import context

streams_instance_name = "streams"  # Change this to your Streams instance name
cfg = icpd_util.get_service_instance_details(name=streams_instance_name)

def submit_topology(topo):
    # Disable SSL certificate verification if necessary
    cfg[context.ConfigParams.SSL_VERIFY] = False
    # The topology will be deployed as a distributed application
    context_type = context.ContextTypes.DISTRIBUTED
    return context.submit(context_type, topo, config=cfg)
```

### 1.2 Verify `streamsx` package version

Run the cell below to check which version of the `streamsx` package is installed.  

If you need to upgrade, first run `import sys` in a cell, then run one of the following:

- `!{sys.executable} -m pip install --user --upgrade streamsx` to upgrade to the latest version of the package.
- `!{sys.executable} -m pip install --user streamsx==somever` to install a specific version of the package.


```python
# Verify streamsx package version

import streamsx.topology.context
print("INFO: streamsx package version: " + streamsx.topology.context.__version__)

# For more details, uncomment the line below.
#!pip show streamsx
```

```
INFO: streamsx package version: 1.14.13
```
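You may want a cell that flags an outdated package, not just one that prints the version. The following small check does that. It is a sketch based on the standard library's `importlib.metadata` (Python 3.8+). The helper names and the minimum version `"1.14"` are illustrative assumptions. The comparison only handles purely numeric dotted versions such as `1.14.13`.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of `package`, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def at_least(version, minimum):
    """Compare purely numeric dotted versions, e.g. '1.14.13' >= '1.14'."""
    def as_tuple(v):
        return tuple(int(part) for part in v.split("."))
    return as_tuple(version) >= as_tuple(minimum)


current = installed_version("streamsx")
if current is None:
    print("streamsx is not installed")
elif not at_least(current, "1.14"):  # hypothetical minimum version
    print("streamsx", current, "is older than 1.14; consider upgrading")
else:
    print("streamsx", current, "is recent enough")
```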


<a id="create"></a>
# 2. Create the application
 

All Streams applications start with a `Topology` object, so start by creating one:


```python
from streamsx.topology.topology import Topology

topo = Topology(name="Cart Item Cost")
```

## 2.1 Define sources
Your application needs some data to analyze, so the first step is to define a data source that produces the data to be analyzed. 

Next, use the data source to create a `Stream` object. A `Stream` is a potentially infinite sequence of tuples containing the data to be analyzed.

Tuples are Python objects by default. Other supported formats include JSON. [See the doc for all supported formats](https://streamsxtopology.readthedocs.io/en/stable/streamsx.topology.topology.html#stream).

### 2.1.1 Define a source class

Define a callable class that will produce the data to be analyzed.

This example class reads simulated clickstream data from a CSV file and replays it indefinitely. Each reading is a Python `dict` that contains the data for one customer click.
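`Topology.source()` accepts a zero-argument callable that returns an iterable of tuples. You pass in the class instance, and calling that instance produces the tuples. The sketch below shows this contract on its own, outside Streams. It replays a small in-memory CSV string instead of a file. `ReplayingCsvSource`, its two columns, and the sample rows are hypothetical.

```python
import csv
import io
import itertools


class ReplayingCsvSource:
    """Hypothetical callable source: each call returns an endless iterator of dicts."""

    def __init__(self, csv_text, fieldnames):
        self.csv_text = csv_text
        self.fieldnames = fieldnames

    def __call__(self):
        # Replay the data forever so that downstream operators never run dry.
        while True:
            reader = csv.DictReader(io.StringIO(self.csv_text), fieldnames=self.fieldnames)
            for row in reader:
                yield dict(row)


source = ReplayingCsvSource("10002,add_to_cart\n10004,view\n",
                            ["customer_id", "click_event_type"])
# Call the instance the way a source callable is invoked, then take a finite prefix locally.
first_four = list(itertools.islice(source(), 4))
print(first_four)
```

`__call__` contains `yield`, so calling the instance returns a generator. This is the same shape the notebook's `CSVFileReader` uses.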

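### 2.1.2 Create the `Stream`

Create a `Stream` called `clicks` that will contain the data that `CSVFileReader` produces. The cells below do this in three steps. First, they add the CSV file to the application. Next, they define `CSVFileReader`. Finally, they create the stream: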

```python
# Add the clickstream CSV file to the topology so that the application can use it.
# "etc" is the directory in the application bundle where the file is stored
# on the IBM Streams service cluster. Change the file path if needed.

topo.add_file_dependency('/project_data/data_asset/clickstream.csv', 'etc')
```

```
'etc/clickstream.csv'
```

```python
# Define a source class that reads and processes the CSV file

import csv
import os
from streamsx.ec import get_application_directory

# A callable class that can be used in the topology as a source
class CSVFileReader:
    def __init__(self, file_name):
        self.file_name = file_name

    def __call__(self):
        # Locate the file in the "etc" directory of the deployed application
        click_file = os.path.join(get_application_directory(), "etc", self.file_name)
        col_names = ["customer_id", "time_stamp", "click_event_type", "product_name", "product_category", "product_price",
                     "total_price_of_basket", "total_number_of_items_in_basket", "total_number_of_distinct_items_in_basket", "session_duration"]

        # Run indefinitely so that there will always be data for the view
        while True:
            with open(click_file) as handle:
                reader = csv.DictReader(handle, delimiter=',', fieldnames=col_names)
                # Yield the rows in the file one at a time, each converted to a dict
                for row in reader:
                    yield dict(customerId=row["customer_id"],
                               timeStamp=row["time_stamp"],
                               clickEventType=row["click_event_type"],
                               productName=row["product_name"],
                               productCategory=row["product_category"],
                               productPrice=float(row["product_price"]),
                               totalPriceOfBasket=float(row["total_price_of_basket"]),
                               totalNumberOfItemsInBasket=int(row["total_number_of_items_in_basket"]),
                               totalNumberOfDistinctItemsInBasket=int(row["total_number_of_distinct_items_in_basket"]),
                               sessionDuration=row["session_duration"])
```
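The reader runs inside the Streams job, so a parsing error only appears after submission. One way to check the type conversion locally first is to move it into a plain function and feed it a hand-written line. `row_to_click` and the sample line below are hypothetical, and the real `clickstream.csv` contents may differ. The reader passes `fieldnames` explicitly, so `csv.DictReader` treats the first line of the file as data. If your copy of the file has a header row, `float()` raises `ValueError` on that row.

```python
import csv
import io

COL_NAMES = ["customer_id", "time_stamp", "click_event_type", "product_name", "product_category",
             "product_price", "total_price_of_basket", "total_number_of_items_in_basket",
             "total_number_of_distinct_items_in_basket", "session_duration"]


def row_to_click(row):
    """Hypothetical helper: convert one CSV row (all strings) into a typed tuple dict."""
    return dict(customerId=row["customer_id"],
                timeStamp=row["time_stamp"],
                clickEventType=row["click_event_type"],
                productName=row["product_name"],
                productCategory=row["product_category"],
                productPrice=float(row["product_price"]),
                totalPriceOfBasket=float(row["total_price_of_basket"]),
                totalNumberOfItemsInBasket=int(row["total_number_of_items_in_basket"]),
                totalNumberOfDistinctItemsInBasket=int(row["total_number_of_distinct_items_in_basket"]),
                sessionDuration=row["session_duration"])


# Hypothetical sample line in the column order above
sample = "10002,2019-04-10 12:00:00,add_to_cart,Widget,Tools,19.99,39.98,2,1,120\n"
rows = [row_to_click(r) for r in csv.DictReader(io.StringIO(sample), fieldnames=COL_NAMES)]
print(rows[0]["productPrice"], rows[0]["totalNumberOfItemsInBasket"])
```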

```python
# Assign the source of our application.
# The stream data will be each row of our csv file

# Change the filename if needed
clicks = topo.source(CSVFileReader("clickstream.csv"))
```

## 2.2 Analyze data

Use a variety of methods in the `Stream` class to analyze your in-flight data, including applying machine learning models.
 
This section will:
- Filter out tuples based on a condition.
- Compute the total cost of the items in each customer's cart.

Built-in methods exist for common operations, such as <code>Stream.filter</code> and <code>Stream.split</code>, which filter or split a stream of data, respectively.

See the <a href="https://ibmstreams.github.io/streamsx.documentation/docs/python/1.6/python-appapi-devguide-4/">common operations section</a> for other common examples. See the <a href="https://ibmstreams.github.io/streamsx.topology/doc/pythondoc/streamsx.topology.topology.html#streamsx.topology.topology.Stream">documentation</a> of the <code>Stream</code> class for the full list of functions.

### 2.2.1 Filter data from the `Stream`

Use `Stream.filter()` to remove data that doesn't match a certain condition.

```python
# Accept only clicks that add items to a cart

cart_items = clicks.filter(lambda x: x["clickEventType"] == "add_to_cart",
                           name="CartItems")
```

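### 2.2.2 Compute the total cost per customer

Use a sliding window to collect the "add to cart" clicks from the last 30 seconds. Every second, compute each customer's total with Pandas.

```python
# Add up the cost of the items in each customer's cart

import pandas as pd
from datetime import timedelta

def total_cost(items_in_window):
    # Nothing to aggregate if the window is empty
    if not items_in_window:
        return []

    # Grab all the clicks in the stream window
    df = pd.DataFrame(items_in_window)

    # Group the clicks by customer ID
    clicks_by_id = df.groupby("customerId")

    # Get the total amount per customer
    totals = clicks_by_id["productPrice"].sum()

    result = []
    # Series.items() replaces iteritems(), which was removed in pandas 2.0
    for customer_id, tot in totals.items():
        result.append({"customer_id": customer_id,
                       "total": round(tot, 2)})

    return result

# Set the window size and process all "add to cart" clicks in that window,
# recomputing the totals every second
interval = timedelta(seconds=30)
window = cart_items.last(size=interval).trigger(when=timedelta(seconds=1))
```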

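```python
# total_cost returns a list of per-customer totals; flat_map() emits each one as a separate tuple
running_total = window.aggregate(total_cost).flat_map()
```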
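Roughly, `last(size=interval)` keeps the tuples that arrived in the last 30 seconds. `trigger(when=timedelta(seconds=1))` passes the current window contents to `total_cost` once per second. `flat_map()` then emits each element of the returned list as its own tuple. The stdlib-only simulation below mimics that behavior outside Streams. It replaces Pandas with a dictionary and sorts by customer ID, as `groupby` does by default. `SlidingTimeWindow`, `stdlib_total_cost`, and the arrival times and prices are hypothetical. The exact eviction boundary in Streams may differ.

```python
from collections import defaultdict, deque


class SlidingTimeWindow:
    """Hypothetical time window holding (timestamp, tuple) pairs from the last `size` seconds."""

    def __init__(self, size):
        self.size = size
        self.items = deque()

    def insert(self, timestamp, item):
        self.items.append((timestamp, item))

    def contents(self, now):
        # Evict tuples that are `size` seconds old or older before reading.
        while self.items and self.items[0][0] <= now - self.size:
            self.items.popleft()
        return [item for _, item in self.items]


def stdlib_total_cost(items_in_window):
    """Dictionary-based stand-in for the Pandas groupby/sum in total_cost."""
    totals = defaultdict(float)
    for item in items_in_window:
        totals[item["customerId"]] += item["productPrice"]
    return [{"customer_id": cid, "total": round(tot, 2)} for cid, tot in sorted(totals.items())]


sim_window = SlidingTimeWindow(size=30)
# Hypothetical arrivals: (seconds since start, tuple)
for t, item in [(0, {"customerId": "10002", "productPrice": 10.0}),
                (5, {"customerId": "10004", "productPrice": 2.5}),
                (20, {"customerId": "10002", "productPrice": 4.25})]:
    sim_window.insert(t, item)

emitted = []
for now in (25, 31):  # two firings of the 1-second trigger
    batch = stdlib_total_cost(sim_window.contents(now))
    emitted.extend(batch)  # like flat_map(): each list element becomes its own tuple
print(emitted)
```

At `now=31`, the click that arrived at second 0 has left the 30-second window. That is why customer `10002`'s total drops from 14.25 to 4.25.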

## 2.3 Create a `View` to preview the tuples on the `Stream`


A `View` is a connection to a `Stream` that becomes activated when the application is running. The connection allows you to access the data on the `Stream` as it is being processed.


After submitting the `Topology`, we use a `View` to examine the data from within the notebook [in section 4](#view).

To view the data on the `running_total` `Stream`, define a `View` using `Stream.view()`:


```python
my_view = running_total.view(name="RunningTotal",
                             description="Sample of cart content prices per customer")
```

You can <a href="http://ibmstreams.github.io/streamsx.documentation/docs/python/1.6/python-appapi-devguide-6/#accessing-the-tuples-of-a-view">connect to a view in <em>any</em> running Streams job using the REST API</a>, regardless of what language was used to create the application.

## 2.4 Define output

You can also enable a microservices-based architecture by publishing the results so that other Streams applications can subscribe to them.

Use `Stream.publish()` to make the `running_total` `Stream` available to other applications.

To send the stream to a database or another external system, use a sink function (similar to the source function) and invoke it using `Stream.for_each`.

```python
import json

# Publish the results as JSON
running_total.publish(topic="TotalsReadings",
                      schema=json,
                      name="PublishTotals")

# Other options include invoking another sink function, for example:
# running_total.for_each(func=send_to_db)
```
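`Stream.for_each` calls its callable once per tuple. A callable class is a convenient shape for a sink because, like the source class, it can hold configuration. The sketch below uses `JsonLinesSink`, a hypothetical class that stands in for something like the `send_to_db` placeholder above. It writes to an in-memory buffer only so that it can be tested locally. A real sink would write to a database or file that the Streams job can reach. `running_total` is also published with `schema=json`, so checking that the tuples survive `json.dumps` is a cheap local sanity test.

```python
import io
import json


class JsonLinesSink:
    """Hypothetical sink: writes each tuple as one JSON line to a text stream."""

    def __init__(self, out):
        self.out = out

    def __call__(self, tuple_):
        # Called once per tuple; json.dumps raises TypeError for non-serializable values.
        self.out.write(json.dumps(tuple_, sort_keys=True) + "\n")


# Local test only: feed two sample totals through the sink by hand.
buffer = io.StringIO()
sink = JsonLinesSink(buffer)
for t in [{"customer_id": "10002", "total": 14.25}, {"customer_id": "10004", "total": 2.5}]:
    sink(t)
print(buffer.getvalue())
```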


<a name="launch"></a>

# 3. Submit the application
A running Streams application is called a *job*. Use this cell to submit the `Topology` for execution, using the `submit_topology` function [defined in step 1](#setup).

```python
# The submission_result object contains information about the running application, or job
print("Submitting Topology to Streams for execution..")
submission_result = submit_topology(topo)

if submission_result.job:
    streams_job = submission_result.job
    print("JobId: ", streams_job.id, "\nJob name: ", streams_job.name)
else:
    print("Submission failed: " + str(submission_result))
```

```
Submitting Topology to Streams for execution..
Insecure host connections enabled.
JobId:  32
Job name:  ibmstreamswithnotebook::CartItemCost_32
```


<a name="view"></a>

# 4. Use a `View` to access data from the job

Now that the job is started, use the `my_view` object you created in step 2.3 to start retrieving data from the `running_total Stream`.

```python
print("Fetching view data ...")
# Connect to the view and display the data
queue = my_view.start_data_fetch()
try:
    for _ in range(10):
        print(queue.get())
finally:
    my_view.stop_data_fetch()
```

```
Fetching view data ...
{'customer_id': '10002', 'total': 2434.85}
{'customer_id': '10004', 'total': 863.28}
{'customer_id': '10018', 'total': 2775.21}
{'customer_id': '10022', 'total': 2199.57}
{'customer_id': '10044', 'total': 575.28}
{'customer_id': '10049', 'total': 323.28}
{'customer_id': '10051', 'total': 107.64}
{'customer_id': '10053', 'total': 3025.23}
{'customer_id': '10055', 'total': 466.56}
{'customer_id': '10059', 'total': 35.64}
```
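`queue.get()` with no arguments blocks until a tuple arrives. If the job stops producing data, the loop above can hang. Below is a sketch of a variant that gives up after a timeout. It assumes the object returned by `start_data_fetch()` supports the standard `queue.Queue.get(timeout=...)` call. `take_with_timeout` is a hypothetical helper. A plain `queue.Queue` stands in for the view's queue so that the sketch runs without a Streams job.

```python
import queue


def take_with_timeout(q, n, timeout=5.0):
    """Collect up to n items from q, stopping early if none arrives within `timeout` seconds."""
    items = []
    for _ in range(n):
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            break  # no data in time: return what we have instead of blocking forever
    return items


# Stand-in for my_view.start_data_fetch(): a plain queue holding two results.
fake_view_queue = queue.Queue()
fake_view_queue.put({"customer_id": "10002", "total": 2434.85})
fake_view_queue.put({"customer_id": "10004", "total": 863.28})
got = take_with_timeout(fake_view_queue, 10, timeout=0.1)
print(len(got), "tuples received")
```

With the real view, you would call `take_with_timeout(my_view.start_data_fetch(), 10)` inside the same `try`/`finally` block that calls `my_view.stop_data_fetch()`.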


## 4.1 Display the results in real time
Calling `View.display()` from the notebook displays the results of the view in a table that is updated in real time.

```python
# Display the results in real time for 30 seconds
my_view.display(duration=30)
```


## 4.2 See job status 

In IBM Cloud Pak for Data, you can view job status and logs with the Job Graph.

To view job status and logs:
<ol>
<li>From the main menu, go to <b>My Instances &gt; Jobs</b>. </li>
<li>Find your job based on the <code>JobId</code> printed when you submitted the topology.</li>
<li>Select <b>View graph</b> from the context menu action for the running job.</li>
</ol>

For all other development environments, use the Streams Console.

[See instructions and an example](http://ibm.biz/Bdz6yD).

<a name="cancel"></a>

# 5. Cancel the job
Streams jobs run indefinitely, so make sure you cancel the job once you are finished running the sample.


```python
# Cancel the job in the IBM Streams service
submission_result.cancel_job_button()

# You can also interact with the job through the Job object returned from submission_result.job.
# For example, use job.cancel() to cancel the running job directly.
# streams_job.cancel()
```


# Summary

We started with a `Stream` called `clicks`, which contained the data we wanted to analyze. Next, we used methods of the `Stream` class to perform a simple analysis and produce the `running_total` stream. This stream is published so that other applications running within our Streams instance can access it.

After submitting the application to the Streams service, we used the `my_view` view to see the results.



# Learn more 

- **Find more samples**: This notebook is one of several sample notebooks available in the [starter notebooks repository on GitHub](https://github.com/IBMStreams/sample.starter_notebooks). Visit the repository for examples of how to connect to common data sources, including Apache Kafka, IBM, and Db2 Warehouse. 


- Learn more about how to use the API from the [development guide](http://ibmstreams.github.io/streamsx.documentation/docs/python/1.6/python-appapi-devguide/).