In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
from pyspark.sql.window import Window

import os
import datetime
import re

import pandas as pd

# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("SparkStandaloneTest")
    .getOrCreate()
)

# Handle to the underlying SparkContext (used for the RDD-based text reads).
sc = spark.sparkContext

# NOTE(review): "spark.storage.memoryFraction" looks like a legacy memory
# setting -- confirm it still has any effect on the Spark version in use.
spark.conf.set("spark.storage.memoryFraction", 1)

# -1 switches automatic broadcast joins off (per the Spark SQL tuning docs).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

23/05/16 00:21:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# Root directory of the Puffer data release. It can be overridden with the
# PUFFER_DATA_DIR environment variable so the notebook is not tied to a single
# machine's filesystem layout; the default is the original location.
data_dir = os.environ.get("PUFFER_DATA_DIR", "/scratch/tn2208/puffer/puffer-data-release")

# Inclusive date range for the analysis.
begin_date = datetime.datetime(2021, 6, 12)
end_date = datetime.datetime(2021, 7, 12)
if end_date < begin_date:
    raise ValueError(f'end_date {end_date} is before begin_date {begin_date}')

# Both endpoints are included, hence the +1. This value is later passed as the
# partition count when the daily files are loaded.
num_days = (end_date - begin_date).days + 1
print(f'Number of days: {num_days}')

Number of days: 31


In [3]:
# Entries of the data directory are matched against this pattern; group 2
# captures the "<YYYY-MM-DD>T11_<YYYY-MM-DD>T11" date-range part. The same
# pattern is reused to tag each row with the date range found in its file path.
_DATE_FIELD = r'(.*)([0-9]{4}-[0-9]{2}-[0-9]{2}T11_[0-9]{4}-[0-9]{2}-[0-9]{2}T11)(.*)'


def _list_dated_files(data_dir, begin, end, name_pattern, subdir=None):
    """Return paths of files inside date-stamped directories within [begin, end].

    data_dir: directory whose entries are matched against _DATE_FIELD
    begin/end: inclusive datetime bounds applied to each directory's start date
    name_pattern: regex applied with re.match to every file name
    subdir: optional sub-directory of each dated directory to look in
    """
    matches = []
    for entry in sorted(os.listdir(data_dir)):
        found = re.match(_DATE_FIELD, entry)
        dated_dir = os.path.join(data_dir, entry)
        # Skip non-matching names and regular files that merely look dated
        # (os.listdir on a regular file would raise).
        if not found or not os.path.isdir(dated_dir):
            continue
        # Take the start date from the matched date range rather than from the
        # raw name, so names carrying a prefix (which the pattern allows) parse.
        start = datetime.datetime.strptime(found.group(2)[:10], "%Y-%m-%d")
        if not (begin <= start <= end):
            continue
        search_dir = dated_dir if subdir is None else os.path.join(dated_dir, subdir)
        for fname in sorted(os.listdir(search_dir)):
            if re.match(name_pattern, fname):
                matches.append(os.path.join(search_dir, fname))
    return matches


def _split_settings_line(line):
    """Split one expt_settings line at the first space into (expt_id, json)."""
    expt_id, json_blob = line.split(" ", 1)
    return expt_id, json_blob


def read_csvs(data_dir, filetype, schema, begin, end, num_partitions):
    """Read the CSV files of one type, per the puffer directory layout, into one DataFrame.

    data_dir: data directory
    filetype: "video_sent", "video_acked", "ssim", etc.
    schema: schema object handed to the CSV reader
    begin/end: beginning/ending of the (inclusive) date range
    num_partitions: number of partitions to repartition into, keyed by "filename"

    Returns the DataFrame with an extra "filename" column holding the date
    range extracted from each row's input file path.
    Raises FileNotFoundError when no file matches, rather than passing an empty
    path list to Spark.
    """
    file_suffix = r'{}_.*\.csv'.format(filetype)
    filenames = _list_dated_files(data_dir, begin, end, file_suffix)
    if not filenames:
        raise FileNotFoundError(
            "no '{}' csv files found under {} between {} and {}".format(filetype, data_dir, begin, end))

    df = spark.read.option("header", True).schema(schema).csv(filenames) \
              .withColumn("filename", F.regexp_extract(F.input_file_name(), _DATE_FIELD, 2)) \
              .repartition(num_partitions, "filename")
    return df


