In [5]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import gymnasium as gym
import pygame
import random
from collections import deque



# Get device
CPU or GPU

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


# Create environment
The Schedule needs an environment, lets create it

In [4]:
#Create a class for the schedule problem using the gym interface
class ScheduleGym():
    def __init__(self, num_days, num_hours, num_classes, num_subjects, verbose=False):
        self.num_days = num_days
        self.num_hours = num_hours
        self.num_classes = num_classes
        self.num_subjects = num_subjects
        self.num_slots = num_days * num_hours
        self.target_hours = np.zeros((num_classes, num_subjects), dtype=int) # Target hours for each class and subject
        self.schedule = -1*np.ones((num_classes, num_days, num_hours), dtype=int) # Schedule for each class, -1 means no subject assigned
        self.num_actions_left = 1000 #Number of actions left to take
        self.verbose = verbose #Debug on or off
        #This is to go from a one dimensional action space to a 4 dimensional action space
        #class_id, day, hour, subject_id
        self.max_values = np.array([num_classes, num_days, num_hours, num_subjects])
        self.cumprod_max_values = np.cumprod(self.max_values[::-1])[::-1]
        # self.decoder_base = np.cumprod(self.max_values)
        # self.encoder_base = np.flip(np.cumprod(np.flip(self.max_values)))
        self.initial_hours_to_assign = 0 #How many hours we have initially to assign, used to calculate score later

    def reset(self, seed=None):
        if seed is not None:
            np.random.seed(seed)
        for class_id in range(self.num_classes):
            for subject_id in range(self.num_subjects):
                self.target_hours[class_id, subject_id] = np.random.randint(1, 5)
        
        self.initial_hours_to_assign = self.target_hours.sum().sum()

        self.schedule = -1*np.ones((self.num_classes, self.num_days, self.num_hours), dtype=int) # Schedule for each class, -1 means no subject assigned
        self.num_actions_left = self.initial_hours_to_assign * 10 #Optimally we would need to take number of hours to assign steps to complete the schedule, lets give it some wiggle room
        
        #Return the current state,info
        info = {}
        #return (self.target_hours, self.schedule), info
        return self.state2vector(), info
    
    def state2vector(self):
        #Convert the state to a vector
        return np.concatenate([self.target_hours.flatten(), self.schedule.flatten()])/(self.initial_hours_to_assign * 10.0 + 1.0)
    
    def render(self):
        #Print the schedule
        for class_id in range(self.num_classes):
            print(f"Class {class_id + 1}:")
            for day in range(self.num_days):
                print(f"Day {day + 1}: {self.schedule[class_id, day]}")
            print()
        print(f'Fitness: {self.fitness()}, Actions left: {self.num_actions_left}')

        #print the target hours
        print("Target Hours:")
        for class_id in range(self.num_classes):
            print(f"Class {class_id + 1}: {self.target_hours[class_id]}")



    def decode_action(self, actions):
        actions = np.array(actions).reshape(-1) # Ensure numbers is a 1D column array
        aAll = np.zeros((actions.shape[0], len(self.max_values)), dtype=int)
        for i in range(len(self.max_values) - 1):
            aAll[:, i] = actions // self.cumprod_max_values[i+1]
            actions -= aAll[:, i]*self.cumprod_max_values[i+1]
        aAll[:,-1] = actions
        
        return aAll
    
    # Go from a 4D action to a 1D action
    def encode_action(self, actions):
        number = np.zeros(actions.shape[0], dtype=int)
        for i in range(len(self.max_values) - 1):
            number += actions[:,i]*self.cumprod_max_values[i+1]
        number += actions[:,-1]
        
        return number

            
    #next_state, reward, done, truncated, info = env.step(action)
    def step(self, action):
        # Update the schedule based on the action

        #Check if the action is a tuple or a single value
        if isinstance(action, tuple):
            #We are already in the decoded format
            class_id, day, hour, subject_id = action
        else:
            #Need to go from 1D to 4D
            decoded = self.decode_action(action).squeeze()
            class_id = decoded[0]
            day = decoded[1]
            hour= decoded[2]
            subject_id = decoded[3]

         
        #If subject_id is >= num_subjects then this is a remove action for the class_id, day, hour slot
        #If the slot is already occupied, then the old subject will be placed back into the target hours
        
        current_subject_id = self.schedule[class_id, day, hour]

        if self.verbose:
            print('Before action:')
            for class_id in range(self.num_classes):
                print(f"Class {class_id + 1}: {self.target_hours[class_id]}")
 
        #The slot was already booked, lets reomove it (and all of its dependencies)
        result = "N/A"
        reward = 0
        if current_subject_id != -1:
            self.schedule[class_id, day, hour] = -1
            self.target_hours[class_id, current_subject_id] += 1
            result = "Removed"
            reward = -0.16 #Penalty for removing a subject

        
        #See if it is an add action
        elif subject_id < self.num_subjects:
            #Yes its an add action, lets see if we have enough hours to actually add it
            if self.target_hours[class_id, subject_id] > 0:
                self.schedule[class_id, day, hour] = subject_id
                self.target_hours[class_id, subject_id] -= 1
                result = "Added"
                reward = 0.15 #Reward for adding a subject
            else:
                #We are trying to add a subject that is already empty
                reward = -0.25 #Penalty for invalid action    
        else:
            #We are trying to remove a subject that is not in the schedule, this is an invalid action
            reward = -0.5 #Penalty for invalid action
        
        self.num_actions_left -= 1
        done = self.is_done()  
        if done:
            reward += self.fitness()  #If we are done we get the full score of the schedule
        
        if self.verbose:
            print(f"Action: {action}, Class: {class_id}, Subject: {subject_id}, Day: {day}, Hour: {hour}, , current_subject_id: {current_subject_id}, Result: {result}, reward: {reward}, done: {done}")     
            print('After action:')
            for class_id in range(self.num_classes):
                print(f"Class {class_id + 1}: {self.target_hours[class_id]}")

            if done:
                print('Final schedule:')
                self.render()
                
        truncated, info = False, {} #To be implemented later if needed
        #Return the next state, reward, done, truncated and info
        #return (self.target_hours, self.schedule), reward, done, truncated, info
        return self.state2vector(), reward, done, truncated, info
           

    def is_done(self):
        #Check if the schedule is complete
        return np.all(self.target_hours == 0) or self.num_actions_left <= 0
    
    def get_action_sizes(self):
        #Return the sizes of the action space
        return [self.num_classes, self.num_days, self.num_hours, self.num_subjects]
        #return np.prod(self.max_values)
    
    def get_state_sizes(self):
        #Return the shapes of the state space
        #return self.target_hours.shape, self.schedule.shape
        return self.state2vector().shape

    
    def fitness(self):
        #Calculate the fitness of the schedule
        #fitness = self.num_actions_left * 0.001 #We want to maximize the number of actions left
        fitness = 0.0
        # target hours remaining
        target_hours_remaining = self.target_hours.sum().sum()

        fitness -= target_hours_remaining * 1

        # Count the number of holes in the schedule, that is where no subject is assigned, but is surrounded by subjects
        # If there are no subjects assigned to the edges, that is not considered a hole
        num_holes = 0

        # Create shifted versions of the schedule to compare adjacent hours
        left_shifted = np.roll(self.schedule, shift=-1, axis=2)
        right_shifted = np.roll(self.schedule, shift=1, axis=2)

        # Identify holes: -1 in the current schedule, and not -1 in both the left and right shifted schedules
        # Avoid considering the edges by setting the comparison for the first and last hour to False
        holes = (self.schedule == -1) & (left_shifted != -1) & (right_shifted != -1)
        holes[:, :, 0] = False  # Ignore first hour edge cases
        holes[:, :, -1] = False  # Ignore last hour edge cases

        # Count the number of holes
        num_holes = np.sum(holes)

        fitness -= num_holes * 0.5

        #return -fitness #Swapped fitness now....
        return fitness


