# Processing

Base version of the notebook we used to pre-process the collected data and then generate the CSV files used for graphing. Comments are provided throughout to help with using the notebook. Some modifications may be necessary to get the code working in your environment (at minimum, the data path and experiment names below).

In [None]:
%load_ext jupyternotify

In [None]:
import numpy as np
import pandas as pd

from scipy import stats

import json
import datetime
from datetime import time, timedelta 

import re
import os
import io

from scipy import stats
from pathlib import Path

from tqdm.notebook import tqdm

import csv

import warnings

# Suppress every FutureWarning and RuntimeWarning for the rest of the session
# (presumably pandas deprecation notices and numpy warnings from the
# nan-aware statistics on empty / all-NaN data used below).
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter('ignore', category=RuntimeWarning)

In [None]:
def print_datetime():
    """Return the current local date and time as a ``DD/MM/YYYY HH:MM:SS`` string.

    Despite its name, nothing is printed; the string is meant to be embedded
    in log messages.
    """
    return datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')

In [None]:
# Setting correct data paths and experiment names
# Each entry of `experiments` is a results folder below `base_path`;
# load_data() walks f'{base_path}/{experiment}' recursively for log files.

base_path = "/data" # CHANGE TO CORRECT BASE PATH CONTAINING DATA FOLDERS
experiments = ['26.6.2023-scaling-10-results'] # SET TO THE CORRECT RESULTS FOLDER

In [None]:
# Pipeline services; per-service results are produced in this order.
service_names = ['primary', 'sift', 'encoding', 'lsh', 'matching']
# Server types; analysis code compares only the first two characters of these
# against the 'server' value derived from each log file name.
exp_servers = ['gpu', 'cm', 'aws']

# Service status names (presumably event values from the event logs — TODO confirm where used).
service_statuses = ['DEPLOY_REQUEST', 'UNDEPLOY_REQUEST', 'DEPLOYED', 'DEAD']

In [None]:
# Functions to read the different types of log files 

def analyse_queue_log(line):
    """Placeholder parser for queue log lines.

    Not implemented yet: ``line`` is ignored and an empty string is returned.
    """
    return ""

def read_log(file_name, file_list):
    """Parse a JSON-lines log file, appending every decodable line to ``file_list``.

    All backslashes are removed from a line before it is parsed, so lines whose
    JSON quotes were escaped (``\\"``) still decode. Lines that are not valid
    JSON after that (blank lines, build/CUDA output, progress bars, ...) are
    skipped. Note that a line consisting of a bare JSON number is appended as
    an int/float; callers filter those out.

    Args:
        file_name: path (str, bytes or path-like) of the log file.
        file_list: list the parsed records are appended to (mutated in place).

    Returns:
        ``file_list``, for convenience.
    """
    with open(file_name, encoding='utf-8', errors='ignore') as f:
        for line in f:
            try:
                file_list.append(json.loads(line.replace("\\", "")))
            except ValueError:
                # json.JSONDecodeError is a ValueError: not a JSON line, skip it.
                continue
    return file_list

def read_eventLogger(file_name, dataframe):
    """Read a semicolon-separated event log and append it to ``dataframe``.

    The file has no header; columns are named timestamp, service, event,
    value and '' (the last one absorbs a trailing ';'). A 'server' column is
    added, taken from the file name: the part after the last '-' and before
    the first '.' (e.g. ``eventLogger-gpu.csv`` -> ``gpu``).

    Args:
        file_name: path of the log file, as bytes (as produced by os.walk on
            a bytes directory) or str.
        dataframe: DataFrame accumulated so far.

    Returns:
        The concatenation of ``dataframe`` and the new events.
    """
    path = os.fsdecode(file_name)
    colnames = ['timestamp', 'service', 'event', 'value', '']
    events_df = pd.read_csv(path, delimiter=';', names=colnames)

    events_df['server'] = os.path.basename(path).split('-')[-1].split('.')[0]

    return pd.concat([dataframe, events_df])

def read_gpu(file_name, dataframe):
    """Read a comma-separated GPU monitoring CSV and append it to ``dataframe``.

    Malformed lines are skipped (``on_bad_lines='skip'``). A 'server' column
    is added from the file name: the part after the last '-' and before the
    first '.' (e.g. ``gpu-global-gpu02.csv`` -> ``gpu02``).

    Args:
        file_name: path of the CSV file, as bytes or str.
        dataframe: DataFrame accumulated so far.

    Returns:
        The concatenation of ``dataframe`` and the new rows.
    """
    path = os.fsdecode(file_name)
    gpu_df = pd.read_csv(path, delimiter=',', on_bad_lines='skip')

    gpu_df['server'] = os.path.basename(path).split('-')[-1].split('.')[0]

    return pd.concat([dataframe, gpu_df])

def read_gpu_json(file_name, dataframe):
    """Read a JSON-lines GPU monitoring file and append it to ``dataframe``.

    A 'server' column is added from the file name: the part after the last
    '-' and before the first '.' (e.g. ``gpu-json-gpu02.json`` -> ``gpu02``).

    Args:
        file_name: path of the JSON-lines file, as bytes or str.
        dataframe: DataFrame accumulated so far.

    Returns:
        The concatenation of ``dataframe`` and the new rows.
    """
    path = os.fsdecode(file_name)
    gpu_read_df = pd.read_json(path, lines=True)

    gpu_read_df['server'] = os.path.basename(path).split('-')[-1].split('.')[0]

    return pd.concat([dataframe, gpu_read_df])

def _read_deployment(deployment_path, experiment_name):
    """Read a semicolon-separated deployment file into a DataFrame."""
    if 'scal' not in experiment_name:
        return pd.read_csv(deployment_path, sep=';')
    # In the scaling experiments the pair 'gpu02;cm-01-05-061' appears in the
    # file; since ';' is the field separator it would be split across two
    # columns, so it is rewritten as 'gpu02:cm-01-05-061' before parsing.
    with open(deployment_path, "r") as deployment_file:
        deployment_data = deployment_file.read()
    deployment_data = deployment_data.replace('gpu02;cm-01-05-061', 'gpu02:cm-01-05-061')
    return pd.read_csv(io.StringIO(deployment_data), sep=';')


