In [2]:
import pandas as pd
import numpy as np
from datetime import datetime as dt
from IPython.display import display
from bokeh.io import show, output_notebook
import glob
import re
import os.path
import math
import functools
from bokeh.plotting import figure, ColumnDataSource
from bokeh.models import HoverTool, ranges
# Silence pandas' SettingWithCopyWarning for the whole notebook
pd.set_option("mode.chained_assignment", None)
# Render Bokeh figures inline in the notebook output
output_notebook()

In [41]:
class parentree:
    """Tree parsed from a case-class style string such as ``RPC(a(b, c), d)``.

    ``n`` holds the text before the parentheses and ``items`` holds one child tree per
    top-level comma-separated argument (an empty list when there are no arguments).
    """
    def __init__(self, s):
        # Leading whitespace is skipped; group 1 is the name, group 3 the text between the
        # opening parenthesis and the last closing one (greedy), if present
        m = re.match(r"\s*([^()]*)(\((.*)\))?\s*", s)
        self.n = m.group(1)
        inner = m.group(3)
        self.items = [ parentree(part) for part in self.parensplit(inner) ] if inner else []

    def parensplit(self, s, sep = ","):
        """Split ``s`` on ``sep`` wherever the separator is not nested inside parentheses."""
        pieces = []
        start = 0
        depth = 0
        for pos, ch in enumerate(s):
            if ch == "(":
                depth += 1
            elif ch == ")":
                depth -= 1
            if ch == sep and depth == 0:
                pieces.append(s[start:pos])
                start = pos + 1
        pieces.append(s[start:])
        return pieces

    def __str__(self):
        if not self.items:
            return self.n
        return self.n + "(" + ", ".join(map(str, self.items)) + ")"

    def debugstr(self, indent=""):
        """Render the tree one node per line, indenting each nesting level by two spaces."""
        return indent + self.n + "\n" + "".join(child.debugstr(indent + "  ") for child in self.items)

    def __getitem__(self, idx):
        return self.items[idx]

    def __len__(self):
        return len(self.items)

