# kafka-streams-flow

The cells needed to run your application are included below. Make any changes and add your sources, analytics and outputs.

### Documentation
   - [Streams Python development guide](https://ibmstreams.github.io/streamsx.documentation/docs/latest/python/)
   - [Streams Python API](https://streamsxtopology.readthedocs.io/)

## Install Python packages
Installs the required Python packages with pip.

In [1]:
!pip install --user python-dateutil==2.8.0
!pip install --user streamsx.kafka>=1.9.0
!pip install --user streamsx==1.14.13



## Setup 

Sets up the Streams instance name and extracts the resources required for the Streams application to a local directory.

In order to submit a Streams application you need to provide the name of the Streams instance.
To change the instance for the Streams application:
1. From the navigation menu, click **My instances**.
2. Click the **Provisioned Instances** tab.
3. In the cell below, set `streams_instance_name` to the name of your Streams instance.


In [2]:
from project_lib import Project
import os, shutil, tarfile
from icpd_core import icpd_util    

def setup(archive, resource_path):
    """Extract a project archive into a fresh directory and change into it.

    Args:
        archive: Name of the gzip-compressed tar file stored in the project.
        resource_path: Directory to extract into. It is deleted first if it
            already exists. A relative path is resolved against the current
            working directory.
    """
    def extract_project_file(file, path):
        project = Project.access()
        # Start from an empty directory so files from an earlier run do not linger.
        if os.path.exists(path):
            shutil.rmtree(path)
        os.makedirs(path)
        buffio = project.get_file(file, direct_storage=True)
        # The context manager closes the tar handle once extraction is done.
        with tarfile.open(fileobj=buffio, mode="r:gz") as tar:
            tar.extractall(path)
    extract_project_file(archive, resource_path)
    # The rest of the notebook uses paths relative to the extracted resources.
    os.chdir(resource_path)

In [3]:
# Name of the Streams service instance the job is submitted to.
streams_instance_name = "streams"
cfg = icpd_util.get_service_instance_details(streams_instance_name)

# Unpack the flow's resources and switch into the extracted directory.
resource_path = "streams_flows_notebooks/kafka_streams_flow_1597176040664"
resource_archive = "streams_flows_notebooks/kafka_streams_flow_1597176040664.tar.gz"
setup(resource_archive, resource_path)

## Create the flow

In [4]:
%%writefile flow_schemas

from datetime import datetime
from typing import NamedTuple

DEFAULT_DATETIME = datetime.fromtimestamp(0)


class SampleDataSchema(NamedTuple):
    """Typed tuple for one sample-data event; every field has a default value."""
    # All numeric fields, including the customer id and the item counts,
    # are declared as float.
    click_event_type: str = ""
    customer_id: float = 0.0
    time_stamp: datetime = DEFAULT_DATETIME
    total_number_of_distinct_items_in_basket: float = 0.0
    total_number_of_items_in_basket: float = 0.0
    total_price_of_basket: float = 0.0
    product_category: str = ""
    product_name: str = ""
    product_price: float = 0.0
    session_duration: float = 0.0


class JsonFromTuple1Schema(NamedTuple):
    """Single-field tuple whose ``content`` string defaults to empty."""
    content: str = ""


Writing flow_schemas


In [5]:

from streamsx.topology.topology import Topology
import flow_schemas

import datetime
import json
from lib.error_utils import TupleError
import lib.file_utils as file_utils
from lib.sampledata.sample_data_producer import SampleDataProducer
from lib.type_adapter import adapt_if_needed as _adapt_if_needed
import os
import streamsx.kafka as kafka
import time
import typing


# ================================================================================
# MAIN

def build_flow():
    """Assemble the 'kafka_streams_flow' topology and return it.

    The namespace is taken from the USER environment variable and falls back
    to 'flow'. The flow consists of the "Sample Data" node feeding the
    "Kafka" node; views are attached afterwards.

    Returns:
        The populated Topology.
    """
    namespace = os.environ.get('USER', 'flow')
    flow = Topology(name='kafka_streams_flow', namespace=namespace)
    flow.name_to_runtime_id = name_mapping().get

    # Pip packages registered on the topology.
    for requirement in ('python-dateutil==2.8.0', 'streamsx.kafka>=1.9.0'):
        flow.add_pip_package(requirement)

    # Node "Sample Data" -> Node "Kafka".
    add_kafka(add_sample_data(flow))

    add_views(flow)
    return flow


# ================================================================================
# Function for top-level operator: Sample Data
def add_sample_data(topo):
    """Add the 'Sample Data' source to the topology and return its output stream.

    Args:
        topo: Topology the source is added to.

    Returns:
        The stream produced by the 'CompositeOutput1' filter.
    """
    source_stream = topo.source(generate_sample_data, name='Sample Data')
    # The predicate accepts every event, so this stage only adds a stream
    # named 'CompositeOutput1' downstream of the source.
    return source_stream.filter(lambda event: True, name='CompositeOutput1')


# ================================================================================
# Function for top-level operator: Kafka
def add_kafka(stream):
    """Serialize each event to a JSON string and publish it to the 'clicks' topic.

    Connection settings (brokers, security protocol, SASL mechanism, username
    and API key) are read from a JSON file under ``connections/``.

    Args:
        stream: Stream of events exposing the attributes serialized below.

    Returns:
        The result of the terminating ``for_each`` call for the 'Kafka' node.
    """
    connection_file = os.path.abspath("connections/kafka_4560768a-c25f-49e7-9333-23726b8ae71e.json")
    connection = file_utils.read_from_json(connection_file)

    # Each event becomes a one-key dict whose 'content' holds the JSON text;
    # the timestamp is rendered in ISO 8601 form because datetime objects are
    # not JSON serializable.
    json_stream = stream.map(
        lambda event: {
            'content':
                json.dumps({
                    'click_event_type': event.click_event_type,
                    'customer_id': event.customer_id,
                    'time_stamp': event.time_stamp.isoformat(),
                    'total_number_of_distinct_items_in_basket': event.total_number_of_distinct_items_in_basket,
                    'total_number_of_items_in_basket': event.total_number_of_items_in_basket,
                    'total_price_of_basket': event.total_price_of_basket,
                    'product_category': event.product_category,
                    'product_name': event.product_name,
                    'product_price': event.product_price,
                    'session_duration': event.session_duration
                })
        },
        name='JsonFromTuple1',
        schema=flow_schemas.JsonFromTuple1Schema)

    # Credentials are interpolated from the connection file into the JAAS line.
    producer = kafka.KafkaProducer(
        config={
            'bootstrap.servers': connection['brokers'],
            'security.protocol': connection['security_protocol'],
            'sasl.mechanism': connection['sasl_mechanism'],
            'sasl.jaas.config': f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{connection["username"]}" password="{connection["api_key"]}";'
        },
        message_attribute_name='content',
        topic='clicks')
    return json_stream.for_each(producer, name='Kafka')


# ================================================================================
# Operator-specific global code, such as filter classes:

def generate_sample_data() -> typing.Iterable[flow_schemas.SampleDataSchema]:
    """Endlessly yield 'clickStreamSampleData' events as SampleDataSchema tuples.

    Events are fetched one at a time at increasing offsets. Each field is
    passed through the type adapter; if the adapter records any error, or the
    conversion raises, the event is not yielded and a TupleError is created
    for it instead. The generator pauses 0.05 seconds after every event.
    """
    # (field name, target Python type, type label handed to the adapter)
    field_specs = (
        ('click_event_type', str, 'Text'),
        ('customer_id', float, 'Number'),
        ('time_stamp', datetime.datetime, 'Date'),
        ('total_number_of_distinct_items_in_basket', float, 'Number'),
        ('total_number_of_items_in_basket', float, 'Number'),
        ('total_price_of_basket', float, 'Number'),
        ('product_category', str, 'Text'),
        ('product_name', str, 'Text'),
        ('product_price', float, 'Number'),
        ('session_duration', float, 'Number'),
    )

    producer = SampleDataProducer(['clickStreamSampleData'])
    offset = 0
    while True:
        raw_event = producer.get_events('clickStreamSampleData', offset, 1)[0]
        offset += 1

        problems = []
        try:
            converted = {
                field: _adapt_if_needed(getattr(raw_event, field), target_type, field, label, problems)
                for field, target_type, label in field_specs
            }
            output_event = flow_schemas.SampleDataSchema(**converted)
            if problems:
                raise ValueError('\n'.join(problems))
            yield output_event
        except Exception as err:
            # NOTE(review): the TupleError is constructed but neither raised nor
            # stored; presumably its constructor does the reporting - confirm.
            TupleError(operation_id='Sample Data', message=str(err))

        time.sleep(0.05)


# ================================================================================
# Utils:

def add_views(topo):
    """Attach a view to every stream whose runtime id marks a composite output.

    Args:
        topo: Topology whose ``streams`` mapping (name -> stream) is scanned.
    """
    runtime_ids = name_mapping()
    suffix = '__Composite_Output_Id'
    for stream_name, stream in topo.streams.items():
        runtime_id = runtime_ids.get(stream_name)
        # Skip streams without a mapped id or whose id lacks the suffix.
        if not runtime_id or not runtime_id.endswith(suffix):
            continue
        stream.view(name=runtime_id + "__output")


def name_mapping():
    """Return a new dict mapping operator names to their runtime ids."""
    pairs = (
        ('Sample Data', 'Sample_Data'),
        ('CompositeOutput1', 'Sample_Data__Composite_Output_Id'),
        ('JsonFromTuple1', 'JsonFromTuple1'),
        ('Kafka', 'Kafka'),
    )
    return dict(pairs)


## Submit the application

In [6]:
import streamsx
import datetime
from streamsx.topology.context import ContextTypes, JobConfig
from streamsx.topology import context

def submit_app():
    """Build the flow and submit it as a distributed job.

    Uses the module-level ``cfg`` as the submission configuration, bundles the
    local ``lib`` package with the application, and prints the id and name of
    the submitted job.
    """
    # NOTE(review): certificate verification is switched off for this submission.
    cfg[context.ConfigParams.SSL_VERIFY] = False
    app = build_flow()

    # Explicit directives produce the same text as the '%F_%T' shortcuts,
    # which are not supported on every platform.
    dt = datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')

    job_config = JobConfig(job_name=f'{app.namespace}:{app.name}:{dt}', tracing='info')
    job_config.add(cfg)

    # copytree refuses to copy into an existing directory, so drop the copy
    # left by a previous run; otherwise re-running this cell raises
    # FileExistsError.
    bundled_lib = 'python/modules/lib'
    if os.path.exists(bundled_lib):
        shutil.rmtree(bundled_lib)
    shutil.copytree('lib', bundled_lib)
    app.add_file_dependency('python', 'opt')

    submission_result = streamsx.topology.context.submit(ContextTypes.DISTRIBUTED, app, config=cfg)
    streams_job = submission_result.job
    print("JobId: ", streams_job.id, "\nJob name: ", streams_job.name)
# Build the flow and submit it; prints the job id and job name.
submit_app()

properties file /tmp/wsuser/producer-40jfnu_b.properties generated.
Properties file etc/producer-40jfnu_b.properties added to the topology kafka_streams_flow


IntProgress(value=0, bar_style='info', description='Initializing', max=10, style=ProgressStyle(description_wid…

2020-08-13 22:13:27,430 streamsx.topology.context [INFO] Generating SPL and submitting application.


JobId:  20 
Job name:  flow:kafka_streams_flow:2020-08-13_22:13:27


## Delete the resource directory (Optional)
Cleans up the resource folders used in this application.

In [None]:
# Cleanup: uncomment the lines below to delete the extracted resource directory.
# import shutil
# os.chdir(os.environ['PWD'])
# if os.path.exists(resource_path):
#     shutil.rmtree(resource_path)