def load_data(experiment_name):
    """Load every log file of one experiment into a dictionary of DataFrames.

    Recursively walks ``{base_path}/{experiment_name}`` and dispatches each
    non-empty file on substrings of its path (first match wins):

    * ``.sh`` / ``.swp`` / ``.pem`` files are ignored;
    * ``clients.log`` -> JSON lines collected as client records;
    * ``pipeline`` -> JSON lines collected as pipeline records;
    * ``eventLogger`` -> semicolon-separated event log;
    * ``deployment`` -> semicolon-separated deployment file (the last one read wins);
    * ``gpu`` together with ``json`` / ``global`` / ``processes`` -> GPU monitoring data.

    Afterwards the client, event and pipeline timestamps are divided by 1000
    (milliseconds -> seconds), pipeline rows whose timestamp is not 16
    characters of the form ``<13 digits><any char><2 digits>`` are dropped,
    and the ``RESOURCES`` events are split out with their JSON ``value``
    decoded. Frames lacking the relevant column (e.g. no such files were
    found) are left untouched instead of raising.

    Args:
        experiment_name: name of the results folder below ``base_path``.

    Returns:
        dict with keys 'client', 'events', 'pipeline', 'deployment',
        'resources', 'gpu_total', 'gpu_process' and 'gpu_json'.
    """
    directory = os.fsencode(f'{base_path}/{experiment_name}')  # log directory

    # lists and dataframes to house the data
    client_data = []
    pipeline_data = []

    events_df = pd.DataFrame()
    deployment_df = pd.DataFrame()
    gpu_total_df = pd.DataFrame()
    gpu_process_df = pd.DataFrame()
    gpu_json_df = pd.DataFrame()

    for path, _subdirs, files in os.walk(directory):
        for file_name in files:
            curr_file = os.path.join(path, file_name)
            curr_name = curr_file.decode('utf-8')
            file_size = os.path.getsize(curr_file)
            print(f"[{print_datetime()}] Current file is {curr_name} with filesize {file_size/1000} KB")
            if file_size == 0:
                print(f"[{print_datetime()}] File {curr_name} has no data, will skip")
                continue
            if any(file_ext in curr_name for file_ext in ('.sh', '.swp', '.pem')):
                pass
            elif 'clients.log' in curr_name:
                read_log(curr_file, client_data)
            elif 'pipeline' in curr_name:
                read_log(curr_file, pipeline_data)
            elif 'eventLogger' in curr_name:
                events_df = read_eventLogger(curr_file, events_df)
            elif 'deployment' in curr_name:
                deployment_df = _read_deployment(curr_name, experiment_name)
            elif 'gpu' in curr_name:
                if 'json' in curr_name:
                    gpu_json_df = read_gpu_json(curr_file, gpu_json_df)
                elif 'global' in curr_name:
                    gpu_total_df = read_gpu(curr_file, gpu_total_df)
                elif 'processes' in curr_name:
                    gpu_process_df = read_gpu(curr_file, gpu_process_df)

            print(f"[{print_datetime()}] Loaded {curr_name} succesfully")

    # preprocessing to be run only once, dataframes are not to be modified after.
    # Bare JSON numbers parsed from log lines are not records; drop them.
    client_df = pd.DataFrame([i for i in client_data if not isinstance(i, int)])
    pipeline_df = pd.DataFrame([i for i in pipeline_data if not isinstance(i, (int, float))])

    # convert the timestamps from milliseconds to seconds
    if 'timestamp' in client_df.columns:
        client_df['timestamp'] = client_df['timestamp'].map(lambda x: float(x) / 1000)
    if 'timestamp' in events_df.columns:
        events_df['timestamp'] = events_df['timestamp'].map(lambda x: float(x) / 1000)

    # pipeline: keep only rows whose timestamp looks like 13 digits, a separator and 2 digits
    if 'timestamp' in pipeline_df.columns:
        pipeline_df = pipeline_df.dropna(axis=0, subset=['timestamp'])
        pipeline_ts = pipeline_df['timestamp'].astype('string')
        well_formed = (pipeline_ts.str.contains(r"\d{13}.\d{2}", regex=True)
                       & (pipeline_ts.str.len() == 16))
        pipeline_df = pipeline_df[well_formed].copy()
        pipeline_df['timestamp'] = pipeline_ts[well_formed].map(lambda x: float(x) / 1000)

    # RESOURCES events carry a JSON payload wrapped as {log:"..."}; unwrap and decode it
    if 'event' in events_df.columns:
        resources_df = events_df[events_df['event'].isin(['RESOURCES'])].copy()
        resources_df['value'] = resources_df['value'].map(
            lambda x: json.loads(x.replace("{log:\"", "").replace("\"}", "").replace("null", "\"\"")))
    else:
        resources_df = pd.DataFrame()

    dataframes = {
        'client': client_df,
        'events': events_df,
        'pipeline': pipeline_df,
        'deployment': deployment_df,
        'resources': resources_df,
        'gpu_total': gpu_total_df,
        'gpu_process': gpu_process_df,
        'gpu_json': gpu_json_df
    }

    return dataframes

In [None]:
# Load the results from the different files as variable raw_results
# raw_results maps each experiment name to the dict of DataFrames returned by load_data().

raw_results = {}

for experiment in experiments:
    print(f'[{print_datetime()}] Current experiment is {experiment}')
    raw_results[experiment] = load_data(experiment)

In [None]:
# Pre-processing the raw results
# experiment name -> {data key -> pre-processed DataFrame}; filled by the loop at the end of this cell.

preprocessed_results = {}

def preprocessing_none(input_df):
    """Identity pre-processing step.

    Used for data that was already cleaned while loading, or that needs no
    cleaning at all; the very same object is handed back.
    """
    result = input_df
    return result

def preprocessing_gpu_total_process(input_df, utc_offset_hours=1):
    """Clean a GPU monitoring CSV (global or per-process variant).

    Steps:
      1. rename the raw 'timestamp' column to 'datetime';
      2. drop repeated header rows (rows whose 'datetime' contains 'timestamp')
         and rows with a missing 'datetime';
      3. add 'timestamp' as UNIX seconds (millisecond resolution), computed
         after shifting 'datetime' back by ``utc_offset_hours`` (presumably
         the logger's local-time offset from UTC — confirm per experiment);
      4. sort by 'datetime';
      5. strip spaces and any trailing ``[...]`` unit suffix from column names
         and lower-case them (e.g. ``' memory.used [MiB]'`` -> ``'memory.used'``).

    Args:
        input_df: raw DataFrame as produced by ``read_gpu``.
        utc_offset_hours: hours subtracted from 'datetime' before conversion.

    Returns:
        The cleaned DataFrame, or ``input_df`` unchanged if it cannot be
        processed (e.g. it is empty and has no 'timestamp' column).
    """
    try:
        df_gpu = input_df.rename(columns={'timestamp': 'datetime'})

        # remove rows that do not contain data (repeated headers / missing values)
        is_header_row = df_gpu['datetime'].str.contains('timestamp', na=True).astype(bool)
        df_gpu = df_gpu[~is_header_row].copy()

        utc_time = pd.to_datetime(df_gpu['datetime']) - pd.Timedelta(hours=utc_offset_hours)
        df_gpu['timestamp'] = (utc_time - pd.Timestamp("1970-01-01")) // pd.Timedelta('1ms') / 1000
        df_gpu = df_gpu.sort_values(by='datetime', ascending=True)

        # clean column names; regex=True is explicit because pandas >= 2.0
        # treats the pattern literally by default
        df_gpu.columns = (df_gpu.columns
                          .str.replace(' ', '', regex=False)
                          .str.replace(r'\[.*$', '', regex=True))
        df_gpu = df_gpu.rename(columns=str.lower)
    except (KeyError, AttributeError, TypeError, ValueError) as err:
        print(f"GPU pre-processing skipped, returning data unchanged: {err!r}")
        df_gpu = input_df
    return df_gpu

