In [None]:
from IPython.display import display, HTML
# Inject a CSS rule that forces `.container` elements to 100% width so the
# notebook's cells can use the full browser window.
display(HTML("<style>.container { width:100% !important; }</style>"))

# Discussion Section Week 2


## netUnicorn Model

### Tasks
- The basic building block of netUnicorn
- Highly configurable
- Allow users to define the behavior of their experiments

#### User Actions
- file download/upload
- streaming video (YouTube, Netflix, etc.)
- video conferencing (Zoom, Skype, etc.)

#### Network Measurements
- Packet Captures
- Speedtests
- Ping / Latency Measurements

#### Application Profiling
- Measuring Quality of Experience (QoE)
    - Tools such as Selenium and PyAutoGUI let us extract information from the browser

### Pipeline
- A pipeline is an ordered collection of tasks
- Each task is executed after the previous task completes

### Nodes
- The infrastructure contains a set of nodes that can run pipelines
- Nodes can be...
    - of different architectures
    - VMs in the cloud
    - Raspberry Pi devices
    - your laptop

### Deployment
- A deployment is a mapping of a pipeline to a node

### Experiment
- An experiment consists of one or more deployments

In [None]:
import os
import time
import random
import time
import requests 
import re
import logging
import subprocess
from subprocess import Popen
from sys import platform
import os, sys
import logging
import json
import threading

from netunicorn.client.remote import RemoteClient, RemoteClientException
from netunicorn.base import Experiment, ExperimentStatus, Pipeline
from netunicorn.library.tasks.basic import SleepTask
from netunicorn.library.tasks.measurements.ookla_speedtest import SpeedTest
from netunicorn.library.tasks.video_watchers.youtube_watcher import WatchYouTubeVideo
from netunicorn.library.tasks.video_watchers.vimeo_watcher import WatchVimeoVideo
from netunicorn.library.tasks.video_watchers.twitch_watcher import WatchTwitchStream
from netunicorn.library.tasks.capture.tcpdump import StartCapture, StopNamedCapture
from netunicorn.library.tasks.upload.fileio import UploadToFileIO
from netunicorn.base.architecture import Architecture
from netunicorn.base.nodes import Node
from netunicorn.base.task import Failure, Task, TaskDispatcher
from netunicorn.base import Result, Failure, Success, Task, TaskDispatcher
from netunicorn.base.architecture import Architecture
from netunicorn.base.nodes import Node

from typing import Dict
from typing import Optional
from enum import IntEnum

In [None]:
class DummyTask(Task):
    """Minimal task that does no work and returns ``0``.

    Args:
        name: Task name. It is handed to ``Task.__init__`` so the base class
            records it; later tasks look up earlier results by task name in
            ``previous_steps``.
    """

    def __init__(self, name: str):
        # Forward the name to the base initializer instead of assigning
        # ``self.name`` before calling ``super().__init__()``: the base
        # initializer also sets ``self.name`` (presumably to a generated value
        # when no name is passed), which would silently discard this one.
        super().__init__(name=name)

    def run(self):
        return 0


class DummyRepeaterTask(Task):
    """Minimal task that returns a fixed success message.

    Args:
        lookup_for: Name of an earlier task this one refers to. It is stored on
            the instance; NOTE(review): ``run`` does not use it yet.
    """

    def __init__(self, lookup_for: str):
        # Initialize the base first so it cannot clobber attributes set here.
        super().__init__()
        self.lookup_for = lookup_for

    def run(self):
        return "Dummy Task2 Success!"

In [None]:
# Two-step pipeline: a dummy task named "dummy1", then a task that refers to it.
pipeline = (
    Pipeline()
    .then(DummyTask(name="dummy1"))
    .then(DummyRepeaterTask(lookup_for="dummy1"))
)

In [None]:
# Authentication for netUnicorn: every setting is read from an environment
# variable and falls back to a hard-coded default when the variable is unset.
# NOTE(review): the fallback credentials are hard-coded; prefer setting
# NETUNICORN_LOGIN / NETUNICORN_PASSWORD in the environment.
NETUNICORN_ENDPOINT = os.getenv('NETUNICORN_ENDPOINT', 'http://54.84.96.4:26611')
NETUNICORN_LOGIN = os.getenv('NETUNICORN_LOGIN', 'test')
NETUNICORN_PASSWORD = os.getenv('NETUNICORN_PASSWORD', 'test')

