In [1]:
import json
import pandas as pd
import os
from pandas.io.json import json_normalize

def read_jsonl(relative_path):
    with open(relative_path, "r") as file:
        lines = file.readlines()
    jsons = [json.loads(line) for line in lines]
    return pd.json_normalize(jsons)

def list_files_by_extension(extension, directory_path):
    files = [file for file in os.listdir(directory_path) if file.lower().endswith(extension.lower())]
    sorted_by_name = sorted(files)
    return sorted_by_name

def list_directores_in_path(relative_path):
    return [os.path.join(relative_path, fname) for fname in os.listdir(relative_path) if os.path.isdir(os.path.join(relative_path, fname))]

In [2]:
from functools import reduce
from itertools import chain
import numpy as np

def load_data_run(path):
    print(f"Loading from {path}")
    files = [os.path.join(path, file) for file in ["job_descriptions.jsonl", "sys_info.jsonl"]]
    meta = [read_jsonl(file) for file in files]
    events = read_jsonl(os.path.join(path, "metrics.jsonl"))
    meta = pd.merge(meta[0], meta[1], on="jobId")
    return np.array([meta, events])

def load_from_sources(paths):
    frames = np.array([load_data_run(path) for path in paths])
    return pd.concat(frames[:, 0]), pd.concat(frames[:, 1])

def load_frames_for_base(path, base):
    base_path = os.path.join(path,base)
    categories = list_directores_in_path(base_path)
    frames_paths =list(chain(*[list_directores_in_path(category) for category in categories]))
    return load_from_sources(frames_paths)

In [3]:
data_root = "../parsed-logs2/"

aws_bases = list_directores_in_path(data_root + "aws")
gcloud_bases = list_directores_in_path(data_root +"gcloud")

In [4]:
df = load_frames_for_base(data_root, "gcloud")

Loading from ../parsed-logs2/gcloud\c2-standard-4\montage2__0.01__1.0.0__2020-05-31-11-34-47
Loading from ../parsed-logs2/gcloud\c2-standard-4\montage2__0.01__1.0.0__2020-06-01-18-58-26
Loading from ../parsed-logs2/gcloud\c2-standard-4\montage2__0.25__1.0.0__2020-05-31-13-13-15


  return np.array([meta, events])


Loading from ../parsed-logs2/gcloud\c2-standard-4\montage2__0.25__1.0.0__2020-06-01-08-39-05
Loading from ../parsed-logs2/gcloud\c2-standard-4\montage2__1.0__1.0.0__2020-05-31-13-56-26
Loading from ../parsed-logs2/gcloud\c2-standard-4\montage2__1.0__1.0.0__2020-06-01-09-21-55
Loading from ../parsed-logs2/gcloud\c2-standard-4\montage__0.25__1.0.0__2020-05-31-12-35-35
Loading from ../parsed-logs2/gcloud\c2-standard-4\montage__0.25__1.0.0__2020-06-01-08-03-31
Loading from ../parsed-logs2/gcloud\c2-standard-4\montage__1.0__1.0.0__2020-05-31-12-41-26
Loading from ../parsed-logs2/gcloud\c2-standard-4\montage__1.0__1.0.0__2020-06-01-08-08-23
Loading from ../parsed-logs2/gcloud\c2-standard-4\montage__2.0__1.0.0__2020-05-31-12-57-16
Loading from ../parsed-logs2/gcloud\c2-standard-4\montage__2.0__1.0.0__2020-06-01-08-23-02
Loading from ../parsed-logs2/gcloud\c2-standard-4\soykb__134__1.0.0__2020-05-31-18-54-39
Loading from ../parsed-logs2/gcloud\c2-standard-4\soykb__134__1.0.0__2020-06-01-14-15-

Loading from ../parsed-logs2/gcloud\n2d-standard-4\montage2__1.0__1.0.0__2020-06-21-18-43-46
Loading from ../parsed-logs2/gcloud\n2d-standard-4\montage__0.25__1.0.0__2020-06-20-19-19-24
Loading from ../parsed-logs2/gcloud\n2d-standard-4\montage__0.25__1.0.0__2020-06-21-14-10-27
Loading from ../parsed-logs2/gcloud\n2d-standard-4\montage__1.0__1.0.0__2020-06-20-19-33-55
Loading from ../parsed-logs2/gcloud\n2d-standard-4\montage__1.0__1.0.0__2020-06-21-16-46-00
Loading from ../parsed-logs2/gcloud\n2d-standard-4\montage__2.0__1.0.0__2020-06-20-19-53-33
Loading from ../parsed-logs2/gcloud\n2d-standard-4\montage__2.0__1.0.0__2020-06-21-17-04-44
Loading from ../parsed-logs2/gcloud\n2d-standard-4\soykb__134__1.0.0__2020-06-21-03-35-15
Loading from ../parsed-logs2/gcloud\n2d-standard-4\soykb__134__1.0.0__2020-06-22-00-37-22
Loading from ../parsed-logs2/gcloud\n2d-standard-4\soykb__238__1.0.0__2020-06-21-07-11-24
Loading from ../parsed-logs2/gcloud\n2d-standard-4\soykb__238__1.0.0__2020-06-22-11

