# Predicting Dynamic Memory Requirements for Scientific Workflow Tasks

### Setup BASE_DIR
- Path to the tsb_resource_allocation_data directory
- Do not add a trailing `/` (sub-paths are appended as `BASE_DIR/...`)

In [1]:
# Root directory of the trace data. Sub-paths are appended as f'{BASE_DIR}/...',
# so the value must not end with '/'.
BASE_DIR = '/Users/nilsdiedrich/Library/Mobile Documents/com~apple~CloudDocs/TUBerlin/WS22/kTests/k-Segments-traces' 

In [2]:
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
import os
import math
import pandas as pd
from tsb_resource_allocation.witt_task_model import WittTaskModel
from tsb_resource_allocation.tovar_task_model import TovarTaskModel
from tsb_resource_allocation.simulation import Simulation
from tsb_resource_allocation.k_segments_model import KSegmentsModel
from tsb_resource_allocation.file_events_model import FileEventsModel
from tsb_resource_allocation.default_model import DefaultModel
sns.set_theme(style="darkgrid")

# Helper methods

def get_file_names(directory, number_of_files = -1):
    """Return the trace prefixes of all `*_memory.csv` files in `directory`.

    A prefix is the file name with everything from the last `_` onwards
    removed, e.g. `task_1_memory.csv` -> `task_1`.

    directory:       folder to scan (not recursive).
    number_of_files: maximum number of prefixes to return; -1 (default)
                     returns all of them.

    Directories whose name happens to end in `_memory.csv` are skipped.
    The order of the result follows `os.listdir` and is therefore arbitrary.
    """
    file_names = [
        name.rsplit('_', 1)[0]
        for name in os.listdir(directory)
        # os.path.join is required here: plain string concatenation drops the
        # path separator unless `directory` already ends with one, so the
        # directory check would look at a path that does not exist.
        if name.endswith("_memory.csv") and not os.path.isdir(os.path.join(directory, name))
    ]
    if number_of_files != -1:
        return file_names[:number_of_files]
    return file_names

def run_simulation(directory, training, test, monotonically_increasing = True, k = 4, collection_interval = 2):
    """Run every configured task model on the test traces and average the results.

    directory:                folder handed to each Simulation.
    training:                 file names passed to each Simulation as `provided_file_names`.
    test:                     file names each simulation is executed on.
    monotonically_increasing: forwarded to every KSegmentsModel.
    k:                        forwarded to every KSegmentsModel.
    collection_interval:      factor applied to the waste and runtime figures.

    Returns (avg_waste, avg_retries, avg_runtime). Each is a list with one
    entry per collected simulation, in this order: KSegments/selective,
    KSegments/partial, Witt "mean+-"/full, Tovar/full, Tovar/tovar,
    Default/full. Averages are taken over `len(test)`, so an empty `test`
    raises ZeroDivisionError.
    """

    def k_segments(**extra):
        # All k-segments variants share k and the monotonicity flag.
        return KSegmentsModel(k = k, monotonically_increasing = monotonically_increasing, **extra)

    # (model factory, retry mode, collect results?)
    configurations = [
        (k_segments, 'selective', True),
        # "No underprediction" variant: still constructed, but excluded from the results.
        (lambda: k_segments(time_mode = -1), 'selective', False),
        (k_segments, 'partial', True),
        (lambda: WittTaskModel(mode = "mean+-"), 'full', True),
        (TovarTaskModel, 'full', True),
        (TovarTaskModel, 'tovar', True),
        (DefaultModel, 'full', True),
    ]

    simulations = []
    for build_model, retry_mode, collect in configurations:
        candidate = Simulation(build_model(), directory, retry_mode = retry_mode, provided_file_names = training)
        if collect:
            simulations.append(candidate)

    model_count = len(simulations)
    waste = [0] * model_count
    retries = [0] * model_count
    runtimes = [0] * model_count

    for file_name in test:
        for idx, sim in enumerate(simulations):
            # outcome is indexed as (waste, retries, runtime); the meaning of
            # the second execute() argument is not visible here - TODO confirm.
            outcome = sim.execute(file_name, True)
            waste[idx] += ((outcome[0]/1000) * collection_interval)
            retries[idx] += outcome[1]
            runtimes[idx] += (outcome[2] * collection_interval)

    avg_waste = [total / len(test) for total in waste]
    avg_retries = [total / len(test) for total in retries]
    avg_runtime = [total / len(test) for total in runtimes]

    return avg_waste, avg_retries, avg_runtime

