# Stream to Features
  --------------------------------------------------------------------

##### This notebook will create a function that create feature vectors from streaming events


## Create and Test a Local Function 
Import nuclio SDK and magics, <b>do not remove the cell and comment !!!</b>

In [32]:
# nuclio: ignore
import nuclio

#### Functions imports

In [2]:
# nuclio: start-code

In [42]:
import os
import json
from v3io import dataplane, common
import v3io.dataplane
from datetime import datetime

<b>Specify function dependencies and configuration<b>

In [4]:
%nuclio cmd -c pip install v3io

In [51]:
%%nuclio env
V3IO_ACCESS_KEY = ${V3IO_ACCESS_KEY}
CONTAINER = users
FEATURE_TABLE_PATH = ${V3IO_USERNAME}/examples/rapid-churn/feature-table
MODEL_SERVING_URL = ''

ENRICHMENT_TABLE_PATH = ${V3IO_USERNAME}/examples/rapid-churn/enrichment-table
ENRICHMENT_KEY = postcode

OUTPUT_STREAM_PATH = ${V3IO_USERNAME}/examples/rapid-churn/user-events-stream
SHARDS_COUNT = 8

%nuclio: setting 'V3IO_ACCESS_KEY' environment variable
%nuclio: setting 'CONTAINER' environment variable
%nuclio: setting 'FEATURE_TABLE_PATH' environment variable
%nuclio: setting 'MODEL_SERVING_URL' environment variable
%nuclio: setting 'ENRICHMENT_TABLE_PATH' environment variable
%nuclio: setting 'ENRICHMENT_KEY' environment variable
%nuclio: setting 'OUTPUT_STREAM_PATH' environment variable
%nuclio: setting 'SHARDS_COUNT' environment variable


In [67]:
%%nuclio config
spec.triggers.v3io_stream.kind = "v3ioStream"
spec.triggers.v3io_stream.disabled = false
spec.triggers.v3io_stream.url = "http://v3io-webapi:8081/users/${V3IO_USERNAME}/examples/rapid-churn/incoming-events-stream@stream2features"
spec.triggers.v3io_stream.maxWorkers = 10
spec.triggers.v3io_stream.password = "${V3IO_ACCESS_KEY}"
spec.triggers.v3io_stream.attributes.pollingIntervalMs = 500
spec.triggers.v3io_stream.attributes.seekTo = "earliest"
spec.triggers.v3io_stream.attributes.readBatchSize = 64


%nuclio: setting spec.triggers.v3io_stream.kind to 'v3ioStream'
%nuclio: setting spec.triggers.v3io_stream.disabled to False
%nuclio: setting spec.triggers.v3io_stream.url to 'http://v3io-webapi:8081/users/admin/examples/rapid-churn/incoming-events-stream@stream2features'
%nuclio: setting spec.triggers.v3io_stream.maxWorkers to 10
%nuclio: setting spec.triggers.v3io_stream.password to '4253f5c9-020f-4e6c-ae66-c9e9c5ad24ed'
%nuclio: setting spec.triggers.v3io_stream.attributes.pollingIntervalMs to 500
%nuclio: setting spec.triggers.v3io_stream.attributes.seekTo to 'earliest'
%nuclio: setting spec.triggers.v3io_stream.attributes.readBatchSize to 64


### Manage output stream

In [52]:
# nuclio: ignore

V3IO_ACCESS_KEY = os.getenv('V3IO_ACCESS_KEY')
CONTAINER = os.getenv('CONTAINER')
OUTPUT_STREAM_PATH = os.getenv('OUTPUT_STREAM_PATH')
SHARDS_COUNT = int(os.getenv('SHARDS_COUNT'))
    
v3io_client = v3io.dataplane.Client(endpoint='http://v3io-webapi:8081', access_key=V3IO_ACCESS_KEY)


#### Create output stream

In [53]:
# nuclio: ignore
resp = v3io_client.create_stream(container=CONTAINER,
                           path=OUTPUT_STREAM_PATH,
                           shard_count=SHARDS_COUNT)
resp.status_code

204

## Function code

