# **Data Collection Pipeline using Netunicorn**

This document outlines a data collection pipeline designed around Netunicorn, a specialized library for gathering reliable networking data.

### **Pipeline Objective**

The primary goal of this pipeline is to capture detailed packet traces while simulating real-world user interactions with YouTube videos. It uses Selenium, a browser automation library, to pause and unpause a video at random intervals. A custom task, `PausingSimulator`, mimics these user actions and records player and network metrics about once per second during playback.

### **Data Collected**

The pipeline collects the following data:

* **Packet Trace:** Per-packet fields (timestamp, source and destination IP, protocol, length, and info) captured with tcpdump while the video plays.
* **Video State:** The player state reported by YouTube's `getPlayerState()`, such as playing, paused, or buffering.
* **Latency:** Round-trip time of a single `ping` to www.youtube.com, sampled with each log entry.
* **YouTube Stats for Nerds:** Player-side metrics from YouTube's "Stats for nerds" panel, including resolution, codecs, connection speed, network activity, and buffer health.
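The video state is logged as the raw integer returned by the player's `getPlayerState()` call. The sketch below maps those integers to readable labels using the codes documented for the YouTube IFrame Player API. `PLAYER_STATES` and `describe_state` are illustrative names, and the sketch assumes the in-page `movie_player` object returns the same codes.

```python
# Integer codes documented for getPlayerState() in the YouTube IFrame Player API.
# Assumption: the in-page 'movie_player' element used here returns the same codes.
PLAYER_STATES = {
    -1: "unstarted",
    0: "ended",
    1: "playing",
    2: "paused",
    3: "buffering",
    5: "video cued",
}


def describe_state(code):
    """Map a raw curr_state value to a label; anything unrecognized becomes 'unknown'."""
    try:
        return PLAYER_STATES.get(int(code), "unknown")
    except (TypeError, ValueError):
        # e.g. None, NaN after a merge, or an error string logged instead of a state
        return "unknown"


print([describe_state(c) for c in [1, 2, 3, 1]])
# → ['playing', 'paused', 'buffering', 'playing']
```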

 

In [13]:
import os
import time
import json
import random
import subprocess
import numpy as np
import pandas as pd

from netunicorn.base import Task, Success, Failure
from netunicorn.base import DockerImage, Experiment, ExperimentStatus, Pipeline
from netunicorn.client.remote import RemoteClient, RemoteClientException
from netunicorn.library.tasks.capture.tcpdump import StartCapture, StopNamedCapture
from netunicorn.library.tasks.upload.fileio import UploadToFileIO

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import StaleElementReferenceException
from selenium.webdriver import ActionChains

## **Data Combiner**

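A task that combines the pcap file produced by the `StartCapture` task and the JSON log produced by the `PausingSimulator` task into a single CSV file. Doing the merge on the node saves a separate post-processing step. The `latency` and `yt_metrics` columns are dropped before merging, so the resulting CSV holds the packet fields plus the video state closest in time to each packet.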
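`pd.merge_asof(..., direction='nearest')` pairs each packet row with the JSON log entry whose timestamp is closest, after both frames are sorted by time. For illustration, here is a stdlib-only sketch of the same matching rule on plain lists. `nearest_join` is a hypothetical helper, not part of netunicorn or pandas, and in this sketch ties go to the earlier sample, which may differ from pandas.

```python
from bisect import bisect_left


def nearest_join(packet_times, samples):
    """Attach to each packet timestamp the video state sampled closest in time.

    packet_times: sorted floats (epoch seconds, as extracted by tshark).
    samples: (time, state) tuples sorted by time (as logged in the JSON file).
    Packets outside [first sample, last sample] are dropped, mirroring the
    time-range filter applied before merge_asof.
    """
    if not samples:
        return []
    sample_times = [t for t, _ in samples]
    lo, hi = sample_times[0], sample_times[-1]
    joined = []
    for pt in packet_times:
        if pt < lo or pt > hi:
            continue
        i = bisect_left(sample_times, pt)
        # The nearest sample is either the first one at/after pt (i) or the one before it
        candidates = [j for j in (i - 1, i) if 0 <= j < len(sample_times)]
        best = min(candidates, key=lambda j: abs(sample_times[j] - pt))
        joined.append((pt, samples[best][1]))
    return joined


samples = [(100.0, 1), (101.0, 1), (102.0, 2)]
packets = [99.5, 100.2, 100.9, 101.8, 102.5]
print(nearest_join(packets, samples))
# → [(100.2, 1), (100.9, 1), (101.8, 2)]
```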

In [14]:
class DataCombiner(Task):
    def __init__(self, pcap_filepath: str, video_details_filepath: str, res_filepath: str) -> None:
        super().__init__()
        self.pcap_filepath = pcap_filepath
        self.video_details_filepath = video_details_filepath
        self.res_filepath = res_filepath

    def run(self):
        try:
            self.create_csv()
            json_df = self.pull_json()
            
            # DROPPING UNWANTED COLUMNS
            json_df = json_df.drop(columns=['latency', 'yt_metrics'])
            
            csv_df = self.pull_csv()
            merged_df = self.combine(csv_df, json_df)
            self.save_csv(merged_df)
            return Success("Successfully Combined Data")
        except Exception as e:
            return Failure(str(e))
        
        
    def create_csv(self):
        with open(self.res_filepath, 'w') as f:
            process = subprocess.run([
                "tshark", "-r", self.pcap_filepath,
                "-T", "fields",
                "-e", "frame.number",
                "-e", "frame.time_epoch",
                "-e", "ip.src",
                "-e", "ip.dst",
                "-e", "_ws.col.Protocol",
                "-e", "frame.len",
                "-e", "_ws.col.Info",
                "-E", "header=y",
                "-E", "separator=;",
                # Quote every field so a ';' inside the Info column cannot split a row
                "-E", "quote=d"
            ], stdout=f, stderr=subprocess.PIPE, text=True)

            if process.returncode != 0:
                raise Exception(f"tshark error: {process.stderr}")
            elif process.stderr:
                print(f"tshark warnings: {process.stderr}")

    def pull_json(self):
        with open(self.video_details_filepath, 'r') as file:
            data = json.load(file)
        return pd.DataFrame(data)
    
    def save_csv(self, df):
        # index=False avoids writing an extra unnamed index column
        df.to_csv(self.res_filepath, index=False)
    
    def pull_csv(self):
        return pd.read_csv(self.res_filepath, sep=';')
    
    def combine(self, csv_df, json_df):
        csv_df.rename(columns={
            'frame.time_epoch': 'Time',
            'frame.number': 'No',
            'ip.src': 'Source',
            'ip.dst': 'Destination',
            '_ws.col.Protocol': 'Protocol',
            'frame.len': 'Length',
            '_ws.col.Info': 'Info',
        }, inplace=True)
        
        # Convert times to float
        csv_df['Time'] = csv_df['Time'].astype('float64')
        json_df['time'] = json_df['time'].astype('float64')

        # Sort dataframes by time
        csv_df = csv_df.sort_values('Time')
        json_df = json_df.sort_values('time')

        # Find the time range in json_df
        min_time = json_df['time'].min()
        max_time = json_df['time'].max()

        # Filter csv_df to be within the time range of json_df
        csv_df = csv_df[(csv_df['Time'] >= min_time) & (csv_df['Time'] <= max_time)]

        merged_df = pd.merge_asof(csv_df, json_df, left_on='Time', right_on='time', direction='nearest')
        return merged_df
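With `-E quote=d`, tshark wraps each field in double quotes, which keeps a separator character inside the free-text Info column from splitting a row. The sketch below parses illustrative output of that shape with Python's `csv` module (pandas `read_csv` honors the same quoting). The sample row is made up for demonstration, and `parse_tshark_fields` is a hypothetical helper.

```python
import csv
import io


def parse_tshark_fields(text, separator=";"):
    """Parse `tshark -T fields -E header=y -E separator=; -E quote=d` output into dicts."""
    return list(csv.DictReader(io.StringIO(text), delimiter=separator, quotechar='"'))


# Illustrative output for one packet; the Info value contains the separator character
sample = (
    "frame.number;frame.time_epoch;ip.src;ip.dst;_ws.col.Protocol;frame.len;_ws.col.Info\n"
    '"1";"1700000000.125";"10.0.0.2";"142.250.1.1";"TLSv1.3";"1514";"Application Data; Continuation"\n'
)

rows = parse_tshark_fields(sample)
print(rows[0]["_ws.col.Info"])             # → Application Data; Continuation
print(float(rows[0]["frame.time_epoch"]))  # → 1700000000.125
# A naive split on ';' yields 8 pieces for 7 fields
print(len(sample.splitlines()[1].split(";")))  # → 8
```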
        


## **Pausing Simulator**

This task simulates user behavior: it uses Selenium WebDriver to pause and unpause a video repeatedly at random intervals, and while the video runs it logs the player state, YouTube's "Stats for nerds" metrics, and ping latency about once per second.
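Each wait between toggles is drawn from one of three regimes chosen with equal probability: uniform 5 to 30 s, uniform 30 to 60 s, or a Gaussian around 20 s with σ = 5. Below is a standalone sketch of that draw using a seeded `random.Random`, so a schedule can be reproduced. `sample_pause_delay` is a hypothetical helper, and the clamp to at least 1 s is an addition that is not part of the original task.

```python
import random


def sample_pause_delay(rng):
    """Draw the number of seconds to wait before the next play/pause toggle.

    randint(0, 20) has 21 outcomes, 7 per residue mod 3, so each regime is
    equally likely. The Gaussian draw is clamped to at least 1 second
    (an assumption added here), since int(gauss(20, 5)) can be zero or negative.
    """
    regime = rng.randint(0, 20) % 3
    if regime == 0:
        return rng.randint(5, 30)
    if regime == 1:
        return rng.randint(30, 60)
    return max(1, int(rng.gauss(20, 5)))


rng = random.Random(42)  # seeded so the same schedule can be replayed
delays = [sample_pause_delay(rng) for _ in range(10)]
print(delays)
```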

In [34]:
class PausingSimulator(Task):
    def __init__(self, video_url=None, duration=None, filepath=None, chrome_location=None, webdriver_arguments=None) -> None:
        super().__init__()
        self.video_url = video_url
        self.duration = duration
        self.filepath = filepath
        self.chrome_location = chrome_location or '/usr/bin/chromium'
        self.webdriver_arguments = webdriver_arguments or []
        self.driver = None
        self.log = []
        self.sfn_open = False
    
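    def run(self):
        try:
            self.init_webdriver()
            self.driver.get(self.video_url)
            start_time = self.wait_until_start()

            while time.time() - start_time < self.duration:
                # Pick one of three pause/play regimes with equal probability (7 of 21 outcomes each)
                temp = random.randint(0, 20)
                if temp % 3 == 0:
                    delay = random.randint(5, 30)
                elif temp % 3 == 1:
                    delay = random.randint(30, 60)
                else:
                    # Clamp so a low Gaussian draw cannot produce a zero or negative wait
                    delay = max(1, int(random.gauss(20, 5)))

                # Log metrics about once per second until the next toggle
                for _ in range(delay):
                    self.log_data()
                    time.sleep(1)
                self.toggle_video_state()

            self.save_file()
            return Success("Successfully Watched Video and Simulated Pausing Events")

        except Exception as e:
            print(f"An error occurred: {e}")
            return Failure(str(e))
        finally:
            # Always release the browser, even when an earlier step failed
            driver = getattr(self, "driver", None)
            if driver is not None:
                driver.quit()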
    
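    def get_network_metrics(self):
        """Returns the RTT of one ping to www.youtube.com in ms (as a string), or an error message."""
        try:
            response = subprocess.run(["ping", "-c", "1", "www.youtube.com"],
                                      capture_output=True, text=True, timeout=5)
            output = response.stdout
            # Without a reply (e.g. ICMP blocked on the node) there is no "time=" field to parse
            if response.returncode != 0 or "time=" not in output:
                return "Error measuring latency: no ping reply"
            latency = output.split("time=")[-1].split(" ms")[0]
            return latency
        except Exception as e:
            return f"Error measuring latency: {e}"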

    
    def get_youtube_metrics(self):
        """Fetches metrics from 'Stats for nerds'."""
        
        DIVS = [1, 2, 3, 4, 5, 7, 9, 10, 11]

        DIV_TO_KEY = {
            1: 'Video ID',
            2: 'Viewport / Frame Size',
            3: 'Current Res / Optimal Res',
            4: 'Volume / Normalized',
            5: 'Codecs',
            7: 'Color',  
            9: 'Connection Speed',  
            10: 'Network Activity',  
            11: 'Buffer Health',  
        }

        try:
            if not self.enable_stats_for_nerds():
                return "Unable to open Stats for nerds"

            stat_dict = {}
            for div_id in DIVS:
                try:
                    elem = self.driver.find_element(By.CSS_SELECTOR, f".html5-video-info-panel-content > div:nth-child({div_id}) > span:nth-child(2)")
                    stat_dict[DIV_TO_KEY[div_id]] = elem.text
                except Exception as e:
                    print(f"An error occurred in get_stats for DIV {div_id}: {e}")
                    stat_dict[DIV_TO_KEY.get(div_id, f"Unknown_DIV_{div_id}")] = 'Error'
            return stat_dict
        except Exception as e:
            print(f"An error occurred in get_youtube_metrics: {e}")
            # Return a plain string: a Failure object is not JSON-serializable and would break save_file
            return f"Error: {e}"
    
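    def get_video_state(self):
        """Returns the player state code (e.g. 1 = playing, 2 = paused), or None if it cannot be read."""
        try:
            return self.driver.execute_script("return document.getElementById('movie_player').getPlayerState()")
        except Exception as e:
            print(f"An error occurred: {e}")
            # None serializes to JSON null; a Failure object would make json.dump fail
            return None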
    
    def toggle_video_state(self):
        try:
            WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.CLASS_NAME, "ytp-play-button")))

            play_button = self.driver.find_element(By.CLASS_NAME, "ytp-play-button")

            self.driver.execute_script("arguments[0].click();", play_button)
        except Exception as e:
            print(f"An error occurred: {e}")
            return Failure(str(e))

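    def init_webdriver(self):
        options = Options()
        # Use the Chromium binary configured for this task (defaults to /usr/bin/chromium)
        options.binary_location = self.chrome_location
        for argument in self.webdriver_arguments:
            options.add_argument(argument)
        # Let failures propagate so run() reports the real cause instead of a later AttributeError
        self.driver = webdriver.Chrome(options=options)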
    
    def save_file(self):
        # Let write errors propagate so run() returns Failure instead of a false Success
        with open(self.filepath, "w") as json_file:
            json.dump(self.log, json_file)

        
    def wait_until_start(self, timeout=60):
        """Waits until the player reports state 1 (playing) and returns that moment's timestamp."""
        # Make sure the player element has loaded before querying its state
        WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((By.ID, "movie_player")))
        deadline = time.time() + timeout
        while time.time() < deadline:
            if self.get_video_state() == 1:
                return time.time()
            time.sleep(1)
        raise TimeoutError(f"Video did not start playing within {timeout} seconds")

    def find_element(self, by, value):
        try:
            return WebDriverWait(self.driver, 10).until(EC.presence_of_element_located((by, value)))
        except Exception as e:
            print(f"An error occurred: {e}")
            return Failure(str(e))
    
    def enable_stats_for_nerds(self):
        """Enables the 'Stats for nerds' feature on the YouTube player."""
        try:
            
            if self.sfn_open:
                return True
            
            movie_player = self.driver.find_element(By.ID, 'movie_player')
            ActionChains(self.driver).move_to_element(movie_player).context_click().perform()
            options = self.driver.find_elements(By.CLASS_NAME, 'ytp-menuitem')
            for option in options:
                option_child = option.find_element(By.CLASS_NAME, 'ytp-menuitem-label')
                if option_child.text == 'Stats for nerds':
                    option_child.click()
                    self.sfn_open = True
                    return True
            return False
        except Exception as e:
            print(f"Error enabling 'Stats for nerds': {e}")
            return False
    
    def log_data(self):
        """Appends one timestamped sample of player state, Stats for nerds, and latency."""
        curr_state = self.get_video_state()
        yt_metrics = self.get_youtube_metrics()
        latency = self.get_network_metrics()

        self.log.append({
            "time": time.time(),
            "curr_state": curr_state,
            "yt_metrics": yt_metrics,
            "latency": latency,
        })
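`get_network_metrics` extracts the RTT by splitting ping's output on `time=` and ` ms`, which assumes the Linux iputils output format. A regex-based variant isolated in a pure function can be tested without network access. This is a sketch: `parse_ping_rtt` and `measure_latency` are hypothetical names, and the regex also accepts the `time<1ms` form that some ping implementations print.

```python
import re
import subprocess

# Matches "time=12.3 ms" (iputils) as well as "time<1ms"; skips the "time 0ms" summary field
_RTT = re.compile(r"time[=<]([\d.]+) ?ms")


def parse_ping_rtt(output):
    """Extract the first round-trip time in ms from ping output, or None if absent."""
    match = _RTT.search(output)
    return float(match.group(1)) if match else None


def measure_latency(host="www.youtube.com", timeout=5):
    """Send one ping; return the RTT in ms, or None if ping failed, timed out, or is missing."""
    try:
        result = subprocess.run(["ping", "-c", "1", host],
                                capture_output=True, text=True, timeout=timeout)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return None
    return parse_ping_rtt(result.stdout) if result.returncode == 0 else None
```

Returning `None` rather than an error string keeps the latency column numeric once it is loaded into pandas.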

In [35]:

ENDPOINT = 'https://pinot.cs.ucsb.edu/netunicorn'
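# Read credentials from the environment instead of hard-coding them (variable names are illustrative)
LOGIN = os.environ.get('NETUNICORN_LOGIN', 'team_lb')
PASSWORD = os.environ['NETUNICORN_PASSWORD']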

In [36]:
client = RemoteClient(endpoint=ENDPOINT, login=LOGIN, password=PASSWORD)
nodes = client.get_nodes()

In [37]:
working_nodes = nodes.filter(lambda node: "aws" in node.name and "ARM64" not in node.name).take(6)
working_nodes

[aws-fargate-A-team_lb-1,
 aws-fargate-B-team_lb-2,
 aws-fargate-A-team_lb-3,
 aws-fargate-B-team_lb-4,
 aws-fargate-A-team_lb-5,
 aws-fargate-B-team_lb-6]

In [38]:
capture_task_name = "capture_task"
capture_file_path = "/tmp/capture_task.pcap"
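# BPF filter: keep TCP/UDP traffic, but drop SSH (port 22) and the cloud metadata address
tcpdump_filter = (
    "tcp or udp "
    "and not icmp and not arp "
    "and not port 22 "
    "and not host 169.254.169.254"
)
args = [
    "-i", "any",              # capture on all interfaces
    "-tttt",                  # date-and-time timestamps (affects printed output only, not the -w file)
    "-s", "0",                # do not truncate packets
    "-w", capture_file_path,  # write raw packets to the pcap file
    tcpdump_filter.strip(),
]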

start_capture = StartCapture(filepath=capture_file_path, arguments=args, name=capture_task_name)
stop_capture = StopNamedCapture(start_capture_task_name=capture_task_name)
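The capture arguments can be assembled by a small function, which also makes the filter easy to inspect. In pcap-filter syntax, `and` and `or` have equal precedence and associate left to right, so the expression above already parses as `(tcp or udp) and ...`; the parentheses below only make that explicit, and the `not icmp and not arp` clauses are redundant once the filter is restricted to TCP or UDP. `-tttt` is left out because it only changes printed timestamps and has no effect when writing with `-w`. This sketch assumes `StartCapture` passes `arguments` to tcpdump unchanged; `build_tcpdump_args` is a hypothetical helper.

```python
import shlex


def build_tcpdump_args(capture_path, bpf_filter, interface="any"):
    """Assemble tcpdump arguments for a full-packet capture written to a pcap file.

    -i IFACE : interface to capture on ("any" = all interfaces)
    -s 0     : do not truncate packets
    -w PATH  : write raw packets to PATH instead of printing them
    The BPF expression goes last as a single argument, with whitespace normalized.
    """
    return ["-i", interface, "-s", "0", "-w", capture_path, " ".join(bpf_filter.split())]


bpf = "(tcp or udp) and not port 22 and not host 169.254.169.254"
args = build_tcpdump_args("/tmp/capture_task.pcap", bpf)
# Show the equivalent shell command for inspection
print(shlex.join(["tcpdump", *args]))
```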

In [39]:
video_url = 'https://www.youtube.com/watch?v=KcMlPl9jArM'
duration = 600  # seconds of simulated viewing, counted from when playback starts
video_details_filepath = '/tmp/video_details.json'
webdriver_args = [
    '--headless',
    '--no-sandbox',
    '--autoplay-policy=no-user-gesture-required',
    '--disable-dev-shm-usage',
]

simulate_video = PausingSimulator(
    video_url=video_url,
    duration=duration,
    filepath=video_details_filepath,
    webdriver_arguments=webdriver_args,
)

In [40]:
result_filepath = f'/tmp/dataset_{duration}_sec.csv'
combine_data = DataCombiner(
    pcap_filepath=capture_file_path,
    video_details_filepath=video_details_filepath,
    res_filepath=result_filepath,
)

In [41]:
pipeline = (
    Pipeline()
    .then(start_capture)
    .then(simulate_video)
    .then(stop_capture)
    .then(combine_data)
    .then(UploadToFileIO(filepath=result_filepath, expires="1d"))
)

experiment = Experiment().map(pipeline, working_nodes)
docker_image = DockerImage(image='bravola/netunicorn:v3')
for deployment in experiment:
    deployment.environment_definition = docker_image

In [42]:
experiment_label = "lb_ds_1"
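The next cell polls the experiment status in two loops without an upper bound, so a deployment stuck in `PREPARING` would block the notebook indefinitely. A hypothetical helper, `wait_for` (not part of the netunicorn client), that adds a timeout might look like this; `sleep` and `clock` are injectable so it can be exercised without real waiting.

```python
import time


def wait_for(get_status, is_done, interval=20.0, timeout=3600.0,
             sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() every `interval` seconds until is_done(status) is true.

    Returns the final status, or raises TimeoutError once `timeout` seconds
    have passed without is_done() succeeding.
    """
    deadline = clock() + timeout
    while True:
        status = get_status()
        print(status)
        if is_done(status):
            return status
        if clock() >= deadline:
            raise TimeoutError(f"still {status} after {timeout} s")
        sleep(interval)
```

With the client from this notebook, the first loop would become `wait_for(lambda: client.get_experiment_status(experiment_label).status, lambda s: s == ExperimentStatus.READY)`.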

In [43]:
try:
    client.delete_experiment(experiment_label)
except RemoteClientException:
    pass

client.prepare_experiment(experiment, experiment_label)

while True:
    info = client.get_experiment_status(experiment_label)
    print(info.status)
    if info.status == ExperimentStatus.READY:
        break
    time.sleep(20)

client.start_execution(experiment_label)

while True:
    info = client.get_experiment_status(experiment_label)
    print(info.status)
    if info.status != ExperimentStatus.RUNNING:
        break
    time.sleep(20)

client.get_experiments()[experiment_label]

ExperimentStatus.PREPARING
ExperimentStatus.READY
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.RUNNING
ExperimentStatus.FINISHED


ExperimentExecutionInformation:
status: ExperimentStatus.FINISHED
experiment: 
 - Deployment: Node=aws-fargate-A-team_lb-1, executor_id=180c27bd-0d34-4004-af10-3b5ee1b0671f, prepared=True, error=None
 - Deployment: Node=aws-fargate-B-team_lb-2, executor_id=adb87173-f9e0-4102-977c-5bddb9441f68, prepared=True, error=None
 - Deployment: Node=aws-fargate-A-team_lb-3, executor_id=a39fd931-bdaa-4f62-ac86-e821c0cec6c3, prepared=True, error=None
 - Deployment: Node=aws-fargate-B-team_lb-4, executor_id=1823ad71-3f62-47eb-be44-d8ea2305fd0d, prepared=True, error=None
 - Deployment: Node=aws-fargate-A-team_lb-5, executor_id=44dcd810-6a16-4788-acae-43333720f402, prepared=True, error=None
 - Deployment: Node=aws-fargate-B-team_lb-6, executor_id=7b646ce2-bff2-4e7a-af1f-7af86a8039f8, prepared=True, error=None
execution_result:
[DeploymentExecutionResult:
  Node: aws-fargate-A-team_lb-1
  Result: <class 'NoneType'>
   None
  Logs:
    Parsed configuration: Gateway located on https://pinot.cs.ucsb.edu/n