def preprocessing_gpu_json(input_df, utc_offset_hours=2):
    """Flatten the JSON GPU monitor output to one row per (sample, GPU, process).

    'query_time' is renamed to 'datetime', rows are sorted by it, and
    'timestamp' is added as UNIX seconds (millisecond resolution) after
    shifting 'datetime' back by ``utc_offset_hours`` (presumably the monitor's
    local-time offset from UTC — confirm per experiment). The nested 'gpus'
    lists and then each GPU's 'processes' list are exploded and normalised
    into columns. A GPU with an empty process list keeps one row whose
    process columns are NaN.

    Args:
        input_df: raw DataFrame as produced by ``read_gpu_json``.
        utc_offset_hours: hours subtracted from 'datetime' before conversion.

    Returns:
        The flattened DataFrame, or ``input_df`` unchanged if it cannot be
        processed (e.g. it is empty or lacks the expected columns).
    """
    try:
        df_gpu = input_df.rename(columns={'query_time': 'datetime'})
        df_gpu = df_gpu.sort_values(by='datetime', ascending=True)

        utc_time = pd.to_datetime(df_gpu['datetime']) - pd.Timedelta(hours=utc_offset_hours)
        df_gpu['timestamp'] = (utc_time - pd.Timestamp("1970-01-01")) // pd.Timedelta('1ms') / 1000

        # one row per GPU (the JSON round trip presumably normalises / deep-copies the nested data)
        df_gpu['gpus'] = df_gpu['gpus'].apply(lambda x: json.loads(json.dumps(x)))
        df_gpu = df_gpu.explode(column='gpus').reset_index(drop=True)
        df_gpu = pd.concat([df_gpu[['timestamp', 'datetime', 'server']],
                            pd.json_normalize(df_gpu['gpus'])], axis=1)

        # one row per process; explode turns an empty process list into NaN
        df_gpu['processes'] = df_gpu['processes'].apply(lambda x: json.loads(json.dumps(x)))
        df_gpu = df_gpu.explode(column='processes').reset_index(drop=True)
        df_gpu = pd.concat([df_gpu, pd.json_normalize(df_gpu['processes'])], axis=1)
    except (KeyError, AttributeError, TypeError, ValueError) as err:
        print(f"GPU JSON pre-processing skipped, returning data unchanged: {err!r}")
        df_gpu = input_df
    return df_gpu

# Pre-processing function for each key of the dict returned by load_data();
# applied to every raw DataFrame by the loop below.
preprocessing_functions = {
    'client': preprocessing_none,
    'events': preprocessing_none,
    'pipeline': preprocessing_none,
    'deployment': preprocessing_none, 
    'resources': preprocessing_none,        
    'gpu_total': preprocessing_gpu_total_process,
    'gpu_process': preprocessing_gpu_total_process,
    'gpu_json': preprocessing_gpu_json
}

# Apply the registered pre-processing function to every raw DataFrame of every experiment.
for experiment, raw_result in raw_results.items():
    print(f'Currently pre-processing {experiment}')
    preprocessed_results[experiment] = {}
    for rr_key, rr_value in raw_result.items():
        preprocessed_results[experiment][rr_key] = preprocessing_functions[rr_key](rr_value)

In [None]:
# Analysis functions

def analysis_fps(df_messages_received, deploy_start, num_clients):
    """Estimate the frames per second achieved by the newest client of a deployment.

    Messages are sorted by 'timestamp' and only those at or after
    ``deploy_start`` are kept. The first ``num_clients + 1`` distinct client
    ids (in order of first appearance) form the deployment; the FPS is
    computed for the last of them as the number of its messages with
    'frame_no' in [0, 1600] divided by the time between its messages whose
    frame numbers are closest to 0 and to 1600.

    Args:
        df_messages_received: DataFrame with at least 'timestamp', 'id' and
            'frame_no' columns ('frame_no' may be stored as strings).
        deploy_start: start of the deployment window, same unit as 'timestamp'.
        num_clients: the first ``num_clients + 1`` clients are considered.

    Returns:
        ``{'results': {'avg': fps, 'std': 0}, 'clients': <array of client ids>}``.
    """
    fps_data = df_messages_received.sort_values(by=['timestamp'])
    max_timestamp = fps_data['timestamp'].max()

    # drop=True: a fresh unique index without adding an 'index' column
    fps_data = fps_data.reset_index(drop=True)
    fps_data = fps_data[fps_data['timestamp'].between(deploy_start, max_timestamp)].copy()
    fps_data['frame_no'] = fps_data['frame_no'].astype(str).astype(int)

    # clients of this deployment, in order of first appearance
    clients = fps_data['id'].unique()[:num_clients + 1]
    fps_rel_data = fps_data[fps_data['id'].isin(clients)]

    # literal (non-regex) match so ids with regex metacharacters work
    client_log_items = fps_rel_data[fps_rel_data['id'].str.contains(clients[-1], regex=False)]

    # find frames closest to frame number 0 and 1600 for the current client
    frame_0 = client_log_items.loc[client_log_items['frame_no'].abs().idxmin()]
    frame_1600 = client_log_items.loc[(client_log_items['frame_no'] - 1600).abs().idxmin()]

    frames_len = len(client_log_items[client_log_items['frame_no'].between(0, 1600)])
    frames_time = frame_1600.timestamp - frame_0.timestamp

    fps = frames_len / frames_time

    return {
        'results': {
            'avg': fps,
            'std': 0
        },
        'clients': clients
    }

