In [3]:
import json
import logging
import os
import os.path
import tarfile
import typing
import urllib.request
from os import path

from icpd_core import icpd_util

import streamsx
import streamsx.spl.op as op
import streamsx.spl.toolkit
from streamsx.topology import context
from streamsx.topology.context import ConfigParams
from streamsx.topology.schema import StreamSchema
from streamsx.topology.topology import Topology

# Report which streamsx release the kernel loaded (`context` is streamsx.topology.context).
print("INFO: streamsx package version: " + context.__version__)

INFO: streamsx package version: 1.17.0


In [4]:
# Locate the SPSS Streams toolkit, downloading and unpacking it only when it is
# not already present under the current working directory.
current_dir = os.getcwd()

SPSS_TOOLKIT_URL = ("https://github.com/IBMPredictiveAnalytics/streamsx.spss.v4/"
                    "raw/master/com.ibm.spss.streams.analytics.tar.gz")
spss_toolkit_archive = "com.ibm.spss.streams.analytics.tar.gz"
spss_toolkit_install_dir = os.path.join(current_dir, "spss_toolkit_install_dir")
spss_toolkit_dir = os.path.join(spss_toolkit_install_dir, "com.ibm.spss.streams.analytics")

# Guard on the extracted toolkit, not just the archive: an archive left behind by
# an interrupted run would otherwise skip extraction and leave the toolkit missing.
if not path.isdir(spss_toolkit_dir):
    if not path.exists(spss_toolkit_archive):
        print("  downloading spss toolkit ... ")
        # Download under a temporary name so a failed transfer never leaves a
        # truncated archive that a later run would treat as complete.
        partial_archive = spss_toolkit_archive + ".part"
        urllib.request.urlretrieve(SPSS_TOOLKIT_URL, partial_archive)
        os.replace(partial_archive, spss_toolkit_archive)
    os.makedirs(spss_toolkit_install_dir, exist_ok=True)
    with tarfile.open(spss_toolkit_archive, "r:gz") as archive:
        # Where supported (Python 3.12+ and security backports), the 'tar' filter
        # refuses members that would be written outside the target directory.
        extract_options = {"filter": "tar"} if hasattr(tarfile, "tar_filter") else {}
        archive.extractall(spss_toolkit_install_dir, **extract_options)

# Fail here rather than later inside add_toolkit if the archive layout was unexpected.
if not path.isdir(spss_toolkit_dir):
    raise FileNotFoundError("SPSS toolkit not found after extraction: " + spss_toolkit_dir)

print("SPSS toolkit has been downloaded to: " + spss_toolkit_dir)

  downloading spss toolkit ... 
SPSS toolkit has been downloaded to: /home/wsuser/work/spss_toolkit_install_dir/com.ibm.spss.streams.analytics


In [5]:
def print_scores(tpl):
    """Print one scored tuple as a single summary line.

    Args:
        tpl: Scored tuple (mapping) carrying ``s_sex``, ``income``,
            ``predLabel`` and ``confidence``; each value is rendered with ``str``.
    """
    wanted = ("s_sex", "income", "predLabel", "confidence")
    sex, income, pred_label, confidence = (str(tpl[key]) for key in wanted)
    line = ("Score = sex[" + sex
            + "] income[" + income
            + "] predLabel[" + pred_label
            + "] confidence[" + confidence + "]")
    print(line)

import csv
import time
    