# Create an agent that can do scheduling
The agent is of type Actor Critic

In [67]:

class ScheduleAgent(nn.Module):
    def __init__(self, state_size, action_sizes, hidden_dim=256, gamma=0.99):
        super().__init__()
        
        self.action_sizes = action_sizes
        self.gamma = gamma
        
        self.shared = nn.Sequential(
            nn.Linear(state_size, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU()
        )
        
        self.actor_heads = [nn.Linear(hidden_dim, dim) for dim in action_sizes]
        
        self.critic = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        
    def forward(self, state):
        #Go through the shared layers
        shared_output = self.shared(state)
        
        #Each head predicts its own action, like Class, Day, Hour, Subject etc
        action_probs = [torch.softmax(head(shared_output), dim=-1) for head in self.actor_heads]
        
        #The critic predicts the value of the state
        state_value = self.critic(shared_output)
        
        return action_probs, state_value
    
    def choose_action(self, state):
        device = next(self.parameters()).device
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
       
        #Get the probabilities of the actions for each head
        with torch.no_grad():
            action_probs, _ = self.forward(state)
        
        #Choose an action for each head
        actions =[torch.multinomial(probs, 1).item() for probs in action_probs]

        return actions
    
    



# Create environment

In [10]:
env = ScheduleGym(num_days=2, num_hours=4, num_classes=1, num_subjects=2)
state_dim = env.get_state_sizes()[0]
action_dims = env.get_action_sizes()

# Create Agent

In [68]:
agent = ScheduleAgent(state_dim, action_dims, hidden_dim=256).

In [69]:
x,_ = env.reset()
x.shape

(10,)

In [65]:
ost = agent(torch.FloatTensor(x))

In [80]:
ost2 = agent.choose_action(x)

In [81]:
ost2

[0, 1, 2, 0]

# Create optimizer

In [82]:
optimizer = optim.Adam(agent.parameters(), lr=3e-4)

In [83]:
#Send the agent to the correct device
agent.to(device)

ScheduleAgent(
  (shared): Sequential(
    (0): Linear(in_features=10, out_features=256, bias=True)
    (1): ReLU()
    (2): Linear(in_features=256, out_features=256, bias=True)
    (3): ReLU()
  )
  (critic): Sequential(
    (0): Linear(in_features=256, out_features=256, bias=True)
    (1): ReLU()
    (2): Linear(in_features=256, out_features=1, bias=True)
  )
)

In [85]:

num_episodes = 10
gamma = 0.99
for episode in range(num_episodes):
    state, _ = env.reset()
    state = torch.FloatTensor(state).unsqueeze(0).to(device)
    done = False
    episode_reward = 0
    
    while not done:
        actions = agent.choose_action(state)
        next_state, reward, done = env.step(actions)
        
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        next_state = torch.FloatTensor(next_state).unsqueeze(0).to(device)
        reward = torch.FloatTensor([reward]).unsqueeze(0).to(device)
        done = torch.FloatTensor([int(done)]).unsqueeze(0).to(device)
        
        action_probs, state_value = agent(state)
        _, next_state_value = agent(next_state)
        
        td_error = reward + gamma * next_state_value * (1 - done) - state_value
        
        actor_loss = -(1 / len(action_probs)) * torch.sum(torch.log(action_probs[torch.arange(len(actions)), actions])) * td_error.detach()
        critic_loss = td_error.pow(2)
        
        loss = actor_loss + critic_loss
        
        optimizer.zero_grad()
        loss.mean().backward()
        optimizer.step()
        
        state = next_state
        episode_reward += reward
    
    if episode % 100 == 0:
        print(f"Episode {episode}, Reward: {episode_reward}")

print("Final schedule:")
print(env.schedule)

TypeError: expected TensorOptions(dtype=float, device=cpu, layout=Strided, requires_grad=false (default), pinned_memory=false (default), memory_format=(nullopt)) (got TensorOptions(dtype=float, device=cuda:0, layout=Strided, requires_grad=false (default), pinned_memory=false (default), memory_format=(nullopt)))

In [None]:
def update(self, state, actions, reward, next_state, done):
        