In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import datetime
import seaborn as sns
# Results are read from "./<cluster>/" and plots are written as "plots/<cluster>_*".
cluster="CPU"
# A single theme call: set_theme applies the default context/style ("darkgrid")/palette
# and then the rc overrides, so the figure size is set in the same call.
sns.set_theme(style="darkgrid", rc={'figure.figsize':(8,4.5)})
# Strategy directory names in plot/table order: the "original" baseline first, then every
# "<prioritisation>_<assignment>" combination (labels in prioNamesMap / assignNamesMap).
order = ['original', 
       'fifo_roundrobin', 'fifo_random', 'fifo_fair', 
       'random_roundrobin', 'random_random', 'random_fair', 
       'max_roundrobin', 'max_random', 'max_fair',
       'min_roundrobin', 'min_random', 'min_fair', 
       'rank_roundrobin', 'rank_random', 'rank_fair', 
       'rank_min_roundrobin', 'rank_min_random', 'rank_min_fair', 
       'rank_max_roundrobin', 'rank_max_random', 'rank_max_fair']
# Workflows included in the plots and tables (also the facet order of the plots).
workflowNames = ['rnaseq','sarek','chipseq','atacseq','mag','ampliseq','nanoseq','viralrecon','eager']
# Display names for plot titles and LaTeX tables (each key exactly once).
workflowNameMap = { 
    "rnaseq" : "RNA-Seq", 
    "sarek" : "Sarek", 
    "chipseq" : "ChIP-Seq", 
    "atacseq": "ATAC-seq", 
    "mag": "MAG", 
    "ampliseq": "AmpliSeq", 
    "methylseq": "MethylSeq", 
    "nanoseq": "NanoSeq", 
    "viralrecon": "Viralrecon", 
    "eager": "Eager",
}
# Labels for the prioritisation part of a strategy name (the text before its last "_").
prioNamesMap = { 
    "fifo" : "FIFO",
    "random" : "Ran",
    "max" : "Size Desc",
    "min" : "Size Asc",
    "rank" : "Rank (FIFO)",
    "rank_max" : "Rank (Max)",
    "rank_min" : "Rank (Min)"
}
# Labels for the assignment part of a strategy name (the text after its last "_").
assignNamesMap = {
    "roundrobin" : "RR",
    "random" : "Ran",
    "fair" : "Fair"
}

In [None]:
def mapStrategy( strategy ):
    """Return the display label of a strategy directory name.

    "original" becomes "Original". Any other name is split at its LAST underscore
    into a prioritisation key and an assignment key, which are looked up in
    ``prioNamesMap`` and ``assignNamesMap`` and joined with "-"
    (e.g. "rank_min_fair" -> "Rank (Min)-Fair").

    Raises:
        ValueError: if a non-"original" name contains no underscore.
        KeyError: if either part is not a known key.
    """
    if strategy == "original":
        return "Original"
    cut = strategy.rindex("_")
    return "-".join( ( prioNamesMap[strategy[:cut]], assignNamesMap[strategy[cut + 1:]] ) )

def runtimeFromTrace( trace ):
    """Return the time from the first task submission to the last task completion.

    ``trace`` is a trace table with numeric ``submit`` and ``complete`` columns.
    The difference is divided by 1000 (timestamps presumably in ms, giving seconds).
    """
    firstSubmit, lastComplete = trace["submit"].min(), trace["complete"].max()
    return ( lastComplete - firstSubmit ) / 1000

def readTraceFile( pathToTraceFile ):
    """Read a ";"-separated Nextflow trace file into a DataFrame.

    One known field value contains literal ";" characters
    ("taxa:mitochondria,chloroplast;min-freq:10;min-samples:2"). Those are replaced
    by "," before parsing, so the row keeps the header's column count.

    Args:
        pathToTraceFile: path of the trace file.

    Returns:
        pandas.DataFrame parsed with ``sep=";"``.
    """
    import io

    brokenValue = "taxa:mitochondria,chloroplast;min-freq:10;min-samples:2"
    fixedValue = "taxa:mitochondria,chloroplast,min-freq:10,min-samples:2"
    with open( pathToTraceFile, "r" ) as trace:
        # the pattern has no newline, so replacing in the whole text equals replacing per line
        content = trace.read().replace( brokenValue, fixedValue )
    # Parse from memory instead of a "<path>.tmp" copy. Reading that copy back while its
    # write handle is still open sees a file whose buffered tail is not flushed yet
    # (truncated/empty data), and the copy is left behind if parsing raises.
    return pd.read_csv( io.StringIO( content ), sep=";" )