class CSVFileReader:
    """Callable source that replays a bundled CSV file as a stream of dicts.

    Used with ``Topology.source``. Each call returns a generator that opens
    ``<application dir>/etc/<file_name>``, skips the file's first line, and
    yields one dict per remaining row with keys ``s_sex``, ``baseSalary`` and
    ``bonusSalary`` (the two salaries converted to ``int``).

    Args:
        file_name: Name of the CSV file inside the application's ``etc`` directory.
        repeat: Number of times the whole file is replayed (default 100).
        delay: Seconds to sleep after each yielded row (default 1.0), which
            throttles the source.
    """

    # Names assigned to the CSV columns, in file order.
    FIELD_NAMES = ["s_sex", "baseSalary", "bonusSalary"]

    def __init__(self, file_name, repeat=100, delay=1.0):
        self.file_name = file_name
        self.repeat = repeat
        self.delay = delay

    def __call__(self):
        csv_path = os.path.join(streamsx.ec.get_application_directory(), "etc", self.file_name)
        for _ in range(self.repeat):
            # newline="" is what the csv module requires for correct quoting handling.
            with open(csv_path, newline="") as handle:
                reader = csv.DictReader(handle, delimiter=",", fieldnames=self.FIELD_NAMES)
                # Skip the file's own header line. The default avoids StopIteration
                # escaping this generator (a RuntimeError under PEP 479) on an empty file.
                next(reader, None)
                for row in reader:
                    row["baseSalary"] = int(row["baseSalary"])
                    row["bonusSalary"] = int(row["bonusSalary"])
                    yield row
                    time.sleep(self.delay)
    
def create_topology(data_dir="/project_data/data_asset/"):
    """Build the ``BasicSpssScorer`` Streams topology.

    Reads ``input.csv`` through ``CSVFileReader``, converts each row to a
    structured tuple, scores it with the SPSS toolkit's ``SPSSScoring``
    operator and prints every scored tuple with ``print_scores``.

    Args:
        data_dir: Directory holding the SPSS model files (``model.par``,
            ``model.pim``, ``model.xml``, ``model.str``) and ``input.csv``.
            Defaults to the project's data-asset directory.

    Returns:
        The configured ``Topology``.
    """
    topo = Topology(name="BasicSpssScorer")
    streamsx.spl.toolkit.add_toolkit(topo, spss_toolkit_dir)

    # Bundle the model artefacts and the input data into the application's etc/ directory.
    bundled_files = ["model.par", "model.pim", "model.xml", "model.str", "input.csv"]
    print("adding the following files to bundle: " + ", ".join(bundled_files))
    for file_name in bundled_files:
        topo.add_file_dependency(os.path.join(data_dir, file_name), 'etc')

    # Data source: dicts produced by CSVFileReader.
    raw_rows = topo.source(CSVFileReader("input.csv"), name="InputSource")

    input_data_schema = StreamSchema("tuple<rstring s_sex,int64 baseSalary,int64 bonusSalary>")
    output_data_schema = StreamSchema("tuple<rstring s_sex,int64 baseSalary,int64 bonusSalary,int64 income,rstring predLabel,float64 confidence>")

    typed_rows = raw_rows.map(lambda row: (row["s_sex"], row["baseSalary"], row["bonusSalary"]),
                              schema=input_data_schema,
                              name="Convert Schema")

    scoring = op.Map(name="SPSS Scorer", kind='com.ibm.spss.streams.analytics::SPSSScoring',
                     stream=typed_rows, schema=output_data_schema)
    # Point the operator at the model files bundled under etc/ above
    # (presumably getThisToolkitDir() is the application root at runtime — confirm).
    for param_name, model_file in (('pimfile', 'model.pim'), ('parfile', 'model.par'), ('xmlfile', 'model.xml')):
        scoring.params[param_name] = op.Expression.expression('getThisToolkitDir()+"/etc/' + model_file + '"')
    scoring.params['modelFields'] = ["sex", "income"]
    # Presumably paired with modelFields in order: s_sex -> sex, baseSalary -> income.
    # NOTE(review): bonusSalary is not fed to the model; an earlier variant passed
    # "baseSalary+bonusSalary" — confirm which income definition the model expects.
    scoring.params['streamAttributes'] = scoring.attribute("s_sex, baseSalary")

    scoring.income = scoring.output('fromModel("income")')
    scoring.predLabel = scoring.output('fromModel("$C-beer_beans_pizza")')
    scoring.confidence = scoring.output('fromModel("$CC-beer_beans_pizza")')

    scoring.stream.for_each(print_scores, name="Score Summary")

    print("topology created successfully")

    return topo


