In [171]:
# read graphs and build graph pool 
import graph_tool.all as gt

import sys
sys.path.append('/local0/Kariz/code')
import utils.objectstore as object_store

def build_input_format(inputs_str):
    return ' '.join(inputs_str.split(':')).split()

def load_graph_templates(path):
    graph_skeletons = {}
    with open(path, 'r') as fd:
        graph_strs = fd.read().split('#')[1:]
        for g_str in graph_strs:
            g= build_graph_skeleton(g_str)
            graph_skeletons[g.gp.name] = g
    return graph_skeletons


def build_graph_skeleton(g_str):
    g_elements = g_str.split('\n')
    g_name = g_elements[0].split('\t')[1]
    g_id = 0
    g_queuetime = 0

    g = gt.Graph(directed=True)
    g.gp['name'] = g.new_graph_property("string", g_name)
    g.gp['id'] = g.new_graph_property("string", str(g_id))
    g.gp['queue_time'] = g.new_graph_property("int", g_queuetime)
    g.gp['cur_stage'] = g.new_graph_property("int", -1)
    status = g.new_vertex_property("int")
    inputs = g.new_vertex_property("string")
    cache_runtime = g.new_vertex_property("float")
    remote_runtime = g.new_vertex_property("int")
    color = g.new_vertex_property("string")
    ops = g.new_vertex_property("string")
    vids = g.new_vertex_property("int")
    id_v = {}
    
    # build vertices
    for el in g_elements[1:]:
        if el.startswith('v'):
            vid, operation, inputs_str = el.split(',')[1:]
            v = g.add_vertex()
            vids[v] = int(vid)
            id_v[int(vid)] = v
            inputs[v] = inputs_str #build_input_format(inputs_str)
            #print(inputs_str)
            color[v] = '#fb8072' if len(inputs[v]) > 0 else '#80b1d3'
            cache_runtime[v] = 0
            remote_runtime[v] = 0
            ops[v] = operation if operation else 'SAVE'

    # build edges
    for el in g_elements[1:]:
        if el.startswith('e'):
            v_src, v_dest = el.split(',')[1:]
            e = g.add_edge(id_v[int(v_src)], id_v[int(v_dest)])

    g.vp['id'] = vids
    g.vp['color'] = color
    g.vp['tables'] = inputs
    g.vp['remote_runtime'] = remote_runtime
    g.vp['cache_runtime'] = cache_runtime
    g.vp['status'] = status
    g.vp['feature'] = ops
    print('Load query:', g_name)
    return g

graphs_pool = load_graph_templates('/local0/Kariz/expriments/simulator/multidag/%s'%('pig.tpch.template'))

inputs = object_store.load_object_meta('/local0/Kariz/expriments/simulator/multidag/config/inputs.csv')

Load query: TPCH_Q12
Load query: TPCH_Q17
Load query: TPCH_Q9
Load query: TPCH_Q4
Load query: TPCH_Q16
Load query: TPCH_Q8
Load query: TPCH_Q7
Load query: TPCH_Q22
Load query: TPCH_Q19
Load query: TPCH_Q6
Load query: TPCH_Q3
Load query: TPCH_Q5
Load query: TPCH_Q15
Load query: TPCH_Q18
Load query: TPCH_Q13
Load query: TPCH_Q10
Load query: TPCH_Q14
Load query: TPCH_Q20
Load query: TPCH_Q21
Load query: TPCH_Q2
Load query: TPCH_Q11
Load query: TPCH_Q1


In [98]:
import random 

def choose_input(seen_objects):
    reuse = random.choices([1, 0], cum_weights=(reuse_ratio, 1.00), k=1)[0]
    table, _ = random.choice(list(seen_objects.items())) if ((reuse) and (len(seen_objects) > 5)) else random.choice(list(inputs.items())) 
    seen_objects[table] = 1 if table not in seen_objects else seen_objects[table]+1
    return table

def random_query():
    gid, g = random.choice(list(graphs_pool.items()))
    return gid, g.copy()
    
reuse_ratio = 0.32
cfg_n_similar = 6
max_dag_concurrency=10
min_dag_concurrency=1
n_iterations = 10

