In [1]:
import pandas as pd
import numpy as np

from influxdb_client import InfluxDBClient, Point, Dialect

import re
import time
import datetime

import warnings
from influxdb_client.client.warnings import MissingPivotFunction

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import matplotlib.colors as colors

import pandasql as ps
import sqlite3

import csv


pd.set_option('display.max_rows', 100)

In [2]:
# Function to parse configurations from go config file
def _parse_line(line):

    rx_dict = {
    'token': re.compile(r'var token = "(?P<token>.*)"\n'),
    'url': re.compile(r'var url = "(?P<url>.*)"\n'),
    'org': re.compile(r'var org = "(?P<org>.*)"\n'),
    'bucket': re.compile(r'var bucket = "(?P<bucket>.*)"\n'),
    }   

    """
    Do a regex search against all defined regexes and
    return the key and match result of the first matching regex

    """
    for key, rx in rx_dict.items():
        match = rx.search(line)
        if match:
            return key, match
    # if there are no matches
    return None, None

filepath = '/root/flexi-pipe/config.go'
# open the file and read through it line by line
with open(filepath, 'r') as file_object:
    line = file_object.readline()
    while line:
        # at each line check for a match with a regex
        key, match = _parse_line(line)

        if key == 'token':
            token = match.group('token')
        elif key == 'url':
            url = match.group('url')
        elif key == 'org':
            org = match.group('org')
        elif key == 'bucket':
            bucket = match.group('bucket')
        
        line = file_object.readline()
# url="http://192.168.20.58:8086"
url = "http://localhost:8086"

In [4]:
start_time = 1692622102
end_time = 1692772911


In [3]:
def experiment(start_time, end_time, filepath):
    # Retrieve experiments data from csv
    data = pd.read_csv(filepath, header=None)
    df = pd.DataFrame(data)

    #Rename columns
    experiments = df.rename(columns={0: "start", 1: "end", 2: "topology", 3: "runtime", 4: "parameter", 5: "d", 6: "dlo", 7: "dhi", 8: "dscore", 9: "dlazy", 10: "dout", 11: "gossipFactor", 12: "initialDelay", 13: "interval"}, errors='raise')

    #Correct timestamp
    experiments["start"] = experiments["start"].str.slice(0, 27)
    experiments["end"] = experiments["end"].str.slice(0, 27)

    #String to timestamp
    # experiments['startUnix'] = pd.to_datetime(experiments["start"],format="%Y-%m-%d %H:%M:%S.%f").astype('int64') / 10**9
    # experiments['endUnix'] = pd.to_datetime(experiments["end"],format="%Y-%m-%d %H:%M:%S.%f").astype('int64') / 10**9
    experiments['startUnix'] = pd.to_datetime(experiments["start"],format="mixed").astype('int64') / 10**9
    experiments['endUnix'] = pd.to_datetime(experiments["end"],format="mixed").astype('int64') / 10**9


    experiments['startUnix'] = pd.to_timedelta(experiments['startUnix'], unit='s').dt.total_seconds().astype(int)#.astype(str)
    experiments['endUnix'] = pd.to_timedelta(experiments['endUnix'], unit='s').dt.total_seconds().astype(int)#.astype(str)

    #Drop fields we don't mneed for the moment
    exp = experiments.drop(columns=["runtime", "initialDelay"]).sort_values(by=["start"])

    #Get times for different intervals
    # intervals = exp["interval"].drop_duplicates().sort_values().reset_index(drop=True)
    # intervals.head(10)

    expTime = exp[exp['startUnix'].astype(int).between(start_time, end_time)]
    # expTime['experiment'] = expTime.index
    expTime = expTime.reset_index().rename({'index':'experiment'}, axis = 'columns')

    return expTime

experiments = experiment(start_time, end_time, '../experiments.csv')
experiments.head(10)

NameError: name 'start_time' is not defined

