In [1]:
import pandas as pd
from typing import Dict
from datetime import datetime
from enum import Enum
import numpy as np

In [1]:
exp_name = "D2024-04-18_15:54:02_R1_W8"
file_loc = "../../data/{f}/worker_transfer.csv".format(f=exp_name)

In [3]:
dat = pd.read_csv(file_loc)

In [4]:
dat.head()

Unnamed: 0.1,Unnamed: 0,start,stop,middle,duration,keys,total,bandwidth,who,type,called_from,time,compressed
0,0,1713456000.0,1713456000.0,1713456000.0,0.025946,"{('array-8838909ee9756e34565341881e2e6f0c', 12...",336,12949.784006,tcp://10.201.0.212:45449,incoming_transfer,tcp://10.201.0.212:38577,1713456000.0,
1,1,1713456000.0,1713456000.0,1713456000.0,0.025946,"{('array-8838909ee9756e34565341881e2e6f0c', 12...",336,12949.784006,tcp://10.201.0.212:45449,incoming_transfer,tcp://10.201.0.212:38577,1713456000.0,
2,2,1713456000.0,1713456000.0,1713456000.0,0.030664,"{('array-8838909ee9756e34565341881e2e6f0c', 11...",1344,43829.263669,tcp://10.201.0.212:45449,incoming_transfer,tcp://10.201.0.212:38577,1713456000.0,
3,3,1713456000.0,1713456000.0,1713456000.0,0.029351,"{('array-8838909ee9756e34565341881e2e6f0c', 11...",1344,45790.237645,tcp://10.201.0.212:38577,outgoing_transfer,tcp://10.201.0.212:45449,1713456000.0,2533.0
4,4,1713456000.0,1713456000.0,1713456000.0,0.039901,"{('array-8838909ee9756e34565341881e2e6f0c', 87...",336,8420.737122,tcp://10.201.0.212:38553,incoming_transfer,tcp://10.201.0.212:34699,1713456000.0,


In [5]:
dat["keys"][2]

"{('array-8838909ee9756e34565341881e2e6f0c', 116): 336, ('array-8838909ee9756e34565341881e2e6f0c', 100): 336, ('array-8838909ee9756e34565341881e2e6f0c', 108): 336, ('array-8838909ee9756e34565341881e2e6f0c', 92): 336}"

In [6]:
def assert_keys_only_ever_1_long(dat) :
    dat["keylen"] = dat["keys"].apply(lambda x: len(list(eval(x).keys())) == 1)

assert_keys_only_ever_1_long(dat)
dat["keylen"].value_counts()

keylen
True     6260
False     718
Name: count, dtype: int64

In [7]:
dat["st_lt_mid"] = dat.apply(lambda f:
    True if (f.start < f.middle) else False,
    axis=1)

dat["mid_lt_end"] = dat.apply(lambda f:
    True if (f.middle < f.stop) else False,
    axis=1)

dat["st_lt_end"] = dat.apply(lambda f:
    True if (f.start < f.stop) else False,
    axis=1)

dat["end_st_eq_dur"] = dat.apply(lambda f:
    True if ((f.stop - f.start) == f.duration) else False,
    axis=1)

dat["is_it_out_types_fault"] = dat.apply(lambda f: 
    True if ((not f["st_lt_mid"]) | (not f["mid_lt_end"])) & (f["type"] == "outgoing_transfer") else False
    , axis=1)
dat["is_it_in_types_fault"] = dat.apply(lambda f: 
    True if ((not f["st_lt_mid"]) | (not f["mid_lt_end"])) & (f["type"] == "incoming_transfer") else False
    , axis=1)
print(dat["st_lt_mid"].value_counts())
print()
print(dat["mid_lt_end"].value_counts())
print()
print(dat["st_lt_end"].value_counts())
print()
print(dat["end_st_eq_dur"].value_counts())
print()
print("Number of unexpected occurences: ")
print(dat["st_lt_mid"].value_counts()[False] + dat["mid_lt_end"].value_counts()[False])
print(dat["is_it_out_types_fault"].value_counts())
print()
print(dat["is_it_in_types_fault"].value_counts())

st_lt_mid
True     4339
False    2639
Name: count, dtype: int64