In [61]:
def init_context(context):
    V3IO_ACCESS_KEY = os.getenv('V3IO_ACCESS_KEY')
    CONTAINER = os.getenv('CONTAINER')
    FEATURE_TABLE_PATH = os.getenv('FEATURE_TABLE_PATH')
    MODEL_SERVING_URL = os.getenv('MODEL_SERVING_URL')
    v3io_client = dataplane.Client(endpoint='http://v3io-webapi:8081', access_key=V3IO_ACCESS_KEY)
    ENRICHMENT_TABLE_PATH = os.getenv('ENRICHMENT_TABLE_PATH')
    ENRICHMENT_KEY = os.getenv('ENRICHMENT_KEY')
    OUTPUT_STREAM_PATH = os.getenv('OUTPUT_STREAM_PATH')
    
    event_handlers = {'registration': process_registration,
                      'purchase': process_purchase,
                      'activity': process_activity}
    
    setattr(context, 'v3io_client', v3io_client)
    setattr(context, 'container', CONTAINER)
    setattr(context, 'feature_table_path', FEATURE_TABLE_PATH)
    setattr(context, 'model_serving_url', MODEL_SERVING_URL)
    setattr(context, 'event_handlers', event_handlers)
    setattr(context, 'enrichment_table_path', ENRICHMENT_TABLE_PATH)
    setattr(context, 'enrichment_key', ENRICHMENT_KEY)
    setattr(context, 'output_stream_path', OUTPUT_STREAM_PATH)

In [61]:
def handler(context, event):
    if type(event.body) is dict:
        event_dict = event.body
    else:
        event_dict = json.loads(event.body)
        
    if is_relevant_event(context, event_dict):
        event_type = get_event_type(event_dict)
        context.logger.info(f'Incoming event type: {event_type}')
        
        # python switch-case
        process_func = context.event_handlers.get(event_type)
        context.logger.info(f'Processing event {event_dict}')
        response = process_func(context, event_dict)
        context.logger.info(f'Finished processing with status: {response.status_code} - and response body: {response.body} , event: {event_dict}')
    else:
        context.logger.info(f'Not relevant event')    

        
def get_event_type(event):
    return event['event_type']


def is_relevant_event(context, event):
    return get_event_type(event) in context.event_handlers
        

def trigger_prediction(context, event):
    user_id = event['user_id']
    event_time = event['event_time']
    payload = {'user_id': user_id, 'event_time': event_time}

    resp = requests.post(url=context.predict_url, json=payload)
    context.logger.info(f'Triggered prediction for user: {user_id}, prediction response: {resp.text}')


def event_time_to_ts(event_time):
    dt = datetime.strptime(event_time,'%Y-%m-%d %H:%M:%S.%f')
    return datetime.timestamp(dt)


def get_sum_count_mean_var_expr(feature: str, current_value):
    sum_str = f"SET {feature}_sum= if_not_exists({feature}_sum, 0) + {current_value};"
    count_str = f"SET {feature}_count= if_not_exists({feature}_count, 0) + 1;"
    delta_str = f"SET {feature}_delta= {current_value} - if_not_exists({feature}_mean, 0);"
    mean_str = f"SET {feature}_mean= if_not_exists({feature}_mean, 0) + ({feature}_delta / {feature}_count);"
    m2_str = f"SET {feature}_m2= if_not_exists({feature}_m2, 0) + ({feature}_delta * ({current_value} - {feature}_mean));"
    var_str = f"SET {feature}_var= {feature}_m2 / (max(2, {feature}_count)-1);"
    expression = sum_str + count_str + delta_str + mean_str + m2_str + var_str
    return expression


def update_features(context, user_id, expression, condition):
    return context.v3io_client.update_item(container=context.container,
                                          path=common.helpers.url_join(context.feature_table_path, user_id),
                                          condition=condition,
                                          expression=expression,
                                          raise_for_status=dataplane.RaiseForStatus.never)

def enrich_event(context, event_dict):
    if context.enrichment_key in event_dict:
        enrichment_key_value = event_dict[context.enrichment_key]
        resp = context.v3io_client.get_item(container=context.container, 
                                            path=os.path.join(context.enrichment_table_path, str(enrichment_key_value)),
                                           raise_for_status=v3io.dataplane.RaiseForStatus.never)
        if 200 <= resp.status_code <= 299:
            enriched_event = {**event_dict, **resp.output.item}
            context.logger.info_with('Event was enriched', enriched_event=enriched_event)
            return enriched_event
        else:
            context.logger.debug_with("Couldn't enrich event", 
                                      enrichment_key_value=enrichment_key_value,
                                      response_status=resp.status_code, 
                                      response_body=resp.body.decode('utf-8'))
            return event_dict
    else:
        return event_dict


