# SimPy Tutorial
https://towardsdatascience.com/introduction-to-simulation-with-simpy-322606d4ba0c

https://simpy.readthedocs.io/en/latest/simpy_intro/basic_concepts.html

In [2]:
import simpy
import pandas as pd
import numpy  as np
from scipy.stats import norm
from scipy.stats import expon

In [3]:
def car(env):
     while True:
         print('Start parking at %d' % env.now)
         parking_duration = 5
         yield env.timeout(parking_duration)
         print('Start driving at %d' % env.now)
         trip_duration = 2
         yield env.timeout(trip_duration)

In [4]:
env = simpy.Environment()
env.process(car(env))

env.run(until=15)

Start parking at 0
Start driving at 5
Start parking at 7
Start driving at 12
Start parking at 14


In [5]:
class Car(object):
     def __init__(self, env):
         self.env = env
         # Start the run process everytime an instance is created.
         self.action = env.process(self.run())

     def run(self):
         while True:
             print('Start parking and charging at %d' % self.env.now)
             charge_duration = 5
             # We yield the process that process() returns
             # to wait for it to finish
             yield self.env.process(self.charge(charge_duration))

             # The charge process has finished and
             # we can start driving again.
             print('Start driving at %d' % self.env.now)
             trip_duration = 2
             yield self.env.timeout(trip_duration)

     def charge(self, duration):
         yield self.env.timeout(duration)

In [6]:
env = simpy.Environment()
car = Car(env)
#env.process(car(env))

env.run(until=15)

Start parking and charging at 0
Start driving at 5
Start parking and charging at 7
Start driving at 12
Start parking and charging at 14


## SimPy: interrupting processes via interruption handling (interrupted object/class/process)

In [7]:
def driver(env, car):
     yield env.timeout(3)
     car.action.interrupt()

In [8]:
class Car(object):
     def __init__(self, env):
         self.env = env
         # Start the run process everytime an instance is created.
         self.action = env.process(self.run())

     def run(self):
         while True:
             print('Start parking and charging at %d' % self.env.now)
             charge_duration = 5
            # We may get interrupted while charging the battery
             try:
                 yield self.env.process(self.charge(charge_duration))
             except simpy.Interrupt:
                 # When we received an interrupt, we stop charging and
                 # switch to the "driving" state
                 print(f'Was interrupted at {env.now}. Hope, the battery is full enough ...')

             # The charge process has finished and
             # we can start driving again.
             print('Start driving at %d' % self.env.now)
             trip_duration = 2
             yield self.env.timeout(trip_duration)

     def charge(self, duration):
         yield self.env.timeout(duration)

In [9]:
env = simpy.Environment()
car = Car(env)
env.process(driver(env, car))

env.run(until=15)

Start parking and charging at 0
Was interrupted at 3. Hope, the battery is full enough ...
Start driving at 3
Start parking and charging at 5
Start driving at 10
Start parking and charging at 12


#### Resources
- basic resources: FIFO

In [10]:
def car(env, name, bcs, driving_time, charge_duration):
     """BCS: battery charging station"""
     # Simulate driving to the BCS
     yield env.timeout(driving_time)

     # Request one of its charging spots
     print('%s arriving at %d' % (name, env.now))
     # with statement automatically releases resources
     # otherwise release() needs to be called separately
     with bcs.request() as req:
         yield req

         # Charge the battery
         print('%s starting to charge at %s' % (name, env.now))
         yield env.timeout(charge_duration)
         print('%s leaving the bcs at %s' % (name, env.now))

In [11]:
env = simpy.Environment()
bcs = simpy.Resource(env, capacity=2)

In [12]:
for i in range(4):
     env.process(car(env, 'Car %d' % i, bcs, i*2, 5))

In [13]:
env.run()

