# ML4HPC: Ensemble of Forecasters

### Team Members:
- Luca Venerando Greco
- Bice Marzagora
- Elia Vaglietti


### Importing Libraries

In this notebook, we start by importing several essential libraries that are used throughout the workflow:

- `os`: Provides functions for interacting with the operating system, such as creating directories and handling file paths.
- `sys`: Provides access to some variables used or maintained by the Python interpreter and to functions that interact strongly with the interpreter.
- `matplotlib.pyplot`: A plotting library used for creating static, animated, and interactive visualizations in Python.
- `numpy`: A fundamental package for scientific computing with Python, used for working with arrays and matrices.
- `time`: Provides various time-related functions.
- `tqdm`: A library for creating progress bars and progress meters.

These libraries are crucial for tasks such as data manipulation, visualization, and managing the execution of jobs in a high-performance computing environment.

In [1]:
import os
import sys
import matplotlib.pyplot as plt
import numpy as np
import time
from tqdm import tqdm

### Directory Setup and Job Configuration

We now set up the necessary directories and define the job configurations. Specifically, we create folders for storing charts, data, and logs if they do not already exist. We also define the number of forecasters and nodes for different scaling tests.

If no new data is needed, set the `GENERATE_DATA` variable to `False` to skip the data generation step.


In [2]:
current_dir = os.getcwd()

charts_folder = "charts"
data_folder = "data"
logs_folder = "logs"

if not os.path.exists(data_folder):
    os.makedirs(data_folder)

if not os.path.exists(logs_folder):
    os.makedirs(logs_folder)

if not os.path.exists(charts_folder):
    os.makedirs(charts_folder)

one_milion_forecasters = int(1e5)
strong_scaling_forecasters = int(1e4)
weak_scaling_forecasters = int(1e4)

one_milion_nodes = 10
strong_scaling_nodes = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]  # Number of nodes to test
weak_scaling_nodes   = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]  # Number of nodes to test

n_runs = 5

GENERATE_DATA = True


### Job Submission Function

We define a function `submit_job` that handles the submission of jobs to the scheduler. This function takes the number of nodes, the number of forecasters, and a job name as input parameters. It creates the necessary directories for storing data and logs, reads a template launch script, formats it with the provided parameters, writes the formatted script to a file, and submits the job using the `sbatch` command.

In [3]:
def submit_job(num_nodes, num_forecasters, job_name):
    num_tasks_per_node = 128

    if num_tasks_per_node > 128:
        print("The number of tasks per node should be less than or equal to 128")
        exit(1)

    if not os.path.exists(f"{data_folder}/{job_name}"):
        os.makedirs(f"{data_folder}/{job_name}")

    if not os.path.exists(f"{logs_folder}/{job_name}"):
        os.makedirs(f"{logs_folder}/{job_name}")

    with open('launch.sh', 'r') as file:
        launch_script = file.read()

    launch_script = launch_script.format(
        num_nodes=num_nodes,
        num_tasks_per_node=num_tasks_per_node,
        current_dir=current_dir,
        world_size=num_nodes*num_tasks_per_node,
        num_forecasters=num_forecasters,
        data_folder=f"{data_folder}/{job_name}",
        logs_folder=f"{logs_folder}/{job_name}"
    )

    script_filename = f"{logs_folder}/{job_name}/launch.sh"
    with open(script_filename, "w") as script_file:
        script_file.write(launch_script)

    os.system(f"sbatch {script_filename}")

### Defining Test Functions

In the following sections, we define functions to run different scalability tests. These functions will help us automate the process of submitting jobs for one million forecasters, strong scaling, and weak scaling tests. Each function will generate a unique job name, submit the job using the `submit_job` function, and return the job names for tracking purposes.

In [4]:
def one_milion_test():
    job_names = []
    for i in range(n_runs):
        run_dir = f"{data_folder}/one_milion_test/run_{i}"
        if not os.path.exists(run_dir):
            os.makedirs(run_dir)
        
        job_name = f"/one_milion_test/run_{i}"

        submit_job(one_milion_nodes, one_milion_forecasters, job_name)

        job_names.append(job_name)

    return job_names

In [5]:
def strong_scaling():
    job_names = []
    for run in range(1):
        for num_nodes in strong_scaling_nodes:
            job_name = f"/strong_scaling/run_{run}/nodes_{num_nodes}"
            submit_job(num_nodes, strong_scaling_forecasters, job_name)
            job_names.append(job_name)
    return job_names

In [6]:
def weak_scaling():
    job_names = []
    for run in range(1):
        for num_nodes in weak_scaling_nodes:
            job_name = f"/weak_scaling/run_{run}/nodes_{num_nodes}_forecasters_{weak_scaling_forecasters*num_nodes}"
            submit_job(num_nodes, weak_scaling_forecasters*num_nodes, job_name)
            job_names.append(job_name)
    return job_names

