# Labeled Stream Creator

## Environment

In [69]:
import nuclio

In [98]:
import os

base_path = os.path.abspath('../')
base_stream_path = f'/users/orz{base_path[5:]}'
data_path = os.path.join(base_path, 'data')
src_path = os.path.join(base_path, 'src')
streaming_path = os.path.join(base_stream_path, 'streaming')
os.environ['base_path'] = base_path
os.environ['data_path'] = data_path
os.environ['src_path'] = src_path
os.environ['streaming_path'] = streaming_path
os.environ['fs_streaming_path'] = os.path.join(base_path, 'streaming')

In [85]:
%nuclio config kind = "nuclio"
%nuclio config spec.build.baseImage = "mlrun/ml-models"
%nuclio cmd -c python -m pip install v3io --upgrade

%nuclio: setting kind to 'nuclio'
%nuclio: setting spec.build.baseImage to 'mlrun/ml-models'


In [99]:
%%nuclio env
METRICS_TABLE = ${fs_streaming_path}/metrics
PREDICTIONS_TABLE = ${streaming_path}/predictions
OUTPUT_STREAM = ${streaming_path}/labels_stream
prediction_col = predictions
label_col = is_error
output_stream_shards = 1

%nuclio: setting 'METRICS_TABLE' environment variable
%nuclio: setting 'PREDICTIONS_TABLE' environment variable
%nuclio: setting 'OUTPUT_STREAM' environment variable
%nuclio: setting 'prediction_col' environment variable
%nuclio: setting 'label_col' environment variable
%nuclio: setting 'output_stream_shards' environment variable


## Function

In [100]:
# nuclio: start-code

In [101]:
import os
import pandas as pd
import json
import v3io
import v3io.dataplane
import socket

In [102]:
def split_path(mntpath=''):
    if mntpath[0] == '/':
        mntpath = mntpath[1:]
    paths = mntpath.split('/')
    container = paths[0]
    subpath = ''
    if len(paths) > 1:
        subpath = mntpath[len(container):]
    return container, subpath

In [103]:
def create_stream(context, path, shards=1):
    # create a stream w shards
    container, stream_path = split_path(path)
    context.logger.info(f'Creating stream in Container: {container} & Path {stream_path}')
    response = context.v3io_client.stream.create(container=container,
                                        stream_path=stream_path, 
                                        shard_count=shards,
                                        raise_for_status=v3io.dataplane.RaiseForStatus.never)
    response.raise_for_status([409, 204])

In [104]:
def push_to_stream(context, stream_path, data):
    def restructure_stream_event(context, event):
        instances = [dict()]
        for key in data.keys():
            if key not in ['when', 'class', 'model', 'worker', 'hostname', context.prediction_col]:
                instances[0].update({key: event.pop(key)})
        event['request'] = {'instances': instances}
        event['resp'] = [int(event.pop(context.prediction_col))]
        return event
    
    records = json.loads(data.to_json(orient='records'))
    records = [{'data': json.dumps(restructure_stream_event(context, record))} for record in records]
    context.logger.info(f'Logging {len(records)} records, Record example: {records[0]}')
    container, stream_path = split_path(stream_path)
    # batch
    step = 10
    for idx in range(0, len(records), step):
        response = context.v3io_client.put_records(container=container,
                                                   path=stream_path, 
                                                   records=records[idx:idx+step])

In [105]:
def get_data_parquet(table, files_to_select=1):
    mpath = [os.path.join(table, file) for file in os.listdir(table) if file.endswith(('parquet', 'pq'))]
    files_by_updated = sorted(mpath, key=os.path.getmtime, reverse=False)
    context.logger.debug_with('Input', input_files=files_by_updated[:files_to_select])
    dfs = pd.concat([pd.read_parquet(file) for file in files_by_updated[:files_to_select]])
    return dfs

In [106]:
def init_context(context):
    setattr(context, 'metrics_table', os.environ['METRICS_TABLE'])
    setattr(context, 'predictions_table', os.environ['PREDICTIONS_TABLE'])
    setattr(context, 'output_stream', os.environ['OUTPUT_STREAM'])
    setattr(context, 'timestamp_col', os.getenv('timestamp_col', 'when'))
    setattr(context, 'orig_timestamp_col', os.getenv('orig_timestamp_col', 'timestamp'))
    
    v3io_client = v3io.dataplane.Client(logger_verbosity='DEBUG', transport_verbosity='DEBUG')