Car 0 arriving at 0
Car 0 starting to charge at 0
Car 1 arriving at 2
Car 1 starting to charge at 2
Car 2 arriving at 4
Car 0 leaving the bcs at 5
Car 2 starting to charge at 5
Car 3 arriving at 6
Car 1 leaving the bcs at 7
Car 3 starting to charge at 7
Car 2 leaving the bcs at 10
Car 3 leaving the bcs at 12


In [14]:
import random
import simpy

In [15]:
"""
Machine shop example

Covers:

- Interrupts
- Resources: PreemptiveResource

Scenario:
  A workshop has *n* identical machines. A stream of jobs (enough to
  keep the machines busy) arrives. Each machine breaks down
  periodically. Repairs are carried out by one repairman. The repairman
  has other, less important tasks to perform, too. Broken machines
  preempt theses tasks. The repairman continues them when he is done
  with the machine repair. The workshop works continuously.

"""

RANDOM_SEED = 42
PT_MEAN = 10.0         # Avg. processing time in minutes
PT_SIGMA = 2.0         # Sigma of processing time
MTTF = 300.0           # Mean time to failure in minutes
BREAK_MEAN = 1 / MTTF  # Param. for expovariate distribution
REPAIR_TIME = 30.0     # Time it takes to repair a machine in minutes
JOB_DURATION = 30.0    # Duration of other jobs in minutes
NUM_MACHINES = 10      # Number of machines in the machine shop
WEEKS = 4              # Simulation time in weeks
SIM_TIME = WEEKS * 7 * 24 * 60  # Simulation time in minutes


def time_per_part():
    """Return actual processing time for a concrete part."""
    return random.normalvariate(PT_MEAN, PT_SIGMA)


def time_to_failure():
    """Return time until next failure for a machine."""
    return random.expovariate(BREAK_MEAN)


class Machine(object):
    """A machine produces parts and my get broken every now and then.

    If it breaks, it requests a *repairman* and continues the production
    after the it is repaired.

    A machine has a *name* and a numberof *parts_made* thus far.

    """
    def __init__(self, env, name, repairman):
        self.env = env
        self.name = name
        self.parts_made = 0
        self.broken = False

        # Start "working" and "break_machine" processes for this machine.
        self.process = env.process(self.working(repairman))
        env.process(self.break_machine())

    def working(self, repairman):
        """Produce parts as long as the simulation runs.

        While making a part, the machine may break multiple times.
        Request a repairman when this happens.

        """
        while True:
            # Start making a new part
            done_in = time_per_part()
            while done_in:
                try:
                    # Working on the part
                    start = self.env.now
                    yield self.env.timeout(done_in)
                    done_in = 0  # Set to 0 to exit while loop.

                except simpy.Interrupt:
                    self.broken = True
                    done_in -= self.env.now - start  # How much time left?

                    # Request a repairman. This will preempt its "other_job".
                    with repairman.request(priority=1) as req:
                        yield req
                        yield self.env.timeout(REPAIR_TIME)

                    self.broken = False

            # Part is done.
            self.parts_made += 1

    def break_machine(self):
        """Break the machine every now and then."""
        while True:
            yield self.env.timeout(time_to_failure())
            if not self.broken:
                # Only break the machine if it is currently working.
                self.process.interrupt()


def other_jobs(env, repairman):
    """The repairman's other (unimportant) job."""
    while True:
        # Start a new job
        done_in = JOB_DURATION
        while done_in:
            # Retry the job until it is done.
            # It's priority is lower than that of machine repairs.
            with repairman.request(priority=2) as req:
                yield req
                try:
                    start = env.now
                    yield env.timeout(done_in)
                    done_in = 0
                except simpy.Interrupt:
                    done_in -= env.now - start

In [16]:
# Setup and start the simulation
print('Machine shop')
random.seed(RANDOM_SEED)  # This helps reproducing the results

# Create an environment and start the setup process
env = simpy.Environment()
repairman = simpy.PreemptiveResource(env, capacity=1)
machines = [Machine(env, 'Machine %d' % i, repairman)
            for i in range(NUM_MACHINES)]