def read_expt_settings(data_dir, begin, end, num_partitions):
    """Read the expt_settings files found under each dated directory's "logs" folder.

    Each line is split at the first space into an experiment id ("expt_id_r")
    and a JSON string, from which "$.abr" and "$.abr_name" are extracted as the
    "expt" and "expt2" columns. "filename_r" holds the date range taken from the
    input file path. The result is repartitioned by "filename_r".
    Raises FileNotFoundError when no settings file is found in the date range.
    """
    filenames = _list_dated_files(data_dir, begin, end, 'expt_settings', subdir="logs")
    if not filenames:
        raise FileNotFoundError(
            "no expt_settings files found under {} between {} and {}".format(data_dir, begin, end))

    # textFile takes a comma-separated list of paths.
    df = sc.textFile(','.join(filenames)).map(_split_settings_line).toDF(["expt_id_r", "json"])
    # NOTE(review): this DataFrame is built from an RDD; confirm input_file_name()
    # is populated for it in the Spark version in use.
    return df.withColumn("filename_r", F.regexp_extract(F.input_file_name(), _DATE_FIELD, 2)) \
             .withColumn("expt", F.get_json_object(F.col("json"), "$.abr")) \
             .withColumn("expt2", F.get_json_object(F.col("json"), "$.abr_name")) \
             .drop(F.col("json")) \
             .repartition(num_partitions, "filename_r")

In [4]:
# Schema of the video_sent CSV files: columns in file order, all declared
# non-nullable. (The "buffer" and "cum_rebuf" columns are deliberately not part
# of this schema.)
video_sent_schema = StructType([
    StructField("time (ns GMT)", LongType(), False),
    StructField("session_id", StringType(), False),
    StructField("index", IntegerType(), False),
    StructField("expt_id", IntegerType(), False),
    StructField("channel", StringType(), False),
    StructField("video_ts", LongType(), False),
    StructField("format", StringType(), False),
    StructField("size", IntegerType(), False),
    StructField("ssim_index", DoubleType(), False),
    StructField("cwnd", IntegerType(), False),
    StructField("in_flight", IntegerType(), False),
    StructField("min_rtt", IntegerType(), False),
    StructField("rtt", IntegerType(), False),
    StructField("delivery_rate", IntegerType(), False),
])

# Load every video_sent file in the date range, then add "ssim": the raw SSIM
# index expressed in dB as -10 * log10(1 - raw_ssim).
# Formula taken from
# https://github.com/StanfordSNR/puffer-statistics/blob/master/csv_to_stream_stats.cc
video_sent_df = read_csvs(
    data_dir, "video_sent", video_sent_schema, begin_date, end_date, num_days
).withColumn("ssim", -10 * F.log10(1 - F.col("ssim_index")))

In [5]:
# Experiment settings for the same date range, one partition per day.
expt_settings_df = read_expt_settings(
    data_dir=data_dir,
    begin=begin_date,
    end=end_date,
    num_partitions=num_days,
)
# expt_settings_df.show(5, truncate=False)

In [6]:
# Attach the experiment settings to every video_sent row. A row matches when
# both the experiment id and the date range taken from the file path agree;
# the left join keeps rows that have no settings entry.
same_experiment = video_sent_df["expt_id"] == expt_settings_df["expt_id_r"]
same_date_range = video_sent_df["filename"] == expt_settings_df["filename_r"]
video_sent_df = (
    video_sent_df
    .join(expt_settings_df, same_experiment & same_date_range, "left")
    # The right-hand join keys are redundant once the join is done.
    .drop("expt_id_r", "filename_r")
)
# video_sent_df.show(5, truncate=False)

In [7]:
# Per-session transport statistics computed from the video_sent rows.
tcp_stats = video_sent_df.groupBy('session_id').agg(
    F.count('delivery_rate').alias('delivery_count'),
    F.mean('delivery_rate').alias('delivery_rate_mean'),
    F.stddev('delivery_rate').alias('delivery_rate_std'),
    F.mean('rtt').alias('rtt_rate_mean'),
    F.stddev('rtt').alias('rtt_rate_std'),
    # ignorenulls=True: "expt" comes from a left join and may be null on some
    # rows of a session; without it first() can return one of those nulls even
    # though the session has a usable label, and the fill below would then
    # relabel the session as 'bba'.
    F.first('expt', ignorenulls=True).alias('expt_name'),
)

# Sessions that have no experiment label at all are labelled 'bba'.
tcp_stats = tcp_stats.na.fill({'expt_name': 'bba'})

# tcp_stats = video_sent_df.groupBy('session_id').agg(
#     F.count('delivery_rate').alias('delivery_count'),
#     F.min('delivery_rate').alias('delivery_rate_min'),
#     F.max('delivery_rate').alias('delivery_rate_max'),
#     F.mean('delivery_rate').alias('delivery_rate_mean'),
#     F.stddev('delivery_rate').alias('delivery_rate_std'),
#     F.mean('rtt').alias('rtt_rate_mean'),
#     F.stddev('rtt').alias('rtt_rate_std'),
#     F.mean('buffer').alias('buffer_size_mean'),
#     F.stddev('buffer').alias('buffer_size_std')
# )

