# Generic Crew Scheduling Problem from Beasley and Cao 1998 and its extensions (deadheading and layovers) from (Derigs and Schäfer 2014)

## Instance Handling and Solution Checking


In [1]:
# export

import os
from dataclasses import dataclass, field
import csv 
from collections import namedtuple
from IPython.display import display, Markdown
import pandas as pd

## Instance Handling

### Data Type for an activity (task)

**Important:** In the instance files, the activities are indexed from 1 to n. In this code, we index them from 0 to n-1. Note, however, that when writing the solution file, we use the (original) 1-based indexing.

In [2]:
#export

@dataclass(eq=True, order= True, frozen=True)
class Activity:
    index: int
    start_period: int
    end_period: int
    activity_type: str = 'work'
        
    def get_start_period(self):
        return self.start_period
    
    def get_end_period(self):
        return self.end_period
    
    def get_number_of_periods(self):
        return self.end_period - self.start_period
    
    def is_work(self):
        return self.activity_type == 'work'
    
   

### Class for an instance

Contains all the instance information. Read instance from file by giving the file name to the constructor.


In [3]:
#export
    
class csp_bc_instance():
    
    def __init__(self, filename):
        #line = 
        self.instance_name = filename[filename.rfind('/')+1:-4]
        with open(filename) as f:
            line = f.readline().split()
            self.number_of_activities = int(line[0])
            self.max_work_periods = int(line[1])

            self.activities = []
            self.last_end_period = 0


            for i in range(self.number_of_activities):

                line = f.readline().split()

                self.activities.append( Activity(i, int(line[0]),int(line[1])))
                self.last_end_period = max(self.last_end_period, int(line[1]))


            self.earliest_start_after_layover = 240 # 1200 + 480 - 1440
            self.latest_start_after_layover = self.last_end_period + 600 - 1440 

            self.start_after_layover_activities = set()
            for activity in self.activities:
                if activity.start_period >= self.earliest_start_after_layover and activity.start_period <=self.latest_start_after_layover:
                    self.start_after_layover_activities.add(activity)

            self.connection_costs = [{} for i in range(self.number_of_activities)]
            self.connection_costs_reversed = [{} for i in range(self.number_of_activities)]

            for line in f.readlines():
                line = line.split()

                self.connection_costs[int(line[0])-1][int(line[1])-1]=int(line[2])
                self.connection_costs_reversed[int(line[1])-1][int(line[0])-1]=int(line[2])
                
    def get_work_activity_zero_indexed(self,index):
        return self.activities[index]
        
    def get_deadhead_activity_zero_indexed(self,index):
        work_activity = self.activities[index]
        return Activity(index, work_activity.start_period, work_activity.end_period, "deadhead")
        
            



    

## Solution representation

In our implementation, a solution is represented as simple nested data structure:

- a **solution** is a **list of pairings**
- a **pairing** is a **list of duties**
- a **duy** is a **list of activities** (see above)

**Note:** in a pairing, the time in each duty is counted as time from the start of the day, that is, if the start period of the first activity in the second duty is 200, then the period is 200 and not 1440 + 200 in this representation



### Printing (parts of) solutions as strings

In [4]:
#export
def get_activity_str(activity):
    work_time = 0
    if activity.is_work():
        work_time = activity.end_period - activity.start_period
        
        return f"[ID:{activity.index+1} S:{activity.start_period} WT:{work_time} E:{activity.end_period} ]"
    else:
        return f"(ID:{activity.index+1} S:{activity.start_period} WT:{work_time} E:{activity.end_period})"

In [5]:
#export

def get_activity_con_str(act_a, act_b, instance = None):    
    
    cost_str = ""
    
    if instance:
        cost_str = f" C:{instance.connection_costs[act_a.index][act_b.index]}"
    
    if act_b.is_work():
        work_time = act_b.start_period - act_a.end_period
        return f" - WC WT:{work_time}{cost_str} - "
    return  f" - DC - "
    

In [6]:
#export

def get_duty_str(duty, instance = None):
    
    total_cost = 0
    

    
    output = "||  "
    total_work_time = 0
    last_activity = None
    
    for activity in duty:
        if activity.is_work:
            total_work_time += activity.get_number_of_periods()
        
            if last_activity:
                total_work_time += activity.start_period - last_activity.end_period
        
        if last_activity:
            
            if instance:
                total_cost += instance.connection_costs[last_activity.index][activity.index]
                
            output += get_activity_con_str(last_activity, activity, instance)

            
        output += get_activity_str(activity)
        last_activity = activity
        
    if instance:
        output += f" | Total: WT: {total_work_time} Cost: {total_cost} ||"
    else :
        output += f" | Total: WT: {total_work_time}  ||"
    
    return output
        
    
    

In [7]:
#export

