# Send data to Streams jobs via REST

This is a utility to send some code to a Streams job that has REST endpoints enabled. 

This notebook can be modified for use with any Streams job that uses the `EndpointSink/Source` operators.
 - To send data:
     - [Fill in the `POST_ENDPOINT` and `BASE_CPD_URL` values](#urls).
     - Modify the [Send data](#send) cell to send data to your endpoint.
 - To retrieve data, 
     - [Modify the `GET_ENDPOINT` and `BASE_CPD_URL`](#urls)
     - [Run the `Get data` cell](#get). 

### Sample Streams application
If you do not have a Streams application, this notebook will work as-is as with this tutorial, so follow the tutorial if you haven't already to start running the Streams jobs.

## Get the URLs for the REST endpoint(s)


Once the Streams job is running, in the Cloud Pak for Data home page under **Services > Instances** there will be a new entry for the job's web service of type `streams-application` titled `<space_name>.<job id>` 

E.g. if the submitted job has id 4 and you used the default Streams space, then there will be an entry like `sample-instance.sample-instance.4` in the list of instances.

Click that entry to get to the Swagger page for the REST service, which will list the URL for the GET endpoint and also allow you to try retrieving data from the Streams job.

For each endpoint listed under **Data access endpoints**, the full URL for the endpoint will be `CPD_URL` + `ENDPOINT`.



<a id="urls"></a>
Find the URL for the `CalculationResult_Results` and `IncomingRequest_Operands` endpoints.

In [1]:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Edit these to set the value you get from the Swagger UI
GET_ENDPOINT = "<paste endpoint here>"
POST_ENDPOINT = "<paste endpoint here>"
BASE_CPD_URL = "<PASTE CPD URL>"
POST_ENDPOINT = "/streams_application/v1/tooling-55-cpd_sample-calculatorservice-68/endpoints/IncomingRequest_Operands/data"


GET_ENDPOINT = "/streams_application/v1/tooling-55-cpd_sample-calculatorservice-68/endpoints/CalculationAnalyzer_Results/data"
# Edit this BASE_CPD_URL to set the URL of your CPD
BASE_CPD_URL = "https://tooling-55-cpd-cpd-tooling-55-cpd.apps.cpstreamsx6.cp.fyre.ibm.com"

GET_URL = BASE_CPD_URL + GET_ENDPOINT
POST_URL = BASE_CPD_URL + POST_ENDPOINT

#### Set your USERNAME and PASSWORD to use with the REST API
Run this cell to be prompted for USERNAME and PASSWORD. These variables will be used to test REST requests later in the notebook.

In [2]:
import getpass
USERNAME = input("CPD USERNAME:  ")
PASSWORD = getpass.getpass("CPD PASSWORD:  ")
USERNAME = "admin"
PASSWORD = "password"

CPD USERNAME:  admin
CPD PASSWORD:  ········


<a id="send"></a>
## Send some data to the Streams job

If you are running the Calculator sample, simply run this cell.  It sends the input data to the application.

Otherwise,  modify the `get_new_tuple` function to create a tuple that will be sent to your Streams application. This function is called repeatedly by the next cell. By default it just sends two numbers, `x` and `y`. 

In [25]:

counter = 1
def get_new_tuple():
    ##Modify this function to change the data sent to the streams app
    global counter
    counter = counter + 1
    new_tuple = dict(x= counter, y= counter * 10)
    return new_tuple

### Set parameters for sending data
Change the values in the cell below to determine how many tuples to send at a time, and for how long

In [18]:

DURATION = 10 # How long to send data to the streams app for, in seconds
NUMBER_OF_ITEMS_TO_SEND = 100   # send 100 Tuples at a time                          
RUN_IN_BACKGROUND = False # keep sending data in the background or send in the foreground

In [22]:
import threading
import json
import requests
def send_data():
    with requests.Session() as s:
        s.auth = (USERNAME, PASSWORD)
        s.cookies = requests.cookies.RequestsCookieJar()

        for i in range(DURATION):
            items = []
            # Build a list of items
            for i in range(NUMBER_OF_ITEMS_TO_SEND):
                data_to_send = get_new_tuple()
                items.append(data_to_send)

            # Convert data to json     
            req_data = json.dumps(dict(items=items))
            # make request
            rsp = s.post(POST_URL, data=req_data, headers={"Content-Type": "application/json"}, verify=False)
            #print(req_data)
            if rsp.status_code != 204:
                message = "Error sending data: %s" % rsp.text
                print(message)
                break
            time.sleep(1.0)
    print("Done")

if (RUN_IN_BACKGROUND):
    from IPython.lib import backgroundjobs as bg
    jobs = bg.BackgroundJobManager()
    print('POSTing data to the Streams app in the background..')
    jobs.new(send_data)
else:
    print('POSTing data to the Streams app, please wait..')
    send_data()


POSTing data to the Streams app, please wait..
Done


<a id="get"></a>

## GET data from the Streams job

In [None]:
import requests
import time
with requests.Session() as s:
    s.auth=(USERNAME, PASSWORD)
    s.cookies = requests.cookies.RequestsCookieJar()
    last_items = []
    last_mod = 0
    for i in range(2):
        rsp = s.get(GET_URL, verify=False, headers={'If-Modified-Since-Milliseconds': str(last_mod)})
        if rsp.status_code != 200:
            print(rsp.text)
            print(rsp)
            break
        last_items = items
        items =  rsp.json()['items']
        if (len(items) > 0):
            print("Got %d tuples:" % len(items))
            for item in items:
                print(item)
        last_mod = int(rsp.headers['Last-Modified-Milliseconds'])
        time.sleep(1.0)