# LocalCart scenario part 2: Creating streaming pipelines


## Introduction 

A web or mobile app triggers events as a user navigates a website. These clickstream events indicate when a customer logs in, adds something to a basket, completes an order, and logs out. The events are placed into a configured Message Hub (Apache Kafka) instance, which provides a scalable way to buffer the data before it is saved, analyzed, and rendered. Using the instructions in [Notebook #1 - Creating a Kafka Producer of ClickStream events](https://github.com/wdp-beta/get-started/blob/master/notebooks/localcart-scenario-part-1.ipynb), we generate clickstream events for LocalCart and send them to Message Hub to show how data can be collected offline and streamed to the cloud later. A [Java app](https://localcartkafkaproducer.mybluemix.net/LocalCartKafkaProducer/) continuously feeds a simulated stream of events to Message Hub. 

This notebook is divided into two parts, describing how to use pipelines to perform streaming data analysis and save data for static analysis.


<img src="https://raw.githubusercontent.com/wdp-beta/get-started/master/notebooks/images/nb2_flow.png"></img>


### Streaming data analysis

[Example 1: Capturing clickstream events for real-time analysis](#intro_a). You can use streaming pipelines to perform event-based aggregation operations on the fly (for example, calculating the number of currently open baskets and the value of those baskets) and store the results in a Redis database. The aggregated data can easily be visualized in real time using web applications that monitor this database, as described in [Notebook#4:Visualize streaming data in a real-time dashboard](https://github.com/wdp-beta/get-started/blob/master/notebooks/localcart-scenario-part-4.ipynb). 


### Static data analysis

[Example 2: Capturing clickstream events for static analysis](#intro_b). You can also use streaming pipelines to store clickstream events (as-is or in modified form) in flat files, which can be processed offline - either by batch processes or interactively, as outlined in [Notebook#3b: Analyze static clickstreams](https://github.com/wdp-beta/get-started/blob/master/notebooks/localcart-scenario-part-3b.ipynb).


This notebook runs on Python 2 with Spark 2.0.

<a id="intro_a"></a>

***
# Example 1: Capturing clickstream events for real-time analysis
***


<img src="https://raw.githubusercontent.com/wdp-beta/get-started/master/notebooks/images/streaming_analysis.png"></img>


In this first example you will create a pipeline that ingests `login`, `add_to_basket` and `checkout` clickstream events, aggregates them according to our business needs and stores the aggregated data in a Redis database, which will be monitored by a real-time dashboard:

<img src='https://raw.githubusercontent.com/wdp-beta/get-started/master/notebooks/images/MARMARMAR_result.png'></img>

## Example 1 table of contents

* [E1.1 Redis setup](#redis)<br>
* [E1.2 Create a streaming pipeline](#create_p1) <br>
* [E1.3 Process login clickstream events](#login) <br>
* [E1.4 Process add_to_cart clickstream events](#addtocart) <br>
* [E1.5 Process checkout clickstream events](#checkout) <br>
* [E1.6 Run the pipeline](#run_1)<br>
* [E1.7 Summary and next steps](#summary_1)<br>


<a id="redis"></a>
***

## E1.1 Redis setup

Redis is an in-memory database. It stores its data in RAM, making it a very fast way of storing and retrieving data. It provides a set of primitive data structures, but we only concern ourselves with [hashes](https://redis.io/commands#hash) for this exercise.

A Redis hash is a data structure that stores several field-value pairs together under a single key. We are going to configure a Redis hash called `funnel` that contains the following fields:

- login_count - the number of people who logged into LocalCart
- basket_count - the number of items added into a shopping cart
- checkout_count - the number of purchases made
- basket_total - the total price of items added into a shopping cart
- checkout_total - the total price of items purchased

These are the outputs of the aggregation functions in our streaming pipeline. 

### E1.1.1 Provision a Redis instance 

If you are participating in the WDP Beta program, a Redis service has already been provisioned for you; proceed to the next section to collect your credentials. Otherwise, provision an instance by doing these steps:

1. Open [your services list](https://apsportal.ibm.com/settings/services?context=analytics) in a new browser window. A list of your provisioned services is displayed.
1. Click **Create Service**.
1. Search for **Compose for Redis**.
1. Provision a service instance. 

### E1.1.2 Collect your Redis connection information

1. Open the [Bluemix Data Services list](https://console.bluemix.net/dashboard/services) in a new browser window. A list of your provisioned services is displayed.
1. Locate the **Compose for Redis** service and click on the tile.
1. Open the _Service Credentials_ tab and view the credentials.
```
{
  "db_type": "redis",
  "maps": [],
  "name": "b...b",
  "uri_cli": "redis-cli -h **HOSTNAME** -p **PORT** -a **PASSWORD**",
  "deployment_id": "5...2",
  "uri": "redis://admin:**PASSWORD**@**HOSTNAME**:**PORT**"
}
```

Note your `**HOSTNAME**`, `**PORT**` and `**PASSWORD**` information.
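
If you prefer not to copy the three values by hand, they can also be read out of the `uri` field. The following is a minimal sketch, assuming the URI has the `redis://admin:PASSWORD@HOSTNAME:PORT` shape shown above. The helper name `redis_kwargs_from_uri` and the example URI are hypothetical.

```python
try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse      # Python 2


def redis_kwargs_from_uri(uri):
    """Split a redis:// URI into keyword arguments for redis.StrictRedis."""
    parts = urlparse(uri)
    return {
        "host": parts.hostname,
        "port": parts.port,          # parsed as an integer
        "password": parts.password,
        "db": 0,
    }


# Hypothetical URI, shaped like the "uri" credential field; substitute your own.
example_uri = "redis://admin:s3cret@example-host.composedb.com:12345"
print(redis_kwargs_from_uri(example_uri))
```

The resulting dictionary can be passed to the client as `redis.StrictRedis(**redis_kwargs_from_uri(uri))`.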

### E1.1.3 Verify your Redis connectivity
To verify your Redis connection information from this notebook, first install the Python Redis library with the following command:

In [None]:
!pip install redis

We import the library and connect to Redis with the following command. Replace the credential placeholders with your credentials.

In [None]:
import redis
# TODO replace **HOSTNAME**, **PORT** and **PASSWORD** with your credentials
r = redis.StrictRedis(host='**HOSTNAME**', port=**PORT**, db=0, password='**PASSWORD**')

We can then create a hash called `funnel` that stores our real-time data in the database by using the `hset` function:

In [None]:
r.hset('funnel', 'basket_count', 554);
r.hset('funnel', 'basket_total', 951);
r.hset('funnel', 'checkout_count', 21);
r.hset('funnel', 'checkout_total', 5400);
r.hset('funnel', 'login_count', 100);

We can also use this connection to retrieve all the values from our `funnel` hash using `hgetall`:

In [None]:
r.hgetall('funnel')
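
Depending on the Python version and client settings, `hgetall` may return the field names and values as byte strings rather than text and numbers. The following is a minimal sketch of a conversion step, assuming every field in the hash holds a numeric value. The helper name `decode_funnel` is hypothetical, and the sample dictionary stands in for a real `hgetall` result so the code runs without a connection.

```python
def decode_funnel(raw):
    """Convert an hgetall() result into a dict of text keys and float values."""
    decoded = {}
    for key, value in raw.items():
        if isinstance(key, bytes):
            key = key.decode("utf-8")
        if isinstance(value, bytes):
            value = value.decode("utf-8")
        decoded[key] = float(value)
    return decoded


# Sample shaped like the values written with hset above.
sample = {b"basket_count": b"554", b"basket_total": b"951"}
print(decode_funnel(sample))
```

With a live connection you would call `decode_funnel(r.hgetall('funnel'))` instead.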

**Note:** 
The Redis connection above seems to freeze in this notebook after a minute or so. In this case, you will need to restart the notebook kernel to restore it.
<BR>
We can now create streaming pipelines that store aggregated data in Redis.

<a id="create_p1"></a>
***

## E1.2 Create a streaming pipeline

In IBM Data Science Experience, do these steps:

1. Select a project that you want to contain the streaming pipeline.
1. Click the **Analytics Assets** tab and scroll to the _Streaming Pipelines_ section.
1. Click **+ add streaming pipelines**.
1. In the _Create Streaming Pipeline_ window, 
  1. Enter pipeline name `events2redis`
  1. Select **Manual**. (You will use the wizard in Example 2.)
  1. Click **Create**.

An empty canvas is displayed, along with a list of _Source_, _Target_ and _Processing and Analytics_ operators that you can choose from. Source operators load data and target operators store data.

<a id="login"></a>
***

## E1.3 Process login clickstream events

First we need to collect `login` data from Message Hub and calculate the number of logins during a rolling one hour time window. The incoming `login` event payload has the following structure:
```
  {
    "customer_id": "13872",
    "click_event_type": "login",
    "total_price_of_basket": "0.0",
    "total_number_of_items_in_basket": "0",
    "total_number_of_distinct_items_in_basket": "0",
    "event_time": "2017-07-11 20:10:52 UTC"
  }
```
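
Every value in the payload is a JSON string, including the numbers and the timestamp. The following sketch shows one way to convert them to native Python types when you inspect events yourself. The function name `parse_event` is hypothetical, and the timestamp format is inferred from the sample above.

```python
import json
from datetime import datetime


def parse_event(payload):
    """Parse a clickstream JSON payload and convert string fields to native types."""
    event = json.loads(payload)
    event["total_price_of_basket"] = float(event["total_price_of_basket"])
    for field in ("total_number_of_items_in_basket",
                  "total_number_of_distinct_items_in_basket"):
        event[field] = int(event[field])
    # The samples always end in " UTC", so the suffix is matched literally.
    event["event_time"] = datetime.strptime(event["event_time"],
                                            "%Y-%m-%d %H:%M:%S UTC")
    return event


login_payload = """
{
  "customer_id": "13872",
  "click_event_type": "login",
  "total_price_of_basket": "0.0",
  "total_number_of_items_in_basket": "0",
  "total_number_of_distinct_items_in_basket": "0",
  "event_time": "2017-07-11 20:10:52 UTC"
}
"""
event = parse_event(login_payload)
print("%s at %s" % (event["click_event_type"], event["event_time"]))
```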


### E1.3.1 Configure the source

1. Drag a **MessageHub** source operator into the pipeline canvas.
1. Configure the MessageHub operator by doing these steps in the _Properties_ pane:
	1. Select your MessageHub instance.
	1. Select the `login` topic.
	1. Click **Edit Schema** to specify the payload properties that this operator makes available to operators connected to its output port. Because we only want to count the number of login events, we only make `customer_id` available:
		- Attribute Name: `customer_id`
		- Attribute Data Type: `Text`
		- Attribute Path: `.customer_id` (note the leading period!)


Our streaming pipeline now has its first operator and looks like this: 

<img src='https://raw.githubusercontent.com/wdp-beta/get-started/master/notebooks/images/M.png'></img>


### E1.3.2 Set up aggregation functions

Streaming data can be aggregated by applying functions such as sum, count, minimum, or maximum. The aggregation is applied to the incoming events before the results are written to the Redis database. Our aim is to calculate the number of LocalCart logins during a sliding one-hour window.

In the pipeline canvas, do these steps:

1. Drag an **Aggregation** operator from the _Processing and Analytics_ area, and then drop it on the canvas next to the MessageHub operator.
2. Drag your mouse pointer from the output port of the MessageHub operator to the input port of the Aggregation operator to connect them.
3. Click the **Aggregation** operator to open its _Properties_ pane. Set the following _Aggregation Window_ parameters:
    - Type - `sliding`
    - Time Units - `hour`
    - Number of Time Units - `1`
    - Partition By - leave unchanged
    - Group By - leave unchanged
4. In the **Functions** area of the _Aggregation Properties_ pane, define one aggregation:
    - Aggregation 1: count the logins
        - Output Field Name - `login_count`
        - Function Type - `Count`
    Note: To identify how many different customers have logged in during the rolling 1 hour time window, we would use the `CountDistinct` function and apply it to `customer_id`.
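
To make the difference between `Count` and `CountDistinct` concrete, here is a minimal Python sketch of a one-hour sliding window. It is an illustration only, not the Aggregation operator's implementation: the class name `SlidingWindow` is hypothetical, events are assumed to arrive in event-time order, and the exact window boundaries used by the operator may differ.

```python
from collections import deque
from datetime import datetime, timedelta


class SlidingWindow(object):
    """Keeps the events seen during the most recent `size` of event time."""

    def __init__(self, size=timedelta(hours=1)):
        self.size = size
        self.events = deque()  # (event_time, customer_id), oldest first

    def add(self, event_time, customer_id):
        # Assumes events arrive in event-time order.
        self.events.append((event_time, customer_id))
        cutoff = event_time - self.size
        # Drop everything that has fallen out of the window.
        while self.events and self.events[0][0] <= cutoff:
            self.events.popleft()

    def count(self):
        """Number of login events in the window (like Count)."""
        return len(self.events)

    def count_distinct(self):
        """Number of different customers in the window (like CountDistinct)."""
        return len(set(customer_id for _, customer_id in self.events))


window = SlidingWindow()
start = datetime(2017, 7, 11, 20, 0, 0)
window.add(start, "13872")
window.add(start + timedelta(minutes=30), "13872")  # same customer again
window.add(start + timedelta(minutes=45), "13859")
print("count=%d distinct=%d" % (window.count(), window.count_distinct()))
```

In this run the window holds three login events from two customers, so the two functions report different numbers.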

Our pipeline now has two connected operators: a source operator and an aggregation operator. Hover over the arrow to review the data flow between them.

<img src='https://raw.githubusercontent.com/wdp-beta/get-started/master/notebooks/images/op_op_io.png'></img>



### E1.3.3 Configure the target

Next, add a Redis target operator. In the streaming pipeline canvas, do these steps:

1. Drag a **Redis** operator from the _Target_ area, and then drop it on the canvas next to the Aggregation operator.
1. Drag your mouse pointer from the output port of the Aggregation operator to the input port of the Redis operator to connect them.
1. Click the **Redis** operator to open its Properties pane. 
    - Type in the `**HOSTNAME**`, `**PORT**` and `**PASSWORD**` credentials of your Compose for Redis service.
    - In the **Key Template** field, type in `funnel`. 
    - Click **Test Connection** to validate that your connection information is correct.
1. Save the pipeline. The setup for `login` event processing is complete.

  <img src='https://raw.githubusercontent.com/wdp-beta/get-started/master/notebooks/images/MAR.png'></img>    
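
Conceptually, the target step amounts to writing each aggregation output field into the `funnel` hash. The sketch below illustrates that idea under the assumption that the Redis operator stores one hash field per output field under the key given in **Key Template**; the operator's actual behavior is not documented here. `InMemoryHashStore` and `write_aggregates` are hypothetical names, and the in-memory class only stands in for a Redis client so the code runs without a server.

```python
class InMemoryHashStore(object):
    """Stand-in for a Redis client that mimics only hset and hgetall."""

    def __init__(self):
        self.hashes = {}

    def hset(self, name, key, value):
        self.hashes.setdefault(name, {})[key] = value

    def hgetall(self, name):
        return dict(self.hashes.get(name, {}))


def write_aggregates(client, key_template, aggregates):
    """Store each aggregation output field as a field of one hash."""
    for field, value in aggregates.items():
        client.hset(key_template, field, value)


store = InMemoryHashStore()
write_aggregates(store, "funnel", {"login_count": 100})
write_aggregates(store, "funnel", {"basket_count": 554, "basket_total": 951})
print(store.hgetall("funnel"))
```

Because every branch of the pipeline writes different fields of the same hash, the results end up side by side in `funnel`. To try this against a real database, pass the `r` connection from E1.1.3 instead of `store`.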



***

<a id="addtocart"></a>
## E1.4 Process add_to_cart clickstream events

Next we need to collect `add_to_cart` event data from Message Hub and calculate the number of shopping baskets and their combined value during a rolling one hour time window. The incoming `add_to_cart` event payload has the following structure:

```
{
    "customer_id": "13859",
    "click_event_type": "add_to_cart",
    "product_name": "Oatmeal",
    "product_category": "Food",
    "product_price": "2.49",
    "total_price_of_basket": "153.41",
    "total_number_of_items_in_basket": "19",
    "total_number_of_distinct_items_in_basket": "6",
    "event_time": "2017-06-23 12:56:18 UTC"
}
```

### E1.4.1 Configure the source, aggregation function and target for add_to_basket events

1. Drag another **MessageHub** source operator into the pipeline canvas.
1. Configure the MessageHub operator by doing these steps in the Properties pane:
	1. Select your MessageHub instance.
	1. Select the `add_to_basket` topic.
	1. Click **Edit Schema** to make the customer id and basket value available to connected operators.
      - Attribute Name: `customer_id` 
            - Attribute Data Type: `Text` 
            - Attribute Path: `.customer_id` 
      - Attribute Name: `total_price_of_basket` 
            - Attribute Data Type: `Number` 
            - Attribute Path: `.total_price_of_basket` 
1. Drag an **Aggregation** operator from the **Processing and Analytics** area, and then drop it on the canvas next to the MessageHub operator.
1. Drag your mouse pointer from the output port of the MessageHub operator to the input port of the Aggregation operator to connect them.
1. Click the **Aggregation** operator to open its _Properties_ pane. Set the following _Aggregation Window_ parameters:
    - Type - `sliding`
    - Time Units - `hour`
    - Number of Time Units - `1`
    - Partition By - leave unchanged
    - Group By - leave unchanged
1. In the **Functions** area of the _Aggregation Properties_ pane, define two aggregations:
    - Aggregation 1: count the baskets
        - Output Field Name - `basket_count`
        - Function Type - `Count`
    - Aggregation 2: Sum up basket values
        - Output Field Name - `basket_total`
        - Function Type - `Sum`
        - Apply Function to - `total_price_of_basket`
        
1. Copy the existing **Redis** operator that's already on the canvas and paste it next to the _Aggregation_ Operator. 
1. Drag your mouse pointer from the output port of the Aggregation operator to the input port of the Redis operator to connect them.

 Your pipeline is now configured to stream and aggregate `login` and `add_to_basket` events:
    
 <img src='https://raw.githubusercontent.com/wdp-beta/get-started/master/notebooks/images/MARMAR.png'></img>    

1. Save your pipeline.
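
The two aggregations configured in this section can be pictured as follows. This is a rough sketch rather than the operator's implementation: the function name `aggregate_window` and the sample events are hypothetical, and the window is assumed to cover the hour that ends at `now`.

```python
from datetime import datetime, timedelta


def aggregate_window(events, now, size=timedelta(hours=1)):
    """Return basket_count and basket_total for events within `size` before `now`."""
    in_window = [e for e in events if now - size < e["event_time"] <= now]
    return {
        "basket_count": len(in_window),                                      # Count
        "basket_total": sum(e["total_price_of_basket"] for e in in_window),  # Sum
    }


now = datetime(2017, 6, 23, 13, 0, 0)
events = [
    # Older than one hour, so it is ignored.
    {"event_time": datetime(2017, 6, 23, 11, 30, 0), "total_price_of_basket": 20.0},
    {"event_time": datetime(2017, 6, 23, 12, 15, 0), "total_price_of_basket": 10.5},
    {"event_time": datetime(2017, 6, 23, 12, 56, 18), "total_price_of_basket": 153.41},
]
print(aggregate_window(events, now))
```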




<a id="checkout"></a>
***

## E1.5 Process checkout clickstream events

Finally, we need to collect `checkout` event data from Message Hub and calculate the number of checkouts and their combined value during a rolling one hour time window. The incoming `checkout` event payload has the following structure:

```
{
    "customer_id": "11828",
    "click_event_type": "checkout",
    "total_price_of_basket": "72.80000000000001",
    "total_number_of_items_in_basket": "20",
    "total_number_of_distinct_items_in_basket": "5",
    "session_duration": "440",
    "event_time": "2017-06-23 13:09:12 UTC"
}
```
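
The `total_price_of_basket` value in this sample carries what looks like binary floating-point noise. The pipeline itself treats the attribute as a `Number`, as configured in the schema. If you post-process such values in Python and want exact cents, one option is the standard `decimal` module. The helper name `to_cents` is hypothetical and the rounding mode is an assumption.

```python
from decimal import Decimal, ROUND_HALF_UP


def to_cents(price_text):
    """Parse a price string and round it to two decimal places."""
    return Decimal(price_text).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


print(to_cents("72.80000000000001"))  # prints 72.80

# Decimal values can be summed without accumulating float error.
total = sum(to_cents(p) for p in ["72.80000000000001", "153.41"])
print(total)  # prints 226.21
```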

### E1.5.1 Set up pipeline source, aggregation function and target for checkout events

1. Drag another **MessageHub** source operator into the pipeline canvas.
1. Configure the MessageHub operator by doing these steps in the Properties pane:
	1. Select the ClickStream MessageHub instance.
	1. Select the `checkout` topic.
	1. Click **Edit Schema** to specify the message attributes that this pipeline will consume. Define the following attributes:
      - Attribute Name: `customer_id` 
            - Attribute Data Type: `Text` 
            - Attribute Path: `.customer_id` 
      - Attribute Name: `total_price_of_basket` 
            - Attribute Data Type: `Number` 
            - Attribute Path: `.total_price_of_basket` 
1. Drag an **Aggregation** operator from the **Processing and Analytics** area, and then drop it on the canvas next to the MessageHub operator.
1. Drag your mouse pointer from the output port of the MessageHub operator to the input port of the Aggregation operator to connect them.
1. Click the **Aggregation** operator to open its _Properties_ pane. Set the following _Aggregation Window_ parameters:
    - Type - `sliding`
    - Time Units - `hour`
    - Number of Time Units - `1`
    - Partition By - leave unchanged
    - Group By - leave unchanged
1. In the **Functions** area of the _Aggregation Properties_ pane, define two aggregations:
    - Aggregation 1: count checkouts
        - Output Field Name - `checkout_count`
        - Function Type - `Count`
    - Aggregation 2: Sum basket values
        - Output Field Name - `checkout_total`
        - Function Type - `Sum`
        - Apply Function to - `total_price_of_basket`
        
1. Copy the existing **Redis** operator that's already on the canvas and paste it next to the _Aggregation_ Operator. 
1. Drag your mouse pointer from the output port of the Aggregation operator to the input port of the Redis operator to connect them. The completed pipeline now looks as follows: <br>
   <img src='https://raw.githubusercontent.com/wdp-beta/get-started/master/notebooks/images/MARMARMAR.png'></img>    

1. Save your pipeline.

<a id="run_1"></a>
## E1.6 Run the pipeline

1. To run the pipeline, click **Run**. 
1. Wait for the pipeline to start. If the pipeline does not start, verify your pipeline setup. If no events are flowing from the MessageHub operators, make sure that the producer (simulating user activity) that you launched in notebook 1 is running. 
1. Click on any operator to display throughput information.

<img src= "https://raw.githubusercontent.com/wdp-beta/get-started/master/notebooks/images/streaming_status.png"></img>

Congratulations! You just created a streaming pipeline that ingests clickstream data from MessageHub, aggregates data and stores it in Redis storage.


<a id="summary_1"></a>
## E1.7 Summary and next steps
In this section, you consumed and aggregated clickstream events that were generated in [Notebook #1: Creating a Kafka Producer of ClickStream events](https://github.com/wdp-beta/get-started/blob/master/notebooks/localcart-scenario-part-1.ipynb).

You can now skip to [Notebook#4:Visualize streaming data in a real-time dashboard](https://github.com/wdp-beta/get-started/blob/master/notebooks/localcart-scenario-part-4.ipynb) to learn how to visualize the aggregated data in real time using a simple web application, or continue to the next section to configure a pipeline for static analysis.

<img src="https://raw.githubusercontent.com/wdp-beta/get-started/master/notebooks/images/nb2_dashboard.png"></img>



<a id="intro_b"></a>

***
# Example 2: Capturing clickstream events for static analysis
***


In this second example you will create multiple pipelines that ingest all clickstream events and store them as-is in CSV files on Object Storage in preparation for static analysis, as illustrated in [Notebook#3b: Analyze static clickstreams](https://github.com/wdp-beta/get-started/blob/master/notebooks/localcart-scenario-part-3b.ipynb).

<img src="https://raw.githubusercontent.com/wdp-beta/get-started/master/notebooks/images/static_analysis.png"></img>


## Example 2 table of contents

* [E2.1 Create a pipeline for login clickstream events](#login_2) <br>
* [E2.2 Create additional pipelines](#more_2) <br>
* [E2.3 Summary and next steps](#summary_2)<br>


<a id="login_2"></a>
***

## E2.1 Create a pipeline for login clickstream events

In IBM Data Science Experience, do these steps:

1. Select a project that you want to contain the streaming pipeline.
1. Click the **Analytics Assets** tab and scroll to the _Streaming Pipelines_ section.
1. Click **+ add streaming pipelines**.
1. In the _Create Streaming Pipeline_ window, 
  1. Enter pipeline name `login2csv`.
  1. Select **Wizard**. 
  1. Click **Create**.
1. In the **Select Source** tab, click **MessageHub**.
1. Under the Instance drop-down menu, select your MessageHub instance.
1. Under the Topic drop-down menu, select **login**. Click **Continue**.
1. Wait for the Data Preview window to display the streaming data for the selected event. (If no data is displayed make sure your producer is running.)
 > You can customize the pre-defined schema (e.g. remove attributes or change data types) by clicking _Edit schema_. Do not make any changes at this time.
1. Click **Continue**.
1. In the Select Target window, click **Object Storage**.
1. Under the Object Storage Instance drop-down menu, select the instance that is used by the DSX project.
   <br>
   > Take note of the  Object Storage instance name. You will need this information in [Notebook 3b: Static clickstream analysis](https://github.com/wdp-beta/get-started/blob/master/notebooks/localcart-scenario-part-3b.ipynb) when you load and analyze the clickstream events.
1. Under the Container drop-down menu, select the Object Storage container you want to write to. 
   <br>
   > Take note of the  Object Storage container name. You will need this information in [Notebook 3b: Static clickstream analysis](https://github.com/wdp-beta/get-started/blob/master/notebooks/localcart-scenario-part-3b.ipynb) when you load and analyze the clickstream events.
1. Under File Name, type **login_TIMESTAMP.csv** (**Note:** "TIMESTAMP" is a reserved word that will be replaced with an actual timestamp when the file is written).
   > Note: if you choose a file name other than `login_TIMESTAMP.csv` you must also modify notebook 3B and change the default file name in the data load cell.
1. Under Format, select **csv**.
1. Under Delimiter, select **Comma (,)**.
1. Click **Save**. 
1. Click **Run** to start the pipeline.

<img src="https://raw.githubusercontent.com/wdp-beta/get-started/master/notebooks/images/static_status.png"></img>

   If no errors are reported, all login events are written to timestamped CSV files in Object Storage.
   
<img src="https://raw.githubusercontent.com/wdp-beta/get-started/master/notebooks/images/nb2_os.png"></img>   
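
When you later look for the files that a pipeline has written, it can help to match object names against the file name template. The sketch below turns a template such as `login_TIMESTAMP.csv` into a regular expression. The function name `template_to_regex` and the object names are hypothetical, and the timestamp format that the pipeline actually writes is not specified here, so the pattern accepts any non-empty text in its place.

```python
import re


def template_to_regex(template):
    """Build a regex that matches file names produced from a TIMESTAMP template."""
    prefix, suffix = template.split("TIMESTAMP", 1)
    return re.compile("^" + re.escape(prefix) + "(.+)" + re.escape(suffix) + "$")


pattern = template_to_regex("login_TIMESTAMP.csv")

# Hypothetical object names; the real timestamp format may differ.
names = [
    "login_20170711_201052.csv",
    "checkout_20170711_201052.csv",
    "login_notes.txt",
]
print([name for name in names if pattern.match(name)])
```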

<a id="more_2"></a>
***

## E2.2 Create additional pipelines

1. Repeat the steps in section E2.1 for the `add_to_cart`, `browsing`, `checkout`, `logout_with_purchase`, and `logout_without_purchase` events. For every pipeline, pick your Message Hub instance, use the same Object Storage instance and container as before, and select Format `csv` and Delimiter `Comma (,)`:
      1. Pipeline name: `addtocart2csv`, Topic: `add_to_cart`, File name: `addtocart_TIMESTAMP.csv`
      1. Pipeline name: `browsing2csv`, Topic: `browsing`, File name: `browsing_TIMESTAMP.csv`
      1. Pipeline name: `checkout2csv`, Topic: `checkout`, File name: `checkout_TIMESTAMP.csv`
      1. Pipeline name: `logoutwithpurchase2csv`, Topic: `logout_with_purchase`, File name: `logoutwithpurchase_TIMESTAMP.csv`
      1. Pipeline name: `logoutwithoutpurchase2csv`, Topic: `logout_without_purchase`, File name: `logoutwithoutpurchase_TIMESTAMP.csv`
      > Note: if you choose a file name other than the ones listed here, you must also modify notebook 3B and change the default file name in the data load cell.
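
The pipeline and file names in this section follow a simple convention: remove the underscores from the topic name, then append `2csv` or `_TIMESTAMP.csv`. The following sketch derives the settings from the topic names as a checklist while you create the pipelines. The function name `pipeline_settings` is hypothetical.

```python
def pipeline_settings(topic):
    """Derive the pipeline name and file name used in this section from a topic."""
    base = topic.replace("_", "")
    return {
        "pipeline_name": base + "2csv",
        "topic": topic,
        "file_name": base + "_TIMESTAMP.csv",
    }


topics = ["add_to_cart", "browsing", "checkout",
          "logout_with_purchase", "logout_without_purchase"]
for topic in topics:
    settings = pipeline_settings(topic)
    print("%(pipeline_name)s: topic=%(topic)s file=%(file_name)s" % settings)
```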


<a id="summary_2"></a>
***

## E2.3 Summary and next steps
In this section, you created six streaming pipelines that persist clickstream event data to flat files in Object Storage. You can now access these files manually or programmatically:

#### Accessing CSV files on Object Storage
1. Log in to [Bluemix](https://console.bluemix.net/) by using your DSX credentials.
1. Navigate to the space where the Object Storage instance is located. This space is what you selected when you created the DSX project.
1. Open the Object Storage instance.

#### Accessing CSV files on Object Storage manually
1. Open the **Manage** tab, and then select the container that you specified when you created the data collection pipeline. 
1. Select a CSV file. In the "Select Action" list, select "Download File" to view it.

#### Accessing CSV files on Object Storage programmatically
1. Open the **Service credentials** tab. Select a Key Name, and then click **View credentials**. 
1. Copy the credentials and provide this information whenever you want to load data files programmatically, such as in [Notebook 3b: Static clickstream analysis](https://github.com/wdp-beta/get-started/blob/master/notebooks/localcart-scenario-part-3b.ipynb).



***

### Authors

Glynn Bird is a Developer Advocate for Watson Data Platform at IBM. 

Raj Singh is a Developer Advocate for Watson Data Platform at IBM.

***
Copyright © IBM Corp. 2017. This notebook and its source code are released under the terms of the MIT License.