env.process(other_jobs(env, repairman))

# Execute!
env.run(until=SIM_TIME)

# Analyis/results
print('Machine shop results after %s weeks' % WEEKS)
for machine in machines:
    print('%s made %d parts.' % (machine.name, machine.parts_made))

Machine shop
Machine shop results after 4 weeks
Machine 0 made 3251 parts.
Machine 1 made 3273 parts.
Machine 2 made 3242 parts.
Machine 3 made 3343 parts.
Machine 4 made 3387 parts.
Machine 5 made 3244 parts.
Machine 6 made 3269 parts.
Machine 7 made 3185 parts.
Machine 8 made 3302 parts.
Machine 9 made 3279 parts.


In [17]:
random.normalvariate()

1.3192211945238288

## JSSP instance

In [18]:
import numpy as np
import numpy.typing as npt
import simpy

In [19]:
def gen_rnd_JSSP_inst(
    n_jobs: int,
    n_machines: int,
    seed: int = 42,
) -> tuple[int, int, int, npt.NDArray[np.uint16], npt.NDArray[np.uint16], npt.NDArray[np.uint16]]:
    """
    Generates random job shop instance with given number of jobs and machines'
    - each job on all machines
    - max processing time = 9
    
    Output:
        - n_jobs: number of jobs
        - n_machines: number of machines
        - n_tasks: number of tasks
        - mat_ProcTimes: matrix of processing times | shape=(n_jobs,n_machines)
        - mat_JobMachID: matrix of machine IDs per job starting by index 1 | shape=(n_jobs,n_machines)
        - mat_OpID: matrix of operation IDs starting by index 1 | shape=(n_jobs,n_machines)
    """
    # generate random process time matrix shape=(n_jobs, n_machines)
    np_rnd_gen = np.random.default_rng(seed=seed)
    mat_ProcTimes = np_rnd_gen.integers(1, 10, size=(n_jobs,n_machines), dtype=np.uint16)
    
    # generate randomly shuffled job machine combinations
    # machine IDs from 1 to n_machines
    temp = np.arange(0, (n_machines), step=1, dtype=np.uint16)
    temp = np.expand_dims(temp, axis=0)
    # repeat dummy line until number n_jobs is reached
    temp = np.repeat(temp, n_jobs, axis=0)
    # randomly permute the machine indices job-wise
    mat_JobMachID = np_rnd_gen.permuted(temp, axis=1)
    
    # generate operation ID matrix
    n_ops = n_jobs * n_machines
    temp2 = np.arange(0, (n_ops), step=1, dtype=np.uint16)
    mat_OpID = temp2.reshape(n_jobs, -1)
    
    return n_jobs, n_machines, n_ops, mat_ProcTimes, mat_JobMachID, mat_OpID

In [20]:
(n_jobs, n_machines, n_ops, 
 mat_ProcTimes, mat_JobMachID, mat_OpID) = gen_rnd_JSSP_inst(2,3)

In [21]:
mat_ProcTimes

array([[2, 1, 9],
       [7, 9, 6]], dtype=uint16)

In [22]:
mat_JobMachID

array([[2, 0, 1],
       [0, 1, 2]], dtype=uint16)

##### Brainstorming of ways to import machine names and IDs
*Case 1: only names are given*
- read machine names and assert IDs to them
- build data structure with name and ID bundled (maybe as property of a machine class)

*Case 2: IDs are given*
- only building data structure with name and ID bundled

*Data Structure:*
- if only mapping of two pairs in each direction (lookup ID or lookup machine name)
    - bi-directional dictionary

In [23]:
n_machines

3

In [24]:
machine_names = list()
for i in range(n_machines):
    name = f'M{i+1}'
    machine_names.append(name)

In [25]:
machine_names

['M1', 'M2', 'M3']