In [8]:
# Schema of the client_buffer CSV files: columns in file order, all declared
# non-nullable.
client_buffer_schema = StructType([
    StructField("time (ns GMT)", LongType(), False),
    StructField("session_id", StringType(), False),
    StructField("index", IntegerType(), False),
    StructField("expt_id", IntegerType(), False),
    StructField("channel", StringType(), False),
    StructField("event", StringType(), False),
    StructField("buffer", DoubleType(), False),
    StructField("cum_rebuf", DoubleType(), False),
])

client_buffer_df = read_csvs(
    data_dir, "client_buffer", client_buffer_schema, begin_date, end_date, num_days
)

# 1 for rows whose event is 'rebuffer', 0 otherwise; summed per session below.
is_rebuffer_event = F.when(F.col('event') == 'rebuffer', 1).otherwise(0)

# Per-session buffer statistics.
agg_rebuffer = client_buffer_df.groupBy('session_id').agg(
    F.sum(is_rebuffer_event).alias('rebuffer_times'),
    F.max('cum_rebuf').alias('cum_rebuf'),
    F.mean('buffer').alias('buffer_size_mean'),
    F.stddev('buffer').alias('buffer_size_std'),
)

In [9]:
# Combine buffer and transport statistics per session, then keep only sessions
# with more than 50 delivery-rate samples and at least one rebuffer event.
session_stats_df = (
    agg_rebuffer
    .join(tcp_stats, on='session_id', how='left')
    .filter(F.col('delivery_count') > 50)
    .filter(F.col('rebuffer_times') > 0)
)

In [10]:
# Columns whose Spark dtype is bigint or double, in DataFrame column order.
numeric_columns = [
    column_name
    for column_name, spark_dtype in session_stats_df.dtypes
    if spark_dtype == 'bigint' or spark_dtype == 'double'
]

# Packs the numeric columns into a single "features" vector column, the input
# format used for the correlation computations.
assembler = VectorAssembler(outputCol="features", inputCols=numeric_columns)
# vector_df = assembler.transform(session_stats_df)

# Compute the correlation matrix
# corr_matrix = Correlation.corr(vector_df, "features").head()[0]

In [11]:
# scheme_df = session_stats_df.filter(session_stats_df['expt_name'] == s)
# scheme_vector_df = assembler.transform(scheme_df)
# scheme_corr_matrix = Correlation.corr(scheme_vector_df, "features").head()[0]
# results.append(scheme_corr_matrix.toArray())

In [17]:
# Labels for the rows/columns of the correlation matrices. They are taken from
# the columns fed to the VectorAssembler so labels and matrix indices cannot
# drift apart when the aggregations change. With the current aggregations this
# is: rebuffer_times, cum_rebuf, buffer_size_mean, buffer_size_std,
# delivery_count, delivery_rate_mean, delivery_rate_std, rtt_rate_mean,
# rtt_rate_std.
name = list(numeric_columns)

In [13]:
# Sessions whose experiment label is 'puffer_ttp'.
puffer_ttp_df = session_stats_df.where(F.col('expt_name') == 'puffer_ttp')

# Assemble the numeric columns into the "features" vector column.
puffer_ttp_vector_df = assembler.transform(puffer_ttp_df)

# Take the correlation matrix out of the first cell of the first result row.
puffer_ttp_corr_matrix = Correlation.corr(puffer_ttp_vector_df, "features").first()[0]

23/05/16 00:25:59 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
23/05/16 00:25:59 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [14]:
# Sessions whose experiment label is 'linear_bba'.
linear_bba_df = session_stats_df.where(F.col('expt_name') == 'linear_bba')

# Assemble the numeric columns into the "features" vector column.
linear_bba_df_vector_df = assembler.transform(linear_bba_df)

# Take the correlation matrix out of the first cell of the first result row.
linear_bba_corr_matrix = Correlation.corr(linear_bba_df_vector_df, "features").first()[0]


                                                                                

In [19]:
# Correlation matrices as NumPy arrays: m1 for puffer_ttp (reported as "fugu"),
# m2 for linear_bba (reported as "bba").
m1 = puffer_ttp_corr_matrix.toArray()
m2 = linear_bba_corr_matrix.toArray()

# Matrix dimension, taken from the data instead of a hard-coded 9.
n = m1.shape[0]
assert m1.shape == m2.shape == (len(name), len(name)), \
    "correlation matrices and label list disagree in size"

