In [1]:
import csv
import json
import numpy as np
import pandas as pd
import sys

In [2]:
# Extract the n values stored in str_in and expressed as "col_1=val_1,col_2=val_2,...,col_n=val_n" into n columns
def split(str_in):
    pairs = str_in.split(',')
    res = {}
    for pair in pairs:
        col_name, value = pair.split('=')
        res[col_name] = value
    return res

# Used to check if all the strings in a column of a dataframe are digits
def df_isdigit(s):
    return s.isdigit()

# Used to standardize the unit of measurement, so as to express everything in gigabytes. 
def convert_to_GB(input_string): 
    num = input_string[:-1]
    res = float(num)
    unit = input_string[-1]

    if unit == 'M':
        return res / 1024
    elif unit == 'G':
        return res
    elif unit == 'T':
        return res * 1024
    else:
        raise ValueError("Unit of measurement not recognized")

In [3]:
df0 = pd.read_parquet("job_table.parquet")
print(df0.shape)

(6236346, 100)


In [4]:
# The submission time features are collected only in jobs performed between May and November 2020, so we focus on that split of data for the experiments.
# Theese features are stored as strings in the "tres_req_str" field, so we filter all the data for which that field is a non-empty string.

# Check that the data in the "tres_req_str" field are strings
if all(isinstance(x, str) for x in df0['tres_req_str']):
    # Filter all the data for which that field is a non-empty string
    df0 = df0[df0['tres_req_str'] != '']

In [5]:
# We consider only completed jobs
df0 = df0[df0['job_state']=='COMPLETED']
print(df0.shape)

(894412, 100)


In [6]:
# We consider only jobs belonging to the "m100_usr_prod" partition (which includes more than 99% of the jobs)
for partition in df0['partition'].unique():
    print(partition, '=', df0['partition'].value_counts().get(partition, 0))

df0 = df0[df0['partition']=='m100_usr_prod']

m100_usr_prod = 887539
m100_all_serial = 3908
m100_sys_test = 145
m100_fua_prod = 2820


In [7]:
# The duration of the jobs is reported as strings in the “run_time_str” (D-HH:MM:SS) and “run_time” (total seconds) fields.
# To be safe, we also check that those values correspond to the “end_time” – “start_time” of each job.

# Estract the fields
run_time_str = df0['run_time_str']
run_time = df0['run_time']
end_time = df0['end_time']
start_time = df0['start_time']

# “end_time” and “start_time” are stored as datetime64, therefore their difference will be timedelta64
run_time_td = end_time - start_time

# "run_time_str" is stored in the format "D-HH:MM:SS" but the conversion from string to timedelta requires a format "D days HH:MM:SS"
run_time_str = run_time_str.str.replace('-', ' days ')
# Converts "run_time_str" from string to timedelta64
run_time_str_td = pd.to_timedelta(run_time_str)

# Checks that all the values in "run_time_str_td" are equal to the ones in "run_time_td" (which represents "end_time" - "start_time")
if (run_time_str_td == run_time_td).all():
    print('All the values stored in "run_time_str" are correct!')
else:
    print('ERROR: Some of the values stored in "run_time_str" are not correct.')

# Express “end_time” - “start_time” in seconds, converting from timedelta64 to float 
run_time_sec = run_time_td.dt.total_seconds()
# Checks that the result is equal to the values in "run_time"
if (run_time_sec == run_time.astype(float)).all():
    print('All the values stored in "run_time" are correct!')
else:
    print('ERROR: Some of the values stored in "run_time" are not correct.')

All the values stored in "run_time_str" are correct!
All the values stored in "run_time" are correct!


In [8]:
# Extract the features available at submission time
tres_req_str = df0['tres_req_str']

# Extract other relevant features
r_f = df0[['user_id', 'qos', 'time_limit', 'start_time']]

In [9]:
# 1) Extract the n values stored as strings in the column "tres_req_str" and expressed as "col_1=val_1,col_2=val_2,...,col_n=val_n" into n columns
tres_req_df = tres_req_str.apply(split).apply(pd.Series)

In [10]:
# Combine the columns extracted above with the other relevant features and the column "run_time"
df1 = tres_req_df.join(r_f)
df1 = df1.join(run_time)
print(df1.shape)

(887539, 10)


In [11]:
# 2) From a quick inspection, we can see that there are several NaN in some of the columns.
#    We choose to tackle this problem by removing all the rows where there are NaN.
df1 = df1.dropna().reset_index(drop=True)
print(df1.shape)

(631026, 10)


In [12]:
# 3) Remove all the rows where the "run_time" is 0 seconds.
df1 = df1[df1['run_time']!='0']
print(df1.shape)

(628977, 10)


In [13]:
# 4) We can also guess that the "cpu" column and the "billing" column contain the same values. 
#    If so we can remove the "billing" column, which is redundant.
if (df1['cpu'] == df1['billing']).all():
    df = df1.drop(columns='billing')

In [14]:
# 5) Convert the values in columns to the correct type and standardize the unit of measurement when required

In [15]:
# We can see that the values contained in the columns "cpu", "node" and "gres/gpu" are integers stored as strings.
# If so, we can convert them from string to int

