In [None]:
import time
import random
import time
import requests 
import re
import logging
import subprocess
from subprocess import Popen
from sys import platform
import os, sys
import logging
import json
import threading
import numpy as np
import pandas as pd

import subprocess
import time
import signal
from typing import Dict

from netunicorn.base.architecture import Architecture
from netunicorn.base.nodes import Node
from netunicorn.base.task import Success, Failure, Task, TaskDispatcher


from netunicorn.client.remote import RemoteClient, RemoteClientException
from netunicorn.base import Experiment, ExperimentStatus, Pipeline
from netunicorn.library.tasks.basic import SleepTask
from netunicorn.library.tasks.measurements.ookla_speedtest import SpeedTest
from netunicorn.library.tasks.measurements.ping import Ping
from netunicorn.base.architecture import Architecture
from netunicorn.base.nodes import Node
from netunicorn.base.task import Failure, Task, TaskDispatcher
from netunicorn.base import Result, Failure, Success, Task, TaskDispatcher
from netunicorn.base.architecture import Architecture
from netunicorn.base.nodes import Node

from typing import Dict
from typing import Optional
from enum import IntEnum
from datetime import datetime

from returns.pipeline import is_successful
from returns.result import Failure

from netunicorn.client.remote import RemoteClient, RemoteClientException
from netunicorn.base import Experiment, ExperimentStatus, Pipeline
from netunicorn.library.tasks.capture.tcpdump import StartCapture, StopNamedCapture
from netunicorn.library.tasks.upload.fileio import UploadToFileIO
from netunicorn.library.tasks.upload.webdav import UploadToWebDav
from netunicorn.library.tasks.basic import SleepTask
from netunicorn.library.tasks.measurements.ookla_speedtest import SpeedTest
from netunicorn.library.tasks.video_watchers.vimeo_watcher import WatchVimeoVideo

from netunicorn.base import DockerImage


# Selenium-based Twitch watcher that disables QUIC so that we capture pure TCP packets.

In [None]:
"""
Selenium-based Twitch watcher
"""
import os
import random
import subprocess
import time
from typing import Optional

from netunicorn.base import Result, Success, Task, TaskDispatcher
from netunicorn.base.architecture import Architecture
from netunicorn.base.nodes import Node



def watch(
    url: str, duration: int = 10, chrome_location: Optional[str] = None, webdriver_arguments: Optional[list] = None
) -> Result[str, str]:
    """Open ``url`` in Chrome on a virtual display and keep it open for ``duration`` seconds.

    Chrome is started with QUIC disabled (``--disable-quic``) and HTTP/2 enabled.
    An Xvfb server is started on a randomly chosen display number and the
    process-wide ``DISPLAY`` environment variable is pointed at it.

    :param url: page to open (e.g. a Twitch channel URL)
    :param duration: number of seconds to keep the page open
    :param chrome_location: path of the Chrome/Chromium binary; browser default if not set
    :param webdriver_arguments: extra command-line arguments passed to Chrome
    :return: ``Success`` with a message stating that the timeout elapsed
    :raises Exception: any error raised while starting the browser or loading the
        page propagates to the caller; the browser and Xvfb are still shut down
    """
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.chrome.service import Service

    display_number = random.randint(100, 500)
    xvfb_process = subprocess.Popen(
        ["Xvfb", f":{display_number}", "-screen", "0", "1920x1080x24"]
    )
    driver = None
    try:
        os.environ["DISPLAY"] = f":{display_number}"

        options = Options()
        options.add_argument("--no-sandbox")
        options.add_argument("--autoplay-policy=no-user-gesture-required")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument("--enable-http2")
        options.add_argument("--disable-quic")
        if webdriver_arguments:
            for argument in webdriver_arguments:
                options.add_argument(argument)
        if chrome_location:
            options.binary_location = chrome_location
        driver = webdriver.Chrome(service=Service(), options=options)

        time.sleep(1)
        driver.get(url)

        # can't find a way to check whether video is playing now, so let's just wait a timeout
        time.sleep(duration)
        return Success(f"Video probably finished by timeout: {duration} seconds")
    finally:
        # Always release external processes, even when the browser failed to
        # start or the page failed to load.
        if driver is not None:
            # quit() ends the whole session (browser + chromedriver);
            # close() would only close the current window.
            driver.quit()
        xvfb_process.kill()
        # Reap the killed Xvfb process so it does not linger as a zombie.
        xvfb_process.wait()


