In [608]:
import numpy as np
import pandas as pd
from datetime import datetime

In [609]:
# 1 - Configuration: where the raw exports live, which files/services to process,
# and the resampling / rate / percentile parameters used by the cells below.
file_dir = '../../data/raw-data-linode-run3/'
merged_dir = '{}merged/'.format(file_dir)   # aligned per-metric outputs are written here

# containers / istio input files
service_cpu_usage_file = '1_service_cpu_use.csv'
service_memory_usage_file = '2_service_memory_use.csv'
service_cpu_sat_file = '3_service_cpu_sat.csv'
service_net_usage_file = '4_service_net_usage.csv'
service_disk_usage_file = '5_service_disk_usage.csv'
service_req_total_file = '6_service_req_total.csv'
service_ltcy_file = '7_service_ltcy.csv'
service_errors_file = '8_service_errors.csv'
service_request_size_file = '9_service_request_size.csv'
service_response_size_file = '10_service_response_size.csv'
containers_count_file = '11_containers_count.csv'

# system input files
system_cpu_usage_file = '12_system_cpu_usage.csv'

# Processing order of the per-service exports.
service_input_files = [
    service_cpu_usage_file,
    service_memory_usage_file,
    service_cpu_sat_file,
    service_net_usage_file,
    service_disk_usage_file,
    service_req_total_file,
    service_ltcy_file,
    service_errors_file,
    service_request_size_file,
    service_response_size_file,
    containers_count_file,
]

system_input_files = [system_cpu_usage_file]

# Output name for the stacked table written at the end of the notebook.
concated_data_file = 'all_data.csv'

# Services whose series are expanded into columns and then summed.
services = [
    'checkoutservice',
    'cartservice',
    'emailservice',
    'currencyservice',
    'paymentservice',
    'productcatalogservice',
    'shippingservice',
]

save = True                 # write merged outputs to disk
frequency = '1S'            # resampling grid (S for second, T for minute)
rate_time_window = '1T'     # look-back for the latency rate of change (S for second, T for minute)
latency_percentile = 0.95   # quantile applied to the latency rate series


In [610]:
def resample(df, value_column_name, index_col_name='date', frequency=frequency, interpolate=True,
             interpolate_method='linear', base=6, dup='mean'):
    """Collapse duplicate timestamps and upsample one metric column onto a regular grid.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose index (or column) named ``index_col_name`` holds timestamps
        and whose ``value_column_name`` column holds the metric.
    value_column_name : str
        The only column that is de-duplicated and resampled.
    index_col_name : str
        Index level / column used to group rows that share a timestamp.
    frequency : str
        Target pandas offset alias (T for minutes, S for seconds); defaults to
        the notebook-level ``frequency`` setting.
    interpolate : bool
        Whether to call ``DataFrame.interpolate`` after back-filling.
    interpolate_method : str
        ``method`` passed to ``DataFrame.interpolate``.
    base : int
        Bin origin passed to ``DataFrame.resample``.
    dup : str
        ``'max'`` keeps the largest value per duplicated timestamp; any other
        value averages them.

    Returns
    -------
    pandas.DataFrame
        Single-column frame named ``value_column_name`` on a sorted DatetimeIndex
        spaced at ``frequency``.
    """
    grouped = df.groupby([index_col_name])[value_column_name]
    # Select the value column in BOTH branches: averaging the whole frame would
    # also try to average non-numeric columns (e.g. the 'metric' label), which
    # pandas >= 2.0 rejects and older pandas silently drops.
    if dup == 'max':
        deduped = grouped.max()
    else:
        deduped = grouped.mean()
    deduped = pd.DataFrame(deduped)

    deduped.index = pd.to_datetime(deduped.index)
    deduped = deduped.sort_index()  # order the timeseries

    # Fill in missing intervals (upsample); empty bins take the next observed value.
    # NOTE(review): `base` and `kind` are deprecated/removed in pandas >= 2.0
    # (`offset` replaces `base`); this call targets the older pandas API.
    resampled = deduped.resample(frequency, kind='timestamp', base=base).bfill()

    if interpolate:
        # NOTE(review): after bfill few or no NaNs should remain, so this is
        # likely a no-op — confirm the intended gap-filling strategy.
        resampled = resampled.interpolate(method=interpolate_method)

    return resampled

