# Access streaming data with REST services

*Create a service that allows you to send data to and receive data from your IBM Streams application*

## Introduction

The goal of this tutorial is to teach you how to add a web service to a streaming application. Streaming apps start with a data source, do some data processing (such as filtering and aggregation), and send the results to one or more targets. By adding a web service with REST endpoints, we'll show you how to `POST` data to an IBM Streams application and `GET` the data that the application produces.

This sample creates a web service that acts as both an event source and an event sink for a Streams application. The service runs as an instance on Cloud Pak for Data and provides a REST API that you can use to `POST` data to the application and `GET` data from it.

### How it works
   
The web service runs as a service instance on Cloud Pak for Data, while your web service clients can run anywhere. You can test it from the command line with cURL or integrate the Streams application with another application. We'll demonstrate how to test it from a Python notebook and from the web service's built-in Swagger UI.

### Prerequisites

This notebook requires a **Python 3.6 kernel** on **IBM Cloud Pak for Data 3.5** and an instance of **IBM Streams**.

### Links

- [Streams Python development guide](https://ibmstreams.github.io/streamsx.documentation/docs/latest/python/)
- [Streams Python API](https://streamsxtopology.readthedocs.io/)
- [Blog post about feature details and documentation](https://community.ibm.com/community/user/cloudpakfordata/blogs/natasha-dsilva1/2020/12/07/send-and-receive-streaming-data-via-rest-with-ibm?CommunityKey=c0c16ff2-10ef-4b50-ae4c-57d769937235)
- [Tutorial: Access streaming data with REST services](/tutorials/access-streaming-data-with-rest-services/)

## Configure the Streams instance

To submit a Streams application, you need to provide the name of your Streams instance.

1. From the navigation menu, click `Services > Instances` and find the name of your provisioned Streams instance.
1. In the cell below, update the value of `STREAMS_INSTANCE_NAME` to that name.
1. Run the cell and continue to the next cell.

In [None]:
# Edit to specify the name of the streams instance to use

STREAMS_INSTANCE_NAME = "<your-streams-instance-name>"

#### Set your USERNAME and PASSWORD to use with the REST API
Run this cell to be prompted for your Cloud Pak for Data (CPD) USERNAME and PASSWORD. These variables are used to authenticate the REST requests later in the notebook.

In [None]:
import getpass
USERNAME = input("CPD USERNAME:  ")
PASSWORD = getpass.getpass("CPD PASSWORD:  ")
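
Later cells pass these credentials as `s.auth = (USERNAME, PASSWORD)`; when `requests` receives a `(user, password)` tuple it uses HTTP Basic authentication. The sketch below shows what that `Authorization` header value looks like for ASCII credentials. The helper `basic_auth_header` is hypothetical and only illustrates the encoding; you do not need it to run the notebook.

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    """Build an HTTP Basic Authorization header value (hypothetical helper)."""
    # Basic auth sends base64("username:password"); this matches requests for ASCII input.
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return "Basic " + token


print(basic_auth_header("user", "pass"))  # Basic dXNlcjpwYXNz
```

Basic auth only encodes the credentials, it does not encrypt them, so they should only be sent over HTTPS.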

## Example 1: Create a Streams application that makes data available via REST
This application uses the `EndpointSink` operator to make data from the Streams application accessible via GET requests.
We will submit a Streams application that sends data to the endpoint sink. Once the application is running, a new Streams job service instance will appear under `Instances` in the CPD menu.
After getting the REST endpoint from the Streams job service instance, we can use the Python `requests` module to retrieve data from the Streams application.


### Import `streamsx` and other Python modules

In [None]:
import datetime
import os
import random
import time
import typing

from streamsx.service import EndpointSink
from streamsx.service import EndpointSource
from streamsx.topology.topology import Topology
import streamsx.topology.context

print("INFO: streamsx package version: " + streamsx.topology.context.__version__)

# For more details, uncomment the line below
#!pip show streamsx

### Create a reusable submit_topology function

The cell below defines a function called `submit_topology` which will be used in each example to submit a `Topology` once it is defined.

In [None]:
from icpd_core import icpd_util
from streamsx.topology import context

try:
    cfg = icpd_util.get_service_instance_details(name=STREAMS_INSTANCE_NAME, instance_type="streams")
except TypeError:
    cfg = icpd_util.get_service_instance_details(name=STREAMS_INSTANCE_NAME)

def submit_topology(topo):
    print("Submitting Topology to Streams for execution...")

    global cfg
    # Disable SSL certificate verification (e.g. when CPD uses a self-signed certificate)
    cfg[context.ConfigParams.SSL_VERIFY] = False
    # Topology will be deployed as a distributed app
    contextType = context.ContextTypes.DISTRIBUTED
    submission_result = context.submit(contextType, topo, config=cfg)
    if submission_result.job:
        print("JobId: ", submission_result.job.id , "\nJob name: ", submission_result.job.name)
    else:
        print("Submission failed: " + str(submission_result))
        
    return submission_result

print("Setup complete")

### Define the schema

To work with REST endpoints, a schema must be defined for data tuples. We create a class that inherits from typing.NamedTuple and use type annotations to specify data types for each field in the tuple.

In [None]:
class Readings(typing.NamedTuple):
    reading: float
    index: int
    address: str
    timeStamp: int
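
Each field name in the `NamedTuple` becomes an attribute name in the REST data. The sketch below shows how a `Readings` tuple maps to a JSON object, using the standard `_asdict()` method. The sample values are made up, and the exact JSON that the `EndpointSink` emits is an assumption here; confirm it in the Swagger UI once the job is running.

```python
import json
import typing


class Readings(typing.NamedTuple):
    reading: float
    index: int
    address: str
    timeStamp: int


# Made-up sample values for illustration.
r = Readings(42.0, 0, "A720_Q", 1607300000)

# _asdict() keeps field order, so JSON attributes appear in schema order.
as_json = json.dumps(r._asdict())
print(as_json)
```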

### Define a source data generator
We need a source for our example topology. This function generates one `Readings` tuple every second to use as a data source.

In [None]:
def readings() -> typing.Iterable[Readings]:
    counter = 0
    while True:
        time.sleep(1)
        address = "A7" + str(random.randint(14, 40)) + "_"+ chr(random.randint(65,123))
        timeStamp = int(datetime.datetime.now().timestamp())
        yield Readings(random.randint(1,100), counter, address, timeStamp)
        counter = counter + 1
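
Because the generator is plain Python, you can inspect a few tuples before handing it to Streams. The sketch below repeats the same logic with the one-second sleep turned into a `delay` parameter (a hypothetical addition) and uses `itertools.islice` to take the first three tuples without waiting.

```python
import datetime
import itertools
import random
import time
import typing


class Readings(typing.NamedTuple):
    reading: float
    index: int
    address: str
    timeStamp: int


def readings(delay: float = 1.0) -> typing.Iterator[Readings]:
    """Same logic as the generator above; `delay` is a hypothetical parameter."""
    counter = 0
    while True:
        time.sleep(delay)
        address = "A7" + str(random.randint(14, 40)) + "_" + chr(random.randint(65, 123))
        timeStamp = int(datetime.datetime.now().timestamp())
        yield Readings(random.randint(1, 100), counter, address, timeStamp)
        counter += 1


# Take the first three tuples from the infinite generator without sleeping.
sample = list(itertools.islice(readings(delay=0), 3))
for r in sample:
    print(r)
```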

### Create the topology
Using the streamsx Topology class, we create a topology and set a source that streams to a sink.

The key to creating a web service that lets us GET the result data via a REST API is the `EndpointSink`. Notice that the topology ends with this sink: when the topology is submitted, the sink causes the web service instance to be created on CPD.

In [None]:
# Use the CPD project name as the namespace
project = os.environ["PROJECT_NAME"]

# Create a topology and give it a name and namespace
sender_topology = Topology(name="Sample_REST_Sink", namespace=project)

# Start with a source data operator
source = sender_topology.source(readings)

# Provide service and endpoint documentation (optional)
service_documentation={
   'title': 'random-readings-rest-sink',
   'description': 'RANDOM READINGS GENERATOR',
   'version': '1.0.0',
   'externalDocsUrl': 'https://example.com/randomreadings/doc',
   'externalDocsDescription': 'Random readings generator documentation'
}
tags = dict()
tag1 = {
   'Output': {
      'description': 'Output tag description',
      'externalDocs': {
         'url': 'https://example.com/randomreadings/output/doc',
         'description': 'Output tag external doc description'
      }
   }
}
tags.update(tag1)
service_documentation['tags'] = tags

endpoint_documentation = dict()
endpoint_documentation['summary'] = 'Random readings endpoint sink'
endpoint_documentation['tags'] = ['Output']
endpoint_documentation['description'] = 'Streams job emits some data with random readings'

doc_attr = dict()
descr = {'reading': {'description': 'Random generated reading'}}
doc_attr.update(descr)
descr = {'index': {'description': 'Incremental counter'}}
doc_attr.update(descr)
descr = {'address': {'description': 'Random generated address'}}
doc_attr.update(descr)
descr = {'timeStamp': {'description': 'Timestamp'}}
doc_attr.update(descr)
endpoint_documentation['attributeDescriptions'] = doc_attr

# Add an EndpointSink with service and endpoint documentation
# Send each tuple on the source stream to the EndpointSink operator.
# This operator will create a REST endpoint that you can use to access the data from the stream. 
source.for_each(EndpointSink(
    buffer_size=100000,
    service_documentation=service_documentation,
    endpoint_documentation=endpoint_documentation), name='Send to Job Service_Tuples')
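
The `attributeDescriptions` entry in the cell above is built with repeated `update` calls. As a sketch, the same endpoint documentation can be written as a literal plus a dict comprehension. The structure is unchanged; `field_descriptions` and `compact_endpoint_documentation` are hypothetical names chosen so the cell above is not overwritten.

```python
# Field descriptions for the Readings schema, keyed by attribute name.
field_descriptions = {
    'reading': 'Random generated reading',
    'index': 'Incremental counter',
    'address': 'Random generated address',
    'timeStamp': 'Timestamp',
}

compact_endpoint_documentation = {
    'summary': 'Random readings endpoint sink',
    'tags': ['Output'],
    'description': 'Streams job emits some data with random readings',
    # Wrap each description in the {'description': ...} shape used above.
    'attributeDescriptions': {name: {'description': text} for name, text in field_descriptions.items()},
}
print(compact_endpoint_documentation)
```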


### Submit the application


In [None]:
# The send_job_submission_result object contains information about the running application, or job

send_job_submission_result = submit_topology(sender_topology)

### Test the REST endpoint

Now we'll try retrieving data using the REST endpoint.

Once the topology is submitted, under **Instances** there will be a new entry titled `<streams-instance-name>.<space_name>.<job id>`.

For example, if the submitted job has ID 4, the Streams instance is called `sample-instance`, and you used the default Streams space, then there will be an entry like `sample-instance.sample-instance.4` in the list of instances.
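
The naming pattern can be written as a tiny helper, which makes it easier to know which entry to look for. `job_service_instance_name` is hypothetical. In the notebook you could pass `STREAMS_INSTANCE_NAME` and `send_job_submission_result.job.id`, assuming the default space, whose name matches the instance name.

```python
def job_service_instance_name(streams_instance: str, space: str, job_id: int) -> str:
    """Compose <streams-instance-name>.<space_name>.<job id> (hypothetical helper)."""
    return f"{streams_instance}.{space}.{job_id}"


print(job_service_instance_name("sample-instance", "sample-instance", 4))
```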

Click that entry to get to the Swagger page for the REST service, which will list the URL for the GET endpoint and also allow you to try retrieving data from the Streams job.

The full URL to retrieve data from the application you just submitted will be `CPD_URL` + `GET_ENDPOINT`.
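
Plain string concatenation, as in `BASE_CPD_URL + GET_ENDPOINT`, produces a double slash or a missing slash if the base URL ends with `/` or the endpoint path lacks a leading `/`. The sketch below joins the two parts with exactly one slash. The helper name `build_endpoint_url` and the `example.com` URLs are hypothetical.

```python
def build_endpoint_url(base_url: str, endpoint_path: str) -> str:
    """Join a base URL and an endpoint path with exactly one '/' between them."""
    return base_url.rstrip("/") + "/" + endpoint_path.lstrip("/")


print(build_endpoint_url("https://cpd.example.com/", "/hypothetical/get/endpoint"))
```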


In [None]:
# Edit this GET_ENDPOINT to set the value you get from the Swagger UI
GET_ENDPOINT = "<your-get-endpoint>"
# Edit this BASE_CPD_URL to set the URL of your CPD
BASE_CPD_URL = "<your-cpd-url>"

ENDPOINT = BASE_CPD_URL + GET_ENDPOINT

### Test the GET endpoint

In [None]:
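import requests
import time
with requests.Session() as s:
    # Authenticate with HTTP Basic auth using the CPD credentials entered earlier
    s.auth = (USERNAME, PASSWORD)
    s.cookies = requests.cookies.RequestsCookieJar()

    # 0 on the first request; afterwards, the Last-Modified-Milliseconds value from the previous response
    last_mod = 0
    for i in range(2):
        rsp = s.get(ENDPOINT, verify=False, headers={'If-Modified-Since-Milliseconds': str(last_mod)})
        if rsp.status_code != 200:
            print(rsp.text)
            print(rsp)
            break
        # The response body holds the tuples under the 'items' key
        items = rsp.json()['items']
        print(items)
        last_mod = int(rsp.headers['Last-Modified-Milliseconds'])
        time.sleep(1.0)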
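
The loop above implements incremental polling. Each request sends the `Last-Modified-Milliseconds` value from the previous response as `If-Modified-Since-Milliseconds`. The sketch below factors one polling step into a function and exercises it with a stub instead of a live endpoint. `poll_once`, `StubResponse`, and `stub_get` are hypothetical, and the stub's behaviour (one new tuple per call) is invented for the demonstration. Unlike the cell above, which prints and stops, `poll_once` raises an exception on a non-200 response. With a real session you could pass `functools.partial(s.get, verify=False)` as `get`.

```python
import typing


def poll_once(get: typing.Callable, url: str, last_mod: int) -> typing.Tuple[list, int]:
    """Make one incremental GET request and return (items, next_last_mod).

    `get` is any callable with the calling convention of requests.Session.get.
    """
    rsp = get(url, headers={'If-Modified-Since-Milliseconds': str(last_mod)})
    if rsp.status_code != 200:
        raise RuntimeError("GET failed with status %d: %s" % (rsp.status_code, rsp.text))
    items = rsp.json()['items']
    # Pass this value back as If-Modified-Since-Milliseconds on the next call.
    return items, int(rsp.headers['Last-Modified-Milliseconds'])


class StubResponse:
    """Minimal stand-in for requests.Response (hypothetical test double)."""

    def __init__(self, items, last_mod):
        self.status_code = 200
        self.text = ""
        self.headers = {'Last-Modified-Milliseconds': str(last_mod)}
        self._items = items

    def json(self):
        return {'items': self._items}


def stub_get(url, headers):
    # Pretend the server always has exactly one tuple newer than the requested time.
    since = int(headers['If-Modified-Since-Milliseconds'])
    return StubResponse([{'index': since + 1}], since + 1000)


last_mod = 0
for _ in range(2):
    items, last_mod = poll_once(stub_get, "https://cpd.example.com/hypothetical/endpoint", last_mod)
    print(items, last_mod)
```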

## Cancel the job and instance


In [None]:
# To cancel without the button use: send_job_submission_result.job.cancel()
send_job_submission_result.cancel_job_button()

## Example 2: Create a Streams application that ingests data via REST

This application uses the `EndpointSource` operator to receive streaming data via POST requests. It takes the `index` value of each incoming tuple, doubles it, and sends the new value back out via the `EndpointSink`.

### Define the schema

To work with REST endpoints, a schema must be defined for data tuples. We create a class that inherits from typing.NamedTuple and use type annotations to specify data types for each field in the tuple.

In this example, we define a schema for both the incoming data and the outgoing data.

In [None]:
# Define the schema of the incoming data
class IncomingReadings(typing.NamedTuple):
    index: int
    ID: str
    timeStamp: datetime.datetime

# Define the schema of the outgoing data
class DoubledReading(typing.NamedTuple):
    doubled_value: int
    index: int
    ID: str
    timeStamp: datetime.datetime

In [None]:

def double_value(data):
    doubled_value = data.index*2
    return DoubledReading(doubled_value, data.index, data.ID, data.timeStamp)

# Use the CPD project name as the namespace
project = os.environ["PROJECT_NAME"]
reciever_topology = Topology(name="Sample_REST_Source", namespace=project)

service_documentation={
   'title': 'double-rest-source',
   'description': 'SOURCE READINGS GENERATOR',
   'version': '1.0.0'
}
endpoint_documentation = {
   'summary': 'Readings endpoint source',
   'description': 'Readings endpoint source to be doubled'
}

# Invoke the EndpointSource operator as the data source
source_operator = reciever_topology.source(EndpointSource(
   schema=IncomingReadings,
   buffer_size=10000,
   service_documentation=service_documentation,
   endpoint_documentation=endpoint_documentation), name='IngestData')

incoming = source_operator

# Add a doubled value
doubled = incoming.map(double_value, schema=DoubledReading)

# Provide service and endpoint documentation (optional)
service_documentation={
   'title': 'doubled-rest-sink',
   'description': 'DOUBLED READINGS GENERATOR',
   'version': '1.0.0',
   'externalDocsUrl': 'https://example.com/doubledreadings/doc',
   'externalDocsDescription': 'Doubled readings generator documentation'
}
tags = dict()
tag1 = {
   'Output': {
      'description': 'Output tag description',
      'externalDocs': {
         'url': 'https://example.com/doubledreadings/output/doc',
         'description': 'Output tag external doc description'
      }
   }
}
tags.update(tag1)
service_documentation['tags'] = tags

endpoint_documentation = dict()
endpoint_documentation['summary'] = 'Doubled readings endpoint sink'
endpoint_documentation['tags'] = ['Output']
endpoint_documentation['description'] = 'Streams job emits data with doubled readings'

doc_attr = dict()
descr = {'doubled_value': {'description': 'Doubled reading'}}
doc_attr.update(descr)
descr = {'index': {'description': 'Incremental counter'}}
doc_attr.update(descr)
descr = {'ID': {'description': 'Identifier'}}
doc_attr.update(descr)
descr = {'timeStamp': {'description': 'Timestamp'}}
doc_attr.update(descr)
endpoint_documentation['attributeDescriptions'] = doc_attr

# Add an EndpointSink with service and endpoint documentation
# Make the result available via REST (GET)
doubled.for_each(EndpointSink(
    buffer_size=100000,
    service_documentation=service_documentation,
    endpoint_documentation=endpoint_documentation), name='Send Back To Job Service')

# Create a view to see the result data
result_view = doubled.view(name="Preview of Result data")
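
Because `double_value` is an ordinary Python function, you can check it locally before submitting the job. The sketch below repeats the two schemas and the function from the cell above, then applies the function to one made-up tuple. Run it in a scratch cell or a separate Python session if you prefer not to redefine the notebook's classes.

```python
import datetime
import typing


class IncomingReadings(typing.NamedTuple):
    index: int
    ID: str
    timeStamp: datetime.datetime


class DoubledReading(typing.NamedTuple):
    doubled_value: int
    index: int
    ID: str
    timeStamp: datetime.datetime


def double_value(data):
    # Double the index and carry the other fields through unchanged.
    doubled_value = data.index * 2
    return DoubledReading(doubled_value, data.index, data.ID, data.timeStamp)


# Made-up input tuple for the local check.
sample_in = IncomingReadings(21, "B", datetime.datetime(2020, 12, 7, 12, 0, 0))
sample_out = double_value(sample_in)
print(sample_out)
```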

## Submit the ingest application

In [None]:
ingest_job_submission_result = submit_topology(reciever_topology)

## Send some data to the Streams job
To find the POST endpoint for the ingest job, go to `Services > Instances` in CPD. You will see an entry like `instance_name.instance_name.jobid`, where `jobid` is the job ID printed above.

Click that entry and you'll see the POST endpoint for the job.
Copy the POST endpoint and paste it below where indicated.


In [None]:
# Edit this POST_ENDPOINT to set the value you get from the Swagger UI
POST_ENDPOINT = "<your-post-endpoint>"

# Edit this BASE_CPD_URL to set the URL of your CPD (in case you didn't set it earlier)
# BASE_CPD_URL = "<your-cpd-url>"

ENDPOINT = BASE_CPD_URL + POST_ENDPOINT

In [None]:
import threading
import json
import requests
DURATION = 20 # Number of batches to send; the loop pauses 2 seconds after each one, so sending takes roughly 2 * DURATION seconds

def send_data():
    print('POSTing data to the Streams app')
    url = ENDPOINT
    with requests.Session() as s:
        s.auth = (USERNAME, PASSWORD)
        s.cookies = requests.cookies.RequestsCookieJar()
        numItems = 999 # number of tuples sent in each POST request
        # The EndpointSource buffer holds 10000 tuples, so we should not lose any data
        counter = 1
        for _ in range(DURATION):
            items = []
            for _ in range(numItems):
                # UTC timestamp with millisecond precision, e.g. 2020-12-07T12:00:00.123+0000
                stamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]+"+0000"
                items.append(dict(index=counter, timeStamp=stamp, ID=chr(65+(counter%10))))
                counter = counter + 1

            req_data = json.dumps(dict(items=items))
            rsp = s.post(url, data=req_data, headers={"Content-Type": "application/json"}, verify=False)
            #print(req_data)
            # The endpoint replies 204 (No Content) when it accepts the batch
            if rsp.status_code != 204:
                message = "Error sending data: %s" % rsp.text
                print(message)
                break
            time.sleep(2.0)


from IPython.lib import backgroundjobs as bg
jobs = bg.BackgroundJobManager()
jobs.new(send_data)
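
The body that `send_data` POSTs is a JSON object with an `items` list, where each item's keys match the `IncomingReadings` fields. The sketch below builds one such batch as a pure function so you can inspect the payload without sending it. `make_batch` is a hypothetical helper. It uses `datetime.now(timezone.utc)`, which yields the same formatted string as `utcnow()` for this format without the deprecation warning that newer Python versions emit.

```python
import datetime
import json


def make_batch(start_counter: int, num_items: int):
    """Build one POST body shaped like send_data()'s; return (json_body, next_counter)."""
    items = []
    counter = start_counter
    for _ in range(num_items):
        now = datetime.datetime.now(datetime.timezone.utc)
        # Keep 3 of the 6 microsecond digits (milliseconds) and append the UTC offset.
        stamp = now.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "+0000"
        items.append(dict(index=counter, timeStamp=stamp, ID=chr(65 + (counter % 10))))
        counter += 1
    return json.dumps(dict(items=items)), counter


body, next_counter = make_batch(1, 3)
print(body)
```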

## Use a view to see the data sent to the application
Run the cell below to see a sample of the data you sent via REST and of the modified stream.
If the cell times out without printing data, send more data to the Streams application by rerunning the cell above.
This is because views only return a sample of the data _currently flowing through the application_. You can also increase `DURATION` to send data for a longer period of time.
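
A sketch of reading from a view without the cell failing on a timeout is shown below. It assumes the object returned by `start_data_fetch()` behaves like a standard `queue.Queue`, whose `get(timeout=...)` raises `queue.Empty` when nothing arrives in time. `drain` is a hypothetical helper, demonstrated here with an ordinary local queue. If you combine it with the notebook, avoid naming the view's queue `queue`, which would shadow the standard-library module.

```python
import queue


def drain(q: queue.Queue, max_items: int, timeout: float) -> list:
    """Collect up to max_items from q, stopping early if nothing arrives within timeout seconds."""
    collected = []
    for _ in range(max_items):
        try:
            collected.append(q.get(timeout=timeout))
        except queue.Empty:
            # Nothing arrived in time: the application is probably not receiving data right now.
            break
    return collected


# Local demonstration with an ordinary queue standing in for the view's queue.
demo = queue.Queue()
for n in range(3):
    demo.put({"index": n})
result = drain(demo, max_items=5, timeout=0.1)
print(result)
```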

In [None]:
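view_queue = result_view.start_data_fetch()
try:
    # Print up to 30 sampled tuples; get() raises queue.Empty if nothing arrives within 30 seconds
    for i in range(30):
        print(view_queue.get(timeout=30))
finally:
    # Always stop fetching, even if get() timed out
    result_view.stop_data_fetch()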

## Preview the result data

In [None]:
result_view.display(duration=20)

## Cancel the job and instance


In [None]:
ingest_job_submission_result.cancel_job_button()