def makespanFromLogs( lines ):
    """Return the makespan in seconds as recorded in a Nextflow log.

    The makespan runs from the timestamp of the first "Submitted process" line
    (task submitter) to the timestamp of the last "Task completed" line (task
    polling monitor). A timestamp is the text before the first " [" of a line,
    in the format "%b-%d %H:%M:%S.%f".

    Args:
        lines: lines of a nextflow.log file, e.g. from ``readlines()``.

    Returns:
        float: seconds between first submission and last completion.

    Raises:
        ValueError: if no submission or no completion line is found, or a
            timestamp cannot be parsed.
    """
    # NOTE(review): the log has no year, so strptime assumes 1900; runs spanning
    # New Year or dated Feb-29 would be mis-parsed / rejected.
    logDateFormat = '%b-%d %H:%M:%S.%f'
    startDate = None
    endDate = None
    for l in lines:
        if ( startDate is None and "] Submitted process > " in l and " [Task submitter] INFO  nextflow.Session - [" in l ):
            startDate = datetime.datetime.strptime(l[:l.index(" [")], logDateFormat)
        if ( "[Task monitor] DEBUG n.processor.TaskPollingMonitor - Task completed > TaskHandler[" in l ):
            # keep overwriting: the last completion defines the end
            endDate = datetime.datetime.strptime(l[:l.index(" [")], logDateFormat)
    if startDate is None or endDate is None:
        raise ValueError("nextflow.log contains no submitted and/or no completed task; cannot compute makespan")
    return (endDate - startDate).total_seconds()


def readRun( pathToRun ):
    """Collect the metrics of one run from "<pathToRun>nextflow.log" and "<pathToRun>trace.csv".

    ``pathToRun`` is concatenated directly with the file names, so it must end
    with a path separator.

    Returns a dict with:
        overallRuntime     seconds between the timestamps of the first and the last
                           line of nextflow.log
        trace              the parsed trace table (readTraceFile)
        makespan           first submit -> last complete from the trace (runtimeFromTrace)
        makespanFromLogs   first submission -> last completion from the log (makespanFromLogs)
        avgTaskRuntime     mean of trace "realtime", divided by 1000
        medianTaskRuntime  median of trace "realtime", divided by 1000
        taskCount          number of trace rows
        queueDelay         mean of (start - submit), divided by 1000
    (the /1000 presumably converts ms to seconds.)

    Any exception is reported together with the run path and then re-raised.
    """
    logTimestampFormat = '%b-%d %H:%M:%S.%f'

    def timestampOf( logLine ):
        # a log line starts with its timestamp, followed by " [<thread>] ..."
        return datetime.datetime.strptime( logLine[:logLine.index(" [")], logTimestampFormat )

    try:
        with open( pathToRun + "nextflow.log", "r" ) as nextflowlog:
            logLines = nextflowlog.readlines()
        startDate = timestampOf( logLines[0] )
        endDate = timestampOf( logLines[-1] )
        overallRuntime = ( endDate - startDate ).total_seconds()
        makespanFromLog = makespanFromLogs( logLines )

        trace = readTraceFile( pathToRun + "trace.csv" )
        avgTaskRuntime = trace["realtime"].mean() / 1000
        medianTaskRuntime = trace["realtime"].median() / 1000
        makespan = runtimeFromTrace( trace )
        queueDelay = ( trace["start"] - trace["submit"] ).mean() / 1000
    except Exception:
        print("Error reading run " + pathToRun)
        raise

    return {
        "overallRuntime" : overallRuntime,
        "trace" : trace,
        "makespan" : makespan,
        "makespanFromLogs" : makespanFromLog,
        "avgTaskRuntime" : avgTaskRuntime,
        "medianTaskRuntime" : medianTaskRuntime,
        "taskCount" : len(trace),
        "queueDelay" : queueDelay,
    }