def toTimeSeries(df, index_col_name='date', value_col_name='value'):
    """Return a copy of ``df`` indexed and sorted by a parsed datetime column.

    ``index_col_name`` is parsed with ``pd.to_datetime`` and becomes the index;
    ``value_col_name`` is coerced with ``pd.to_numeric``. The caller's frame is
    not modified, so re-running a cell that calls this does not find its input
    already re-indexed.

    Parameters
    ----------
    df : pandas.DataFrame
        Long-format frame containing both columns.
    index_col_name : str
        Column holding timestamps.
    value_col_name : str
        Column holding the metric values.

    Returns
    -------
    pandas.DataFrame
        New frame indexed by ``index_col_name`` in ascending time order.
    """
    ts = df.copy()
    ts[index_col_name] = pd.to_datetime(ts[index_col_name])
    ts[value_col_name] = pd.to_numeric(ts[value_col_name])
    return ts.set_index(index_col_name).sort_index()


def extractMetricSeries(df, col_name, col_value):
    """Extract the rows of one named entity (service, node, ...) as its own series.

    Rows where ``df[col_name] == col_value`` are selected, the selector column is
    dropped and ``value`` is renamed to ``col_value`` so several such series can
    later be joined side by side. The index is kept as-is (a DatetimeIndex stays
    a DatetimeIndex rather than being turned into strings) and sorted.

    Parameters
    ----------
    df : pandas.DataFrame
        Time-indexed long-format frame with ``col_name`` and ``value`` columns.
    col_name : str
        Column identifying the entity.
    col_value : object
        Entity to extract; also becomes the name of the value column.

    Returns
    -------
    pandas.DataFrame
        The selected rows, sorted by index.
    """
    mask = df[col_name] == col_value
    metric = (
        df.loc[mask]
        .drop(columns=[col_name])
        .rename(columns={'value': col_value})
    )
    return metric.sort_index()

def expand(df, by_col, by_col_values, dup='mean'):
    """Pivot a long-format metric table into aligned, resampled columns.

    Each value in ``by_col_values`` is extracted with ``extractMetricSeries``,
    resampled with ``resample`` (duplicates remedied per ``dup``) and joined onto
    the previously collected columns with ``merge``, so each value contributes a
    column named after it.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw long-format frame with a ``date`` column, ``by_col`` and ``value``.
    by_col : str
        Column whose values become separate columns (e.g. ``'service'``).
    by_col_values : iterable
        Values of ``by_col`` to extract, in output column order.
    dup : str
        Duplicate-timestamp strategy forwarded to ``resample`` (``'max'``/``'mean'``).

    Returns
    -------
    pandas.DataFrame
        The merged frame; an empty DataFrame when ``by_col_values`` is empty.
    """
    # first convert to time series
    df = toTimeSeries(df, 'date')
    metrics_df = None
    for col_value in by_col_values:
        print('Processing metric for column: {}'.format(col_value))
        series = extractMetricSeries(df, by_col, col_value)
        series = resample(series, value_column_name=col_value, dup=dup)
        # The first series seeds the frame; later ones are aligned onto it.
        metrics_df = series if metrics_df is None else merge(metrics_df, series)
    return pd.DataFrame() if metrics_df is None else metrics_df

def _reduceRows(df, columns_to_delete, metric_name, metric_col_name, how):
    """Shared body of the row aggregators below.

    Stores ``df.<how>(axis=1)`` (computed over all columns of ``df``) in a
    ``value`` column, tags every row with ``metric_name`` in ``metric_col_name``
    and drops ``columns_to_delete``. Works on a copy so the caller's frame is
    left unchanged.
    """
    reduced = getattr(df, how)(axis=1)
    out = df.copy()
    out['value'] = reduced
    out[metric_col_name] = metric_name
    return out.drop(columns=list(columns_to_delete))


# sum df rows, remove expanded columns and set a new column with a metric name
def sumTimeseries(df, columns_to_delete, metric_name, metric_col_name='metric'):
    """Row-wise sum into ``value``, tagged with ``metric_name``; expanded columns dropped."""
    return _reduceRows(df, columns_to_delete, metric_name, metric_col_name, 'sum')


# max df rows, remove expanded columns and set a new column with a metric name
def maxTimeseries(df, columns_to_delete, metric_name, metric_col_name='metric'):
    """Row-wise maximum into ``value``, tagged with ``metric_name``; expanded columns dropped."""
    return _reduceRows(df, columns_to_delete, metric_name, metric_col_name, 'max')


