# Demo 1: typical ML pipeline

## Data collection

Learning problem: build a classifier that distinguishes Vimeo, SpeedTest, and other network traffic.

In [1]:
import os
import time

from netunicorn.client.remote import RemoteClient, RemoteClientException
from netunicorn.base import Experiment, ExperimentStatus, Pipeline

# Task using speedtest-cli to measure speedtest
from netunicorn.library.tasks.measurements.ookla_speedtest import SpeedTest

# Task to watch Vimeo
from netunicorn.library.tasks.video_watchers.vimeo_watcher import WatchVimeoVideoLinuxImplementation

# Tasks to start tcpdump and stop named tcpdump task
from netunicorn.library.tasks.capture.tcpdump import StartCapture, StopNamedCapture

Create a pipeline: start a packet capture, run a speed test, watch a Vimeo video (two watcher tasks in one stage), and stop the capture.

In [2]:
# Build the measurement pipeline one stage at a time; every stage is appended with .then().
pipeline = Pipeline()

# Stage 1: start a named tcpdump capture so that it can be stopped by name later.
pipeline = pipeline.then(StartCapture(filepath="/tmp/capture.pcap", name="capture"))

# Stage 2: generate speed-test traffic.
pipeline = pipeline.then(SpeedTest())

# Stage 3: two Vimeo watcher tasks for the same video, passed together as one stage.
vimeo_watchers = [
    WatchVimeoVideoLinuxImplementation("https://vimeo.com/375468729", 15),
    WatchVimeoVideoLinuxImplementation("https://vimeo.com/375468729", 15),
]
pipeline = pipeline.then(vimeo_watchers)

# Stage 4: stop the capture that was started in stage 1 (referenced by its task name).
pipeline = pipeline.then(StopNamedCapture(start_capture_task_name="capture"))

We connect to a local netunicorn instance, take the first available node (localhost), and define the experiment.

In [3]:
NETUNICORN_ENDPOINT = 'http://localhost:26611'
NETUNICORN_LOGIN = 'test'
NETUNICORN_PASSWORD = 'test'

# create a client and check that connection and instance are ok
client = RemoteClient(endpoint=NETUNICORN_ENDPOINT, login=NETUNICORN_LOGIN, password=NETUNICORN_PASSWORD)
print(f"Instance is healthy: {client.healthcheck()}")

nodes = client.get_nodes()
working_nodes = nodes.take(1)
experiment = Experiment().map(pipeline, working_nodes)

from netunicorn.base import DockerImage
for deployment in experiment:
    deployment.environment_definition = DockerImage(image='pinot.cs.ucsb.edu/sigcommtutorial:latest')  # set the required prepared image
    deployment.environment_definition.runtime_context.additional_arguments = ["/tmp:/tmp"]             # also mount the local folder to save files
    deployment.cleanup = False

!rm -rf /tmp/capture*

Instance is healthy: True


Let's prepare and start the experiment, wait for it to finish, and check that it succeeded.

In [4]:
from returns.pipeline import is_successful

experiment_label = "session1-1"


def wait_while_status(client, label, busy_status, stage, poll_interval=10):
    """Poll the experiment until its status is no longer ``busy_status``.

    Prints ``"<stage> status: <status>"`` on every poll and sleeps
    ``poll_interval`` seconds between polls.

    Returns the last status info object received from the client.
    """
    while True:
        info = client.get_experiment_status(label)
        print(f"{stage} status: {info.status}")
        if info.status != busy_status:
            return info
        time.sleep(poll_interval)


# Best effort: drop an experiment with the same label left from a previous run.
# The client error is deliberately ignored (e.g. on the very first run).
try:
    client.delete_experiment(experiment_label)
except RemoteClientException:
    pass

client.prepare_experiment(experiment, experiment_label)
time.sleep(2)
info = wait_while_status(client, experiment_label, ExperimentStatus.PREPARING, "Preparation")

for deployment in client.get_experiment_status(experiment_label).experiment:
    print(f"Prepared: {deployment.prepared}, error: {deployment.error}")

client.start_execution(experiment_label)
info = wait_while_status(client, experiment_label, ExperimentStatus.RUNNING, "Execution")

for report in info.execution_result:
    print(f"Node name: {report.node.name}")    # execution node name
    print(f"Error: {report.error}")            # if any error happened

    result, log = report.result  # report stores results of execution and corresponding log

    # result is a returns.result.Result object, could be Success or Failure
    print(f"Result is: {type(result)}")

    # let's unwrap the result (from the Success or Failure container to the actual result)
    data = result.unwrap() if is_successful(result) else result.failure()

    # and print all task names and corresponding execution results
    for key, value in data.items():
        print(f"{key}: {value}")

    # we also can explore logs of the executor in case there's anything there
    for line in log:
        print(line.strip())
    print()

Preparation status: ExperimentStatus.PREPARING
Preparation status: ExperimentStatus.PREPARING
Preparation status: ExperimentStatus.READY
Prepared: True, error: None
Execution status: ExperimentStatus.RUNNING
Execution status: ExperimentStatus.RUNNING
Execution status: ExperimentStatus.RUNNING
Execution status: ExperimentStatus.RUNNING
Execution status: ExperimentStatus.RUNNING
Execution status: ExperimentStatus.RUNNING
Execution status: ExperimentStatus.RUNNING
Execution status: ExperimentStatus.FINISHED
Node name: dockerhost
Error: None
Result is: <class 'returns.result.Success'>
capture: [<Success: 11>]
5b612a65-ea41-434c-a288-bae09b07c89e: [<Success: {'ping': {'value': 2.798, 'unit': 'ms'}, 'download': {'value': 966.82, 'unit': 'Mbit/s'}, 'upload': {'value': 947.69, 'unit': 'Mbit/s'}}>]
7e9acada-300a-42a7-94a0-1b5ac146d800: [<Success: Video finished by timeout: 15 seconds>]
07beb573-fc57-4227-bed2-f1dc1154402c: [<Success: Video finished by timeout: 15 seconds>]
370b14fc-afec-4a02-9c