In [10]:
def from_influx(url, token, org, measurement, start_time, end_time,grouping_key):
    client = InfluxDBClient(url=url, token=token, org=org,  timeout=900_000)

    # write_api = client.write_api(write_options=SYNCHRONOUS)
    query_api = client.query_api()

    data_frame = query_api.query_data_frame('from(bucket: "gs") '
                                        ' |> range(start: '+str(start_time)+', stop:'+str(end_time)+') '
                                        ' |> filter(fn: (r) => r._measurement == "'+measurement+'") '
                                        ' |> group(columns: ["_measurement", "_field"], mode: "by") '
                                        ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")')
    client.close()

    # df = data_frame.drop(columns=['result', 'table','_start', '_stop', '_measurement', 'topic', 'receivedFrom']).sort_values(by=["_time"]).reset_index(drop=True)
    data_frame.reset_index(inplace=True)
    df = data_frame[['_time', grouping_key]].sort_values(by=["_time"]).reset_index(drop=True)
    df["_time"] = pd.to_datetime(df["_time"])

    return df

In [22]:
ref = experiments.loc[experiments["parameter"] == "reference"]
start_reference = ref["startUnix"].min().astype(int)
end_reference = ref["endUnix"].max().astype(int)

ref_published 		= from_influx(url, token, org, "deliverMessage", start_reference, end_reference, "messageID")
ref_delivered 		= from_influx(url, token, org, "publishMessage", start_reference, end_reference, "messageID")

ref_published.head(10)

Unnamed: 0,_time,messageID
0,2023-08-21 14:09:45.838643+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
1,2023-08-21 14:09:45.838787+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
2,2023-08-21 14:09:45.839210+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
3,2023-08-21 14:09:45.839463+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
4,2023-08-21 14:09:45.839751+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
5,2023-08-21 14:09:45.839764+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
6,2023-08-21 14:09:45.840053+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
7,2023-08-21 14:09:45.840195+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
8,2023-08-21 14:09:45.840359+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
9,2023-08-21 14:09:45.840382+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...


In [23]:
ref_delivered.head(10)

Unnamed: 0,_time,messageID
0,2023-08-21 14:09:45.838036+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
1,2023-08-21 14:09:45.838444+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
2,2023-08-21 14:09:45.838762+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
3,2023-08-21 14:09:45.839099+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
4,2023-08-21 14:09:45.839419+00:00,ACQIARIgqtBzkLlcGE0XEpfVekpvvEWamsRXGjEL1K5+bI...
5,2023-08-21 14:09:46.095788+00:00,ACQIARIgD4gWt6P6qKy0jtQWeQ2Y+G3W7oQyvLe71RrdjS...
6,2023-08-21 14:09:46.096191+00:00,ACQIARIgD4gWt6P6qKy0jtQWeQ2Y+G3W7oQyvLe71RrdjS...
7,2023-08-21 14:09:46.096525+00:00,ACQIARIgD4gWt6P6qKy0jtQWeQ2Y+G3W7oQyvLe71RrdjS...
8,2023-08-21 14:09:46.096984+00:00,ACQIARIgD4gWt6P6qKy0jtQWeQ2Y+G3W7oQyvLe71RrdjS...
9,2023-08-21 14:09:46.177911+00:00,ACQIARIgI1fcHl95jQzZMa2FaRW3bAKVmBA6C+PstfIuT4...


In [24]:
par = experiments.loc[experiments["parameter"] == "d"]
start_query = par["startUnix"].min().astype(int)
end_query = par["endUnix"].max().astype(int)

published 	= from_influx(url, token, org, "publishMessage", start_query, end_query, "messageID")
received 	= from_influx(url, token, org, "deliverMessage", start_query, end_query, "messageID")

published.head(10)

Unnamed: 0,_time,messageID
0,2023-08-21 19:45:22.353392+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
1,2023-08-21 19:45:22.353669+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
2,2023-08-21 19:45:22.353940+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
3,2023-08-21 19:45:22.354177+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
4,2023-08-21 19:45:22.354402+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
5,2023-08-21 19:45:22.354669+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
6,2023-08-21 19:45:22.355109+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
7,2023-08-21 19:45:22.382170+00:00,ACQIARIg3KUU2uoJavOyzcVNRJLjO/l6ByIRKxWBd/gS56...
8,2023-08-21 19:45:22.382460+00:00,ACQIARIg3KUU2uoJavOyzcVNRJLjO/l6ByIRKxWBd/gS56...
9,2023-08-21 19:45:22.382682+00:00,ACQIARIg3KUU2uoJavOyzcVNRJLjO/l6ByIRKxWBd/gS56...


In [25]:
received.head(10)

Unnamed: 0,_time,messageID
0,2023-08-21 19:45:22.353238+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
1,2023-08-21 19:45:22.353271+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
2,2023-08-21 19:45:22.353590+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
3,2023-08-21 19:45:22.353753+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
4,2023-08-21 19:45:22.354024+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
5,2023-08-21 19:45:22.354079+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
6,2023-08-21 19:45:22.354127+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
7,2023-08-21 19:45:22.354324+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
8,2023-08-21 19:45:22.354398+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
9,2023-08-21 19:45:22.354477+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...


In [26]:
exp = experiments.loc[experiments['topology'] == "unl"]
exp = exp.loc[exp['parameter'] == "d"]
exp = pd.concat([exp, ref])

received 		= pd.concat([received, ref_delivered])
published 		= pd.concat([published, ref_published])

received.head(10)

Unnamed: 0,_time,messageID
0,2023-08-21 19:45:22.353238+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
1,2023-08-21 19:45:22.353271+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
2,2023-08-21 19:45:22.353590+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
3,2023-08-21 19:45:22.353753+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
4,2023-08-21 19:45:22.354024+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
5,2023-08-21 19:45:22.354079+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
6,2023-08-21 19:45:22.354127+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
7,2023-08-21 19:45:22.354324+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
8,2023-08-21 19:45:22.354398+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
9,2023-08-21 19:45:22.354477+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...


In [27]:
published.head(10)

Unnamed: 0,_time,messageID
0,2023-08-21 19:45:22.353392+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
1,2023-08-21 19:45:22.353669+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
2,2023-08-21 19:45:22.353940+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
3,2023-08-21 19:45:22.354177+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
4,2023-08-21 19:45:22.354402+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
5,2023-08-21 19:45:22.354669+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
6,2023-08-21 19:45:22.355109+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...
7,2023-08-21 19:45:22.382170+00:00,ACQIARIg3KUU2uoJavOyzcVNRJLjO/l6ByIRKxWBd/gS56...
8,2023-08-21 19:45:22.382460+00:00,ACQIARIg3KUU2uoJavOyzcVNRJLjO/l6ByIRKxWBd/gS56...
9,2023-08-21 19:45:22.382682+00:00,ACQIARIg3KUU2uoJavOyzcVNRJLjO/l6ByIRKxWBd/gS56...


In [58]:

# def calcAverageTime(publish, received, expTime, parameter):
expTime = experiments
parameter = "d"
publish = published

publish = publish[['_time', 'messageID']]
received = received[['_time', 'messageID']]

joined = publish.merge(received, on=['messageID'])
joined['diff'] = ((joined['_time_y'] - joined['_time_x'])/ pd.Timedelta(microseconds=1)).astype(int)
joined = joined.loc[joined["diff"] >= 0].dropna()
joined.head(10)

Unnamed: 0,_time_x,messageID,_time_y,diff
2,2023-08-21 19:45:22.353392+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,2023-08-21 19:45:22.353590+00:00,198
3,2023-08-21 19:45:22.353392+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,2023-08-21 19:45:22.353753+00:00,361
4,2023-08-21 19:45:22.353392+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,2023-08-21 19:45:22.354024+00:00,632
5,2023-08-21 19:45:22.353392+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,2023-08-21 19:45:22.354079+00:00,687
6,2023-08-21 19:45:22.353392+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,2023-08-21 19:45:22.354127+00:00,735
7,2023-08-21 19:45:22.353392+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,2023-08-21 19:45:22.354648+00:00,1256
8,2023-08-21 19:45:22.353392+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,2023-08-21 19:45:22.354765+00:00,1373
9,2023-08-21 19:45:22.353392+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,2023-08-21 19:45:22.354784+00:00,1392
10,2023-08-21 19:45:22.353392+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,2023-08-21 19:45:22.354788+00:00,1396
11,2023-08-21 19:45:22.353392+00:00,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,2023-08-21 19:45:22.354795+00:00,1403


In [59]:
#Make the db in memory
conn = sqlite3.connect(':memory:')
#write the tables
joined.to_sql('joined', conn, index=False)
expTime.to_sql('expTime', conn, index=False)

qry = '''
	    select  
	        joined._time_x,
	        joined.diff,
	        joined.messageID,
	        expTime.experiment,
	        expTime.'''+parameter+'''
	    from
	        joined join expTime on
	        joined._time_x between expTime.start and expTime.end
	    '''
dfNew = pd.read_sql_query(qry, conn)
dfNew = dfNew.set_index('experiment').rename(columns={"_time_x": "_time"}).dropna()#.drop(columns=["messageID"])
dfNew.head(20)	

Unnamed: 0_level_0,_time,diff,messageID,d
experiment,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
12,2023-08-21 19:45:22.353392+00:00,198,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,24
12,2023-08-21 19:45:22.353392+00:00,361,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,24
12,2023-08-21 19:45:22.353392+00:00,632,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,24
12,2023-08-21 19:45:22.353392+00:00,687,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,24
12,2023-08-21 19:45:22.353392+00:00,735,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,24
12,2023-08-21 19:45:22.353392+00:00,1256,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,24
12,2023-08-21 19:45:22.353392+00:00,1373,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,24
12,2023-08-21 19:45:22.353392+00:00,1392,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,24
12,2023-08-21 19:45:22.353392+00:00,1396,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,24
12,2023-08-21 19:45:22.353392+00:00,1403,ACQIARIgD0L9DFUrEMrmskLqnXInFpXWSpAe/CjcDO6NgA...,24


In [64]:
dfNew.loc[dfNew["d"] == 8].head(100)

Unnamed: 0_level_0,_time,diff,messageID,d
experiment,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2,2023-08-21 14:10:02.983714+00:00,48,ACQIARIg4OtKsZMdIo2i8pZSqW+IQGQGykQif/q8FoDTVF...,8
2,2023-08-21 14:10:22.998319+00:00,253,ACQIARIg4OtKsZMdIo2i8pZSqW+IQGQGykQif/q8FoDTVF...,8
2,2023-08-21 14:10:22.998530+00:00,42,ACQIARIg4OtKsZMdIo2i8pZSqW+IQGQGykQif/q8FoDTVF...,8
2,2023-08-21 14:11:22.040798+00:00,56,ACQIARIg4OtKsZMdIo2i8pZSqW+IQGQGykQif/q8FoDTVF...,8
2,2023-08-21 14:11:42.053464+00:00,282,ACQIARIg4OtKsZMdIo2i8pZSqW+IQGQGykQif/q8FoDTVF...,8
2,2023-08-21 14:12:02.066827+00:00,233,ACQIARIg4OtKsZMdIo2i8pZSqW+IQGQGykQif/q8FoDTVF...,8
2,2023-08-21 14:12:22.080852+00:00,339,ACQIARIg4OtKsZMdIo2i8pZSqW+IQGQGykQif/q8FoDTVF...,8
2,2023-08-21 14:12:22.081179+00:00,12,ACQIARIg4OtKsZMdIo2i8pZSqW+IQGQGykQif/q8FoDTVF...,8
2,2023-08-21 14:12:42.094797+00:00,103,ACQIARIg4OtKsZMdIo2i8pZSqW+IQGQGykQif/q8FoDTVF...,8
2,2023-08-21 14:13:02.108701+00:00,380,ACQIARIg4OtKsZMdIo2i8pZSqW+IQGQGykQif/q8FoDTVF...,8


In [69]:
#Average propagation time per interval
df = dfNew.drop(columns=['_time', 'messageID'])

avgPropExp = df.groupby(['experiment']).agg('mean')
avgPropExp.reset_index(inplace=True)
avgPropExp = avgPropExp.drop(columns=['experiment'])
# avgPropExp.head(10)

avgProp = avgPropExp.groupby([parameter]).agg({'diff':['mean','std']}).fillna(0)
avgProp.columns = avgProp.columns.droplevel(0)
avgProp.reset_index(inplace=True)

avgProp.head(10)

Unnamed: 0,d,mean,std
0,6.0,1614.433799,44.714436
1,8.0,280.236842,0.0
2,12.0,1449.683148,119.457246
3,24.0,1556.014551,24.627918
