

# Ingest data from Message Hub in a streams flow


If your organization uses Message Hub to communicate between applications, you can ingest streaming data from Message Hub and analyze it in real time by using IBM Streams Designer. Streams Designer is a <a href="https://developer.ibm.com/streamsdev/2017/11/28/quickly-create-streams-applications-using-new-streams-designer/" target="_blank" rel="noopener noreferrer">web-based graphical IDE</a> that helps you create streaming analytics applications without writing much code. Applications created with Streams Designer are called *flows*. This tutorial shows you how to create a streams flow that uses streaming data from Message Hub. The final result is shown below.

![mhub flow](https://developer.ibm.com/streamsdev/wp-content/uploads/sites/15/2018/01/messagehubflow.gif)

This tutorial builds on the Data Historian example streams flow, which uses sample data as its data source. The sample data is a stream of readings from weather stations (temperature, amount of rain, and so on), and the flow uses it to compute the average of those readings over the last hour.
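To make "the average of those readings over the last hour" concrete, here is a minimal Python sketch of a one-hour sliding-window average kept separately for each station. It is not the Aggregation operator's implementation: it assumes that readings for a given station arrive in timestamp order, and `SlidingAverage` is a hypothetical name used only for illustration.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


class SlidingAverage(object):
    """Hypothetical helper: average of the values seen in the last `window`, per station id.

    Assumes each station's readings arrive in timestamp order.
    """

    def __init__(self, window=timedelta(hours=1)):
        self.window = window
        self.readings = defaultdict(deque)  # station id -> deque of (time, value)

    def add(self, station_id, when, value):
        """Record a reading and return the station's current windowed average."""
        q = self.readings[station_id]
        q.append((when, value))
        # Drop readings that are more than `window` older than the newest one.
        while when - q[0][0] > self.window:
            q.popleft()
        return sum(v for _, v in q) / len(q)


# Example: three temperature readings from one station, 30 minutes apart.
t0 = datetime(2017, 1, 24, 5, 0, 0)
avg = SlidingAverage()
avg.add("IALBERTA598", t0, 10.0)                          # 10.0
avg.add("IALBERTA598", t0 + timedelta(minutes=30), 20.0)  # 15.0
avg.add("IALBERTA598", t0 + timedelta(minutes=90), 30.0)  # 25.0: the 05:00 reading has expired
```

Keeping one deque per station means each new reading only evicts that station's expired entries, which mirrors the idea of a window partitioned by station id.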

In this tutorial, you'll modify that example to use Message Hub as its data source.

This notebook runs under Python 3.5 with Spark 2.1.


## Table of Contents
1. [Prerequisites](#prereq)
1. [Set up a Message Hub instance](#step2)
    <br/>2.1 [Create a Message Hub instance](#step21)
    <br/>2.2 [Create a topic](#step22)
1. [Publish data to the Message Hub service](#step3)
    <br/>3.1 [Get your Message Hub credentials](#step31)
    <br/>3.2 [Install Confluent’s Apache Kafka Python client](#step32)
    <br/>3.3 [Generate sample data](#step33)
    <br/>3.4 [Start publishing the sample data to Message Hub](#step34)
1. [Use the data from Message Hub in your streams flow](#step4)
1. [Troubleshooting](#step5)
1. [Next steps - Send data to Message Hub from your streams flow](#next)


<a id="prereq"></a>
## 1. Prerequisites

Because this notebook modifies the Data Historian example flow to use Message Hub as its data source, you need that flow in one of your projects. If you have already imported it, skip this step.
Otherwise, [watch this video](https://www.youtube.com/watch?v=rCNgJopanrY) or follow the instructions below to import the Data Historian example flow.

1. Go to **Tools** > **Streams Designer**, or from a project, click **Add to project** > **Streams flow**.
1. Select **From example**. Choose a Streaming Analytics service, or create one if prompted to do so.  
1. Click **Data Historian Example**. 
1. Select a connection to Cloud Object Storage. 
1. Under **File path**, click the slider button to select a bucket from your Object Storage instance. Enter a file name after the bucket name, like `/mybucket/data_historian_results_%TIME.txt`.
1. Click **Create**. After the flow is created, click **Run** to start it.
1. Verify that data is being generated by logging in to your Cloud Object Storage service and viewing the results file.


You can learn more about this example in the [Data Historian example documentation](https://dataplatform.ibm.com/docs/content/streaming-pipelines/data_historian_example_pipeline.html?audience=wdp&context=analytics&linkInPage=true).


<a id="step2"></a>
## 2. Set up your Message Hub instance

<a id="step21"></a>

### 2.1 Create a Message Hub instance


If you do not already have an instance of Message Hub, you must create one. You can do so from any page in Watson Studio. To access the Message Hub service, you must have added a credit card to your IBM Cloud account.
 - Select **Data Services** from the toolbar, and then click **Services**. 
    ![Services page](https://developer.ibm.com/streamsdev/wp-content/uploads/sites/15/2017/11/wdp-service.png)
 - Click **Create new** in the top right.
 - Choose **Message Hub** from the list of services that appear.
 - Click **Standard** to select the Standard pricing plan, and then click **Create**.
 - In the Confirm Creation dialog box that appears, change the Service Name if you wish.
 - Your service will be created, and then you will be returned to the Services page.


<a id="step22"></a>

### 2.2 Create a topic
- In the Services page, find your Message Hub service. In the context menu under **Actions**, click **Manage in IBM Cloud**. 
![Manage in Cloud](https://developer.ibm.com/streamsdev/wp-content/uploads/sites/15/2017/11/manage.png)
<br/>
- The Manage page of the service opens in a new tab. 
- A list of topics will appear. Click the **+** sign under **Topics** to add a new topic called `temperature`, and then click **Create topic**. 
![Create new topic](https://developer.ibm.com/streamsdev/wp-content/uploads/sites/15/2017/11/topic.png)
<br/><br/>

<a id="step3"></a>

## 3. Publish data to your Message Hub instance
Before you can use data *from* Message Hub in your streams flow, an application must be sending data *to* Message Hub. The next few cells of this notebook create a simple application that generates data and sends it to Message Hub for use in your streams flow.

The application needs your Message Hub service credentials so that it can send data securely, so first you must retrieve those credentials.

<a id="step31"></a>

### 3.1 Get your Message Hub service credentials
- In the Manage page of your service, click **Service credentials**.
- A set of service credentials should already be created. If not, click **New credential** and accept the defaults. 
- Click **View credentials**, and then click the copy button to copy them to the clipboard.
![copy credentials](https://developer.ibm.com/streamsdev/wp-content/uploads/sites/15/2017/11/copycredentials.png)

- Paste them in the cell below where indicated.


<a id="creds"></a>
#### Paste your Message Hub credentials here

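```python
# Replace the placeholder below with the credentials you copied in step 3.1.
# The later cells read the "user", "password", and "kafka_brokers_sasl" fields.
message_hub_credentials = #PASTE CREDENTIALS HERE#
```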
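Before running the later cells, you may want to check that the pasted credentials include the three fields those cells read (`user`, `password`, and `kafka_brokers_sasl`). The sketch below does that and builds the same SASL_SSL settings that the producer and consumer cells use. `build_kafka_config` is a hypothetical helper, not part of this notebook or of the Kafka client, and the example uses dummy values rather than real credentials.

```python
REQUIRED_KEYS = ("user", "password", "kafka_brokers_sasl")


def build_kafka_config(credentials):
    """Hypothetical helper: validate Message Hub credentials and return client settings.

    Raises KeyError listing any missing fields.
    """
    missing = [key for key in REQUIRED_KEYS if key not in credentials]
    if missing:
        raise KeyError("Missing credential fields: " + ", ".join(missing))
    return {
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': credentials["user"],
        'sasl.password': credentials["password"],
        # Kafka expects a single comma-separated string of host:port pairs.
        'bootstrap.servers': ",".join(credentials["kafka_brokers_sasl"]),
    }


# Example with dummy values (not real credentials):
config = build_kafka_config({"user": "demo-user", "password": "demo-password",
                             "kafka_brokers_sasl": ["broker-0:9093", "broker-1:9093"]})
print(config['bootstrap.servers'])  # broker-0:9093,broker-1:9093
```

In the notebook you would call `build_kafka_config(message_hub_credentials)` right after pasting the credentials, so that a missing field is reported immediately rather than when the producer starts.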

<a id="step32"></a>
### 3.2 Install Confluent’s Apache Kafka Python client
This client contains the API used to communicate with Message Hub.

```
!pip install confluent-kafka
```



<a id="step33"></a>
### 3.3 Generate sample data

The next cell generates simulated data for four weather stations. Each weather station is represented by the `Device` class, which generates JSON data in the same input schema as the Data Historian example flow:
```json
{
"id": "IALBERTA598",
"tz": "America/Edmonton",
"dateutc": "2017-01-24 05:03:50",
"latitude": 50.88381958,
"longitude": -113.98414612,
"temperature": "16.399999618530273",
"baromin": 30.69109211987511,
"humidity": 109.53158304521693,
"rainin": 0,
"time_stamp": "2017-11-21 01:41:05"
}
```


```python
import time
from random import random
from datetime import datetime, timezone


class Device(object):
    """A simulated weather station that produces readings in the Data Historian schema."""

    def __init__(self, name, latitude, longitude, tz):
        self.id = name
        self.latitude = latitude
        self.longitude = longitude
        self.tz = tz
        # Random baseline values; each reading adds a little noise to these.
        self.temp = random() * 50
        self.humidity = 50 * random() + 50
        self.baromin = 75 * random()

    def getrain(self, humidity):
        # Rain only when it is humid or hot; otherwise report 0 so that
        # "rainin" is always a number (the streams flow expects a Number).
        if humidity > 75.0 or self.temp > 30:
            return humidity * (0.001 * random())
        return 0.0

    def getReadingAsJSON(self):
        """Return one reading as a dict; it is serialized to JSON when published."""
        humidity = (self.humidity * 0.98) + random()
        now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        reading = {
            "id": self.id,
            "tz": self.tz,
            "dateutc": now,
            "latitude": self.latitude,
            "longitude": self.longitude,
            "temperature": (self.temp * 0.95) + random(),
            "baromin": (self.baromin * 0.95) + random(),
            "humidity": humidity,
            "rainin": self.getrain(humidity),
            "time_stamp": now,
        }
        return reading


def list_as_str(lizt):
    """Join a list of broker addresses into the comma-separated string Kafka expects."""
    return ",".join(lizt)


def generate_data(duration=15):
    """Yield one reading per device every second for `duration` minutes (0 = forever)."""
    sample_data = [("IALBERTA384", "America/Edmonton", 51.07976532, -115.33161163),
                   ("IANDALUC208", "Europe/Madrid", 37.62454224, -2.70604014),
                   ("IANSEROY2", "Indian/Mahe", -4.74099493, 55.51583862),
                   ("I1189", "Asia/Yekaterinburg", 57.15063477, 65.56357574)]
    devices = []
    for name, tz, latitude, longitude in sample_data:
        devices.append(Device(name, latitude, longitude, tz))

    seconds = duration * 60  # the loop below runs about once per second
    if seconds == 0:
        # The producer runs in a background thread, which Kernel > Interrupt does not stop.
        print("Will produce data indefinitely. Restart the kernel (Kernel > Restart) to stop producing data.")
    count = 0
    while count < seconds or seconds == 0:
        for device in devices:
            yield device.getReadingAsJSON()
        time.sleep(1.0)
        count += 1
```
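The streams flow expects every reading to match the schema you will confirm in step 4, and a field with the wrong type, such as a `null` `rainin`, does not match the Number type the flow expects. The sketch below is a hypothetical local check (`schema_problems` is not part of Streams Designer). It assumes Text maps to `str`, Number to any real number, and Date to the `%Y-%m-%d %H:%M:%S` strings that `generate_data` emits. Note that it treats `temperature` as a number, as the generator produces it, rather than as the string shown in the sample above.

```python
from datetime import datetime
from numbers import Real

# Assumed mapping from the step 4 schema types to Python types.
TEXT_FIELDS = ("id", "tz")
NUMBER_FIELDS = ("latitude", "longitude", "temperature", "baromin", "humidity", "rainin")
DATE_FIELDS = ("dateutc", "time_stamp")
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"  # the format generate_data() uses


def schema_problems(reading):
    """Hypothetical check: return a list of fields that do not match the
    expected schema; an empty list means the reading looks valid."""
    problems = []
    for field in TEXT_FIELDS:
        if not isinstance(reading.get(field), str):
            problems.append(field + ": expected text")
    for field in NUMBER_FIELDS:
        value = reading.get(field)
        # bool is a subclass of int, so rule it out explicitly.
        if not isinstance(value, Real) or isinstance(value, bool):
            problems.append(field + ": expected a number")
    for field in DATE_FIELDS:
        try:
            datetime.strptime(reading.get(field, ""), DATE_FORMAT)
        except (TypeError, ValueError):
            problems.append(field + ": expected a date")
    return problems
```

For example, `schema_problems(next(generate_data(1)))` lists any fields of a freshly generated reading that do not match.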

<a id="step34"></a>
### 3.4 Start publishing the sample data to Message Hub

We use the service credentials you pasted earlier to create an instance of the `Producer` class and then publish a stream of weather station readings to the *temperature* topic. You can use the `TOPIC` variable to change the topic.


*Note*: This cell sends data for 20 minutes in a background job. If your streams flow stops receiving data, re-run the cell to keep generating data. Alternatively, set `DURATION_IN_MINUTES` to a larger value, or to `0` to send data indefinitely.
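To estimate how much data a run produces: `generate_data` yields one reading per device on each pass of its loop, sleeps for one second, and makes `DURATION_IN_MINUTES * 60` passes. A small sketch of that arithmetic (the function name is hypothetical):

```python
def expected_message_count(duration_minutes, num_devices=4):
    """Number of readings generate_data() yields for a given duration.

    One reading per device per loop pass, duration_minutes * 60 passes.
    A duration of 0 means the generator never stops.
    """
    if duration_minutes == 0:
        return float("inf")
    return duration_minutes * 60 * num_devices


print(expected_message_count(20))  # 4800
```

Because each pass also spends a little time producing messages, the wall-clock run time is slightly longer than the configured duration, but the number of messages is fixed by the loop count.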


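```python
import json
from datetime import datetime

from confluent_kafka import Producer
from IPython.lib import backgroundjobs as bg

TOPIC = "temperature"
DURATION_IN_MINUTES = 20

# Connection settings for Message Hub, built from the credentials pasted in step 3.1.
mhub_config = {
    'debug': 'msg',  # verbose client logging, useful for troubleshooting
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': message_hub_credentials["user"],
    'sasl.password': message_hub_credentials["password"],
    'bootstrap.servers': list_as_str(message_hub_credentials["kafka_brokers_sasl"]),
}


def start():
    """Publish each generated reading to TOPIC as UTF-8 encoded JSON."""
    producer = Producer(mhub_config)
    for reading in generate_data(DURATION_IN_MINUTES):
        producer.produce(TOPIC, value=json.dumps(reading).encode('utf-8'))
        producer.poll(0)  # serve delivery reports so the client's internal queue is drained
    producer.flush()  # wait for any messages still in flight before returning


if DURATION_IN_MINUTES > 0:
    print(datetime.now().ctime() + ": Sending data to Message Hub for %d minutes" % DURATION_IN_MINUTES)

# Run start() in a background thread so the notebook stays usable while data is sent.
jobs = bg.BackgroundJobManager()
jobs.new(start)
```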

```
Tue Jan  9 16:46:39 2018: Sending data to Message Hub for 20 minutes
Starting job # 0 in a separate thread.

<BackgroundJob #0: <function start at 0x7fe38ebf7510>>
```
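The cell above runs `start()` in a background thread, so the cell finishes right away while data keeps flowing, and an interrupt from the notebook is delivered to the main thread rather than to that job. If you want to be able to stop sending early, one alternative (not what the cell above does) is a plain `threading.Thread` that checks a `threading.Event` between readings. In this sketch, `run_in_background` is a hypothetical helper and `send` stands in for whatever publishes a reading.

```python
import threading


def run_in_background(readings, send, stop_event):
    """Hypothetical helper: call send(reading) for each item in readings on a
    daemon thread, stopping early once stop_event is set."""
    def worker():
        for reading in readings:
            if stop_event.is_set():
                break
            send(reading)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread


# Example with an in-memory list standing in for Message Hub:
sent = []
stop = threading.Event()
thread = run_in_background(range(5), sent.append, stop)
thread.join()
print(sent)  # [0, 1, 2, 3, 4]
```

In the notebook, `readings` could be `generate_data(0)` and `send` a function that calls `producer.produce(...)`; calling `stop.set()` then ends the loop. Because `generate_data` sleeps inside its loop, stopping can take up to about a second.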

<a id="step4"></a>

## 4. Use the data from Message Hub in your streams flow

Now that data is being sent to your Message Hub instance, you can use it in your flow. If the flow is running, click the *stop* button to stop it, and then click *edit* to open it in the canvas.



### Add and configure the Message Hub operator 

- Drag the Message Hub operator from the Source list to the canvas.
- Under **Connection**, click **Add connection**. In the Create Connection window, choose your service instance under **Your service instances in IBM Cloud**. The credentials will be populated in the dialog. Click **Create**.
![Message Hub Connection dialog](https://developer.ibm.com/streamsdev/wp-content/uploads/sites/15/2017/11/dialog.png)

- Under **Topic**, the list is populated with the topics you created in Message Hub. Select the *temperature* topic.
- Select **Edit Schema**, and then click **Show Preview** to verify that the data from the notebook is arriving correctly. You should see a preview of the data. If you do not see any data, make sure that the connection uses the service credentials you created earlier and that the topic name matches. Also, verify that the cell in [step 3.4](#step34) is still running.
- Click **Detect schema** to automatically determine the output schema of the operator. Make sure the values are as follows:
    - `id`: Text
    - `tz`: Text
    - `dateutc`: Date
    - `latitude`: Number
    - `longitude`: Number
    - `temperature`: Number
    - `baromin`: Number
    - `rainin`: Number
    - `humidity`: Number
    - `time_stamp`: Date
   
<img alt="Schema detection" src="https://developer.ibm.com/streamsdev/wp-content/uploads/sites/15/2017/11/schema.png"></img>
- Click **Save**, and then **Close** to close the Edit Schema window.
- Connect the Message Hub operator to the Aggregation operator, and then delete the Sample Data operator.  Your graph should look like this:
<img alt="flow diagram with message hub" src="https://developer.ibm.com/streamsdev/wp-content/uploads/sites/15/2017/12/finalflow1.png"></img>
- Click **Save**, and then click **Run** to start the streams flow. After it starts, you should see data flowing from Message Hub.

<img alt="running streams flow" src="https://developer.ibm.com/streamsdev/wp-content/uploads/sites/15/2017/12/runningflow2.png"></img>
    
    
<a id="step5"></a>
## 5. Troubleshooting

If you get any errors, check the following points:
- Your Streaming Analytics service is started.
- Credentials are correctly copied from Message Hub and <a href="#creds">pasted in the cell</a>.
- Data is being sent to Message Hub (see [step 3.4](#step34)).

You can also modify the consumer cell in [step 6](#next) to read from the `temperature` topic by changing `RESULTS_TOPIC_NAME` to `temperature`. If that cell prints data from Message Hub, the problem is likely related to the configuration of your streams flow.


<a id="next"></a>
## 6. Next steps - Send data to Message Hub from your streams flow
See if you can change the streams flow to send the results *to* Message Hub instead of Object Storage.
If it is working correctly, the next cell prints the results of the Aggregation operator.

**Hint:** You will need to create a new topic in Message Hub called `results`.
You'll also need the Message Hub target operator, which is in the *Target* category in the canvas.

```python
from confluent_kafka import Consumer, KafkaError

RESULTS_TOPIC_NAME = "results"

mhub_config_consumer = {
    'group.id': 'mygroup',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': message_hub_credentials["user"],
    'sasl.password': message_hub_credentials["password"],
    'bootstrap.servers': list_as_str(message_hub_credentials["kafka_brokers_sasl"]),
    # Report end-of-partition as an event; newer client releases disable
    # this by default, and the loop below relies on it.
    'enable.partition.eof': True,
}

consumer = Consumer(mhub_config_consumer)
consumer.subscribe([RESULTS_TOPIC_NAME])
try:
    for i in range(10):  # poll at most 10 times, so at most 10 messages are printed
        message = consumer.poll(timeout=2.0)
        if message is None:
            continue  # nothing arrived within the timeout
        if message.error():
            error = message.error()
            if error.code() == KafkaError._PARTITION_EOF:
                print("No more messages. Is data being sent to the %s topic?" % RESULTS_TOPIC_NAME)
            else:
                print("Error: " + error.str())
            break
        print("Incoming data: %s" % message.value().decode("utf-8"))
finally:
    consumer.close()  # leave the consumer group cleanly
```

```
Incoming data: { "id" : "I1189", "tz" : "Asia/Yekaterinburg", "dateutc" : "2017-12-11T20:59:34", "time_stamp" : "2017-12-11T20:59:34", "longitude" : 65.56357574, "latitude" : 57.15063477, "rainin" : 0, "humidity" : 49.2586993697516, "baromin" : 18.2346408820349, "temperature" : 0.0232874316157452 }
Incoming data: { "id" : "IALBERTA384", "tz" : "America/Edmonton", "dateutc" : "2017-12-11T20:59:36", "time_stamp" : "2017-12-11T20:59:36", "longitude" : -115.33161163, "latitude" : 51.07976532, "rainin" : 0, "humidity" : 22.117086699143, "baromin" : 68.1079591166223, "temperature" : 0.0183134239080065 }
Incoming data: { "id" : "IANDALUC208", "tz" : "Europe/Madrid", "dateutc" : "2017-12-11T20:59:36", "time_stamp" : "2017-12-11T20:59:36", "longitude" : -2.70604014, "latitude" : 37.62454224, "rainin" : 0, "humidity" : 25.9265311927769, "baromin" : 27.7391455939335, "temperature" : 0.0195746529946135 }
Incoming data: { "id" : "IANSEROY2", "tz" : "Indian/Mahe", "dateutc" : "2017-12-11T20:59:36",
```
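The consumer above prints each message as raw text. If you want to work with the aggregated values instead, you can parse each message value as JSON. The sketch below assumes the results keep the format shown in the output above, including `dateutc` and `time_stamp` strings like `2017-12-11T20:59:34`; `parse_result` is a hypothetical helper.

```python
import json
from datetime import datetime


def parse_result(value):
    """Hypothetical helper: turn one message value (bytes) from the results topic
    into a dict, converting dateutc and time_stamp into datetime objects."""
    result = json.loads(value.decode("utf-8"))
    for field in ("dateutc", "time_stamp"):
        if field in result:
            result[field] = datetime.strptime(result[field], "%Y-%m-%dT%H:%M:%S")
    return result


# Example using the first result printed above:
raw = (b'{ "id" : "I1189", "tz" : "Asia/Yekaterinburg", "dateutc" : "2017-12-11T20:59:34", '
       b'"time_stamp" : "2017-12-11T20:59:34", "longitude" : 65.56357574, "latitude" : 57.15063477, '
       b'"rainin" : 0, "humidity" : 49.2586993697516, "baromin" : 18.2346408820349, '
       b'"temperature" : 0.0232874316157452 }')
result = parse_result(raw)
print(result["id"], result["dateutc"])  # I1189 2017-12-11 20:59:34
```

Inside the consumer loop, you would call `parse_result(message.value())` in place of the `print` statement.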

### More information
- [Learn more about the other operators available in Streams Designer](https://dataplatform.ibm.com/docs/content/streaming-pipelines/creating-pipeline-manually.html?audience=wdp).
- [Visit Streamsdev](https://developer.ibm.com/streamsdev) to learn more about Streams.


### Author

**Natasha D'Silva** is a software developer at IBM Canada who specializes in streaming technology and cloud solutions.

Copyright © IBM Corp. 2017, 2018. This notebook and its source code are released under the terms of the Apache 2.0 License.