def get_pairing_str(pairing, instance = None):
    
    output = ""
    last_duty = None
    for duty in pairing:
        
        if last_duty:
            output += f" --- Layover with time  {duty[0].start_period + 1440 - last_duty[-1].end_period}) ---  "
        
        last_duty = duty
        output += get_duty_str(duty, instance)

    return output
        
        

In [8]:
#export:

def print_solution(pairings, instance = None):
    
    for pairing in pairings:
        print(get_pairing_str(pairing, instance))
        
    
    

In [9]:
def get_solution_str(pairings, instance = None):
    
    solution_str = ""
    
    for pairing in pairings:
        solution_str += get_pairing_str(pairing, instance) + "\n"
        
    return solution_str

## Solution file reading and writing



Solution files are expected to be in the following format:

- first line: contains instance name (e.g. `csp50`)
- each remaining line contains the information regarding a pairing consisting of the following elements (separated by whitespace):
  - integer number: work activity with the (original one-indexed) activity id. Example: `2`
  - integer number followed by `_D`: deadhead activity with the (original one-indexed) activity id. Example: `2_D`
  - the character `L`: layover between two duties. Example: `L`

### Writing a solutions to a file

In [10]:
#export

def write_solution(instance, pairings, file_name = None):
    
    if file_name is None:
        file_name = instance.instance_name + "_sol.txt"
        
    with open(file_name,"w") as file:
        
        file.write (instance.instance_name + "\n")

        for pairing in pairings:
            line = ""
            for index, duty in enumerate(pairing):
                if index > 0:
                    line += "L "
                for activity in duty:
                        if activity.is_work():
                            line +=  str(activity.index+1) + " "
                        else:
                            line += str(activity.index+1) + "_D "                     
                    
            file.write (line[:-1] + "\n")

            


### Reading a solution from file

In [11]:
#export
def read_solution(solution_file_name, instance_path = "."):
    
    pairings = []
    with open(solution_file_name) as f:
        
        instance_name = f.readline().rstrip()
        
        instance_file_name = instance_path + "/" + instance_name + ".txt"
        
        instance = csp_bc_instance(instance_file_name)
        
        for line in f.readlines():
            pairing = []
            duty = []
            for item in line.split():
                if item.isnumeric():
                    duty.append(instance.get_work_activity_zero_indexed(int(item)-1))
                    
                elif item[-2:]=="_D":
                    duty.append(instance.get_deadhead_activity_zero_indexed(int(item[:-2])-1))
                    
                elif item=="L":
                    pairing.append(duty)
                    duty = []
            
            pairing.append(duty)
            pairings.append(pairing)
    return pairings, instance

## Problem / Instance Variants and Results from Derigs/Schäfer (2014)


### Problem / Instance Variants

In [12]:
#export
def get_problem_variants():
    return ["base", "dh","dhl"]


In [13]:
#export
def get_n_activities_to_range_n_crews():
    range_crew_members = dict([ 
     ( 50, range( 27, 32)),
     (100, range( 44 ,49)),
     (150, range( 69, 74)),
     (200, range( 93, 98)),
     (250, range(108,113)),
     (300, range(129,134)),
     (350, range(144,149)),
     (400, range(159,164)),
     (450, range(182,187)),
     (500, range(204,209))])
    
    return range_crew_members

In [14]:
#export
def get_instance_and_range_crew_members(number_of_activities, instance_folder = "./../instances"):
    
    filename = f"{instance_folder}/csp{number_of_activities}.txt"
    get_n_activities_to_range_n_crews()


    return csp_bc_instance(filename), get_n_activities_to_range_n_crews()[number_of_activities]


### Results from Derigs/Schäfer

In [15]:
#export

def get_dict_results_ds(number_of_activities, number_of_crew_members, instance_folder = "./instances"):
    filename =f"{instance_folder}/results_derigs_schaefer.csv"

    with open(filename, 'r') as data:

        for line in csv.DictReader(data,delimiter =";"):
            if int(line['number_act']) == number_of_activities and int(line['number_crew']) == number_of_crew_members:
                return line
    return None

In [16]:
#export

def get_obj_ds(number_of_activities, number_of_crew_members, variant, instance_folder = "./instances" ):
       
    dict_results_ds = get_dict_results_ds(number_of_activities, number_of_crew_members, instance_folder )
    if dict_results_ds is None:
        return -1
    
    return int(get_dict_results_ds(number_of_activities, number_of_crew_members,instance_folder)[f"{variant}_obj"])

In [17]:
get_obj_ds(50,29,"base")   

2399

In [18]:
#export
def claimed_optimal_ds(number_of_activities, number_of_crew_members, variant, instance_folder = "./instances"):
       
    dict_results_ds = get_dict_results_ds(number_of_activities, number_of_crew_members, instance_folder )
    if dict_results_ds is None:
        return -1
    
    return int(get_dict_results_ds(number_of_activities, number_of_crew_members, instance_folder)[f"{variant}_opt"]) > 0