def analyse_metrics(df_client, clients, deploy_start, deploy_end):
    """Compute end-to-end latency, jitter and success rate for one client.

    Client log rows in ``[deploy_start, deploy_end]`` whose 'id' contains
    ``clients`` (literal substring match) are selected. "Sent Frame" messages
    are paired with "Received results for" messages on 'frame_no', and the
    difference of their timestamps (seconds) times 1000 gives the per-frame
    end-to-end latency in ms.

    Args:
        df_client: client log DataFrame with 'timestamp', 'id', 'message' and
            'frame_no' columns.
        clients: despite the name, a single client id.
        deploy_start, deploy_end: inclusive deployment time window.

    Returns:
        ``(results, latency_values, latency_clients)``:
        ``results`` maps 'e2e' (median latency, ms), 'jitter' (mean absolute
        difference of consecutive latencies, ms; NaN with fewer than two
        paired frames) and 'success_rate' (received / sent * 100) to
        ``{'avg', 'std'}``; ``latency_values`` is an array of the per-frame
        latencies; ``latency_clients`` holds one list of the matching ids.
    """
    curr_client = clients

    e2e_latency = 0
    jitter_results = []
    success_rate = []
    latency_values = np.array([])
    latency_clients = []

    df_client_rel = df_client[df_client['timestamp'].between(deploy_start, deploy_end)]
    is_client = df_client_rel['id'].str.contains(curr_client, regex=False)
    client_log_items = df_client_rel[is_client].astype('string')

    # selecting out the logs related to the application
    messages = client_log_items['message']
    curr_client_sf = client_log_items[messages.str.contains("Sent Frame")]
    curr_client_rf = client_log_items[messages.str.contains("Received results for")]

    if len(curr_client_sf) > 0:
        total_sent = len(curr_client_sf)
        total_recv = len(curr_client_rf)

        merged = pd.merge(curr_client_sf, curr_client_rf, on='frame_no')
        merged['timestamp_sent'] = pd.to_numeric(merged['timestamp_x'])
        merged['timestamp_recv_results'] = pd.to_numeric(merged['timestamp_y'])
        merged['e2e_latency'] = merged['timestamp_recv_results'] - merged['timestamp_sent']
        merged['e2e_latency'] = merged['e2e_latency'] * 1000  # seconds -> ms

        latency_values = np.append(latency_values, merged['e2e_latency'])
        latency_clients.append(merged['id_x'].values.tolist())

        if total_recv > 0:
            try:
                e2e_latency = merged['e2e_latency'].median()
            except (TypeError, ValueError):
                e2e_latency = 0

        success_rate.append((total_recv / total_sent) * 100)

        # jitter: mean absolute difference between consecutive latencies
        latencies = merged['e2e_latency']
        if len(latencies) < 2:
            jitter_results.append(np.nan)
        else:
            jitter_subtr = abs(latencies[:-1].to_numpy() - latencies[1:].to_numpy())
            jitter_results.append(np.sum(jitter_subtr) / len(jitter_subtr))

    results = {
        'e2e': {
            'avg': np.nanmedian(e2e_latency),
            'std': np.nanstd(e2e_latency)
        },
        'jitter': {
            'avg': np.nanmedian(jitter_results),
            'std': np.nanstd(jitter_results)
        },
        'success_rate': {
            'avg': np.nanmedian(success_rate),
            'std': np.nanstd(success_rate)
        }
    }
    return results, latency_values, latency_clients

def select_value(dictionary, key):
    """Look up ``key`` in a resource record.

    The record is either a mapping or a list whose first element is the
    mapping to use.
    """
    record = dictionary[0] if isinstance(dictionary, list) else dictionary
    return record[key]

def analysis_cpu_mem(client_resources, servers=None):
    """Sum per-service median and standard deviation of CPU and memory usage.

    NODE_ENGINE rows are excluded. For every server whose first two
    characters match one of ``servers``, each service seen on that server
    contributes the median and sample standard deviation of its 'cpu' and
    'memory' values to the running totals. The values are taken from the
    service's rows on all servers. Resource values may be a dict or a list
    whose first element is the dict (see ``select_value``).

    Args:
        client_resources: resource events with 'service', 'server' and a
            parsed 'value' column.
        servers: server types to consider; defaults to the module-level
            ``exp_servers``.

    Returns:
        ``{'cpu': {'avg', 'std'}, 'memory': {'avg', 'std'}}`` holding the summed values.
    """
    if servers is None:
        servers = exp_servers

    pipeline_resources = client_resources[~client_resources['service'].str.contains('NODE_ENGINE')]

    pipeline_resource_usage = {
        'cpu': {'avg': 0, 'std': 0},
        'memory': {'avg': 0, 'std': 0},
    }

    for server in pipeline_resources['server'].unique():
        curr_server_res = pipeline_resources[pipeline_resources['server'] == server]
        for es_server in servers:
            if server[:2] != es_server[:2]:
                continue
            for service in curr_server_res['service'].unique():
                # NOTE(review): rows come from all servers, not only `server`, so a
                # service deployed on several servers is counted once per server — confirm intended.
                # regex=False: service ids contain '.', which a regex would treat as a wildcard.
                curr_service_results = pipeline_resources[
                    pipeline_resources['service'].str.contains(service, regex=False)]
                for rt_key, totals in pipeline_resource_usage.items():
                    curr_resources = pd.to_numeric(
                        curr_service_results['value'].map(lambda x: select_value(x, rt_key)))
                    totals['avg'] += curr_resources.median()
                    totals['std'] += curr_resources.std()

    return pipeline_resource_usage

def analysis_gpu_total(gpu_total_resources, exp_servers):
    """Summed median / sample std of total GPU memory usage (MB) over matching servers.

    'memory.used' strings have their 'MiB' suffix removed and are converted
    with the factor 1.04858 (MiB -> MB). Every server whose first two
    characters match an entry of ``exp_servers`` adds its median and
    standard deviation to the totals, once per matching entry.

    Returns:
        ``{'avg': summed median, 'std': summed std}``.
    """
    usage = gpu_total_resources[['memory.used', 'server']].copy()
    mib_values = usage['memory.used'].str.replace('MiB', '')
    usage['memory'] = pd.to_numeric(mib_values) * 1.04858  # MiB -> MB

    total_avg = 0
    total_std = 0
    for server in usage['server'].unique():
        server_memory = usage.loc[usage['server'] == server, 'memory']
        matching = [es for es in exp_servers if server[:2] == es[:2]]
        for _ in matching:
            total_avg += server_memory.median()
            total_std += server_memory.std()

    return {'avg': total_avg, 'std': total_std}
      
