<a href="https://colab.research.google.com/github/akashjainstart/coolhackathon/blob/main/SwiftCool.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from datetime import timedelta, datetime
from collections import deque
import time
import threading
import pandas as pd

# Define the job class
class Job:
    """A batch job request (SLURM-style fields) plus scheduling bookkeeping.

    ``time_limit`` and ``actual_run_time`` are given as ``'HH:MM:SS'`` strings
    (hours may exceed 24, e.g. ``'72:00:00'``) and stored as ``timedelta``.
    ``start_time``, ``end_time``, ``allocated_nodes`` and ``free_time`` start
    empty and are filled in by the scheduler when the job is started.

    Raises:
        ValueError: if a duration string is not in ``'HH:MM:SS'`` form.
    """

    def __init__(self, partition, gres_gpu, nodes, ntasks, cpus_per_task, memory_limit_G, time_limit, actual_run_time, job_name, output_log, error_log, mail_type, mail_user):
        self.partition = partition
        self.gres_gpu = gres_gpu
        self.nodes = nodes
        self.ntasks = ntasks
        self.cpus_per_task = cpus_per_task
        self.memory_limit_G = memory_limit_G
        self.time_limit = self._parse_hms(time_limit)
        self.actual_run_time = self._parse_hms(actual_run_time)
        self.job_name = job_name
        self.output_log = output_log
        self.error_log = error_log
        self.mail_type = mail_type
        self.mail_user = mail_user
        self.start_time = None
        self.end_time = None
        self.allocated_nodes = []
        self.free_time = None

    @staticmethod
    def _parse_hms(value):
        """Convert an ``'HH:MM:SS'`` string into a ``timedelta``."""
        try:
            hours, minutes, seconds = (int(part) for part in value.split(':'))
        except ValueError:
            # Covers both non-numeric fields and the wrong number of fields.
            raise ValueError(f"expected 'HH:MM:SS', got {value!r}") from None
        return timedelta(hours=hours, minutes=minutes, seconds=seconds)

