In [1]:
from typing import List
from azure.quantum.optimization import Term
from azure.quantum import Workspace

# Connection details for the Azure Quantum workspace used by the solver below.
# Every empty string is a placeholder that must be filled in before running.
workspace = Workspace(
    subscription_id="",  # Add your subscription_id
    resource_group="",   # Add your resource_group
    name="",             # Add your workspace name
    location="",         # Add your workspace location (for example, "westus")
)

# Authenticate against the workspace; the recorded output shows this prompting
# for an interactive browser sign-in.
workspace.login()

To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code C9NUVT49H to authenticate.


<msrest.authentication.BasicTokenAuthentication at 0x7fe6c9b6d690>

In [2]:
def process_config(jobs_ops_map:dict, machines_ops_map:dict, processing_time:dict, T:int):
    """
    Validate the problem configuration and build the operation -> job lookup.

    Note: the operation lists stored in machines_ops_map are replaced in place
    with sorted copies, so the caller's dict is modified.

    Keyword arguments:

    jobs_ops_map (dict): Map of jobs to operations {job: [operations]}
    machines_ops_map(dict): Mapping of operations to machines, e.g.:
        machines_ops_map = {
            0: [0,1],          # Operations 0 & 1 assigned to machine 0
            1: [2,3]           # Operations 2 & 3 assigned to machine 1
        }
    processing_time (dict): Operation processing times
    T (int): Allowed time (jobs can only be scheduled below this limit)

    Returns:

    (ops_jobs_map, T): the inverse map {operation: job} and the allowed time,
    capped at the sum of all processing times.

    Raises:

    AssertionError: if a job lists its operations out of ascending order, or
    if an operation ID appears more than once across the jobs.
    """

    # Running every operation back to back is the worst case, so a larger
    # time limit than that total is never needed.
    sequential_total = sum(processing_time.values())
    capped_T = T if T < sequential_total else sequential_total

    # Store each machine's operations in ascending order
    for machine in machines_ops_map:
        machines_ops_map[machine] = sorted(machines_ops_map[machine])

    # Invert {job: [operations]} into {operation: job}, validating as we go
    op_to_job = {}
    for job, ops in jobs_ops_map.items():
        in_ascending_order = ops == sorted(ops)
        assert in_ascending_order, f"Operation IDs within a job must be in ascending order. Job was: {job}: {ops}"

        for op in ops:
            already_assigned = op in op_to_job
            assert not already_assigned, f"Operation IDs must be unique. Duplicate ID was: {op}"
            op_to_job[op] = job

    return op_to_job, capped_T

In [8]:
# Problem definition: three jobs of two operations each, shared across two machines.

# Allowed time (jobs can only be scheduled below this limit)
T = 10

# Processing time for each operation, keyed by operation ID
processing_time = {
    0: 2,
    1: 1,
    2: 2,
    3: 2,
    4: 1,
    5: 2,
}

# Assignment of operations to jobs (job ID: [operation IDs]).
# Operation IDs within a job must be in ascending order.
jobs_ops_map = {
    0: [0, 1],  # Restart life support
    1: [2, 3],  # Recalibrate navigation system
    2: [4, 5],  # Replace power transformer in the reactor
}

# Assignment of operations to machines (machine ID: [operation IDs]).
machines_ops_map = {
    0: [0, 1, 4, 5],  # Machine 0 (the universal multi-tool) runs operations 0, 1, 4 and 5
    1: [2, 3],        # Machine 1 (the ship computer) runs operations 2 & 3
}

# Validate the configuration, build the operation -> job lookup and cap T at
# the total processing time.
ops_jobs_map, T = process_config(jobs_ops_map, machines_ops_map, processing_time, T)

In [3]:
def precedence_constraint(jobs_ops_map:dict, T:int, processing_time:dict, weight:float):
    """
    Build the penalty terms that keep the operations of each job in order.

    For every pair of consecutive operations in a job, a term is added for
    each combination of start times in which the later operation would start
    before the earlier one has finished. The variable for operation `op`
    starting at time `t` has index op*T + t.

    Keyword arguments:

    jobs_ops_map (dict): Map of jobs to operations {job: [operations]}
    T (int): Allowed time (jobs can only be scheduled below this limit)
    processing_time (dict): Operation processing times
    weight (float): Relative importance of this constraint

    Returns:

    list of Term: one quadratic term per violating pair of start times
    """

    penalties = []

    for job_ops in jobs_ops_map.values():
        # Walk the job as (operation, successor) pairs
        for current_op, next_op in zip(job_ops, job_ops[1:]):
            for start in range(T):
                # The successor may not start before current_op finishes;
                # the bound is clipped to the allowed time T.
                first_valid_start = min(start + processing_time[current_op], T)
                for too_early in range(first_valid_start):
                    penalties.append(
                        Term(c=weight, indices=[current_op*T + start, next_op*T + too_early])
                    )

    return penalties