seen_objects = {}
workload_str=''
g = None
for it in range(0, n_iterations):
    n_similar = cfg_n_similar
    n_concurrent_dags = 10 #random.randint(min_dag_concurrency, max_dag_concurrency)
    workload_str += ("%" + 'r,%d,%d\n'%(it,n_concurrent_dags))
    for i in range(0, n_concurrent_dags):
        if not (g and g.num_vertices() > 4 and n_similar > 1):
            gid, g = random_query()
        else:
            n_similar -= 1
            
        workload_str += ('#t,%d%d,%s\n'%(it,i,gid))
        for v in g.vertices():
            t_compute = random.randint(5, 100)
            t_reduction = 1
            if len(g.vp.tables[v]) > 0: 
                table = choose_input(seen_objects) 
                g.vp.tables[v] = table
                t_reduction = random.uniform(0.3, 1)
            workload_str += ('v,%d,%s,%s,%d,%.2f\n'%(g.vp.id[v],g.vp.feature[v],g.vp.tables[v],
                                            t_compute, t_reduction))
        for e in g.edges():
            workload_str += ('e,%d,%d\n'%(g.vp.id[e.source()],g.vp.id[e.target()]))
    
print(workload_str)
workload_file = '/local0/Kariz/expriments/simulator/multidag/config/synthetic_worload_8.g'
with open(workload_file, 'w') as fd:
    fd.write(workload_str)

%r,0,10
#t,00,TPCH_Q13
v,0,COGROUP,o12,41,0.74
v,1,GROUP_BY:COMBINER,,85,1.00
v,2,SAMPLER,,16,1.00
v,3,ORDER_BY,,18,1.00
e,0,1
e,1,2
e,2,3
#t,01,TPCH_Q18
v,0,COGROUP,g16,85,0.30
v,1,HASH_JOIN,w12,88,0.81
v,2,GROUP_BY:COMBINER,,8,1.00
v,3,SAMPLER,,49,1.00
v,4,ORDER_BY,,61,1.00
e,0,1
e,1,2
e,2,3
e,3,4
#t,02,TPCH_Q18
v,0,COGROUP,e8,41,0.36
v,1,HASH_JOIN,b16,28,0.74
v,2,GROUP_BY:COMBINER,,27,1.00
v,3,SAMPLER,,87,1.00
v,4,ORDER_BY,,17,1.00
e,0,1
e,1,2
e,2,3
e,3,4
#t,03,TPCH_Q18
v,0,COGROUP,o18,61,0.46
v,1,HASH_JOIN,b16,51,0.48
v,2,GROUP_BY:COMBINER,,6,1.00
v,3,SAMPLER,,53,1.00
v,4,ORDER_BY,,79,1.00
e,0,1
e,1,2
e,2,3
e,3,4
#t,04,TPCH_Q18
v,0,COGROUP,c5,53,0.64
v,1,HASH_JOIN,c5,18,0.68
v,2,GROUP_BY:COMBINER,,65,1.00
v,3,SAMPLER,,79,1.00
v,4,ORDER_BY,,39,1.00
e,0,1
e,1,2
e,2,3
e,3,4
#t,05,TPCH_Q18
v,0,COGROUP,l18,84,0.67
v,1,HASH_JOIN,g16,23,0.73
v,2,GROUP_BY:COMBINER,,26,1.00
v,3,SAMPLER,,30,1.00
v,4,ORDER_BY,,43,1.00
e,0,1
e,1,2
e,2,3
e,3,4
#t,06,TPCH_Q18
v,0,COGROUP,e0,44,0.60
v,1,HASH_JOIN

In [125]:
# prepare inputs
import string
import random

input_min = 1
input_max = 256

index_min = 11
index_max = 20

fpath = '/local0/Kariz/expriments/simulator/multidag/config/inputs.csv'

input_str=''
for c in string.ascii_lowercase:
    for i in range(index_min, index_max):
        input_str += '%s%d,%d\n'%(c, i, random.randint(input_min, input_max))

with open(fpath, 'a') as fd:
    fd.write(input_str)

inputs= []
for i in range(0, n_iterations):
    alphabets = list(string.ascii_lowercase)
    inputs.append(random.sample(alphabets, k=5))



In [159]:
metadatas={'lineitem': {}, 'part': {}, 
           'nation': {}, 'supplier': {}, 'partsupp': {}, 
           'orders': {}, 'customers': {} , 'region': {}}
inputs=['lineitem', 'part', 'nation', 'supplier', 'partsupp', 'orders', 'customers', 'region']

