In [None]:
import pandas as pd
from rich.progress import Progress
import re
import networkx as nx
import matplotlib.pyplot as plt
from networkx.drawing.nx_agraph import graphviz_layout
from pathlib import Path
from collections import defaultdict
from collections import deque
import json

In [None]:
# Download the Alibaba v2018 batch_task trace archive into data/.
# `wget -c` resumes a partially downloaded file; `--tries=0` retries without limit.
!mkdir -p data && cd data && wget -c --retry-connrefused --tries=0 --timeout=50 http://aliopentrace.oss-cn-beijing.aliyuncs.com/v2018Traces/batch_task.tar.gz

In [None]:
# Unpack the archive inside data/ (expected to yield data/batch_task.csv, which is read next).
!cd data && tar -xvzf batch_task.tar.gz

In [None]:
# Trace source: https://github.com/alibaba/clusterdata/blob/master/cluster-trace-v2018/fetchData.sh
# The CSV is read with explicitly supplied column names; `duration` is derived as
# end_time - start_time.
df = pd.read_csv(
    'data/batch_task.csv',
    names=['task_name', 'instance_num', 'job_name', 'task_type', 'status', 'start_time', 'end_time', 'plan_cpu', 'plan_mem'],
).assign(duration=lambda frame: frame['end_time'] - frame['start_time'])

In [None]:
# df = df.head(n=1000)

In [None]:
# Task names come in three shapes:
# a) names that encode their dependencies, e.g. 'J4_2_3' -> task 4 depends on tasks 2 and 3.
#    Some of them end with an extra '_Stg<n>' suffix, which is matched but not captured.
#    Raw strings keep `\d` from being parsed as a (now warned-about) string escape.
#    NOTE(review): the '+' inside [\d+_] is a literal plus; presumably [\d_] was meant.
TASK_NAME_RE = re.compile(r'^[^_]*[A-Z](?P<task_id>\d+)(_(?P<deps>[\d+_]+))?(_Stg\d+)?$')
# b) independent tasks, e.g. 'task_LTE4NjUxMjg5NDY5MDI4NjAzNzU='
SINGLE_TASK_RE = re.compile(r'^task_[a-zA-Z0-9]+=*$')
# c) the literal name 'MergeTask'

# Check that every task name matches one of the three shapes above. On failure, list a
# few offenders so a new naming scheme in the trace is easy to diagnose.
known_task_name = df.task_name.apply(
    lambda f: TASK_NAME_RE.match(f) is not None or SINGLE_TASK_RE.match(f) is not None or f == 'MergeTask'
).astype(bool)
unrecognised_task_names = df.loc[~known_task_name, 'task_name']
assert unrecognised_task_names.empty, (
    f"unrecognised task names: {unrecognised_task_names.unique()[:10].tolist()}"
)

In [None]:
# Check that 'MergeTask's are in fact independent tasks (i.e. the only task in a job)
def get_merge_task_stats(df):
    """Summarise, per job, how many tasks it has and how many are named 'MergeTask'.

    Args:
        df: task rows with at least ``job_name`` and ``task_name`` columns
            (not modified).

    Returns:
        DataFrame indexed by ``job_name`` with columns ``count`` (non-null task
        names) and ``mergeCount`` (tasks named exactly 'MergeTask').
    """
    flagged = df.loc[:, ['job_name', 'task_name']].assign(
        is_merge_task=lambda frame: frame['task_name'] == 'MergeTask'
    )
    return flagged.groupby('job_name').agg(
        count=('task_name', 'count'),
        mergeCount=('is_merge_task', 'sum'),
    )

# No job may contain a 'MergeTask' alongside other tasks.
assert get_merge_task_stats(df).query('mergeCount > 0 and count > 1').empty

In [None]:
# Extract dependency info
def get_task_index_and_deps(row):
    """Parse a row's ``task_name`` into ``(task_index, dependency_indices)``.

    Names matching TASK_NAME_RE yield their numeric task id and the list of ids
    listed after it (empty when there are none). Any other name (independent
    'task_...' names, 'MergeTask') is treated as task 1 with no dependencies.
    """
    match = TASK_NAME_RE.match(row.task_name)
    if match is None:
        return 1, []
    raw_deps = match.group('deps') or ''
    dependency_indices = [int(part) for part in raw_deps.split('_') if part]
    return int(match.group('task_id')), dependency_indices