if df['cpu'].apply(df_isdigit).all():
    df['cpu'] = df['cpu'].astype(int)

if df['node'].apply(df_isdigit).all():
    df['node'] = df['node'].astype(int)

if df['gres/gpu'].apply(df_isdigit).all():
    df['gres/gpu'] = df['gres/gpu'].astype(int)

In [16]:
# Check all the possible values contained in the column "mem"
# df['mem'].unique()

# They are units of measurement of memory, expressed as strings in the format: "nM", "nG", "nT" where:
# - n is a number 
# - M represents megabytes
# - G represents gigabytes
# - T represents terabytes

# We want to standardize the unit of measurement, so as to express everything in gigabytes. 
df['mem'] = df['mem'].apply(convert_to_GB)
df.rename(columns={'mem': 'mem (GB)'}, inplace=True)

In [17]:
# We can see that the values contained in the columns "time_limit" and "run_time" are numbers stored as strings.
# "time_limit" represents an interval of time expressed in minutes, while "run_time" is expressed in seconds.
# So, we convert them from string to float and we express "time_limit" in seconds.
df['time_limit'] = df['time_limit'].astype(int)*60
df['run_time'] = df['run_time'].astype(int)

# So, we convert them from string to float and we express "run_time" in minutes.
# df['time_limit'] = df['time_limit'].astype(float)
# df['run_time'] = df['run_time'].astype(float) / 60

In [18]:
# 6) Encode the categorical variable contained in columns "account", "user_id" and "qos" as integer
# df['account'] = pd.factorize(df['account'])[0]
df['user_id'] = pd.factorize(df['user_id'])[0]
df['qos'] = pd.factorize(df['qos'])[0]

In [19]:
df.head()

Unnamed: 0,cpu,mem (GB),node,gres/gpu,user_id,qos,time_limit,start_time,run_time
0,1,1.855469,1,4,0,0,600,2020-05-22 10:41:04+00:00,5
1,1,1.855469,1,4,0,0,600,2020-05-22 14:28:26+00:00,6
2,4,9.765625,1,1,1,0,3600,2020-05-22 16:12:29+00:00,6
3,128,237.5,1,4,2,0,86400,2020-05-22 12:15:20+00:00,4
4,1,1.855469,1,4,0,0,600,2020-05-22 23:50:05+00:00,6


In [20]:
# Determine the reference point (one minute before the first job)
reference_point = df['start_time'].min() - pd.Timedelta(seconds=1)

# Calculate the difference in seconds from the reference point
df['start_time'] = (df['start_time'] - reference_point).dt.total_seconds()

In [21]:
df

Unnamed: 0,cpu,mem (GB),node,gres/gpu,user_id,qos,time_limit,start_time,run_time
0,1,1.855469,1,4,0,0,600,1429687.0,5
1,1,1.855469,1,4,0,0,600,1443329.0,6
2,4,9.765625,1,1,1,0,3600,1449572.0,6
3,128,237.500000,1,4,2,0,86400,1435343.0,4
4,1,1.855469,1,4,0,0,600,1477028.0,6
...,...,...,...,...,...,...,...,...,...
631021,4,7.812500,1,1,300,0,43200,13495923.0,2701
631022,4,7.812500,1,1,300,0,43200,13501187.0,625
631023,128,237.500000,1,4,2,0,86400,13506479.0,2
631024,8,6.933594,1,1,178,0,600,13493140.0,2


In [23]:
# Calculates a split date by subtracting one day from the latest job 'start_time'
last_job_start_time = df['start_time'].max()
days_to_simulate = 1
seconds_in_a_day = 24*60*60
split_date = last_job_start_time - days_to_simulate*seconds_in_a_day

In [24]:
# Splits the DataFrame into two subsets:
# - df_train: contains rows where the 'start_time' is earlier than the split_date, which are used for training
# - df_sched: contains rows where the 'start_time' is after the split_date, which are used for scheduling jobs
df_train = df[df['start_time'] < split_date]
df_sched = df[df['start_time'] >= split_date]
print('Train_jobs = {}'.format(df_train.shape[0]))
print('Scheduling_jobs = {}'.format(df_sched.shape[0]))

Train_jobs = 624570
Scheduling_jobs = 4407


In [25]:
# Remove the column 'start_time' since the training does not require it
df_train = df_train.drop(columns=['start_time'])