input_str=''
for it in range(0, n_iterations):
    for al in inputs:
        size = random.randint(10, 256)
        for i in range(0, max_dag_concurrency):
            metadatas[al]['%d_%d'%(it,i)] = size
            input_str += '%s_%d_%d,%d\n'%(al, it, i, size)

fpath = '/local0/Kariz/expriments/simulator/multidag/config/inputs.csv'
with open(fpath, 'a') as fd:
    fd.write(input_str)
print(input_str)

lineitem_0_0,243
lineitem_0_1,243
lineitem_0_2,243
lineitem_0_3,243
lineitem_0_4,243
lineitem_0_5,243
lineitem_0_6,243
lineitem_0_7,243
lineitem_0_8,243
lineitem_0_9,243
part_0_0,244
part_0_1,244
part_0_2,244
part_0_3,244
part_0_4,244
part_0_5,244
part_0_6,244
part_0_7,244
part_0_8,244
part_0_9,244
nation_0_0,206
nation_0_1,206
nation_0_2,206
nation_0_3,206
nation_0_4,206
nation_0_5,206
nation_0_6,206
nation_0_7,206
nation_0_8,206
nation_0_9,206
supplier_0_0,236
supplier_0_1,236
supplier_0_2,236
supplier_0_3,236
supplier_0_4,236
supplier_0_5,236
supplier_0_6,236
supplier_0_7,236
supplier_0_8,236
supplier_0_9,236
partsupp_0_0,85
partsupp_0_1,85
partsupp_0_2,85
partsupp_0_3,85
partsupp_0_4,85
partsupp_0_5,85
partsupp_0_6,85
partsupp_0_7,85
partsupp_0_8,85
partsupp_0_9,85
orders_0_0,25
orders_0_1,25
orders_0_2,25
orders_0_3,25
orders_0_4,25
orders_0_5,25
orders_0_6,25
orders_0_7,25
orders_0_8,25
orders_0_9,25
customers_0_0,209
customers_0_1,209
customers_0_2,209
customers_0_3,209
customer

In [173]:
reuse_ratio = 0.32
cfg_n_similar = 6
max_dag_concurrency=10
min_dag_concurrency=1
n_iterations = 10

seen_objects = {}

random_dags = {}

def query_runtime(g):
    roots = [v for v in g.vertices() if v.in_degree() == 0]
    leaves = [v for v in g.vertices() if v.out_degree() == 0]
    runtime = g.new_edge_property("float")
    t_query = []
    
    if g.num_edges() == 0: return g.vp.remote_runtime[0]
    
    for e in g.edges(): runtime[e] = -1*g.vp.remote_runtime[e.source()]
    g.ep['runtime'] = runtime
    
    predict_list= []
    for root in roots:
        for leaf in leaves:
            t_query.append(-1*gt.shortest_distance(g, g.vertex(root), g.vertex(leaf), weights=g.ep.runtime, 
                                            negative_weights=True, pred_map=None))
    return max(t_query)
    

n_inputs = 0
workload_str=''
g = None
for it in range(0, n_iterations):
    for i in range(0, n_concurrent_dags):
        gid, g = random_query()
        n_inputs = 0
        if it not in random_dags: random_dags[it] = {'graphs' : [], 'runtimes': [], 'shared': []}
        
        for v in g.vertices():
            t_compute = random.randint(5, 100)
            g.vp.remote_runtime[v] = t_compute
            g.vp.cache_runtime[v] = 1
            if len(g.vp.tables[v]) > 0: 
                n_inputs += 1
                g.vp.cache_runtime[v] = random.uniform(0.3, 1)        
        random_dags[it]['graphs'].append(g)
        random_dags[it]['runtimes'].append(query_runtime(g))
        
        
    #random_dags[it]['runtimes'], random_dags[it]['graphs'] = zip(*sorted(zip(random_dags[it]['runtimes'], 
    #                                                                         random_dags[it]['graphs']),
    #                                                                    key=lambda pair: pair[0]))    
    print(it, random_dags[it]['runtimes'])
    

it_sim = {}