In [10]:
def operation_once_constraint(ops_jobs_map:dict, T:int, weight:float):
    """
    Construct penalty terms for the operation once constraint.

    For each operation the penalty is (sum over t of x_t - 1)^2, which for
    binary variables expands to: 2xy - x - y + 1 (cross terms, linear terms
    and a constant). The constant is contributed once per operation, so a
    schedule in which every operation starts exactly once has zero penalty.
    The variable for operation `op` starting at time `t` has index op*T + t.

    Keyword arguments:

    ops_jobs_map (dict): Map of operations to jobs {op: job}
    T (int): Allowed time (jobs can only be scheduled below this limit)
    weight (float): Relative importance of this constraint

    Returns:

    list of Term: linear, quadratic and (if there are operations) one
    constant term
    """

    terms = []

    # 2xy - x - y parts of the constraint function
    # Loop through all operations
    for op in ops_jobs_map.keys():
        for t in range(T):
            # - x - y terms
            terms.append(Term(c=weight*-1, indices=[op*T+t]))

            # + 2xy term
            # Loop through all later start times of the same operation
            # to get each cross term exactly once
            for s in range(t+1, T):
                terms.append(Term(c=weight*2, indices=[op*T+t, op*T+s]))

    # + 1 term: each operation contributes its own constant, so the combined
    # offset is weight * (number of operations). A single "+1" for the whole
    # problem would leave valid schedules with a negative penalty.
    op_count = len(ops_jobs_map)
    if op_count:
        terms.append(Term(c=weight*op_count, indices=[]))

    return terms

In [4]:
def no_overlap_constraint(T:int, processing_time:dict, ops_jobs_map:dict, machines_ops_map:dict, weight:float):
    """
    Build the penalty terms that stop two operations sharing a machine from
    running at the same time.

    Every ordered pair of distinct operations on a machine is visited, so each
    unordered pair is handled twice (once in each direction). The variable for
    operation `op` starting at time `t` has index op*T + t.

    Keyword arguments:

    T (int): Allowed time (jobs can only be scheduled below this limit)
    processing_time (dict): Operation processing times
    weight (float): Relative importance of this constraint
    ops_jobs_map (dict): Map of operations to jobs {op: job}
    machines_ops_map(dict): Mapping of operations to machines, e.g.:
        machines_ops_map = {
            0: [0,1],          # Operations 0 & 1 assigned to machine 0
            1: [2,3]           # Operations 2 & 3 assigned to machine 1
        }

    Returns:

    list of Term: one quadratic term per penalized pair of start times
    """

    penalties = []

    def penalize(first_index, second_index):
        # Record one quadratic penalty between two start-time variables
        penalties.append(Term(c=weight*1, indices=[first_index, second_index]))

    for machine_ops in machines_ops_map.values():
        for op_a in machine_ops:
            for op_b in machine_ops:
                for start in range(T):
                    # An operation cannot clash with itself
                    if op_a == op_b:
                        continue

                    # Both operations starting at the same time on this machine
                    penalize(op_a*T + start, op_b*T + start)

                    # op_b starting while op_a is still running (clipped to T).
                    # The range begins at `start`, so the same-start pair above
                    # receives a second term here.
                    busy_until = min(start + processing_time[op_a], T)
                    for clash in range(start, busy_until):
                        penalize(op_a*T + start, op_b*T + clash)

                    # Operations of the same job: also penalize the times
                    # 0 -> start (operations scheduled out of order)
                    if ops_jobs_map[op_a] == ops_jobs_map[op_b]:
                        for earlier in range(start):
                            if op_a < op_b:
                                penalize(op_a*T + start, op_b*T + earlier)
                            elif op_a > op_b:
                                penalize(op_a*T + earlier, op_b*T + start)

    return penalties

