### Schedule a set of ATCOs into n positions

In [27]:
#Imports
import numpy as np
import matplotlib.pyplot as plt

In [74]:
import random
class Agent:
    """Tabular Q-learning agent with two actions, 'same' and 'other'.

    The Q-table maps every state to a dict holding one value per action.
    """
    A=['same','other']
    a_count=len(A)
    # step size applied to each temporal-difference correction
    alpha=0.1
    # weight given to the best value of the next state
    gamma = 1
    # probability of picking the non-greedy action
    epsilon = 0.1

    def __init__(self, S):
        """Build a Q-table with every (state, action) value set to 0."""
        self.Q = {state: dict.fromkeys(Agent.A, 0) for state in S}

    def update_Q(self, s, sn, a, r):
        """Update Q[s][a] after taking action a in s, reaching sn with reward r."""
        upcoming = self.Q[sn]
        best_next = (
            upcoming['same']
            if upcoming['same'] >= upcoming['other']
            else upcoming['other']
        )
        current = self.Q[s][a]
        self.Q[s][a] = current + Agent.alpha * (r + Agent.gamma * best_next - current)

    def action(self, s):
        """Pick an action for state s epsilon-greedily (ties favour 'same')."""
        draw = random.random()
        values = self.Q[s]
        if values['same'] >= values['other']:
            greedy, alternative = 'same', 'other'
        else:
            greedy, alternative = 'other', 'same'
        # Exploit when the draw clears epsilon, otherwise explore.
        return greedy if draw >= Agent.epsilon else alternative

In [29]:
import copy

class ConstraintSystem:
    """Vets the agents' proposed actions against those applied one step earlier.

    Remembers the (possibly overridden) actions of the previous call and uses
    them to override and penalise proposals that repeat the previous action
    in a disallowed way.
    """

    def __init__(self, no_of_agents, no_of_people):
        self.no_of_agents = no_of_agents
        self.no_of_people = no_of_people
        # people that are not needed to fill the positions
        self.no_of_free = no_of_people - no_of_agents
        # nothing has been applied yet, so every first proposal is accepted
        self.prev_action = [None] * no_of_agents

    def checkActions(self, agent_actions):
        """Return (actions to apply, per-agent penalty) for the proposals.

        The input list is left untouched; the returned action list is also
        stored as the reference for the next call.
        """
        consys_actions = copy.deepcopy(agent_actions)
        penalise = []
        spare = self.no_of_free - self.prev_action.count('same')
        for idx in range(self.no_of_agents):
            proposed = agent_actions[idx]
            if self.prev_action[idx] != proposed:
                # action changed since the last step: always acceptable
                penalise.append(0)
                continue
            if proposed == 'same':
                # will lead to overwork
                penalise.append(-100)
                consys_actions[idx] = 'other'
                continue
            if spare > 0:
                # repeated 'other' is tolerated while spare capacity remains
                spare -= 1
                penalise.append(0)
                continue
            # will lead to no one being scheduled
            penalise.append(-100)
            consys_actions[idx] = 'same'
        self.prev_action = copy.deepcopy(consys_actions)
        return consys_actions, penalise

In [30]:
#define environment
from queue import Queue, Empty


class Environment:
    """Roster table for one shift plus a FIFO queue of resting people.

    Attributes:
        table: dict mapping position index -> {time block -> person or None}.
        rest: ``queue.Queue`` of the people currently not on a position.
    """

    def __init__(self, S, no_of_positions, people):
        """Seat the first ``no_of_positions`` people; the others start resting.

        Args:
            S: iterable of time blocks used as keys of each position's row.
            no_of_positions: number of positions to staff.
            people: iterable of people, in the order they should be used.

        Raises:
            ValueError: if there are fewer people than positions. Without
                this check the blocking ``Queue.get()`` below would wait
                forever on an empty queue.
        """
        people = list(people)
        if len(people) < no_of_positions:
            raise ValueError(
                f"need at least {no_of_positions} people to fill the positions, "
                f"got {len(people)}"
            )

        self.table = {p: {s: None for s in S} for p in range(no_of_positions)}

        self.rest = Queue()
        # in final code, replace the below block with actual people names
        for person in people:
            self.rest.put(person)
        # initializing the first state
        for position in self.table:
            self.table[position][0] = self.rest.get()

    def schedule(self, actions, s, reward):
        """Fill time block ``s + 1`` of every position from ``actions``.

        'same' keeps the person from block ``s``; any other action swaps in
        the next rested person. People swapped out only rejoin the rest queue
        after all positions are processed, so they cannot be reused within
        the same step.

        Args:
            actions: one action per position, indexed by position.
            s: the current time block.
            reward: per-position rewards; updated in place and returned.

        Returns:
            ``(reward, failed)``. ``failed`` is True when a swap was requested
            while nobody was resting; the step is abandoned at that point and
            the table is left partially filled.
        """
        sn = s + 1
        removed = []
        for p in self.table:
            if actions[p] == 'same':
                reward[p] += 1
                self.table[p][sn] = self.table[p][s]
            else:
                if self.rest.empty():
                    reward[p] -= 100
                    return reward, True
                removed.append(self.table[p][s])
                self.table[p][sn] = self.rest.get()
                reward[p] += 1
        for person in removed:
            self.rest.put(person)
        return reward, False

