# 5G-IOT visual interface - Data Cleaning and Transformation

This notebook is developed to process application-dependent datasets, collected by the trucks.

In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import geopy
import time
from tqdm.notebook import tqdm_notebook
from tqdm import tqdm_notebook as tqdm
from multiprocessing import Pool
import multiprocessing
import itertools
%matplotlib inline
plt.rcParams.update({'figure.figsize':(7,5), 'figure.dpi':100})

### Define Constants


In [2]:
# The data folder source
base = "../data/sources/app-dependent/"

# Column rename map
rename_map = {
    "Bitrate": "Upload speed (Mbits/sec)",
    "Bitrate-RX": "Download speed (MBits/sec)",
    "Transfer size": "Upload size (MBytes)",
    "Transfer size-RX": "Download size (MBytes)",
    "send_data": "Upload throughput (MBits)",
    "svr1": "Response time from CloudflareDNS (ms)",
    "svr2": "Response time from GoogleDNS (ms)",
    "svr3": "Response time from OptusDNS (ms)",
    "svr4": "Response time from EC2DNS (ms)"
}

# Truck number
trucks = list(range(1,12))

# Truck operation dates
dates = [
    "2022-07-06",
    "2022-07-07",
    "2022-07-08",
    "2022-07-11",
    "2022-07-12",
    "2022-07-13",
    "2022-07-14",
    "2022-07-15",
    "2022-07-18",
    "2022-07-19",
    "2022-07-20",
    "2022-07-21",
    "2022-07-22",
]

# performance columns
performance_columns = {
    'Bitrate': ['mean'],
    'Bitrate-RX': ['mean'],
    'Transfer size-RX': ['mean'],
    'Transfer size': ['mean'],
    'send_data': ['mean'],
    'speed': ['mean'],
    'svr1': ['mean'],
    'svr2': ['mean'],
    'svr3': ['mean'],
    'svr4': ['mean']
}

#
geo_columns = {
    'latitude': ['mean'],
    'longitude': ['mean']
}

performance_location_columns = {
    **performance_columns,
    **geo_columns
}

### Examine a data file

In [3]:
raw_data = pd.read_csv(base + "/2022-07-22/2022-07-22-garbo03-combined.log")
print("Columns: ", raw_data.columns)
raw_data

Columns:  Index(['time', 'Day', 'Year', 'Month', 'Date', 'hour', 'min', 'sec',
       'timezone', 'latitude', 'longitude', 'speed', 'truck', 'svr1', 'svr2',
       'svr3', 'svr4', 'Role', 'Transfer size', 'Transfer unit', 'Bitrate',
       'bitrate_unit', 'Retransmissions', 'CWnd', 'cwnd_unit', 'Role-RX',
       'Transfer size-RX', 'Transfer unit-RX', 'Bitrate-RX', 'bitrate_unit-RX',
       'send_data'],
      dtype='object')


Unnamed: 0,time,Day,Year,Month,Date,hour,min,sec,timezone,latitude,...,bitrate_unit,Retransmissions,CWnd,cwnd_unit,Role-RX,Transfer size-RX,Transfer unit-RX,Bitrate-RX,bitrate_unit-RX,send_data
0,1658455306,Fri,2022.0,7.0,22.0,12.0,1.0,46.0,AEST,99.0,...,,,,,,,,,,
1,1658455306,Fri,2022.0,7.0,22.0,12.0,1.0,46.0,AEST,99.0,...,,,,,,,,,,
2,1658455306,Fri,2022.0,7.0,22.0,12.0,1.0,46.0,AEST,99.0,...,,,,,,,,,,
3,1658455306,Fri,2022.0,7.0,22.0,12.0,1.0,46.0,AEST,99.0,...,,,,,,,,,,
4,1658455306,Fri,2022.0,7.0,22.0,12.0,1.0,46.0,AEST,99.0,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
576,1658459117,Fri,2022.0,7.0,22.0,13.0,5.0,17.0,AEST,99.0,...,,,,,,,,,,0.002518
577,1658459118,Fri,2022.0,7.0,22.0,13.0,5.0,18.0,AEST,99.0,...,,,,,,,,,,0.004253
578,1658459119,Fri,2022.0,7.0,22.0,13.0,5.0,19.0,AEST,99.0,...,,,,,,,,,,0.002266
579,1658459120,Fri,2022.0,7.0,22.0,13.0,5.0,20.0,AEST,99.0,...,,,,,,,,,,0.000189


### Clean data

