In [None]:
import numpy as np
import pandas as pd
import os
import re
import networkx as nx
import matplotlib.pyplot as plt
from collections import defaultdict
from tqdm import tqdm
from google.colab import drive
# Colab only: mount Google Drive at /content/drive so files stored there are readable.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).



# **Instructions**

This notebook analyzes the Alibaba cluster trace (downloaded and processed locally) by level, where a job DAG's level is the length of its longest dependency chain.

**Input**: 'batch_instance.csv' and 'batch_task.csv' from https://github.com/alibaba/clusterdata/blob/master/cluster-trace-v2018/fetchData.sh

**Output 1**: 'level_graphene_result.txt' and 'resource_utilization_by_job.txt' contain the results derived from the four Graphene parameters (cplength, twork, modcp and newlb) for all of the selected DAGs.

**Output 2**: the 'levels/' directory contains one file per level, listing the job IDs of all DAGs at that level.
          'instance_level.csv' is a temporary CSV file that holds the instance information of all DAGs at the selected level.

In [None]:
def load_job_ids(file_path):
    """
    Load job IDs from a text file that lists one integer per line.

    Lines that do not parse as an integer (blank lines, headers, stray text)
    are skipped. The resulting list is printed before it is returned.

    Args:
        file_path (str): Path to the text file.

    Returns:
        list: The parsed integers, in file order.
    """
    job_ids = []
    with open(file_path, "r") as handle:
        for raw_line in handle:
            try:
                parsed = int(raw_line.strip())
            except ValueError:
                continue  # not an integer line; ignore it
            job_ids.append(parsed)

    print(f"Job IDs loaded into desired_levels: {job_ids}")
    return job_ids

In [None]:
# Root directory holding the Alibaba cluster-trace-v2018 CSV files (edit to point at your copy).
folder_path = "path/to/the/Alibaba/Data"

# Task-level and instance-level trace files, and their full paths under folder_path.
file1_name, file2_name = "batch_task.csv", "batch_instance.csv"
file1_path, file2_path = (os.path.join(folder_path, name) for name in (file1_name, file2_name))

# NOTE(review): not referenced by any of the cells visible here; kept for compatibility.
selected_columns = ["instance_name", "task_name", "job_name", "start_time", "end_time"]


In [None]:
# Load the task table. Columns kept, by position in batch_task.csv: 0, 1, 2, 5, 6.
# Later cells use column 0 as the task name, column 2 as the job name ("j_<id>") and
# column 6 - column 5 as the task duration (presumably end_time - start_time; verify
# against the cluster-trace-v2018 schema).
df1 = pd.read_csv(file1_path, header=None, usecols=[0, 1, 2, 5, 6])

# Drop tasks named "task_..." and "MergeTask" rows (presumably these carry no dependency
# information in their names). .copy() makes df1 an independent frame so the column
# assignments in later cells do not raise SettingWithCopyWarning.
df1 = df1[~df1[0].str.contains(r'^task_|MergeTask', na=False)].copy()

# NOTE: df1 deliberately keeps every job. The level computation below needs all DAGs,
# and `desired_levels` is only defined further down the notebook (filtering on it here
# raised NameError on a fresh kernel). Per-level job selection happens later via
# `specified_job_ids` and the instance-collection cell.
# df1.head()

In [None]:
# Step 1: Define a function that extracts the numeric job ID from a job name.
def extract_job_id(group_info):
    """Return the integer after a leading "j_" in a job name, e.g. "j_123" -> 123.

    Args:
        group_info: The job name. Non-string values (e.g. NaN for a missing cell)
            are accepted and yield None instead of raising TypeError.

    Returns:
        int | None: The parsed job ID, or None when the value is not a string
        starting with "j_" followed by digits.
    """
    if not isinstance(group_info, str):
        return None
    match = re.match(r'j_(\d+)', group_info)
    return int(match.group(1)) if match else None

In [None]:
# Numeric job ID parsed from the job name in column 2 ("j_123" -> 123); used for grouping.
df1['job_id'] = df1[2].apply(extract_job_id)
print('Done with extract job_id')

# Numeric task ID: the first run of digits in the task name (column 0), e.g. "R2_1" -> 2.
df1['task_id'] = df1[0].str.extract(r'^[^\d]*(\d+)', expand=False).astype(int)

# Upstream dependencies: every "_<digits>" group in the task name, as ints,
# e.g. "J3_1_2" -> [1, 2]; a name without underscores yields [].
df1['dependencies'] = df1[0].str.findall(r'_(\d+)').map(lambda ids: list(map(int, ids)))

