In [141]:
import csv
import sys
import pickle
import random
import pandas as pd
from dataclasses import dataclass
from collections import defaultdict, Counter
from typing import Mapping, List, Set, Tuple

In [18]:
# Define a Task dataclass for storage of Task information.
@dataclass
class Task:
    name: str
    job: str
    instances: int
    status: str
    start_time: float
    end_time: float
    expected_duration: float
    actual_duration: float
    cpu_requested: float
    cpu_usage: float
    mem_requested: float
    mem_usage: float

In [2]:
batch_task_data = pd.read_csv(
    "./data/batch_task.csv",
    header=None,
    names=["task_name", "instance_num", "job_name", "task_type", "status", "start_time", "end_time", "plan_cpu", "plan_mem"]
)
print("[x] Loaded {} DAGs from the CSV.".format(len(batch_task_data)))

[x] Loaded 14295731 DAGs from the CSV.


In [19]:
job_names_to_tasks: Mapping[str, List[Task]] = defaultdict(list)
num_rows = len(batch_task_data)
output_frequency = len(batch_task_data) / 20
for index, row in batch_task_data.iterrows():
    job_names_to_tasks[row.job_name].append(
        Task(
            name=row.task_name,
            job=row.job_name,
            instances=row.instance_num,
            status=row.status,
            start_time=row.start_time,
            end_time=row.end_time,
            expected_duration=(row.end_time - row.start_time),
            actual_duration=(row.end_time - row.start_time),
            cpu_requested=row.plan_cpu,
            cpu_usage=row.plan_cpu,
            mem_requested=row.plan_mem,
            mem_usage=row.plan_mem))
    if index % output_frequency == 0:
        print(f"[x] Processed {index} / {num_rows} rows and retrieved {len(job_names_to_tasks)} job definitions.")

[x] Processed 0 / 14295731 rows and retrieved 1 job definitions.


In [32]:
# Mapping of (task_name, job_name) to a list of [duration, cpu_usage, mem_usage, num_instances]
# We use this to aggregate statistics about each task of all the jobs, and will then merge it with
# the tasks created above.
job_task_statistics: Mapping[Tuple[str, str], List[int]] = defaultdict(lambda: [0, 0, 0, 0])
with open('data/batch_instance.csv', 'r') as batch_instance:
    datareader = csv.reader(batch_instance)
    counter = 0
    for row in datareader:
        if not row[1][1].isdigit() or row[4] != "Terminated":
            continue
            
        duration = None
        cpu_usage = None
        mem_usage = None
        try:
            # Add duration statistics.
            duration = int(row[6]) - int(row[5])
            # Add CPU statistics
            cpu_usage = float(row[10])
            # Add memory statistics.
            mem_usage = float(row[12])
            # Append the number of observations made.
            stats_for_this_task[3] += 1
        except ValueError as e:
            continue
            
        # Add to the statistics if everything works.
        stats_for_this_task = job_task_statistics[(row[2], row[1])]
        stats_for_this_task[0] += duration
        stats_for_this_task[1] += cpu_usage
        stats_for_this_task[2] += mem_usage
        stats_for_this_task[3] += 1
        
        counter += 1
        if counter % 1e7 == 0:
            print(f"[x] Finished processing {counter} rows and have results for {len(job_task_statistics)} tasks.")

[x] Finished processing 10000000 rows and have results for 94972 tasks.
[x] Finished processing 20000000 rows and have results for 192431 tasks.
[x] Finished processing 30000000 rows and have results for 288936 tasks.
[x] Finished processing 40000000 rows and have results for 384740 tasks.
[x] Finished processing 50000000 rows and have results for 481181 tasks.
[x] Finished processing 60000000 rows and have results for 578491 tasks.
[x] Finished processing 70000000 rows and have results for 672802 tasks.
[x] Finished processing 80000000 rows and have results for 768920 tasks.
[x] Finished processing 90000000 rows and have results for 866965 tasks.
[x] Finished processing 100000000 rows and have results for 965765 tasks.
[x] Finished processing 110000000 rows and have results for 1060606 tasks.
[x] Finished processing 120000000 rows and have results for 1157846 tasks.
[x] Finished processing 130000000 rows and have results for 1253015 tasks.
[x] Finished processing 140000000 rows and ha

In [40]:
counter = 0
for job_name, tasks in job_names_to_tasks.items():
    for task in tasks:
        key = (job_name, task.name)
        if key in job_task_statistics:
            duration, cpu_usage, mem_usage, num_samples = job_task_statistics[key]
            if num_samples == 0:
                continue
            task.actual_duration = float(duration) / num_samples
            task.cpu_usage = cpu_usage / num_samples
            task.mem_usage = mem_usage / num_samples
            if counter % 20000 == 0:
                print(f'[x] Finished processing {counter} samples from {len(job_task_statistics)} tasks.')
            counter += 1

[x] Finished processing 0 samples from 11895518 tasks.
[x] Finished processing 20000 samples from 11895518 tasks.
[x] Finished processing 40000 samples from 11895518 tasks.
[x] Finished processing 60000 samples from 11895518 tasks.
[x] Finished processing 80000 samples from 11895518 tasks.
[x] Finished processing 100000 samples from 11895518 tasks.
[x] Finished processing 120000 samples from 11895518 tasks.
[x] Finished processing 140000 samples from 11895518 tasks.
[x] Finished processing 160000 samples from 11895518 tasks.
[x] Finished processing 180000 samples from 11895518 tasks.
[x] Finished processing 200000 samples from 11895518 tasks.
[x] Finished processing 220000 samples from 11895518 tasks.
[x] Finished processing 240000 samples from 11895518 tasks.
[x] Finished processing 260000 samples from 11895518 tasks.
[x] Finished processing 280000 samples from 11895518 tasks.
[x] Finished processing 300000 samples from 11895518 tasks.
[x] Finished processing 320000 samples from 11895

