In [1]:
# Following variables can be set here or via papermill
# experiment = "experiment-baseline-with-latency-3"

# Imports

In [50]:
from google.oauth2 import service_account
from google.cloud import monitoring_v3
from google.cloud import bigquery
from google.cloud import storage
from google.oauth2 import service_account
from google.cloud.bigquery.job import ExtractJobConfig
import time
import matplotlib.dates as mdates
import pandas as pd
import numpy as np
import json
import matplotlib.pyplot as plt
import itertools
import os
from pathlib import Path
import datetime
import shutil
import subprocess
from subprocess import PIPE
import sharedVariables
from sharedVariables import *

# Network Logs

In [7]:

credentials = service_account.Credentials.from_service_account_file(
    '../infrastructure/credentials.json')

client = bigquery.Client(credentials=credentials)
today = datetime.datetime.today()
table_id = f"compute_googleapis_com_vpc_flows_{today.strftime('%Y%m%d')}"
# table_id = 'compute_googleapis_com_vpc_flows_20210720'
blob_name = "export.log"



extract_conf = ExtractJobConfig()
extract_conf.compression = 'NONE'
extract_conf.destination_format = 'NEWLINE_DELIMITED_JSON'

def getDataset(experiment, outDir = datetime.datetime.today().strftime('%Y%m%d-%H')): 
    ensureDirectory(outDir)
    bucket_name = experiment + "-log-bucket"

    destination_uri = "gs://{}/{}".format(bucket_name, blob_name)
    dataset_ref = bigquery.DatasetReference(project, experiment.replace("-", "_"))
    table_ref = dataset_ref.table(table_id)

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        # Location must match that of the source table.
        location="US",
        job_config=extract_conf
    )  # API request
    extract_job.result()  # Waits for job to complete.

    print(
        "Exported {}:{}.{} to {}".format(project, experiment, table_id, destination_uri)
    )


    storage_client = storage.Client(credentials=credentials)

    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    blob.download_to_filename(f"{outDir}/gcp-flow-network.log")

    print(
        "Blob downloaded successfully."
    )

if 'experiment' not in locals():
    for experiment in sharedVariables.experiments: 
        try: 
            getDataset(experiment)
        except Exception as e: 
            print(e)

Exported dspj-315716:experiment-baseline-with-latency-3.compute_googleapis_com_vpc_flows_20210920 to gs://experiment-baseline-with-latency-3-log-bucket/export.log
Blob downloaded successfully.
404 Not found: Table dspj-315716:experiment_syncmesh_with_latency_3.compute_googleapis_com_vpc_flows_20210920 was not found in location US
404 Not found: Table dspj-315716:experiment_advanced_mongo_with_latency_3.compute_googleapis_com_vpc_flows_20210920 was not found in location US
404 POST https://bigquery.googleapis.com/bigquery/v2/projects/dspj-315716/jobs?prettyPrint=false: Not found: Dataset dspj-315716:experiment_syncmesh_with_latency_6
404 POST https://bigquery.googleapis.com/bigquery/v2/projects/dspj-315716/jobs?prettyPrint=false: Not found: Dataset dspj-315716:experiment_baseline_with_latency_6


# PCAPs

