# Network Operations
## Pre-Processing

In [1]:
# nuclio: ignore
import nuclio

Define the MLRun environment

In [2]:
from mlrun import new_function, code_to_function, get_run_db, mount_v3io, NewTask, mlconf, new_model_server, run_local
# Address of the MLRun API service that the runs in this notebook report to
# (hard-coded; overrides whatever dbpath mlconf was initialised with)
mlconf.dbpath = 'http://mlrun-api:8080'

Install the packages the function needs (pyarrow for parquet I/O, and pandas) when its image is built

In [3]:
%%nuclio cmd -c
pip install pyarrow
pip install pandas

## Function

In [4]:
# nuclio: start-code

In [5]:
import os
import pandas as pd

In [6]:
def _wrap_scalar(value):
    """Normalize a column/aggregation spec: None -> [], str or callable -> [value], else unchanged.

    Lets callers pass a single name such as ``metrics='cpu_utilization'`` or
    ``metric_aggs='mean'``. A bare string would otherwise make pandas return
    single-level columns (or, for ``keys``, turn ``in`` into a substring test).
    """
    if value is None:
        return []
    if isinstance(value, str) or callable(value):
        return [value]
    return value


def _rolling_aggregate(df, columns, aggs, window, center, suffix):
    """Apply the `aggs` functions over a rolling window to `columns` of `df`.

    Returns a frame with the same index as `df` and one column per
    (column, agg) pair, named ``<column>_<agg>`` or ``<column>_<agg>_<suffix>``.
    """
    rolled = df.loc[:, columns].rolling(window=window, center=center).aggregate(aggs)
    # List-valued columns and aggs give (column, agg) MultiIndex columns; flatten them
    names = ['_'.join(map(str, col)).strip() if isinstance(col, tuple) else str(col)
             for col in rolled.columns]
    if suffix:
        names = [f'{name}_{suffix}' for name in names]
    rolled.columns = names
    return rolled


def aggregate(context,
              df_artifact,
              keys=None,
              metrics=None,
              labels=None,
              metric_aggs=None,
              label_aggs=None,
              suffix=None,
              window=3,
              center=False,
              append_to_df=True,
              save_to='aggregate_df.pq'):
    """Add rolling-window aggregates of metric and label columns to a parquet dataset.

    Reads `df_artifact`, computes rolling aggregates for `metrics` and `labels`,
    writes the result to `save_to` (pyarrow parquet) and logs it as the
    ``'aggregate'`` artifact. When neither `metrics` nor `labels` is given, the
    input frame is saved and logged unchanged.

    :param context:      MLRun run context (logger + artifact logging).
    :param df_artifact:  path of the input parquet file.
    :param keys:         index level name(s) to keep as the index; every other
                         index level is moved into a column before aggregating.
    :param metrics:      column name(s) aggregated with `metric_aggs`.
    :param labels:       column name(s) aggregated with `label_aggs`.
    :param metric_aggs:  aggregation(s) for metrics, as accepted by
                         ``Rolling.aggregate`` (default ``['mean']``).
    :param label_aggs:   aggregation(s) for labels (default ``['max']``).
    :param suffix:       optional text appended to every new column name.
    :param window:       window passed to ``DataFrame.rolling``.
    :param center:       whether the window is centered on each row.
    :param append_to_df: join the new columns onto the input frame (True) or
                         save only the new columns (False).
    :param save_to:      output parquet path; a missing parent directory is created.
    """
    context.logger.info(df_artifact)
    input_df = pd.read_parquet(df_artifact)

    metrics = _wrap_scalar(metrics)
    labels = _wrap_scalar(labels)
    metric_aggs = _wrap_scalar(['mean'] if metric_aggs is None else metric_aggs)
    label_aggs = _wrap_scalar(['max'] if label_aggs is None else label_aggs)

    # Select the index levels to aggregate over
    if keys:
        keys = _wrap_scalar(keys)
        indexes_to_drop = [name for name in input_df.index.names if name not in keys]
        df = input_df.reset_index(level=indexes_to_drop)
    else:
        df = input_df
    # NOTE(review): the rolling window runs over rows in file order, not per key group,
    # and with `keys` the aggregates carry only the kept index levels when merged back
    # onto input_df below — confirm this alignment is intended.

    new_frames = []
    if metrics:
        new_frames.append(_rolling_aggregate(df, metrics, metric_aggs, window, center, suffix))
    if labels:
        new_frames.append(_rolling_aggregate(df, labels, label_aggs, window, center, suffix))

    if not new_frames:
        # Nothing to aggregate: pass the input through to the same output artifact
        final_df = input_df
    else:
        if append_to_df:
            final_df, remaining = input_df, new_frames
        else:
            final_df, remaining = new_frames[0], new_frames[1:]
        for frame in remaining:
            final_df = pd.merge(final_df, frame, suffixes=('', suffix),
                                left_index=True, right_index=True)

    # dirname() is '' for a bare filename (e.g. the default), and os.makedirs('') raises
    out_dir = os.path.dirname(save_to)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    final_df.to_parquet(save_to, engine='pyarrow')
    context.log_artifact('aggregate', local_path=save_to)