# Define the scheduler class
class JobScheduler:
    """FIFO scheduler that places jobs on racks of identical compute nodes.

    Jobs are started in submission order.  A job starts only when all of its
    ``job.nodes`` can be placed; nodes are taken rack by rack (lowest rack
    first, lowest free node id within a rack first).  Completion is simulated
    by a 1-second timer, and nodes are released separately by ``free_nodes``
    once ``job.free_time`` has passed.

    Args:
        num_racks: number of racks in the simulated data centre.
        nodes_per_rack: number of nodes in each rack.
    """

    def __init__(self, num_racks=100, nodes_per_rack=32):
        self.num_racks = num_racks
        self.nodes_per_rack = nodes_per_rack
        self.job_queue = deque()
        self.running_jobs = []
        self.completed_jobs = []
        # (rack_id, free_node_count) per rack; kept in sync with _free_node_ids.
        self.racks = [(i, nodes_per_rack) for i in range(num_racks)]
        # Sorted ids of the free nodes in each rack. Tracking ids rather than a
        # bare count lets freed nodes be reused without handing out an id that
        # another running job still holds.
        self._free_node_ids = [list(range(nodes_per_rack)) for _ in range(num_racks)]
        # Re-entrant because run_next_job holds it while calling the other
        # methods, and timer threads call back into the scheduler concurrently.
        self.lock = threading.RLock()
        self.df_rack_allocation = pd.DataFrame(columns=[f'Rack_{i}' for i in range(num_racks)])

    def submit_job(self, job):
        """Queue *job*, release any expired nodes, then try to start the next job."""
        self.job_queue.append(job)
        print(f"Job {job.job_name} submitted")
        # Check and free up nodes if necessary
        self.free_nodes()
        self.run_next_job()

    def run_next_job(self):
        """Start the job at the head of the queue if its nodes can be placed.

        A job that does not fit right now stays at the head of the queue and is
        retried on the next call. A job larger than the whole cluster is
        rejected, so it cannot block the queue forever.
        """
        with self.lock:
            if not self.job_queue:
                return

            job = self.job_queue.popleft()
            capacity = self.num_racks * self.nodes_per_rack
            if job.nodes > capacity:
                print(f"Job {job.job_name} rejected: needs {job.nodes} nodes, cluster has {capacity}")
                return
            if not self.allocate_resources(job):
                # Keep FIFO order instead of silently dropping the job.
                self.job_queue.appendleft(job)
                return

            job.start_time = datetime.now()
            job.end_time = job.start_time + job.actual_run_time
            # Nodes become releasable after 1/100 of the recorded run time.
            job.free_time = job.start_time + job.actual_run_time / 100
            self.running_jobs.append(job)
            print(f"Job {job.job_name} started at {job.start_time} on nodes {job.allocated_nodes}")

            # Update the rack allocation DataFrame and show it
            self.update_rack_allocation(job)
            self.print_rack_allocation()

            # Simulated completion after 1 second (not the job's actual run time).
            threading.Timer(1, self.complete_job, args=[job]).start()

    def allocate_resources(self, job):
        """Reserve ``job.nodes`` nodes for *job*, all or nothing.

        On success, stores the chosen ``(rack, node)`` pairs in
        ``job.allocated_nodes`` and returns True. Returns False and leaves
        every rack untouched when fewer nodes are free than requested.
        """
        with self.lock:
            required = job.nodes
            if required > sum(len(ids) for ids in self._free_node_ids):
                return False

            allocated = []
            for rack, free_ids in enumerate(self._free_node_ids):
                if required <= 0:
                    break
                taken = free_ids[:required]
                if not taken:
                    continue
                del free_ids[:len(taken)]
                allocated.extend((rack, node) for node in taken)
                required -= len(taken)
                self.racks[rack] = (rack, len(free_ids))

            job.allocated_nodes = allocated
            return True

    def _release_nodes(self, nodes):
        """Return ``(rack, node)`` pairs to the free pool. Caller must hold the lock."""
        touched = set()
        for rack, node in nodes:
            self._free_node_ids[rack].append(node)
            touched.add(rack)
        for rack in touched:
            self._free_node_ids[rack].sort()
            self.racks[rack] = (rack, len(self._free_node_ids[rack]))

    def free_nodes(self):
        """Release the nodes of every running job whose ``free_time`` has passed."""
        with self.lock:
            current_time = datetime.now()
            for job in list(self.running_jobs):
                if job.free_time and job.free_time <= current_time:
                    self.running_jobs.remove(job)
                    self._release_nodes(job.allocated_nodes)
                    print(f"Freed nodes for job {job.job_name} at {current_time}")

    def complete_job(self, job):
        """Record *job* as completed, notify its owner and try to start the next job.

        Nodes are not released here; ``free_nodes`` does that based on ``free_time``.
        """
        with self.lock:
            self.completed_jobs.append(job)
        print(f"Job {job.job_name} completed at {job.end_time}")

        # Simulate sending email notification
        self.send_notification(job)

        # Schedule the next job
        self.run_next_job()

    def update_rack_allocation(self, job):
        """Append one row, indexed by job name, holding the job's node count per rack."""
        counts = {f'Rack_{rack}': 0 for rack in range(self.num_racks)}
        for rack, _node in job.allocated_nodes:
            counts[f'Rack_{rack}'] += 1
        new_row = pd.DataFrame(counts, index=[job.job_name])
        with self.lock:
            if self.df_rack_allocation.empty:
                # Concatenating onto an empty frame emits a FutureWarning in
                # recent pandas and may change the resulting column dtypes.
                self.df_rack_allocation = new_row
            else:
                self.df_rack_allocation = pd.concat([self.df_rack_allocation, new_row])

    def print_rack_allocation(self):
        """Print the allocation table, using tabulate when it is installed."""
        try:
            from tabulate import tabulate
        except ImportError:
            # tabulate is only installed/imported by a later notebook cell; fall
            # back to pandas' text rendering instead of crashing the scheduler.
            print(self.df_rack_allocation.to_string())
            return
        print(tabulate(self.df_rack_allocation, headers='keys', tablefmt='psql'))

    def send_notification(self, job):
        """Simulate an email notification to the job's owner."""
        print(f"Sending notification to {job.mail_user} for job {job.job_name}")

# Define job data (example data as provided)
job_data = [
    ["debug", 1, 32, 1, 4, 512, "72:00:00", "26:03:33", "ai_training_1", "ai_training_1.log", "ai_training_1.err", "ALL", "user@example.com"],
    ["compute", 8, 4, 1, 4, 512, "24:00:00", "38:41:20", "ai_training_2", "ai_training_2.log", "ai_training_2.err", "ALL", "user@example.com"],
    ["compute", 1, 1, 1, 32, 64, "24:00:00", "41:00:01", "ai_training_3", "ai_training_3.log", "ai_training_3.err", "ALL", "user@example.com"],
    ["gpu", 1, 2, 2, 8, 512, "24:00:00", "28:10:54", "ai_training_4", "ai_training_4.log", "ai_training_4.err", "ALL", "user@example.com"]
]

