# ML4HPC: Ensemble of Forecasters

### Team Members:
- Luca Venerando Greco
- Bice Marzagora
- Elia Vaglietti


### Importing Libraries

In this notebook, we start by importing several essential libraries that are used throughout the workflow:

- `os`: Provides functions for interacting with the operating system, such as creating directories and handling file paths.
- `sys`: Provides access to some variables used or maintained by the Python interpreter and to functions that interact strongly with the interpreter.
- `matplotlib.pyplot`: A plotting library used for creating static, animated, and interactive visualizations in Python.
- `numpy`: A fundamental package for scientific computing with Python, used for working with arrays and matrices.
- `time`: Provides various time-related functions.
- `tqdm`: A library for creating progress bars and progress meters.

These libraries are crucial for tasks such as data manipulation, visualization, and managing the execution of jobs in a high-performance computing environment.

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from keras.datasets import mnist
import jax
import jax.numpy as jnp
from jax import grad
import time
import os
import tqdm

2024-12-02 17:14:28.763835: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-12-02 17:14:28.768357: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-12-02 17:14:28.780359: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1733156068.799426 1195325 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1733156068.805278 1195325 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-12-02 17:14:28.826551: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU ins

### Directory Setup and Job Configuration

We now set up the necessary directories and define the job configurations. Specifically, we create folders for storing charts, data, and logs if they do not already exist. We also define the number of forecasters and nodes for different scaling tests.

If no new data is needed, set the `GENERATE_DATA` variable to `False` to skip the data generation step.


In [2]:
current_dir = os.getcwd()

charts_folder = "charts"
data_folder = "data"
logs_folder = "logs"

if not os.path.exists(data_folder):
    os.makedirs(data_folder)

if not os.path.exists(logs_folder):
    os.makedirs(logs_folder)

if not os.path.exists(charts_folder):
    os.makedirs(charts_folder)

ten_nodes = 10
strong_scaling_nodes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]  # Number of nodes to test
weak_scaling_nodes   = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]  # Number of nodes to test

n_runs = 30

GENERATE_DATA = True

### Job Submission Function

We define a function `submit_job` that handles the submission of jobs to the scheduler. This function takes the number of nodes, the number of forecasters, and a job name as input parameters. It creates the necessary directories for storing data and logs, reads a template launch script, formats it with the provided parameters, writes the formatted script to a file, and submits the job using the `sbatch` command.

In [3]:
def submit_job(launch_file, num_nodes, job_name, num_epochs):
    num_tasks_per_node = 128

    if num_tasks_per_node > 128:
        print("The number of tasks per node should be less than or equal to 128")
        exit(1)

    if not os.path.exists(f"{data_folder}/{job_name}"):
        os.makedirs(f"{data_folder}/{job_name}")

    if not os.path.exists(f"{logs_folder}/{job_name}"):
        os.makedirs(f"{logs_folder}/{job_name}")

    with open(launch_file, 'r') as file:
        launch_script = file.read()

    launch_script = launch_script.format(
        num_nodes=num_nodes,
        num_tasks_per_node=num_tasks_per_node,
        current_dir=current_dir,
        world_size=num_nodes*num_tasks_per_node,
        num_epochs=num_epochs,
        data_folder=f"{data_folder}/{job_name}",
        logs_folder=f"{logs_folder}/{job_name}"
    )

    script_filename = f"{logs_folder}/{job_name}/{launch_file.split('/')[-1]}"
    with open(script_filename, "w") as script_file:
        script_file.write(launch_script)

    os.system(f"sbatch {script_filename}")

### Defining Test Functions

In the following sections, we define functions to run different scalability tests. These functions will help us automate the process of submitting jobs for one million forecasters, strong scaling, and weak scaling tests. Each function will generate a unique job name, submit the job using the `submit_job` function, and return the job names for tracking purposes.

In [4]:
def ten_nodes_test():
    job_names = []
    for i in range(n_runs):
        run_dir = f"{data_folder}/ten_nodes_test/run_{i}"
        if not os.path.exists(run_dir):
            os.makedirs(run_dir)
        
        job_name = f"/ten_nodes_test/run_{i}"

        submit_job(ten_nodes, job_name)

        job_names.append(job_name)

    return job_names

In [5]:
def gpu_test():
    job_names = []
    for i in range(n_runs):
        run_dir = f"{data_folder}/gpu_test/run_{i}"
        if not os.path.exists(run_dir):
            os.makedirs(run_dir)
        
        job_name = f"/gpu_test/run_{i}"

        submit_job("launchers/launch_gpu.sh", 1, job_name, 10)

        job_names.append(job_name)

    return job_names

In [6]:
def strong_scaling():
    job_names = []
    for run in range(1):
        for num_nodes in strong_scaling_nodes:
            job_name = f"/strong_scaling/run_{run}/nodes_{num_nodes}"
            submit_job(num_nodes, job_name)
            job_names.append(job_name)
    return job_names

