In [11]:
import random
from scheduling_lib import Job, Batch, BatchMachine

BATCH_SIZE = 5
MIN_FAMILIES = 5
MAX_FAMILIES = 25
MIN_PROCESSING_TIME = 2
MAX_PROCESSING_TIME = 20
MAX_WEIGHT = 10
MIN_JOBS = 50
MAX_JOBS = 500
MIN_DUE_DATE = 1
MAX_DUE_DATE = 30
MAX_MACHINES = 20

# Generate random data
# list of family processing times
p_f = [random.randint(MIN_PROCESSING_TIME, MAX_PROCESSING_TIME) for i in range(random.randint(MIN_FAMILIES, MAX_FAMILIES))]
print("Family processing times:\n" + str(p_f))

# list of machines
machines = [BatchMachine() for i in range(random.randint(1, MAX_MACHINES))]
print("Number of machines:\n" + str(len(machines)))

# list of jobs
jobs = [Job(family=random.randint(0, len(p_f) - 1), weight=random.randint(1, MAX_WEIGHT), due_date=random.randint(MIN_DUE_DATE, MAX_DUE_DATE)) for i in range(random.randint(MIN_JOBS, MAX_JOBS))]
print("Number of jobs:\n" + str(len(jobs)))

Family processing times:
[19, 10, 9, 12, 6, 2, 14, 7, 13, 19, 19]
Number of machines:
14
Number of jobs:
58


TWT Funktion: $\sum_{j=1}^n w_j T_j = \sum_{j=1}^n w_j \max\{C_j - d_j; 0\}$

In [12]:
# Function to get the family of a job
def s(j: Job) -> int:
    return j.f

In [13]:
def twt(jobs: list[Job]) -> int:
    return sum(job.w * max(job.C - job.d, 0) for job in jobs)

In [14]:
from math import exp


def atc(job: Job, p_avg: float, t: int, kappa: int) -> int:
    return (job.w / p_f[s(job)]) * kappa * exp(-(max(job.d - p_f[s(job)] - t, 0))/(kappa * p_avg))

Eröffnungsheuristik: ATC-BATC-Heuristic

In [15]:
from statistics import mean
from pprint import pprint

schedule = list()

# iterate over different values of kappa
for kappa in [0.5 * l for l in range(1, 11)]:
    # get a copy of the machines, jobs and schedule to not modify the original data while tuning kappa
    temp_machines = machines.copy()
    temp_jobs = jobs.copy()
    temp_schedule = list()
    top_twt = None
    top_kappa = None
    # start time
    t = 0
    # iterate over the jobs
    while len(temp_jobs) > 0:
        # remove all batches that are finished from the machines
        for machine in temp_machines: machine.update(t)
        # check if there are machines that are not allocated to a batch
        allocatable_machines = [machine for machine in temp_machines if machine.is_idle()]
        # iterate over the free machines
        while len(allocatable_machines) > 0 and len(temp_jobs) > 0:
            # sort all remaining jobs by their ATC value descending
            p_avg = mean([p_f[s(job)] for job in temp_jobs])
            for job in temp_jobs:
                job.set_metrics({"atc": atc(job, p_avg, t, kappa)})
            temp_jobs.sort(
                key = lambda job: job.get_metrics()["atc"],
                reverse = True
            )
            # get jobs of families that are currently not in a batch
            allocated_families = [machine.batch.family for machine in temp_machines if not machine.is_idle()]
            allocatable_families = [i for i in range(len(p_f)) if i not in allocated_families]
            # create batches per family by picking the best jobs by ATC Index
            allocatable_batches = list()
            for family in allocatable_families:
                family_batch = Batch()
                family_batch.family = family
                # get all remaining jobs of the family
                family_jobs = [job for job in temp_jobs if s(job) == family]
                # select min(remaining jobs in family, BATCH_SIZE) jobs of the family
                family_batch.jobs = family_jobs[:min(len(family_jobs), BATCH_SIZE)]
                family_batch.set_metrics({"batc": sum([job.get_metrics()["atc"] for job in family_batch.jobs])})
                allocatable_batches.append(family_batch)
            # skip if there are no batches to allocate
            if len(allocatable_batches) == 0:
                continue
            # sort the batches by their BATC value descending
            allocatable_batches.sort(
                key = lambda batch: batch.get_metrics()["batc"],
                reverse = True
            )
            # chose the best batch, add its completion time and allocate it to the first available machine
            top_batch = allocatable_batches[0]
            top_batch.C = t + p_f[top_batch.family]
            allocatable_machines[0].batch = top_batch
            for job in top_batch.jobs:
                job.C = top_batch.C
                temp_jobs.remove(job)
                temp_schedule.append({"job": job, "machine": temp_machines.index(allocatable_machines[0]), "start": t, "end": top_batch.C})
            allocatable_machines = [machine for machine in temp_machines if machine.batch is None]
        # increment time
        t += 1
    # get the TWT of the schedule
    temp_twt = twt([entry["job"] for entry in temp_schedule])
    print("kappa: " + str(kappa) + ", TWT: " + str(temp_twt))
    if top_twt is None or temp_twt < top_twt:
        schedule = temp_schedule
        top_twt = temp_twt
        top_kappa = kappa
        


KeyboardInterrupt: 