In [1]:
import presto
import pandas as pd
import numpy as np
import pulsar
import os
import plotly.express as px
import plotly.graph_objects as go
import json
import csv

In [2]:
# Make the project root the working directory so `src` is importable and
# relative output paths (notebooks/data/...) resolve. The notebook presumably
# lives in `notebooks/`; only step up when `src` is not already visible, so that
# re-running this cell does not keep walking up the directory tree.
if not os.path.isdir('src'):
    os.chdir('../')
from src.MV2 import cfg, schema

In [3]:
# Pulsar broker the notebook reads topics from. Defaults to a local broker;
# set the PULSAR_URL environment variable to point at a different one.
pulsar_url = os.environ.get("PULSAR_URL", "pulsar://localhost:6650")
client = pulsar.Client(pulsar_url)

In [4]:
def presto_query(query, user='test', catalog='pulsar', schema='public/default', host='localhost', port=8081):
    """Run a SQL query through Presto and return the result as a DataFrame.

    Args:
        query: SQL text to execute.
        user, catalog, host, port: connection settings forwarded to
            ``presto.dbapi.connect``.
        schema: Presto schema, given here as a Pulsar ``tenant/namespace``
            string (e.g. ``'public/default'``). It is not the ``src.MV2.schema``
            module, which this parameter shadows inside the function.

    Returns:
        A DataFrame with one column per result column, named from the cursor
        description. Empty strings are replaced with NaN. A query that returns
        no rows still yields an (empty) DataFrame with the right column names.
    """
    conn = presto.dbapi.connect(
        host=host,
        port=port,
        user=user,
        catalog=catalog,
        schema=schema)
    try:
        cur = conn.cursor()
        cur.execute(query)
        rows = cur.fetchall()
        # DB-API column descriptors: the first field of each entry is the column name.
        column_names = [col[0] for col in cur.description]
    finally:
        # Release the connection even when the query fails.
        conn.close()
    # Passing the names to the constructor (instead of assigning df.columns
    # afterwards) also works for zero-row results.
    df = pd.DataFrame(rows, columns=column_names)
    return df.replace('', np.nan)

def get_all_data(topic, schema):
    """Read every message currently available on a Pulsar topic and JSON-decode it.

    A reader is created on the module-level ``client`` starting at
    ``pulsar.MessageId.earliest``. It reads until ``has_message_available()``
    returns False. No messages are acknowledged.

    Args:
        topic: Full topic name, e.g. ``persistent://tenant/namespace/topic``.
        schema: Accepted for call-site compatibility but currently unused;
            payloads are decoded with ``json.loads`` rather than through it.

    Returns:
        List of decoded message payloads, in the order they were read.
    """
    reader = client.create_reader(topic, start_message_id=pulsar.MessageId.earliest)
    try:
        data = []
        while reader.has_message_available():
            msg = reader.read_next()
            data.append(json.loads(msg.data()))
        return data
    finally:
        # Always close the reader, even if decoding fails, so repeated calls
        # (one per topic) do not leave readers open.
        reader.close()

In [5]:
# Load every allocation event and derive per-allocation latencies.
# Timestamps appear to be Unix epoch seconds, so the differences are in seconds.
topic = "persistent://public/default/allocation_topic"
data = get_all_data(topic, schema.AllocationSchema)
dfa = pd.DataFrame(data)
# Time from the first listed supplier offer to the allocation.
dfa['supplier_times'] = dfa['timestamp'] - dfa['supplieroffertimestamps'].str[0].astype(float)
# Time from the customer offer to the allocation.
dfa['customer_times'] = dfa['timestamp'] - dfa['customeroffertimestamp']
# Time from the allocation until the allocation's `start` time.
dfa['customer_alo_to_start'] = dfa['start'] - dfa['timestamp']

In [6]:
# Preview the first five allocation records.
dfa.head(n=5)