def analysis_gpu_process(gpu_process_res, exp_servers, df_rel_resource_events):
    """Generate per-service GPU memory usage results from per-process GPU samples.

    The 'used_gpu_memory' strings are converted from MiB to MB (factor
    1.04858) and divided by 1000. The PIDs belonging to each service are
    taken from the 'Pid' field of the resource-usage logs (via
    ``service_process_resources``). For each service, the median and
    population std of the samples of its *first* PID are reported.
    'primary' is always reported as 0. A service without any PID gets NaN
    instead of aborting the whole analysis.

    Args:
        gpu_process_res: GPU process samples with 'used_gpu_memory',
            'server', 'timestamp' and 'pid' columns.
        exp_servers: unused; kept for interface compatibility.
        df_rel_resource_events: resource events of the deployment window.

    Returns:
        ``{service: {'avg', 'std'}}``, or ``{}`` when there are no GPU samples.
    """
    gpu_process_resources = gpu_process_res[['used_gpu_memory', 'server', 'timestamp', 'pid']].copy()
    gpu_process_resources['memory'] = pd.to_numeric(
        gpu_process_resources['used_gpu_memory'].str.replace('MiB', '')) * 1.04858  # MiB -> MB
    gpu_process_resources['pid'] = pd.to_numeric(gpu_process_resources['pid'])

    indiv_process_results = {}
    if len(gpu_process_resources) == 0:
        return indiv_process_results

    # compiling all the log messages for the processes on the GPU into one dataframe
    ru_logs_combined = pd.DataFrame()  # resource usage logs combined
    for service_type in service_names:
        ru_logs_combined = service_process_resources(ru_logs_combined, df_rel_resource_events, service_type)

    for service in service_names:
        service_pids = ru_logs_combined.loc[ru_logs_combined['service'] == service, 'Pid'].unique()
        per_pid_memory = [
            pd.to_numeric(
                gpu_process_resources.loc[gpu_process_resources['pid'] == pid, 'memory'].values.tolist()) / 1000
            for pid in service_pids
        ]

        if service == 'primary':
            process_gpu_avg = 0
            process_gpu_std = 0
        elif per_pid_memory:
            # only the first process found for the service is considered
            process_gpu_avg = np.nanmedian(per_pid_memory[0])
            process_gpu_std = np.nanstd(per_pid_memory[0])
        else:
            process_gpu_avg = np.nan
            process_gpu_std = np.nan

        indiv_process_results[service] = {
            'avg': process_gpu_avg,
            'std': process_gpu_std
        }
    return indiv_process_results

def analysis_gpu_json(gpu_res, exp_servers, services=None):
    """Generate per-service GPU/CPU usage results from the Python GPU utility output.

    A row belongs to a service when its 'full_command' is a list whose first
    element contains './server' and whose second element is the service name.
    Rows without such a command (e.g. NaN for GPUs with no processes) never
    match. For each metric, zero readings are treated as missing and the
    median and population std of the rest are reported.

    Args:
        gpu_res: flattened output of ``preprocessing_gpu_json``.
        exp_servers: unused; kept for interface compatibility.
        services: service names to analyse; defaults to the module-level
            ``service_names``.

    Returns:
        ``{service: {metric: {'avg', 'std'}}}``. A service is omitted (or only
        partially filled) when the required columns are missing.
    """
    if services is None:
        services = service_names

    metrics_to_consider = ['temperature.gpu', 'utilization.gpu', 'memory.used',
                           'gpu_memory_usage', 'cpu_percent', 'cpu_memory_usage']

    def _runs_service(command, service):
        return (isinstance(command, (list, tuple)) and len(command) > 1
                and isinstance(command[0], str) and './server' in command[0]
                and command[1] == service)

    service_results = {}
    for service in services:
        try:
            curr_service_res = gpu_res[gpu_res['full_command'].apply(lambda cmd: _runs_service(cmd, service))]

            service_results[service] = {}
            for metric in metrics_to_consider:
                rel_data = curr_service_res[metric]
                rel_data = rel_data.mask(rel_data == 0)  # zero readings count as missing
                service_results[service][metric] = {
                    'avg': np.nanmedian(rel_data),
                    'std': np.nanstd(rel_data)
                }
        except (KeyError, TypeError, ValueError):
            # missing columns / unusable data for this service: keep what was computed
            continue
    return service_results

def service_process_resources(df_usage, df_input, service_name):
    """Append the flattened resource-usage records of one service to ``df_usage``.

    Rows of ``df_input`` whose 'service_name' equals ``service_name`` are
    selected. The ``{log:"`` / ``"}`` wrapper is stripped from their 'value'
    strings and ``null`` is replaced with ``""``. The remainder is parsed as
    JSON and expanded to one row per record: the JSON keys become columns,
    plus 'server' and 'timestamp'. When the records carry a 'job_name', the
    text between ``percomm`` + one character and one character + ``deploy``
    is extracted into a 'service' column. ``df_input`` is not modified.

    Returns:
        The concatenation of ``df_usage`` and the new records.
    """
    rre_data = df_input[df_input['service_name'] == service_name]
    cleaned_values = rre_data['value'].map(
        lambda x: x.replace("{log:\"", "").replace("\"}", "").replace("null", "\"\""))

    resource_usage_logs = pd.DataFrame({
        'server': rre_data['server'],
        'timestamp': rre_data['timestamp'],
        'value': cleaned_values.apply(json.loads),
    })
    resource_usage_logs = resource_usage_logs.explode(column='value').reset_index(drop=True)
    resource_usage_logs = pd.concat(
        [resource_usage_logs[['server', 'timestamp']], pd.json_normalize(resource_usage_logs['value'])],
        axis=1)

    try:
        resource_usage_logs['service'] = resource_usage_logs['job_name'].str.extract(
            r"(?<=percomm.)(.*?)(?=.deploy)", expand=False)
    except (KeyError, AttributeError):
        # no (string) 'job_name' field in these records: leave 'service' unset
        pass

    return pd.concat([df_usage, resource_usage_logs])

def analysis_cpu_mem_process(df_rel_events, df_rel_resource_events):
    """Per-service median and population std of CPU and memory usage.

    The resource events of every service are flattened with
    ``service_process_resources``. For each service the 'cpu' values and the
    'memory' values divided by 1000**3 (presumably bytes -> GB) are
    summarised. ``df_rel_events`` is not used.

    Returns:
        ``{service: {'cpu': {'avg', 'std'}, 'memory': {'avg', 'std'}}}``, or
        ``{}`` when there are no resource events.
    """
    per_service = {}
    if len(df_rel_resource_events) == 0:
        return per_service

    combined_logs = pd.DataFrame()
    for service_type in service_names:
        combined_logs = service_process_resources(combined_logs, df_rel_resource_events, service_type)

    for service in service_names:
        service_rows = combined_logs[combined_logs['service'] == service]
        per_service[service] = {}
        for item in ('cpu', 'memory'):
            values = pd.to_numeric(service_rows[item])
            if item == 'memory':
                values = values / 1000 / 1000 / 1000
            per_service[service][item] = {
                'avg': np.nanmedian(values),
                'std': np.nanstd(values),
            }

    return per_service