# Expand the parsed (index, deps) pairs into two new columns.
parsed_task_names = df[['task_name']].apply(get_task_index_and_deps, result_type='expand', axis=1)
df['task_index'] = parsed_task_names[0]
df['task_deps'] = parsed_task_names[1]

In [None]:
df.head()[['job_name', 'task_index', 'task_deps', 'duration', 'instance_num']]

In [None]:
# Sample job: all parsed tasks of job "j_3"
df.loc[df['job_name'] == 'j_3', ['job_name', 'task_index', 'task_deps', 'duration', 'instance_num']]

In [None]:
# Keep only jobs made of 10 or more tasks (rows); row order and index are unchanged.
jobs = df[df["job_name"].map(df["job_name"].value_counts()) >= 10]

In [None]:
jobs.shape[0]

In [None]:
# Filter out uninteresting jobs (simple chains, or flat parallel DAGs already covered by
# other experiments): keep a job only if at least one of its tasks depends on more than
# one other task.
has_fan_in = jobs["task_deps"].map(len) > 1
complex_job_names = jobs.loc[has_fan_in, "job_name"].unique()

# The per-group concat this replaces emitted rows grouped by job, in sorted job_name
# order (original order within each job). A stable sort reproduces that order exactly,
# which matters because the random sample taken later depends on row order.
jobs = (
    jobs[jobs["job_name"].isin(complex_job_names)]
    .sort_values("job_name", kind="mergesort")
)

In [None]:
jobs.shape[0]

In [None]:
# One entry per task row (so a job name repeats once per task).
job_names = jobs.job_name

In [None]:
# Get a sample of distinct jobs. `job_names` has one entry per task row, so sampling it
# directly could return the same job several times (and favoured jobs with many tasks).
SAMPLE_SIZE = 30
distinct_job_names = job_names.drop_duplicates()
sample_job_names = pd.DataFrame({
    "job_name": distinct_job_names.sample(
        n=min(SAMPLE_SIZE, len(distinct_job_names)), random_state=1337
    )
})
sample_jobs = jobs[jobs["job_name"].isin(sample_job_names["job_name"])]

In [None]:
# Persist the sampled frames the first time, then always reload them from disk, so
# later runs work on the same sample.
# NOTE(review): once both cache files exist, the sample computed in the cells above is
# discarded in favour of the cached one; delete data/jobs_cache and
# data/jobs_names_cache to resample. The pickles are trusted only because this
# notebook writes them itself.
jobs_cache_location = "data/jobs_cache"
jobs_names_cache_location = "data/jobs_names_cache"

cache_is_complete = Path(jobs_cache_location).is_file() and Path(jobs_names_cache_location).is_file()
if not cache_is_complete:
    sample_jobs.to_pickle(jobs_cache_location)
    sample_job_names.to_pickle(jobs_names_cache_location)

sample_jobs, sample_job_names = (
    pd.read_pickle(jobs_cache_location),
    pd.read_pickle(jobs_names_cache_location),
)

In [None]:
sample_jobs.job_name.value_counts()

In [None]:
sample_jobs[sample_jobs["job_name"].eq("j_3302772")]

In [None]:
# Cap every *task* (row) at MAX_TIME seconds. The uncapped value is kept in
# `original_duration` so the number of adjusted tasks can be counted later.
MAX_TIME = 60

# Snapshot the original durations only once: re-running this cell must not overwrite
# them with values that were already capped (which would hide every adjustment).
if 'original_duration' not in sample_jobs.columns:
    sample_jobs['original_duration'] = sample_jobs['duration']

too_long = sample_jobs['original_duration'] > MAX_TIME
for _, row in sample_jobs[too_long].iterrows():
    print(f"scoping down {row['job_name']} task {row['task_name']} from {row['original_duration']}")
# NaN durations compare False above and are left as NaN by clip, as before.
sample_jobs['duration'] = sample_jobs['original_duration'].clip(upper=MAX_TIME)
    


In [None]:
sample_jobs[sample_jobs["job_name"].eq("j_3302772")]

In [None]:
# Visualize the DAGs: the functions below build a task graph per sampled job, compute
# summary metrics (critical path, total work, number of capped tasks) and plot it.
# Render figures as static images in the cell output.
%matplotlib inline