In [None]:
# def weak_scaling():
#     job_names = []
#     for run in range(1):
#         for num_nodes in weak_scaling_nodes:
#             job_name = f"/weak_scaling/run_{run}/nodes_{num_nodes}_forecasters_{weak_scaling_forecasters*num_nodes}"
#             submit_job(num_nodes, job_name)
#             job_names.append(job_name)
#     return job_names

In [7]:
def baseline_test():
    job_names = []
    for i in range(n_runs):
        run_dir = f"{data_folder}/baseline_test/run_{i}"
        if not os.path.exists(run_dir):
            os.makedirs(run_dir)
        
        job_name = f"/baseline_test/run_{i}"

        submit_job("launchers/launch_baseline.sh", 1, job_name, 10)

        job_names.append(job_name)

    return job_names

### Waiting for jobs

Now we wait for all the jobs to complete, in the meantime the `tqdm` progress bar will be updated.

In [8]:
import socket

hostname = socket.gethostname()
print(f"Hostname: {hostname}")

Hostname: access1.aion-cluster.uni.lux


In [9]:
all_jobs_to_wait = []

if GENERATE_DATA:
    # if hostname contains "aion":
    if "aion" in hostname:
        all_jobs_to_wait.extend(baseline_test())
        # all_jobs_to_wait.extend(ten_nodes_test())
        # all_jobs_to_wait.extend(strong_scaling())
        # all_jobs_to_wait.extend(weak_scaling())
    elif "iris" in hostname:
        all_jobs_to_wait.extend(gpu_test())

    print("Waiting for jobs to finish...")
    print(all_jobs_to_wait)