In [4]:
def adjust_CWnd(row):
    try:
        return float(row.CWnd)
    except:
        return 0

def adjust_send_data(row):
    try:
        # multiply with 8 to get MBits value
        return row.get('send_data', 0) * 8
    except:
        return 0

def clean(data, filter_location = False):
    temp = data.copy()
    if 'time' in temp.columns:
        temp = temp.drop(columns=['time'])
    
    if 'Time' in temp.columns:
        temp = temp.drop(columns=['Time'])

    non_null_data = temp.dropna(subset=['Day', 'Year', 'Month', 'hour', 'min', 'sec']).copy()
    non_null_data['send_data'] = non_null_data.apply(lambda r: adjust_send_data(r), axis=1)
    non_null_data['CWnd'] = non_null_data.apply(lambda r: adjust_CWnd(r), axis=1)
    if filter_location:
        # remove the invalid location
        return non_null_data.query("latitude < -37 & latitude > -38 & longitude > 144 & longitude < 145")
    else:
        return non_null_data

### Aggregation by time

In [5]:
def aggregate_by_date(data): 
    aggregated_data = data.copy()
    groupby_columns = ['Year', 'Month', 'Date', 'truck']
    aggregated_data = aggregated_data.groupby(groupby_columns, as_index=False).agg(performance_columns)
    aggregated_data.columns = aggregated_data.columns.get_level_values(0)
    return aggregated_data

def aggregate_by_hour(data): 
    aggregated_data = data.copy()
    
    # group columns to perform aggregation
    groupby_columns = ['Year', 'Month', 'Date', 'truck', 'hour']
    aggregated_data = aggregated_data.groupby(groupby_columns, as_index=False).agg(performance_columns)
    
    # flatten the columns to view the data properly
    aggregated_data.columns = aggregated_data.columns.get_level_values(0)
    return aggregated_data

def aggregate_by_minute(data): 
    aggregated_data = data.copy()
    groupby_columns = ['truck', 'hour', 'Date', 'Month', 'Year', 'min' ]
    aggregated_data = aggregated_data.groupby(groupby_columns, as_index=False).agg(performance_location_columns)
    aggregated_data.columns = aggregated_data.columns.get_level_values(0)
    return aggregated_data

def aggregate_by_second(data):
    aggregated_data = data.copy()
    aggregated_data = aggregated_data.groupby(groupby_columns, as_index=False).agg(performance_location_columns)
    aggregated_data.columns = aggregated_data.columns.get_level_values(0)
    return aggregated_data

def pipeline_by_minute(data): 
    cleaned = clean(data, filter_location=True)
    return aggregate_by_minute(cleaned)

def pipeline_by_hour(data): 
    cleaned = clean(data)
    return aggregate_by_hour(cleaned)

def pipeline_by_date(data):
    cleaned = clean(data)
    return aggregate_by_date(cleaned)

def pipeline_by_second(data):
    cleaned = clean(data, filter_location=True)
    return aggregate_by_second(cleaned)

## Other util functions

In [6]:
def get_dataset_name(date, truck, base_uri):
    truck_name = 'garbo' + (('0' + str(truck)) if truck < 10 else str(truck)) 
    return base_uri + "/" + date + "/" + date + "-" + truck_name + '-combined.log'

get_dataset_name("2022-07-06", 1, base)

'../data/sources/app-dependent//2022-07-06/2022-07-06-garbo01-combined.log'

## Execute the aggregation

In [7]:
output_folder = "../data/outputs/"

In [8]:
def generate_daily_dataset():
    datasets = []
    for date in tqdm(dates):
        for truck in trucks:
            dataset_name = get_dataset_name(date, truck, base)
            try:
                datasets.append(pipeline_by_date(pd.read_csv(dataset_name)))
            except ValueError as e:
                print('ValueError on ', dataset_name)
                print(e)
            except Exception as e:
                print('Other error: ', dataset_name)
                print(e)
    
    return pd.concat(datasets).sort_values(by=['truck', 'Month', 'Date'])

def generate_hourly_dataset():
    datasets = []
    for date in tqdm(dates):
        for truck in trucks:
            dataset_name = get_dataset_name(date, truck, base)
            try:
                datasets.append(pipeline_by_hour(pd.read_csv(dataset_name)))
            except ValueError as e:
                print('ValueError on ', dataset_name)
                print(e)
            except Exception as e:
                print('Other error: ', dataset_name)
                print(e)
    
    return pd.concat(datasets).sort_values(by=['truck', 'Year', 'Month', 'Date', 'hour'])