print(m1)
print(m2)

# Column template of the comparison table.
corr_difference = {'corr':[], 'fugu':[], 'bba':[]}
# Not referenced by any visible cell; kept in case another cell relies on it.
entries = [[0,1], [1,5], [1, 6], [1,7]]

# One record per unordered pair of distinct statistics (upper triangle only).
# The table is built once from the records: DataFrame.append in a loop copies
# the whole frame on every row and no longer exists in pandas 2.x.
corr_rows = [
    {'corr': name[i] + ' ' + name[j], 'fugu': m1[i][j], 'bba': m2[i][j]}
    for i in range(n)
    for j in range(i + 1, n)
]
corr_df = pd.DataFrame(corr_rows, columns=list(corr_difference))
corr_df
            


[[ 1.          0.57180669 -0.29685372  0.08325703  0.06131621 -0.0435851
   0.02408156  0.15848369  0.08430437]
 [ 0.57180669  1.         -0.42477067  0.15744931  0.04570692 -0.12411882
  -0.07649146  0.35325799  0.26003598]
 [-0.29685372 -0.42477067  1.         -0.40413871  0.11906294  0.4330467
   0.30522731 -0.59261274 -0.36955728]
 [ 0.08325703  0.15744931 -0.40413871  1.         -0.25483498 -0.32384407
  -0.22785475  0.35515346  0.39486995]
 [ 0.06131621  0.04570692  0.11906294 -0.25483498  1.         -0.057842
  -0.04634739 -0.05710298 -0.07365444]
 [-0.0435851  -0.12411882  0.4330467  -0.32384407 -0.057842    1.
   0.93621817 -0.42964717 -0.26440538]
 [ 0.02408156 -0.07649146  0.30522731 -0.22785475 -0.04634739  0.93621817
   1.         -0.33484148 -0.19385614]
 [ 0.15848369  0.35325799 -0.59261274  0.35515346 -0.05710298 -0.42964717
  -0.33484148  1.          0.66744362]
 [ 0.08430437  0.26003598 -0.36955728  0.39486995 -0.07365444 -0.26440538
  -0.19385614  0.66744362  1.     

Unnamed: 0,corr,fugu,bba
0,rebuffer_times cum_rebuf,0.571807,0.027174
1,rebuffer_times buffer_size_mean,-0.296854,-0.500066
2,rebuffer_times buffer_size_std,0.083257,0.067621
3,rebuffer_times delivery_count,0.061316,0.131781
4,rebuffer_times delivery_rate_mean,-0.043585,-0.192072
5,rebuffer_times delivery_rate_std,0.024082,-0.154173
6,rebuffer_times rtt_rate_mean,0.158484,0.225505
7,rebuffer_times rtt_rate_std,0.084304,0.110536
8,cum_rebuf buffer_size_mean,-0.424771,-0.054727
9,cum_rebuf buffer_size_std,0.157449,0.045725


In [21]:
# Column template of the comparison table.
corr_difference = {'corr':[], 'fugu':[], 'bba':[]}

# Smallest absolute gap between the two schemes' correlations worth reporting.
MIN_CORR_GAP = 0.2

# Upper-triangle pairs whose correlation differs between the two matrices by
# more than MIN_CORR_GAP. The table is built once from the records:
# DataFrame.append in a loop copies the whole frame on every row and no longer
# exists in pandas 2.x.
corr_rows3 = [
    {'corr': name[i] + ' ' + name[j], 'fugu': m1[i][j], 'bba': m2[i][j]}
    for i in range(n)
    for j in range(i + 1, n)
    if abs(m1[i][j] - m2[i][j]) > MIN_CORR_GAP
]
corr_df3 = pd.DataFrame(corr_rows3, columns=list(corr_difference))
corr_df3

Unnamed: 0,corr,fugu,bba
0,rebuffer_times cum_rebuf,0.571807,0.027174
1,rebuffer_times buffer_size_mean,-0.296854,-0.500066
2,cum_rebuf buffer_size_mean,-0.424771,-0.054727
3,cum_rebuf rtt_rate_mean,0.353258,0.053313
4,cum_rebuf rtt_rate_std,0.260036,0.046776


In [16]:
# Show the column names of the per-session statistics DataFrame.
session_stats_df.columns

['session_id',
 'rebuffer_times',
 'cum_rebuf',
 'buffer_size_mean',
 'buffer_size_std',
 'delivery_count',
 'delivery_rate_mean',
 'delivery_rate_std',
 'rtt_rate_mean',
 'rtt_rate_std',
 'expt_name']