Setup

In [1]:
import warnings
warnings.filterwarnings('ignore')

In [2]:
import os

import pandas as pd
import numpy as np

In [3]:
from IPython.display import display, HTML

CSS = """
.output {
    flex-direction: row;
}
"""

HTML('<style>{}</style>'.format(CSS))

Given a set of observation traces, we first need to parse each trace:

In [4]:
trace = {f.split(r'.')[0]: pd.read_csv(os.path.join('data', f)) for f in os.listdir('data')}

VM

In [5]:
display(trace['vm'].head(20))
display(trace['tid'].head(20))
display(trace['activity'].head(20))

Unnamed: 0,epoch,timestamp,thread,id,state
0,1,1565621927794,Reference Handler,2,True
1,1,1565621927794,Finalizer,3,False
2,1,1565621927794,Signal Dispatcher,4,True
3,1,1565621927794,main,1,True
4,1,1565621927794,Chaperone,36,True
5,1,1565621927794,Common-Cleaner,21,False
6,2,1565621927796,Reference Handler,2,True
7,2,1565621927796,Finalizer,3,False
8,2,1565621927796,Signal Dispatcher,4,True
9,2,1565621927796,main,1,True


Unnamed: 0,thread,tid
0,main,8373
1,Chaperone,8461


Unnamed: 0,epoch,timestamp,total,state
0,1,1565621927794,1.164854,0.064933
1,2,1565621927796,1.143821,0.049798
2,3,1565621927797,1.168077,0.069164
3,4,1565621927798,1.773208,0.999936
4,5,1565621927800,1.125044,0.034681
5,6,1565621927801,1.158095,0.046836
6,7,1565621927802,1.160983,0.048479
7,8,1565621927803,1.625238,0.999958
8,9,1565621927805,1.121073,0.034696
9,10,1565621927806,1.164782,0.066831


The vm trace requires three parsing steps:

    1) Add the thread tid to each record for alignment with os data
    2) Add a trimmed thread name for alignment with os data that does not have a tid
    3) Add chappie's fractional activeness
    4) Normalize the state by epoch -> needs to happen post alignment with os due to os threads

In [6]:
trace['vm']['tid'] = trace['vm'].thread.map(trace['tid'].set_index('thread').to_dict()['tid']).fillna(-1).astype(int)
trace['vm']['os_thread'] = trace['vm'].thread.str[:15]
trace['vm'].loc[trace['vm'].thread == 'Chaperone', 'state'] = trace['activity'].state
# trace['vm'].state = (trace['vm'].state / trace['vm'].groupby('epoch').state.sum()).fillna(1)

In [7]:
trace['vm'].head(20)

Unnamed: 0,epoch,timestamp,thread,id,state,tid,os_thread
0,1,1565621927794,Reference Handler,2,True,-1,Reference Handl
1,1,1565621927794,Finalizer,3,False,-1,Finalizer
2,1,1565621927794,Signal Dispatcher,4,True,-1,Signal Dispatch
3,1,1565621927794,main,1,True,8373,main
4,1,1565621927794,Chaperone,36,0.0346813,8461,Chaperone
5,1,1565621927794,Common-Cleaner,21,False,-1,Common-Cleaner
6,2,1565621927796,Reference Handler,2,True,-1,Reference Handl
7,2,1565621927796,Finalizer,3,False,-1,Finalizer
8,2,1565621927796,Signal Dispatcher,4,True,-1,Signal Dispatch
9,2,1565621927796,main,1,True,8373,main


OS

In [8]:
display(trace['os'].head(20))
display(trace['sys'].head(20))

Unnamed: 0,epoch,tid,timestamp,record
0,4,8372,1565621927798,8372 (java) S 8061 8060 13706 34818 8060 10779...
1,4,8373,1565621927798,8373 (java) R 8061 8060 13706 34818 8060 41946...
2,4,8374,1565621927798,8374 (GC Thread#0) S 8061 8060 13706 34818 806...
3,4,8375,1565621927798,8375 (G1 Main Marker) S 8061 8060 13706 34818 ...
4,4,8376,1565621927798,8376 (G1 Conc#0) S 8061 8060 13706 34818 8060 ...
5,4,8377,1565621927798,8377 (G1 Refine#0) S 8061 8060 13706 34818 806...
6,4,8378,1565621927798,8378 (G1 Young RemSet) S 8061 8060 13706 34818...
7,4,8379,1565621927798,8379 (VM Thread) S 8061 8060 13706 34818 8060 ...
8,4,8380,1565621927798,8380 (Reference Handl) S 8061 8060 13706 34818...
9,4,8381,1565621927798,8381 (Finalizer) S 8061 8060 13706 34818 8060 ...


Unnamed: 0,record
0,cpu 97877922 0 8218426 1204199311 1395325 0 1...
1,cpu0 3611245 0 325980 28804203 8848 0 21166 0 0 0
2,cpu1 2549381 0 282918 29938072 6885 0 17541 0 0 0
3,cpu2 2816600 0 236961 29717423 6009 0 32843 0 0 0
4,cpu3 2602616 0 233464 29944076 5907 0 6130 0 0 0
5,cpu4 2478736 0 210530 30096523 4956 0 3674 0 0 0
6,cpu5 2323792 0 182478 30280457 4998 0 1992 0 0 0
7,cpu6 2362644 0 182300 30243776 4233 0 1379 0 0 0
8,cpu7 2272382 0 177629 30341565 3865 0 1084 0 0 0
9,cpu8 2273965 0 169730 30347345 4058 0 1034 0 0 0


The os trace requires four parsing steps:

    1) Parse the space delimited records
    2) Compute forward difference of jiffies (which are recorded as monotonically increasing)
    3) Align thread and system data by cpu
    4) Normalize jiffies data