def generate_minutely_dataset():
    datasets = []
    for date in tqdm(dates):
        for truck in trucks:
            dataset_name = get_dataset_name(date, truck, base)
            try:
                datasets.append(pipeline_by_minute(pd.read_csv(dataset_name)))
            except ValueError as e:
                print('ValueError on ', dataset_name)
                print(e)
            except Exception as e:
                print('Other error: ', dataset_name)
                print(e)
    
    return pd.concat(datasets).sort_values(by=['truck', 'Year', 'Month', 'Date', 'hour', 'min'])

def generate_secondly_dataset():
    datasets = []
    for date in tqdm(dates):
        for truck in trucks:
            dataset_name = get_dataset_name(date, truck, base)
            try:
                datasets.append(pipeline_by_second(pd.read_csv(dataset_name)))
            except ValueError as e:
                print('ValueError on ', dataset_name)
                print(e)
            except Exception as e:
                print('Other error: ', dataset_name)
                print(e)
    
    return pd.concat(datasets).sort_values(by=['truck', 'Year', 'Month', 'Date', 'hour', 'min', 'sec'])
    
# daily_data = generate_daily_dataset()
# daily_data = daily_data.rename(columns=rename_map)
# daily_data.to_csv(output_folder + "date.csv")
# print("Finished daily data")

# hourly_data = generate_hourly_dataset()
# hourly_data = hourly_data.rename(columns=rename_map)
# hourly_data.to_csv(output_folder + "hour.csv")
# print("Finished hourly data")

minutely_data = generate_minutely_dataset()
minutely_data = minutely_data.rename(columns=rename_map)
minutely_data.to_csv(output_folder + "minute.csv")
print("Finished minutely data")


Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  for date in tqdm(dates):


  0%|          | 0/13 [00:00<?, ?it/s]

  datasets.append(pipeline_by_minute(pd.read_csv(dataset_name)))


Other error:  ../data/sources/app-dependent//2022-07-08/2022-07-08-garbo01-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-08/2022-07-08-garbo01-combined.log'
Other error:  ../data/sources/app-dependent//2022-07-08/2022-07-08-garbo07-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-08/2022-07-08-garbo07-combined.log'
Other error:  ../data/sources/app-dependent//2022-07-08/2022-07-08-garbo08-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-08/2022-07-08-garbo08-combined.log'
Other error:  ../data/sources/app-dependent//2022-07-08/2022-07-08-garbo11-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-08/2022-07-08-garbo11-combined.log'
Other error:  ../data/sources/app-dependent//2022-07-11/2022-07-11-garbo02-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-11/2022-07-11-garbo02-combined.

  datasets.append(pipeline_by_minute(pd.read_csv(dataset_name)))


Other error:  ../data/sources/app-dependent//2022-07-15/2022-07-15-garbo07-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-15/2022-07-15-garbo07-combined.log'
Other error:  ../data/sources/app-dependent//2022-07-15/2022-07-15-garbo10-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-15/2022-07-15-garbo10-combined.log'
Other error:  ../data/sources/app-dependent//2022-07-18/2022-07-18-garbo02-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-18/2022-07-18-garbo02-combined.log'
Other error:  ../data/sources/app-dependent//2022-07-18/2022-07-18-garbo07-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-18/2022-07-18-garbo07-combined.log'
Other error:  ../data/sources/app-dependent//2022-07-18/2022-07-18-garbo10-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-18/2022-07-18-garbo10-combined.

  datasets.append(pipeline_by_minute(pd.read_csv(dataset_name)))
  datasets.append(pipeline_by_minute(pd.read_csv(dataset_name)))


Other error:  ../data/sources/app-dependent//2022-07-20/2022-07-20-garbo10-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-20/2022-07-20-garbo10-combined.log'
Other error:  ../data/sources/app-dependent//2022-07-21/2022-07-21-garbo08-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-21/2022-07-21-garbo08-combined.log'


  datasets.append(pipeline_by_minute(pd.read_csv(dataset_name)))


Other error:  ../data/sources/app-dependent//2022-07-21/2022-07-21-garbo10-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-21/2022-07-21-garbo10-combined.log'


  datasets.append(pipeline_by_minute(pd.read_csv(dataset_name)))


Other error:  ../data/sources/app-dependent//2022-07-22/2022-07-22-garbo10-combined.log
[Errno 2] No such file or directory: '../data/sources/app-dependent//2022-07-22/2022-07-22-garbo10-combined.log'
Finished minutely data