In [69]:
def _run_episode(agents, constraints, env, states):
    """Step all agents through ``states`` once, learning as they go.

    Returns True when the environment aborted the episode early.
    """
    for s in states:
        agent_actions = [agent.action(s) for agent in agents]
        consys_actions, reward = constraints.checkActions(agent_actions)
        reward, restart = env.schedule(consys_actions, s, reward)
        # Each agent learns from the action it proposed, not the override.
        for agent, proposed, r in zip(agents, agent_actions, reward):
            agent.update_Q(s, s + 1, proposed, r)
        if restart:
            return True
    return False


def scheduler(shift, required_positions, people):
    """Train one agent per position and return a roster table for the shift.

    Args:
        shift: shift identifier; 1, 2 and 3 map to 8, 9 and 15 time blocks.
            Any other value prints a warning and uses 8 blocks.
        required_positions: number of positions (one agent each).
        people: sequence of people available for this shift.

    Returns:
        The environment's table: position index -> {time block -> person}.
        Entries stay ``None`` for blocks that were never reached when
        generation failed ("failed to generate schedule" is printed).
    """
    #World
    blocks_per_shift = {1: 8, 2: 9, 3: 15}
    no_of_time_blocks = blocks_per_shift.get(shift)
    if no_of_time_blocks is None:
        print("shift out of bounds, defaulting to 6hrs")
        no_of_time_blocks = 8

    #states - time blocks
    S = list(range(no_of_time_blocks))
    terminal = no_of_time_blocks - 1
    # the terminal block has no outgoing transition
    transitions = S[:terminal]

    #initialize agents, 1 per position
    no_of_agents = required_positions
    agents = [Agent(S) for _ in range(no_of_agents)]

    #train the agents
    no_of_episodes = 100
    for _ in range(no_of_episodes):
        env = Environment(S, no_of_agents, people)
        # A fresh constraint system per episode: its memory of the previous
        # step's actions must not carry over into a freshly reset environment.
        constraints = ConstraintSystem(no_of_agents, len(people))
        _run_episode(agents, constraints, env, transitions)

    #generate the schedule
    # NOTE(review): generation still uses the agents' exploring policy and
    # keeps updating Q, exactly as during training - confirm this is intended.
    env = Environment(S, no_of_agents, people)
    constraints = ConstraintSystem(no_of_agents, len(people))
    if _run_episode(agents, constraints, env, transitions):
        print("failed to generate schedule")

    return env.table
    

### Schedule the ATCOs into shifts based on ratings

In [None]:
import pulp
import random
import copy

# A temporary ATCO generator used only for demonstration; it is not part of
# the problem, because the ATCO employee list is expected to be supplied.
def generate_random_atcos(n):
    """Build n random ATCO records named 'Atco1' .. 'Atco<n>'.

    Each record holds between 1 and 3 distinct ratings drawn from 'A'-'F'
    and a preference score between 1 and 20 for every rating it holds.
    """
    rating_pool = ['A', 'B', 'C', 'D', 'E', 'F']
    roster = {}
    for index in range(1, n + 1):
        how_many = random.randint(1, 3)
        held = random.sample(rating_pool, how_many)
        roster[f'Atco{index}'] = {
            'ratings': held,
            'preferences': {rating: random.randint(1, 20) for rating in held},
        }
    return roster

def add_admin_atco(atcos, admin_atco_count):
    """Insert 'admin_atco_<count>' into atcos (in place) and return atcos.

    The admin record holds every rating 'A'-'F' with a preference of 10 each.
    """
    every_rating = ['A', 'B', 'C', 'D', 'E', 'F']
    atcos[f'admin_atco_{admin_atco_count}'] = {
        'ratings': every_rating,
        # Preference score of 10 for all ratings
        'preferences': dict.fromkeys(every_rating, 10),
    }
    return atcos

