In [None]:
!pip install -q pulp

[K     |████████████████████████████████| 40.6MB 102kB/s 
[?25h

In [None]:
from pulp import LpProblem, LpAffineExpression, LpVariable, LpStatus, LpMinimize, value
import numpy as np

import copy
import random

In [None]:
# define parameters
# Sizes of the generated instance; everything below is derived from these.
periods = 8            # periods available in a schedule
num_students = 2200    # students whose requests are generated
num_teachers = 80      # teachers whose qualifications are generated
num_courses = 172      # number of courses offered here
all_courses = list(range(num_courses))  # course ids are 0 .. num_courses - 1

In [None]:
def shuffled(input_list):
  """
  Return a randomly ordered deep copy of `input_list`.

  The argument itself is never modified; the order is drawn from the global
  `random` module state.
  """
  reordered = copy.deepcopy(input_list)
  random.shuffle(reordered)
  return reordered

In [None]:
# define problem
# Create the PuLP model that every later `problem += ...` statement adds to.
# It is declared as a minimisation problem named "Toy_Example".
problem = LpProblem("Toy_Example", LpMinimize)

In [None]:
# define requests and qualification lists
#
# Small hand-written instance, kept for reference when debugging:
# student_requests = [
#                     [0, 1, 3],
#                     [0, 2, 3],
#                     [0, 2, 4],
#                     [1, 3, 4],
#                     [0, 1, 2],
#                     [1, 2, 3]
# ]
#
# teacher_qualifs = [
#                     [0, 1, 3],
#                     [0, 2, 4],
#                     [1, 2, 3]
# ]

# Random instance: every student requests, and every teacher is qualified for,
# `periods` distinct courses drawn by shuffling the full course list.
# NOTE: the draw uses the global `random` state; seed it beforehand if the
# instance needs to be reproducible.
student_requests = [ shuffled(all_courses)[:periods] for _ in range(num_students) ]
teacher_qualifs = [ shuffled(all_courses)[:periods] for _ in range(num_teachers) ]


# turn them into vectors
def _indicator_vector(chosen_courses):
  """Return a 0/1 list over `all_courses`: 1 where the course is in `chosen_courses`."""
  # A set makes each membership test constant-time instead of scanning the list
  # once per course.
  chosen = set(chosen_courses)
  return [1 if course in chosen else 0 for course in all_courses]

student_request_vectors = [_indicator_vector(prefs) for prefs in student_requests]
teacher_qualification_vectors = [_indicator_vector(quals) for quals in teacher_qualifs]

In [None]:
# initialize schedules (periods by courses)
# Template grid: one row per period, one column per course, every cell empty.
blank_schedule = [ [None] * len(all_courses) for _ in range(periods) ]

def variables_added(schedule, prefix):
  """
  Fill every cell of `schedule` with a fresh 0/1 integer LpVariable.

  `schedule` is indexed as schedule[period][course]. It is modified in place
  and also returned. Each variable is named
  "{prefix}_Course_{course}_at_{period}".
  """
  for period in range(len(schedule)):
    row = schedule[period]
    for course in range(len(row)):
      row[course] = LpVariable(f"{prefix}_Course_{course}_at_{period}", lowBound = 0, upBound = 1, cat="Integer")
  return schedule

# Only the *number* of students/teachers matters here, so the length of
# `student_request_vectors` could be used instead of `student_requests`.
def _fresh_schedule():
  """Return an independent copy of `blank_schedule` for `variables_added` to fill."""
  # Every cell is None, so copying each row is enough; a full deepcopy per
  # student/teacher is needless work.
  return [list(row) for row in blank_schedule]

# Resulting arrays are indexed as [person, period, course].
student_schedules = np.array([ variables_added(_fresh_schedule(), f"Student_{student}") for student in range(len(student_requests)) ])
teacher_schedules = np.array([ variables_added(_fresh_schedule(), f"Teacher_{teacher}") for teacher in range(len(teacher_qualifs)) ])

KeyboardInterrupt: ignored

In [None]:
# define summation
def summation(terms):
  """
  return a usable sum of `terms` where every occurrence has coefficient 1

  `terms` is any iterable of hashable LP variables. A variable that occurs
  more than once is counted each time (x + x gives coefficient 2) instead of
  being silently collapsed to a single x. The result is an LpAffineExpression
  built from the variable -> coefficient mapping, in first-occurrence order.
  """
  coefficients = {}
  for t in terms:
    coefficients[t] = coefficients.get(t, 0) + 1
  return LpAffineExpression(coefficients)

def summation_test():
  """Smoke-test `summation` on three 0/1 variables and print whether it passed."""
  variables = [
    LpVariable(name, lowBound = 0, upBound = 1, cat="Integer")
    for name in ("Test_1", "Test_2", "Test_3")
  ]
  # The check compares the textual form of the summed expression.
  passed = str(summation(variables)) == "Test_1 + Test_2 + Test_3"
  print("Summation test passed." if passed else "Summation test failed.")

def power(base, term):
  """
  Start from LpAffineExpression(term) and multiply it by `term` `base` times.

  NOTE(review): as written, `base` acts as the repetition count, so the
  result is `term` raised to (base + 1). The parameter names suggest the
  roles may be swapped or off by one; confirm the intent before relying on it.
  NOTE(review): PuLP expressions are affine, so multiplying by a non-constant
  `term` is presumably rejected at runtime; verify before use.
  """
  product = LpAffineExpression(term)
  for _ in range(base):
    product *= term
  return product

def summation_of_exponential(base, terms):
  """
  Build an LpAffineExpression from a mapping whose keys are power(base, t)
  for each t in `terms`, every key having coefficient 1.

  NOTE(review): the keys are the values returned by `power`; whether those are
  usable (hashable, linear) as expression keys is not established here.
  """
  weights = {}
  for t in terms:
    weights[power(base, t)] = 1
  return LpAffineExpression(weights)

# Run the smoke test whenever this cell executes; it prints a pass/fail line.
summation_test()

Summation test passed.


In [None]:
# One expression per teacher: the sum of every (period, course) variable in
# that teacher's schedule.
total_teaching_vector = [
  summation([cell for row in schedule for cell in row])
  for schedule in teacher_schedules
]
# Objective is currently disabled:
#problem += summation_of_exponential(1000, total_teaching_vector)

In [None]:
# define constraints

for student in range(len(student_request_vectors)):
  requests = student_request_vectors[student]
  schedule = student_schedules[student]

  ## Each requested course must appear exactly once. Unrequested courses must not appear. ##
  # `schedule[:, course]` is the column of one course across all periods. Its
  # sum is forced to equal the 0/1 request flag: exactly one period for a
  # requested course, none for an unrequested one. Because the flag never
  # exceeds 1, a course can never be assigned twice.
  for course in range(len(requests)):
    problem += summation(schedule[:, course]) == requests[course]

  ## Each period must have 0 or 1 classes assigned to it, no more. ##
  # Each row of the schedule holds every course for one period; capping its
  # sum at 1 prevents a student from sitting in two sections at once.
  for courses_in_period in schedule:
    problem += (summation(courses_in_period) <= 1)

for teacher, qualifications in enumerate(teacher_qualification_vectors):
  schedule = teacher_schedules[teacher]

  ## Courses that they are unqualified to teach must not be taught by them. ##
  # Same column-sum idea as the student request constraint, but with `<=`:
  # a qualified teacher (flag 1) may teach the course in at most one period,
  # an unqualified teacher (flag 0) in none.
  for course, qualified in enumerate(qualifications):
    problem += summation(schedule[:, course]) <= qualified

  ## Each period must have 0 or 1 classes assigned to it, no more. ##
  # Same row-sum cap as for students: at most one section per period.
  for courses_in_period in schedule:
    problem += summation(courses_in_period) <= 1

## Each student's section must have a teacher. ##
# This constraint should not be handled by the classes.
# Each variable in a student's schedule represents whether they are assigned
# to a specific section. For every such variable, the number of qualified
# teachers teaching that (period, class) must be at least the variable's value.
#
# The teacher-side sum depends only on (period, class), never on the student,
# so it is built once per section and reused instead of being rebuilt for
# every student. Each `>=` comparison against a student variable is expected
# to derive its own constraint from the shared expression (PuLP forms
# `expr - variable`), so sharing does not couple the constraints.
qualified_teacher_sums = {}
for student, s_schedule in enumerate(student_schedules):
  for period, classes_at_period in enumerate(s_schedule):
    for class_id, attending in enumerate(classes_at_period):
      section = (period, class_id)
      if section not in qualified_teacher_sums:
        # get corresponding qualified teacher variables
        qualified_teacher_sums[section] = summation([
          t_schedule[period, class_id]
          for teacher, t_schedule in enumerate(teacher_schedules)
          if teacher_qualification_vectors[teacher][class_id] == 1
        ])
      problem += qualified_teacher_sums[section] >= attending

In [None]:
# display solution
# Report only how many constraints the model has: printing
# `problem.constraints` itself emits one entry per constraint, which at this
# problem size floods the notebook output (IOPub data rate limit).
print(f"Model has {len(problem.constraints)} constraints")
status = problem.solve()
print(f"Solution is {LpStatus[status]}\n")

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [None]:
class Class:
  """
  One section: a course taught in a particular period.

  Holds the ids of the students attending and the id of the teacher (None
  until one is assigned).
  """

  def __init__(self, course_id, period):
    self.period = period
    self.course_id = course_id
    self.students = []  # student ids attending this section

    self.teacher = None  # teacher id, or None while unassigned

  def __eq__(self, other):
    """Sections are equal when period and course match; students and teacher are ignored."""
    # Defer to Python's default handling for unrelated types instead of
    # failing with AttributeError on the missing attributes.
    if not isinstance(other, Class):
      return NotImplemented
    return (self.period == other.period) and (self.course_id == other.course_id)

  def __str__(self):
    return f"P.{self.period} {self.course_id} with students {self.students} taught by {self.teacher}"

def _is_selected(variable):
  """Return True when the solver set this 0/1 variable to 1."""
  # Solvers report floats, so compare against 0.5 rather than testing == 1;
  # an unsolved variable has no value (None) and counts as not selected.
  solved_value = value(variable)
  return solved_value is not None and solved_value > 0.5

# Sections found in the solution, keyed by (period, course_id) so that a
# student and a teacher assigned to the same slot end up on the same object.
sections_by_slot = {}

def _section_for(period, course_id):
  """Return the Class for this slot, creating it on first use."""
  slot = (period, course_id)
  if slot not in sections_by_slot:
    sections_by_slot[slot] = Class(course_id, period)
  return sections_by_slot[slot]

# Schedules are indexed [person][period][course]; these indices are the same
# numbers that `variables_added` embeds in each variable's name, so they are
# used directly instead of parsing the name.
for student, schedule in enumerate(student_schedules):
  for period, courses_at_period in enumerate(schedule):
    for course_id, c in enumerate(courses_at_period):
      if _is_selected(c):
        _section_for(period, course_id).students.append(student)

for teacher, schedule in enumerate(teacher_schedules):
  for period, courses_at_period in enumerate(schedule):
    for course_id, c in enumerate(courses_at_period):
      if _is_selected(c):
        # The teacher is recorded even when the section has no students yet.
        # If several teachers cover the same slot, the last one wins.
        _section_for(period, course_id).teacher = teacher

# Same ordering as first appearance: student sections first, then teacher-only ones.
classes = list(sections_by_slot.values())

# output classes by period
# Sections are listed in period order, with a blank line ahead of each new period.
previous_period = None
for section in sorted(classes, key=lambda section: section.period):
  starts_new_period = section.period != previous_period
  if starts_new_period:
    print()
  print(section)
  previous_period = section.period

In [None]:
def average_absolute_deviation(counts):
  """
  Return the mean absolute deviation of `counts` from their mean.

  Raises ValueError for an empty sequence, where the mean is undefined.
  """
  if not counts:
    raise ValueError("counts must not be empty")
  avg = sum(counts) / len(counts)
  deviations = [abs(avg - c) for c in counts]
  return sum(deviations) / len(counts)

# Compare an evenly spread load with a lopsided one.
for counts in ([1, 2, 3, 4], [1, 3, 3, 3]):
  avg_dev = average_absolute_deviation(counts)
  print(avg_dev)

# won't work since abs isn't linear
# NOTE(review): a common workaround is one auxiliary variable d_i per count
# with d_i >= c_i - avg and d_i >= avg - c_i, then minimising sum(d_i);
# confirm it fits this model before adopting it.