Done with extract job_id


In [None]:
# Step 4: Index the task rows as grouped_data[job_id][task_id] -> list of rows.
# Each stored row is the df1 row as a list with the task duration (column 6 - column 5)
# appended as its last element; later cells read that duration via row[-1].
grouped_data = defaultdict(lambda: defaultdict(list))

# itertuples + column-wise zip visits the same rows in the same order as iterrows,
# without building a Series per row.
for record, job_key, task_key, elapsed in zip(
    df1.itertuples(index=False, name=None),
    df1['job_id'],
    df1['task_id'],
    df1[6] - df1[5],
):
    grouped_data[job_key][task_key].append(list(record) + [elapsed])

print("Data grouped with duration successfully.")

Data grouped with duration successfully.


In [None]:
# Build one dependency graph per job and record its level: the number of edges on the
# longest dependency chain (nx.dag_longest_path_length). Jobs whose level cannot be
# computed (cycle or NetworkX error) get None and are excluded below.
job_dags = {}
hierarchy_levels = {}
for job_id, group in tqdm(df1.groupby('job_id'), desc="Processing jobs"):
    G = nx.DiGraph()

    # One node per task; an edge dep -> task for every dependency parsed from the task name.
    # NOTE: a dependency that has no row of its own in df1 still becomes a node here.
    for task_id, deps in zip(group['task_id'], group['dependencies']):
        G.add_node(task_id)
        for dep in deps:
            G.add_edge(dep, task_id)

    job_dags[job_id] = G

    try:
        if nx.is_directed_acyclic_graph(G):
            hierarchy_levels[job_id] = nx.dag_longest_path_length(G)
        else:
            print(f"Cycle detected in job {job_id}. Skipping longest path calculation.")
            hierarchy_levels[job_id] = None
    except nx.NetworkXError as e:
        print(f"Error processing job {job_id}: {e}")
        # Previously 0, which filed unprocessable jobs as level-0 DAGs; None skips them
        # exactly like the cycle case above.
        hierarchy_levels[job_id] = None

# Keep only jobs with a computed level.
filtered_hierarchy_levels = {job_id: level for job_id, level in hierarchy_levels.items() if level is not None}

In [None]:
# Write one file per DAG level, "levels/levels_<level>.txt", listing the job IDs at that
# level one per line (in the order they appear in filtered_hierarchy_levels).
os.makedirs("levels", exist_ok=True)

# Group job IDs by level in a single pass. A dedicated name is used so this cell does not
# overwrite the global `desired_levels`, which other cells read.
jobs_by_level = defaultdict(list)
for job, job_level in filtered_hierarchy_levels.items():
    jobs_by_level[job_level].append(job)

# Sorted so files are written (and reported) in a deterministic order.
for value in sorted(jobs_by_level):
    filename = os.path.join("levels", f"levels_{value}.txt")
    with open(filename, "w") as file:
        file.writelines(f"{job}\n" for job in jobs_by_level[value])

    print(f"Keys with value {value} saved to {filename}.")

In [None]:
# Choose which DAG level to analyze (11 is the worked example) and load its job IDs
# from the file written by the previous cell.
TARGET_LEVEL = 11
level_file = f"levels/levels_{TARGET_LEVEL}.txt"
desired_levels = load_job_ids(level_file)

In [None]:
# Collect, from the (large) batch_instance.csv, every instance row belonging to the jobs
# in `desired_levels`, and checkpoint them to 'instance_level.csv'.
# Columns kept, by position in batch_instance.csv: 0, 1, 2, 5, 6, 10, 12. Later cells use
# them as task name (1), job name (2), start/end time (5, 6) and two resource measures
# (10, 12) — presumably cpu_avg and mem_avg per the v2018 schema; verify.
numbers_to_match = desired_levels
wanted_jobs = set(numbers_to_match)
instance_columns = [0, 1, 2, 5, 6, 10, 12]

# Stopping early is only safe if batch_instance.csv stores each job's instances
# contiguously — NOTE(review): not established here, so the default scans the whole file.
# The old rule (stop once every job had at least one row) silently dropped instances
# that continued into later chunks.
STOP_EARLY = False

# Rows collected per job ID, in file order.
collected_rows = {num: [] for num in numbers_to_match}