In [9]:
def parse_stats_record(record):
    name_length = len(record) - 52
    idx = lambda x: x + name_length

    name = ' '.join(record[1:idx(2)])[1:-1]
    state = 1 if record[idx(2)] == 'R' else 0
    jiffies = int(record[idx(13)]) + int(record[idx(14)])
    core = 1 if int(record[idx(38)]) < 20 else 2

    return [name, state, jiffies, core]

In [10]:
trace['os'] = trace['os'].dropna(subset = ['record'])
records = trace['os'].record.str.split(' ').map(parse_stats_record)

trace['os'].loc[:, 'os_thread'] = records.map(lambda x: x[0])
main_id = trace['tid'].set_index('thread').to_dict()['tid']['main']
trace['os'].loc[trace['os'].tid == main_id, 'os_thread'] = 'main'

trace['os'].loc[:, 'state'] = records.map(lambda x: x[1])
trace['os'].loc[:, 'jiffies'] = records.map(lambda x: x[2])
trace['os'].loc[:, 'socket'] = records.map(lambda x: x[3])
trace['os'].pop('record')

trace['os'].jiffies = trace['os'].groupby('tid').jiffies.diff().fillna(0).astype(int)

In [11]:
trace['sys'] = trace['sys'].record[trace['sys'].record.str.contains(r'cpu\d+')]

trace['sys'] = trace['sys'].str.replace('  ', ' ')
trace['sys'] = trace['sys'].str.split(' ')
trace['sys'] = trace['sys'].values.tolist()
trace['sys'] = pd.DataFrame(trace['sys'], columns = ['core'] + ['jiffies{}'.format(i) for i in range(len(trace['sys'][0]) - 1)], dtype = int)

trace['sys'].core = trace['sys'].core.str.replace('cpu', '')
trace['sys'] = trace['sys'].astype(int)
trace['sys']['socket'] = trace['sys'].core.map(lambda c: 1 if c < 20 else 2)
trace['sys'].pop('jiffies3')

trace['sys']['jiffies'] = trace['sys'][[col for col in trace['sys'].columns if 'core' not in col]].sum(axis = 1).astype(int)

cores = len(trace['sys'].core.unique())
epochs = trace['os'].epoch.unique()
epochs = [cores * [epoch] for epoch in epochs]
trace['sys']['epoch'] = [e for epoch in epochs for e in epoch][:len(trace['sys'])]

trace['sys'] = trace['sys'].groupby(['epoch', 'socket'])['jiffies'].sum().reset_index()
trace['sys'].jiffies = trace['sys'].groupby(['socket']).jiffies.diff().fillna(0).astype(int)

In [12]:
display(trace['os'].head(20))
display(trace['sys'].head(20))

Unnamed: 0,epoch,tid,timestamp,os_thread,state,jiffies,socket
0,4,8372,1565621927798,java,0,0,2
1,4,8373,1565621927798,main,1,0,2
2,4,8374,1565621927798,GC Thread#0,0,0,2
3,4,8375,1565621927798,G1 Main Marker,0,0,2
4,4,8376,1565621927798,G1 Conc#0,0,0,2
5,4,8377,1565621927798,G1 Refine#0,0,0,2
6,4,8378,1565621927798,G1 Young RemSet,0,0,1
7,4,8379,1565621927798,VM Thread,0,0,1
8,4,8380,1565621927798,Reference Handl,0,0,1
9,4,8381,1565621927798,Finalizer,0,0,2


Unnamed: 0,epoch,socket,jiffies
0,4,1,0
1,4,2,0
2,8,1,0
3,8,2,2
4,12,1,1
5,12,2,0
6,16,1,0
7,16,2,1
8,20,1,1
9,20,2,0