In [None]:
def readResults( cluster, minRuns = 5 ):
    """Read every run stored as "<cluster>/<workflow>/<strategy>/<run>/".

    Each run directory is parsed with ``readRun``. Directory listings are sorted,
    because ``os.listdir`` order is arbitrary and made row order and warnings
    machine-dependent. Plain files (e.g. ".DS_Store") are skipped instead of
    being treated as directories.

    Args:
        cluster: top-level result directory, relative to the working directory.
        minRuns: a message is printed for every workflow/strategy with fewer runs.

    Returns:
        (resultsDf, workflows, results):
            resultsDf  one row per run: the scalar metrics of ``readRun`` plus
                       ``workflow``, ``strategy``, ``run``, ``prio``, ``assign``
                       and ``advanced`` (False only for "original").
            workflows  sorted list of workflow directory names.
            results    nested dict ``results[workflow][strategy][run]`` with the
                       full ``readRun`` result (including the trace table).

    Raises:
        ValueError: for a non-"original" strategy directory without "_".
    """
    def _subdirectories( path ):
        # sorted for reproducible order; plain files are ignored
        return sorted( entry for entry in os.listdir( path ) if os.path.isdir( os.path.join( path, entry ) ) )

    resultDir = cluster + "/"
    workflows = _subdirectories( resultDir )
    resultsList = []
    results = {}
    for workflow in workflows:
        results[workflow] = {}
        for strategy in _subdirectories( resultDir + workflow ):
            results[workflow][strategy] = {}
            if ( strategy == "original" ):
                prio, assign, advanced = "original", "original", False
            else:
                # split at the LAST "_": the prioritisation part may contain one itself (e.g. "rank_min")
                prio, separator, assign = strategy.rpartition("_")
                if not separator:
                    raise ValueError("Unexpected strategy directory name (expected '<prio>_<assign>'): " + strategy)
                advanced = True

            strategyDir = resultDir + workflow + "/" + strategy
            runs = _subdirectories( strategyDir )
            if ( len(runs) < minRuns ):
                print("Only found " + str(len(runs)) + " runs for " + workflow + " " + strategy + ".")
            for run in runs:
                result = readRun( strategyDir + "/" + run + "/" )
                results[workflow][strategy][run] = result
                resultsList.append( { 
                    "workflow" : workflow, 
                    "strategy" : strategy, 
                    "run" : run, 
                    "overallRuntime" : result["overallRuntime"], 
                    "makespan" : result["makespan"], 
                    "makespanFromLogs" : result["makespanFromLogs"], 
                    "avgTaskRuntime" : result["avgTaskRuntime"],
                    "medianTaskRuntime" : result["medianTaskRuntime"],
                    "assign" : assign, 
                    "prio" : prio, 
                    "advanced" : advanced,
                    "taskCount" : result["taskCount"],
                    "queueDelay" : result["queueDelay"]
                    } )

    resultsDf = pd.DataFrame( resultsList )
    return resultsDf, workflows, results

In [None]:
# Load all runs of the configured cluster: per-run table, workflow names, raw per-run data.
resultsDf, workflows, rawdata = readResults( cluster=cluster )

In [None]:
# Total wall-clock time of all runs (sum of the nextflow.log runtimes), as whole days + remaining hours.
sumHoursRuntime = resultsDf["overallRuntime"].sum() / 60 / 60
daysRuntime = int( sumHoursRuntime / 24 )
hoursRuntime = sumHoursRuntime % 24
print( " ".join( [ "Total runtime:", str(daysRuntime), "days", str(hoursRuntime), "hours" ] ) )

In [None]:
def calculateChange( row, df, metric ):
    """Percent change of ``row[metric]`` against the baseline of its workflow.

    The baseline is the median ``metric`` of the rows of ``df`` with the same
    ``workflow`` and ``strategy == "original"``.
    """
    sameWorkflowOriginal = ( df["workflow"] == row["workflow"] ) & ( df["strategy"] == "original" )
    baseline = df.loc[ sameWorkflowOriginal, metric ].median()
    return ( row[metric] - baseline ) / baseline * 100

def mapYLabel( y, _ ):
    """Y tick formatter: 0 becomes "Median", other values a signed percentage ("+5%", "-10%").

    The second argument (the tick position passed by matplotlib) is ignored.
    """
    if y == 0:
        return "Median"
    isNegative = y < 0
    sign = "-" if isNegative else "+"
    magnitude = y * -1 if isNegative else y
    return sign + str(magnitude) + "%"