# average df rows, remove expanded columns and set a new column with a metric name
def avgTimeseries(df, columns_to_delete, metric_name, metric_col_name='metric'):
    """Row-wise mean into ``value``, tagged with ``metric_name``; expanded columns dropped."""
    return _reduceRows(df, columns_to_delete, metric_name, metric_col_name, 'mean')


# count non-missing values per row, remove expanded columns and set a new column with a metric name
def countTimeseries(df, columns_to_delete, metric_name, metric_col_name='metric'):
    """Row-wise count of non-missing values into ``value``, tagged with ``metric_name``; expanded columns dropped."""
    return _reduceRows(df, columns_to_delete, metric_name, metric_col_name, 'count')

def merge(df, series):
    """Align ``series`` onto ``df`` by nearest earlier timestamp, then back-fill.

    Each row of ``df`` takes the last ``series`` row at or before its timestamp,
    provided it is within one second; rows with no such match are then filled
    from the next available row.
    """
    aligned = pd.merge_asof(
        df,
        series,
        left_index=True,
        right_index=True,
        tolerance=pd.Timedelta('1 second'),
    )
    return aligned.bfill()

# df is a timeseries and resampled per second
# window in a form of nS or nT  or nH , where n is an interger (S for seconds, T for minutes, H for hours)
def rate(df, col, interval='S'):
    """Replace ``df[col]`` by its fractional change over ``interval`` and drop undefined rows.

    Only ``col`` is transformed; any other columns are carried through
    unchanged (previously the whole frame's ``pct_change`` was assigned to one
    column, which fails as soon as ``df`` has more than one column). The
    caller's frame is not modified.

    Parameters
    ----------
    df : pandas.DataFrame
        Time-indexed frame.
    col : str
        Column to convert to a rate of change.
    interval : str or DateOffset
        Look-back passed as ``freq`` to ``pct_change`` (e.g. ``'S'``, ``'1T'``, ``'1H'``).

    Returns
    -------
    pandas.DataFrame
        New frame where ``col`` holds the fractional change relative to the value
        ``interval`` earlier; rows where that change is NaN are removed.
    """
    changed = df[col].pct_change(fill_method='ffill', freq=interval)
    out = df.copy()
    out[col] = changed
    return out.dropna(axis=0, subset=[col])
    
    

    

# ----------- Start TESTING  --------------------

In [611]:
# Smoke-test `rate`: 72 one-second samples with a 1-minute window, so the first
# minute of rows has no earlier reference and is dropped.
# A fixed seed keeps the displayed values reproducible across Restart & Run All.
rng = pd.date_range('1/1/2020', periods=72, freq='S')
ts = pd.DataFrame(np.random.RandomState(42).randn(len(rng)), index=rng, columns=['value'])
ts = rate(ts, 'value', '1T')

ts.head(3)

Unnamed: 0,value
2020-01-01 00:01:00,-0.474699
2020-01-01 00:01:01,1.705744
2020-01-01 00:01:02,-0.949326


In [612]:
# Peek at the raw per-service CPU usage export before any processing.
service_test_df = pd.read_csv(
    file_dir + service_cpu_usage_file,
)
service_test_df.head(n=5)

Unnamed: 0,name,ztime,service,value
0,service_cpu_usage,2020-02-28 05:59:52,productcatalogservice,0.036354
1,service_cpu_usage,2020-02-28 05:59:33,productcatalogservice,0.036354
2,service_cpu_usage,2020-02-28 05:59:15,productcatalogservice,0.036354
3,service_cpu_usage,2020-02-28 05:59:04,productcatalogservice,0.036354
4,service_cpu_usage,2020-02-28 05:58:40,productcatalogservice,0.036354


In [613]:
# Normalise column names to what the helpers expect, then index by timestamp.
service_test_df = service_test_df.rename(columns={'ztime': 'date', 'name': 'metric'})
service_test_df = toTimeSeries(service_test_df, index_col_name='date')
service_test_df.head(n=5)

Unnamed: 0_level_0,metric,service,value
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2020-02-27 22:49:50,service_cpu_usage,productcatalogservice,0.036354
2020-02-27 22:49:52,service_cpu_usage,emailservice,0.034776
2020-02-27 22:49:53,service_cpu_usage,shippingservice,0.022885
2020-02-27 22:49:53,service_cpu_usage,checkoutservice,0.02696
2020-02-27 22:49:53,service_cpu_usage,currencyservice,0.032067