To align the application and system records, we need to smooth out the data. Jiffies are updated by a timer, which is unique for each system component. Since our sampling rate is not aware of the timer, we can't claim a one-to-one mapping. We scan for the first record that satisfies

$j_{sys}(t) \geq j_{app}(t) \cap j_{sys}(t) > 0$

and accumulate all previous records to compute application jiffies consumption for all epochs in that group. We continue to step forward until are records are scanned.

In [13]:
df = pd.merge(
    trace['os'].groupby(['epoch', 'socket'])[['jiffies']].sum(),
    trace['sys'],
    on = ['epoch', 'socket'],
    suffixes = ('_app', '_sys')
).set_index(['epoch', 'socket'])

In [14]:
dfs = []
for socket, g in df.reset_index().groupby('socket'):
    g = g.sort_values('epoch')
    
    A = S = curr = 0
    idx = []
    vals = []
    j = g[['epoch', 'jiffies_app', 'jiffies_sys']].values
    for i, a, s in j:
        if a > 0 and A + a <= S + s:
            idx.append(i)
            vals.append([A + a, S + s])
            A = S = 0
        else:
            A += a
            S += s
    j = pd.DataFrame(index = idx, data = vals, columns = ['j_app', 'j_sys'])
    j.index.name = 'epoch'
    j['jiffies'] = j.j_app / j.j_sys
    g = pd.concat([g.set_index('epoch'), j], axis = 1).bfill().fillna(0).reset_index().set_index(['epoch', 'socket'])
    dfs.append(g)
j = pd.concat(dfs)[['jiffies']].sort_index()

In [15]:
trace['os'] = pd.merge(trace['os'], j, on = ['epoch', 'socket'], how = 'left', suffixes = ('_raw', '')).drop(columns = 'jiffies_raw')

In [16]:
sys.exi

NameError: name 'sys' is not defined

In [22]:
tid_mapped = pd.merge(trace['vm'][trace['vm'].tid > 0], trace['os'], on = ['epoch', 'tid'], suffixes = ('', '_'))[['epoch', 'thread', 'id', 'tid', 'state', 'jiffies']]

In [35]:
name_mapped = pd.merge(trace['os'][~trace['os'].os_thread.isin(tid_mapped.thread.unique())], trace['vm'][~trace['vm'].thread.isin(tid_mapped.thread.unique())], on = ['epoch', 'os_thread'], how = 'left', suffixes = ('', '_'))#
name_mapped.thread = name_mapped.thread.fillna(name_mapped.os_thread)
name_mapped.state = name_mapped.state.fillna(name_mapped.state_)
name_mapped.id = name_mapped.id.fillna(-1)
name_mapped = name_mapped[['epoch', 'timestamp', 'thread', 'id', 'tid', 'state', 'jiffies']]

In [47]:
d = trace['vm'][['epoch', 'timestamp']].drop_duplicates().set_index('epoch').to_dict()['timestamp']

In [56]:
df = pd.concat([tid_mapped, name_mapped])
df.timestamp = df.epoch.apply(lambda x: d[x])
df = df.set_index(['epoch', 'thread'])
df.state = df.state.astype(float) / (df.groupby('epoch').state.sum())
df = df.reset_index().set_index('epoch')

In [60]:
trace['energy']

Unnamed: 0,epoch,socket,package,dram
0,1,1,133832.796143,64344.014400
1,1,2,32478.994873,35307.991095
2,2,1,133832.843506,64344.022500
3,2,2,32479.024170,35307.997335
4,3,1,133832.870117,64344.026520
5,3,2,32479.084839,35308.007715
6,4,1,133832.948120,64344.038400
7,4,2,32479.147644,35308.018095
8,5,1,133832.948120,64344.038400
9,5,2,32479.177795,35308.023285


In [59]:
pd.concat([df, trace['energy'].set_index('epoch')], axis = 1)

InvalidIndexError: Reindexing only valid with uniquely valued Index objects

ENERGY

Now that both the vm and os data have been fully processed, we can align them along epoch and backfill the 

In [None]:
pd.concat(vm, j)

# BELOW NEEDS TO BE REVISED PENDING ABOVE #

In [None]:
import random

import pandas as pd
import numpy as np

max_epoch = 20

vm_trace = pd.DataFrame(index = list(range(1, max_epoch)), data = [random.random() for i in range(1, max_epoch)], columns = ['vm_state'])
os_trace = pd.DataFrame(index = list(range(1, max_epoch, 2)), data = [random.random() for i in range(1, max_epoch, 2)], columns = ['os_state'])
nrg_trace = pd.DataFrame(index = list(range(1, max_epoch, 3)), data = [10 * random.random() for i in range(1, max_epoch, 3)], columns = ['energy'])
mthd_trace = pd.DataFrame(index = list(range(1, max_epoch, 5)), data = [chr(random.randint(0, 25) + 65) for i in range(1, max_epoch, 5)], columns = ['stack_trace'])