def _frame_latencies_ms(service_logs, markers):
    """Return the entry-to-exit latency (ms) of every frame in ``service_logs``.

    Entry / exit rows are those whose 'message' contains ``markers['entry']``
    / ``markers['exit']``; they are paired on 'frame_no'.
    """
    messages = service_logs['message']
    service_entry = service_logs[messages.str.contains(markers['entry'], na=False)].astype('string')
    service_exit = service_logs[messages.str.contains(markers['exit'], na=False)].astype('string')

    merged = pd.merge(service_entry, service_exit, on='frame_no')
    timestamp_entry = pd.to_numeric(merged['timestamp_x'])
    timestamp_exit = pd.to_numeric(merged['timestamp_y'])
    latency = timestamp_exit - timestamp_entry
    latency = latency * 1000  # seconds -> ms
    return latency.values


def process_latencies(deploy_start, deploy_end, deployment_clients, df_pipeline):
    """Compute each service's processing latency, i.e. the time a frame spends in it.

    For every service, pipeline log rows in ``[deploy_start, deploy_end]``
    that belong to one of ``deployment_clients`` (literal substring match on
    'client_id') are split into entry and exit messages. These are paired per
    'frame_no'. The latencies of all clients are pooled.

    Args:
        deploy_start, deploy_end: inclusive deployment time window.
        deployment_clients: iterable of client ids, or a single client id
            string (treated as one client, not iterated per character).
        df_pipeline: pipeline logs with 'timestamp', 'service_name',
            'client_id', 'message' and 'frame_no' columns.

    Returns:
        ``{service: {'avg': median, 'std': population std}}`` of the latencies
        in ms, ordered as ``service_names`` (NaN when no frames were found).
    """
    relevant_logs = {
        'primary': {
            'entry': "received and has a filesize",
            'exit': "sent to sift service for processing"
        },
        'sift': {
            'entry': "Received data from 'primary' service",
            'exit': "Sent sift data to lsh"
        },
        'encoding': {
            'entry': "sift data received for Frame",
            'exit': "Forwarded Frame"
        },
        'lsh': {
            'entry': "Received data from 'encoding' service",
            'exit': "Forwarded Frame"
        },
        'matching': {
            'entry': "Received data from 'lsh' service and will now",
            'exit': "sent to client with number of markers of"
        }
    }

    if isinstance(deployment_clients, str):
        deployment_clients = [deployment_clients]

    pipeline_rel = df_pipeline[df_pipeline['timestamp'].between(deploy_start, deploy_end)]

    service_results_combined = {}
    for service in service_names:
        markers = relevant_logs[service]
        service_logs = pipeline_rel.loc[pipeline_rel['service_name'] == service]
        service_latency = np.array([])
        for client in deployment_clients:
            try:
                client_mask = service_logs['client_id'].str.contains(client, regex=False, na=False)
            except (KeyError, AttributeError):
                # no usable 'client_id' column for this service
                continue
            service_latency = np.append(
                service_latency, _frame_latencies_ms(service_logs[client_mask], markers))

        service_results_combined[service] = {
            'avg': np.nanmedian(service_latency),
            'std': np.nanstd(service_latency)
        }

    return service_results_combined

def service_statistics(deploy_start, deploy_end, df_rel_events, client_online_times):
    """Generating service statistics (unfinished).

    Selects the non-client events (``service_name != 'client'``) inside
    ``[deploy_start, deploy_end]``. It then slices them into consecutive
    windows ending at each entry of ``client_online_times``; the first window
    starts 120 before the first entry, in 'timestamp' units.

    NOTE(review): the per-window events are computed but never used or
    returned, so the function currently returns None. It fails on an empty
    ``client_online_times`` (the ``[0]`` lookup).
    """
    
    deploy_rel_events = df_rel_events[df_rel_events['timestamp'].between(deploy_start, deploy_end)]
    service_events = deploy_rel_events[deploy_rel_events['service_name'] != 'client']
    
    # the first window reaches 120 timestamp units back before the first online time
    previous_online_time = client_online_times[0] - 120
    for i in range(len(client_online_times)):
        online_time = client_online_times[i]
        
        # events between the previous and the current client online time (result currently discarded)
        curr_rel_events = service_events[service_events['timestamp'].between(previous_online_time, online_time)] 
        
        previous_online_time = online_time

In [None]:
def _analyse_window_or_zero(df, window_start, window_end, analysis_fn, *args):
    """Apply ``analysis_fn`` to the rows of ``df`` inside [window_start, window_end]; 0 on failure."""
    try:
        window = df[df['timestamp'].between(window_start, window_end)]
        return analysis_fn(window, *args)
    except Exception:
        # Best-effort: a missing or malformed GPU frame yields 0, which
        # results_to_df skips. Unlike the former bare ``except:``, this no
        # longer swallows KeyboardInterrupt / SystemExit.
        return 0


def analyse_deployment(deploy_start, deploy_end, experiment_setup, df_rel_events,
                       df_resource_events, df_resources, df_messages_received, 
                       df_gpu_total, df_gpu_process, df_gpu_json, df_client,
                       df_pipeline, curr_client_num):        
    """Analyse the results of one deployment window within an experiment.

    Parameters
    ----------
    deploy_start, deploy_end : float
        Inclusive window (seconds) used to filter the resource/GPU frames.
    experiment_setup : dict
        Must contain a ``'permutation'`` iterable naming the servers in use.
    df_rel_events : pandas.DataFrame
        Accepted for interface compatibility; not read here.
    df_resource_events : pandas.DataFrame
        Resource events with a ``timestamp`` column (may be string-typed). It
        is converted to numeric on a local copy; the caller's frame is untouched.
    df_resources, df_gpu_total, df_gpu_process, df_gpu_json : pandas.DataFrame
        Frames with a ``timestamp`` column, filtered to the window. The GPU
        frames may be unusable; their result is then reported as 0.
    df_messages_received, df_client, df_pipeline : pandas.DataFrame
        Passed to the FPS, metric and per-service latency helpers.
    curr_client_num : int
        Index of the client step being analysed.

    Returns
    -------
    dict
        Period bounds, FPS, E2E/jitter/success-rate metrics, CPU/memory and GPU
        usage (0 when unavailable), per-service latencies and raw latencies.
    """
    # calculating FPS, E2E latency, and jitter
    analyse_clients = analysis_fps(df_messages_received, deploy_start, curr_client_num)
    fps_results = analyse_clients['results']
    deployment_clients = analyse_clients['clients'][-1]

    analysed_metrics = analyse_metrics(df_client, deployment_clients, deploy_start, deploy_end)
    metric_calculations = analysed_metrics[0]
    deployment_latencies = analysed_metrics[1]
    deployment_latencies_clients = analysed_metrics[2]

    # servers used by this permutation (local name avoids shadowing module-level exp_servers)
    used_servers = set(experiment_setup['permutation'])

    # obtaining resource usage for current deployment
    client_resources = df_resources[df_resources['timestamp'].between(deploy_start, deploy_end)]
    cpu_mem_results = analysis_cpu_mem(client_resources)

    # Convert timestamps without assigning into the caller's (possibly sliced) frame.
    resource_timestamps = pd.to_numeric(df_resource_events['timestamp'])
    df_rel_resource_events = df_resource_events.assign(timestamp=resource_timestamps)[
        resource_timestamps.between(deploy_start, deploy_end)
    ]

    gpu_total_results = _analyse_window_or_zero(
        df_gpu_total, deploy_start, deploy_end, analysis_gpu_total, used_servers)
    gpu_process_results = _analyse_window_or_zero(
        df_gpu_process, deploy_start, deploy_end, analysis_gpu_process,
        used_servers, df_rel_resource_events)
    gpu_results = _analyse_window_or_zero(
        df_gpu_json, deploy_start, deploy_end, analysis_gpu_json, used_servers)

    # finding latency taken in each process
    service_latencies = process_latencies(deploy_start, deploy_end, deployment_clients, df_pipeline)

    return {
        'total_period': (deploy_end - deploy_start),
        'deploy_start': deploy_start,
        'deploy_end': deploy_end,
        'fps': fps_results,
        'e2e': metric_calculations['e2e'],
        'jitter': metric_calculations['jitter'],
        'success_rate': metric_calculations['success_rate'],
        'cpu': cpu_mem_results['cpu'],
        'memory': cpu_mem_results['memory'],
        'gpu_total_memory': gpu_total_results,
        'gpu_process_memory': gpu_process_results,
        'gpu_results': gpu_results,
        'service_latencies': service_latencies,
        'deployment_latencies': deployment_latencies,
        'deployment_clients': deployment_latencies_clients,
    }