for nsim in [0, 2, 4, 6, 8, 10]:
    workload_str = ''
    for it in range(0, n_iterations):
        workload_str += ("%" + 'r,%d,%d\n'%(it,n_concurrent_dags))
        if it not in it_sim:
            it_sim[it] = []
        i=0
        dags_wshare = random.sample(random_dags[it]['graphs'], 2) if nsim else []
        for g in dags_wshare:
            it_sim[it].append(g)
            random_dags[it]['graphs'].remove(g)
            
        for g in it_sim[it]:
            workload_str += ('#t,%d%d,%s\n'%(it,i,g.gp.id))
            for v in g.vertices():
                input_str=''
                if len(g.vp.tables[v]) > 0:  
                    inputs = g.vp.tables[v].split(':')
                    input_str = ':'.join(['%s_%d_%d'%(al,it,0) for al in inputs])
                workload_str += ('v,%d,%s,%s,%d,%.2f\n'%(g.vp.id[v],g.vp.feature[v],input_str,
                                            g.vp.remote_runtime[v], g.vp.cache_runtime[v]))
            for e in g.edges():
                workload_str += ('e,%d,%d\n'%(g.vp.id[e.source()],g.vp.id[e.target()]))
            i += 1
        
        for g in random_dags[it]['graphs']:    
            workload_str += ('#t,%d%d,%s\n'%(it,i,g.gp.id))
            for v in g.vertices():
                input_str=''
                if len(g.vp.tables[v]) > 0:  
                    inputs = g.vp.tables[v].split(':')
                    input_str = ':'.join(['%s_%d_%d'%(al,it,i) for al in inputs])                 
                workload_str += ('v,%d,%s,%s,%d,%.2f\n'%(g.vp.id[v],g.vp.feature[v],input_str,
                                            g.vp.remote_runtime[v], g.vp.cache_runtime[v]))
            for e in g.edges():
                workload_str += ('e,%d,%d\n'%(g.vp.id[e.source()],g.vp.id[e.target()]))
            i += 1
    print('Workflow for %d string was created'%(nsim))
    workload_file = '/local0/Kariz/expriments/simulator/multidag/config/synthetic_worload_md_%d.g'%(nsim)
    with open(workload_file, 'w') as fd:
        fd.write(workload_str)


0 [396.0, 267.0, 12.0, 39, 361.0, 224.0, 18.0, 145.0, 191.0, 339.0]
1 [429.0, 378.0, 121.0, 255.0, 147.0, 224.0, 245.0, 332.0, 351.0, 216.0]
2 [170.0, 117.0, 180.0, 235.0, 50.0, 219.0, 141.0, 369.0, 257.0, 290.0]
3 [135.0, 358.0, 82.0, 160.0, 323.0, 274.0, 515.0, 163.0, 69, 360.0]
4 [146.0, 219.0, 362.0, 193.0, 280.0, 81.0, 149.0, 193.0, 210.0, 194.0]
5 [13.0, 254.0, 163.0, 150.0, 36.0, 91.0, 184.0, 61, 195.0, 304.0]
6 [16.0, 255.0, 71.0, 325.0, 173.0, 98.0, 132.0, 330.0, 339.0, 245.0]
7 [88.0, 354.0, 433.0, 68.0, 124.0, 277.0, 314.0, 327.0, 218.0, 380.0]
8 [158.0, 247.0, 15.0, 117.0, 123.0, 216.0, 85.0, 224.0, 168.0, 345.0]
9 [254.0, 335.0, 64.0, 169.0, 429.0, 197.0, 157.0, 9.0, 22.0, 160.0]
Workflow for 0 string was created
Workflow for 2 string was created
Workflow for 4 string was created
Workflow for 6 string was created
Workflow for 8 string was created
Workflow for 10 string was created


In [None]:
 
for i in range(0, n_concurrent_dags):
    if not (g and g.num_vertices() > 4 and n_similar > 1):
        gid, g = random_query()
    else:
        n_similar -= 1

    workload_str += ('#t,%d%d,%s\n'%(it,i,gid))
    for v in g.vertices():
        t_compute = random.randint(5, 100)
        t_reduction = 1
        if len(g.vp.tables[v]) > 0: 
            table = choose_input(seen_objects) 
            g.vp.tables[v] = table
            t_reduction = random.uniform(0.3, 1)
        workload_str += ('v,%d,%s,%s,%d,%.2f\n'%(g.vp.id[v],g.vp.feature[v],g.vp.tables[v],
                                        t_compute, t_reduction))
    for e in g.edges():
        workload_str += ('e,%d,%d\n'%(g.vp.id[e.source()],g.vp.id[e.target()]))