display(vm_trace)
display(os_trace)
display(nrg_trace)
display(mthd_trace)

In order to attribute energy to a method, we need to align all records along a shared timestamp axis. However, since the traces are not guaranteed to share timestamps, there will be holes that need to be filled:

In [None]:
aligned_trace = pd.concat([vm_trace, os_trace, nrg_trace, mthd_trace], axis = 1)
aligned_trace

In the principled attribution model we discussed early today, we need to find the first timestamp such that all 4 values have changed. The question is how do we fill holes in the data.

I propose what we can do is the following:

1) Back fill all values

In [None]:
trace = aligned_trace.bfill()
trace

2) Find the longest stretch of identical values over time adjacent records

In [None]:
matches = dict(zip(aligned_trace.columns, [False] * 4))
i = 0
g = 0
group_id = []
for _ in range(100):
    if len(trace) == 0:
        break
    i += 1
    record = (trace != trace.shift(-i)).head(1).to_dict('record')[0]
    matches = {key: matches[key] | record[key] for key in matches}
    if sum(int(val) for val in matches.values()) == len(trace.columns):
        g += 1
        subtrace = trace.head(i)
        subtrace.loc[:, 'group'] = g
        group_id.append(subtrace['group'])
        trace = trace.tail(len(trace) - i)
        matches = dict(zip(aligned_trace.columns, [False] * 4))
        i = 0

group_id = pd.concat(group_id)
trace = pd.concat([aligned_trace, group_id.astype(int)], axis = 1)
trace

Now we need to settle on an attribution method for the values of each group, which I believe is where we got stuck. In this example, the timestamps are evenly spaced, so we could aggregate:

In [None]:
df = trace.groupby('group').agg({'vm_state': 'mean', 'os_state': 'mean', 'energy': 'sum', 'stack_trace': set})
df.energy *= df.os_state * df.vm_state
df.stack_trace = df.stack_trace.map(lambda a: {x for x in a if x == x})
df.stack_trace = df.stack_trace.map(lambda x: list(x)[0] if len(x) > 0 else 'end')
df.index = df.index.astype(int)
df[['energy', 'stack_trace']]

In [None]:
max_epoch = 1000

vm_trace = pd.DataFrame(index = list(range(1, max_epoch)), data = [random.random() for i in range(1, max_epoch)], columns = ['vm_state'])
os_trace = pd.DataFrame(index = list(range(1, max_epoch)), data = [random.random() for i in range(1, max_epoch)], columns = ['os_state'])
nrg_trace = pd.DataFrame(index = list(range(1, max_epoch)), data = [10 * random.random() for i in range(1, max_epoch)], columns = ['energy'])
mthd_trace = pd.DataFrame(index = list(range(1, max_epoch)), data = [chr(random.randint(0, 25) + 65) for i in range(1, max_epoch)], columns = ['stack_trace'])

aligned_trace = pd.concat([vm_trace, os_trace, nrg_trace, mthd_trace], axis = 1)
for col in aligned_trace.columns:
    aligned_trace.loc[aligned_trace.index.isin(np.random.randint(0, 1000, 300)), col] = np.nan
    
aligned_trace

In [None]:
trace = aligned_trace.bfill()
trace

matches = dict(zip(aligned_trace.columns, [False] * 4))
i = 0
g = 0
group_id = []
for _ in range(100):
    if len(trace) == 0:
        break
    i += 1
    record = (trace != trace.shift(-i)).head(1).to_dict('record')[0]
    matches = {key: matches[key] | record[key] for key in matches}
    if sum(int(val) for val in matches.values()) == len(trace.columns):
        g += 1
        subtrace = trace.head(i)
        subtrace.loc[:, 'group'] = g
        group_id.append(subtrace['group'])
        trace = trace.tail(len(trace) - i)
        matches = dict(zip(aligned_trace.columns, [False] * 4))
        i = 0

group_id = pd.concat(group_id)
trace = pd.concat([aligned_trace, group_id.astype(int)], axis = 1)
trace

df = trace.groupby('group').agg({'vm_state': 'mean', 'os_state': 'mean', 'energy': 'sum', 'stack_trace': set})
df.energy *= df.os_state * df.vm_state
df.stack_trace = df.stack_trace.map(lambda a: {x for x in a if x == x})
df.stack_trace = df.stack_trace.map(lambda x: list(x)[0] if len(x) > 0 else 'end')
df.index = df.index.astype(int)
df[['energy', 'stack_trace']]