In [None]:
def plotAll( data, yAxis = "overallRuntime", yLabel = "y", col_wrap = 3 ):
    """Box plots of each run's change in ``yAxis`` versus the original strategy, one facet per workflow.

    Every value is converted with ``calculateChange`` into percent change relative
    to the median of the "original" runs of the same workflow. Boxes are coloured
    by prioritisation family. A solid line marks the original median (0) and a
    dotted line the best original run. The figure is saved as
    "plots/<cluster>_<yAxis>[_<col_wrap>].jpg" and ".pdf".

    Args:
        data: one row per run with at least ``workflow``, ``strategy`` and the
            ``yAxis`` column. It is not modified.
        yAxis: metric column to plot.
        yLabel: y-axis label of the whole grid.
        col_wrap: facets per row. Values other than 3 use taller facets and add a
            "_<col_wrap>" suffix to the file names.
    """
    sns.set_style("whitegrid")
    # Work on a copy so the helper column "yAxis" does not leak into the caller's DataFrame.
    resultsDf2 = data.copy()
    resultsDf2["yAxis"] = [ calculateChange( row, resultsDf2, yAxis ) for index, row in resultsDf2.iterrows() ]

    plot = sns.FacetGrid(resultsDf2, col="workflow", height=(4 if col_wrap==3 else 6), aspect=1, col_wrap = col_wrap, sharey=False, col_order=workflowNames )
    # `order` already starts with "original". Inserting it again draws a second "original"
    # box in slot 1, which is removed below and its tick label blanked, leaving a visual
    # gap between the baseline and the strategies.
    orderNew = order.copy()
    orderNew.insert(0, "original")
    plot.map(sns.boxplot, "strategy", "yAxis", order=orderNew, medianprops=dict(zorder=110) )

    # NOTE(review): the artist surgery below assumes the boxes live in `axes.artists` and
    # that `axes.artists` / `axes.lines` support `.remove`. Newer seaborn/matplotlib
    # releases changed both; presumably the versions this was written with are required.
    #tab10 color palette https://stackoverflow.com/questions/64369710/what-are-the-hex-codes-of-matplotlib-tab10-palette
    counter = 0
    for axes in plot.axes.flat:
        _ = axes.set_xticklabels(axes.get_xticklabels(), rotation=90)

        # drop the duplicated "original" box (slot 1) ...
        axes.artists.remove(axes.artists[1])
        # ... and its 6 line artists, which follow the 6 of box 0 (presumably whiskers, caps, median, fliers)
        for i in range(6):
            axes.lines.remove(axes.lines[6])

        # after the removal, artists[i] corresponds to order[i]
        for i,box in enumerate(axes.artists):
            strategy = order[i]
            if ( strategy == "original" ):
                box.set_facecolor('#7f7f7f')
            elif ( strategy.startswith("fifo") ):
                box.set_facecolor('#ff7f0e')
            elif ( strategy.startswith("random") ):
                box.set_facecolor('#2ca02c')
            elif ( strategy.startswith("max") ):
                box.set_facecolor('#d62728')
            elif ( strategy.startswith("min") ):
                box.set_facecolor('#9467bd')
            elif ( strategy.startswith("rank_min") ):
                box.set_facecolor('#e377c2')
            elif ( strategy.startswith("rank_max") ):
                box.set_facecolor('#1f77b4')
            elif ( strategy.startswith("rank") ):
                box.set_facecolor('#8c564b')
            box.zorder = 100
        # facets are laid out in workflowNames order (col_order above)
        cWorkflow = resultsDf2[resultsDf2["workflow"] == workflowNames[counter]]
        orig = cWorkflow[ cWorkflow["strategy"] == "original" ][yAxis]
        axes.axhline(y=0, color='black', linestyle='-', linewidth=1, zorder = 1, label="Median original run")
        axes.axhline(y=(orig.min() - orig.median()) / orig.median() * 100, color='black', linestyle='dotted', linewidth=1, zorder = 1, label="Best original run")
        labels = [mapStrategy(item.get_text()) for item in axes.get_xticklabels()]
        if( len(labels) > 0 ):
            labels[1] = ""  # tick of the removed duplicate "original" box
        axes.set_xticklabels( labels )
        plt.setp(axes.get_xticklabels()[:1], fontweight="bold")
        axes.yaxis.set_major_formatter(mapYLabel)

        axes.set_ylabel( yAxis )
        # the legend entries (the two reference lines) are taken from the last facet
        handles, labels = axes.get_legend_handles_labels()
        axes.title.set_text( workflowNameMap[workflowNames[counter]] )
        counter += 1

    plot.set_xlabels("Scheduling strategy")
    plot.set_ylabels(yLabel)
    plot.add_legend( {labels[i]: handles[i] for i in range(len(labels))} )
    sns.move_legend(
        plot, "lower center",
        bbox_to_anchor=(.5, -0.03), ncol=3, title=None, frameon=True,
    )
    additional = ""
    if ( col_wrap != 3 ):
        additional = "_" + str(col_wrap)
    plt.tight_layout()
    # savefig does not create missing directories
    os.makedirs( "plots", exist_ok=True )
    plt.savefig( "plots/" + cluster + "_" + yAxis + additional + ".jpg", dpi = 100, bbox_inches="tight" )
    plt.savefig( "plots/" + cluster + "_" + yAxis + additional + ".pdf", dpi = 100, bbox_inches="tight" )