def build_graph(dag):
    """Build the directed task graph of one job.

    Args:
        dag: rows of a single job with ``task_name``, ``task_index``, ``task_deps``
            and ``duration`` columns.

    Returns:
        ``nx.DiGraph`` with one node per task name (carrying a ``duration``
        attribute) and an edge ``dependency -> task`` for every task whose
        ``task_index`` appears in another task's ``task_deps``. If several tasks
        share an index, each of them gets an edge. Dependency indices with no
        matching task are ignored.
    """
    # Index task names by task_index once, instead of rescanning every row for every
    # dependency (which was quadratic in the job size).
    names_by_index = defaultdict(list)
    for task_index, task_name in zip(dag["task_index"], dag["task_name"]):
        names_by_index[task_index].append(task_name)

    graph = nx.DiGraph()
    dependencies = []
    for _, task_data in dag.iterrows():
        graph.add_node(task_data["task_name"], duration=task_data["duration"])
        for dependency_index in set(task_data["task_deps"]):
            dependencies.extend(
                (dependency_name, task_data["task_name"])
                for dependency_name in names_by_index.get(dependency_index, [])
            )
    graph.add_edges_from(dependencies)
    return graph


def critical_path(graph):
    """Length of the longest duration-weighted path through a task DAG.

    Node weights are their ``"duration"`` attribute; a path's length is the sum of
    the durations of its nodes (edges carry no weight). Nodes are released in
    Kahn's topological order, so every edge is relaxed once.

    Args:
        graph: directed graph whose nodes all carry a ``"duration"`` attribute.

    Returns:
        The largest accumulated duration over all nodes (and never less than 0),
        or 0 for an empty graph.

    Nodes on, or downstream of, a cycle are never released by Kahn's algorithm, so
    the result may be incomplete in that case; a warning is emitted when it happens.
    """
    import warnings

    finish = defaultdict(lambda: 0)    # longest path ending at (and including) a node
    indegree = defaultdict(lambda: 0)
    for _, target in graph.edges():
        indegree[target] += 1

    queue = deque()
    for node in graph.nodes():
        if indegree[node] == 0:
            queue.append(node)
            finish[node] = graph.nodes[node]["duration"]

    processed = 0
    while queue:
        current = queue.popleft()
        processed += 1
        for _, successor in graph.edges(current):
            indegree[successor] -= 1
            finish[successor] = max(
                finish[successor], finish[current] + graph.nodes[successor]["duration"]
            )
            if indegree[successor] == 0:
                queue.append(successor)

    node_count = len(graph.nodes())
    if processed < node_count:
        warnings.warn(
            f"critical_path: {node_count - processed} node(s) never became ready "
            "(cycle in the graph?); result may be incomplete"
        )

    # Fold starting from 0, exactly like the original running-maximum loop.
    return max([0, *finish.values()])


def total_work(graph):
    """Sum of the ``"duration"`` attribute over every node of ``graph`` (0 when empty)."""
    return sum(graph.nodes[node]["duration"] for node in graph.nodes())


def adjusted_tasks(dag):
    """Count the rows of ``dag`` whose ``duration`` differs from ``original_duration``.

    Args:
        dag: task rows with ``duration`` and ``original_duration`` columns.

    Returns:
        Number of such rows as a Python int (0 for an empty frame).
    """
    return sum(
        1
        for task in dag.itertuples(index=False)
        if task.duration != task.original_duration
    )


def draw(graph, name):
    """Plot ``graph`` in its own matplotlib figure titled ``name``.

    Nodes are placed with graphviz's hierarchical ``dot`` layout, drawn as small black
    dots, and labelled with their ``"duration"`` attribute instead of the task name.
    """
    plt.figure(name)
    plt.title(name)
    duration_labels = {node: attributes["duration"] for node, attributes in graph.nodes(data=True)}
    layout = graphviz_layout(graph, prog='dot')
    nx.draw(
        graph,
        layout,
        with_labels=False,
        node_size=150,
        node_color="#000000",
        width=0.8,
        font_size=14,
    )
    nx.draw_networkx_labels(graph, layout, duration_labels, font_size=8, font_color="whitesmoke")


# One summary row per sampled job. Rows are collected as records and turned into a frame
# once: concatenating onto an empty frame inside the loop is quadratic and triggers a
# pandas FutureWarning about concatenating empty entries.
job_info_records = []