#     v3io_client.stream.create(container='users', stream_path='/orz/mlrun-demos/demos/network-operations/streaming/labeled_stream', shard_count=1)
    setattr(context, 'v3io_client', v3io_client)
    create_stream(context, context.output_stream)
    
    setattr(context, 'label_col', os.environ['label_col'])
    setattr(context, 'prediction_col', os.environ['prediction_col'])

In [107]:
def handler(context, event):
    metrics = get_data_parquet(context.metrics_table, 2).loc[:, context.label_col].astype('int')
    metrics.index.names = list([name if name != context.orig_timestamp_col else context.timestamp_col for name in metrics.index.names])
    predictions = get_data_parquet(context.predictions_table, 2)
    context.logger.debug(f'Labeling metrics ({metrics.shape}) and predictions ({predictions.shape})')
    context.logger.debug_with('Indexes', metrics_index=metrics.index.names, predictions_index=predictions.index.names)
    
    full_df = pd.merge(left=predictions, right=metrics, left_on=list(metrics.index.names), left_index=True, right_index=True)
    full_df = full_df.reset_index()
    context.logger.debug(f'Fully labeled batch size is {full_df.shape}')
    context.logger.debug(f'Indexes: {list(full_df.index.names)}')
    context.logger.debug(f'Columns: {full_df.columns}')
    context.logger.debug_with('sample', full_df=full_df.head(1))
    full_df = full_df.loc[:10]
    
    push_to_stream(context, context.output_stream, full_df)

In [108]:
# nuclio: end-code

## Test

In [109]:
init_context(context)