In [None]:
# Relative overall runtime and log-derived makespan per strategy, one facet per workflow.
plotAll( resultsDf, yAxis="overallRuntime", yLabel="Runtime change compared\nto original median" )
plotAll( resultsDf, yAxis="makespanFromLogs", yLabel="Makespan" )

In [None]:
# Nextflow overhead per run: wall-clock runtime from nextflow.log minus the task makespan
# (once from the trace timestamps, once from the log's submit/complete lines).
resultsDf["runtimeMakespanDiff"] = resultsDf.overallRuntime - resultsDf.makespan
resultsDf["runtimeMakespanFromLogsDiff"] = resultsDf.overallRuntime - resultsDf.makespanFromLogs
# Medians per workflow and strategy family (original vs. advanced), in minutes.
(
    resultsDf
    .groupby(["workflow", "advanced"])[["overallRuntime", "makespan", "makespanFromLogs", "runtimeMakespanDiff", "runtimeMakespanFromLogsDiff"]]
    .median()
    .div(60)
    .round(2)
    .sort_values(by=["workflow", "advanced"], ascending=False)
)

In [None]:
def mapAdvanced( x ):
    """Tick label for the boolean ``advanced`` column.

    "True" becomes "Advanced" and "False" becomes "Original". Both the tick text
    and a real bool are accepted. Any other value is returned unchanged as a string,
    instead of ``None``, which would be rendered as the label "None".
    """
    text = str( x )
    if text == "True":
        return "Advanced"
    if text == "False":
        return "Original"
    return text
# Distribution of the Nextflow overhead (runtime minus log makespan) for original vs. advanced strategies.
ax = sns.boxplot( data=resultsDf, x="advanced", y="runtimeMakespanFromLogsDiff" )
ax.set_xlabel("Scheduling Strategy")
ax.set_ylabel("Nextflow Overhead in Seconds")
ax.set_xticklabels( [ mapAdvanced( tick.get_text() ) for tick in ax.get_xticklabels() ] )

In [None]:
# Per-workflow median overhead (seconds) with advanced vs. original scheduling, and their difference.
advanced = resultsDf.loc[ resultsDf["advanced"] == True ].groupby(["workflow"])["runtimeMakespanFromLogsDiff"].median()
original = resultsDf.loc[ resultsDf["advanced"] == False ].groupby(["workflow"])["runtimeMakespanFromLogsDiff"].median()
advancedOriginalDiff = (
    advanced.to_frame("advanced")
    .join( original.to_frame("original") )
    .sort_values( by="advanced", ascending=False )
)
advancedOriginalDiff["diff"] = advancedOriginalDiff["advanced"] - advancedOriginalDiff["original"]
diffSeconds = advancedOriginalDiff["diff"]
for caption, value in (
    ("Average difference in seconds: ", diffSeconds.mean()),
    ("Median difference in seconds: ", diffSeconds.median()),
    ("Max difference in seconds: ", diffSeconds.max()),
    ("Min difference in seconds: ", diffSeconds.min()),
    ("Standard deviation in seconds: ", diffSeconds.std()),
):
    print( caption + str(value) )

In [None]:
# Overhead difference (advanced minus original, seconds) per workflow
advancedOriginalDiff.loc[:, "diff"]