class reader:
    eventcol = "event"

    def tscmp(self, a, b): # Compare two timestamps, treating None as infinitely in the future
        if a is None and b is None: return 0
        if b is None: return -1
        if a is None: return 1
        if a == b: return 0
        return -1 if b > a else 1
    def rpctree_service(self, event, idx): # Extract the host and port from an RPC: 0 for sender, 1 for receiver
        assert(event.n == "RPC")
        assert(idx in (0, 1))
        return (event[idx][1].n[1:], event[idx][2].n)
    def event_initiator(self, uuid, event): # Return the host initiating any event
        if event.n == "RPC": return self.rpctree_service(event, 0)
        else: return self.uuidmap[uuid]
    def service_name(self, host, name): # Defines the formatting of a service name
        return host[0] + " " + host[1] + " " + name
    def tree_service_name(self, subtree): # Get the name of one of the services involved in an RPC
        host = subtree[1].n[1:]
        port = subtree[2].n
        return self.service_name((host, port), self.servicemap[(host, port)])
    def service_start(self, service): # Check the time of the first event corresponding to a service
        start = self.df[self.df.apply(lambda x: self.event_initiator(x["id"], x[self.eventcol]) == service, 1)].sort_values("time")["time"].head(1).tolist()
        if len(start) > 0: return start[0]
        return None
    def filter_apply_events(self, event, filters): # Sequentially apply all filters to an event
        for f in filters:
            if not f.fevent(event): return False
        return True
    def filter_service_basic(self, host, filters): # Sequentially apply all filters to a service
        if host not in self.servicemap: return False
        for f in filters:
            if not f.fservice(host[0], host[1], self.servicemap[host]): return False
        return True
    def filter_apply_services(self, uuid, event, filters): # Apply all service filters to an event based on whether it is an RPC
        if event.n == "RPC":
            return self.filter_service_basic(self.rpctree_service(event, 0), filters) and self.filter_service_basic(self.rpctree_service(event, 1), filters)
        else:
            return self.filter_service_basic(self.uuidmap[uuid], filters)
    def filter_apply_mutations(self, tree, filters): # Apply mutations from all filters to an event
        for f in filters: tree = f.mutate(tree)
        return tree

    def __init__(self, root):
        indf = pd.concat([ pd.read_csv(open(file), sep="\t") for file in glob.glob(root + "/*.tsv") if os.path.getsize(file) > 0 ]) # Load CSVs
        alluuids = indf["id"].unique().tolist()
        indf["time"] = pd.to_datetime(indf["time"], unit="ms") # Add datetime column
        indf[self.eventcol] = indf["type"].map(lambda x: parentree(x)) # Add column of parsed case class trees
        self.uuidrevmap = { self.rpctree_service(row[self.eventcol], 1) : row["id"] for row in indf[indf[self.eventcol].apply(lambda x: x.n == "RPC")][["id", self.eventcol]].to_dict("records") } # Map from IP and port to trace UUID
        missing_uuids = [ uuid for uuid in alluuids if uuid not in self.uuidrevmap.values() ] # UUIDs that are not resolvable
        self.uuidrevmap.update({ (uuid, ""): uuid for uuid in missing_uuids }) # JVMs that have never received a message are unresolvable.  Thus, map their UUIDs to themselves so they can still be in the plot
        self.uuidmap = { val : key for key, val in self.uuidrevmap.items() } # Map indeterminately from trace UUID to IP and port
        self.servicemap = { self.rpctree_service(host, 1): host[1][0].n for host in indf[indf[self.eventcol].apply(lambda x: x.n == "RPC")][self.eventcol].drop_duplicates().tolist() } # Map from IP and port to service name
        self.servicemap.update({ (uuid, ""): "" for uuid in missing_uuids }) # Map unresolved UUIDs back to themselves to handle the case in the line after next
        indf["type"] = indf[self.eventcol].map(str)
        self.df = indf
    def filter(self, filters): # Apply filters.  This is intended to be called only once
        self.servicemap = { key: val for key, val in self.servicemap.items() if self.filter_service_basic(key, filters) } # Refresh the service map
        self.uuidmap = { val: key for key, val in self.uuidrevmap.items() if self.filter_service_basic(key, filters) } # Refresh UUID map so UUIDs map to non-excluded services when possible
        self.df = self.df[self.df.apply(lambda x: self.filter_apply_services(x["id"], x[self.eventcol], filters), 1)] # Filter the services
        self.df = self.df[self.df[self.eventcol].apply(lambda x: self.filter_apply_events(x, filters))] # Filter the events
        self.df["type"] = self.df[self.eventcol].map(lambda x: str(self.filter_apply_mutations(x, filters))) # Apply mutations to the string representation of the event
    def rpcs(self): # Pull out RPCs and return as a dataframe
        ret = self.df[self.df[self.eventcol].apply(lambda x: x.n == "RPC")]
        ret["src"] = ret[self.eventcol].map(lambda x: self.tree_service_name(x[0]))
        ret["dst"] = ret[self.eventcol].map(lambda x: self.tree_service_name(x[1]))
        del(ret[self.eventcol])
        return ret
    def events(self): # Pull out non-RPCs and return as a dataframe
        ret = self.df[self.df[self.eventcol].apply(lambda x: x.n != "RPC")]
        ret["name"] = ret["id"].map(lambda x: self.service_name(self.uuidmap[x], self.servicemap[self.uuidmap[x]]))
        del(ret[self.eventcol])
        return ret
    def services(self): # Get a list of the services involved in the trace
        starttimes = sorted([ (service, self.service_start(service)) for service in self.servicemap.keys() ], key=functools.cmp_to_key(lambda a, b: self.tscmp(a[1], b[1])))
        return [ self.service_name(service, self.servicemap[service]) for (service, time) in starttimes ]
    def timerange(self): # Get the start and end time of the trace
        return (self.df["time"].min(), self.df["time"].max())

class displayfilter:
    """Base class for trace display filters; every hook is a pass-through.

    ``reader`` calls these hooks directly on each object in its ``filters`` list
    (``f.fevent(event)``, ``f.fservice(host, port, name)``, ``f.mutate(tree)``).  They are
    static so that works whether a filter is passed as the class itself or as an instance.
    """
    @staticmethod
    def fevent(event): # Return False to drop this event
        return True
    @staticmethod
    def fservice(host, port, name): # Return False to drop events whose service (or either RPC endpoint) this rejects
        return True
    @staticmethod
    def mutate(tree): # Return the tree whose string form becomes the event's displayed "type"
        return tree