# main function where the magic happens
def assign_atcos_to_ratings(ratings, atcos, no_of_groups=5):
    """Assign ATCOs to ratings by maximising total preference, then group them.

    A binary linear programme is solved with PuLP: every rating must receive
    exactly its required head-count and every ATCO takes at most one rating.
    While the programme is not solved to optimality, one admin ATCO is added
    and the programme is rebuilt and solved again.

    ATCOs the solver leaves unassigned are afterwards appended to the rating
    they prefer most, so a rating can end up with more than its requirement.

    Args:
        ratings: dict mapping rating -> required number of ATCOs.
        atcos: dict mapping ATCO name -> {'ratings': [...], 'preferences':
            {rating: score}}. Mutated in place when admin ATCOs are added.
        no_of_groups: number of groups each rating's ATCOs are split into.

    Returns:
        dict mapping 1-based group number -> {rating: [ATCO names]}.
    """
    admin_atco_count = 0  # Counter for admin atcos

    # NOTE(review): this loop only ends once the solver reports an optimal
    # solution; a requirement that admin ATCOs cannot cover would loop forever.
    while True:
        # Preference score of every (atco, rating) pairing the atco is rated
        # for; a missing score defaults to 0.
        preferences = {}
        for atco, data in atcos.items():
            prefs = data['preferences']
            for rating in data['ratings']:
                preferences[(atco, rating)] = prefs.get(rating, 0)
        assignments = list(preferences)

        # Create the problem variable to contain the problem data
        prob = pulp.LpProblem("Atco_Rating_Assignment_With_Preferences", pulp.LpMaximize)

        # Decision variables: x[(atco, rating)] = 1 if atco is assigned to rating, 0 otherwise
        x = pulp.LpVariable.dicts("assign", assignments, cat='Binary')

        # Objective Function: Maximize total preference score
        prob += pulp.lpSum([preferences[pair] * x[pair] for pair in assignments]), "TotalPreferenceScore"

        # Constraints

        # 1. Each rating must be assigned the required number of atcos
        for rating in ratings:
            prob += (
                # membership is tested on the dict x (O(1)), not the list
                pulp.lpSum([x[(atco, rating)] for atco in atcos if (atco, rating) in x])
                == ratings[rating],
                f"RatingRequirement_{rating}"
            )

        # 2. Each atco may be assigned to at most one rating
        for atco in atcos:
            prob += (
                pulp.lpSum([x[(atco, rating)] for rating in atcos[atco]['ratings']])
                <= 1,
                f"AtcoAssignment_{atco}"
            )

        # Solve the problem
        prob.solve()

        if prob.status == pulp.LpStatusOptimal:
            break
        # Not solved to optimality; add an admin atco and try again
        admin_atco_count += 1
        atcos = add_admin_atco(atcos, admin_atco_count)

    # Output the results (the loop above only exits with an optimal status)
    print("Status:", pulp.LpStatus[prob.status])
    print()

    assigned_atcos = {}  # Dictionary to store assigned atcos per rating
    for atco, data in atcos.items():
        chosen = None
        for rating in data['ratings']:
            var = x.get((atco, rating))
            # Solver values are floats; threshold instead of comparing to 1.
            if var is not None and var.varValue is not None and var.varValue > 0.5:
                chosen = rating
                break  # Since the atco is assigned to one rating

        if chosen is None:
            # If not assigned, find the rating with the highest preference
            chosen = max(data['ratings'], key=lambda r: preferences[(atco, r)])
            print(f"{atco}: Not assigned to any rating, assigning to {chosen} based on highest preference.")

        assigned_atcos.setdefault(chosen, []).append(atco)

    # Grouping atcos assigned to each rating into no_of_groups groups
    shift = {}
    for rating, atcos_list in assigned_atcos.items():
        for group_key, group in enumerate(split_list(atcos_list, no_of_groups), 1):
            shift.setdefault(group_key, {})[rating] = group

    print(f"\nNumber of admin atcos added: {admin_atco_count}\n")
    return shift

# split the list for each grp into number of day-cycles
def split_list(lst, n):
    """Split lst into n consecutive chunks whose sizes differ by at most one.

    When lst has fewer than n items, one single-item chunk per item is
    returned instead; otherwise the longer chunks come first.
    """
    total = len(lst)
    if total < n:
        return [lst[pos:pos + 1] for pos in range(total)]

    base, extra = divmod(total, n)
    chunks = []
    start = 0
    for index in range(n):
        # the first `extra` chunks absorb the remainder, one item each
        stop = start + base + (1 if index < extra else 0)
        chunks.append(lst[start:stop])
        start = stop
    return chunks

def display_shifts(shifts):
    """Print every group followed by its ratings and their ATCO lists."""
    print("Grouping of Atcos Assigned to Each Group and Rating:\n")
    for group in shifts:
        print(f"{group}:")
        by_rating = shifts[group]
        for rating in by_rating:
            print(f"{rating}: {by_rating[rating]}")
        # blank line between groups
        print()

# Number of positions to staff for each rating
ratings = {
    'A': 2,
    'B': 5,
    'C': 12,
    'D': 8,
    'E': 3,
    'F': 4,
}
# Same keys as ratings; every value is overwritten with a head-count below
no_of_atcos = copy.deepcopy(ratings)