## Data preprocessing: CICFlowMeter

In [5]:
# Run CICFlowMeter in a throw-away container (--rm) on the captured traffic:
# the pcap is mounted at /tmp/capture.pcap, the host /tmp is mounted at /tmp/output,
# and both paths are passed to the tool as its input file and output folder.
!docker run -v /tmp/capture.pcap:/tmp/capture.pcap -v /tmp:/tmp/output --rm pinot.cs.ucsb.edu/cicflowmeter:latest /tmp/capture.pcap /tmp/output

cic.cs.unb.ca.ifm.Cmd You select: /tmp/capture.pcap
cic.cs.unb.ca.ifm.Cmd Out folder: /tmp/output
cic.cs.unb.ca.ifm.Cmd CICFlowMeter received 1 pcap file
Working on... capture.pcap
cic.cs.unb.ca.jnetpcap.FlowGenerator Forward flow closed due to FIN Flag
cic.cs.unb.ca.jnetpcap.FlowGenerator Forward flow closed due to FIN Flag
cic.cs.unb.ca.jnetpcap.FlowGenerator Forward flow closed due to FIN Flag
cic.cs.unb.ca.jnetpcap.FlowGenerator Forward flow closed due to FIN Flag
cic.cs.unb.ca.jnetpcap.FlowGenerator Forward flow closed due to FIN Flag
cic.cs.unb.ca.jnetpcap.FlowGenerator Forward flow closed due to FIN Flag
cic.cs.unb.ca.jnetpcap.FlowGenerator Forward flow closed due to FIN Flag
cic.cs.unb.ca.jnetpcap.FlowGenerator Forward flow closed due to FIN Flag
cic.cs.unb.ca.jnetpcap.FlowGenerator Forward flow closed due to FIN Flag
cic.cs.unb.ca.jnetpcap.FlowGenerator Forward flow closed due to FIN Flag
cic.cs.unb.ca.jnetpcap.FlowGenerator Forward flow closed due to FIN Flag
cic.cs.unb.ca.jn

## Data cleaning

In [6]:
import pandas as pd

# Labeling heuristics used below.
VIMEO_MIN_PACKETS = 30   # a flow with more packets than this in either direction is labeled 'vimeo'
SPEEDTEST_PORT = 8080    # flows to or from this port are labeled 'speedtest' (ookla)

# Load the flow table and drop incomplete rows.
df = pd.read_csv("/tmp/capture.pcap_Flow.csv")
df = df.dropna()

# Label every flow; later rules overwrite earlier ones, so the speedtest rule wins over vimeo.
df['Label'] = 'Other'
df.loc[(df['Total Fwd Packet'] > VIMEO_MIN_PACKETS) | (df['Total Bwd packets'] > VIMEO_MIN_PACKETS), 'Label'] = 'vimeo'
df.loc[(df['Src Port'] == SPEEDTEST_PORT) | (df['Dst Port'] == SPEEDTEST_PORT), 'Label'] = 'speedtest'

# Remove flow identifiers (including the ports used for labeling) from the table.
df = df.drop([
    'Flow ID',
    'Src IP',
    'Src Port',
    'Dst IP',
    'Dst Port',
    'Timestamp',
], axis=1)

target_variable = 'Label'
# Keep the DataFrame's own column order: building the list from a set would give
# a different feature order on every kernel start (string hashes are randomized).
features = [column for column in df.columns if column != target_variable]
X = df[features]
y = df[target_variable]

## Model training

In [7]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn import metrics

# Fixed seed so that the train/test split and the forest's internal randomness
# are repeatable from run to run.
RANDOM_STATE = 42

# Hold out 20% of the flows for testing.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=RANDOM_STATE)

clf = RandomForestClassifier(random_state=RANDOM_STATE)
clf.fit(X_train, y_train)

In [8]:
# Evaluate on the training split. The classifier was fitted on a DataFrame, so predict
# on the DataFrame as well: passing `.values` strips the column names and makes
# scikit-learn warn that the input has no valid feature names.
y_pred = clf.predict(X_train)
print(metrics.classification_report(y_train, y_pred))

              precision    recall  f1-score   support

       Other       1.00      1.00      1.00       110
   speedtest       1.00      1.00      1.00       151
       vimeo       1.00      1.00      1.00        10

    accuracy                           1.00       271
   macro avg       1.00      1.00      1.00       271
weighted avg       1.00      1.00      1.00       271





In [9]:
# Evaluate on the held-out split. The classifier was fitted on a DataFrame, so predict
# on the DataFrame as well: passing `.values` strips the column names and makes
# scikit-learn warn that the input has no valid feature names.
y_pred = clf.predict(X_test)
print(metrics.classification_report(y_test, y_pred))

              precision    recall  f1-score   support

       Other       1.00      1.00      1.00        21
   speedtest       1.00      1.00      1.00        45
       vimeo       1.00      1.00      1.00         2

    accuracy                           1.00        68
   macro avg       1.00      1.00      1.00        68
weighted avg       1.00      1.00      1.00        68