Unnamed: 0,jobid,allocationid,customer,suppliers,start,end,service_name,price,replicas,timestamp,customerofferid,supplierofferids,customeroffertimestamp,supplieroffertimestamps,supplierbehaviors,supplier_times,customer_times,customer_alo_to_start
0,a6ae0163-5356-4f76-b1d4-f3928140bb82,f3ec9c44-da5d-4084-9802-9fef9525667d,c-1-1,[s-1-5],1604342000.0,1604342000.0,rand_nums,1e-06,1,1604342000.0,1817eec9-d0b6-4d15-93d6-c62892aea608,[bad9d569-7de1-4f7f-863a-64d658b67832],1604342000.0,[1604341576.1064682],[correct],0.870915,0.03794,118.480733
1,95589eb7-6fe6-42ec-aef1-6a4d209b65c7,7ea6654d-8049-4d20-90e9-35fd298d8f0a,c-1-0,[s-1-3],1604342000.0,1604342000.0,rand_nums,1e-06,1,1604342000.0,9d614130-4aae-484c-ada3-8f011df96bc6,[9727e1f9-5e4a-4e54-9b7f-b4202566c84b],1604342000.0,[1604341576.114783],[correct],0.953643,0.090401,118.38969
2,03d09336-f8bf-4b9a-989f-0a6f627c0693,c6e4331b-ea5a-406b-85f9-295f4692b1d0,c-1-4,[s-1-2],1604342000.0,1604342000.0,rand_nums,1e-06,1,1604342000.0,ae7eb0ca-3374-4efd-99f9-0d9d0fd5c0f9,[0ef584b8-8ada-41e0-91e3-6a5ab7550f25],1604342000.0,[1604341576.156399],[correct],1.01476,0.183345,118.286957
3,fc8fd5c9-507c-41f7-aeef-927b82996c7d,73d10798-9332-49d2-91df-acd228fde0c1,c-1-3,[s-1-6],1604342000.0,1604342000.0,rand_nums,1e-06,1,1604342000.0,1c4377ad-ad54-4e4b-8899-a5e780bf4fcc,[845e0d76-2af7-4098-9cef-f6da119be2a5],1604342000.0,[1604341576.1816058],[correct],1.132145,0.324977,118.144365
4,f4d64ca8-16ed-43b4-bc3c-ffadc6660301,f3895fb4-7e85-4b49-8626-fc6a374c5783,c-1-2,[s-1-13],1604342000.0,1604342000.0,rand_nums,1e-06,1,1604342000.0,53580240-8322-45e0-ad2e-283c2f708185,[c21997d0-a9fe-45b3-8d7d-5069c444bfba],1604342000.0,[1604341576.196383],[correct],1.218592,0.416275,118.043141


In [7]:
# Collect the output stream of every customer that received an allocation.
dfs = []
for customer in dfa['customer'].unique().tolist():
    topic = f"persistent://{customer}/rand_nums/output"
    data = get_all_data(topic, schema.OutputDataSchema)
    dfs.append(pd.DataFrame(data))
# ignore_index=True gives the combined frame a unique 0..n-1 index; otherwise
# each customer's frame restarts at 0 and index labels are duplicated.
dfo = pd.concat(dfs, axis=0, ignore_index=True)
# The original converted with float(), so timestamps may arrive as strings.
dfo['output_times'] = dfo['suppliertimestamp'].astype(float) - dfo['customertimestamp'].astype(float)

In [8]:
# Preview the first five output records.
dfo.head(n=5)

Unnamed: 0,value,customer,service_name,jobid,start,end,supplier,allocationid,customertimestamp,suppliertimestamp,msgnum,output_times
0,3,c-1-1,rand_nums,a6ae0163-5356-4f76-b1d4-f3928140bb82,1604342000.0,1604342000.0,s-1-5,f3ec9c44-da5d-4084-9802-9fef9525667d,1604342000.0,1604342000.0,0,0.037809
1,1,c-1-1,rand_nums,a6ae0163-5356-4f76-b1d4-f3928140bb82,1604342000.0,1604342000.0,s-1-5,f3ec9c44-da5d-4084-9802-9fef9525667d,1604342000.0,1604342000.0,1,0.026313
2,3,c-1-1,rand_nums,a6ae0163-5356-4f76-b1d4-f3928140bb82,1604342000.0,1604342000.0,s-1-5,f3ec9c44-da5d-4084-9802-9fef9525667d,1604342000.0,1604342000.0,2,0.020341
3,3,c-1-1,rand_nums,a6ae0163-5356-4f76-b1d4-f3928140bb82,1604342000.0,1604342000.0,s-1-5,f3ec9c44-da5d-4084-9802-9fef9525667d,1604342000.0,1604342000.0,3,0.021506
4,4,c-1-1,rand_nums,a6ae0163-5356-4f76-b1d4-f3928140bb82,1604342000.0,1604342000.0,s-1-5,f3ec9c44-da5d-4084-9802-9fef9525667d,1604342000.0,1604342000.0,4,0.006954


In [16]:
# Collect the mediation results of every customer that received an allocation.
dfs = []
for customer in dfa['customer'].unique().tolist():
    topic = f"persistent://{customer}/rand_nums/mediation"
    data = get_all_data(topic, schema.MediationSchema)
    dfs.append(pd.DataFrame(data))
# ignore_index=True gives the combined frame a unique 0..n-1 index; otherwise
# each customer's frame restarts at 0 and index labels are duplicated.
dfm = pd.concat(dfs, axis=0, ignore_index=True)
# The original converted with float(), so timestamps may arrive as strings.
dfm['mediation_times'] = dfm['mediationtimestamp'].astype(float) - dfm['checktimestamp'].astype(float)

In [17]:
# Preview the first five mediation records.
dfm.head(n=5)