In [None]:
def calculateChanges(df, groupbyOrder, groupyby = "strategy", attribute = "overallRuntime" ):
    """Compare every group of runs with the "original" runs of the same workflow.

    Each value in ``groupbyOrder`` is a value of column ``groupyby`` (e.g. a strategy,
    an assignment or a prioritisation name). The runs of that group are compared,
    workflow by workflow, with the runs whose ``groupyby`` value is "original".
    Shares and relative changes are fractions (0.1 == 10 %); a negative change means
    the group's ``attribute`` is lower than the baseline.

    Args:
        df: one row per run with columns ``workflow``, ``groupyby``, ``attribute``
            and ``queueDelay``.
        groupbyOrder: group values to evaluate; one output row each, in this order.
        groupyby: column holding the group value.
        attribute: metric column to compare; defaults to "overallRuntime".

    Returns:
        pandas.DataFrame with one row per group (columns described inline below).

    Raises:
        ValueError: if a group has no runs in ``df``.
    """
    allWorkflows = df.workflow.unique()

    changes = []
    for strategy in groupbyOrder:
        betterThanMin = 0
        betterThanMedian = 0
        medBetterThanOrigMed = 0
        diffMin = []
        diffWorst = []
        diffMedian = []
        diffMedMed = []
        standardDeviation = []
        queueDelay = []
        runs = 0
        for workflow in allWorkflows:
            t = df[df["workflow"] == workflow]
            orig = t[ t[groupyby] == "original" ][attribute]
            oMed = orig.median()
            oMin = orig.min()
            oMax = orig.max()
            selected = t[ t[groupyby] == strategy ]
            c = selected[attribute]
            cMed = c.median()
            betterThanMin += (c < oMin).sum()
            betterThanMedian += (c < oMed).sum()
            if ( cMed < oMed ):
                medBetterThanOrigMed += 1
            # relative change of every run of the group vs. the original median / best / worst run
            changeToMedian = (c - oMed) / oMed
            diffMedian.extend(list(changeToMedian))
            diffMin.extend(list((c - oMin) / oMin))
            diffWorst.extend(list((c - oMax) / oMax))
            diffMedMed.append((cMed - oMed) / oMed)
            # spread of the group's runs in units of the original median (pandas std, ddof=1: NaN for one run)
            standardDeviation.append(changeToMedian.std())
            queueDelay.extend( list(selected["queueDelay"]) )
            runs += len(c)
        if runs == 0:
            raise ValueError("No runs with " + groupyby + " == " + repr(strategy) + "; cannot compute changes")
        changes.append( { 
            "strategy" : strategy, 
            "betterThanMin" : betterThanMin / runs, # share of runs below the best original run (fraction)
            "betterThanMedian" : betterThanMedian / runs, # share of runs below the median original run (fraction)
            "medBetterThanOrigMed" : medBetterThanOrigMed / len(allWorkflows), # share of workflows whose group median is below the original median (fraction)
            "diffMedian" : np.array(diffMedian).mean(), # mean relative change of all runs vs. the original median
            "diffMin" : np.array(diffMin).mean(), # mean relative change of all runs vs. the best original run
            "diffMedMed" : np.array(diffMedMed).mean(), # mean over workflows of the relative change of the group median vs. the original median
            "bestDiffMedMed" : np.array(diffMedMed).min(), # lowest per-workflow relative change of the group median vs. the original median
            "worstDiffMedMed" : np.array(diffMedMed).max(), # highest per-workflow relative change of the group median vs. the original median
            "diffWorstWorst" : np.array(diffWorst).max(), # highest relative change of any run vs. the worst original run
            "diffMinMin" : np.array(diffMin).min(), # lowest relative change of any run vs. the best original run
            "avgStd" : np.array(standardDeviation).mean(), # mean over workflows of the group's relative std
            "bestStd" : np.array(standardDeviation).min(), # lowest per-workflow relative std
            "worstStd" : np.array(standardDeviation).max(), # highest per-workflow relative std
            "avgQueueDelay" : np.array(queueDelay).mean(), # mean queue delay of the group's runs
        } )

    changesDf = pd.DataFrame(changes)
    return changesDf

# Comparison of every scheduling strategy with the original runs
changesDf = calculateChanges( resultsDf, groupbyOrder=order, groupyby="strategy" )
changesDf

In [None]:
# how do the assignment strategies compare to each other
changesAssignDf = calculateChanges( resultsDf, groupbyOrder=["fair", "random", "roundrobin"], groupyby="assign" )
changesAssignDf

In [None]:
# how do the prio strategies compare to each other
changesPrioDf = calculateChanges( resultsDf, groupbyOrder=["fifo", "random", "max", "min", "rank", "rank_max", "rank_min"], groupyby="prio" )
changesPrioDf