Python> 2020-12-22 08:47:03,786 [info] Creating stream in Container: users & Path /orz/mlrun-demos/demos/network-operations/streaming/labels_stream
2020-12-22 08:47:03,787 [debug] Tx: {'connection_idx': 0, 'method': 'POST', 'path': '/users/orz/mlrun-demos/demos/network-operations/streaming/labels_stream/', 'headers': {'X-v3io-function': 'CreateStream', 'X-v3io-session-key': '036f0244-f7a4-453b-b7d9-786172282378', 'Content-Type': 'application/json'}, 'body': '{"ShardCount":1,"RetentionPeriodHours":24}'}
2020-12-22 08:47:03,787 [debug] Tx: {'connection_idx': 0, 'method': 'POST', 'path': '/users/orz/mlrun-demos/demos/network-operations/streaming/labels_stream/', 'headers': {'X-v3io-function': 'CreateStream', 'X-v3io-session-key': '036f0244-f7a4-453b-b7d9-786172282378', 'Content-Type': 'application/json'}, 'body': '{"ShardCount":1,"RetentionPeriodHours":24}'}
2020-12-22 08:47:03,787 [debug] Tx: {'connection_idx': 0, 'method': 'POST', 'path': '/users/orz/mlrun-demos/demos/network-operations

In [110]:
event = nuclio.Event(body='')
out = handler(context, event)
out

ValueError: No objects to concatenate

## Stream test

In [25]:
from v3io.dataplane import Client
from pprint import pprint

In [26]:
v3io_client = Client()

In [27]:
# v3io_client.delete_stream(container='users', path='/admin/demos/network-operations/streaming/labeled_stream')

In [28]:
def print_stream(path, shard='0', seek_type='EARLIEST', last=100):
    # seek the shard to the first record in it
    container, stream_path = split_path(path)
    shard_path = os.path.join(stream_path, shard)
    response = v3io_client.seek_shard(container=container,
                                      path=shard_path, 
                                      seek_type=seek_type)
    response.raise_for_status()

    # get records, starting from the location we got from seek
    response = v3io_client.get_records(container=container,
                                       path=shard_path, 
                                       location=response.output.location)
    response.raise_for_status()
    
    models = ['pagehinkley', 'eddm', 'ddm']
    result_record = response.output.records
    records = [json.loads(record.data) for record in result_record[:last]]
    pprint(records)

In [30]:
print_stream(context.output_stream, seek_type='EARLIEST', last=2)

[{'class': 'RandomForestClassifier',
  'hostname': 'jupyter-558bf7fbc8-sq5kd',
  'model': 'netops_predictor_v1',
  'request': {'instances': [{'company': 'Wilson_LLC',
                             'cpu_utilization': 66.9391393542,
                             'data_center': 'Zachary_Drives',
                             'device': '6001003522699',
                             'is_error': 0,
                             'latency': 0.5372793066,
                             'packet_loss': 0.0,
                             'throughput': 256.4821896882}]},
  'resp': [0],
  'when': 1593499337454,
  'worker': None},
 {'class': 'RandomForestClassifier',
  'hostname': 'jupyter-558bf7fbc8-sq5kd',
  'model': 'netops_predictor_v1',
  'request': {'instances': [{'company': 'Wilson_LLC',
                             'cpu_utilization': 72.4927066691,
                             'data_center': 'Obrien_Mountain',
                             'device': '0966571261270',
                             'is_er

## Deploy

In [42]:
from mlrun import code_to_function, mount_v3io

In [43]:
fn = code_to_function('labeled-stream-creator',
                      kind='nuclio',
                      project='network-operations')
fn.spec.base_spec['spec']['build']['baseImage'] = 'mlrun/ml-models'
fn.apply(mount_v3io())
fn.add_trigger('cron', nuclio.triggers.CronTrigger(interval='1m'))

<mlrun.runtimes.function.RemoteRuntime at 0x7fb3d7effcc0>

In [44]:
fn.save()
fn.export('../src/labeled_stream_creator.yaml')

> 2020-08-11 09:28:55,659 [info] function spec saved to path: ../src/labeled_stream_creator.yaml


<mlrun.runtimes.function.RemoteRuntime at 0x7fb3d7effcc0>

In [34]:
fn.deploy(project='network-operations')

> 2020-08-10 13:51:08,258 [info] deploy started
[nuclio] 2020-08-10 13:54:42,507 (info) Build complete
[nuclio] 2020-08-10 13:54:52,646 (info) Function deploy complete
[nuclio] 2020-08-10 13:54:52,655 done creating network-operations-labeled-stream-creator, function address: 192.168.224.209:31059


'http://192.168.224.209:31059'

In [29]:
predictions = pd.read_parquet('/User/demos/network-operations/streaming/predictions/20200630T064217-20200630T074212.parquet')
predictions

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,Unnamed: 5_level_0,Unnamed: 6_level_0,Unnamed: 7_level_0,cpu_utilization,latency,packet_loss,throughput,predictions
when,company,data_center,device,model,class,worker,hostname,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
2020-06-30 06:42:17.454,Wilson_LLC,Zachary_Drives,6001003522699,netops_predictor_v1,RandomForestClassifier,,jupyter-558bf7fbc8-sq5kd,66.939139,0.537279,0.000000,256.482190,False
2020-06-30 06:42:17.454,Wilson_LLC,Obrien_Mountain,0966571261270,netops_predictor_v1,RandomForestClassifier,,jupyter-558bf7fbc8-sq5kd,72.492707,0.000000,4.961308,264.122648,False
2020-06-30 06:42:17.454,Wilson_LLC,Obrien_Mountain,8069812479542,netops_predictor_v1,RandomForestClassifier,,jupyter-558bf7fbc8-sq5kd,69.116878,2.606934,0.000000,263.528599,False
2020-06-30 06:42:17.454,Bennett__Delacruz_and_Walls,Natasha_Harbors,5863502247054,netops_predictor_v1,RandomForestClassifier,,jupyter-558bf7fbc8-sq5kd,64.944107,1.571046,0.172451,241.149554,False
2020-06-30 06:42:17.454,Bennett__Delacruz_and_Walls,Natasha_Harbors,4285071567351,netops_predictor_v1,RandomForestClassifier,,jupyter-558bf7fbc8-sq5kd,78.641128,0.000000,0.000000,263.688823,False
...,...,...,...,...,...,...,...,...,...,...,...,...
2020-06-30 07:42:12.454,Wilson_LLC,Obrien_Mountain,8069812479542,netops_predictor_v1,RandomForestClassifier,,jupyter-558bf7fbc8-sq5kd,59.574487,0.000000,0.000000,269.816306,False
2020-06-30 07:42:12.454,Bennett__Delacruz_and_Walls,Natasha_Harbors,5863502247054,netops_predictor_v1,RandomForestClassifier,,jupyter-558bf7fbc8-sq5kd,100.000000,100.000000,50.000000,0.000000,True
2020-06-30 07:42:12.454,Bennett__Delacruz_and_Walls,Natasha_Harbors,4285071567351,netops_predictor_v1,RandomForestClassifier,,jupyter-558bf7fbc8-sq5kd,100.000000,100.000000,50.000000,0.000000,True
2020-06-30 07:42:12.454,Bennett__Delacruz_and_Walls,Dominique_Branch,4579248894449,netops_predictor_v1,RandomForestClassifier,,jupyter-558bf7fbc8-sq5kd,69.053014,0.064657,0.000000,255.943689,False
