# Print Job Scheduling Optimization with Pyomo and Simulated ML
This notebook demonstrates how to use Pyomo for print job scheduling, with simulated ML-based print time prediction.

In [29]:
import json
from pyomo.environ import *
from datetime import timedelta
import random

In [30]:
with open('printers.json', 'r') as f:
    printers = json.load(f)

with open('models.json', 'r') as f:
    models = json.load(f)

with open('task.txt', 'r') as f:
    task_list = [line.strip() for line in f if line.strip()]

In [31]:
def parse_time(timestr):
    h, m, s = map(int, timestr.split(':'))
    return h + m/60 + s/3600

def predict_print_time_ml(model_info):
    base = len(model_info.get('model_name', ''))
    return max(0.5, base * 0.2 + random.uniform(-0.2, 0.2))

In [32]:
jobs = []
for idx, model_id in enumerate(task_list):
    model_info = next((m for m in models if m['file_id'] == model_id), None)
    if model_info:
        print_time = predict_print_time_ml(model_info)
        jobs.append({'job_id': idx, 'model_id': model_id, 'model': model_info, 'print_time': print_time})
    else:
        raise ValueError(f"Model ID {model_id} not found in models.json")

printer_ids = [p['name'] for p in printers]
job_ids = [j['job_id'] for j in jobs]

In [33]:
model = ConcreteModel()
model.P = RangeSet(0, len(printer_ids)-1)
model.J = RangeSet(0, len(jobs)-1)

model.x = Var(model.J, model.P, domain=Binary)
model.s = Var(model.J, domain=NonNegativeReals)
model.Cmax = Var(domain=NonNegativeReals)

In [34]:
def printer_supports_material(printer, required_material):
    if isinstance(required_material, list):
        return any(m['type'] in required_material for m in printer['materials'])
    else:
        return any(m['type'] == required_material for m in printer['materials'])

In [35]:
model.compatible = ConstraintList()
for j in range(len(jobs)):
    compatible = jobs[j]['model']['compatible_printers']
    required_material = jobs[j]['model']['material']
    for p in range(len(printer_ids)):
        printer = printers[p]
        if printer.get('model_name') not in compatible or not printer_supports_material(printer, required_material):
            model.compatible.add(model.x[j, p] == 0)

In [36]:
model.job_assign = ConstraintList()
for j in range(len(jobs)):
    model.job_assign.add(sum(model.x[j,p] for p in range(len(printer_ids))) == 1)

In [37]:
model.no_overlap = ConstraintList()
for p in range(len(printer_ids)):
    for j1 in range(len(jobs)):
        for j2 in range(len(jobs)):
            if j1 < j2:
                M = 1e6
                t1 = jobs[j1]['print_time']
                t2 = jobs[j2]['print_time']
                model.no_overlap.add(
                    model.s[j1] + t1 <= model.s[j2] + M*(2 - model.x[j1,p] - model.x[j2,p])
                )
                model.no_overlap.add(
                    model.s[j2] + t2 <= model.s[j1] + M*(2 - model.x[j1,p] - model.x[j2,p])
                )

In [38]:
model.makespan = ConstraintList()
for j in range(len(jobs)):
    t = jobs[j]['print_time']
    model.makespan.add(model.Cmax >= model.s[j] + t)

In [39]:
model.obj = Objective(expr=model.Cmax, sense=minimize)

In [40]:
for j in range(len(jobs)):
    compatible = jobs[j]['model']['compatible_printers']
    required_material = jobs[j]['model']['material']
    found = False
    for p, printer in enumerate(printers):
        if printer.get('model_name') in compatible:
            if any(m['type'] == required_material or (isinstance(required_material, list) and m['type'] in required_material) for m in printer['materials']):
                found = True
    if not found:
        print(f"WARNING: No compatible printer found for job {j} (model_id={jobs[j]['model_id']}) with compatible_printers={compatible} and required_material={required_material}")

In [41]:
solver = SolverFactory('highs')
solver.config.load_solutions = False
result = solver.solve(model, tee=False)

printer_job_map = {pid: [] for pid in printer_ids}
for j in range(len(jobs)):
    for p in range(len(printer_ids)):
        if value(model.x[j, p]) > 0.5:
            printer_job_map[printer_ids[p]].append(jobs[j]['model_id'])
with open('result.txt', 'w') as f:
    for printer in printer_ids:
        f.write(f"{printer}:\n")
        for job in printer_job_map[printer]:
            f.write(f"  {job}\n")

<h2>Add Imports</h2>

In [42]:
import json
from pyomo.environ import *
from datetime import timedelta
import random

<h2>Formatting the Data & Preparing all Printing Jobs</h2>

In [43]:
with open('printers.json', 'r') as f:
    printers = json.load(f)

with open('models.json', 'r') as f:
    models = json.load(f)

with open('task.txt', 'r') as f:
    task_list = [line.strip() for line in f if line.strip()]

def parse_time(timestr):
    h, m, s = map(int, timestr.split(':'))
    return h + m/60 + s/3600

def predict_print_time_ml(model_info):
    base = len(model_info.get('model_name', ''))
    return max(0.5, base * 0.2 + random.uniform(-0.2, 0.2))

jobs = []
for idx, model_id in enumerate(task_list):
    model_info = next((m for m in models if m['file_id'] == model_id), None)
    if model_info:
        print_time = predict_print_time_ml(model_info)
        jobs.append({'job_id': idx, 'model_id': model_id, 'model': model_info, 'print_time': print_time})
    else:
        raise ValueError(f"Model ID {model_id} not found in models.json")