### Waiting for jobs

Now we wait for all the jobs to complete, in the meantime the `tqdm` progress bar will be updated.

In [7]:
all_jobs_to_wait = []

if GENERATE_DATA:
    all_jobs_to_wait.extend(one_milion_test())
    all_jobs_to_wait.extend(strong_scaling())
    all_jobs_to_wait.extend(weak_scaling())

    print("Waiting for jobs to finish...")
    print(all_jobs_to_wait)

Submitted batch job 4465155
Submitted batch job 4465156
Submitted batch job 4465157
Submitted batch job 4465158
Submitted batch job 4465159
Submitted batch job 4465160
Submitted batch job 4465161
Submitted batch job 4465162
Submitted batch job 4465163
Submitted batch job 4465164
Submitted batch job 4465165
Submitted batch job 4465166
Submitted batch job 4465167
Submitted batch job 4465168
Submitted batch job 4465169
Submitted batch job 4465170
Submitted batch job 4465171
Submitted batch job 4465172
Submitted batch job 4465173
Submitted batch job 4465174
Submitted batch job 4465175
Submitted batch job 4465176
Submitted batch job 4465177
Submitted batch job 4465178
Submitted batch job 4465179
Waiting for jobs to finish...
['/one_milion_test/run_0', '/one_milion_test/run_1', '/one_milion_test/run_2', '/one_milion_test/run_3', '/one_milion_test/run_4', '/strong_scaling/run_0/nodes_1', '/strong_scaling/run_0/nodes_2', '/strong_scaling/run_0/nodes_3', '/strong_scaling/run_0/nodes_4', '/stron

In [None]:
for job_name in tqdm(all_jobs_to_wait):
    while not os.path.exists(f"{data_folder}/{job_name}/timings.txt"):
        time.sleep(10)  # Poll every 10 seconds

### Data Visualization and Analysis

In this section, we load the forecast, weights, and biases data from the one million forecasters test. We then visualize the data using histograms to understand the distribution of these variables. 

- **Forecast Plot**: We create histograms for each forecast variable across different dimensions.
- **Weights Plot**: We generate histograms for the weights across different dimensions.
- **Biases Plot**: We plot the histogram for the biases.
- **Forecasted Values Plot**: We plot the histograms of the forecasted values and their normalized versions to analyze their distribution and trends.

As expected all the weight, biases and forecasted values are normally distributed, since the noise from which they are generated is normally distributed.

Moreover, since the model is autorergressive, we can apprecciate the increasing variance of the forecasted values as the time step increases.

Finally, we plot the forecasted values and their normalized versions to analyze their distribution and trends.

In [None]:
import numpy as np
import jax
import jax.numpy as jnp
import matplotlib.pyplot as plt

one_milion_folder = f"data/num_forecasters_{one_milion_forecasters}_num_nodes_{one_milion_nodes}_one_milion_test"

forecast = np.load(f"{one_milion_folder}/forecasting.npy")
weights  = np.load(f"{one_milion_folder}/weights.npy")
biases   = np.load(f"{one_milion_folder}/biases.npy")

print(forecast.shape)
print(weights.shape)
print(biases.shape)

n_bins = 25

## FORECASTER PLOT
variable = forecast
fig, axs = plt.subplots(5, 2, figsize=(6, 12))

# Flatten the axs array for easy iteration
axs = axs.flatten()

# Plot histograms in each subplot
for z in range(variable.shape[2]):
    for y in range(variable.shape[1]):
        f_list = []
        for x in range(variable.shape[0]):
            f_list.append(variable[x][y][z])
        axs[z*variable.shape[1]+y].hist(f_list, bins=n_bins)
        axs[z*variable.shape[1]+y].set_title(f'forecast {z*variable.shape[1]+y}')
f_list = []

plt.tight_layout()
plt.show()

## WEIGHTS PLOT
variable = weights
fig, axs = plt.subplots(variable.shape[1], variable.shape[2], figsize=(12, 4))

# Flatten the axs array for easy iteration
axs = axs.flatten()

# Plot histograms in each subplot
for z in range(variable.shape[2]):
    for y in range(variable.shape[1]):
        f_list = []
        for x in range(variable.shape[0]):
            f_list.append(variable[x][y][z])
        axs[z*variable.shape[1]+y].hist(f_list, bins=n_bins)
        axs[z*variable.shape[1]+y].set_title(f'weights {z*variable.shape[1]+y}')
f_list = []

plt.tight_layout()
plt.show()

## BIASES PLOT
variable = biases

# Plot histograms of f_list
for x in range(variable.shape[0]):
    f_list.append(variable[x][0])

plt.hist(f_list, bins=n_bins)
#Put a name
plt.title('biases')


forecasted_values_tot = []
## PLOT OF THE FORECASTED VALUES
for j in range(forecast.shape[1]):
    forecasted = []
    for i in range(forecast.shape[0]):
        forecasted.append(forecast[i][j][0])
    forecasted_values_tot.append(forecasted)

# New figure for a new plot
plt.figure()
# Plot the hist forecasted values in the same graph with different colours
for i in range(5):
    plt.hist(forecasted_values_tot[i], bins=n_bins, alpha=0.5, label=f'forecast {i}')

plt.legend()
forecasts_mean = np.mean(forecast, axis=0)

# normalize by subtracting the mean
forecast_normalized = forecast - forecasts_mean

forecasted_values_tot = []
## PLOT OF THE FORECASTED VALUES
for i in range(1000):
    forecasted = []
    
    for j in range(5):
        forecasted.append(forecast_normalized[i][j][0])
    
    forecasted_values_tot.append(forecasted)

plt.figure()

# plot the lines
for i in range(1000):
    plt.plot(forecasted_values_tot[i], label=f'forecast {i}')


### Strong Scalability Test

In this section, we analyze the execution times for the strong scalability test. We have already submitted jobs for different numbers of nodes and collected the execution times. The results are plotted on a logarithmic scale to better visualize the differences in execution times as the number of nodes increases.

The strong scalability test helps us understand how the execution time decreases as we increase the number of nodes while keeping the problem size constant. Ideally, the execution time should decrease proportionally with the increase in the number of nodes, indicating efficient parallelization and resource utilization.

In [None]:
execution_times_strong_scaling = []

# Submit jobs for each test configuration
for num_nodes in strong_scaling_nodes:
    execution_time_file = f"{data_folder}/num_forecasters_{strong_scaling_forecasters}_num_nodes_{num_nodes}_strong_scaling/timings.txt"

    with open(execution_time_file, "r") as f:
        execution_time = float(f.read())
    execution_times_strong_scaling.append(execution_time)
    print(f"Execution time for {num_nodes} nodes: {execution_time} seconds")


In [None]:
# Plot the results
plt.figure(figsize=(10, 6))
plt.yscale('log')
plt.plot(strong_scaling_nodes, execution_times_strong_scaling, label='Strong Scaling', marker='o')
plt.xlabel('Number of Nodes')
plt.ylabel('Execution Time (seconds)')
plt.title('Strong Scalability Test')
plt.grid(True)
plt.legend()
plt.savefig(f"{charts_folder}/scalability_plot.png") 
plt.show()

### Weak Scalability Test

In this section, we analyze the execution times for the weak scalability test. 

The weak scalability test helps us understand how the execution time changes as we increase the number of nodes while keeping the workload per node constant. Ideally, the execution time should remain constant with the increase in the number of nodes, indicating efficient parallelization and resource utilization.

In [None]:
execution_times_weak_scaling = []

# Submit jobs for each test configuration
for num_nodes in weak_scaling_nodes:
    execution_time_file = f"{data_folder}/num_forecasters_{weak_scaling_forecasters*num_nodes}_num_nodes_{num_nodes}_weak_scaling/timings.txt"

    with open(execution_time_file, "r") as f:
        execution_time = float(f.read())
    execution_times_weak_scaling.append(execution_time)

In [None]:

# Plot the weak scalability results
plt.figure(figsize=(10, 6))
plt.plot(weak_scaling_nodes, execution_times_weak_scaling, label='Weak Scaling', marker='o')
plt.xlabel("Number of Nodes")
plt.ylabel("Execution Time (seconds)")
plt.title("Weak Scalability Test")
plt.grid(True)
plt.savefig(f"{charts_folder}/weak_scalability.png")
plt.show()

### Conclusion

In this notebook, we have successfully set up and executed a series of scalability tests for an ensemble of forecasters in a high-performance computing environment. We started by importing the necessary libraries and setting up the directory structure. We then defined functions to submit jobs for one million forecasters, strong scaling, and weak scaling tests.

We visualized the data generated from the one million forecasters test, analyzing the distribution of forecast, weights, and biases. The histograms confirmed that these variables are normally distributed, as expected.

The strong scalability test demonstrated how the execution time decreases with an increasing number of nodes, indicating efficient parallelization. The weak scalability test showed that the execution time remains relatively constant as the number of nodes increases, suggesting good resource utilization.

Overall, these tests provide valuable insights into the performance and scalability of our forecasting ensemble, helping us optimize and improve our high-performance computing workflows.