In [19]:
claimed_optimal_ds(50,29,"base", "./instances")   

True

## Checking Solutions

### Checking a duty

**A duty if feasible if:**
- its **total work time** does not exceed T (in the BC instances, T=480)
- only allowed connections (as specified in the instance) are used

The work time consists of time coming from:
- **work activities**: the duration of the activity (end time - start time)
  - if an activity is used as deadhead, it does not incur work time
- **work connections**: the duration of the connection between two activities $a$ and $b$ (start time of $b$ - end time of $a$)
  - a connection is counted as *work connection* if and only if $b$ is a work activivity, otherwise it is called *deadhead connection* and does not incur work time

Note: In the BC instances, the time needed to get from base to the first activity and to return to base from the last activity is assumed to be 0.

**The cost of a duty:**
- **only depends on the connection costs** between two activities within a duty (as specified in the instance)
  - connection costs are incurred irrespective of the type of connection (work or deadhead)



In [20]:
# export
def check_duty(instance, duty):
    
    is_feasible = True
    
    total_period_span = duty[-1].end_period - duty[0].start_period
    total_cost = 0
    total_deadhead_periods = 0
    
    last_activity = None
    
    for activity in duty:
        if not activity.is_work():
            total_deadhead_periods += activity.end_period-activity.start_period
            
        if last_activity:
            
            if activity.index in instance.connection_costs[last_activity.index]:
                
                total_cost += instance.connection_costs[last_activity.index][activity.index]
                
                if not activity.is_work():
                    total_deadhead_periods += activity.start_period - last_activity.end_period
            else:
                is_feasible = False
                print("Infeasible Connection used from", last_activity, "to", activity, get_duty_str(duty))
                
        last_activity = activity
        
    total_work_time = total_period_span - total_deadhead_periods
    
    if total_work_time > instance.max_work_periods:
        is_feasible = False
        print("Violating max total work time", total_work_time, get_duty_str(duty))
    
    return is_feasible, total_cost
                    

### Checking a Pairing

**A pairing is feasible if:**
- all of its duties are feasible 
- it consists of at most two duties, that is, it involves a layover connection

**A  pairing with a layover connection is feasible if:**

- the first duty ends (strictly) after period 1200
- the layover time between the two duties (end time of first duty - start time of second duty) is between 480 and 600

**The cost of pairing** is the sum of the costs of its duties


In [21]:
#export

def check_pairing(instance, pairing):
    
    total_cost = 0
    is_feasible = True
    if len(pairing) > 2:
        is_feasible = False
        print("More than 2 duties in pairing, namely",len(pairing), get_pairing_str(pairing))
    
    last_duty = None
    for duty in pairing:
        
        is_duty_feasible, duty_cost = check_duty(instance, duty)        
        is_feasible = is_feasible and is_duty_feasible
        total_cost += duty_cost
        
        
        if last_duty is not None:
            
            if last_duty[-1].end_period <= 1200:
                is_feasible = False
                print("layover is only allowed to start after 1200, here: ", last_duty[-1].end_period , get_pairing_str(pairing))
                
            layover_connection_time = 1440 + duty[0].start_period - last_duty[-1].end_period 
            if layover_connection_time < 480 or layover_connection_time > 600:
                is_feasible = False
                print("layover connection time is not within [480,600], but:", layover_connection_time, get_pairing_str(pairing))
        last_duty = duty
        
    return is_feasible, total_cost

### Checking a Solution (a list of pairings)

**A solution is feasible for an instance with a given number of crew members if:**
- the number of pairings equals the number of crew members
- all pairings are feasible

**The total cost of a solution** is the sum of the costs of its pairings

In [22]:
#export

def check_pairings(instance, pairings, number_of_crew_members):
    
    total_cost = 0
    is_feasible = True
    
    if not len(pairings) ==  number_of_crew_members:
        is_feasible = False
        print ("Wrong number of pairings, namely", len(pairings), "and", number_of_crew_members, "crew members")
    
    for pairing in pairings:    
        is_pairing_feasible, pairing_cost  = check_pairing(instance, pairing)
        is_feasible = is_feasible and is_pairing_feasible
        total_cost += pairing_cost
        
    return is_feasible, total_cost

Some function for collecting certain statistics of a solution:

In [23]:
#export       
def get_number_of_layovers(pairings):
    number_of_layovers = 0
    for pairing in pairings:
        number_of_layovers += len(pairing)-1
    return number_of_layovers

def get_number_of_duties(pairings):
    number_of_duties= 0
    for pairing in pairings:
        number_of_duties += len(pairing)
    return number_of_duties                            