for job_name in sample_job_names["job_name"]:
    job_data = sample_jobs[sample_jobs["job_name"] == job_name].copy()
    graph = build_graph(job_data)

    job_info_records.append({
        'job_name': job_name,
        'critical_path': critical_path(graph),
        'tasks': len(job_data),
        'total_work': total_work(graph),
        'adjusted_tasks': adjusted_tasks(job_data),
    })

    draw(graph, job_name)
    # Render each figure right away. With the inline backend selected in this cell,
    # show() also closes it, so ~30 figures don't stay open (matplotlib warns past 20).
    plt.show()

job_info = pd.DataFrame(
    job_info_records,
    columns=['job_name', 'critical_path', 'tasks', 'total_work', 'adjusted_tasks'],
)
    


In [None]:
job_info.sort_values(by="tasks", ascending=False)

In [None]:
# Export to Airflow's format

from pathlib import Path
import shutil


# Upper bound passed to `sleep` for any generated task.
MAX_DURATION = 60
INDENT = "    "
DIRECTORY = "data/generated_dags"

def task_template(task_data, job_data) -> "tuple[str, list[str]]":
    """Render one task of a job as Airflow DAG source code.

    Args:
        task_data: one row of ``job_data`` (needs ``task_name``, ``duration`` and
            ``task_deps``).
        job_data: all rows of the job; used to resolve dependency indices
            (``task_index``) to task names.

    Returns:
        ``(operator_source, dependency_lines)``: ``operator_source`` defines a
        ``BashOperator`` that sleeps for ``min(duration, MAX_DURATION)``, and each
        entry of ``dependency_lines`` is an ``upstream >> downstream`` statement.
        When several tasks share a dependency index, the first one is used.

    Raises:
        AssertionError: if a dependency index has no matching task in ``job_data``.
    """
    task_name = task_data['task_name']
    base_task = f"""
task_{task_name} = BashOperator(
    task_id='{task_name}',
    bash_command='sleep {min(task_data['duration'], MAX_DURATION)}',
)
    """.strip()

    dependencies = []
    for dependency_index in set(task_data["task_deps"]):
        matches = job_data.loc[job_data["task_index"] == dependency_index, "task_name"]
        assert not matches.empty, (
            f"{task_name}: no task with index {dependency_index} in this job"
        )
        dependencies.append(f"task_{matches.iloc[0]} >> task_{task_name}")

    return base_task, dependencies

def _indent_lines(chunks):
    """Join ``chunks`` with newlines and prefix every resulting line with ``INDENT``."""
    joined = "\n".join(chunks)
    return "\n".join(INDENT + line for line in joined.split("\n"))


for job_name in sample_job_names["job_name"]:
    job_data = sample_jobs[sample_jobs["job_name"] == job_name].copy()
    info = job_info[job_info["job_name"] == job_name].copy().iloc[0]
    # Jobs whose critical path is below 201 run every 5 minutes, the rest every 10.
    schedule = 5 if info["critical_path"] < 201 else 10

    dag_header = f"""
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator


with DAG(
    dag_id='{job_name}',
    schedule_interval='*/{schedule} * * * *',
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
""".strip()

    operator_sources = []
    dependency_sources = []
    for _, task_row in job_data.iterrows():
        operator_source, edge_sources = task_template(task_row, job_data)
        operator_sources.append(operator_source)
        dependency_sources.extend(edge_sources)

    # parents=True also creates DIRECTORY itself when it does not exist yet.
    Path(f'{DIRECTORY}/{job_name}').mkdir(parents=True, exist_ok=True)

    dag_file = f'{DIRECTORY}/{job_name}.py'
    Path(dag_file).write_text(
        dag_header
        + "\n" + _indent_lines(operator_sources)
        + "\n" + _indent_lines(dependency_sources)
        + "\n"
    )
    # Each DAG is also copied into its own folder named after the job.
    shutil.copyfile(dag_file, f'{DIRECTORY}/{job_name}/{job_name}.py')
        
    

In [None]:
# Get beeflow experiments config: one experiment per generated DAG, largest jobs first.

experiments = [
    {
        "dags_local_path": f'notebooks/{DIRECTORY}/{name}',
        "dag_ids": [name],
        "metrics_collection_time_seconds": 3600,
        "experiment_id": name,
    }
    for name in job_info.sort_values('tasks', ascending=False)["job_name"]
]

print(json.dumps(experiments, indent=2))