In [26]:
from typing import TypeAlias
from collections import OrderedDict

SimPyEnv: TypeAlias = simpy.core.Environment

In [27]:
class Machine(simpy.resources.resource.Resource):
    
    def __init__(
        self,
        env: SimPyEnv,
        identifier: int,
        name: str | None = None,
        num_slots: int = 1,
    ) -> None:
        """
        env:        SimPy Environment in which machine is embedded
        num_slots:  capacity of the machine, if multiple processing 
                    slots available at the same time > 1
        """
        # intialize base class
        super().__init__(env=env, capacity=num_slots)
        
        # assert machine information
        self.identifier = identifier
        # custom name
        if name is not None:
            self.name = name
        else:
            self.name = f'M{identifier}'
        
        # currently processed job
        self.current_job_ID: int | None = None
        self.current_job: Job | None = None
        
        # machine state parameters
        self.is_occupied: bool = False
        self.is_waiting: bool = False
        self.is_blocked: bool = False
        self.is_failed: bool = False
        # maybe for future, curently no working time calendars planned
        self.is_paused: bool = False
        
        # time in state parameters
        self.time_occupied: float = 0.
        self.time_waiting: float = 0.
        self.time_blocked: float = 0.
        self.time_failed: float = 0.
        
        # number of inputs/outputs
        self.num_jobs_input: int = 0
        self.num_jobs_output: int = 0

In [28]:
env = simpy.Environment()
m1 = Machine(env=env, identifier=1)

In [29]:
m1.identifier

1

In [30]:
class Operation(object):
    
    def __init__(
        self,
        identifier: int,
        proc_times: float,
        machine_identifier: int,
        name: str | None = None,
    ) -> None:
        """
        identifier:         operation's ID
        proc_times:         operation's processing times
        machine_identifier: ID of machine on which operation is processed
        """
        # !!!!!!!!! perhaps processing times in future multiple entries depending on associated machine
        # change of input format necessary, currently only one machine for each operation
        # no groups, no differing processing times for different machines 

        # assert operation information
        self.identifier = identifier
        # custom name
        if name is not None:
            self.name = name
        else:
            self.name = f'O{identifier}'
            
        # process information
        self.proc_time = proc_times
        self.target_machine = machine_identifier

In [49]:
class Job(object):
    
    def __init__(
        self,
        identifier: int,
        proc_times: list[float],
        machine_order: list[int],
        operation_identifiers: list[int],
        name: str | None = None,
    ) -> None:
        """
        identifier:             job's ID
        proc_times:             list of processing times for each operation
        machine_order:          list of machine IDs
        operation_identifiers:  list of operation IDs
        """
        # intialize base class
        super().__init__()
        
        ### BASIC INFORMATION ###
        # assert job information
        self.identifier = identifier
        # custom name
        if name is not None:
            self.name = name
        else:
            self.name = f'J{identifier}'
        
        ### OPERATIONS ##
        self.operations = list()
        
        for idx, op_ID in enumerate(operation_identifiers):
            op = Operation(
                identifier=op_ID,
                proc_times=proc_times[idx],
                machine_identifier=machine_order[idx],
            )
            self.operations.append(op)
            
        self.total_num_ops: int = len(self.operations)
        self.num_finished_ops: int = 0
        self.next_op: Operation = self.operations[0]
        
        ### STATE ###
        # intra-process job state parameters
        # job is being processed, maybe better naming in future
        self.is_occupied: bool = False
        # waiting state only when released
        self.is_waiting: bool = False
        # if lying on failed machine
        self.is_failed: bool = False
        
        # intra-process time characteristics
        self.time_occupied: float = 0.
        self.time_waiting: float = 0.
        self.time_failed: float = 0.
        
        # inter-process job state parameters
        # first operation scheduled --> released job
        self.is_released: bool = False
        # last operation ended --> finished job
        self.is_finished: bool = False
        
        # inter-process time characteristics
        # time of first operation starting point
        self.time_entry: float = 0.
        # time of last operation ending point
        self.time_exit: float = 0.
        
        # current resource location
        self.current_resource: object | None = None # specify type if class definition finished
        
        
    def obtain_next_op(self) -> Operation:
        return self.next_op