In [614]:
# Isolate the cartservice rows to inspect them on their own.
series = extractMetricSeries(service_test_df, col_name='service', col_value='cartservice')
series

Unnamed: 0_level_0,metric,cartservice
date,Unnamed: 1_level_1,Unnamed: 2_level_1
2020-02-27 22:49:53,service_cpu_usage,1763.896692
2020-02-27 22:49:57,service_cpu_usage,4005.321089
2020-02-27 22:49:59,service_cpu_usage,0.035686
2020-02-27 22:50:00,service_cpu_usage,2237.442300
2020-02-27 22:50:09,service_cpu_usage,1764.323045
...,...,...
2020-02-28 20:43:50,service_cpu_usage,2170.578358
2020-02-28 20:43:52,service_cpu_usage,1480.918093
2020-02-28 20:43:55,service_cpu_usage,0.024715
2020-02-28 20:43:56,service_cpu_usage,0.027122


In [615]:
# Reproduce resample()'s dup='max' step by hand: keep the largest cartservice
# value per timestamp, then make the index a sorted DatetimeIndex.
max_df = series.groupby(['date'])['cartservice'].max().to_frame()
max_df.index = pd.to_datetime(max_df.index)
max_df = max_df.sort_index()
max_df

Unnamed: 0_level_0,cartservice
date,Unnamed: 1_level_1
2020-02-27 22:49:53,1763.896692
2020-02-27 22:49:57,4005.321089
2020-02-27 22:49:59,0.035686
2020-02-27 22:50:00,2237.442300
2020-02-27 22:50:09,1764.323045
...,...
2020-02-28 20:43:50,2170.578358
2020-02-28 20:43:52,1480.918093
2020-02-28 20:43:55,0.024715
2020-02-28 20:43:56,0.027122


# ----------- END TESTING  --------------------

In [616]:
# process istio and container metrics data signals
for file in service_input_files:
    print('processing input file {}'.format(file))
    # '<n>_<metric>.csv' -> '<metric>' (split keeps the whole remainder if there is no '.')
    metric_name = file.split('_', 1)[-1].split('.', 1)[0]
    print('processing metric {}'.format(metric_name))

    data_df = pd.read_csv(file_dir + file)
    data_df = data_df.rename(columns={'ztime': 'date', 'name': 'metric'})

    # Container counts are de-duplicated with max; every other metric is averaged.
    remedy_dup = 'max' if file == containers_count_file else 'mean'

    # expand and either average or max any duplicates in timestamps series
    expanded_df = expand(data_df, by_col='service', by_col_values=services, dup=remedy_dup)

    # sum timeseries rows
    sum_df = sumTimeseries(expanded_df, columns_to_delete=services, metric_name=metric_name)

    if save:
        # Report the shape of the frame actually being written (this used to
        # print a stale, undefined variable left over from a deleted cell).
        print('saving {} data with shape {}'.format(file, sum_df.shape))
        save_to_file = merged_dir + file
        sum_df.to_csv(path_or_buf=save_to_file, index=True)
        print("----------------")
    else:
        print("Metric data is not saved. Savig flag is turned off!")


processing input file 1_service_cpu_use.csv
processing metric service_cpu_use
Processing metric for column: % checkoutservice
Processing metric for column: % cartservice
Processing metric for column: % emailservice
Processing metric for column: % currencyservice
Processing metric for column: % paymentservice
Processing metric for column: % productcatalogservice
Processing metric for column: % shippingservice
saving 1_service_cpu_use.csv data with shape (78845, 2)
----------------
processing input file 2_service_memory_use.csv
processing metric service_memory_use
Processing metric for column: % checkoutservice
Processing metric for column: % cartservice
Processing metric for column: % emailservice
Processing metric for column: % currencyservice
Processing metric for column: % paymentservice
Processing metric for column: % productcatalogservice
Processing metric for column: % shippingservice
saving 2_service_memory_use.csv data with shape (78845, 2)
----------------
processing input file

In [617]:
# convert latency to percentile of rate change
# NOTE(review): this reads the merged latency file written by the processing
# loop and overwrites the same path, so running this cell twice applies `rate`
# twice. Re-run the processing loop before re-running this cell.
ltcy_file = merged_dir + service_ltcy_file
ltcy_df = toTimeSeries(pd.read_csv(ltcy_file))

