# Basic Ookla Speed Test example

In this scenario, we will measure network speed with the Ookla speedtest-cli utility, capture PCAPs during the measurements, and upload them to file storage for future access.

Let's import base classes and particular tasks that we will use:

In [15]:
import sys
import os
import subprocess
import datetime

folder_b_path = os.path.abspath('/Users/eshagupta/Desktop/CS293N/netunicorn-library')
if folder_b_path not in sys.path:
    sys.path.insert(0, folder_b_path)


# !pip install netunicorn-base
# !pip install netunicorn-client
# !pip install netunicorn-library

from netunicorn.client.remote import RemoteClient, RemoteClientException
from netunicorn.base import Experiment, ExperimentStatus, Pipeline, Failure, Result, Success

In [16]:
import time 
from typing import Dict

from netunicorn.base import Architecture, Node, Task, TaskDispatcher
#from netunicorn.library.tasks.tasks_utils import subprocess_run

# class checkTime(Task):
#     requirements = ["echo 'server 128.111.5.228 minpoll 3 maxpoll 6 iburst prefer' | sudo tee -a /etc/chrony/chrony.conf",
#                     "sudo service chrony restart", "chronyc tracking"]

#     def run(self):
#         # text=True makes stdout/stderr plain strings, so no decoding is needed
#         result = subprocess.run(['chronyc', 'tracking'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

#         text = ""
#         if result.stdout:
#             text = result.stdout + "\n"
#         if result.stderr:
#             text += result.stderr

#         return Success(text) if result.returncode == 0 else Failure(text)
    

class SpeedTest(TaskDispatcher):
    def __init__(self, num_tests, delay, *args, **kwargs):
        base_params = {k: kwargs[k] for k in ['name'] if k in kwargs}
        
        # Pass the remaining arguments to the parent class constructor
        super().__init__(*args, **base_params)
        self.linux_instance = SpeedTestLinuxImplementation(name=self.name, num_tests = num_tests, delay = delay)
        

    def dispatch(self, node: Node) -> Task:
        if node.architecture in {Architecture.LINUX_AMD64, Architecture.LINUX_ARM64}:
            return self.linux_instance

        raise NotImplementedError(
            f"SpeedTest is not implemented for architecture: {node.architecture}"
        )


class SpeedTestLinuxImplementation(Task):
    requirements = ["pip install speedtest-cli", "pip install schedule"]

    def __init__(self, num_tests, delay, *args, **kwargs):
        self.num_tests = num_tests
        self.delay = delay
        base_params = {k: kwargs[k] for k in ['name'] if k in kwargs}
        
        # Pass the remaining arguments to the parent class constructor
        super().__init__(*args, **base_params)

    def run(self):
        processes = []
        results = []
        #raspi is 7 hours ahead
        desired_time = datetime.datetime(2024, 6, 14, 1, 28, 0)  # Example: June 14, 2024, 01:28:00
        current_time = datetime.datetime.now()
        time_difference = desired_time - current_time
        wait_time_seconds = time_difference.total_seconds()
        if wait_time_seconds > 0:
            time.sleep(wait_time_seconds)
       

        # Start all processes concurrently
        for _ in range(self.num_tests):
            start_time = time.time()
            process = subprocess.Popen(["speedtest-cli", "--simple", "--secure"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            #result = subprocess_run(["speedtest-cli", "--simple", "--secure"])
            processes.append((process, start_time))
            #processes.append((result, start_time))
            time.sleep(self.delay)  # Delay between starting each test


        # Collect results from all processes
        for process, start_time in processes:
            stdout, stderr = process.communicate()
            end_time = time.time()
            elapsed_time = end_time - start_time

            text = ""
            if stdout:
                text = stdout.decode("utf-8") + "\n"
            if stderr:
                text += stderr.decode("utf-8")
            
            result = Success(text) if process.returncode == 0 else Failure(text)
            time_started = datetime.datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S')
            result_with_time = f"{result} Time taken: {elapsed_time:.2f} seconds Time started: {time_started}\n"
            #results.append(result_with_time)
            results.append(result_with_time)
        self.job_executed = True
        return results
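
The `run` method above waits until a hard-coded start time by subtracting two naive `datetime` values, which is easy to get wrong when the node's clock is in a different time zone (see the "7 hours ahead" comment). Below is a minimal sketch of the same wait computation using timezone-aware datetimes. The helper name `seconds_until` is hypothetical and not part of netunicorn.

```python
import datetime
from typing import Optional


def seconds_until(target: datetime.datetime,
                  now: Optional[datetime.datetime] = None) -> float:
    """Return the non-negative number of seconds from `now` until `target`.

    Both datetimes must be timezone-aware, so the result does not depend
    on the local time zone of the machine that runs the task.
    """
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    # A target in the past yields 0.0 instead of a negative wait.
    return max(0.0, (target - now).total_seconds())


utc = datetime.timezone.utc
target = datetime.datetime(2024, 6, 14, 1, 28, 0, tzinfo=utc)
current = datetime.datetime(2024, 6, 14, 1, 27, 30, tzinfo=utc)
print(seconds_until(target, now=current))  # 30.0
```

Inside a task, the returned value could be passed directly to `time.sleep`.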
        





Now, let's create a pipeline. We would like to start tcpdump (network traffic capture), run the speed test several times, then stop the capture and upload the data to temporary file storage (we chose the `https://file.io` website for this, and no, they haven't paid us for the advertisement). In the cell below, the capture and upload steps are commented out, so only the speed test runs.

In [17]:
pipeline = (
    Pipeline()
    # .then(StartCapture(filepath="/tmp/capture.pcap", name="capture"))
)

pipeline.then(SpeedTest(num_tests = 30, delay = 1))

pipeline = (
    pipeline
    # .then(StopNamedCapture(start_capture_task_name="capture"))
    # .then(UploadToFileIO(filepath="/tmp/capture.pcap", expires="1d"))
)

After deciding what our pipeline looks like, we need to connect to a netunicorn instance and get the nodes that will run it. If you have a `.env` file with credentials in the folder, we need to read it, and then try to read the needed parameters from environment variables.

If neither a `.env` file nor environment variables are provided, we assume you're working with a local installation of netunicorn with the default endpoint address and credentials. If this is not the case, feel free to modify the variables below.

In [18]:
# API connection endpoint
NETUNICORN_ENDPOINT = 'https://pinot.cs.ucsb.edu/netunicorn/'
# user login
NETUNICORN_LOGIN = 'egalanua'
# user password
NETUNICORN_PASSWORD = 'nZ309yM5MgZi'
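
The cell above hard-codes the connection parameters. As a rough sketch of the environment-variable approach described earlier, the helper below reads the three values from a mapping (by default `os.environ`) and falls back to defaults. The function name `load_netunicorn_settings` and the `<login>` / `<password>` placeholders are hypothetical. The fallback endpoint `http://localhost:26611` is the local address used in the node-selection cell of this notebook.

```python
import os
from typing import Mapping, Tuple


def load_netunicorn_settings(
    env: Mapping[str, str] = os.environ,
) -> Tuple[str, str, str]:
    """Return (endpoint, login, password) read from `env` with fallbacks."""
    # Assumed fallback: the local endpoint mentioned elsewhere in this notebook.
    endpoint = env.get("NETUNICORN_ENDPOINT", "http://localhost:26611")
    # Placeholder fallbacks; replace them with your own credentials.
    login = env.get("NETUNICORN_LOGIN", "<login>")
    password = env.get("NETUNICORN_PASSWORD", "<password>")
    return endpoint, login, password


endpoint, login, password = load_netunicorn_settings()
print(endpoint)
```

Keeping credentials in environment variables rather than in the notebook itself avoids publishing them by accident when the notebook is shared.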


Connect to the instance and verify that it's healthy.

In [19]:
client = RemoteClient(endpoint=NETUNICORN_ENDPOINT, login=NETUNICORN_LOGIN, password=NETUNICORN_PASSWORD)
client.healthcheck()

True

Great!

Now, let's ask for some nodes. For demonstration purposes, we will take nodes from our infrastructure whose names start with `raspi` (look at the filter function below). If you have a local installation, take a single node. If you use your own infrastructure, feel free to modify the example.

In [20]:
nodes = client.get_nodes()
nodes

[<Uncountable node pool with next node template: [aws-fargate-A-egalanua-, aws-fargate-B-egalanua-, aws-fargate-ARM64-egalanua-]>, [raspi-e4:5f:01:a7:b1:c1, raspi-e4:5f:01:9c:ca:3a]]

In [21]:
# switch for showing our infrastructure vs you doing it locally on other nodes
# if os.environ.get('NETUNICORN_ENDPOINT', 'http://localhost:26611') != 'http://localhost:26611':
#     working_nodes = nodes.filter(lambda node: node.name.startswith("raspi")).take(5)
# else:
#     working_nodes = nodes.take(1)
working_nodes = nodes.filter(lambda node: node.name.startswith("raspi-e4:5f:01:9c:ca:3a")).take(1)
working_nodes
#c1 38

[raspi-e4:5f:01:9c:ca:3a]

Afterwards, we need to create the experiment -- let's assign the same pipeline to all nodes!

In [22]:
experiment123 = Experiment().map(pipeline, working_nodes)

In [23]:
experiment123

 - Deployment: Node=raspi-e4:5f:01:9c:ca:3a, executor_id=, prepared=False, error=None

Now that we have defined the pipeline and the experiment, it's time to prepare it...

In [24]:
experiment_label123 = "speed_test_example"
try:
    client.delete_experiment(experiment_label123)
except RemoteClientException:
    pass

client.prepare_experiment(experiment123, experiment_label123)

'speed_test_example'

...and wait while it is compiled and distributed to the nodes.

In [25]:
import time
while True:
    info = client.get_experiment_status(experiment_label123)
    print(info.status)
    if info.status == ExperimentStatus.READY:
        break
    time.sleep(20)

ExperimentStatus.PREPARING
ExperimentStatus.PREPARING
ExperimentStatus.PREPARING
ExperimentStatus.PREPARING
ExperimentStatus.READY
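
The loop above keeps polling until the status becomes READY, so it would never exit if preparation ended in some other state. One possible variation, shown here as a generic sketch that does not use netunicorn itself, is a polling helper with a timeout. The name `poll_until` and its parameters are hypothetical.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def poll_until(fetch: Callable[[], T],
               done: Callable[[T], bool],
               interval: float = 20.0,
               timeout: float = 600.0) -> T:
    """Call `fetch` every `interval` seconds until `done(value)` is true.

    Raises TimeoutError if the condition is not met within `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        value = fetch()
        if done(value):
            return value
        # Give up if the next sleep would take us past the deadline.
        if time.monotonic() + interval > deadline:
            raise TimeoutError(
                f"condition not met within {timeout} seconds; last value: {value!r}"
            )
        time.sleep(interval)


# Stand-in for a sequence of experiment statuses.
statuses = iter(["PREPARING", "PREPARING", "READY"])
print(poll_until(lambda: next(statuses), lambda s: s == "READY",
                 interval=0.01, timeout=1.0))  # READY
```

With the objects from this notebook, `fetch` could be `lambda: client.get_experiment_status(experiment_label123)` and `done` could be `lambda info: info.status == ExperimentStatus.READY`.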


As soon as the experiment is READY, let's start it.

In [26]:
client.start_execution(experiment_label123)

'speed_test_example'

In [27]:
while True:
    info = client.get_experiment_status(experiment_label123)
    print(info.status)
    if info.status != ExperimentStatus.RUNNING:
        break
    time.sleep(10)

ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.FINISHED


If the experiment has finished (we hope it has in your case too), we can explore the resulting object with execution information, such as errors, execution results, and raw logs of all tasks in each deployment.

In [28]:
from returns.pipeline import is_successful
from returns.result import Result

for report in info.execution_result:
    print(f"Node name: {report.node.name}")
    print(f"Error: {report.error}")

    result, log = report.result  # report stores results of execution and corresponding log
    
    # result is a returns.result.Result object; it can be Success or Failure
    print(f"Result is: {type(result)}")
    if isinstance(result, Result):
        print(result)
        # .unwrap() if is_successful(result) else result
        # for key, value in data.items():
        #     print(f"{key}: {value}")

    # we also can explore logs
    for line in log:
        print(line.strip())
    print()

Node name: raspi-e4:5f:01:9c:ca:3a
Error: None
Result is: <class 'returns.result.Success'>
<Success: defaultdict(<class 'list'>, {'a64c1df2-9de2-43ad-8040-e8fea4274e8f': [<Success: ['<Success: Ping: 18.76 ms\nDownload: 4.70 Mbit/s\nUpload: 6.80 Mbit/s\n\n> Time taken: 60.80 seconds Time started: 2024-06-14 18:06:54\n', '<Success: Ping: 8.724 ms\nDownload: 16.15 Mbit/s\nUpload: 5.35 Mbit/s\n\n> Time taken: 59.80 seconds Time started: 2024-06-14 18:06:55\n', '<Success: Ping: 38.871 ms\nDownload: 12.75 Mbit/s\nUpload: 5.66 Mbit/s\n\n> Time taken: 62.35 seconds Time started: 2024-06-14 18:06:56\n', '<Success: Ping: 16.812 ms\nDownload: 5.27 Mbit/s\nUpload: 3.16 Mbit/s\n\n> Time taken: 61.34 seconds Time started: 2024-06-14 18:06:57\n', '<Success: Ping: 247.236 ms\nDownload: 3.20 Mbit/s\nUpload: 2.47 Mbit/s\n\n> Time taken: 74.25 seconds Time started: 2024-06-14 18:06:58\n', '<Success: Ping: 27.563 ms\nDownload: 18.64 Mbit/s\nUpload: 7.38 Mbit/s\n\n> Time taken: 73.25 seconds Time started: 
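
Each entry in the output above is a plain string that embeds the `speedtest-cli --simple` text (`Ping: ...`, `Download: ...`, `Upload: ...`). To analyze the measurements, it may help to turn those strings into numbers. The following is a minimal sketch. The function name `parse_simple_output` and the dictionary keys are hypothetical, and it assumes the default units shown in the output above (ms and Mbit/s).

```python
import re
from typing import Dict

# Matches e.g. "Ping: 18.76 ms" or "Download: 4.70 Mbit/s" anywhere in the text.
_METRIC = re.compile(r"\b(Ping|Download|Upload):\s*([0-9.]+)\s*(\S+)")

_KEYS = {
    "Ping": "ping_ms",
    "Download": "download_mbit_s",
    "Upload": "upload_mbit_s",
}


def parse_simple_output(text: str) -> Dict[str, float]:
    """Extract ping, download, and upload values from one result string."""
    return {_KEYS[name]: float(value)
            for name, value, _unit in _METRIC.findall(text)}


sample = ("<Success: Ping: 18.76 ms\nDownload: 4.70 Mbit/s\nUpload: 6.80 Mbit/s\n\n> "
          "Time taken: 60.80 seconds Time started: 2024-06-14 18:06:54\n")
print(parse_simple_output(sample))
# {'ping_ms': 18.76, 'download_mbit_s': 4.7, 'upload_mbit_s': 6.8}
```

Applying the function to every string in the result list gives a list of dictionaries that can be loaded into a table for further analysis.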

As you can see, in this example we successfully ran the speed test several times from our node. With the capture and upload tasks enabled in the pipeline, the traffic would also be captured and uploaded to the cloud. Now the only thing left is to explore the data and draw some conclusions, but we will leave this to you. :)

Please visit https://netunicorn.cs.ucsb.edu if you are looking for additional documentation or information about this platform, its usage, and its API.