In [5]:
meta, events = df

In [6]:
filtered_events = events.loc[events["parameter"] != "event"].reset_index()
filtered_events

MemoryError: Unable to allocate 397. MiB for an array with shape (8, 6505759) and data type object

In [None]:
filtered_events.parameter.unique()

{"read": 1225, "write": 1, "readSyscalls": 5, "writeSyscalls": 1, "readReal": 0, "writeReal": 0, "writeCancelled": 0}

In [7]:
io_params = ["read", "write", "readSyscalls", "writeSyscalls", "readReal", "writeReal", "writeCancelled"]
network_params = ["rxBytes", "rxPackets", "rxErrors", "rxDrop", "rxFifo", "rxFrame", "rxCompressed", "rxMulticast", "txBytes", "txPackets", "txErrors", "txDrop", "txFifo", "txColls", "txCarrier", "txCompressed"]

io_agg_kwargs={f"{param}_sum" : (f"value.{param}", "sum") for param in io_params}
network_agg_kwargs={f"{param}_sum" : (f"value.{param}", "sum") for param in network_params}
cpu_agg_kwargs={"cpu_mean": ('value', 'mean'), "cpu_max": ('value', 'max')}
memory_agg_kwargs={"memory_mean": ('value', 'mean'), "memory_max": ('value', 'max')}
ctime_agg_kwargs={"ctime_mean": ('value', 'mean'), "ctime_max": ('value', 'max'), "ctime_sum": ('value', 'sum')}

def extract_meta_from_metrics(frame):
    frame["value"] = pd.to_numeric(frame["value"])
    def postprocess(aggregated_frame, parameter_type):
        aggregated_frame = aggregated_frame.reset_index()
        return aggregated_frame.loc[aggregated_frame["parameter"] == parameter_type].drop("parameter", axis=1)
    
    frame_grp = frame.groupby(['jobId', 'parameter'])
    metrics = [
        postprocess(frame_grp.agg(**io_agg_kwargs), "io"),
        postprocess(frame_grp.agg(**network_agg_kwargs), "network"),          
        postprocess(frame_grp.agg(**cpu_agg_kwargs), "cpu"),
        postprocess(frame_grp.agg(**memory_agg_kwargs), "memory"),
        postprocess(frame_grp.agg(**ctime_agg_kwargs), "ctime")   
    ]   
    return reduce(lambda left, right: pd.merge(left,right,on='jobId'), metrics)

In [9]:
# extract_meta_from_metrics(filtered_events)

In [10]:
# import matplotlib.pyplot as plt
# plt.rcParams['figure.figsize'] = [10, 10]

In [None]:
meta.groupby('executable').count().sort_values('size').plot.barh(y='size', logx=True, legend=False)

### Todo - zmerge'ować base z metadanymi. Zrobić matrycę pearsona. Podstawowe klasyfikatory. GPT-2. Podział na 

In [1]:
meta.iloc[5]
# meta.columns

NameError: name 'meta' is not defined

In [None]:
def preprocess_metrics(events_frame):
    print("Preprocessing metrics")
    events_frame = events_frame.loc[events_frame["parameter"] != "event"].reset_index()
    return extract_meta_from_metrics(events_frame)

def preprocess_meta(meta_frame):
    print("Preprocessing meta")
    return meta_frame.drop(["hyperflowId", 'version', 'nodeName', 'cpu.socket', 'cpu.speedmin', 
                            'cpu.speedmax', 'cpu.governor', 'cpu.revision', 'cpu.voltage', 'env.nodeName', 
                            'env.podIp', 'env.podServiceAccount', 'env.podName', 'env.podNamespace', 'stdout'], axis=1)

def join_metrics_with_meta(metrics_frame, meta_frame):
    print("Joining metrics with meta")
    return pd.merge(metrics_frame, meta_frame, on="jobId")

def dump_preprocessed(data_frame, path="merged.csv"):
    print("Dumping preprocessed")
    data_frame.to_csv(os.path.join(data_root, path))

In [None]:
def store_intermediate():
    metrics = preprocess_metrics(events)
    metadata = preprocess_meta(meta)
    joint_df = join_metrics_with_meta(metrics, metadata)
    dump_preprocessed(joint_df, "gcloud.csv")

In [None]:
store_intermediate()

In [16]:
meta.inputs.iloc[0]

[{'name': 'region-oversized.hdr', 'size': 277},
 {'name': '2mass-atlas-001021s-j0490233.fits', 'size': 1537525}]

In [17]:
metrics = preprocess_metrics(events)
metadata = preprocess_meta(meta)
joint_df = join_metrics_with_meta(metrics, metadata)

Preprocessing metrics
Preprocessing meta
Joining metrics with meta


In [18]:
metrics.iloc[0]