In [6]:

def submit(topo, streams_instance_name="sample-streams", image_name="edge-basicspssscorer4"):
    """Build an edge image for ``topo`` using a Cloud Pak for Data Streams instance.

    Fetches the instance's connection details through ``icpd_util``, attaches
    an ``edgeConfig`` overlay (image name, SPSS/Python base image, extra pip
    packages) and submits with the ``EDGE`` context type.

    Args:
        topo: Topology to build.
        streams_instance_name: Name of the Streams service instance to use.
        image_name: Name given to the generated edge image.

    Returns:
        The result of ``context.submit``; its ``"image"`` entry names the built image.
    """
    try:
        cfg = icpd_util.get_service_instance_details(name=streams_instance_name, instance_type="streams")
    except TypeError:
        # Fallback for icpd_util versions whose signature lacks instance_type
        # (presumably older releases).
        cfg = icpd_util.get_service_instance_details(name=streams_instance_name)

    job_config = context.JobConfig()
    job_config.raw_overlay = {
        'edgeConfig': {
            'imageName': image_name,
            'baseImage': 'streams-base-edge-spss-python-el7:v5_r_5.5.0.0_latest',
            'pipPackages': ['pandas'],
        }
    }
    job_config.add(cfg)
    # NOTE(review): disables TLS certificate verification for the build service;
    # only appropriate for trusted in-cluster endpoints with self-signed certs.
    cfg[context.ConfigParams.SSL_VERIFY] = False
    cfg['topology.keepArtifacts'] = True

    return context.submit(context.ContextTypes.EDGE, topo, config=cfg)

In [7]:
# Build the topology, hand it to the Streams edge image builder, and report the image.
print("\nCreating topology")
myTopology = create_topology()

print("Submitting Topology to Streams/Edge image builder.")
submission_result = submit(myTopology)

print("\nImage built: " + submission_result["image"])
print(" ")
print(submission_result)
print("DONE")


Creating topology
adding the following files to bundle: input.csv,model.par,model.pim,model.xml,model.str
topology created successfully
Submitting Topology to Streams for execution..


IntProgress(value=0, bar_style='info', description='Initializing', max=10, style=ProgressStyle(description_wid…

Insecure host connections enabled.
Insecure host connections enabled.
Insecure host connections enabled.



Image built: image-registry.openshift-image-registry.svc:5000/edge35/edge-basicspssscorer4:streamsx
 
{'toolkitRoot': '/tmp/tk5466645246806237553', 'archivePath': '/tmp/code_archive5648571303035944724.zip', 'submitMetrics': {'buildArchiveSize': 3906453, 'buildArchiveUploadTime_ms': 239, 'buildState_submittedTime_ms': 23, 'buildState_buildingTime_ms': 22169, 'totalBuildTime_ms': 23201}, 'build': {'name': None, 'artifacts': [{'sabUrl': 'https://build-sample-streams-build.edge35:8445/streams/v1/builds/134/artifacts/0/applicationbundle', 'imageDigest': 'sha256:011f92a3d2d48e8ae86a60233fd23174fbc1b1e404e65b90a2a47be5227bcd8b', 'image': 'image-registry.openshift-image-registry.svc:5000/edge35/edge-basicspssscorer4:streamsx'}]}, 'submitImageMetrics': {'buildState_submittedTime_ms': 29, 'buildState_buildingTime_ms': 284775, 'totalBuildTime_ms': 285809}, 'totalBuildTime_ms': 309010, 'imageDigest': 'sha256:011f92a3d2d48e8ae86a60233fd23174fbc1b1e404e65b90a2a47be5227bcd8b', 'image': 'image-regist