# Table Generation

In [3]:
import random
# OUTPUT = ([Waste], [Retries], [Runtime]); each holds one row per training share (25%, 50%, 75%),
# and each row holds one value per model in the order collected by run_simulation.
def benchmark_task(task_dir = '/eager/markduplicates', base_directory = BASE_DIR):
    """Benchmark all models on one task with 25%, 50% and 75% of the traces as training data.

    task_dir:       task folder, relative to `base_directory`.
    base_directory: root folder containing the task folders.

    The trace order comes from the task's `file_order.txt` if one can be
    read, otherwise from `get_file_names`. Traces whose `_memory.csv` has
    fewer than 8 rows (read with the first 3 lines skipped) are ignored.

    Returns a tuple (waste, retries, runtime). Each element is a list with
    one row per training share, and each row holds one value per model as
    returned by `run_simulation`; waste values are rounded to 2 decimals.
    Returns -1 if no usable trace is left.
    """
    directory = f'{base_directory}/{task_dir}'
    file_names_orig = get_file_order(directory)
    if file_names_orig is None:
        file_names_orig = get_file_names(directory)

    percentages = [0.25, 0.5, 0.75]

    y_waste = []
    y_retries = []
    y_runtime = []

    file_names = [
        name for name in file_names_orig
        if len(pd.read_csv(f'{directory}/{name}_memory.csv', skiprows = 3)) >= 8
    ]
    if not file_names:
        return -1
    print(f'Usable Data: {len(file_names)}/{len(file_names_orig)}')

    for i in [int(len(file_names) * p) for p in percentages]:
        # The first i traces train the models, the remaining ones are simulated.
        training = file_names[:i]
        test = file_names[i:]
        print(f"training: {len(training)}, test: {len(test)}", end="\r", flush=True)
        avg_waste, avg_retries, avg_runtime = run_simulation(directory, training, test, k = 4)
        y_waste.append([round(w, 2) for w in avg_waste])
        y_retries.append(avg_retries)
        y_runtime.append(avg_runtime)

    # Terminate the "\r" progress line; otherwise the next print overwrites it
    # only partially and leaves fragments of the status text in the output.
    print()

    return (y_waste, y_retries, y_runtime)

# File Order

In [4]:
def record_file_order(workflow_tasks, base_directory, depth):
    """Write the basenames of `workflow_tasks` to `<base_directory>/file_order.txt`.

    workflow_tasks: names or paths; only their basename is written, one per line.
    base_directory: existing folder that receives `file_order.txt`.
    depth:          0 also records, for every entry, the result of
                    `get_file_names(entry)` into
                    `<base_directory>/<basename>/file_order.txt`;
                    1 writes only the given list; anything above 1 does nothing.
    """
    if depth > 1:
        return
    tasks = list(workflow_tasks)
    # The context manager guarantees the file is flushed and closed; the
    # handle was previously left open.
    with open(f"{base_directory}/file_order.txt", "w") as f:
        f.writelines(f"{os.path.basename(task)}\n" for task in tasks)
    if depth > 0:
        return
    for task in tasks:
        record_file_order(get_file_names(task), f'{base_directory}/{os.path.basename(task)}', depth + 1)
        
def get_file_order(base_directory):
    """Return the lines of `<base_directory>/file_order.txt`, or None if it cannot be opened or read.

    Only OSError (missing file, missing directory, permission problems, ...)
    is treated as "no recorded order"; any other exception propagates.
    """
    try:
        with open(f'{base_directory}/file_order.txt') as f:
            return f.read().splitlines()
    except OSError:
        return None

# Test Folder

In [5]:
# Benchmark every task of the sarek workflow and print the averaged metrics.
base_directory = f'{BASE_DIR}/sarek'
workflow_tasks = []
file_order = get_file_order(base_directory)
if file_order is not None:
    workflow_tasks = file_order
else:
    # No recorded order: use every sub-folder that contains more than 40 entries.
    workflow_tasks = [
        item for item in os.listdir(base_directory)
        if os.path.isdir(os.path.join(base_directory, item))
        and len(os.listdir(os.path.join(base_directory, item))) > 40
    ]

categories = ["Wastage", "Retries", "Runtime"]
percentages = ["25%", "50%", "75%"]