In [7]:
# nuclio: end-code

## Test
Create a V3IO Frames client and read a sample of the network metrics

In [8]:
# Frames client bound to the 'bigdata' container; used to read the TSDB metrics table
import v3io_frames as v3f

client = v3f.Client('framesd:8081', container='bigdata')

# Root of the demo project on the shared file system (data/, src/ and yaml/ live under it)
project_dir = os.path.join('/User', 'demo-network-operations')

In [9]:
# Read the metrics table from the TSDB, keeping its multi-level index (shown in the output below)
metrics = client.read('tsdb', 'netops_metrics', multi_index=True)

# Snapshot it as parquet for the aggregate function; create data/ on a fresh project
metrics_pq = os.path.join(project_dir, 'data', 'metrics.pq')
os.makedirs(os.path.dirname(metrics_pq), exist_ok=True)
metrics.to_parquet(metrics_pq, engine='pyarrow', index=True)
metrics.head(2)

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,cpu_utilization,cpu_utilization_is_error,is_error,latency,latency_is_error,packet_loss,packet_loss_is_error,throughput,throughput_is_error
time,company,data_center,device,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
2020-03-10 08:52:37.874000+00:00,Lambert__Watson_and_Stone,Cabrera_Ranch,6767536359526,78.396905,0.0,0.0,0.0,0.0,0.0,0.0,251.553131,0.0
2020-03-10 08:52:42.874000+00:00,Lambert__Watson_and_Stone,Cabrera_Ranch,6767536359526,73.343463,0.0,0.0,0.0,0.0,0.113283,0.0,247.769434,0.0


### Local Test
Define the aggregate test task

In [10]:
# Test parameters: a centered 5-row window, mean+sum over cpu_utilization,
# max over is_error, new columns suffixed with 'daily' and appended to the input
data_dir = os.path.join(project_dir, 'data')
aggregate_params = {
    'df_artifact': os.path.join(data_dir, 'metrics.pq'),
    'metrics': ['cpu_utilization'],
    'labels': ['is_error'],
    'metric_aggs': ['mean', 'sum'],
    'label_aggs': ['max'],
    'suffix': 'daily',
    'append_to_df': True,
    'window': 5,
    'center': True,
    'save_to': os.path.join(data_dir, 'aggregate.pq'),
}

aggregate_task = NewTask(
    name='aggregate',
    project='network-operations',
    params=aggregate_params,
    handler=aggregate,
)

In [11]:
# Execute the handler locally, inside this Jupyter environment, to validate it before deploying
run_local(aggregate_task)