client = RemoteClient(
    endpoint=NETUNICORN_ENDPOINT,
    login=NETUNICORN_LOGIN,
    password=NETUNICORN_PASSWORD,
)
print(f"Health Check: {client.healthcheck()}")

In [None]:
# Ask the server which nodes this client can deploy to, and show them.
nodes = client.get_nodes()
print(nodes)

In [None]:
# Build the experiment by mapping the pipeline onto the chosen node(s):
# keep nodes whose name is in `working_hosts`, and take at most that many.
working_hosts = ['dockerhost']
working_nodes = (
    nodes
    .filter(lambda candidate: candidate.name in working_hosts)
    .take(len(working_hosts))
)
experiment = Experiment().map(pipeline, working_nodes)
print(f"Experiment: {experiment}")

In [None]:
# Optional: pin a specific Docker image for every deployment in the experiment.
# Uncomment the lines below to enable it.
#from netunicorn.base import DockerImage
#for deployment in experiment:
#    # available images can be browsed on Docker Hub
#    deployment.environment_definition = DockerImage(image='netunicorn/chromium:0.3.0')

In [None]:
# Name the experiment; the label identifies it on the server in later calls.
experiment_label = "test_pipeline"

# Delete any previous experiment with this label so the label can be reused.
# This is best effort: a failure is reported, not raised, so the notebook
# continues either way.
try:
    client.delete_experiment(experiment_label)
except RemoteClientException as exc:
    # NOTE(review): usually this just means no experiment with this label
    # exists yet; printing it keeps other errors (auth, network) visible.
    print(f"Could not delete previous experiment '{experiment_label}': {exc}")

In [None]:
# Prepare the experiment and poll until the server reports it READY.
# The wait is bounded: if the status never becomes READY (e.g. because
# preparation failed), the cell raises instead of polling forever.
POLL_INTERVAL_S = 20          # seconds between status checks
PREPARE_TIMEOUT_S = 60 * 60   # give up after one hour without READY

client.prepare_experiment(experiment, experiment_label)
prepare_deadline = time.monotonic() + PREPARE_TIMEOUT_S
while True:
    info = client.get_experiment_status(experiment_label)
    print(info.status)
    if info.status == ExperimentStatus.READY:
        break
    if time.monotonic() >= prepare_deadline:
        raise TimeoutError(
            f"Experiment '{experiment_label}' was not READY after "
            f"{PREPARE_TIMEOUT_S} s (last status: {info.status})"
        )
    time.sleep(POLL_INTERVAL_S)

# Short pause after READY before starting execution.
time.sleep(5)

In [None]:
# Start execution, then keep polling (printing each status) for as long as
# the experiment is still RUNNING. `info` holds the final status afterwards.
client.start_execution(experiment_label)
info = client.get_experiment_status(experiment_label)
print(info.status)
while info.status == ExperimentStatus.RUNNING:
    time.sleep(20)
    info = client.get_experiment_status(experiment_label)
    print(info.status)

In [None]:
# Get Results
from returns.pipeline import is_successful
from returns.result import Failure

# Print each node's report: node name, error, returned data and log.
# `execution_result` may be None (presumably when the experiment produced no
# results), so fall back to an empty list instead of crashing.
for report in info.execution_result or []:
    print(f"Node name: {report.node.name}")
    print(f"Error: {report.error}")

    if report.result is None:
        print("report.result is EMPTY..")
        continue

    result, log = report.result  # report stores results of execution and corresponding log

    # result is a returns.result.Result object: either Success or Failure
    print(f"Result is: {type(result)}")
    data = result.unwrap() if is_successful(result) else result.failure()

    # Mapping-like payloads are printed key by key. Anything else is printed
    # as-is, instead of hiding it behind a bare `except:` that would also
    # swallow unrelated errors.
    items = getattr(data, "items", None)
    if callable(items):
        for key, value in items():
            print(f"{key}: {value}")
    else:
        print(f"No attribute 'items' in result: {data!r}")

    # we also can explore logs
    for line in log:
        print(line.strip())
    print()