In [12]:
def calc_penalty(t:int, m_count:int, t0:int):
    """
    Return the makespan penalty coefficient for a completion time.

    The value is the geometric sum 1 + m_count + ... + m_count**(t - t0 - 1),
    i.e. (m_count**(t - t0) - 1) / (m_count - 1).

    Keyword arguments:

    t (int): Completion time being penalized
    m_count (int): Number of machines (must be at least 1)
    t0 (int): Reference time at which the penalty is zero

    Returns:

    float: the penalty coefficient

    Raises:

    AssertionError: if m_count is less than 1
    """
    assert m_count >= 1, "m_count must be at least 1"

    if m_count == 1:
        # The closed form divides by (m_count - 1); with a single machine the
        # geometric sum is just (t - t0) ones added together.
        return float(t - t0)

    return (m_count**(t - t0) - 1)/float(m_count - 1)

def makespan_objective(T:int, processing_time:dict, jobs_ops_map:dict, m_count:int, weight:float):
    """
    Construct makespan minimization terms.

    The final operation of every job receives a linear term for each
    completion time later than the lower bound (the total processing time of
    the longest job). The coefficient comes from calc_penalty and is scaled
    by `weight`. The variable for operation `op` starting at time `t` has
    index op*T + t.

    Keyword arguments:

    T (int): Allowed time (jobs can only be scheduled below this limit)
    processing_time (dict): Operation processing times
    jobs_ops_map (dict): Map of jobs to operations {job: [operations]}
    m_count (int): Number of machines
    weight (float): Relative importance of this constraint

    Returns:

    list of Term: linear penalty terms; empty when there are no jobs
    """

    terms = []

    # Total processing time of each job
    job_durations = [
        sum(processing_time[op] for op in job)
        for job in jobs_ops_map.values()
    ]

    # With no jobs there is nothing to minimize (and max() of an empty
    # sequence would raise)
    if not job_durations:
        return terms

    # No schedule can finish before its longest job does
    lower_bound = max(job_durations)

    # Loop through the final operation of each job
    for job in jobs_ops_map.values():
        # A job without operations has no final operation to penalize
        if not job:
            continue

        i = job[-1]
        # Loop through each time step the operation could complete at;
        # t is a completion time, so the start time is t - processing_time[i]
        for t in range(lower_bound + 1, T + processing_time[i]):
            terms.append(Term(c=weight*(calc_penalty(t, m_count, lower_bound)), indices=[i*T + (t - processing_time[i])]))

    return terms

In [5]:
# Build the full list of cost-function terms for the solver.
# This cell relies on jobs_ops_map, ops_jobs_map, machines_ops_map,
# processing_time and T, so the problem-parameter cell must run first
# (otherwise it fails with a NameError).

# Penalty weights
alpha = 5      # Precedence constraint
beta = 5       # Operation once constraint
gamma = 5      # No overlap constraint
delta = 0.004  # Makespan minimization (objective function)

# Constraint terms
c1 = precedence_constraint(jobs_ops_map, T, processing_time, weight=alpha)
c2 = operation_once_constraint(ops_jobs_map, T, weight=beta)
c3 = no_overlap_constraint(T, processing_time, ops_jobs_map, machines_ops_map, weight=gamma)

# Objective function terms
c4 = makespan_objective(
    T,
    processing_time,
    jobs_ops_map,
    m_count=len(machines_ops_map),
    weight=delta,
)

# Constraints followed by the objective, in one flat list
terms = [*c1, *c2, *c3, *c4]

NameError: name 'jobs_ops_map' is not defined

In [19]:
from azure.quantum.optimization import Problem, ProblemType
from azure.quantum.optimization import SimulatedAnnealing # Change this line to match the Azure Quantum Optimization solver type you wish to use

# The terms are expressed over binary variables, so the problem is submitted
# in PUBO form. You could also have chosen to represent the problem in Ising form.
problem = Problem(
    name="Job shop sample",
    problem_type=ProblemType.pubo,
    terms=terms,
)

# The solver is bound to the workspace created at the beginning of this
# tutorial; swap the class to use a different solver (as imported above).
solver = SimulatedAnnealing(workspace, timeout=100)  # Timeout in seconds

# Run the job synchronously and pull the variable assignment out of the result
result = solver.optimize(problem)
config = result["configuration"]

print(config)

AttributeError: module 'azure.quantum' has no attribute 'storage'