In [None]:
def _flatten_client_results(curr_client_results):
    """Flatten one analyse_deployment result dict into flat ``column -> value`` pairs."""
    flat = {}
    for curr_result_key, curr_result in curr_client_results.items():
        # 0 marks a GPU analysis that failed / had no data: emit no columns for it
        if curr_result_key in ['gpu_total_memory', 'gpu_process_memory', 'gpu_results'] and curr_result == 0:
            continue
        if curr_result_key in ['fps', 'e2e', 'jitter', 'success_rate',
                               'cpu', 'memory', 'gpu_total_memory']:
            # one level: {key}_{stat}
            for stat_key, stat_val in curr_result.items():
                flat[f'{curr_result_key}_{stat_key}'] = stat_val
        elif curr_result_key in ['gpu_results', 'cpu_mem']:
            # three levels: service -> metric -> stat
            for service, metrics in curr_result.items():
                for metric_key, stats in metrics.items():
                    for stat_key, stat_val in stats.items():
                        if 'gpu' in curr_result_key:
                            flat[f'gpu_{service}_{metric_key}_{stat_key}'] = stat_val
                        else:
                            flat[f'{metric_key}_{service}_{stat_key}'] = stat_val
        elif curr_result_key in ['service_latencies', 'gpu_process_memory']:
            # two levels: service -> stat
            for service, stats in curr_result.items():
                for stat_key, stat_val in stats.items():
                    if 'gpu' in curr_result_key:
                        flat[f'gpu_{service}_{stat_key}'] = stat_val
                    else:
                        flat[f'services_{service}_{stat_key}'] = stat_val
        else:
            flat[curr_result_key] = curr_result
    return flat


def results_to_df(df, results, experiment, experiment_setup):
    """Flatten per-client deployment results into rows and add them to ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame the new rows are added to; it is not modified in place.
    results : list of [client_num, dict]
        Pairs of client index and the dict returned by ``analyse_deployment``.
    experiment : str
        Stored in the ``experiment`` column.
    experiment_setup : object
        Stored verbatim in the ``permutation`` column.

    Returns
    -------
    pandas.DataFrame
        ``df`` plus one row per entry of ``results`` (index renumbered), or
        ``df`` itself when ``results`` is empty.
    """
    rows = []
    for curr_client_num, curr_client_results in results:
        result_row = {
            'experiment': experiment,
            'permutation': experiment_setup,
            'client': curr_client_num,
        }
        result_row.update(_flatten_client_results(curr_client_results))
        rows.append(result_row)

    if not rows:
        return df
    # DataFrame.append was removed in pandas 2.0; a single concat is the replacement
    # (and avoids re-copying the frame once per row).
    return pd.concat([df, pd.DataFrame(rows)], ignore_index=True)

def _collect_deployments(df_rel_deploy):
    """Group DEPLOY_START / CLIENT_UP / UNDEPLOY rows into one dict per deployment.

    Each dict holds ``deploy_start`` (float), ``setup`` (the parsed DEPLOY_START
    value), one ``client<N>`` entry per CLIENT_UP row holding that row's
    timestamp (``N`` is the row's ``clients`` value), and ``deploy_end`` (float).
    A deployment is only recorded once its UNDEPLOY row has been seen.
    """
    deployments = []
    curr_deployment = {}
    for _, event in df_rel_deploy.iterrows():
        event_type = event['event']
        if event_type == 'DEPLOY_START':
            curr_deployment = {
                'deploy_start': float(event['timestamp']),
                # values use single quotes; swap to double quotes so json can parse them
                'setup': json.loads(event['value'].replace("\'", "\"")),
            }
        elif event_type == 'CLIENT_UP':
            num_clients = json.loads(event['value'].replace("\'", "\""))['clients']
            curr_deployment[f'client{num_clients}'] = event['timestamp']
        elif event_type == 'UNDEPLOY':
            curr_deployment['deploy_end'] = float(event['timestamp'])
            deployments.append(curr_deployment)
    return deployments


def _first_client_timestamps(df_client, window_start, window_end, client_ids):
    """Return the timestamp of each client's first df_client row inside [window_start, window_end]."""
    df_client_rel = df_client[df_client['timestamp'].between(window_start, window_end)]
    return [df_client_rel[df_client_rel['id'] == client_id].iloc[0]['timestamp']
            for client_id in client_ids]