printer_ids = [p['name'] for p in printers]
job_ids = [j['job_id'] for j in jobs]

<h2>Create Pyomo Model</h2>

Create Model and Sets within it

In [44]:
model = ConcreteModel()
model.P = RangeSet(0, len(printer_ids)-1) # Set of printers
model.J = RangeSet(0, len(jobs)-1)       # Set of jobs


Define Decisions and their Values

In [45]:
model.x = Var(model.J, model.P, domain=Binary) # x[j,p] = 1 if job j is assigned to printer p, 0 otherwise
model.s = Var(model.J, domain=NonNegativeReals) # s[j] = start time of job j
model.Cmax = Var(domain=NonNegativeReals)      # Cmax = makespan (completion time of the last job)


<h3>Defining the Constraints</h3>

<h4>Material Compability Function</h4>

In [46]:
def printer_supports_material(printer, required_material):
    if isinstance(required_material, list):
        return any(m['type'] in required_material for m in printer['materials'])
    else:
        return any(m['type'] == required_material for m in printer['materials'])

<h4>Add Printer & Material Constraints</h4>

In [47]:
model.compatible = ConstraintList()
for j in range(len(jobs)):
    compatible_printers_for_model = jobs[j]['model']['compatible_printers']
    required_material = jobs[j]['model']['material']
    for p in range(len(printer_ids)):
        printer = printers[p]
        # Check if the printer name is in the compatible list AND if the printer supports the required material
        if printer['name'] not in compatible_printers_for_model or not printer_supports_material(printer, required_material):
            model.compatible.add(model.x[j, p] == 0) # If not compatible, assignment must be 0


<h4>Add constraint for one job per printer per time</h4>
<p>Avoids multiple print jobs at once, makes sure that if there are multiple there's a quene and that this should be avoided when possible</p>

In [48]:
model.job_assign = ConstraintList()
for j in range(len(jobs)):
    model.job_assign.add(sum(model.x[j,p] for p in range(len(printer_ids))) == 1)

In [59]:
model.no_overlap = ConstraintList()
for p in range(len(printer_ids)):
    for j1 in range(len(jobs)):
        for j2 in range(len(jobs)):
            if j1 < j2:
                M = 1e6 
                t1 = jobs[j1]['print_time']
                t2 = jobs[j2]['print_time']
                #If both jobs j1 and j2 are assigned to printer p then they cannot overlap
                #Either j1 finishes before j2 starts or j2 finishes before j1 starts
                #This is enforced by using a binary variable to turn off the constraint
                #if either j1 or j2 (or both) are not assigned to printer p
                model.no_overlap.add(
                    model.s[j1] + t1 <= model.s[j2] + M*(2 - model.x[j1,p] - model.x[j2,p])
                )
                model.no_overlap.add(
                    model.s[j2] + t2 <= model.s[j1] + M*(2 - model.x[j1,p] - model.x[j2,p])
                )

'pyomo.core.base.constraint.ConstraintList'>) on block unknown with a new
Component (type=<class 'pyomo.core.base.constraint.ConstraintList'>). This is
block.del_component() and block.add_component().


Determining the Actuall Completion Time

In [None]:
model.makespan = ConstraintList()
for j in range(len(jobs)):
    t = jobs[j]['print_time']
    model.makespan.add(model.Cmax >= model.s[j] + t)

In [None]:
model.obj = Objective(expr=model.Cmax, sense=minimize) 

<h2>Solving the Opimization Problem</h2>

Checks if there is a lack of compatibility initally, was having issues with this so I figured having a function to check before would be best

In [None]:
for j in range(len(jobs)):
    compatible = jobs[j]['model']['compatible_printers']
    required_material = jobs[j]['model']['material']
    found = False
    for p, printer in enumerate(printers):
        if printer['name'] in compatible: 
            if any(m['type'] == required_material or (isinstance(required_material, list) and m['type'] in required_material) for m in printer['materials']):
                found = True
                break 
    if not found:
        print(f"WARNING: No compatible printer found for job {j} (model_id={jobs[j]['model_id']}) with compatible_printers={compatible} and required_material={required_material}")




Running the solver (A solution can be found even if there isn't technically a feasible solution, this is because there isn't enough data in order to create an "optimal solution" or multiple of the same are possible. This can happen if multiple files can be printed in different orders or with different printers and still achieve the same time.):

In [57]:
solver = SolverFactory('highs')
solver.config.load_solutions = True
result = solver.solve(model, tee=False) 

NoFeasibleSolutionError: A feasible solution was not found, so no solution can be loaded. Please set opt.config.load_solutions=False and check results.solution_status and results.incumbent_objective before loading a solution.

In [58]:
with open('printers.json', 'r') as f:
    printers = json.load(f)

with open('models.json', 'r') as f:
    models = json.load(f)

with open('task.txt', 'r') as f:
    task_list = [line.strip() for line in f if line.strip()]

def parse_time(timestr):
    h, m, s = map(int, timestr.split(':'))
    return h + m/60 + s/3600

def predict_print_time_ml(model_info):
    base = len(model_info.get('model_name', ''))
    return max(0.5, base * 0.2 + random.uniform(-0.2, 0.2))

jobs = []
for idx, model_id in enumerate(task_list):
    model_info = next((m for m in models if m['file_id'] == model_id), None)
    if model_info:
        print_time = predict_print_time_ml(model_info)
        jobs.append({'job_id': idx, 'model_id': model_id, 'model': model_info, 'print_time': print_time})
    else:
        raise ValueError(f"Model ID {model_id} not found in models.json")

printer_ids = [p['name'] for p in printers]
job_ids = [j['job_id'] for j in jobs]