chunksize = 10**6  # Adjust chunk size if needed
for chunk in pd.read_csv(file2_path, chunksize=chunksize, header=None):
    # Exact numeric job ID from the job name ("j_123" -> 123); NaN when there is none.
    job_numbers = pd.to_numeric(
        chunk[2].astype(str).str.extract(r'j_(\d+)', expand=False), errors='coerce'
    )
    in_scope = job_numbers.isin(wanted_jobs)
    matching_rows = chunk.loc[in_scope, instance_columns]

    # One pass per chunk; groupby keeps the original row order within each job.
    for number, rows in matching_rows.groupby(job_numbers[in_scope]):
        collected_rows[int(number)].extend(rows.values.tolist())

    # With contiguous storage, a chunk with no matches after every job has been seen
    # means no job's run of rows can continue.
    if STOP_EARLY and matching_rows.empty and all(collected_rows[num] for num in numbers_to_match):
        break

missing_jobs = [num for num, rows in collected_rows.items() if not rows]
if missing_jobs:
    print(f"No instances found for {len(missing_jobs)} job(s), e.g. {missing_jobs[:10]}")

# Combine all collected rows into a single DataFrame (columns 0..6, as before) and checkpoint.
filtered_rows = [row for rows in collected_rows.values() for row in rows]
# Explicit columns keep the CSV header valid even when nothing was collected.
df2 = pd.DataFrame(filtered_rows, columns=range(len(instance_columns)))
df2.to_csv('instance_level.csv')


# Reload the checkpoint. File column 0 is the saved index, so read columns 1..7 and
# restore the integer labels 0..6 (the header stores them as strings "0".."6").
df2 = pd.read_csv('instance_level.csv', usecols=range(1, 8))
df2.columns = list(range(7))

# Numeric task ID: first run of digits in the task name (column 1).
df2['task_id'] = df2[1].str.extract(r'^[^\d]*(\d+)', expand=False).astype(int)
# Instance run time: column 4 minus column 3 (columns 6 and 5 of the raw file).
df2['instance_time'] = df2[4] - df2[3]
# Time-weighted usage of the two resource measures (columns 5 and 6).
df2['resource_usage_1'] = df2['instance_time'] * df2[5]
df2['resource_usage_2'] = df2['instance_time'] * df2[6]
# df2.head()

In [None]:
# Find a bottleneck task for each selected DAG: a node, neither at the first nor the last
# depth, that is alone at its depth, has in- or out-degree > 1, and through which every
# other node is either an ancestor or a descendant. If several qualify, the last one in
# topological order is kept.
# specified_job_ids = [1161482, 1667543, 3105001]  # Replace with the actual job IDs you want to analyze
specified_job_ids = desired_levels
wanted_jobs = set(specified_job_ids)  # O(1) membership instead of scanning the list per job

selected_dags = {}
bottleneck_points = {}

for job_id, group in tqdm(df1.groupby('job_id'), desc="Processing specified jobs"):
    if job_id not in wanted_jobs:
        continue

    # Same construction as the level cell: edge dep -> task for each parsed dependency.
    G = nx.DiGraph()
    for task_id, deps in zip(group['task_id'], group['dependencies']):
        G.add_node(task_id)
        for dep in deps:
            G.add_edge(dep, task_id)
    selected_dags[job_id] = G

    # topological_sort raises on cycles; skip such jobs instead of aborting the whole cell.
    if not nx.is_directed_acyclic_graph(G):
        print(f"Cycle detected in job {job_id}. Skipping bottleneck search.")
        continue

    # Step 1: depth of each node = longest predecessor chain length (sources are depth 0).
    depth = {}
    for node in nx.topological_sort(G):
        depth[node] = max((depth[pred] + 1 for pred in G.predecessors(node)), default=0)

    # Step 2: group nodes by depth.
    nodes_at_depth = defaultdict(list)
    for node, node_depth in depth.items():
        nodes_at_depth[node_depth].append(node)

    # Step 3: candidate depths hold exactly one node and exclude the source/sink depths.
    deepest = max(nodes_at_depth, default=0)
    candidate_depths = {
        d for d, nodes in nodes_at_depth.items() if len(nodes) == 1 and 0 < d < deepest
    }

    # Step 4: a candidate is a bottleneck when no node is unrelated to it and it fans in or out.
    # depth preserves topological order, so the last qualifying node wins, as before.
    all_nodes = set(G.nodes)
    for node, node_depth in depth.items():
        if node_depth not in candidate_depths:
            continue
        unrelated = all_nodes - nx.ancestors(G, node) - nx.descendants(G, node) - {node}
        if not unrelated and (G.in_degree(node) > 1 or G.out_degree(node) > 1):
            bottleneck_points[job_id] = node