def analyse_experiment(df_combined_results, experiment, experiment_start, 
                       experiment_end, experiment_setup, df_deployment_events, 
                       df_rel_events, df_resource_events, df_resources,
                       df_messages_received, df_gpu_total, df_gpu_process, 
                       df_gpu_json, df_client, df_pipeline):
    """Analyse every deployment inside one experiment window and collect the results.

    Deployments are delimited by DEPLOY_START/UNDEPLOY rows in
    ``df_deployment_events`` that fall inside ``[experiment_start,
    experiment_end]``. Each deployment runs until the next one starts (the last
    one until ``experiment_end``). Within it, one ``analyse_deployment`` call is
    made per CLIENT_UP step, and the rows are added via ``results_to_df``.

    Returns
    -------
    pandas.DataFrame
        ``df_combined_results`` extended with one row per client step of every
        deployment (each deployment contributes its rows exactly once).
    """
    # Convert on a local frame rather than assigning into the caller's slice.
    df_deploy = df_deployment_events.assign(
        timestamp=df_deployment_events['timestamp'].astype(str).astype(int))
    df_rel_deploy = df_deploy[df_deploy['timestamp'].between(experiment_start, experiment_end)]

    deployments = _collect_deployments(df_rel_deploy)
    total_deployments = len(deployments)

    for idx, curr_deployment in enumerate(deployments):
        deployment_begin = curr_deployment['deploy_start']
        # a deployment lasts until the next one starts; the last one until the experiment ends
        if idx + 1 == total_deployments:
            deployment_end = experiment_end
        else:
            deployment_end = deployments[idx + 1]['deploy_start']
        print(f"Deployment began at {pd.to_datetime(deployment_begin, unit='s')} ({deployment_begin}) and ended at {pd.to_datetime(deployment_end, unit='s')} ({deployment_end}) with setup {curr_deployment.get('setup')}")

        df_rel_msgs_rcvd = df_messages_received[
            df_messages_received['timestamp'].between(deployment_begin, deployment_end)]
        client_ids = df_rel_msgs_rcvd['id'].unique()
        client_online_times = _first_client_timestamps(
            df_client, deployment_begin, deployment_end, client_ids)
        print(f"Clients {client_ids} came online at {client_online_times}")

        # analysing service statistics (needs at least one online client to anchor its windows)
        # NOTE(review): this uses the experiment window, not the deployment window — confirm intent.
        if client_online_times:
            service_statistics(experiment_start, experiment_end, df_rel_events, client_online_times)

        client_keys = [key for key in curr_deployment if key.startswith('client')]
        num_clients = len(client_keys)

        # Reset per deployment: previously results accumulated across deployments
        # and earlier deployments' rows were re-added on every iteration.
        results = []
        for j in range(num_clients):
            # NOTE(review): every step starts at client0's CLIENT_UP time (cumulative
            # windows) and raises KeyError if no 'client0' entry exists — confirm intent.
            deploy_start = curr_deployment['client0']
            # each step ends when the next client comes up, capped at the deployment end
            next_up = curr_deployment.get(f'client{j + 1}') if j + 1 < num_clients else None
            deploy_end = deployment_end if next_up is None else min(next_up, deployment_end)

            # NOTE(review): the deployment-wide message frame is passed, not one cut to
            # [deploy_start, deploy_end]; analysis_fps receives deploy_start to work from.
            deployment_results = analyse_deployment(
                deploy_start, deploy_end, experiment_setup, df_rel_events,
                df_resource_events, df_resources, df_rel_msgs_rcvd,
                df_gpu_total, df_gpu_process, df_gpu_json, df_client,
                df_pipeline, j,
            )
            results.append([j, deployment_results])

        # convert results for this deployment into dataframe rows
        df_combined_results = results_to_df(df_combined_results, results, experiment, experiment_setup)
    return df_combined_results

In [None]:
# Looping through each experiment and analysing their results

for experiment, pp_results in preprocessed_results.items():
    print(f'[{print_datetime()}] Current experiment to analyse is {experiment}')
    experiment_results = pp_results
    
    # indexing out the pre-processed data
    df_deployments = experiment_results['deployment']
    df_events = experiment_results['events']
    df_gpu_total = experiment_results['gpu_total']
    df_gpu_process = experiment_results['gpu_process']
    df_gpu_json = experiment_results['gpu_json']
    df_resources = experiment_results['resources']
    df_client = experiment_results['client']
    df_pipeline = experiment_results['pipeline']
        
    # select out the deployment events for this experiment
    df_deployment_events = df_deployments[df_deployments['event'].isin(
        ['DEPLOY_START','UNDEPLOY', 'EXPERIMENT_START', 'EXPERIMENT_END', 'CLIENT_UP']
    )] 
        
    ###
    
    # select out the events in question from the 'service' column that match any of the above statuses
    df_rel_events = df_events[df_events['event'].isin(service_statuses)]

    # create new column of just the pure service name, and of the number of the 
    df_rel_events['service_name'] = df_rel_events['service'].str.extract("(?<=percomm.)(.*?)(?=.deploy)")
    df_rel_events['service_number'] = df_rel_events['service'].str.extract(".(?<=instance.)(.*?)$")

    # make new timestamp column that's in seconds, and in datetime for printability
    df_rel_events['timestamp_secs'] = df_rel_events['timestamp']
    df_rel_events['datetime'] = pd.to_datetime(df_rel_events['timestamp_secs'], unit='s')
        
    ###
    
    # select out the resource logging
    df_resource_events = df_events[df_events['event'].str.contains('RESOURCES')].astype('string')
    df_resource_events = df_resource_events[df_resource_events['service'] != 'NODE_ENGINE']
    df_resource_events['service_name'] = df_resource_events['service'].str.extract("(?<=percomm.)(.*?)(?=.deploy)")

    df_messages_received = df_client[df_client.message.str.contains('Received')]
    df_messages_received['datetime'] = pd.to_datetime(df_messages_received['timestamp'], unit='s')
    print(f"Client logs between {df_messages_received['datetime'].min()} and ended at {df_messages_received['datetime'].max()}\n")
    
    df_messages_sent = df_client[df_client.message.str.contains('Sent')]
    df_messages_sent['datetime'] = pd.to_datetime(df_messages_sent['timestamp'], unit='s')
    
    ###
    
    # iterate through all the experiments
    experiment_count = 0 # keep track of the experiments
    
    experiment_setup = None
    experiment_start = None
    experiment_end = None
            
    df_combined_results = pd.DataFrame() # store the results
    for i, event in df_deployment_events.iterrows():
        if event['event'] == 'EXPERIMENT_START':
            experiment_start = float(event['timestamp'])
            experiment_setup = json.loads(event["value"].replace("\'","\""))
            continue
        if event['event'] == 'EXPERIMENT_END':
            experiment_end = float(event['timestamp'])
            
            # check if there is both an experiment start and end time that are sequential, if not do no analysis 
            if (experiment_end-experiment_start) > 0:
                print(f"Experiment began at {pd.to_datetime(experiment_start, unit='s')} ({experiment_start}) and ended at {pd.to_datetime(experiment_end, unit='s')} ({experiment_end})")
                print(f"The experiment had the following setup/permutation:\n{str(experiment_setup)}")
                
                df_combined_results = analyse_experiment(df_combined_results, experiment,
                    experiment_start, experiment_end, experiment_setup, 
                    df_deployment_events, df_rel_events, df_resource_events, 
                    df_resources, df_messages_received, df_gpu_total, 
                    df_gpu_process, df_gpu_json, df_client, df_pipeline)
                print("\n")
                experiment_count += 1