In [None]:
def _netunicorn_client():
    """Create a RemoteClient from the NETUNICORN_* environment variables.

    Unset variables fall back to hard-coded defaults. The server health check
    result is printed before the client is returned.
    """
    client = RemoteClient(
        endpoint=os.environ.get('NETUNICORN_ENDPOINT', 'http://54.84.96.4:26611'),
        login=os.environ.get('NETUNICORN_LOGIN', 'test'),
        password=os.environ.get('NETUNICORN_PASSWORD', 'test'),
    )
    print("Health Check: {}".format(client.healthcheck()))
    return client


def _wait_for_status(client, experiment_label, keep_waiting, poll_interval, timeout):
    """Poll the experiment status until ``keep_waiting(status)`` returns False.

    Every observed status is printed. ``timeout`` is in seconds (None means
    no limit). Returns the last status information.

    Raises:
        TimeoutError: if ``timeout`` elapses while still waiting.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        info = client.get_experiment_status(experiment_label)
        print(info.status)
        if not keep_waiting(info.status):
            return info
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Experiment '{experiment_label}' still waiting after {timeout} s "
                f"(last status: {info.status})"
            )
        time.sleep(poll_interval)


def _print_report(report):
    """Print one node's execution report: node name, error, returned data and log."""
    from returns.pipeline import is_successful

    print(f"Node name: {report.node.name}")
    print(f"Error: {report.error}")

    if report.result is None:
        print("report.result is EMPTY..")
        return

    result, log = report.result  # execution result and the corresponding log

    # result is a returns.result.Result object: either Success or Failure
    print(f"Result is: {type(result)}")
    data = result.unwrap() if is_successful(result) else result.failure()

    # Mapping-like payloads are printed key by key. Anything else is printed
    # as-is rather than hidden behind a bare `except:`.
    items = getattr(data, "items", None)
    if callable(items):
        for key, value in items():
            print(f"{key}: {value}")
    else:
        print(f"No attribute 'items' in result: {data!r}")

    for line in log:
        print(line.strip())
    print()


def execute_pipeline(curr_pipeline, working_hosts=('dockerhost',), experiment_label="test_pipeline",
                     poll_interval=20, prepare_timeout=60 * 60, execution_timeout=None):
    """Deploy ``curr_pipeline`` on netUnicorn, run it, and print each node's report.

    Steps:
      1. Connect using the NETUNICORN_* environment variables (or hard-coded defaults).
      2. Map the pipeline onto the nodes named in ``working_hosts``.
      3. Try to delete any experiment already registered under ``experiment_label``.
      4. Prepare the experiment and wait for READY.
      5. Run it and wait while it is RUNNING.
      6. Print the per-node results.

    Args:
        curr_pipeline: The Pipeline to execute.
        working_hosts: Sequence of node names to deploy on; at most
            ``len(working_hosts)`` matching nodes are used.
        experiment_label: Label for the experiment on the server.
        poll_interval: Seconds between status polls.
        prepare_timeout: Max seconds to wait for READY (None waits forever).
            Bounded by default so a failed preparation cannot hang the call.
        execution_timeout: Max seconds to wait while RUNNING (None waits forever).

    Returns:
        The final experiment status information; its ``execution_result``
        holds the per-node reports.

    Raises:
        TimeoutError: if preparation or execution exceeds its timeout.
    """
    client = _netunicorn_client()

    # Get Client Nodes
    nodes = client.get_nodes()
    print(nodes)

    hosts = list(working_hosts)
    working_nodes = nodes.filter(lambda node: node.name in hosts).take(len(hosts))
    experiment = Experiment().map(curr_pipeline, working_nodes)
    print("Experiment: {}".format(experiment))

    # Optionally pin a specific Docker image for every deployment:
    # from netunicorn.base import DockerImage
    # for deployment in experiment:
    #     deployment.environment_definition = DockerImage(image='netunicorn/chromium:0.3.0')

    # Best-effort cleanup of an earlier experiment with the same label.
    try:
        client.delete_experiment(experiment_label)
    except RemoteClientException as exc:
        print(f"Could not delete previous experiment '{experiment_label}': {exc}")

    client.prepare_experiment(experiment, experiment_label)
    _wait_for_status(
        client, experiment_label,
        lambda status: status != ExperimentStatus.READY,
        poll_interval, prepare_timeout,
    )
    time.sleep(5)  # short pause after READY before starting execution

    client.start_execution(experiment_label)
    info = _wait_for_status(
        client, experiment_label,
        lambda status: status == ExperimentStatus.RUNNING,
        poll_interval, execution_timeout,
    )

    # execution_result may be None if nothing ran; avoid iterating None.
    for report in info.execution_result or []:
        _print_report(report)
    return info