In [None]:
# how do the workflows compare to each other
# Relative spread of the overall runtime per workflow (all strategies pooled): std of
# runtime / mean runtime, i.e. the coefficient of variation; sorted most stable first.
# groupby(sort=False) keeps first-appearance order, so ties sort as before.
workflowRuntimeVariation = (
    resultsDf
    .groupby("workflow", sort=False)["overallRuntime"]
    .apply(lambda runtimes: (runtimes / runtimes.mean()).std())
    .rename("std")
    .reset_index()
    .sort_values("std")
)
workflowRuntimeVariation


In [None]:
#Build Table with change values for Latex

def formatColumn( data, column, markMax = False, markMin = False ):
    """Format one metric of ``data`` as the cells of a LaTeX table row.

    There is one cell per strategy in the global ``order`` (skipping "original"),
    taken from the row of ``data`` whose ``strategy`` matches. Each value is shown
    multiplied by 100 with one decimal. With ``markMax`` / ``markMin``, every cell
    equal to the largest / smallest value is wrapped in textbf.

    Returns:
        str: the cells joined by " & ", followed by a LaTeX line break.
    """
    metricValues = [ data.loc[ data["strategy"] == strategy, column ].values[0]
                     for strategy in order
                     if strategy != "original" ]
    highest = max(metricValues)
    lowest = min(metricValues)

    def asCell( value ):
        text = '{0:.1f}'.format(value * 100)
        if (markMax and value == highest) or (markMin and value == lowest):
            return "\\textbf{" + text + "}"
        return text

    return " & ".join( asCell( value ) for value in metricValues ) + " \\\\"


# One column per strategy except the "original" baseline; the count is derived from
# `order` instead of being hard-coded in the tabular spec and the header.
strategyColumns = [ o for o in order if o != "original" ]
firstLine = "\\multicolumn{1}{c}{Metric} "
for o in strategyColumns:
    firstLine += "& \\multicolumn{1}{c}{\\rot{" + mapStrategy(o) + "}} "
print("\\begin{tabular}{c*{" + str(len(strategyColumns)) + "}{|r}}")
print("\\multicolumn{1}{c}{} & \\multicolumn{" + str(len(strategyColumns)) + "}{c}{\\textbf{Strategies}} \\\\")
print(firstLine + "\\\\")
print("\\hline")
# (row label, changesDf column, bold the maximum, bold the minimum)
changeTableRows = [
    ("Better med.", "betterThanMedian", True, False),
    ("Better min.", "betterThanMin", True, False),
    ("Med. better med.", "medBetterThanOrigMed", True, False),
    ("Med. change (avg)", "diffMedian", False, True),
    ("Min change (avg)", "diffMin", False, True),
    ("Med. med. change (avg)", "diffMedMed", False, True),
    ("Med. med. change (best)", "bestDiffMedMed", False, True),
    ("Med. med. change (worst)", "worstDiffMedMed", False, True),
    ("Worst diff to worst ori.", "diffWorstWorst", False, True),
    ("Max impr. to best ori.", "diffMinMin", False, True),
    ("Standard dev. (avg)", "avgStd", False, True),
    ("Standard dev. (best)", "bestStd", False, True),
    ("Standard dev. (worst)", "worstStd", False, True),
]
for rowLabel, rowColumn, rowMarkMax, rowMarkMin in changeTableRows:
    print(rowLabel + " & " + formatColumn( changesDf, rowColumn, rowMarkMax, rowMarkMin ))
print("\\hline")
print("\\end{tabular}")

In [None]:
# Size of the data each workflow generated, measured manually after a run.
# Unit: KB (the runtime table divides by 1024 and prints MB).
dataFootprint = dict(
    rnaseq=507450,
    sarek=548964,
    chipseq=2699644,
    atacseq=5929144,
    mag=19002924,
    ampliseq=273880,
    nanoseq=14964553,
    viralrecon=915517,
    eager=2440970,
)

In [None]:
# Latex table with the runtimes
import locale
# The thousands separators in the table come from LC_NUMERIC. "English" is the Windows
# name of that locale and raises locale.Error on Linux/macOS, where it is usually called
# "en_US.UTF-8". Try the candidates in turn and fail only if none of them exists.
for _localeName in ( "English", "en_US.UTF-8", "en_US.utf8", "en_US" ):
    try:
        locale.setlocale( locale.LC_NUMERIC, _localeName )
        break
    except locale.Error:
        pass
