In [1]:
import json
import json_coder
import pandas as pd
import networkx as nx
import befaas as bf
import befaas.logentry as le
from befaas.graph import build_function_graph, add_default_metadata, conv_to_ms

filepath = "../dumps/"
dump = "dump_smartFactory.json"
outfileNetwork = "../img/2023_smartFactory_networkTransmission.pkl"
outfileTrigger = "../img/2023_smartFactory_triggerDelay.pkl"
networkData = []
triggerData = []

print("Everything is set up.")

Everything is set up.


In [2]:
print(f"Include dump {dump} ...")
print("Load dump... (may take a while)")
data = bf.load_logs(filepath + dump)
print("Done.")

print("Sort data to contexts...")
contexts = {}

for entry in data:
    id = entry.context_id
    if id == None:
        continue

    if not (id in contexts):
        contexts[id] = []

    contexts[id].append(entry)
print("Done.")

print("Find xpairs...")
xpairs = {}

for ctx_id in contexts:
    ctx = contexts[ctx_id]
    for entry in ctx:
        id = entry.event["xPair"]
        if not (id in xpairs):
            xpairs[id] = []

        added = False

        if isinstance(entry, le.PerfLog):
            if not entry.perf_type_data == "":
                parts = entry.perf_type_data.split(":")
                if len(parts) > 1:
                    id = parts[1]
                    if not (id in xpairs):
                        xpairs[id] = []
                    if not (str(entry.perf_type[1]).__contains__("db")):
                        xpairs[id].append(entry)
                        added = True
            else:
                xpairs[id].append(entry)
                added = True

        if isinstance(entry, le.RequestLog):
            xpairs[id].append(entry)
            added = True

        if isinstance(entry, le.ArtilleryLog):
            xpairs[id].append(entry)
            added = True

        #if not added:
            # print(entry)

print("done.")

Include dump dump_smartFactory.json ...
Load dump... (may take a while)
Done.
Sort data to contexts...
Done.
Find xpairs...
done.


In [3]:
print("Calculate delays...")
errors = []

for xpairID in xpairs:
    xpair = xpairs[xpairID]

    src = ""
    dst = ""
    srcFun = ""
    dstFun = ""
    eventFun = ""
    # Transmission delay between calling and called provider, round-trip
    transmission = 0
    # Publisher execution time
    publisher = 0
    # Trigger delay: start of publisher to start of event function
    trigger = 0

    isArtillery = False
    callingDuration = 0
    calledDuration = 0
    triggerStart = None
    triggerEnd = None

    if len(xpair) > 8:

        for entry in xpair:
            if isinstance(entry, le.ArtilleryLog):
                isArtillery = True;
                break;
            if isinstance(entry, le.PerfLog):
                if str(entry.perf_type[0]) == "measure" and str(entry.perf_type[1]) == "rpcIn":
                    dst = entry.platform
                    publisher = entry.perf["duration"]
                    calledDuration = entry.perf["duration"]
                if str(entry.perf_type[0]) == "measure" and str(entry.perf_type[1]) == "rpcOut":
                    src = entry.platform
                    callingDuration = entry.perf["duration"]
                    srcFun = entry.function
                    dstFun = entry.perf_type_data.split(":")[0]
                if str(entry.perf_type[0]) == "start" and str(entry.perf_type[1]) == "rpcIn":
                    triggerStart = entry.timestamp
                if str(entry.perf_type[0]) == "start" and str(entry.perf_type[1]) == "msg":
                    triggerEnd = entry.timestamp
                    eventFun = entry.function

        if not isArtillery:
            transmission = callingDuration - calledDuration
            row = {}
            row["pair"] = src + " - " + dst
            row["latency"] = transmission
            row["call"] = srcFun + " - " + dstFun
            if (transmission <= 0):
                errors.append("Error in transmission latency for row " + json.dumps(row))
            else:
                networkData.append(row)


            trigger = (triggerEnd - triggerStart).microseconds / 1000
            row = {}
            row["pair"] = dst
            row["kind"] = "publisher"
            row["latency"] = publisher
            row["call"] = srcFun + " - " + dstFun
            row["eventFun"] = eventFun

            if (publisher <= 0):
                errors.append("Error in publisher execution duration for row " + json.dumps(row))
            else:
                #print(row)
                triggerData.append(row)

            row = {}
            row["pair"] = dst
            row["kind"] = "trigger"
            row["latency"] = trigger
            row["call"] = srcFun + " - " + dstFun
            row["eventFun"] = eventFun

            if (trigger <= 0):
                errors.append("Error in trigger latency for row " + json.dumps(row))
            else:
                #print(row)
                triggerData.append(row)

    else:
        errors.append("Just " + str(len(xpair)) + " values, skip.")

for e in errors:
    print(e)


Calculate delays...
Just 1 values, skip.
Error in trigger latency for row {"pair": "google", "kind": "trigger", "latency": 0.0, "call": "cushionOrder - publisher", "eventFun": "cushionProduction"}


In [4]:
df_calls = pd.DataFrame(networkData)
df_calls.head()
df_calls.describe()
df_calls.to_pickle(outfileNetwork)

In [5]:
df_calls = pd.DataFrame(triggerData)
df_calls.head()
df_calls.describe()
df_calls.to_pickle(outfileTrigger)