# Instantiate the scheduler
scheduler = JobScheduler()

# Submit jobs to the scheduler (each submission also tries to start a job)
for data in job_data:
    job = Job(*data)
    scheduler.submit_job(job)


# Periodically free nodes
def free_nodes_periodically(sched=scheduler):
    """Call ``sched.free_nodes()`` every 0.1 s while it has queued or running jobs.

    *sched* defaults to the scheduler created in this cell and is bound at
    definition time. A later cell that rebinds the global ``scheduler``
    therefore does not redirect this background thread to the new instance.
    """
    while sched.job_queue or sched.running_jobs:
        sched.free_nodes()
        time.sleep(0.1)  # Check every 0.1 seconds for demonstration


free_nodes_thread = threading.Thread(target=free_nodes_periodically, daemon=True)
free_nodes_thread.start()

# Start queued jobs in FIFO order until the queue is empty. Jobs that are
# already running are not waited for; they finish in the background.
while scheduler.job_queue or scheduler.running_jobs:
    scheduler.run_next_job()

    time.sleep(0.1)  # Give some time for jobs to be processed
    if not scheduler.job_queue:
        break

print("All jobs have been submitted and processed.")


Job ai_training_1 submitted
Job ai_training_1 started at 2024-06-23 17:12:34.686563 on nodes [(0, 0), (0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (0, 6), (0, 7), (0, 8), (0, 9), (0, 10), (0, 11), (0, 12), (0, 13), (0, 14), (0, 15), (0, 16), (0, 17), (0, 18), (0, 19), (0, 20), (0, 21), (0, 22), (0, 23), (0, 24), (0, 25), (0, 26), (0, 27), (0, 28), (0, 29), (0, 30), (0, 31)]
+---------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+---------

In [None]:
# Display the allocation table built by the scheduler: one row per started job,
# one column per rack, values are the number of that rack's nodes the job got.
scheduler.df_rack_allocation

Unnamed: 0,Rack_0,Rack_1,Rack_2,Rack_3,Rack_4,Rack_5,Rack_6,Rack_7,Rack_8,Rack_9,...,Rack_90,Rack_91,Rack_92,Rack_93,Rack_94,Rack_95,Rack_96,Rack_97,Rack_98,Rack_99
ai_training_1,32,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
ai_training_2,0,4,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
ai_training_3,0,1,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
ai_training_4,0,2,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [None]:
import plotly.graph_objects as go

# Nodes per rack in the scheduler above; converts node counts to a percentage.
NODES_PER_RACK = 32

# Rows are jobs in the order they were started; columns are racks. Cast to
# float so the arithmetic below does not run on object-dtype columns.
allocation = scheduler.df_rack_allocation.astype(float)

# Create a Plotly figure
fig = go.Figure()

# One trace per rack: the share of that rack's nodes allocated to each job.
for col in allocation.columns:
    fig.add_trace(go.Scatter(
        x=allocation.index,
        y=allocation[col] * 100 / NODES_PER_RACK,
        mode='lines',
        name=col,
    ))

# The x axis is job start order (the DataFrame's rows), not wall-clock time.
fig.update_layout(
    title='Utilisation graph for Data center',
    xaxis_title='Job (start order)',
    yaxis_title='Utilisation percent',
    showlegend=True,  # One entry per rack; set to False if it becomes too cluttered
)

# Show the plot
fig.show()


In [None]:
# Install tabulate, which JobScheduler.print_rack_allocation uses to print the
# allocation table.
# NOTE(review): the first scheduler cell above already calls tabulate, so this
# cell has to run before that one.
!pip install tabulate
from tabulate import tabulate

In [None]:
from datetime import timedelta, datetime
from collections import deque
import time
import threading
import pandas as pd
from tabulate import tabulate

# Define the job class
class Job:
    """A batch job request (SLURM-style fields) plus its simulated run state.

    ``time_limit`` and ``actual_run_time`` are given as ``'HH:MM:SS'`` strings
    (hours may exceed 24, e.g. ``'72:00:00'``) and stored as ``timedelta``.
    ``start()`` stamps ``start_time``/``end_time``; the scheduler fills in
    ``allocated_nodes``.

    Raises:
        ValueError: if a duration string is not in ``'HH:MM:SS'`` form.
    """

    def __init__(self, partition, gres_gpu, nodes, ntasks, cpus_per_task, memory_limit_G, time_limit, actual_run_time, job_name, output_log, error_log, mail_type, mail_user):
        self.partition = partition
        self.gres_gpu = gres_gpu
        self.nodes = nodes
        self.ntasks = ntasks
        self.cpus_per_task = cpus_per_task
        self.memory_limit_G = memory_limit_G
        self.time_limit = self._parse_hms(time_limit)
        self.actual_run_time = self._parse_hms(actual_run_time)
        self.job_name = job_name
        self.output_log = output_log
        self.error_log = error_log
        self.mail_type = mail_type
        self.mail_user = mail_user
        self.start_time = None
        self.end_time = None
        self.allocated_nodes = []

    @staticmethod
    def _parse_hms(value):
        """Convert an ``'HH:MM:SS'`` string into a ``timedelta``."""
        try:
            hours, minutes, seconds = (int(part) for part in value.split(':'))
        except ValueError:
            # Covers both non-numeric fields and the wrong number of fields.
            raise ValueError(f"expected 'HH:MM:SS', got {value!r}") from None
        return timedelta(hours=hours, minutes=minutes, seconds=seconds)

    def start(self):
        """Mark the job as started now; it ends ``actual_run_time`` later."""
        self.start_time = datetime.now()
        self.end_time = self.start_time + self.actual_run_time
        print(f"Job {self.job_name} started at {self.start_time}")

    def is_complete(self):
        """Return True once ``end_time`` has passed; False if the job never started."""
        return self.end_time is not None and datetime.now() >= self.end_time

# Define the scheduler class
class JobScheduler:
    """FIFO scheduler that places jobs on racks of identical compute nodes.

    Jobs start in submission order once all of their ``job.nodes`` can be
    placed (lowest rack first, lowest free node id within a rack first).
    Each started job gets a timer for its full ``actual_run_time``. When the
    timer fires, the job is recorded as completed and its nodes are released.

    Args:
        num_racks: number of racks in the simulated data centre.
        nodes_per_rack: number of nodes in each rack.
    """

    def __init__(self, num_racks=100, nodes_per_rack=32):
        self.num_racks = num_racks
        self.nodes_per_rack = nodes_per_rack
        self.job_queue = deque()
        self.running_jobs = []
        self.completed_jobs = []
        # (rack_id, free_node_count) per rack; kept in sync with _free_node_ids.
        self.racks = [(i, nodes_per_rack) for i in range(num_racks)]
        # Sorted ids of the free nodes in each rack. Tracking ids rather than a
        # bare count lets freed nodes be reused without handing out an id that
        # another running job still holds.
        self._free_node_ids = [list(range(nodes_per_rack)) for _ in range(num_racks)]
        # Re-entrant because run_next_job/complete_job hold it while calling the
        # other methods, and timer threads call back into the scheduler.
        self.lock = threading.RLock()
        self.df_rack_allocation = pd.DataFrame(columns=[f'Rack_{i}' for i in range(num_racks)])

    def submit_job(self, job):
        """Queue *job* and try to start the job at the head of the queue."""
        self.job_queue.append(job)
        print(f"Job {job.job_name} submitted")
        self.run_next_job()

    def run_next_job(self):
        """Start the job at the head of the queue if its nodes can be placed.

        A job that does not fit right now stays at the head of the queue and is
        retried on the next call. A job larger than the whole cluster is
        rejected, so it cannot block the queue (and the driver loop) forever.
        """
        with self.lock:
            if not self.job_queue:
                return

            job = self.job_queue.popleft()
            capacity = self.num_racks * self.nodes_per_rack
            if job.nodes > capacity:
                print(f"Job {job.job_name} rejected: needs {job.nodes} nodes, cluster has {capacity}")
                return
            if not self.allocate_resources(job):
                # Keep FIFO order instead of silently dropping the job.
                self.job_queue.appendleft(job)
                return

            job.start()
            self.running_jobs.append(job)
            print(f"Job {job.job_name} started on nodes {job.allocated_nodes}")

            # Update the rack allocation DataFrame and show it
            self.update_rack_allocation(job)
            self.print_rack_allocation()

            # Complete the job after its simulated run time
            threading.Timer(job.actual_run_time.total_seconds(), self.complete_job, args=[job]).start()

    def allocate_resources(self, job):
        """Reserve ``job.nodes`` nodes for *job*, all or nothing.

        On success, stores the chosen ``(rack, node)`` pairs in
        ``job.allocated_nodes`` and returns True. Returns False and leaves
        every rack untouched when fewer nodes are free than requested.
        """
        with self.lock:
            required = job.nodes
            if required > sum(len(ids) for ids in self._free_node_ids):
                return False

            allocated = []
            for rack, free_ids in enumerate(self._free_node_ids):
                if required <= 0:
                    break
                taken = free_ids[:required]
                if not taken:
                    continue
                del free_ids[:len(taken)]
                allocated.extend((rack, node) for node in taken)
                required -= len(taken)
                self.racks[rack] = (rack, len(free_ids))

            job.allocated_nodes = allocated
            return True

    def _release_nodes(self, nodes):
        """Return ``(rack, node)`` pairs to the free pool. Caller must hold the lock."""
        touched = set()
        for rack, node in nodes:
            self._free_node_ids[rack].append(node)
            touched.add(rack)
        for rack in touched:
            self._free_node_ids[rack].sort()
            self.racks[rack] = (rack, len(self._free_node_ids[rack]))

    def _retire(self, job, when):
        """Drop *job* from running_jobs and free its nodes. Caller must hold the lock."""
        self.running_jobs.remove(job)
        self._release_nodes(job.allocated_nodes)
        print(f"Freed nodes for job {job.job_name} at {when}")

    def free_nodes(self):
        """Release the nodes of every running job that reports ``is_complete()``."""
        with self.lock:
            current_time = datetime.now()
            for job in list(self.running_jobs):
                if job.is_complete():
                    self._retire(job, current_time)

    def complete_job(self, job):
        """Timer callback: record *job* as completed, free its nodes, start the next job."""
        with self.lock:
            self.completed_jobs.append(job)
        print(f"Job {job.job_name} completed at {job.end_time}")

        # Simulate sending email notification
        self.send_notification(job)

        with self.lock:
            # The timer has fired, so free this job's nodes directly. Relying on
            # is_complete() alone compares against the wall clock, which need
            # not agree exactly with the timer.
            if job in self.running_jobs:
                self._retire(job, datetime.now())
        # Also free any other jobs that have finished
        self.free_nodes()

        # Schedule the next job
        self.run_next_job()

    def update_rack_allocation(self, job):
        """Append one row, indexed by job name, holding the job's node count per rack."""
        counts = {f'Rack_{rack}': 0 for rack in range(self.num_racks)}
        for rack, _node in job.allocated_nodes:
            counts[f'Rack_{rack}'] += 1
        new_row = pd.DataFrame(counts, index=[job.job_name])
        with self.lock:
            if self.df_rack_allocation.empty:
                # Concatenating onto an empty frame emits a FutureWarning in
                # recent pandas and may change the resulting column dtypes.
                self.df_rack_allocation = new_row
            else:
                self.df_rack_allocation = pd.concat([self.df_rack_allocation, new_row])

    def print_rack_allocation(self):
        """Print the allocation table as a psql-style grid via tabulate."""
        print(tabulate(self.df_rack_allocation, headers='keys', tablefmt='psql'))

    def send_notification(self, job):
        """Simulate an email notification to the job's owner."""
        print(f"Sending notification to {job.mail_user} for job {job.job_name}")

# Define job data (example data as provided)
job_data = [
    ["debug", 1, 3, 1, 4, 512, "72:00:00", "00:00:05", "ai_training_1", "ai_training_1.log", "ai_training_1.err", "ALL", "user@example.com"],
    ["compute", 8, 4, 1, 4, 512, "24:00:00", "00:00:10", "ai_training_2", "ai_training_2.log", "ai_training_2.err", "ALL", "user@example.com"],
    ["compute", 1, 1, 1, 32, 64, "24:00:00", "00:00:03", "ai_training_3", "ai_training_3.log", "ai_training_3.err", "ALL", "user@example.com"],
    ["gpu", 1, 2, 2, 8, 512, "24:00:00", "00:00:07", "ai_training_4", "ai_training_4.log", "ai_training_4.err", "ALL", "user@example.com"]
]

# Instantiate the scheduler
scheduler = JobScheduler()

# Submit jobs to the scheduler (each submission also tries to start a job)
for data in job_data:
    job = Job(*data)
    scheduler.submit_job(job)


# Periodically free nodes
def free_nodes_periodically(sched=scheduler):
    """Call ``sched.free_nodes()`` every 0.1 s while it has queued or running jobs.

    *sched* defaults to the scheduler created in this cell and is bound at
    definition time. Rebinding the global ``scheduler`` later therefore
    does not redirect this background thread.
    """
    while sched.job_queue or sched.running_jobs:
        sched.free_nodes()
        time.sleep(0.1)  # Check every 0.1 seconds for demonstration


free_nodes_thread = threading.Thread(target=free_nodes_periodically, daemon=True)
free_nodes_thread.start()

# Start queued jobs in FIFO order and wait until every job has finished and
# its nodes have been released.
while scheduler.job_queue or scheduler.running_jobs:
    scheduler.run_next_job()
    time.sleep(0.1)  # Give some time for jobs to be processed

print("All jobs have been submitted and processed.")


Job ai_training_1 submitted
Job ai_training_1 started at 2024-06-23 17:26:24.992804
Job ai_training_1 started on nodes [(0, 0), (0, 1), (0, 2)]
+---------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+---------

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.neural_network import MLPRegressor
from sklearn.metrics import mean_absolute_error
import numpy as np

# Read the SLURM job history. The steps below use its 'partition', 'nodes',
# 'gres_gpu', 'ntasks', 'cpus_per_task', 'time_limit' and 'actual_run_time' columns.
file_path = 'slurm_dataset.csv'
data = pd.read_csv(filepath_or_buffer=file_path)

# Function to convert time string to seconds
def time_to_seconds(time_str):
    """Convert a SLURM-style duration string to a whole number of seconds.

    Accepts ``'HH:MM:SS'`` (hours may exceed 24, e.g. ``'72:00:00'``),
    optionally preceded by a day count as in ``'1-02:00:00'``.

    Raises:
        ValueError: if *time_str* is not in one of those forms.
    """
    text = str(time_str).strip()
    days = 0
    if '-' in text:
        day_part, text = text.split('-', 1)
        days = int(day_part)
    try:
        h, m, s = map(int, text.split(':'))
    except ValueError:
        raise ValueError(f"unrecognised duration: {time_str!r}") from None
    return days * 86400 + h * 3600 + m * 60 + s

from sklearn.compose import TransformedTargetRegressor

# Convert the SLURM duration columns to seconds.
data['actual_run_time'] = data['actual_run_time'].apply(time_to_seconds)
# NOTE: time_limit is converted but not currently used as a feature.
data['time_limit'] = data['time_limit'].apply(time_to_seconds)

# Features: the partition plus the requested resources.
# Target: actual run time in seconds.
X = data[['partition', 'nodes', 'gres_gpu', 'ntasks', 'cpus_per_task']]
y = data['actual_run_time']

categorical_features = ['partition']
numerical_features = ['nodes', 'gres_gpu', 'ntasks', 'cpus_per_task']

# Scale numeric columns and one-hot encode the partition. With
# handle_unknown='ignore', a partition that appears only in the test split
# is encoded as all zeros instead of making predict() raise.
preprocessor = ColumnTransformer(
    transformers=[
        ('num', StandardScaler(), numerical_features),
        ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features),
    ])

# Neural-network regressor. The target is raw seconds, so values are large
# (a 26-hour job is ~94,000 s). MLPs are sensitive to target scale, so y is
# standardised for training and predictions are mapped back to seconds.
model = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('regressor', TransformedTargetRegressor(
        regressor=MLPRegressor(hidden_layer_sizes=(100, 100), max_iter=500, random_state=42),
        transformer=StandardScaler(),
    )),
])

# Split the data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train the model and predict the held-out jobs
model.fit(X_train, y_train)
y_pred = model.predict(X_test)

# Evaluate: MAE in seconds, also shown as whole hours and minutes
mae = mean_absolute_error(y_test, y_pred)
hours = int(mae // 3600)
minutes = int((mae % 3600) // 60)
print(f'Mean Absolute Error: {mae} seconds ({hours} hours and {minutes} minutes)')

# Display a few predictions and actual values
comparison = pd.DataFrame({'Actual': y_test, 'Predicted': y_pred})
comparison.head()