mid_lt_end
True     6788
False     190
Name: count, dtype: int64

st_lt_end
True    6978
Name: count, dtype: int64

end_st_eq_dur
False    6499
True      479
Name: count, dtype: int64

Number of unexpected occurences: 
2829
is_it_out_types_fault
False    4149
True     2829
Name: count, dtype: int64

is_it_in_types_fault
False    6978
Name: count, dtype: int64


In [8]:
class TransferTypeEnum(Enum) :
    INCOMING = 'incoming_transfer'
    OUTGOING = 'outgoing_transfer'
    
class WXferEvent :
    start: datetime
    stop: datetime
    middle: datetime
    duration: float

    keys: Dict[str,int]

    total: int
    bandwith: float
    compressed: float

    requestor: str # ip addr; who
    fulfiller: str # called_from

    transfer_type: TransferTypeEnum

    time : datetime

    def __init__(self, data) :
        self.start = datetime.fromtimestamp(data['start'])
        self.stop = datetime.fromtimestamp(data['stop'])
        self.middle = datetime.fromtimestamp(data['middle'])
        self.duration = data['duration']

        self.keys = eval(data['keys'])

        self.total = data['total']
        self.bandwith = data['bandwidth']
        self.compressed = data['compressed']

        self.requestor = data['who']
        self.fulfiller = data['called_from']

        self.transfer_type = TransferTypeEnum(data['type'])

        self.time = datetime.fromtimestamp(data['time'])
    
    def __str__(self) -> str :
        out = "Worker Transfer Event (Type: {t})".format(t=self.transfer_type)
        out += "\n\tEvent time: {t}".format(t=self.time)
        out += "\n\tRequestor (Them): {r}\tFulfiller (Me): {f}".format(r=self.requestor, f=self.fulfiller)
        out += "\n\tStart: {s}\tMiddle: {m}\tEnd: {e}\t(Duration: {d})".format(
            s=self.start, m=self.middle, e=self.stop, d=self.duration
        )
        out += "\n\tTotal Transfer: {t}".format(t=self.total)
        out += "\n\tAffiliated Keys:"
        for key in self.keys :
            out += "\n\t\t{k}".format(k=key)
        return out

In [9]:
r = WXferEvent(dat.iloc(axis=0)[0])
print(r)

Worker Transfer Event (Type: TransferTypeEnum.INCOMING)
	Event time: 2024-04-18 11:54:44.762709
	Requestor (Them): tcp://10.201.0.212:45449	Fulfiller (Me): tcp://10.201.0.212:38577
	Start: 2024-04-18 11:54:44.735809	Middle: 2024-04-18 11:54:44.748782	End: 2024-04-18 11:54:44.761756	(Duration: 0.0259463787078857)
	Total Transfer: 336
	Affiliated Keys:
		('array-8838909ee9756e34565341881e2e6f0c', 120)


In [10]:
r = WXferEvent(dat.iloc(axis=0)[1])
print(r)

Worker Transfer Event (Type: TransferTypeEnum.INCOMING)
	Event time: 2024-04-18 11:54:44.811011
	Requestor (Them): tcp://10.201.0.212:45449	Fulfiller (Me): tcp://10.201.0.212:38577
	Start: 2024-04-18 11:54:44.735809	Middle: 2024-04-18 11:54:44.748782	End: 2024-04-18 11:54:44.761756	(Duration: 0.0259463787078857)
	Total Transfer: 336
	Affiliated Keys:
		('array-8838909ee9756e34565341881e2e6f0c', 120)


In [11]:

r = WXferEvent(dat.iloc(axis=0)[6977])
print(r)

Worker Transfer Event (Type: TransferTypeEnum.OUTGOING)
	Event time: 2024-04-18 11:57:59.790403
	Requestor (Them): tcp://10.201.0.212:34699	Fulfiller (Me): tcp://10.201.0.212:45449
	Start: 2024-04-18 11:57:45.887894	Middle: 2024-04-18 11:57:45.822504	End: 2024-04-18 11:57:45.890036	(Duration: 0.0021421909332275)
	Total Transfer: 4194304
	Affiliated Keys:
		('stack-b4b92b70b53be1d3b50e36696c31dac6', 48, 0, 0)