In [26]:
# Modifies df_sched so that the earliest 'start_time' becomes 0 and then renames start_time to 'submission_time'
df_sched['start_time'] = df_sched['start_time'] - df_sched['start_time'].min()
df_sched.rename(columns={'start_time': 'submission_time'}, inplace=True)
df_sched['submission_time'] = df_sched['submission_time'].astype(int)
df_sched

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_sched['start_time'] = df_sched['start_time'] - df_sched['start_time'].min()
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_sched.rename(columns={'start_time': 'submission_time'}, inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_sched['submission_time'] = df_sched['submission_time'].astype(int)


Unnamed: 0,cpu,mem (GB),node,gres/gpu,user_id,qos,time_limit,submission_time,run_time
421497,16,29.6875,4,16,2,0,300,81401,3
421498,16,29.6875,4,16,2,0,300,68111,5
421499,16,16.0000,1,1,371,0,21600,58902,4413
421500,16,29.6875,4,16,2,0,300,62816,2
421501,16,29.6875,4,16,2,0,300,68730,2
...,...,...,...,...,...,...,...,...,...
627224,4,4.0000,1,1,368,0,3600,11873,23
627225,16,29.6875,4,16,2,0,300,33829,1
627226,16,29.6875,4,16,2,0,300,29430,1
627227,4,4.0000,1,1,368,0,3600,11549,13


In [27]:
# Sort the DataFrame by the 'submission_time' column
df_sched = df_sched.sort_values(by='submission_time')

# Reset the index
df_sched = df_sched.reset_index(drop=True)
df_sched

Unnamed: 0,cpu,mem (GB),node,gres/gpu,user_id,qos,time_limit,submission_time,run_time
0,128,237.5000,1,4,45,0,86400,0,64227
1,128,237.5000,1,4,45,0,86400,50,64657
2,64,118.7500,1,4,328,0,10800,575,3622
3,1024,1796.8750,8,32,14,3,43200,728,43017
4,16,234.3750,1,4,48,0,82740,2533,44566
...,...,...,...,...,...,...,...,...,...
4402,16,29.6875,4,16,2,0,300,85769,3
4403,16,29.6875,4,16,2,0,300,85894,4
4404,16,29.6875,4,16,2,0,300,86020,2
4405,16,29.6875,4,16,2,0,300,86144,3


In [28]:
df_sched['cpu'].unique()

array([ 128,   64, 1024,   16,  896,   32,    1,    4, 2048,   36,  512,
          8,  256,  768,  192,  384, 1280])

In [29]:
def nearest_power_of_two(x):
    if x > 256:
        return 256
    else:
        return 2 ** np.round(np.log2(x)).astype(int)

In [30]:
# Caps the number of 'cpu' at 256, to ensure that the small platform used for the simulation is able to handle the request
df_sched['cpu'] = df['cpu'].apply(nearest_power_of_two)
df_sched['cpu'].unique()

array([  1,   4, 128,   2, 256,  16,  32,  64,   8], dtype=int64)

In [35]:
df_sched

Unnamed: 0,cpu,mem (GB),node,gres/gpu,user_id,qos,time_limit,submission_time,run_time
0,1,237.5000,1,4,45,0,86400,0,64227
1,1,237.5000,1,4,45,0,86400,50,64657
2,4,118.7500,1,4,328,0,10800,575,3622
3,128,1796.8750,8,32,14,3,43200,728,43017
4,1,234.3750,1,4,48,0,82740,2533,44566
...,...,...,...,...,...,...,...,...,...
4402,64,29.6875,4,16,2,0,300,85769,3
4403,64,29.6875,4,16,2,0,300,85894,4
4404,64,29.6875,4,16,2,0,300,86020,2
4405,256,29.6875,4,16,2,0,300,86144,3


In [38]:
# Generates a workload JSON file for Batsim based on job specifications from a CSV file
def create_batsim_workload(csv_file, json_file):
    jobs = []
    profiles = {}
    with open(csv_file, mode='r') as file:
        csv_reader = csv.DictReader(file)
        job_id = 0
        # Iterates through the CSV file, parsing each row into a job description
        for row in csv_reader:
            job_id += 1
            # The profile_name is dynamically generated based on 'cpu' and 'run_time'
            profile_name = "D_{}_{}".format(int(row['cpu']), int(row['run_time']))
            
            job = {
                "id": job_id,
                "subtime": float(row['submission_time']),
                "res": int(row['cpu']),
                "profile": profile_name,               
                "user_id": int(row['user_id']),
                "qos": int(row['qos']),
                "node": int(row['node']),
                "gpu": int(row['gres/gpu']),
                "cpu": int(row['cpu']),
                "mem": float(row['mem (GB)']),
                "time_limit": int(row['time_limit'])         
            }
            # If the estimated_runtime field is provided
            if 'time_limit' in row:
                job['walltime'] = int(row['time_limit'])
            jobs.append(job)

            command_str = "mpirun --mca btl_tcp_if_exclude ib0,ib1,lo,eth2,eth1 --mca btl self,sm,tcp -hostfile \\$OAR_NODEFILE --path $PATH {}".format(profile_name)
            profile = {
                profile_name: {
                    "command": command_str,
        			"delay": float(row['run_time']),
        			"np": int(row['cpu']),
                    "runtime": float(row['run_time']),
                    "type": "delay"
                }
            }
                
            profiles.update(profile)
    
    workload = {
             "nb_res": 512,
             "jobs": jobs,
             "profiles": profiles 
    }

    with open(json_file, mode='w') as file:
        json.dump(workload, file, indent=4)
    
    print(f"Workload JSON file created: {json_file}")

In [39]:
create_batsim_workload("sched_jobs.csv", "slurm_workload.json")

Workload JSON file created: slurm_workload.json