[mlrun] 2020-03-10 08:58:23,186 starting run aggregate uid=d29be7a9c96848fbbe814df748592033  -> http://mlrun-api:8080
[mlrun] 2020-03-10 08:58:23,214 /User/demo-network-operations/data/metrics.pq
[mlrun] 2020-03-10 08:58:23,286 log artifact aggregate at /User/demo-network-operations/data/aggregate.pq, size: 37175, db: Y



uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...592033,0,Mar 10 08:58:23,completed,aggregate,kind=handlerowner=adminhost=jupyter-78ddb8b99c-mz8dk,,"df_artifact=/User/demo-network-operations/data/metrics.pqmetrics=['cpu_utilization']labels=['is_error']metric_aggs=['mean', 'sum']label_aggs=['max']suffix=dailyappend_to_df=Truewindow=5center=Truesave_to=/User/demo-network-operations/data/aggregate.pq",,aggregate


to track results use .show() or .logs() or in CLI: 
!mlrun get run d29be7a9c96848fbbe814df748592033 --project network-operations , !mlrun logs d29be7a9c96848fbbe814df748592033 --project network-operations
[mlrun] 2020-03-10 08:58:23,320 run executed, status=completed


<mlrun.model.RunObject at 0x7f4b4eb3a0b8>

### Test on cluster

Convert the code to an MLRun function

In [12]:
# Package the code between the "nuclio: start-code" / "nuclio: end-code" markers as an
# MLRun 'job' function (generated source written to src/aggregate.py), apply a v3io
# mount (presumably so the job can reach the /User paths used by the task), and
# export the function spec to yaml/aggregate.yaml
fn = code_to_function('aggregate', 
                      code_output=os.path.join(project_dir, 'src', 'aggregate.py'),
                      kind='job').apply(mount_v3io())
fn.export(os.path.join(project_dir, 'yaml', 'aggregate.yaml'))

[mlrun] 2020-03-10 08:58:28,305 function spec saved to path: /User/demo-network-operations/yaml/aggregate.yaml


<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f4b4eb3ac18>

In [13]:
# Build the function's container image via a remote build on the cluster
fn.deploy()

