# Incomin Event Handler
  --------------------------------------------------------------------

Handle incoming events and write them to V3IO Stream


In [1]:
%run config.py

In [2]:
INPUT_STREAM = 'generated-stream'
OUTPUT_STREAM = 'incoming-events-stream'

In [3]:
INPUT_STREAM_URL = STREAM_CONFIGS.get(INPUT_STREAM).get('stream_url')
OUTPUT_STREAM_PATH = STREAM_CONFIGS.get(OUTPUT_STREAM).get('path')
SHARDS_COUNT = STREAM_CONFIGS.get(OUTPUT_STREAM).get('shard_count')

In [4]:
%env INPUT_STREAM_URL = {INPUT_STREAM_URL}
%env CONTAINER = {CONTAINER}
%env OUTPUT_STREAM_PATH = {OUTPUT_STREAM_PATH}
%env SHARDS_COUNT = {SHARDS_COUNT}

env: INPUT_STREAM_URL=http://v3io-webapi:8081/users/iguazio/demos/rapid-streaming-ml/data/generated-stream
env: CONTAINER=users
env: OUTPUT_STREAM_PATH=iguazio/demos/rapid-streaming-ml/data/incoming-events-stream
env: SHARDS_COUNT=8


## Create and Test a Local Function

In [5]:
import nuclio

#### Functions imports

In [6]:
# nuclio: start-code

<b>Specify function dependencies and configuration<b>

In [7]:
%nuclio cmd -c pip install v3io

In [8]:
%%nuclio env
V3IO_ACCESS_KEY = ${V3IO_ACCESS_KEY}
CONTAINER = ${CONTAINER}
OUTPUT_STREAM_PATH = ${OUTPUT_STREAM_PATH}
SHARDS_COUNT = ${SHARDS_COUNT}
PARTITION_ATTR = user_id

%nuclio: setting 'V3IO_ACCESS_KEY' environment variable
%nuclio: setting 'CONTAINER' environment variable
%nuclio: setting 'OUTPUT_STREAM_PATH' environment variable
%nuclio: setting 'SHARDS_COUNT' environment variable
%nuclio: setting 'PARTITION_ATTR' environment variable


In [9]:
%%nuclio config
spec.triggers.v3io_stream.kind = "v3ioStream"
spec.triggers.v3io_stream.disabled = false
spec.triggers.v3io_stream.maxWorkers = 10
spec.triggers.v3io_stream.password = "${V3IO_ACCESS_KEY}"
spec.triggers.v3io_stream.attributes.pollingIntervalMs = 500
spec.triggers.v3io_stream.attributes.seekTo = "earliest"
spec.triggers.v3io_stream.attributes.readBatchSize = 64

%nuclio: setting spec.triggers.v3io_stream.kind to 'v3ioStream'
%nuclio: setting spec.triggers.v3io_stream.disabled to False
%nuclio: setting spec.triggers.v3io_stream.maxWorkers to 10
%nuclio: setting spec.triggers.v3io_stream.password to '4868d73d-1bd9-486e-91bd-d89ad83e3a56'
%nuclio: setting spec.triggers.v3io_stream.attributes.pollingIntervalMs to 500
%nuclio: setting spec.triggers.v3io_stream.attributes.seekTo to 'earliest'
%nuclio: setting spec.triggers.v3io_stream.attributes.readBatchSize to 64


In [10]:
%nuclio config spec.triggers.v3io_stream.url = "${INPUT_STREAM_URL}@incomingeventhandler"

%nuclio: setting spec.triggers.v3io_stream.url to '$http://v3io-webapi:8081/users/iguazio/demos/rapid-streaming-ml/data/generated-stream@incomingeventhandler'


## Function code

In [11]:
import os
import json
import v3io.dataplane

def init_context(context):
    V3IO_ACCESS_KEY = os.getenv('V3IO_ACCESS_KEY')
    CONTAINER = os.getenv('CONTAINER')
    OUTPUT_STREAM_PATH = os.getenv('OUTPUT_STREAM_PATH')
    SHARDS_COUNT = os.getenv('SHARDS_COUNT')
    PARTITION_ATTR = os.getenv('PARTITION_ATTR')
    v3io_client = v3io.dataplane.Client(endpoint='http://v3io-webapi:8081', access_key=V3IO_ACCESS_KEY)
    
    setattr(context, 'v3io_client', v3io_client)
    setattr(context, 'partition_attr', PARTITION_ATTR)
    setattr(context, 'shards_count', int(SHARDS_COUNT))
    setattr(context, 'container', CONTAINER)
    setattr(context, 'output_stream_path', OUTPUT_STREAM_PATH)


def handler(context, event):
    print(dir(context))
    if type(event.body) is dict:
        event_dict = event.body
    else:
        event_dict = json.loads(event.body)
        
    context.logger.info_with('Got invoked',
                             trigger_kind=event.trigger.kind,
                             event_body=event_dict)
        
    partition_key = event_dict.get(context.partition_attr)
    record = event_to_record(event_dict, partition_key)
    
    resp = context.v3io_client.put_records(container=context.container, 
                                   path=context.output_stream_path, 
                                   records=[record], 
                                   raise_for_status=v3io.dataplane.RaiseForStatus.never)
    
    context.logger.info_with('Sent event to stream', 
                             record=record,
                             response_status=resp.status_code, 
                             response_body=resp.body.decode('utf-8'))
    
    return resp.status_code


def event_to_record(event_dict, partition_key):
    event_str = json.dumps(event_dict)
    return {'data': event_str, 'partition_key': str(partition_key)}

The following end-code annotation tells ```nuclio``` to stop parsing the notebook from this cell. _**Please do not remove this cell**_:

In [12]:
# nuclio: end-code
# marks the end of a code section

## Test locally

In [13]:
event = nuclio.Event(body=b'{"user_id" : 111111 , "event_type": "spin"}')
init_context(context)
handler(context, event)

['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'container', 'logger', 'output_stream_path', 'partition_attr', 'set_logger_level', 'shards_count', 'trigger_name', 'user_data', 'v3io_client', 'worker_id']
Python> 2020-08-03 11:32:37,608 [info] Got invoked: {'trigger_kind': '', 'event_body': {'user_id': 111111, 'event_type': 'spin'}}
Python> 2020-08-03 11:32:37,610 [info] Sent event to stream: {'record': {'data': '{"user_id": 111111, "event_type": "spin"}', 'partition_key': '111111'}, 'response_status': 200, 'response_body': '{ "FailedRecordCount":0,"Records": [{ "SequenceNumber":1,"ShardId":5 } ] }'}


200

## Deploy function

In [None]:
%nuclio deploy -p {PROJECT_NAME} -n {V3IO_USERNAME}-incoming-event-handler

[nuclio] 2020-08-03 11:33:09,618 (info) Build complete