In [27]:
def download_pcap(ip, outDir = datetime.datetime.today().strftime('%Y%m%d-%H')):
    ensureDirectory(outDir)
    print("Dowloading File")
    print(os.path.join(Path.cwd(), "..", "cert"))
    result = subprocess.run(f"scp -i ../infrastructure/orchestrator.pem -o StrictHostKeyChecking=no orchestrator@{ip}:/captures.zip ./captures.zip", shell=True, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    if(result.returncode == 0):
        print(result.stdout)
    else:
        raise Exception(result.stderr)
    shutil.unpack_archive("./captures.zip", outDir)

if 'experiment' not in locals():
    download_pcap("35.224.133.98")


# Monitoring

In [101]:
credentials = service_account.Credentials.from_service_account_file(
    '../infrastructure/credentials.json')

# Our project ID

monitoring_client = monitoring_v3.MetricServiceClient(credentials=credentials)
project_name = f"projects/{project_id}"
now = time.time()
seconds = int(now)
nanos = int((now - seconds) * 10 ** 9)

# 3600 = Get the last hour of metrics
timeframe = 3600 * 2
if 'experiment' not in locals():
    print("set timeframe to one hour")
    timeframe = 3600 * 1

interval = monitoring_v3.TimeInterval(
    {
        "end_time": {"seconds": seconds, "nanos": nanos},
        "start_time": {"seconds": (seconds - timeframe), "nanos": nanos},
    }
)

# Add Filter? metric.label.instance_name = "your-instance-id"
results_cpu = monitoring_client.list_time_series(
    request={
        "name": project_name,
        "filter": 'metric.type =  "compute.googleapis.com/instance/cpu/utilization"',
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)
results_io_read = monitoring_client.list_time_series(
    request={
        "name": project_name,
        "filter": 'metric.type = "compute.googleapis.com/instance/disk/read_bytes_count"',
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)
results_io_write = monitoring_client.list_time_series(
    request={
        "name": project_name,
        "filter": 'metric.type = "compute.googleapis.com/instance/disk/write_bytes_count"',
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)
results_iops_read = monitoring_client.list_time_series(
    request={
        "name": project_name,
        "filter": 'metric.type = "compute.googleapis.com/instance/disk/read_ops_count"',
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)
results_iops_write = monitoring_client.list_time_series(
    request={
        "name": project_name,
        "filter": 'metric.type = "compute.googleapis.com/instance/disk/write_ops_count"',
        "interval": interval,
        "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    }
)

mutli_results_memory = {}
mem_states = ["buffered", "cached", "free", "slab", "used"]
for state in mem_states:
    mutli_results_memory[state] = monitoring_client.list_time_series(
        request={
            "name": project_name,
            "filter": f'metric.type = "agent.googleapis.com/memory/bytes_used" AND metric.labels.state = "{state}"',
            "interval": interval,
            "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
        }
    )

mutli_results_memory_percentage = {}
for state in mem_states:
    mutli_results_memory_percentage[state] = monitoring_client.list_time_series(
        request={
            "name": project_name,
            "filter": f'metric.type = "agent.googleapis.com/memory/percent_used" AND metric.labels.state = "{state}"',
            "interval": interval,
            "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
        }
    )



# Prepare Data so that as the monitoring time_series_list is not aligned
time_series_list_list_labels = ["cpu_util_", "io_read_", "io_write_", "iops_read_", "iops_write_"]
time_series_list_list = [results_cpu, results_io_read, results_io_write, results_iops_read, results_iops_write]
# Add Memory
for state in mem_states:
    time_series_list_list.append(mutli_results_memory[state])
    time_series_list_list_labels.append(f"mem_{state}_")

for state in mem_states:
    time_series_list_list.append(mutli_results_memory_percentage[state])
    time_series_list_list_labels.append(f"mem_perc_{state}_")

print(time_series_list_list_labels)

instance_logs = {}

for index, time_series_list in enumerate(time_series_list_list):
    for time_series in time_series_list:
        if time_series.metric.labels["instance_name"]:
            label = time_series.metric.labels["instance_name"]
        else:
            # Find out label by comapring the instance ID
            for compare_item in time_series_list_list[0]:
                if compare_item.resource.labels["instance_id"] == time_series.resource.labels["instance_id"]:
                    label = compare_item.metric.labels["instance_name"]
        if label not in instance_logs:
            instance_logs[label] = np.empty(len(time_series_list_list), dtype=object)
        instance_logs[label][index] = time_series

def loadMonitoringData(experiment, outDir = datetime.datetime.today().strftime('%Y%m%d-%H')): 
    ensureDirectory(outDir)
    df = pd.DataFrame()
    set_timestamp_column = True
    first_len = 0
    # Filter instances for our experiment
    filteredInstances = {}
    for key, instance in instance_logs.items():
        if key.startswith(experiment):
            filteredInstances[key] = instance

    # Find out which point array is smallest
    first_len = min([len(item.points) for instances in filteredInstances.values() for item in instances])
    for instances in filteredInstances.values():
        print(instances[0].metric.labels["instance_name"], len(instances[0].points))



    print("Overall min len:", first_len)

    for key, instances in filteredInstances.items():
        for index in range(len(instances)):
            label = key
            # if "orchestrator" in label:
            #     # Skip Orchesrtator
            #     continue
            if set_timestamp_column:
                # first_len = min([len(item.points) - 1 for item in instances])
                # print(first_len)
                print("SET min len:", first_len)

                df['timestamp'] = pd.to_datetime([p.interval.start_time.ToDatetime() for p in instances[0].points[:first_len]])
                set_timestamp_column = False
            # print(ts.points[0])
            # When deploying the vm they might take different amount of time leading to some values beeing available a minute early this leading to different length
            # We can trim off the last values as they are orderer from most recent to last
            df[time_series_list_list_labels[index] + label] = [p.value.double_value for p in instances[index].points[:first_len]]

            # df['cpu_util_' + label] = [p.value.double_value for p in ts_cpu.points[:first_len]]
            # df['io_read_' + label] = [p.value.int64_value for p in ts_io_read.points[:first_len]]
            # df['io_write_' + label] = [p.value.int64_value for p in ts_io_write.points[:first_len]]
            # df['iops_read_' + label] = [p.value.int64_value for p in ts_iops_read.points[:first_len]]
            # df['iops_write_' + label] = [p.value.int64_value for p in ts_iops_write.points[:first_len]]

    df.set_index('timestamp', inplace=True)
    df.index = pd.to_datetime(df.index)
    columns = df.columns
    df.to_csv(f"{outDir}/monitoring.csv")
    print(f"Gathered all Monitoring data for {experiment}")
    return df


# Make sure your experiment name is included in the experiments list, otherwise only errors will occur.
if 'experiment' not in locals():
    for experiment in sharedVariables.experiments: 
        try: 
            loadMonitoringData(experiment)
        except Exception as e: 
            print(e)
# loadMonitoringData("experiment-syncmesh-with-latency-9")


['cpu_util_', 'io_read_', 'io_write_', 'iops_read_', 'iops_write_', 'mem_buffered_', 'mem_cached_', 'mem_free_', 'mem_slab_', 'mem_used_', 'mem_perc_buffered_', 'mem_perc_cached_', 'mem_perc_free_', 'mem_perc_slab_', 'mem_perc_used_']
experiment-syncmesh-with-latency-9-node-instance-2 98
experiment-syncmesh-with-latency-9-node-instance-7 98
experiment-syncmesh-with-latency-9-node-instance-6 97
experiment-syncmesh-with-latency-9-node-instance-4 98
experiment-syncmesh-with-latency-9-node-instance-8 98
experiment-syncmesh-with-latency-9-node-instance-3 96
experiment-syncmesh-with-latency-9-node-instance-1 98
experiment-syncmesh-with-latency-9-node-instance-5 98
experiment-syncmesh-with-latency-9-test-orchestrator 94
experiment-syncmesh-with-latency-9-client-instance 94
experiment-syncmesh-with-latency-9-node-instance-9 98
Overall min len: 94
SET min len: 94
Gathered all Monitoring data for experiment-syncmesh-with-latency-9


  df[time_series_list_list_labels[index] + label] = [p.value.double_value for p in instances[index].points[:first_len]]


Unnamed: 0_level_0,cpu_util_experiment-syncmesh-with-latency-9-node-instance-2,io_read_experiment-syncmesh-with-latency-9-node-instance-2,io_write_experiment-syncmesh-with-latency-9-node-instance-2,iops_read_experiment-syncmesh-with-latency-9-node-instance-2,iops_write_experiment-syncmesh-with-latency-9-node-instance-2,mem_buffered_experiment-syncmesh-with-latency-9-node-instance-2,mem_cached_experiment-syncmesh-with-latency-9-node-instance-2,mem_free_experiment-syncmesh-with-latency-9-node-instance-2,mem_slab_experiment-syncmesh-with-latency-9-node-instance-2,mem_used_experiment-syncmesh-with-latency-9-node-instance-2,...,mem_buffered_experiment-syncmesh-with-latency-9-node-instance-9,mem_cached_experiment-syncmesh-with-latency-9-node-instance-9,mem_free_experiment-syncmesh-with-latency-9-node-instance-9,mem_slab_experiment-syncmesh-with-latency-9-node-instance-9,mem_used_experiment-syncmesh-with-latency-9-node-instance-9,mem_perc_buffered_experiment-syncmesh-with-latency-9-node-instance-9,mem_perc_cached_experiment-syncmesh-with-latency-9-node-instance-9,mem_perc_free_experiment-syncmesh-with-latency-9-node-instance-9,mem_perc_slab_experiment-syncmesh-with-latency-9-node-instance-9,mem_perc_used_experiment-syncmesh-with-latency-9-node-instance-9
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2021-09-30 04:57:00,0.024400,0.0,0.0,0.0,0.0,61444096.0,2.869711e+09,2.819400e+08,191016960.0,451203072.0,...,60674048.0,2.866962e+09,3.108250e+08,191287296.0,425566208.0,1.573777,74.363897,8.062246,4.961652,11.038429
2021-09-30 04:56:00,0.042535,0.0,0.0,0.0,0.0,61411328.0,2.869682e+09,2.817147e+08,191029248.0,451477504.0,...,60624896.0,2.866942e+09,3.108250e+08,191258624.0,425664512.0,1.572502,74.363366,8.062246,4.960908,11.040979
2021-09-30 04:55:00,0.028337,0.0,0.0,0.0,0.0,61358080.0,2.869662e+09,2.819727e+08,191012864.0,451309568.0,...,60579840.0,2.866917e+09,3.110830e+08,191258624.0,425476096.0,1.571333,74.362728,8.068939,4.960908,11.036092
2021-09-30 04:54:00,0.018776,0.0,0.0,0.0,0.0,61304832.0,2.869637e+09,2.817147e+08,191021056.0,451637248.0,...,60530688.0,2.866885e+09,3.115663e+08,191279104.0,425054208.0,1.570058,74.361878,8.081476,4.961439,11.025149
2021-09-30 04:53:00,0.019203,0.0,0.0,0.0,0.0,61251584.0,2.869608e+09,2.819727e+08,191008768.0,451473408.0,...,60481536.0,2.866860e+09,3.108782e+08,191254528.0,425840640.0,1.568783,74.361241,8.063627,4.960802,11.045547
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2021-09-30 03:28:00,0.018795,0.0,0.0,0.0,0.0,53829632.0,2.843849e+09,3.668746e+08,188784640.0,401977344.0,...,53346304.0,2.842218e+09,3.669197e+08,189100032.0,403730432.0,1.383708,73.722083,9.517243,4.904918,10.472048
2021-09-30 03:27:00,0.019321,0.0,0.0,0.0,0.0,53739520.0,2.843845e+09,3.705201e+08,188780544.0,398430208.0,...,53248000.0,2.842210e+09,3.704586e+08,189149184.0,400248832.0,1.381158,73.721871,9.609037,4.906193,10.381742
2021-09-30 03:26:00,0.018864,0.0,0.0,0.0,0.0,53641216.0,2.843017e+09,3.642941e+08,188735488.0,405626880.0,...,53174272.0,2.842161e+09,3.696845e+08,189149184.0,401145856.0,1.379246,73.720596,9.588957,4.906193,10.405009
2021-09-30 03:25:00,0.021517,0.0,0.0,0.0,0.0,43597824.0,2.656801e+09,6.624502e+08,163115008.0,329351168.0,...,51900416.0,2.815693e+09,3.833610e+08,186118144.0,418242560.0,1.346204,73.034054,9.943702,4.827573,10.848467


# Get TimeStamps

In [23]:
# # Workaround for now
# # df_sync3 = pd.read_csv(f'{outdir}/experiment-syncmesh-with-latency-3.csv')
# # df_base3 = pd.read_csv(f'{outdir}/experiment-baseline-with-latency-3.csv')
# df_sync3 = loadData(f'{outdir}/experiment-syncmesh-with-latency-3.log')
# df_base3 = loadData(f'{outdir}/experiment-baseline-with-latency-3.log')
# # df_base3.set_index('timestamp', inplace=True)
# # df_sync3.set_index('timestamp', inplace=True)

# # df_sync6 = loadData(f'{outdir}/experiment-syncmesh-with-latency-6.log')
# # df_base6 = loadData(f'{outdir}/experiment-baseline-with-latency-6.log')

# def filterDataForSeperator(df): 
#     df = df[df["jsonPayload.connection.src_ip"].isin([ip_seperator, ip_orchestrator])]
#     df = df[df["jsonPayload.connection.dest_ip"].isin([ip_seperator, ip_orchestrator])]
#     df = df[df["jsonPayload.connection.dest_port"] == 443]
#     return df

# seperator_base3 = filterDataForSeperator(df_base3)
# seperator_sync3 = filterDataForSeperator(df_sync3)

# # df = df[df["jsonPayload.connection.dest_ip"]]
# print(seperator_sync3.index)
# print(seperator_base3.index)
# # df_base3.info()

In [30]:
# Script for automatic data retrieval via papermill
if 'experiment' in locals():
    todaystring = datetime.datetime.today().strftime('%Y%m%d-%H')
    outdir = f"data/{todaystring}-{experiment}"

    f = open(os.path.join(Path.cwd(), "..", "infrastructure", "orchestrator.txt"), "r")
    ip = f.read()
    download_pcap(ip, outDir=outdir)
    loadMonitoringData(experiment, outDir=outdir)
    # getDataset(experiment, outDir=outdir)

    

Dowloading File
c:\Develop\GitHub\DSPJ2021\syncmesh\evaluation\..\cert

experiment-advanced-mongo-with-latency-3-node-instance-2: 165
164
experiment-advanced-mongo-with-latency-3-node-instance-2: 36


ValueError: Length of values (36) does not match length of index (164)