Unnamed: 0,result,customer,supplierspass,suppliersfail,service_name,jobid,allocationid,checktimestamp,mediationtimestamp,mediation_times
0,pass,c-1-1,[none],[s-1-5],rand_nums,a6ae0163-5356-4f76-b1d4-f3928140bb82,f3ec9c44-da5d-4084-9802-9fef9525667d,1604342000.0,1604342000.0,0.007826
1,pass,c-1-1,[none],[s-1-11],rand_nums,a6ae0163-5356-4f76-b1d4-f3928140bb82,36144c7f-2325-427c-a3d0-c2f194c2cc58,1604342000.0,1604342000.0,0.041281
2,pass,c-1-1,[none],[s-1-9],rand_nums,a6ae0163-5356-4f76-b1d4-f3928140bb82,c72ab35f-56b5-4281-9c2c-0d8abd6db4bf,1604342000.0,1604342000.0,0.026625
3,pass,c-1-1,[none],[s-1-13],rand_nums,a6ae0163-5356-4f76-b1d4-f3928140bb82,5d573317-d3bb-4d43-9378-c7cdc1f9e214,1604342000.0,1604342000.0,0.023583
4,pass,c-1-1,[none],[s-1-1],rand_nums,a6ae0163-5356-4f76-b1d4-f3928140bb82,77e81295-d718-4edd-bc42-a812282ca79d,1604342000.0,1604342000.0,0.110044


In [11]:
# Collect the check results of every customer that received an allocation.
dfs = []
for customer in dfa['customer'].unique().tolist():
    topic = f"persistent://{customer}/rand_nums/check"
    data = get_all_data(topic, schema.CheckSchema)
    dfs.append(pd.DataFrame(data))
# ignore_index=True gives the combined frame a unique 0..n-1 index; otherwise
# each customer's frame restarts at 0 and index labels are duplicated.
dfc = pd.concat(dfs, axis=0, ignore_index=True)

In [12]:
# Show the last five check records.
dfc.tail(n=5)

Unnamed: 0,result,customer,suppliers,supplierbehaviors,service_name,jobid,allocationid,timestamp
6,pass,c-1-2,[s-1-3],[correct],rand_nums,f4d64ca8-16ed-43b4-bc3c-ffadc6660301,40fdc0d0-0026-407d-b274-f1305ee44265,1604342000.0
7,pass,c-1-2,[s-1-4],[correct],rand_nums,f4d64ca8-16ed-43b4-bc3c-ffadc6660301,379e0c92-f09f-4bb1-94fd-5da0ff8e2ed0,1604342000.0
8,pass,c-1-2,[s-1-9],[correct],rand_nums,f4d64ca8-16ed-43b4-bc3c-ffadc6660301,75568547-557f-47c5-8a1b-1d2b4d963a57,1604342000.0
9,pass,c-1-2,[s-1-13],[correct],rand_nums,f4d64ca8-16ed-43b4-bc3c-ffadc6660301,58518da5-c5e8-4fee-b809-346663138d14,1604342000.0
10,pass,c-1-2,[s-1-1],[correct],rand_nums,f4d64ca8-16ed-43b4-bc3c-ffadc6660301,a56b616e-b920-4d88-a32b-7034e12fd860,1604342000.0


In [21]:
# Distribution of mediation latency (mediationtimestamp - checktimestamp).
# The other latency columns computed earlier (dfa['supplier_times'],
# dfa['customer_times'], dfa['customer_alo_to_start'], dfo['output_times'])
# can be compared side by side by adding more go.Box traces with their own `name`.
fig = go.Figure()
fig.add_trace(go.Box(y=dfm['mediation_times'].tolist(), name='mediation_times'))
fig.update_layout(
    title='Mediation latency',
    # Timestamps appear to be Unix epoch seconds, so differences are in seconds.
    yaxis_title='time (s)',
)
fig.show()

In [20]:
# Persist the mediation frame for offline analysis. Create the output folder
# first so to_csv does not fail on a fresh checkout.
data_dir = os.path.join(os.getcwd(), "notebooks", "data")
os.makedirs(data_dir, exist_ok=True)
dfm.to_csv(os.path.join(data_dir, "dfm.csv"))

In [22]:
def write_to_csv(data, name, output_file_name, data_dir=None):
    """Write a single-column CSV: a header row ``name``, then one row per item.

    Args:
        data: Iterable of values; each value becomes its own row.
        name: Header of the single column.
        output_file_name: File name to create (overwritten) inside ``data_dir``.
        data_dir: Target directory. Defaults to ``<cwd>/notebooks/data``, the
            previously hard-coded location. It is created if missing.
    """
    if data_dir is None:
        data_dir = os.path.join(os.getcwd(), "notebooks", "data")
    os.makedirs(data_dir, exist_ok=True)
    # The csv module requires newline=''; otherwise its \r\n row terminators
    # get translated to \r\r\n on Windows, producing blank rows.
    with open(os.path.join(data_dir, output_file_name), "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow([name])
        writer.writerows([value] for value in data)

In [23]:
# Export the mediation latencies as a one-column CSV with header "time".
x = dfm['mediation_times'].tolist()
write_to_csv(data=x, name='time', output_file_name='mediation.csv')