class WatchTwitchStream(TaskDispatcher):
    """Dispatcher that selects the Twitch-watching task matching a node's architecture.

    Only Linux (AMD64 and ARM64) nodes are supported.
    """

    def __init__(
            self,
            video_url: str,
            duration: Optional[int] = None,
            chrome_location: Optional[str] = None,
            webdriver_arguments: Optional[list] = None,
            *args, **kwargs
    ):
        """
        :param video_url: URL of the Twitch stream to open
        :param duration: seconds to watch; when None the implementation's own default is used
        :param chrome_location: path of the Chrome/Chromium binary; implementation default if not set
        :param webdriver_arguments: extra command-line arguments passed to Chrome
        """
        super().__init__(*args, **kwargs)
        self.video_url = video_url
        self.duration = duration
        self.chrome_location = chrome_location
        self.webdriver_arguments = webdriver_arguments

        # Forward `duration` only when it was given: passing None explicitly
        # would override the implementation's default and later make
        # time.sleep(None) raise a TypeError.
        optional_arguments = {}
        if self.duration is not None:
            optional_arguments["duration"] = self.duration

        self.linux_implementation = WatchTwitchStreamLinuxImplementation(
            video_url=self.video_url,
            chrome_location=self.chrome_location,
            webdriver_arguments=self.webdriver_arguments,
            name=self.name,
            **optional_arguments,
        )

    def dispatch(self, node: Node) -> Task:
        """Return the task implementation for ``node``.

        :raises NotImplementedError: if the node's architecture is not Linux AMD64/ARM64
        """
        if node.architecture in {Architecture.LINUX_AMD64, Architecture.LINUX_ARM64}:
            return self.linux_implementation

        raise NotImplementedError(
            f'WatchTwitchStream is not implemented for architecture: {node.architecture}'
        )


class WatchTwitchStreamLinuxImplementation(Task):
    """Linux task that opens a Twitch stream in Chromium via ``watch``."""

    # Setup commands for the task environment; Chromium and its driver are
    # pinned to the same version.
    requirements = [
        "apt install -y python3-pip wget xvfb procps chromium=131.0.6778.85 chromium-driver=131.0.6778.85",
        "pip3 install selenium webdriver-manager==4.17.1",
    ]

    def __init__(
            self,
            video_url: str,
            duration: int = 10,
            chrome_location: Optional[str] = None,
            webdriver_arguments: Optional[list] = None,
            *args, **kwargs
    ):
        """
        :param video_url: URL of the Twitch stream to open
        :param duration: seconds to keep the stream open; None falls back to 10
        :param chrome_location: path of the Chromium binary; defaults to /usr/bin/chromium
        :param webdriver_arguments: extra command-line arguments passed to Chrome
        """
        self.video_url = video_url
        # A caller may pass None explicitly (the dispatcher's default is None);
        # fall back to the default so time.sleep() always receives a number.
        self.duration = 10 if duration is None else duration
        self.chrome_location = chrome_location or "/usr/bin/chromium"
        self.webdriver_arguments = webdriver_arguments
        super().__init__(*args, **kwargs)

    def run(self):
        """Watch the configured stream and return the result produced by ``watch``."""
        return watch(self.video_url, self.duration, self.chrome_location, self.webdriver_arguments)

# Helper Functions

In [None]:
def execute_pipeline(pipeline, working_node, experiment_label):
    """Prepare and run ``pipeline`` on one node, blocking until execution stops.

    :param pipeline: pipeline to execute
    :param working_node: name prefix; the first available node whose name starts
        with this value is used
    :param experiment_label: experiment name; an existing experiment with the
        same name is deleted first
    :return: the last experiment status information fetched from the server
    :raises ValueError: if no available node matches ``working_node``
    :raises RuntimeError: if the experiment finishes before becoming ready
    """
    client = RemoteClient(endpoint=NETUNICORN_ENDPOINT, login=NETUNICORN_LOGIN, password=NETUNICORN_PASSWORD)
    nodes = client.get_nodes()
    working_nodes = nodes.filter(lambda node: node.name.startswith(working_node)).take(1)
    if not working_nodes:
        # Fail early instead of preparing an experiment with no deployments.
        raise ValueError(f"No available node has a name starting with {working_node!r}")
    experiment = Experiment().map(pipeline, working_nodes)

    for deployment in experiment:
        # you can explore the image on the DockerHub
        deployment.environment_definition = DockerImage(image='satyandraguthula/netunicorn_images')

    # Best-effort cleanup of a previous run with the same label.
    try:
        client.delete_experiment(experiment_label)
    except RemoteClientException:
        pass

    # Prepare Experiment
    client.prepare_experiment(experiment, experiment_label)
    while True:
        info = client.get_experiment_status(experiment_label)
        print(info.status)
        if info.status == ExperimentStatus.READY:
            break
        if info.status == ExperimentStatus.FINISHED:
            # The experiment reached a terminal status without ever becoming
            # READY; waiting any longer would loop forever.
            raise RuntimeError(
                f"Experiment {experiment_label!r} finished during preparation: {info.execution_result}"
            )
        time.sleep(20)

    time.sleep(5)

    # Execute Experiment
    client.start_execution(experiment_label)
    while True:
        info = client.get_experiment_status(experiment_label)
        print(info.status)
        if info.status != ExperimentStatus.RUNNING:
            break
        time.sleep(20)
    return info