# 0 = WASTE, 1 = RETRIES, 2 = RUNTIME
for task in workflow_tasks:
    r = benchmark_task(task, base_directory)
    if r == -1:
        # benchmark_task found no usable trace for this task.
        continue
    task_name = os.path.basename(task)
    print(task_name)
    for i, category in enumerate(categories):
        for j, percentage in enumerate(percentages):
            print(f'{category} {percentage}: {r[i][j]}')

Usable Data: 36/36
CRAM_QC_MOSDEPTH_SAMTOOLS_MOSDEPTH
Wastage 25%: [146.37, 94.35, 134.73, 119.23, 1010.12, 88.11]
Wastage 50%: [69.59, 69.59, 71.17, 106.59, 788.11, 87.77]
Wastage 75%: [72.08, 72.08, 72.25, 67.16, 67.16, 90.46]
Retries 25%: [0.2222222222222222, 0.1111111111111111, 0.1111111111111111, 0.07407407407407407, 0.07407407407407407, 0.0]
Retries 50%: [0.0, 0.0, 0.0, 0.05555555555555555, 0.05555555555555555, 0.0]
Retries 75%: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
Runtime 25%: [117.18518518518519, 107.03703703703704, 108.5925925925926, 107.62962962962963, 107.62962962962963, 101.03703703703704]
Runtime 50%: [103.33333333333333, 103.33333333333333, 103.33333333333333, 108.55555555555556, 108.55555555555556, 103.33333333333333]
Runtime 75%: [106.22222222222223, 106.22222222222223, 106.22222222222223, 106.22222222222223, 106.22222222222223, 106.22222222222223]
Usable Data: 864/1512
GATK4_APPLYBQSR
Wastage 25%: [221.04, 220.66, 135.31, 142.27, 4014.68, 513.92]
Wastage 50%: [217.6

GERMLINE_CNVKIT_BATCH
Wastage 25%: [53876.51, 59337.16, 105887.43, 36150.46, 36150.46, 36150.46]
Wastage 50%: [54949.55, 58593.48, 125408.66, 36857.15, 36857.15, 36857.15]
Wastage 75%: [20339.77, 29276.17, 105497.54, 35523.25, 35523.25, 35523.25]
Retries 25%: [1.3333333333333333, 1.0, 0.6666666666666666, 0.0, 0.0, 0.0]
Retries 50%: [0.8333333333333334, 0.8333333333333334, 0.8333333333333334, 0.0, 0.0, 0.0]
Retries 75%: [0.3333333333333333, 0.3333333333333333, 0.6666666666666666, 0.0, 0.0, 0.0]
Runtime 25%: [7612.0, 6277.333333333333, 7071.777777777777, 4362.444444444444, 4362.444444444444, 4362.444444444444]
Runtime 50%: [7367.333333333333, 7367.333333333333, 7896.0, 4422.333333333333, 4422.333333333333, 4422.333333333333]
Runtime 75%: [4466.666666666667, 4466.666666666667, 7066.666666666667, 4275.333333333333, 4275.333333333333, 4275.333333333333]
Usable Data: 36/36
GATK4_MARKDUPLICATES
Wastage 25%: [23889.63, 23889.63, 43332.96, 33021.6, 53688.04, 32534.25]
Wastage 50%: [19611.51, 

FASTP
Wastage 25%: [180.89, 123.6, 170.24, 154.41, 4312.38, 232.09]
Wastage 50%: [5.81, 5.81, 65.6, 9.47, 9.47, 232.59]
Wastage 75%: [6.25, 6.25, 8.67, 9.61, 9.61, 240.57]
Retries 25%: [0.5185185185185185, 0.3333333333333333, 0.2962962962962963, 0.25925925925925924, 0.25925925925925924, 0.0]
Retries 50%: [0.0, 0.0, 0.1111111111111111, 0.0, 0.0, 0.0]
Retries 75%: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
Runtime 25%: [169.55555555555554, 149.92592592592592, 158.2962962962963, 154.74074074074073, 154.74074074074073, 120.81481481481481]
Runtime 50%: [121.0, 121.0, 134.33333333333334, 121.0, 121.0, 121.0]
Runtime 75%: [125.11111111111111, 125.11111111111111, 125.11111111111111, 125.11111111111111, 125.11111111111111, 125.11111111111111]