def get_number_of_duties_counting_single_duties_twice(pairings):
    number_of_duties= 0
    for pairing in pairings:
        if (len(pairing)) == 1:
            number_of_duties +=1
        number_of_duties += len(pairing)
    return number_of_duties 

In [24]:
#export

def get_number_of_deadheads(duty):
    counter = 0
    for activity in duty:
        counter += not activity.is_work()
    return counter

def get_maximum_number_of_deadheads_per_duty(pairings):
    
    max_counter = 0
    for pairing in pairings:
        for duty in pairing:
            max_counter = max(max_counter, get_number_of_deadheads(duty))
    return max_counter
        

In [25]:
#export
def get_maximum_number_of_deadheads_in_sequence(duty):
    counter = 0
    max_counter = 0
    for activity in duty:
        if not activity.is_work():            
            counter += 1
        else:
            max_counter = max(max_counter, counter)
            counter = 0     
        
    return max(max_counter, counter)

def get_maximum_number_of_deadheads_in_sequence_per_duty(pairings):
    max_counter = 0
    for pairing in pairings:
        for duty in pairing:
            max_counter = max(max_counter, get_maximum_number_of_deadheads_in_sequence(duty))
    return max_counter

In [26]:
#export
def get_maximum_number_of_activities_per_duty(pairings):
    max_counter = 0
    for pairing in pairings:
        for duty in pairing:
            max_counter = max(max_counter, len(duty))
    return max_counter
    

In [27]:
#export
def get_number_of_work_activities(duty):
    counter = 0
    for activity in duty:
        counter += activity.is_work()
    return counter
    

def get_maximum_number_of_work_activities_per_duty(pairings):
    max_counter = 0
    for pairing in pairings:
        for duty in pairing:
            max_counter = max(max_counter, get_number_of_work_activities(duty))
    return max_counter

In [28]:
#export
def get_pairings_from_file(instance_folder, solution_folder, variant, n_activities, n_crews):
    solution_file_name = solution_folder + f"/csp{n_activities}_{variant}_cm{n_crews}_sol.txt"
    return read_solution(solution_file_name, instance_folder)
    
    
    

In [29]:
#export
def check_sol_from_file(instance_folder, solution_folder, variant, n_activities, n_crews):
       
    pairings, instance = get_pairings_from_file(instance_folder, solution_folder, variant, n_activities, n_crews)
    
    return check_pairings(instance, pairings, n_crews)
    

## Try it out: Check a solution and print it

In [30]:
instance_folder = "./instances"
solution_folder = "./optimal_solutions"
variant = "dh"
n_activities = 50
n_crews = 29

In [31]:
pairings, instance = get_pairings_from_file(instance_folder, solution_folder, variant, n_activities, n_crews)

print ("Objective from Derigs / Schäfer:", get_obj_ds(n_activities, n_crews, variant), "Claimed ot be optimal: ", claimed_optimal_ds(n_activities, n_crews, variant))

print("Optimal Solution found using the SEN-Approach:", check_pairings(instance, pairings, n_crews)[1], " Checked to be feasible:", check_pairings(instance, pairings, n_crews)[0], "\n")

print_solution(pairings, instance)

Objective from Derigs / Schäfer: 2399 Claimed ot be optimal:  True
Optimal Solution found using the SEN-Approach: 2382  Checked to be feasible: True 

||  [ID:1 S:1 WT:143 E:144 ] | Total: WT: 143 Cost: 0 ||
||  [ID:2 S:14 WT:203 E:217 ] - WC WT:107 C:155 - [ID:13 S:324 WT:120 E:444 ] | Total: WT: 430 Cost: 155 ||
||  [ID:3 S:60 WT:152 E:212 ] | Total: WT: 152 Cost: 0 ||
||  [ID:4 S:97 WT:21 E:118 ] | Total: WT: 21 Cost: 0 ||
||  [ID:5 S:134 WT:231 E:365 ] - WC WT:54 C:77 - [ID:19 S:419 WT:88 E:507 ] | Total: WT: 373 Cost: 77 ||
||  [ID:6 S:138 WT:187 E:325 ] - WC WT:77 C:130 - [ID:15 S:402 WT:174 E:576 ] | Total: WT: 438 Cost: 130 ||
||  [ID:7 S:143 WT:129 E:272 ] - WC WT:142 C:170 - [ID:18 S:414 WT:49 E:463 ] - WC WT:91 C:97 - [ID:25 S:554 WT:61 E:615 ] | Total: WT: 472 Cost: 267 ||
||  [ID:8 S:144 WT:220 E:364 ] - WC WT:46 C:91 - [ID:17 S:410 WT:52 E:462 ] | Total: WT: 318 Cost: 91 ||
||  [ID:9 S:220 WT:35 E:255 ] - DC - (ID:12 S:301 WT:0 E:531) - WC WT:22 C:23 - [ID:24 S:553 WT:194