def seqplot(data, filters=None):
    """Build a Bokeh sequence diagram of the trace stored under directory ``data``.

    Services form the x axis, ordered by the time of the first event they initiate (services
    with no events last).  Time runs downward on the y axis.  Each RPC is a blue dot at the
    sender joined by a horizontal segment to the receiver.  Other events are red dots on their
    service's column.  Hovering shows the event's (filter-mutated) type string.

    :param data: directory containing the trace ``.tsv`` files, passed to ``reader``.
    :param filters: optional sequence of displayfilter-style filters, applied in order.
    :returns: the configured Bokeh figure; display it with ``show``.
    """
    if filters is None: # Avoid a shared mutable default argument
        filters = []
    trace = reader(data)
    trace.filter(filters)
    rpcs = trace.rpcs()
    events = trace.events()
    services = trace.services()
    timerange = trace.timerange()

    hover = HoverTool()
    # word-wrap only accepts normal/break-word; the previous "wrap-all" value was invalid and ignored
    hover.tooltips = "<div style='max-width: 400px; overflow-wrap: break-word; word-wrap: break-word'>@type</div>"
    p = figure(y_axis_type="datetime", x_range=services, tools=["ypan", "ywheel_zoom", hover, "reset"], active_scroll="ywheel_zoom")
    # One source shared by the RPC dots and segments so hovering a segment can also resolve @type
    rpc_source = ColumnDataSource(rpcs)
    p.circle("src", "time", size=8, source=rpc_source, color="blue")
    p.segment(x0="src", y0="time", x1="dst", y1="time", source=rpc_source, color="blue")
    p.circle("name", "time", size=8, source=ColumnDataSource(events), color="red")
    p.y_range = ranges.Range1d(timerange[1], timerange[0]) # Reversed range: earliest time at the top
    p.xaxis.major_label_orientation = math.pi/6
    p.sizing_mode = "scale_width"
    p.height = 400
    return p

In [4]:
class remove_cruft(displayfilter):
    """Hide heartbeat and other high-volume bookkeeping events, plus unresolved services."""
    def fevent(event):
        kind = event.n
        if kind == "RPC":
            message = event[2] # Third child of the RPC: the message that was sent
            if message.n == "HeartbeatResponse":
                return False
            # Also drop Heartbeats wrapped in a RequestMessage
            return not (message.n == "RequestMessage" and message[2].n == "Heartbeat")
        if kind == "ListenerEvent":
            return event[0].n != "SparkListenerExecutorMetricsUpdate" # Triggered by heartbeats
        # BMMUpdate seems to fire on all block put/get/delete requests; TrackerRegisterShuffle duplicates RegisterShuffle
        return kind not in ("BMMUpdate", "TrackerRegisterShuffle")
    def fservice(host, port, name):
        # An empty name marks an unresolved service; driverPropsFetcher is hidden as well
        return name not in ("", "driverPropsFetcher")
    def mutate(tree):
        # Label an RPC by its message, unwrapping a RequestMessage one more level
        if tree.n != "RPC":
            return tree
        message = tree[2]
        return message[2] if message.n == "RequestMessage" else message

class only_tasks(displayfilter):
    """Placeholder filter: inherits every displayfilter hook unchanged, so it currently filters nothing.

    TODO(review): the name suggests it should keep only task-related events; no such selection is implemented yet.
    """

class remove_events(displayfilter):
    """Keep only RPCs; every non-RPC event is dropped."""
    def fevent(event):
        return event.n == "RPC"

class remove_rpcs(displayfilter):
    """Exact complement of remove_events: drop RPCs and keep every other event."""
    def fevent(event):
        is_rpc = remove_events.fevent(event)
        return not is_rpc

class events_only_block(displayfilter):
    """Keep RPCs plus the block/shuffle events named below; drop every other event."""
    def fevent(event):
        block_events = ("TrackerRegisterShuffle", "RegisterShuffle", "UnregisterShuffle", "BlockFetch", "BlockUpload",
            "GetBlock", "GetBlockData", "PutBlock", "DeleteBlock", "FreeBlock", "BMMRegister", "BMMUpdate",
            "BMMRemoveBlock", "BMMRemoveRDD", "BMMRemoveShuffle", "BMMRemoveBroadcast")
        return event.n == "RPC" or event.n in block_events

class events_only_management(displayfilter):
    """Keep RPCs plus the JVM/application/SparkContext management events named below; drop the rest."""
    def fevent(event):
        management_events = ("JVMStart", "MainStart", "MainEnd", "SpawnExecutor", "SubmitApplication", "SubmittedApplication",
            "NewSparkContext", "InitializedSparkContext")
        return event.n == "RPC" or event.n in management_events

In [48]:
# Directory holding the trace .tsv files.  Set the SPARK_TRACE_ROOT environment variable to point
# elsewhere (a local run, for example, might use /tmp/spark-trace) instead of editing this cell.
root = os.environ.get("SPARK_TRACE_ROOT", "/home/matt/code/spark-tracing/matt/runs/remote")
filters = [remove_cruft, remove_rpcs] # Applied in list order by reader.filter
show(seqplot(root, filters))