jobId                 -tDswEFbA-1-1
read_sum                 27656260.0
write_sum                      10.0
readSyscalls_sum             1054.0
writeSyscalls_sum              10.0
readReal_sum             14708736.0
writeReal_sum                   0.0
writeCancelled_sum              0.0
rxBytes_sum                     0.0
rxPackets_sum                   0.0
rxErrors_sum                    0.0
rxDrop_sum                      0.0
rxFifo_sum                      0.0
rxFrame_sum                     0.0
rxCompressed_sum                0.0
rxMulticast_sum                 0.0
txBytes_sum                     0.0
txPackets_sum                   0.0
txErrors_sum                    0.0
txDrop_sum                      0.0
txFifo_sum                      0.0
txColls_sum                     0.0
txCarrier_sum                   0.0
txCompressed_sum                0.0
cpu_mean                  86.905473
cpu_max                        99.0
memory_mean              11498291.2
memory_max               124

In [19]:
metadata.iloc[0]

workflowName                                                  montage2
size                                                              0.01
jobId                                                    32i_W49Sz-1-2
executable                                                    mProject
args                 [-X, 2mass-atlas-001021s-j0490233.fits, p2mass...
inputs               [{'name': 'region-oversized.hdr', 'size': 277}...
outputs              [{'name': 'p2mass-atlas-001021s-j0490233.fits'...
name                                                          mProject
command              mProject -X 2mass-atlas-001021s-j0490233.fits ...
execTimeMs                                                       11882
cpu.manufacturer                                               IntelÂ®
cpu.brand                                       XeonÂ® Platinum 8259CL
cpu.vendor                                                            
cpu.family                                                            
cpu.mo

In [20]:
metadata.loc[metadata['jobId'] == '-8Ip2oamm-1-10'].inputs.iloc[0]

IndexError: single positional indexer is out-of-bounds

In [21]:
metadata.iloc[0]

workflowName                                                  montage2
size                                                              0.01
jobId                                                    32i_W49Sz-1-2
executable                                                    mProject
args                 [-X, 2mass-atlas-001021s-j0490233.fits, p2mass...
inputs               [{'name': 'region-oversized.hdr', 'size': 277}...
outputs              [{'name': 'p2mass-atlas-001021s-j0490233.fits'...
name                                                          mProject
command              mProject -X 2mass-atlas-001021s-j0490233.fits ...
execTimeMs                                                       11882
cpu.manufacturer                                               IntelÂ®
cpu.brand                                       XeonÂ® Platinum 8259CL
cpu.vendor                                                            
cpu.family                                                            
cpu.mo

In [22]:
metadata.inputs.iloc[0]

[{'name': 'region-oversized.hdr', 'size': 277},
 {'name': '2mass-atlas-001021s-j0490233.fits', 'size': 1537525}]

In [23]:
joint_df.iloc[0]

jobId                -tDswEFbA-1-1
read_sum                27656260.0
write_sum                     10.0
readSyscalls_sum            1054.0
writeSyscalls_sum             10.0
                         ...      
mem.slab                 479772672
mem.buffcache           2873090048
mem.swaptotal                    0
mem.swapused                     0
mem.swapfree                     0
Name: 0, Length: 66, dtype: object

In [24]:
joint_df.columns

Index(['jobId', 'read_sum', 'write_sum', 'readSyscalls_sum',
       'writeSyscalls_sum', 'readReal_sum', 'writeReal_sum',
       'writeCancelled_sum', 'rxBytes_sum', 'rxPackets_sum', 'rxErrors_sum',
       'rxDrop_sum', 'rxFifo_sum', 'rxFrame_sum', 'rxCompressed_sum',
       'rxMulticast_sum', 'txBytes_sum', 'txPackets_sum', 'txErrors_sum',
       'txDrop_sum', 'txFifo_sum', 'txColls_sum', 'txCarrier_sum',
       'txCompressed_sum', 'cpu_mean', 'cpu_max', 'memory_mean', 'memory_max',
       'ctime_mean', 'ctime_max', 'ctime_sum', 'workflowName', 'size',
       'executable', 'args', 'inputs', 'outputs', 'name', 'command',
       'execTimeMs', 'cpu.manufacturer', 'cpu.brand', 'cpu.vendor',
       'cpu.family', 'cpu.model', 'cpu.stepping', 'cpu.speed', 'cpu.cores',
       'cpu.physicalCores', 'cpu.processors', 'cpu.cache.l1d', 'cpu.cache.l1i',
       'cpu.cache.l2', 'cpu.cache.l3', 'mem.total', 'mem.free', 'mem.used',
       'mem.active', 'mem.available', 'mem.buffers', 'mem.cached', 'mem

In [25]:
joint_df.inputs.iloc[0]

[{'name': '2mass-atlas-980914s-j0820044.fits', 'size': 1529220},
 {'name': 'region-oversized.hdr', 'size': 277}]

In [26]:
joint_df.loc[joint_df['jobId'] == '32i_W49Sz-1-2'].inputs.iloc[0]

[{'name': 'region-oversized.hdr', 'size': 277},
 {'name': '2mass-atlas-001021s-j0490233.fits', 'size': 1537525}]