# sumTimeseries wrote one metric label on every row; keep it to restore after the numeric steps.
metric = ltcy_df['metric'].iloc[0]

ltcy_df = ltcy_df.drop(columns='metric')
ltcy_df = rate(ltcy_df, 'value', rate_time_window)
# NOTE(review): the index is likely already one row per timestamp here, in which
# case this quantile returns the values unchanged — confirm the intended aggregation.
ltcy_df = ltcy_df.groupby(['date']).quantile(latency_percentile)
ltcy_df['metric'] = metric


if save:
    print('saving {} latency quantile file to {} with dimension {}'.format(latency_percentile, ltcy_file, ltcy_df.shape))
    ltcy_df.to_csv(path_or_buf=ltcy_file, index=True)
else:
    print("Latency percentils data is not saved. Savig flag is turned off!")

saving 0.95 latency quantile file to ../../data/raw-data-linode-run3/merged/7_service_ltcy.csv with dimension (78781, 2)


In [618]:
# system_cpu_usage: one resampled column per node, then summed across nodes.
system_cpu_usage_df = pd.read_csv(file_dir + system_cpu_usage_file)
system_cpu_usage_df = system_cpu_usage_df.rename(columns={'ztime': 'date', 'name': 'metric'})

# Nodes present in the export (rows without a node label are ignored), in order of appearance.
nodes = system_cpu_usage_df['node'].dropna().unique()

system_cpu_usage_expanded = expand(system_cpu_usage_df, by_col='node', by_col_values=nodes, dup='mean')
system_cpu_usage_sum = sumTimeseries(system_cpu_usage_expanded, columns_to_delete=nodes,
                                     metric_name='system_cpu_usage')

if not save:
    print("system_cpu_usage data is not saved. Savig flag is turned off!")
else:
    print('saving system_cpu_usage to file {} with dimension {}'.format(
        merged_dir + system_cpu_usage_file, system_cpu_usage_sum.shape))
    system_cpu_usage_sum.to_csv(path_or_buf=merged_dir + system_cpu_usage_file, index=True)

Processing metric for column: % 192.168.181.164
Processing metric for column: % 192.168.228.12
Processing metric for column: % 192.168.227.189
Processing metric for column: % 192.168.189.71
Processing metric for column: % 192.168.227.202
saving system_cpu_usage to file ../../data/raw-data-linode-run3/merged/12_system_cpu_usage.csv with dimension (78841, 2)


# Concatenate all data together

In [619]:
# Stack every merged per-metric file into one long table.
all_files = service_input_files + system_input_files
frames = []
for file in all_files:
    input_file = merged_dir + file
    print('reading data from {}'.format(input_file))
    frames.append(pd.read_csv(input_file))
# One concat at the end instead of re-concatenating inside the loop (quadratic copying).
timeseries_df = pd.concat(frames, ignore_index=True)

if save:
    print('saving concatenated data to file {} with dimension {}'.format(merged_dir + concated_data_file, timeseries_df.shape))
    timeseries_df.to_csv(path_or_buf=merged_dir + concated_data_file, index=True)
else:
    print("Concatenated data is not saved. Saving flag is turned off!")
    

reading data from ../../data/raw-data-linode-run3/merged/1_service_cpu_use.csv
reading data from ../../data/raw-data-linode-run3/merged/2_service_memory_use.csv
reading data from ../../data/raw-data-linode-run3/merged/3_service_cpu_sat.csv
reading data from ../../data/raw-data-linode-run3/merged/4_service_net_usage.csv
reading data from ../../data/raw-data-linode-run3/merged/5_service_disk_usage.csv
reading data from ../../data/raw-data-linode-run3/merged/6_service_req_total.csv
reading data from ../../data/raw-data-linode-run3/merged/7_service_ltcy.csv
reading data from ../../data/raw-data-linode-run3/merged/8_service_errors.csv
reading data from ../../data/raw-data-linode-run3/merged/9_service_request_size.csv
reading data from ../../data/raw-data-linode-run3/merged/10_service_response_size.csv
reading data from ../../data/raw-data-linode-run3/merged/11_containers_count.csv
reading data from ../../data/raw-data-linode-run3/merged/12_system_cpu_usage.csv
saving system_cpu_usage to fil