else:
    raise locale.Error( "No English LC_NUMERIC locale available (tried English, en_US.UTF-8, en_US.utf8, en_US)" )
metric = "overallRuntime"
# One LaTeX row per workflow: task count, data footprint, best strategy, original median
# runtime, task runtime statistics (all strategies and runs pooled; /1000 as in readRun),
# best median runtime and improvement.
tableRows = []
for wf in workflowNames:
    cwf = resultsDf[resultsDf["workflow"] == wf]
    numberOfTasks = cwf["taskCount"].min()
    runtimes = cwf.groupby("strategy")[metric].median()
    realtimeList = [ realtime
                     for runsOfStrategy in rawdata[wf].values()
                     for runResult in runsOfStrategy.values()
                     for realtime in runResult["trace"]["realtime"] ]
    realtimes = np.array(realtimeList)
    medianRuntime = np.median(realtimes) / 1000
    meanRuntime = np.mean(realtimes) / 1000
    stdRuntime = np.std(realtimes) / 1000
    bestStrategy = runtimes.idxmin()
    originalMedian = runtimes.at["original"]
    bestMedian = runtimes.at[bestStrategy]
    change = ( bestMedian - originalMedian ) / originalMedian
    cells = [
        workflowNameMap[wf],
        locale.format_string('%d', numberOfTasks, True ),
        locale.format_string('%.1f MB', dataFootprint[wf] / 1024, True ),
        mapStrategy(bestStrategy),
        locale.format_string('%.1f', originalMedian, True ) + "s",
        locale.format_string('%.1f', meanRuntime, True ) + "s",
        locale.format_string('%.1f', medianRuntime, True ) + "s",
        locale.format_string('%.1f', stdRuntime, True ) + "s",
        locale.format_string('%.1f', bestMedian, True ) + "s",
        '{0:.1f}\\%'.format( change * -100 ),
    ]
    tableRows.append( " & ".join( cells ) + " \\\\" )
dataTable = "\n".join( tableRows )
# 10 columns: each row of `dataTable` has 10 cells, matching the spec and header cells below.
print("\\begin{tabular}{c|r|r|c|r|r|r|r|r|r}")
print("\\multicolumn{1}{p{1.2cm}|}{\\parbox[c][4em]{\\hsize}{\\centering Workflow}} & \\multicolumn{1}{p{1.2cm}|}{\\parbox[c][4em]{\\hsize}{\\centering \\# tasks instances}} & \\multicolumn{1}{p{1.5cm}|}{\\parbox[c][4em]{\\hsize}{\\centering generated data}} & \\multicolumn{1}{p{2cm}|}{\\parbox[c][4em]{\\hsize}{\\centering Strategy with the best median run}} & \\multicolumn{1}{p{1.8cm}|}{\\parbox[c][4em]{\\hsize}{\\centering Original median runtime}} & \\multicolumn{1}{p{1.2cm}|}{\\parbox[c][4em]{\\hsize}{\\centering Avg. task runtime}} & \\multicolumn{1}{p{1.2cm}|}{\\parbox[c][4em]{\\hsize}{\\centering Median task runtime}}  & \\multicolumn{1}{p{1.2cm}|}{\\parbox[c][4em]{\\hsize}{\\centering Standard dev. task runtime}} & \\multicolumn{1}{p{0.9cm}|}{\\parbox[c][4em]{\\hsize}{\\centering Best median runtime}} & \\multicolumn{1}{p{1.6cm}}{\\parbox[c][4em]{\\hsize}{\\centering Improvement}} \\\\")
print("\\hline")
print(dataTable)
print("\\hline")
print("\\end{tabular}")


In [None]:
# part of the longest running task of sarek
# For every sarek run: share of the overall runtime taken by its single longest task
# (realtime / 1000, as in readRun), plus that task's process name.
part = []
taskName = []
for runsOfStrategy in rawdata["sarek"].values():
    for runResult in runsOfStrategy.values():
        runTrace = runResult["trace"]
        longestIdx = runTrace["realtime"].idxmax()
        taskName.append( runTrace["process"][longestIdx] )
        part.append( ( runTrace["realtime"].max() / 1000 ) / runResult["overallRuntime"] )
print("Part of longest task of sarek: " + str(np.mean(np.array(part))))
print("Task name: " + str(np.unique(np.array(taskName))))