# Keep only the DAGs for which a bottleneck was found.
selected_dags = {job_id: G for job_id, G in selected_dags.items() if job_id in bottleneck_points}

In [None]:
# Compute cplength, twork and modcp for every selected DAG, split at its bottleneck into
# subdag_1 (strict ancestors of the bottleneck) and subdag_2 (bottleneck + descendants).
# Resource capacities used to normalize usage (replace with actual values).
resource_capacity_1 = 9600
resource_capacity_2 = 100

# Per-job results: {job_id: {"subdag_1": value, "subdag_2": value}}.
cplength = defaultdict(lambda: {"subdag_1": 0, "subdag_2": 0})
twork_subdags = defaultdict(lambda: {"subdag_1": 0, "subdag_2": 0})
modcp = defaultdict(lambda: {"subdag_1": 0, "subdag_2": 0})


def _longest_path_weight(dag, weights):
    """Return the largest sum of node `weights` over any source-to-sink path of `dag`.

    Dynamic programming over a topological order (O(V + E)) replaces enumerating every
    simple path, which is exponential in the worst case. A node that is both a source and
    a sink counts as a one-node path. Returns 0 for an empty DAG and never less than 0.
    """
    best_to = {}  # node -> heaviest path weight from any source up to and including node
    for node in nx.topological_sort(dag):
        incoming = [best_to[pred] for pred in dag.predecessors(node)]
        best_to[node] = weights[node] + (max(incoming) if incoming else 0)
    return max([0] + [best_to[n] for n in dag.nodes if dag.out_degree(n) == 0])


def _max_path_modcp(dag, task_metrics, min_times):
    """Return the max over source-to-sink paths P of
    max_{i in P} ( max(cplength_i, twork_i) + sum_{j in P, j != i} min_times_j ).

    Two-state DP over a topological order instead of per-path enumeration:
      plain[v]  = heaviest sum of min_times over source paths ending at v;
      chosen[v] = best value over those paths where one node i contributes
                  max(cplength_i, twork_i) in place of min_times_i.
    Returns 0 for an empty DAG and never less than 0.
    """
    plain = {}
    chosen = {}
    for node in nx.topological_sort(dag):
        preds = list(dag.predecessors(node))
        prefix = max(plain[p] for p in preds) if preds else 0
        own_peak = max(task_metrics[node]["cplength"], task_metrics[node]["twork"])
        plain[node] = prefix + min_times[node]
        chosen[node] = prefix + own_peak  # this node plays the role of i
        if preds:  # or i lies earlier on the path
            chosen[node] = max(chosen[node], max(chosen[p] for p in preds) + min_times[node])
    return max([0] + [chosen[n] for n in dag.nodes if dag.out_degree(n) == 0])


for job_id, G in selected_dags.items():
    bottleneck_node = bottleneck_points[job_id]
    subdag_nodes = {
        "subdag_1": set(nx.ancestors(G, bottleneck_node)),
        "subdag_2": set(nx.descendants(G, bottleneck_node)).union({bottleneck_node}),
    }

    # batch_task rows of this job; .get avoids inserting empty entries into the defaultdict.
    job_tasks = grouped_data.get(job_id, {})
    # Task duration = sum of the durations stored as the last element of each batch_task row.
    task_durations = {t: sum(row[-1] for row in job_tasks.get(t, [])) for t in G.nodes}
    # Smallest such duration per task (taken from batch_task rows, not instances). Tasks
    # with no row (e.g. a dependency filtered out of df1) get 0; min() of an empty
    # sequence would otherwise raise ValueError.
    min_instance_times = {
        t: min((row[-1] for row in job_tasks.get(t, [])), default=0) for t in G.nodes
    }

    # Instances of this job, aggregated per task once instead of re-filtering df2 per task.
    job_instances = df2[df2[2] == f"j_{job_id}"]
    per_task = job_instances.groupby('task_id')[
        ["instance_time", "resource_usage_1", "resource_usage_2"]
    ].sum()

    # Task-level cplength (total instance time) and twork (max normalized usage);
    # tasks without instances get 0 for both.
    task_metrics = {}
    for task_id in G.nodes:
        if task_id in per_task.index:
            sums = per_task.loc[task_id]
            task_cplength = sums["instance_time"]
            usage_1, usage_2 = sums["resource_usage_1"], sums["resource_usage_2"]
        else:
            task_cplength, usage_1, usage_2 = 0, 0, 0
        task_metrics[task_id] = {
            "cplength": task_cplength,
            "twork": max(usage_1 / resource_capacity_1, usage_2 / resource_capacity_2),
        }

    for subdag_name, nodes in subdag_nodes.items():
        subdag = G.subgraph(nodes).copy()

        # Step 1: critical-path length using task durations.
        cplength[job_id][subdag_name] = _longest_path_weight(subdag, task_durations)

        # Step 2: twork = the larger normalized total usage of the sub-DAG's instances.
        subdag_group = job_instances[job_instances['task_id'].isin(nodes)]
        twork_subdags[job_id][subdag_name] = max(
            subdag_group['resource_usage_1'].sum() / resource_capacity_1,
            subdag_group['resource_usage_2'].sum() / resource_capacity_2,
        )

        # Step 3: modcp over all source-to-sink paths of the sub-DAG.
        modcp[job_id][subdag_name] = _max_path_modcp(subdag, task_metrics, min_instance_times)

