# MLRun
# Nuclio - Data preparation function
## Environment
Preparation for MLRun (needed until MLRun is available as a pip-installable release)

In [2]:
# nuclio: ignore
# Notebook-only setup: install the packages this notebook relies on into the
# kernel's environment. The `# nuclio: ignore` marker keeps this cell out of
# the generated function code.
# NOTE(review): nothing here is version-pinned, and nuclio-jupyter / mlrun are
# installed from git (a feature branch and the default branch), so re-running
# may pull different code. `%pip` targets the running kernel more reliably
# than `!pip`.
!pip install nest_asyncio
!pip install aiohttp
!pip install git+https://github.com/nuclio/nuclio-jupyter.git@enhance-spec
!pip install git+https://github.com/v3io/mlrun.git

Collecting git+https://github.com/nuclio/nuclio-jupyter.git@enhance-spec
  Cloning https://github.com/nuclio/nuclio-jupyter.git (to revision enhance-spec) to /tmp/pip-req-build-vepf3urb
Branch 'enhance-spec' set up to track remote branch 'enhance-spec' from 'origin'.
Switched to a new branch 'enhance-spec'
Building wheels for collected packages: nuclio-jupyter
  Running setup.py bdist_wheel for nuclio-jupyter ... [?25ldone
[?25h  Stored in directory: /tmp/pip-ephem-wheel-cache-a3n_4f3a/wheels/27/dd/f2/d906ffa5224575ab70b4c6a404d0d2acda1808960485a5624e
Successfully built nuclio-jupyter
Collecting git+https://github.com/v3io/mlrun.git
  Cloning https://github.com/v3io/mlrun.git to /tmp/pip-req-build-xhws6er7
Building wheels for collected packages: mlrun
  Running setup.py bdist_wheel for mlrun ... [?25ldone
[?25h  Stored in directory: /tmp/pip-ephem-wheel-cache-w_2biqwr/wheels/2b/0a/b1/2800e59ea6571091083a35a67a92d5d5744a64b61928c849ab
Successfully built mlrun
Installing collected pa

Add MLRun to our Python path (a workaround until MLRun can be installed with pip)

### Load nuclio

In [1]:
# nuclio: ignore
import nuclio

### Configurations

In [2]:
# nuclio: ignore

# Extra environment variables for the deployed function (none needed yet)
env_vars = dict()

# Nuclio function-spec overrides, keyed by dotted spec path
configs = {
    'spec.build.baseImage': 'python:3.6-jessie',  # base image
    'spec.triggers': {  # triggers: a single HTTP trigger with one worker
        'web': {'kind': 'http', 'maxWorkers': 1},
    },
}

# Shell commands executed while building the function image.
# NOTE(review): the function code imports dask (dask.dataframe and
# dask.distributed), but no command here installs it. cudf is installed but
# never imported in the visible code. Confirm the build dependencies.
build_commands = [
    'pip install pyyaml',
    'pip install pyarrow',
    'pip install cudf',
    'pip install v3io_frames --upgrade',
    'pip install git+https://github.com/v3io/mlrun.git',
]

In [3]:
# DB Config
# Pass the V3IO connection settings through to the deployed function's
# environment. The `${...}` references are presumably expanded from the
# notebook's own environment; confirm this against nuclio-jupyter.
# NOTE(review): `handler` connects to a hardcoded 'framesd:8081' rather than
# reading V3IO_FRAMESD.
%nuclio env V3IO_FRAMESD=${V3IO_FRAMESD}
%nuclio env V3IO_USERNAME=${V3IO_USERNAME}
%nuclio env V3IO_ACCESS_KEY=${V3IO_ACCESS_KEY}

%nuclio: setting 'V3IO_FRAMESD' environment variable
%nuclio: setting 'V3IO_USERNAME' environment variable
%nuclio: setting 'V3IO_ACCESS_KEY' environment variable


## Function

In [4]:
# Utils
import os
import time
import pandas as pd
import itertools

# ML Pipeline Context
from mlrun import get_or_create_ctx, run_start

# DB Connection
import v3io_frames as v3f

# Parallelization
import dask.dataframe as dd
from dask.distributed import Client

In [69]:
def get_data_tsdb(client, metrics_table, indexes, metrics, label, shards, range_start='now-2h', range_end='now'):
    """Query metrics plus the label from a V3IO TSDB table into a Dask DataFrame.

    :param client: v3io_frames client used for the read.
    :param metrics_table: TSDB table to query.
    :param indexes: names assigned to the returned index levels (one per level).
    :param metrics: metric column names to select.
    :param label: label column name, selected after the metrics.
    :param shards: number of Dask partitions to split the result into.
    :param range_start: TSDB query start, e.g. 'now-2h'.
    :param range_end: TSDB query end, e.g. 'now'.
    :returns: dask DataFrame whose first columns are the former index levels.
    """
    metric_list = ", ".join(metrics)
    query = f'select {metric_list}, {label} from {metrics_table}'
    tsdb_df = client.read(backend='tsdb', query=query,
                          start=range_start, end=range_end, multi_index=True)

    # Rename the index levels so they can act as merge keys, then turn them
    # into regular columns before handing the frame to Dask.
    tsdb_df.index.names = indexes
    flat = tsdb_df.reset_index()
    return dd.from_pandas(flat, npartitions=shards)

In [70]:
def get_data_parquet(client, metrics_table, metrics, shards, indexes=None):
    """Load the most recently modified Parquet file of raw metrics into Dask.

    This is the file-based counterpart of ``get_data_tsdb``. It reads the
    newest entry (by modification time) in the ``metrics_table`` directory with
    pandas and keeps only the requested columns. It then flattens the index
    into regular columns and splits the result into ``shards`` partitions.

    :param client: unused. It is accepted only so the signature parallels
        ``get_data_tsdb``; callers may pass ``None``.
    :param metrics_table: directory containing the metrics Parquet file(s).
    :param metrics: column names to keep. Include the label column here if
        downstream code aggregates it.
    :param shards: number of Dask partitions.
    :param indexes: optional names to assign to the index levels before they
        are flattened. If omitted, the level names stored in the file are kept.
        NOTE(review): those names must match the merge keys used downstream.
    :returns: dask DataFrame with the former index levels first, followed by
        the ``metrics`` columns.
    :raises FileNotFoundError: if ``metrics_table`` is missing or empty.
    """
    # Candidate files: every entry of the metrics directory
    mpath = [os.path.join(metrics_table, name) for name in os.listdir(metrics_table)]
    if not mpath:
        raise FileNotFoundError(f'No Parquet files found in {metrics_table!r}')

    # Only the most recently modified entry is read
    latest = max(mpath, key=os.path.getmtime)

    # Keep the requested columns of the loaded frame
    df = pd.read_parquet(latest).loc[:, list(metrics)]
    if indexes is not None:
        df.index.names = indexes

    # Flatten the index into columns and load it to dask
    df = df.reset_index()
    return dd.from_pandas(df, npartitions=shards)

In [71]:
def create_rolling_features(df, window_size: int, metrics: list, label, indexes: list):
    """Build rolling-window features from a Dask DataFrame of raw metrics.

    Steps:

    * add a string ``key`` column by joining each row's ``indexes`` values
      with ``'_'``;
    * replace each metric column with its rolling mean over ``window_size`` rows;
    * replace the label column with its rolling max over ``window_size`` rows;
    * drop rows containing NaN (including the incomplete leading windows);
    * drop duplicate rows.

    NOTE(review): the windows run over rows in frame order and are not grouped
    per device or company, so a window can mix series. Confirm this is intended.

    :param df: dask DataFrame containing the ``indexes``, ``metrics`` and
        ``label`` columns.
    :param window_size: number of rows per rolling window.
    :param metrics: columns to average.
    :param label: column to aggregate with ``max``.
    :param indexes: columns combined into the ``key`` column.
    :returns: new dask DataFrame; ``df`` itself is not modified.
    """
    features = df.copy()

    # The lambda returns one string per row, so declare a Series meta directly.
    # The previous meta (features.compute().dtypes) evaluated the whole frame
    # eagerly just to infer it.
    features['key'] = features.apply(
        lambda row: '_'.join(str(row[index]) for index in indexes),
        axis=1, meta=('key', 'object'))

    for metric in metrics:
        features[metric] = features[metric].rolling(window=window_size).mean()
    features[label] = features[label].rolling(window=window_size).max()

    return features.dropna().drop_duplicates()

In [72]:
def save_to_parquet(df: pd.DataFrame, indexes, features_table):
    """Write a pandas features frame to ``<first>-<last>.parquet`` in ``features_table``.

    ``first`` and ``last`` are the earliest and latest ``timestamp`` values,
    formatted as ``%Y%m%dT%H%M%S``. ``timestamp`` may be a column or an index
    level of ``df``. Timestamps are cast to millisecond precision before
    writing. The directory must already exist.

    :param df: pandas DataFrame with a ``timestamp`` column or index level.
    :param indexes: column names to set as the index of the written frame.
    :param features_table: target directory path.
    :raises ValueError: if ``df`` has no rows, since there is then no time
        range to name the file by.
    """
    print('Saving features to Parquet')

    if df.empty:
        raise ValueError('save_to_parquet: no feature rows to save')

    # Need to fix timestamps from ns to ms if we write to parquet
    df = df.reset_index()
    df['timestamp'] = df['timestamp'].astype('datetime64[ms]')

    # Name the file by the real time range. min/max do not depend on row order
    # or on where 'timestamp' sits in `indexes`.
    time_fmt = '%Y%m%dT%H%M%S'
    first_timestamp = df['timestamp'].min().strftime(time_fmt)
    last_timestamp = df['timestamp'].max().strftime(time_fmt)
    filepath = os.path.join(features_table, f'{first_timestamp}-{last_timestamp}.parquet')

    # Fix indexes
    df = df.set_index(indexes)

    with open(filepath, 'wb') as f:
        df.to_parquet(f)

In [73]:
# Create Dask client
# With no arguments this starts a local Dask cluster for the notebook session.
# Nothing in the visible code references `dask_client` directly; dask
# presumably uses it as the default scheduler for the `.compute()` calls in
# `handler`, which should be confirmed.
# NOTE(review): this cell is not marked `# nuclio: ignore`, so it presumably
# also runs when the exported function is loaded.
dask_client = Client()

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


In [76]:
def handler(context, event):
    """Nuclio/MLRun entry point: build rolling-window NetOps features.

    Raw metrics are read either from a V3IO TSDB table (``from_tsdb=True``) or
    from the newest Parquet file in ``metrics_table``. For every entry in
    ``agg_features``, rolling features are merged onto the raw rows. These are
    the rolling mean of each metric and the rolling max of the label, with
    columns suffixed by the entry's name. The result is written back to the
    TSDB table or to a Parquet file in ``features_table``.

    Run parameters (read from the MLRun context; defaults in parentheses):
        from_tsdb (True), shards (4), metrics_table ('netops_metrics'),
        features_table ('netops_features'), metric_names, label ('is_error'),
        indexes, agg_features ({'minutely': 3, 'hourly': 180}),
        range_start ('now-2h') and range_end ('now'), the TSDB query window.

    :param context: nuclio context (only ``context.logger`` is used).
    :param event: nuclio event, forwarded to ``get_or_create_ctx``.
    """
    mlctx = get_or_create_ctx('netops_data_preperations', event=event)

    context.logger.info(
        f'Run: {mlctx.name} uid={mlctx.uid}:{mlctx.iteration}')

    # Get properties from mlrun context
    from_tsdb = mlctx.get_param('from_tsdb', True)
    shards = mlctx.get_param('shards', 4)
    metrics_table = mlctx.get_param('metrics_table', 'netops_metrics')
    features_table = mlctx.get_param('features_table', 'netops_features')
    metric_names = mlctx.get_param('metric_names', ['cpu_utilization', 'packet_loss', 'throughput', 'latency'])
    label = mlctx.get_param('label', 'is_error')
    indexes = mlctx.get_param('indexes', ['timestamp', 'company', 'data_center', 'device'])
    agg_features = mlctx.get_param('agg_features', {'minutely': 3, 'hourly': 3*60})
    range_start = mlctx.get_param('range_start', 'now-2h')
    range_end = mlctx.get_param('range_end', 'now')

    # Get the data. Only the TSDB path needs a frames client.
    client = None
    if from_tsdb:
        client = v3f.Client(address='framesd:8081', container='bigdata')

        # Create features table if needed. A failure is assumed to mean the
        # table already exists, so it is logged (with the error) rather than raised.
        try:
            client.create('tsdb', features_table, attrs={'rate': '1/s'})
        except Exception as err:
            context.logger.debug(f'features table already exists ({err})')

        raw = get_data_tsdb(client, metrics_table, indexes, metric_names, label, shards,
                            range_start=range_start, range_end=range_end)
    else:
        # Create saving directory if needed
        os.makedirs(features_table, exist_ok=True)

        # Load the label alongside the metrics: the rolling features below
        # aggregate it too.
        raw = get_data_parquet(client, metrics_table, list(metric_names) + [label], shards)

    # Create features: one rolling-window frame per agg_features entry, with
    # its columns renamed '<column>_<stub>', merged onto the raw rows.
    features = raw
    for stub, window in agg_features.items():
        tdf = create_rolling_features(raw, window, metric_names, label, indexes)
        column_names = {metric: f'{metric}_{stub}' for metric in metric_names}
        column_names[label] = f'{label}_{stub}'
        tdf = tdf.rename(columns=column_names)
        features = features.merge(tdf, on=indexes, suffixes=('_raw', f'_{stub}'))

    # Drop the helper 'key' columns, including their merge-suffixed copies
    features = features.reset_index(drop=True)
    key_cols = [col for col in features.columns if 'key' in col]
    features = features.drop(key_cols, axis=1)

    # Materialize, then index by the key columns before saving
    features = features.compute()
    features = features.set_index(indexes)

    # Save the features
    if from_tsdb:
        client.write('tsdb', features_table, features)
    else:
        save_to_parquet(features, indexes, features_table)

## Deployment

In [78]:
# nuclio: ignore
# Run `handler` locally through mlrun with an empty run spec. No parameters
# are supplied, so presumably every `mlctx.get_param` call in the handler
# falls back to its default.
resp = run_start({}, handler=handler)

Python> 2019-08-09 08:51:57,542 [info] Run: netops_data_preperations uid=45207eca948d4463ad53394f0a7c4391:0
Python> 2019-08-09 08:51:57,542 [info] Run: netops_data_preperations uid=45207eca948d4463ad53394f0a7c4391:0
Python> 2019-08-09 08:51:57,542 [info] Run: netops_data_preperations uid=45207eca948d4463ad53394f0a7c4391:0
Python> 2019-08-09 08:51:57,542 [info] Run: netops_data_preperations uid=45207eca948d4463ad53394f0a7c4391:0
Python> 2019-08-09 08:51:57,542 [info] Run: netops_data_preperations uid=45207eca948d4463ad53394f0a7c4391:0
Python> 2019-08-09 08:51:57,542 [info] Run: netops_data_preperations uid=45207eca948d4463ad53394f0a7c4391:0
Python> 2019-08-09 08:51:57,542 [info] Run: netops_data_preperations uid=45207eca948d4463ad53394f0a7c4391:0
Python> 2019-08-09 08:51:57,542 [info] Run: netops_data_preperations uid=45207eca948d4463ad53394f0a7c4391:0
Python> 2019-08-09 08:51:57,542 [info] Run: netops_data_preperations uid=45207eca948d4463ad53394f0a7c4391:0
Python> 2019-08-09 08:51:57,

uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...7c4391,,,completed,,owner=iguaziohost=jupyter-70u91h6hx0-mmfj1-56b8bf8cf5-447q7runtime=handlerrepo=https://github.com/v3io/tutorials.gitcommit=ead08b6662dbddced9974f5973c95396622e8407,,,,