In [90]:
# Filter out the DAGs that do not have the dependency information.
filtered_dags: Set[str] = set()
counter = 0
for job_name, tasks in job_names_to_tasks.items():
    counter += 1
    is_filtered = False
    for task in tasks:
        if not task.name[1].isdigit() or task.status != "Terminated" or 'Stg' in task.name:
            is_filtered = True
            break
    if is_filtered:
        filtered_dags.add(job_name)
    if counter % 1e6 == 0:
        print(f'[x] Processed {counter} jobs from {len(job_names_to_tasks)} jobs.')

[x] Processed 1000000 jobs from 4201014 jobs.
[x] Processed 2000000 jobs from 4201014 jobs.
[x] Processed 3000000 jobs from 4201014 jobs.
[x] Processed 4000000 jobs from 4201014 jobs.


In [103]:
# Find the entire set of DAGs that need to be written to the pickle files.
jobs_to_pickle: Set[str] = set()
for job_name in job_names_to_tasks.keys():
    if job_name not in filtered_dags:
        jobs_to_pickle.add(job_name)

In [104]:
print(f'[x] A total of {len(jobs_to_pickle)} DAGs were extracted from a total of {len(job_names_to_tasks)} available DAGs.') 

[x] A total of 3141545 DAGs were extracted from a total of 4201014 available DAGs.


In [120]:
# Find all the straight chain DAGs in the system.
straight_chain_dags: Set[str] = set()
for job_name in jobs_to_pickle:
    is_straight_chain_dag = True
    visited_parents: Set[str] = set()
    for task in job_names_to_tasks[job_name]:
        if len(task.name.split('_')) > 2:
            is_straight_chain_dag = False
            break
        parent_task = task.name[1]
        if parent_task in visited_parents:
            is_straight_chain_dag = False
            break
        visited_parents.add(parent_task)
        
    if is_straight_chain_dag:
        straight_chain_dags.add(job_name)

In [121]:
# Find the length of the straight chain DAGs.
straight_chain_dag_length: Mapping[int, List[str]] = defaultdict(list)

for job_name in straight_chain_dags:
    job_tasks = job_names_to_tasks[job_name]
    straight_chain_dag_length[len(job_tasks)].append(job_name)

In [125]:
print(f'[x] Found a total of {len(straight_chain_dags)} straight chain DAGs with the lengths: {straight_chain_dag_length.keys()}')

[x] Found a total of 2253617 straight chain DAGs with the lengths: dict_keys([1, 2, 8, 3, 4, 5, 6, 9, 7])


In [129]:
_rng = random.Random()
_rng.seed(42)

def split_into_sets(job_names: Set[str], num_sets: int = 500) -> List[Set[str]]:
    split_sets = [set() for _ in range(num_sets)]
    for job_name in job_names:
        chosen_set = _rng.choice(split_sets)
        chosen_set.add(job_name)
    return split_sets

In [145]:
filtered_dag_splits = split_into_sets(filtered_dags, num_sets=150)

In [146]:
min_length = sys.maxsize
max_length = 0
for split_set in filtered_dag_splits:
    set_length = len(split_set)
    if set_length < min_length:
        min_length = set_length
    if set_length > max_length:
        max_length = set_length

print(f'[x] Created {len(filtered_dag_splits)} sets with size range between {min_length} and {max_length}.')

[x] Created 150 sets with size range between 6877 and 7251.


In [147]:
pickle_file_name = "alibaba_random_dags_{}.pkl"
for i, split_set in enumerate(filtered_dag_splits):
    split_job_tasks = {}
    for job_name in split_set:
        split_job_tasks[job_name] = job_names_to_tasks[job_name]
    with open(pickle_file_name.format(i), 'wb') as f:
        pickle.dump(split_job_tasks, f)
    print('[x] Pickled {}'.format(pickle_file_name.format(i))) 

[x] Pickled alibaba_random_dags_0.pkl
[x] Pickled alibaba_random_dags_1.pkl
[x] Pickled alibaba_random_dags_2.pkl
[x] Pickled alibaba_random_dags_3.pkl
[x] Pickled alibaba_random_dags_4.pkl
[x] Pickled alibaba_random_dags_5.pkl
[x] Pickled alibaba_random_dags_6.pkl
[x] Pickled alibaba_random_dags_7.pkl
[x] Pickled alibaba_random_dags_8.pkl
[x] Pickled alibaba_random_dags_9.pkl
[x] Pickled alibaba_random_dags_10.pkl
[x] Pickled alibaba_random_dags_11.pkl
[x] Pickled alibaba_random_dags_12.pkl
[x] Pickled alibaba_random_dags_13.pkl
[x] Pickled alibaba_random_dags_14.pkl
[x] Pickled alibaba_random_dags_15.pkl
[x] Pickled alibaba_random_dags_16.pkl
[x] Pickled alibaba_random_dags_17.pkl
[x] Pickled alibaba_random_dags_18.pkl
[x] Pickled alibaba_random_dags_19.pkl
[x] Pickled alibaba_random_dags_20.pkl
[x] Pickled alibaba_random_dags_21.pkl
[x] Pickled alibaba_random_dags_22.pkl
[x] Pickled alibaba_random_dags_23.pkl
[x] Pickled alibaba_random_dags_24.pkl
[x] Pickled alibaba_random_dags_25.