In [None]:
# Sanity check: print each metric's values for the first job only.
metric_sections = (
    ("=== modcp ===", modcp, "modcp"),
    ("=== cplength ===", cplength, "cplength"),
    ("=== twork_subdags ===", twork_subdags, "twork"),
)
for header, table, label in metric_sections:
    print(header)
    for job_id, subdags in list(table.items())[:1]:
        print(f"Job ID: {job_id}")
        for subdag_name, value in subdags.items():
            print(f"  {subdag_name}: {label} = {value}")

In [None]:
# Combine the per-sub-DAG metrics of each selected DAG:
#   newlb           = max(cplength, twork, modcp) of subdag_1 + the same for subdag_2
#   summed cplength = cplength(subdag_1) + cplength(subdag_2)
#   summed twork    = twork(subdag_1) + twork(subdag_2)
newlb, summed_cplength, summed_twork = {}, {}, {}

for job_id in selected_dags:
    cpl = cplength[job_id]
    tw = twork_subdags[job_id]
    mcp = modcp[job_id]
    newlb[job_id] = (
        max(cpl["subdag_1"], tw["subdag_1"], mcp["subdag_1"])
        + max(cpl["subdag_2"], tw["subdag_2"], mcp["subdag_2"])
    )
    summed_cplength[job_id] = cpl["subdag_1"] + cpl["subdag_2"]
    summed_twork[job_id] = tw["subdag_1"] + tw["subdag_2"]

# Show the results for the first job only.
print("=== Results ===")
for job_id in list(selected_dags)[:1]:
    print(f"Job ID: {job_id}")
    print(f"  newlb = {newlb[job_id]}")
    print(f"  Summed cplength = {summed_cplength[job_id]}")
    print(f"  Summed twork = {summed_twork[job_id]}")



In [None]:
# Write one tab-separated row per selected DAG: job ID, NewLB, summed CPLength, summed TWork.
output_file = "level_graphene_result.txt"

with open(output_file, "w") as out:
    out.write("Job ID\tNewLB\tSummed CPLength\tSummed TWork\n")
    out.writelines(
        f"{job_id}\t{newlb[job_id]}\t{summed_cplength[job_id]}\t{summed_twork[job_id]}\n"
        for job_id in selected_dags
    )

print(f"Results saved to {output_file}")

# Per-job utilization: the sums of df2 columns 5 and 6 over the job's instances, each
# divided by its resource capacity.
# NOTE(review): these sum the raw per-instance values, not the time-weighted
# resource_usage_* columns used for twork — confirm this is the intended measure.
resource_utilization_by_job = defaultdict(lambda: {"resource_1": 0, "resource_2": 0})

for job_id in selected_dags:
    job_df = df2[df2[2] == f"j_{job_id}"]
    utilization = resource_utilization_by_job[job_id]
    utilization["resource_1"] = job_df[5].sum() / resource_capacity_1
    utilization["resource_2"] = job_df[6].sum() / resource_capacity_2

# Save the normalized utilization as a tab-separated .txt file (6 decimal places).
output_file = "resource_utilization_by_job.txt"

with open(output_file, "w") as out:
    out.write("Job ID\tResource 1 Utilization\tResource 2 Utilization\n")
    for job_id, util in resource_utilization_by_job.items():
        out.write(f"{job_id}\t{util['resource_1']:.6f}\t{util['resource_2']:.6f}\n")

print(f"Normalized resource utilization saved to {output_file}")