In [32]:
(n_jobs, n_machines, n_ops, 
 mat_ProcTimes, mat_JobMachID, mat_OpID) = gen_rnd_JSSP_inst(2,3)

In [33]:
mat_ProcTimes

array([[2, 1, 9],
       [7, 9, 6]], dtype=uint16)

In [34]:
mat_JobMachID

array([[2, 0, 1],
       [0, 1, 2]], dtype=uint16)

In [35]:
mat_OpID

array([[0, 1, 2],
       [3, 4, 5]], dtype=uint16)

- job sets as own class currently not necessary, use standard OrderedDict instead

In [36]:
class JobSet(object):
    
    def __init__(
        self,
        mat_ProcTimes: npt.NDArray[np.uint16],
        mat_JobMachID: npt.NDArray[np.uint16],
        mat_OpID: npt.NDArray[np.uint16],
    ):
        """
        mat_ProcTimes: matrix of processing times | shape=(n_jobs,n_machines)
        mat_JobMachID: matrix of machine IDs per job starting by index 1 | shape=(n_jobs,n_machines)
        mat_OpID: matrix of operation IDs starting by index 1 | shape=(n_jobs,n_machines)
        """
        
        self._jobs = OrderedDict()
        
        for job_id in range(len(mat_ProcTimes)):
            temp1 = mat_ProcTimes[job_id].tolist()
            temp2 = mat_JobMachID[job_id].tolist()
            temp3 = mat_OpID[job_id].tolist()
            job = Job(
                identifier=job_id,
                proc_times=temp1,
                machine_order=temp2,
                operation_identifiers=temp3,
            )
            self._jobs[job_id] = job
            
    def __getitem__(
        self,
        job_id: int,
    ):
        return self._jobs[job_id]
    

In [37]:
def build_job_set(
    mat_ProcTimes: npt.NDArray[np.uint16],
    mat_JobMachID: npt.NDArray[np.uint16],
    mat_OpID: npt.NDArray[np.uint16],
) -> OrderedDict[int, Job]:
    """
    mat_ProcTimes: matrix of processing times | shape=(n_jobs,n_machines)
    mat_JobMachID: matrix of machine IDs per job starting by index 1 | shape=(n_jobs,n_machines)
    mat_OpID: matrix of operation IDs starting by index 1 | shape=(n_jobs,n_machines)
    """
    jobs = OrderedDict()
        
    for job_id in range(len(mat_ProcTimes)):
        temp1 = mat_ProcTimes[job_id].tolist()
        temp2 = mat_JobMachID[job_id].tolist()
        temp3 = mat_OpID[job_id].tolist()
        job = Job(
            identifier=job_id,
            proc_times=temp1,
            machine_order=temp2,
            operation_identifiers=temp3,
        )
        jobs[job_id] = job
        
    return jobs
    

In [38]:
jobs_set = build_job_set(
    mat_ProcTimes=mat_ProcTimes,
    mat_JobMachID=mat_JobMachID,
    mat_OpID=mat_OpID,
)

In [39]:
jobs_set

OrderedDict([(0, <__main__.Job at 0x1b9f5c24110>),
             (1, <__main__.Job at 0x1b9f5c25dd0>)])

In [44]:
test = jobs_set[1]

In [45]:
op = test.obtain_next_op()

In [46]:
op

<__main__.Operation at 0x1b9f5c25ed0>

In [48]:
op.identifier

3

In [43]:
job_set = JobSet(
    mat_ProcTimes=mat_ProcTimes,
    mat_JobMachID=mat_JobMachID,
    mat_OpID=mat_OpID,
)