def process_registration(context, event):
    user_id = event['user_id']
    
    enriched_event = enrich_event(context, event)
    
    features = {'registration_date': enriched_event['event_time'],
               'date_of_birth': enriched_event['date_of_birth'],
               'socioeconomic_idx':  enriched_event['socioeconomic_idx'],
               'affiliate_url': enriched_event['affiliate_url']}
    
    response = context.v3io_client.put_item(container=context.container,
                                       path=common.helpers.url_join(context.feature_table_path, user_id),
                                       attributes=features,
                                       raise_for_status=dataplane.RaiseForStatus.never)
    return response


def process_purchase(context, event):
    user_id = event['user_id']
    event_time = event['event_time']
    event_ts = event_time_to_ts(event_time)
    
    purchase_amount = event['amount']

    first_purchase_ts_str = f"SET first_purchase_ts=if_not_exists(first_purchase_ts, {event_ts});"
    sum_count_mean_var_expr = get_sum_count_mean_var_expr('purchase', purchase_amount)
    
    expression = first_purchase_ts_str + sum_count_mean_var_expr
    condition = f"exists(registration_date) AND (NOT exists(first_purchase_ts) OR first_purchase_ts >= ({event_ts} - 86400 ))"
    
    return update_features(context, user_id, expression, condition)


def process_activity(context, event):
    user_id = event['user_id']
    event_time = event['event_time']
    event_ts = event_time_to_ts(event_time)
    context.logger.info(f'enriching event for {user_id}')
    
    score = event['score']
    duration = event['duration']
    win_amount = event['is_win']
    score_expr = get_sum_count_mean_var_expr('score', score)
    duration_expr = get_sum_count_mean_var_expr('duration', duration)
    win_expr = get_sum_count_mean_var_expr('win_amount', win_amount)
 
    expression = score_expr + duration_expr + win_expr
    condition = f"first_purchase_ts >= ({event_ts} - 86400 )"
    
    updated = update_features(context, user_id, expression, condition)
    
    log_user_status(context, user_id)
    
    return updated

def log_user_status(context, user_id):
    features = {'data': json.dumps(context.v3io_client.get_item(container=context.container, 
                                        path=os.path.join(context.feature_table_path, user_id),
                                        raise_for_status=v3io.dataplane.RaiseForStatus.never).output.item),
                'partition_key': user_id}
    
    resp = context.v3io_client.put_records(container=context.container, 
                                   path=context.output_stream_path, 
                                   records=[features], 
                                   raise_for_status=v3io.dataplane.RaiseForStatus.never)
    
    context.logger.info_with('Sent event to stream', 
                             record=features,
                             response_status=resp.status_code, 
                             response_body=resp.body.decode('utf-8'))

The following end-code annotation tells ```nuclio``` to stop parsing the notebook from this cell. _**Please do not remove this cell**_:

In [8]:
# nuclio: end-code
# marks the end of a code section

## Test locally

In [69]:
event = nuclio.Event(body=b'{"user_id" : 111111 , "event_type": "registration", "postcode": 11014}')
init_context(context)
handler(context, event)


Python> 2020-07-28 17:19:53,874 [info] Incoming event type: registration
Python> 2020-07-28 17:19:53,874 [info] Processing event {'user_id': 111111, 'event_type': 'registration', 'postcode': 11014}
Python> 2020-07-28 17:19:53,876 [info] Event was enriched: {'enriched_event': {'user_id': 111111, 'event_type': 'registration', 'postcode': 11014, 'socioeconomic_idx': 1}}


KeyError: 'event_time'

## Deploy function

In [68]:
%nuclio deploy -p rapid-churn -n stream-to-features

[nuclio] 2020-07-28 14:32:52,264 (info) Build complete
[nuclio] 2020-07-28 14:32:56,321 (info) Function deploy complete
[nuclio] 2020-07-28 14:32:56,330 done updating stream-to-features, function address: 192.168.224.209:32166
%nuclio: function deployed