In [None]:
# Deploy and run the current pipeline; the trailing ';' hides the cell's return value.
execute_pipeline(curr_pipeline=pipeline);

### Running a Speedtest

In [None]:
import subprocess
from typing import Dict

from netunicorn.base.architecture import Architecture
from netunicorn.base.nodes import Node
from netunicorn.base.task import Failure, Task, TaskDispatcher


class SpeedTest(TaskDispatcher):
    """Dispatcher that hands out the Linux speed-test implementation.

    Only LINUX_AMD64 and LINUX_ARM64 nodes are supported; any other
    architecture raises NotImplementedError at dispatch time.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # A single implementation instance, named after this dispatcher.
        self.linux_instance = SpeedTestLinuxImplementation(name=self.name)

    def dispatch(self, node: Node) -> Task:
        linux_architectures = {Architecture.LINUX_AMD64, Architecture.LINUX_ARM64}
        if node.architecture not in linux_architectures:
            raise NotImplementedError(
                f'SpeedTest is not implemented for architecture: {node.architecture}'
            )
        return self.linux_instance


class SpeedTestLinuxImplementation(Task):
    """Run ``speedtest-cli --simple --secure`` and report only whether it succeeded."""

    requirements = ["pip install speedtest-cli"]

    def run(self):
        completed = subprocess.run(["speedtest-cli", "--simple", "--secure"], capture_output=True)
        if completed.returncode == 0:
            return "Speedtest Finished"

        # On failure, surface both output streams so the error explains itself.
        stdout_text = completed.stdout.decode("utf-8").strip()
        stderr_text = completed.stderr.decode("utf-8").strip()
        return Failure("\n".join([stdout_text, stderr_text]))


In [None]:
# A one-task pipeline that runs a single speed test.
pipeline = Pipeline().then(SpeedTest())

In [None]:
# Deploy and run the speed-test pipeline; the trailing ';' hides the cell's return value.
execute_pipeline(curr_pipeline=pipeline);

### Packet Capture (Start / Stop)

In [None]:
import subprocess
import time
import signal
from typing import List, Optional

from netunicorn.base.architecture import Architecture
from netunicorn.base.nodes import Node
from netunicorn.base import Task, TaskDispatcher, Result, Success, Failure


class StartCapture(TaskDispatcher):
    """Start a background ``tcpdump`` capture on the node, writing to ``filepath``.

    Args:
        filepath: Path of the capture file on the node.
        arguments: Extra tcpdump arguments placed before ``-U -w filepath``.
        *args, **kwargs: Forwarded to the base dispatcher and to the Linux
            implementation (e.g. ``name``, which StopNamedCapture uses to find
            this task's result).
    """

    def __init__(self, filepath: str, arguments: Optional[List[str]] = None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.filepath = filepath
        self.arguments = arguments

        # Pass filepath/arguments positionally so extra positional ``*args``
        # follow them. Passing ``filepath=...`` as a keyword while also
        # unpacking a non-empty ``*args`` makes the first extra argument
        # collide with ``filepath`` and raises TypeError.
        self.linux_implementation = StartCaptureLinuxImplementation(
            self.filepath,
            self.arguments,
            *args,
            **kwargs,
        )

    def dispatch(self, node: Node) -> Task:
        if node.architecture in {Architecture.LINUX_AMD64, Architecture.LINUX_ARM64}:
            return self.linux_implementation

        raise NotImplementedError(
            f'StartCapture is not implemented for {node.architecture}'
        )


class StartCaptureLinuxImplementation(Task):
    """Linux implementation: launch tcpdump and return its PID on success."""

    requirements = ["sudo apt-get update", "sudo apt-get install -y tcpdump"]

    def __init__(self, filepath: str, arguments: Optional[List[str]] = None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.arguments = arguments or []
        self.filepath = filepath

    def run(self) -> Result:
        """Start tcpdump; return Success(pid) if it is still alive after 2 s, else Failure."""
        # Ignore SIGCHLD, presumably so the detached tcpdump child is reaped
        # automatically instead of lingering as a zombie when it exits.
        signal.signal(signal.SIGCHLD, signal.SIG_IGN)

        command = ["tcpdump"] + self.arguments + ["-U", "-w", self.filepath]
        try:
            proc = subprocess.Popen(command)
        except OSError as exc:  # e.g. tcpdump missing or not executable
            return Failure(f"Failed to start tcpdump: {exc}")

        # Give tcpdump a moment to fail fast (bad interface, permissions, ...).
        time.sleep(2)
        if (exit_code := proc.poll()) is None:  # not finished yet
            return Success(proc.pid)
        # NOTE(review): with SIGCHLD ignored the real exit status may be
        # unavailable, so the reported code can be 0 even though tcpdump failed.
        return Failure(f"Tcpdump terminated with return code {exit_code}")


In [None]:
class StopNamedCapture(TaskDispatcher):
    """Stop a tcpdump capture started earlier by a named StartCapture task.

    Args:
        start_capture_task_name: ``name`` given to the StartCapture task whose
            PID should be stopped.
        *args, **kwargs: Forwarded to the base dispatcher and the Linux implementation.
    """

    def __init__(self, start_capture_task_name: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.start_capture_task_name = start_capture_task_name
        # Pass the capture name positionally so extra positional ``*args``
        # follow it instead of colliding with a keyword argument.
        self.linux_implementation = StopNamedCaptureLinuxImplementation(
            self.start_capture_task_name,
            *args,
            **kwargs,
        )

    def dispatch(self, node: Node) -> Task:
        if node.architecture in {Architecture.LINUX_AMD64, Architecture.LINUX_ARM64}:
            return self.linux_implementation

        raise NotImplementedError(
            f'StopCapture is not implemented for {node.architecture}'
        )


class StopNamedCaptureLinuxImplementation(Task):
    """Linux implementation: send ``kill`` to the PID returned by the named StartCapture."""

    requirements = ["sudo apt-get update", "sudo apt-get install -y tcpdump", "sudo apt-get install -y procps"]

    def __init__(self, capture_task_name: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.capture_task_name = capture_task_name

    def run(self):
        """Kill the capture process; propagate the StartCapture Failure if it failed or is absent."""
        signal.signal(signal.SIGCHLD, signal.SIG_IGN)
        # Earlier results are keyed by task name; use the latest one. A missing
        # key or an empty result list both mean there is nothing to stop.
        start_results = self.previous_steps.get(self.capture_task_name) or [
            Failure("Named StartCapture not found")
        ]
        pid = start_results[-1]
        if isinstance(pid, Failure):
            return pid

        return subprocess.check_output(["kill", str(pid.unwrap())])


### Uploading to file.io

In [None]:
"""
Uploads files to file.io -- temporary file storage
"""

import subprocess

from netunicorn.base.nodes import Architecture, Node
from netunicorn.base.task import Task, TaskDispatcher


class UploadToFileIO(TaskDispatcher):
    """Upload a file from the node to file.io (temporary storage) with curl.

    Args:
        filepath: Path of the file on the node.
        expires: Value sent as file.io's ``expires`` query parameter (e.g. "14d").
    """

    def __init__(self, filepath: str, expires: str = "14d", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.linux_implementation = UploadToFileIOCurlImplementation(
            filepath=filepath, expires=expires, name=self.name
        )
        self.linux_implementation.requirements = ["sudo apt-get install -y curl"]

    def dispatch(self, node: Node) -> Task:
        if node.architecture in {Architecture.LINUX_AMD64, Architecture.LINUX_ARM64}:
            return self.linux_implementation

        raise NotImplementedError(
            f"UploadToFileIO is not implemented for architecture: {node.architecture}"
        )


class UploadToFileIOCurlImplementation(Task):
    """curl-based implementation of the file.io upload."""

    def __init__(self, filepath: str, expires: str = "14d", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.filepath = filepath
        self.expires = expires

    def run(self):
        """Upload the file; return the server's response text, or Failure with curl's output."""
        from netunicorn.base.task import Failure

        command = ["curl", "-F", f"file=@{self.filepath}", f"https://file.io?expires={self.expires}"]
        completed = subprocess.run(command, capture_output=True)
        if completed.returncode != 0:
            # Report stdout and stderr (as SpeedTestLinuxImplementation does)
            # instead of raising CalledProcessError, whose message omits
            # curl's stderr diagnostics.
            return Failure(
                completed.stdout.decode("utf-8").strip()
                + "\n"
                + completed.stderr.decode("utf-8").strip()
            )
        return completed.stdout.decode("utf-8")

In [None]:
# Capture traffic around a speed test, stop the capture, then upload the pcap.
# The file path and the capture task's name are each used by two tasks, so
# they are defined once to keep those tasks in agreement.
CAPTURE_PATH = "/tmp/capture.pcap"
CAPTURE_TASK_NAME = "capture"

pipeline = (
    Pipeline()
    .then(StartCapture(filepath=CAPTURE_PATH, name=CAPTURE_TASK_NAME))
    .then(SpeedTest())
    .then(StopNamedCapture(start_capture_task_name=CAPTURE_TASK_NAME))
    .then(UploadToFileIO(filepath=CAPTURE_PATH, expires="1d"))
)

In [None]:
# Deploy and run the capture pipeline; the trailing ';' hides the cell's return value.
execute_pipeline(curr_pipeline=pipeline);

### Speedtest Measurements

In [None]:
import subprocess
from typing import Dict

from netunicorn.base.architecture import Architecture
from netunicorn.base.nodes import Node
from netunicorn.base.task import Failure, Task, TaskDispatcher


class SpeedTest(TaskDispatcher):
    """Dispatcher for the measuring speed test (Linux amd64/arm64 only)."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Share this dispatcher's name with the implementation it hands out.
        self.linux_instance = SpeedTestLinuxImplementation(name=self.name)

    def dispatch(self, node: Node) -> Task:
        arch = node.architecture
        if arch in {Architecture.LINUX_AMD64, Architecture.LINUX_ARM64}:
            return self.linux_instance
        raise NotImplementedError(
            f'SpeedTest is not implemented for architecture: {arch}'
        )


class SpeedTestLinuxImplementation(Task):
    """Run ``speedtest-cli --simple --secure`` on a Linux node and parse its output."""

    requirements = ["pip install speedtest-cli"]

    def run(self):
        """Return parsed ping/download/upload measurements, or Failure with the tool's output."""
        result = subprocess.run(["speedtest-cli", "--simple", "--secure"], capture_output=True)
        if result.returncode != 0:
            return Failure(
                result.stdout.decode("utf-8").strip()
                + "\n"
                + result.stderr.decode("utf-8").strip()
            )

        return self._format_data(result.stdout.decode("utf-8"))

    @staticmethod
    def _format_data(data: str) -> Dict[str, Dict]:
        """Parse ``--simple`` output into ``{"ping"|"download"|"upload": {"value", "unit"}}``.

        Each relevant line looks like ``"<Label>: <value> <unit>"`` (for example
        ``"Download: 93.52 Mbit/s"``). Lines are matched by label, not position,
        so extra lines, blank lines or a missing trailing newline don't break
        parsing. Tokens are split on any whitespace.

        Raises:
            ValueError: if a measurement is missing or its value is not a number.
        """
        wanted = ("ping", "download", "upload")
        measurements = {}
        for line in data.splitlines():
            label, sep, rest = line.partition(":")
            key = label.strip().lower()
            fields = rest.split()
            if not sep or key not in wanted or len(fields) < 2:
                continue  # not one of the "<Label>: <value> <unit>" lines we need
            measurements[key] = {"value": float(fields[0]), "unit": fields[1]}

        missing = [key for key in wanted if key not in measurements]
        if missing:
            raise ValueError(f"speedtest-cli output is missing {missing}: {data!r}")
        return {key: measurements[key] for key in wanted}

In [None]:
# Three rounds of: speed test, then SleepTask(10) (presumably a 10-second pause).
pipeline = Pipeline()
for _repeat in range(3):
    pipeline = pipeline.then(SpeedTest()).then(SleepTask(10))

In [None]:
# Deploy and run the repeated speed-test pipeline; the trailing ';' hides the cell's return value.
execute_pipeline(curr_pipeline=pipeline);