Submitted batch job 4584486
Submitted batch job 4584487
Submitted batch job 4584488
Submitted batch job 4584489
Submitted batch job 4584490
Submitted batch job 4584491
Submitted batch job 4584492
Submitted batch job 4584493
Submitted batch job 4584494
Submitted batch job 4584495
Submitted batch job 4584496
Submitted batch job 4584497
Submitted batch job 4584498
Submitted batch job 4584499
Submitted batch job 4584500
Submitted batch job 4584501
Submitted batch job 4584502
Submitted batch job 4584503
Submitted batch job 4584504
Submitted batch job 4584505
Submitted batch job 4584506
Submitted batch job 4584507
Submitted batch job 4584508
Submitted batch job 4584509
Submitted batch job 4584510
Submitted batch job 4584511
Submitted batch job 4584512
Submitted batch job 4584514
Submitted batch job 4584515
Submitted batch job 4584516
Waiting for jobs to finish...
['/baseline_test/run_0', '/baseline_test/run_1', '/baseline_test/run_2', '/baseline_test/run_3', '/baseline_test/run_4', '/baselin

In [None]:
for job_name in tqdm(all_jobs_to_wait):
    while not os.path.exists(f"{data_folder}/{job_name}/timings.txt"):
        time.sleep(10)  # Poll every 10 seconds

### Timing Analysis

In this section, we analyze the execution times for the one million forecasters test. We read the timing data from the generated files, calculate the mean and standard deviation of the execution times, and create a dataframe to summarize the results.

The dataframe includes the following columns:
- **Run**: The run identifier.
- **Timing**: The total execution time for each run.
- **Aggregate sum CPU time**: The sum of CPU times across all ranks for each run.
- **Aggregate mean CPU time**: The mean CPU time across all ranks for each run.

We then print the dataframe and the calculated mean and standard deviation of the execution times.

In [10]:
import pandas as pd

def get_mean_and_std_of_times(job_name):
    timings = []
    cpu_times = []

    for i in range(n_runs):
        with open(f"{data_folder}/{job_name}/run_{i}/timings.txt", "r") as file:
            lines = file.readlines()
            timings.append(float(lines[0].lstrip("Real time:")))
            cpu_times.append(float(lines[1].lstrip("CPU time:")))
        
    df = pd.DataFrame({
        'Run': [f'run_{i}' for i in range(n_runs)],
        'Timing': timings,
        'CPU Time': cpu_times
    })

    mean_timing = df['Timing'].mean()
    std_timing = df['Timing'].std()

    return df, mean_timing, std_timing

In [13]:
# Get the dataframe for GPU test
df_gpu, mean_timing_gpu, std_timing_gpu = get_mean_and_std_of_times("gpu_test")
print("GPU Test DataFrame:")
print(df_gpu)
print(f"Mean Timing: {mean_timing_gpu}, Std Timing: {std_timing_gpu}")

# Get the dataframe for Baseline test
df_baseline, mean_timing_baseline, std_timing_baseline = get_mean_and_std_of_times("baseline_test")
print("Baseline Test DataFrame:")
print(df_baseline)
print(f"Mean Timing: {mean_timing_baseline}, Std Timing: {std_timing_baseline}")

# print mean speedup
speedup = mean_timing_baseline / mean_timing_gpu
print(f"Speedup: {speedup}")


GPU Test DataFrame:
       Run      Timing    CPU Time
0    run_0  28709.0652  39757.7324
1    run_1  36566.1120  47746.8150
2    run_2  31055.7411  41502.9126
3    run_3  28726.7053  39774.6897
4    run_4  36648.8478  47538.9791
5    run_5  30798.2712  41696.3612
6    run_6  28754.0781  39875.8302
7    run_7  29904.9714  41372.7079
8    run_8  36952.2636  48081.6617
9    run_9  29912.6036  41221.4646
10  run_10  30589.9110  41483.6396
11  run_11  36125.5605  47388.1764
12  run_12  28775.6875  39912.3642
13  run_13  29876.3299  41287.4759
14  run_14  28846.6141  39927.3059
15  run_15  35719.2082  46932.3074
16  run_16  29703.3193  41051.0415
17  run_17  28895.6916  39990.9123
18  run_18  36719.4498  47700.9287
19  run_19  31030.9284  41815.6667
20  run_20  29793.8221  41257.7382
21  run_21  28752.4261  39817.0975
22  run_22  31804.3652  42532.9634
23  run_23  31249.3289  41405.2272
24  run_24  29758.5783  41097.4697
25  run_25  28831.0640  39913.4079
26  run_26  29710.2823  41048.8701


### GPU speedup analysis

The results indicate that the GPU execution times are longer than the baseline CPU execution times. This can be attributed to the fact that the kernel size is too small to justify the overhead associated with using the GPU. 

GPUs are highly efficient for large-scale parallel computations, but they incur significant overhead for tasks such as data transfer between the CPU and GPU, kernel launch, and synchronization. When the computational workload is relatively small, these overheads can outweigh the benefits of parallel execution on the GPU, leading to longer overall execution times compared to the CPU. 

In this case, the small kernel size results in insufficient computational workload to fully utilize the GPU's capabilities, making the CPU a more efficient choice for this specific task.

### Strong Scalability Test

In this section, we analyze the execution times for the strong scalability test. We have already submitted jobs for different numbers of nodes and collected the execution times. The results are plotted on a logarithmic scale to better visualize the differences in execution times as the number of nodes increases.

The strong scalability test helps us understand how the execution time decreases as we increase the number of nodes while keeping the problem size constant. Ideally, the execution time should decrease proportionally with the increase in the number of nodes, indicating efficient parallelization and resource utilization.

In [None]:
execution_times_strong_scaling = []

# Submit jobs for each test configuration
for num_nodes in strong_scaling_nodes:
    execution_time_file = f"{data_folder}/strong_scaling/run_0/nodes_{num_nodes}/timings.txt"

    with open(execution_time_file, "r") as f:
        line = f.readline().strip()
        execution_time = float(line.replace("Total execution time: ", ""))
    execution_times_strong_scaling.append(execution_time)
    print(f"Execution time for {num_nodes} nodes: {execution_time} seconds")


In [None]:
# Plot the results
plt.figure(figsize=(10, 6))
plt.yscale('log')
plt.plot(strong_scaling_nodes, execution_times_strong_scaling, label='Strong Scaling', marker='o')
plt.xlabel('Number of Nodes')
plt.ylabel('Execution Time (seconds)')
plt.title('Strong Scalability Test')
plt.grid(True)
plt.legend()
plt.savefig(f"{charts_folder}/scalability_plot.png") 
plt.show()

### Weak Scalability Test

In this section, we analyze the execution times for the weak scalability test. 

The weak scalability test helps us understand how the execution time changes as we increase the number of nodes while keeping the workload per node constant. Ideally, the execution time should remain constant with the increase in the number of nodes, indicating efficient parallelization and resource utilization.

In [None]:
# execution_times_weak_scaling = []

# # Submit jobs for each test configuration
# for num_nodes in weak_scaling_nodes:
#     execution_time_file = f"{data_folder}/weak_scaling/run_0/nodes_{num_nodes}_forecasters_{weak_scaling_forecasters*num_nodes}/timings.txt"

#     with open(execution_time_file, "r") as f:
#         line = f.readline().strip()
#         execution_time = float(line.replace("Total execution time: ", ""))
#     execution_times_weak_scaling.append(execution_time)

In [None]:

# # Plot the weak scalability results
# plt.figure(figsize=(10, 6))
# plt.plot(weak_scaling_nodes, execution_times_weak_scaling, label='Weak Scaling', marker='o')
# plt.xlabel("Number of Nodes")
# plt.ylabel("Execution Time (seconds)")
# plt.title("Weak Scalability Test")
# plt.grid(True)
# plt.savefig(f"{charts_folder}/weak_scalability.png")
# plt.show()

### Conclusion