[mlrun] 2020-03-10 08:58:28,328 starting remote build, image: .mlrun/func-default-aggregate-latest
[36mINFO[0m[0000] Resolved base name mlrun/mlrun:0.4.4 to mlrun/mlrun:0.4.4 
[36mINFO[0m[0000] Resolved base name mlrun/mlrun:0.4.4 to mlrun/mlrun:0.4.4 
[36mINFO[0m[0000] Downloading base image mlrun/mlrun:0.4.4     
[36mINFO[0m[0000] Error while retrieving image from cache: getting file info: stat /cache/sha256:6acdce89d632b5e683a6d7fa651a928ba2227f7322060d207491518dd555543c: no such file or directory 
[36mINFO[0m[0000] Downloading base image mlrun/mlrun:0.4.4     
[36mINFO[0m[0000] Built cross stage deps: map[]                
[36mINFO[0m[0000] Downloading base image mlrun/mlrun:0.4.4     
[36mINFO[0m[0000] Error while retrieving image from cache: getting file info: stat /cache/sha256:6acdce89d632b5e683a6d7fa651a928ba2227f7322060d207491518dd555543c: no such file or directory 
[36mINFO[0m[0000] Downloading base image mlrun/mlrun:0.4.4     
[36mINFO[0m[0000] Unpacking

True

In [14]:
# Run the same task as an MLRun job on the cluster; it executes in its own pod
fn.run(aggregate_task)

[mlrun] 2020-03-10 08:59:31,507 starting run aggregate uid=0dd33c2630f548a99095586049574fae  -> http://mlrun-api:8080
[mlrun] 2020-03-10 08:59:31,559 Job is running in the background, pod: aggregate-wfb9n
[mlrun] 2020-03-10 08:59:39,064 /User/demo-network-operations/data/metrics.pq
[mlrun] 2020-03-10 08:59:39,152 log artifact aggregate at /User/demo-network-operations/data/aggregate.pq, size: 37175, db: Y

[mlrun] 2020-03-10 08:59:39,162 run executed, status=completed
final state: succeeded


uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...574fae,0,Mar 10 08:59:39,completed,aggregate,host=aggregate-wfb9nkind=jobowner=admin,,"append_to_df=Truecenter=Truedf_artifact=/User/demo-network-operations/data/metrics.pqlabel_aggs=['max']labels=['is_error']metric_aggs=['mean', 'sum']metrics=['cpu_utilization']save_to=/User/demo-network-operations/data/aggregate.pqsuffix=dailywindow=5",,aggregate


to track results use .show() or .logs() or in CLI: 
!mlrun get run 0dd33c2630f548a99095586049574fae --project network-operations , !mlrun logs 0dd33c2630f548a99095586049574fae --project network-operations
[mlrun] 2020-03-10 08:59:40,704 run executed, status=completed


<mlrun.model.RunObject at 0x7f4b4c07f160>

### Show results

In [15]:
# Load the aggregated output written by the job and display it
aggregate_pq = os.path.join(project_dir, 'data', 'aggregate.pq')
pd.read_parquet(aggregate_pq)

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,cpu_utilization,cpu_utilization_is_error,is_error,latency,latency_is_error,packet_loss,packet_loss_is_error,throughput,throughput_is_error,cpu_utilization_mean_daily,cpu_utilization_sum_daily,is_error_max_daily
time,company,data_center,device,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
2020-03-10 08:52:37.874000+00:00,Lambert__Watson_and_Stone,Cabrera_Ranch,6767536359526,78.396905,0.0,0.0,0.000000,0.0,0.000000,0.0,251.553131,0.0,,,
2020-03-10 08:52:42.874000+00:00,Lambert__Watson_and_Stone,Cabrera_Ranch,6767536359526,73.343463,0.0,0.0,0.000000,0.0,0.113283,0.0,247.769434,0.0,,,
2020-03-10 08:52:47.874000+00:00,Lambert__Watson_and_Stone,Cabrera_Ranch,6767536359526,59.492871,0.0,0.0,0.000000,0.0,2.300774,0.0,278.961957,0.0,71.124247,355.621234,0.0
2020-03-10 08:52:52.874000+00:00,Lambert__Watson_and_Stone,Cabrera_Ranch,6767536359526,73.483893,0.0,0.0,3.051355,0.0,0.000000,0.0,240.857938,0.0,72.022903,360.114513,0.0
2020-03-10 08:52:57.874000+00:00,Lambert__Watson_and_Stone,Cabrera_Ranch,6767536359526,70.904103,0.0,0.0,9.490740,0.0,0.159850,0.0,264.266446,0.0,76.811031,384.055156,0.0
2020-03-10 08:53:02.874000+00:00,Lambert__Watson_and_Stone,Cabrera_Ranch,6767536359526,82.890184,0.0,0.0,4.127941,0.0,0.136417,0.0,245.991379,0.0,78.464203,392.321015,0.0
2020-03-10 08:53:07.874000+00:00,Lambert__Watson_and_Stone,Cabrera_Ranch,6767536359526,97.284105,0.0,0.0,0.000000,0.0,0.000000,0.0,240.420034,0.0,74.490695,372.453477,0.0
2020-03-10 08:53:12.874000+00:00,Lambert__Watson_and_Stone,Cabrera_Ranch,6767536359526,67.758731,0.0,0.0,2.274885,0.0,0.000000,0.0,266.785162,0.0,74.996004,374.980022,0.0
2020-03-10 08:53:17.874000+00:00,Lambert__Watson_and_Stone,Cabrera_Ranch,6767536359526,53.616354,0.0,0.0,1.321255,0.0,0.000000,0.0,251.037814,0.0,71.844994,359.224969,0.0
2020-03-10 08:53:22.874000+00:00,Lambert__Watson_and_Stone,Cabrera_Ranch,6767536359526,73.430648,0.0,0.0,1.489125,0.0,2.563138,0.0,233.471864,0.0,65.726164,328.630819,0.0