In [None]:
def display_results(info):
    """Print the outcome of every deployment contained in ``info``.

    For each report this prints the node name, the error field, the result
    payload and the captured log lines.

    :param info: experiment status object exposing ``execution_result``
    """
    reports = info.execution_result
    if reports is None or isinstance(reports, BaseException):
        # Nothing to iterate over: the experiment produced no per-node reports.
        print(f"No execution results available: {reports!r}")
        return

    # Get Results
    for report in reports:
        print(f"Node name: {report.node.name}")
        print(f"Error: {report.error}")

        if report.result is None:
            print("report.result is EMPTY..")
            continue

        result, log = report.result  # report stores results of execution and corresponding log

        # result is a returns.result.Result object, could be Success or Failure
        print(f"Result is: {type(result)}")
        data = result.unwrap() if is_successful(result) else result.failure()

        try:
            items = list(data.items())
        except AttributeError:
            # Not a mapping (e.g. a plain message string): show the value
            # itself instead of silently dropping it.
            print("No attribute 'items' in result")
            print(data)
        else:
            for key, value in items:
                print(f"{key}: {value}")

        # we also can explore logs
        for line in log:
            print(line.strip())
        print()

# Login. Enter your credentials in netunicorn_login and netunicorn_password

In [None]:
# Fallback credentials, used only when the matching environment variables are not set.
netunicorn_login = 'login'
netunicorn_password = 'password'

# Environment variables take precedence over the values above.
NETUNICORN_ENDPOINT = os.getenv('NETUNICORN_ENDPOINT', 'https://pinot.cs.ucsb.edu/netunicorn')
NETUNICORN_LOGIN = os.getenv('NETUNICORN_LOGIN', netunicorn_login)
NETUNICORN_PASSWORD = os.getenv('NETUNICORN_PASSWORD', netunicorn_password)

# Connect, run a health check and list the nodes returned by the server.
client = RemoteClient(
    endpoint=NETUNICORN_ENDPOINT,
    login=NETUNICORN_LOGIN,
    password=NETUNICORN_PASSWORD,
)
print(f"Health Check: {client.healthcheck()}")
nodes = client.get_nodes()
print(nodes)

# Working Node

In [None]:
# Name prefix of the node to run on: execute_pipeline uses the first available
# node whose name starts with this value.
working_node = "your_node_name"

# Define Twitch streams you want to watch.

In [None]:
# Twitch channel URLs to watch, in order. Feel free to update with different streamers.
twitch_stream = [
    "https://www.twitch.tv/loltyler1",
    "https://www.twitch.tv/coconutb",
    "https://www.twitch.tv/summit1g",
    "https://www.twitch.tv/ludwig",
    "https://www.twitch.tv/sodapoppin",
    "https://www.twitch.tv/philza",
    "https://www.twitch.tv/ohnepixel",
    "https://www.twitch.tv/lirik",
    "https://www.twitch.tv/forsen",
    "https://www.twitch.tv/pokerstars",
]

# Install the netunicorn library on the node where the experiment runs.

In [None]:
class InstallNumpy(Task):
    """Task whose only purpose is its requirement: upgrading netunicorn-library.

    Despite the class name, nothing numpy-specific is installed here.
    """

    requirements = ['pip install --upgrade netunicorn-library']

    def run(self):
        """Do nothing; this task performs no work of its own."""
        return None

# Define Pipeline

In [None]:
pipeline = Pipeline()

pipeline.then(InstallNumpy())

# Start the named packet capture before any stream is opened.
pipeline.then(StartCapture(filepath="/tmp/data.pcap",
                           name= "twitch_capture"))

# Watch every configured stream for 60 seconds. Iterating over the list itself
# (instead of a fixed range(10)) keeps this cell correct when twitch_stream is
# edited to hold fewer or more entries.
for stream_url in twitch_stream:
    pipeline.then(WatchTwitchStream(video_url=stream_url, duration=60))

# Stop the capture that was started under the name "twitch_capture".
pipeline.then(StopNamedCapture(start_capture_task_name="twitch_capture"))

pipeline.then(SleepTask(2))

# Upload the capture file; replace the endpoint and credentials with real values.
pipeline.then(UploadToWebDav(filepaths={"/tmp/data.pcap"}, endpoint="http://endpoint_path", username="username", password="password"))


# Define and Run Experiment

In [None]:
# Run the pipeline on the selected node under the label "experiment_label"
# (this blocks until execution stops), then print each node's result and logs.
info = execute_pipeline(pipeline, working_node, "experiment_label")
display_results(info)