# Calculate the total number of atcos (n) and the required number of atcos
# per rating
n = 0
for r in ratings:
    x = ratings[r]
    # floor(x/2) positions count once and the remaining ceil(x/2) positions
    # count twice; the sum is then multiplied by 5.
    # NOTE(review): the factor 5 presumably matches the 5 groups used when
    # splitting the assigned atcos - confirm.
    no_of_atcos[r] = (x // 2 + (x - x // 2) * 2) * 5
    n += no_of_atcos[r]
# Generate exactly n random atcos, assign them to ratings and print the groups
atcos = generate_random_atcos(n)
shifts = assign_atcos_to_ratings(no_of_atcos, atcos)
display_shifts(shifts)

Status: Optimal

Atco90: Not assigned to any rating, assigning to A based on highest preference.

Number of admin atcos added: 1

Grouping of Atcos Assigned to Each Group and Rating:

1:
C: ['Atco1', 'Atco8', 'Atco11', 'Atco12', 'Atco14', 'Atco15', 'Atco16', 'Atco18', 'Atco28', 'Atco33', 'Atco34', 'Atco35', 'Atco36', 'Atco38', 'Atco39', 'Atco40', 'Atco47', 'Atco48']
B: ['Atco2', 'Atco3', 'Atco20', 'Atco26', 'Atco29', 'Atco30', 'Atco37', 'Atco41']
F: ['Atco4', 'Atco5', 'Atco17', 'Atco21', 'Atco25', 'Atco44']
D: ['Atco6', 'Atco9', 'Atco10', 'Atco13', 'Atco19', 'Atco22', 'Atco23', 'Atco31', 'Atco42', 'Atco60', 'Atco62', 'Atco65']
A: ['Atco7', 'Atco24', 'Atco57', 'Atco78']
E: ['Atco27', 'Atco32', 'Atco46', 'Atco49', 'Atco53']

2:
C: ['Atco52', 'Atco54', 'Atco55', 'Atco58', 'Atco61', 'Atco63', 'Atco64', 'Atco68', 'Atco69', 'Atco70', 'Atco74', 'Atco79', 'Atco81', 'Atco87', 'Atco88', 'Atco93', 'Atco94', 'Atco98']
B: ['Atco43', 'Atco45', 'Atco59', 'Atco77', 'Atco83', 'Atco86', 'Atco89', 'Atco1

### Scheduling Day 1 Morning Shift (simply keep looping to generate additional days and shifts)

In [90]:
# Build and print the roster of every rating for one shift.
# shift selects both the number of time blocks inside scheduler() and the
# group of atcos taken from shifts.
shift=1
for rating in ratings:
    # ratings[rating] is the number of positions; shifts[shift][rating] is the
    # list of atcos available to this rating in this group
    table = scheduler(shift,ratings[rating],shifts[shift][rating])
    print(f"Rating {rating} schedule for shift {shift}")
    for i in range(ratings[rating]):
        # one tab-separated row per position
        print(f"Position {i}:",end='\t')
        # table[i] maps time block -> person; iterate over the time blocks
        for j in table[i]:
            print(table[i][j], end='\t')
        print()
    print()

Rating A schedule for shift 1
Position 0:	Atco7	Atco57	Atco7	Atco57	Atco7	Atco57	Atco7	Atco57	
Position 1:	Atco24	Atco78	Atco24	Atco78	Atco24	Atco78	Atco24	Atco78	

Rating B schedule for shift 1
Position 0:	Atco2	Atco2	Atco3	Atco2	Atco3	Atco3	Atco26	Atco3	
Position 1:	Atco3	Atco30	Atco30	Atco26	Atco26	Atco2	Atco2	Atco20	
Position 2:	Atco20	Atco37	Atco37	Atco41	Atco41	Atco20	Atco41	Atco41	
Position 3:	Atco26	Atco26	Atco20	Atco20	Atco30	Atco29	Atco29	Atco37	
Position 4:	Atco29	Atco41	Atco29	Atco29	Atco37	Atco37	Atco30	Atco30	

Rating C schedule for shift 1
Position 0:	Atco1	Atco36	Atco36	Atco8	Atco8	Atco1	Atco1	Atco36	
Position 1:	Atco8	Atco8	Atco1	Atco1	Atco36	Atco36	Atco8	Atco8	
Position 2:	Atco11	Atco38	Atco38	Atco16	Atco16	Atco11	Atco11	Atco38	
Position 3:	Atco12	Atco39	Atco39	Atco18	Atco18	Atco12	Atco12	Atco39	
Position 4:	Atco14	Atco40	Atco40	Atco28	Atco28	Atco14	Atco14	Atco40	
Position 5:	Atco15	Atco47	Atco47	Atco33	Atco33	Atco15	Atco15	Atco47	
Position 6:	Atco16	Atco16	Atco11	Atc