# Imports

In [1]:
import numpy as np
import pandas as pd

from random import randint

import simpy
from simpy.events import AnyOf, AllOf, Event

ModuleNotFoundError: No module named 'simpy'

# Yield

Simpy uses **yield** for creating processes.

**yield** hands the next element of a generator to its caller and pauses the function until the following element is requested.

In [None]:
def create_generator():
    """Yield the integers 0, 1 and 2, one per iteration step."""
    value = 0
    while value < 3:
        yield value
        value += 1

# Drain the generator; every yielded value is printed on its own line.
for i in create_generator():
    print(i)

Combining yields

In [None]:
def create_generator_outer():
    """Yield ``(index, inner_generator)`` pairs for index 0, 1 and 2.

    A fresh ``create_generator()`` is built for every pair.
    """
    for outer_index in range(3):
        inner = create_generator()
        yield (outer_index, inner)

# Walk the outer generator and exhaust the inner generator of each pair.
for i, g in create_generator_outer():
    for j in g:
        line = '%i %i' % (i, j)
        print(line)

# Basic examples

## Timeout event

`env.timeout(delay)` adds a timeout event to the scheduler that happens after the delay.

In [2]:
number_of_events = 0  # shared tick counter: incremented by clock(), reset before each run below
stats = []  # one entry per simulation run: the value of number_of_events when the run stopped

def clock(env, name, rate):
    """Tick forever, pausing a geometric(rate) delay after every tick.

    Each pass increments the module-level ``number_of_events`` and then
    yields ``env.timeout(...)`` with a delay drawn from
    ``np.random.geometric(rate)``. ``name`` is accepted but not used.
    """
    global number_of_events
    while True:
        number_of_events = number_of_events + 1
        delay = np.random.geometric(rate)
        yield env.timeout(delay)

# Run 10000 independent simulations and record how many clock ticks each one produced.
for i in range(10000):
    number_of_events = 0  # reset the shared counter for this run
    env = simpy.Environment()
    env.process(clock(env, 'fast', 0.5))
    env.process(clock(env, 'slow', 0.2))
    env.run(until=10)  # stop the run at simulation time 10
    stats.append(number_of_events)

# Summary statistics of the per-run tick counts.
print(pd.Series(stats).describe())

NameError: name 'simpy' is not defined

## Processes

- A process defines a sequence of events.
- A process is an event => we can wait for processes to finish.

In [None]:
def process_1(env):
    """Greet as P1, wait 3 time units, then wait for a new ``process_2``.

    Repeats forever; each pass starts another ``process_2`` and only
    continues once that process has finished.
    """
    while True:
        greeting = 't_%d: Hi from P1' % env.now
        print(greeting)
        pause = env.timeout(3)
        yield pause
        follower = env.process(process_2(env))
        yield follower

def process_2(env):
    """Greet as P2, wait 1 time unit, then wait for a new ``process_1``.

    Repeats forever; each pass starts another ``process_1`` and only
    continues once that process has finished.
    """
    while True:
        greeting = 't_%d: Hi from P2' % env.now
        print(greeting)
        pause = env.timeout(1)
        yield pause
        follower = env.process(process_1(env))
        yield follower
        
# Start the chain with P2 and simulate up to time 10.
env = simpy.Environment()
env.process(process_2(env))
env.run(until=10)

## Queue

In [None]:
queue = []  # customers in the system; serve() works on the one at index 0
idle = True  # False while a serve() process is working, True once it found the queue empty
customer_number = 0  # NOTE(review): not referenced anywhere in the visible code

avg_inter_arrival_time = 5  # arrival() draws gaps from geometric(1 / avg_inter_arrival_time)
avg_service_duration = 7  # serve() draws durations from geometric(1 / avg_service_duration)

waiting_times = []  # departure time minus arrival time, appended by Customer.departure()

class Customer:
    """A queue customer with a sequential name and an arrival timestamp."""

    next_id = 0  # number used in the next customer's name

    def __init__(self, arrival_time):
        """Assign the next ``C_<n>`` name and remember ``arrival_time``."""
        ident = Customer.next_id
        self.name = 'C_%d' % ident
        Customer.next_id = ident + 1
        self.arrival_time = arrival_time

    def departure(self, departure_time):
        """Store the departure time and log the time spent since arrival.

        ``departure_time - arrival_time`` is appended to the module-level
        ``waiting_times`` list.
        """
        self.departure_time = departure_time
        elapsed = departure_time - self.arrival_time
        waiting_times.append(elapsed)

    def __str__(self):
        """Return the customer's name."""
        return self.name

def serve():
    """Serve the customer at the head of ``queue``, then continue or go idle.

    Marks the server busy, waits a geometric service duration, removes the
    served customer from ``queue`` and records the departure. If customers
    remain, a further ``serve()`` process is started and waited for;
    otherwise the module-level ``idle`` flag is set again.
    """
    global idle

    idle = False
    duration = np.random.geometric(1.0 / avg_service_duration)
    print('%d: serve %s for %s periods' % (env.now, queue[0], duration))
    yield env.timeout(duration)

    # Customer departure
    served = queue.pop(0)
    served.departure(env.now)
    print('%d: departure of %s, queue length is %d' % (env.now, served, len(queue)))

    # Queue empty - going to sleep
    if not queue:
        idle = True
        return

    # Otherwise serve another customer
    yield env.process(serve())
        
def arrival():
    """Create customers forever, separated by geometric inter-arrival gaps.

    Each new customer is appended to ``queue``; if the server is idle a
    ``serve()`` process is started for it.
    """
    while True:
        newcomer = Customer(env.now)
        queue.append(newcomer)
        print('%d: arrival of %s, queue length is %d' % (env.now, newcomer, len(queue)))

        # Wake the server only when it is not already working.
        if idle:
            env.process(serve())

        gap = np.random.geometric(1.0 / avg_inter_arrival_time)
        yield env.timeout(gap)
        
# Simulate arrivals (and the services they start) up to time 50.
env = simpy.Environment()
env.process(arrival())
env.run(until=50)
print()
# Customers still in `queue` at the end have no entry in waiting_times.
print('Final queue length is %d' % len(queue))
print(pd.Series(waiting_times).describe())

# Events

## Callback

In [None]:
env = simpy.Environment()


def my_callback(event):
    """Event callback: print a marker when called."""
    print('Callback')


def my_process():
    """Create an event, attach ``my_callback``, trigger it and wait for it."""
    evt = env.event()
    evt.callbacks.append(my_callback)
    evt.succeed()
    yield evt
    print('Done')


env.process(my_process())
env.run(until=10)

## Example usages for event

In [None]:
env = simpy.Environment()


class School:
    """One bell process and three pupil processes that share a single event."""

    def __init__(self):
        self.class_ends = env.event()
        self.bell_proc = env.process(self.bell())
        pupils = []
        for _ in range(3):
            pupils.append(env.process(self.pupil()))
        self.pupil_procs = pupils

    def bell(self):
        """Ring twice, 45 time units apart, swapping in a new event each time."""
        rings = 0
        while rings < 2:
            yield env.timeout(45)
            self.class_ends.succeed()  # end class
            print('%d: Class ended' % env.now)
            # A triggered event cannot be reused, so later waits get a new one.
            self.class_ends = env.event()
            rings += 1

    def pupil(self):
        """Cheer every time the current ``class_ends`` event fires."""
        while True:
            yield self.class_ends  # this event is shared!
            print(r' \o/')


school = School()
env.run(until=1000)

## Processes are events

In [None]:
env = simpy.Environment()


def sub():
    """Wait one time unit, then finish with the value 23."""
    yield env.timeout(1)
    return 23


def parent():
    """Wait for a ``sub`` process and pass its return value on."""
    child = env.process(sub())
    result = yield child
    return result


# Running until the parent process finishes evaluates to its return value.
env.run(env.process(parent()))

## Waiting for multiple events

In [None]:
env = simpy.Environment()


def test_or():
    """``t1 | t2`` resumes once the earlier timeout has fired."""
    t1 = env.timeout(1, value='spam')
    t2 = env.timeout(2, value='eggs')
    ret = yield t1 | t2

    # spam is triggered first
    assert ret == {t1: 'spam'}
    assert env.now == 1


def test_and():
    """``t1 & t2`` resumes only after both timeouts have fired."""
    t1 = env.timeout(1, value='spam')
    t2 = env.timeout(2, value='eggs')
    ret = yield t1 & t2

    assert ret == {t1: 'spam', t2: 'eggs'}
    assert env.now == 2


for scenario in (test_or, test_and):
    env.process(scenario())
env.run()

**Events at the same time**

In [None]:
env = simpy.Environment()


def test_or():
    """Wait on two timeouts due at the same time and show what came back."""
    t1 = env.timeout(1, value='spam')
    t2 = env.timeout(1, value='eggs')
    ret = yield t1 | t2
    entries = list(ret.items())
    print(len(entries))
    print(ret)


proc = env.process(test_or())
env.run()

## Wait for a timeout OR until a trigger is activated

In [None]:
env = simpy.Environment()


class Team:
    """Repair team whose 10-unit repair competes with a trigger event."""

    def repair(self):
        """Wait for the repair timeout or the trigger and report which fired."""
        repaired = env.timeout(10)
        self.interrupted = env.event()  # used to trigger interruption
        outcome = yield self.interrupted | repaired

        checks = ((repaired, 'repaired'), (self.interrupted, 'interrupted'))
        for fired, label in checks:
            if fired in outcome:
                print(label)


def interrupt(team):
    """Pull the team's trigger after 10 time units."""
    yield env.timeout(10)
    team.interrupted.succeed()  # pull trigger


team = Team()
env.process(team.repair())
env.process(interrupt(team))
env.run()

Careful when the events occur at the exact same time!

- It may be that both events are in ret, or only one of them.
- It also depends on the order in which the processes were scheduled.

=> Process interruption is more predictable, as interrupts are prioritized!

## Events at the same time

Interrupts and process initialization events are prioritized over other events if they occur at the same time.

If the prioritization is the same, the event that was added to the scheduler first is executed first.

In [None]:
env = simpy.Environment()


class A:
    """Two processes with equal timeouts, used to show same-time ordering."""

    def __init__(self):
        # Registration order matters here: proc1 first, then proc2.
        for starter in (self.proc1, self.proc2):
            env.process(starter())

    def another_proc(self):
        """Started from ``proc1``; prints before its first wait."""
        print('2nd')
        yield env.timeout(1)

    def proc1(self):
        """Wait 10 time units, print, then start ``another_proc``."""
        yield env.timeout(10)
        print('1st')
        env.process(self.another_proc())

    def proc2(self):
        """Wait 10 time units, then print."""
        yield env.timeout(10)
        print('3rd')


a = A()
env.run()

## Event fail

TODO

In [None]:



env = simpy.Environment()


class A:
    """A 10-unit timeout racing against a trigger pulled at time 9."""

    def __init__(self):
        # proc2 is registered before proc1.
        env.process(self.proc2())
        env.process(self.proc1())

    def proc1(self):
        """Wait for the timeout or the trigger and report which one fired."""
        self.to = env.timeout(10)
        self.trigger = env.event()
        ret = yield self.to | self.trigger
        print(ret)

        for candidate, tag in ((self.to, 'TO'), (self.trigger, 'TR')):
            if candidate in ret:
                print(tag)

    def proc2(self):
        """Pull the trigger after 9 time units."""
        yield env.timeout(9)
        self.trigger.succeed()


a = A()
env.run()

# Process Interaction

## Sleep until woken up

In [None]:
env = simpy.Environment()
my_trigger = env.event()


def my_process():
    """Sleep until ``my_trigger`` fires, then report the wake-up time."""
    print('Start process')
    yield my_trigger
    woke_at = env.now
    print('Triggered at %d' % woke_at)


def pull_trigger():
    """Fire ``my_trigger`` after 17 time units."""
    pause = env.timeout(17)
    yield pause
    print('Pull trigger')
    my_trigger.succeed()


env.process(my_process())
env.process(pull_trigger())
env.run(until=100)


In [None]:
# Sleep until woken up OR something else happens

env = simpy.Environment()
my_trigger = env.event()


def delay_process_1():
    """Finish after 11 time units (before the trigger is pulled at 17)."""
    yield env.timeout(11)
    print('Delay 1')


def delay_process_2():
    """Finish after 17 time units (the same time the trigger is pulled)."""
    yield env.timeout(17)
    print('Delay 2')


def my_process():
    """Wait until either the trigger process or a delay process finishes.

    Uses ``delay_process_1`` as the competing delay; swap in
    ``delay_process_2`` to see both finish at the same time. The trigger
    side is the module-level ``trigger_process`` started below.
    """
    print('Start process')
    # There is no plain ``delay_process``; one of the numbered variants
    # has to be started explicitly.
    delay = env.process(delay_process_1())
    # ``trigger_process`` is already a running Process, so it is waited on
    # directly instead of being called or wrapped in another process.
    trigger = trigger_process
    yield trigger | delay
    if delay.triggered:
        print('Random_delay at %d' % env.now)
        # The trigger process is deliberately left running: pull_trigger()
        # has no ``except simpy.Interrupt`` handler, so interrupting it
        # would make that process fail and abort env.run().
    if trigger.triggered:
        print('Trigger pulled at %d' % env.now)
    print('Done at %d' % env.now)


env.process(my_process())
# pull_trigger() comes from the previous cell; it succeeds the current
# ``my_trigger`` after 17 time units.
trigger_process = env.process(pull_trigger())
env.run(until=100)

## Interrupt a process


In [None]:
class EV:
    """ An electric vehicle """

    def __init__(self, env):
        self.env = env
        self.drive_proc = env.process(self.drive())

    def drive(self):
        """ Alternate between: drive for a while, then park and charge. """
        sim = self.env
        while True:
            # Drive
            yield sim.timeout(20)

            # Park: charging and the parking time run side by side.
            print('%d: start parking at' % sim.now)
            charging = sim.process(self.charge_battery())
            parking = sim.timeout(60)
            yield charging | parking

            # Parking is over but the charge is not: cut the charging short.
            if not charging.triggered:
                charging.interrupt('need to go!')

            print('%d: stop parking' % sim.now)

    def charge_battery(self):
        """ Charge for 70 time units unless interrupted first. """
        sim = self.env
        print('%d: charging started' % sim.now)
        try:
            yield sim.timeout(70)  # takes longer than parking
        except simpy.Interrupt as interruption:
            # Interrupted before the charging was done.
            print('%d: charging interrupted, cause: %s' % (sim.now, interruption.cause))
        else:
            print('%d: battery charged' % sim.now)


env = simpy.Environment()
ev = EV(env)
env.run(until=100)

# Turbine example

In [None]:
import simpy
import numpy as np
from queue import PriorityQueue
from enum import Enum
import copy
from IPython.core.debugger import set_trace

## Helper

In [None]:
def print_q(q):
    """Print the items currently held in ``q`` as a sorted list.

    Reads the queue's underlying ``q.queue`` container; nothing is removed.
    """
    print(sorted(q.queue))

## Generate maintenance incidents

- List with unscheduled incidents
- List with scheduled services 
- Leave a gap of at least 3 periods corresponding to the repair time

In [None]:
def generate_incidents(prob, gap, horizon=100):
    """Draw random incident times in ``[0, horizon)``.

    Periods are visited in increasing order. At each visited period an
    incident occurs with probability ``prob``; after an incident the next
    ``gap - 1`` periods are skipped, so two incidents are always at least
    ``gap`` periods apart.

    prob: probability of an incident at a visited period
    gap: minimum distance between two incidents (must be >= 1)
    horizon: exclusive upper bound for incident times (default 100)

    Returns the incident times as an increasing list of ints.
    Raises ValueError if ``gap`` is smaller than 1, because the scan would
    then never move past a period that produced an incident.
    """
    if gap < 1:
        raise ValueError('gap must be at least 1, got %r' % (gap,))

    ret = []
    i = 0
    while i < horizon:
        if np.random.rand() < prob:
            ret.append(i)
            i += gap
        else:
            i += 1
    return ret

def generate_all():
    """Generate both incident lists and publish them as module globals.

    Sets ``incidents`` (unscheduled), ``incidents_scheduled`` and ``n_jobs``
    (their combined length), printing each value as it is produced.
    """
    global incidents, incidents_scheduled, n_jobs

    # Unscheduled
    incidents = generate_incidents(0.1, 4)
    print(incidents)

    # Scheduled
    incidents_scheduled = generate_incidents(0.05, 10)
    print(incidents_scheduled)

    n_jobs = len(incidents) + len(incidents_scheduled)
    print('# jobs =', n_jobs)

## Setup simulation

In [None]:
def setup_sim():
    """Build a fresh global ``env`` from the current incident lists.

    The unscheduled incidents go into a PriorityQueue, the scheduled ones
    are copied into a new list, and both are attached to the environment
    together with ``until``, ``threshold`` and a new ``Turbine``.
    """
    global env

    incident_q = PriorityQueue()
    for moment in incidents:
        incident_q.put(moment)
    print_q(incident_q)

    # Work on a copy so the generated list survives repeated runs.
    incidents_scheduled_q = list(incidents_scheduled)
    print(incidents_scheduled_q)

    env = simpy.Environment()
    settings = (
        ('incidents', incident_q),
        ('incidents_scheduled', incidents_scheduled_q),
        ('until', 200),
        ('threshold', 4),
    )
    for attr, value in settings:
        setattr(env, attr, value)
    print('Threshold =', env.threshold)

    env.turbine = Turbine(env.incidents, env.incidents_scheduled)

## Job

In [None]:
class JobType(Enum):
    """Kinds of maintenance job used by the turbine simulation."""
    SCHEDULED = 1  # created by Turbine.scheduled_maintenance_process
    UNSCHEDULED = 2  # created by Turbine.fail_process for a failure
    MERGED = 3  # returned by Turbine.include when a failure is combined with a scheduled service

class Job:
    """A maintenance job stamped with a running id and its creation time."""

    next_id = 0  # id handed to the next Job that is created

    def __init__(self, job_type=JobType.UNSCHEDULED, next_process=None, info=''):
        """Take the next id and record ``env.now`` as the creation time ``t``."""
        ident = Job.next_id
        self.id = ident
        Job.next_id = ident + 1
        self.t = env.now
        self.job_type = job_type
        self.next_process = next_process
        self.info = info

    def __str__(self):
        """Return a one-line summary of all job fields."""
        template = 'Job [id={}, t={}, job_type={}, next_process={}, info={}]'
        fields = (self.id, self.t, self.job_type, self.next_process, self.info)
        return template.format(*fields)
    

## Turbine

In [None]:
class Turbine():
    """Simulated turbine that turns incident times into repair jobs.

    All methods use the module-level ``env`` (the environment built by
    ``setup_sim``), which must carry the ``threshold`` and ``until``
    attributes in addition to the usual simpy API.
    """

    def __init__(self, incidents, incidents_scheduled, name='Turbine'):
        """
        incidents: PriorityQueue
        incidents_scheduled: list (sorted)
        name: label returned by ``str(turbine)``
        """
        self.name = name
        self.incidents = incidents
        self.incidents_scheduled = incidents_scheduled

        # State
        # True after a failure took over the next scheduled service; tells
        # scheduled_maintenance_process to skip that service when it wakes.
        self.stop_switch = False
        self.completed_scheduled_services = []
        self.initiated_scheduled_services = []
        self.completed_unscheduled_failures = []
        self.initiated_unscheduled_failures = []

        # Collector
        self.job_log = []

        # Processes
        env.process(self.fail_process())
        env.process(self.scheduled_maintenance_process())

    def include(self, job):
        """Return ``job`` itself, or a new MERGED job if a service is close.

        A merge happens when the next outstanding scheduled service lies at
        most ``env.threshold`` periods after the current time.
        """
        if not self.incidents_scheduled:  # no outstanding scheduled services left
            return job

        h = self.incidents_scheduled[0]
        if h - env.now > env.threshold:
            return job

        return Job(job_type=JobType.MERGED, next_process='fail', info='%s+%s' % (job.t, h))

    def fail_process(self):
        """Wait for the next failure, then start a plain or merged repair."""
        next_dt = self.get_next_failure()
        yield env.timeout(next_dt - env.now)

        # Maintenance necessary
        log = '%d: Failure' % env.now
        job = Job(job_type=JobType.UNSCHEDULED, next_process='fail')
        new_job = self.include(job)

        if new_job is job:
            log += ' - Not merged %s' % (job)
            print(log)
            env.process(self.repair_process(job))
        else:
            log += ' - Merged %s to %s' % (job, new_job)
            print(log)
            self.stop_switch = True
            self.incidents_scheduled.pop(0)  # rm from list
            # NOTE(review): this records the merge time, whereas
            # scheduled_maintenance_process records the scheduled service
            # time in the same list - confirm which one is intended.
            self.initiated_scheduled_services.append(new_job.t)
            env.process(self.repair_process(new_job))

    def get_next_failure(self):
        """Pop and return the next failure time that is not in the past.

        Failure times earlier than ``env.now`` are discarded. When no usable
        time is left, ``env.until + 1`` is returned, i.e. a time beyond the
        end of the run.
        """
        next_dt = -1
        while next_dt < env.now and self.incidents.qsize() > 0:
            next_dt = self.incidents.get()
        if next_dt < env.now:
            return env.until + 1  # never
        return next_dt

    def repair_process(self, job):
        """Carry out ``job``, log it, and restart the failure process if asked.

        Merged jobs take 4 periods, all other jobs 3.
        """
        duration = 4 if job.job_type == JobType.MERGED else 3
        yield env.timeout(duration)

        print('%d: Repaired, %s' % (env.now, job))
        if job.job_type == JobType.SCHEDULED:
            self.completed_scheduled_services.append(job)
        elif job.job_type == JobType.UNSCHEDULED:
            self.completed_unscheduled_failures.append(job)

        self.job_log.append(job)

        # Only 'fail' jobs start a follow-up process; for None or
        # 'scheduled' there is nothing to start here.
        if job.next_process == 'fail':
            env.process(self.fail_process())

    def scheduled_maintenance_process(self):
        """Wait for the next scheduled service and start its repair.

        Ends for good once no scheduled services are left. If a failure has
        already taken over the service (``stop_switch``), the service is
        skipped and only the switch is reset.
        """
        if not self.incidents_scheduled:
            return  # terminate scheduled service process for good

        next_scheduled_service = self.incidents_scheduled[0]
        yield env.timeout(next_scheduled_service - env.now)

        log = '%d: Time for scheduled maintenance' % (env.now)

        if self.stop_switch:
            log += ' - Already initiated'
            print(log)
            self.stop_switch = False
            # already removed from list
            # already added to initiated list
            env.process(self.scheduled_maintenance_process())
            return

        self.incidents_scheduled.pop(0)  # Remove from the list
        self.initiated_scheduled_services.append(next_scheduled_service)  # Add to initiated list

        # Create and add job
        job = Job(job_type=JobType.SCHEDULED)
        log += ' - Initiated %s' % job
        print(log)
        env.process(self.repair_process(job))
        env.process(self.scheduled_maintenance_process())

    def __str__(self):
        """Return the turbine's name."""
        return self.name

    

## Check results

In [None]:
def check_result(env):
    """Print the turbine's job log and assert that every job was handled.

    A merged job counts as two jobs, any other job as one; the total must
    equal the module-level ``n_jobs``.
    """
    log = env.turbine.job_log
    for job in log:
        print(job)
    counter = sum(1 if job.job_type != JobType.MERGED else 2 for job in log)
    print('Counter =', counter, ', # jobs =', n_jobs)
    assert counter == n_jobs

## Run

In [None]:
# Repeat the whole experiment 20 times with freshly generated incidents.
for i in range(20):
    generate_all()  # new incident lists and n_jobs
    setup_sim()  # new global env and turbine
    env.run(until=env.until)
    check_result(env)  # asserts that the logged jobs add up to n_jobs

## Single run

In [None]:
# Re-run once; setup_sim() reads the incident lists left by the last generate_all().
setup_sim()
env.run(until=env.until)
check_result(env)

# Shared resource


In [None]:
import threading
import simpy


env = simpy.Environment()  # environment used by TechPool and the two processes below

class TechPool():
    """Pool of technicians from which processes take a number of techs.

    ``tech_count`` is the pool size; ``available_tech_count`` tracks how
    many technicians have not been handed out yet.
    """

    def __init__(self, tech_count):
        self.tech_count = tech_count

        # State
        self.available_tech_count = tech_count
        self.lock = threading.Lock()

    def get_techs(self, n, who='?'):
        """Take ``n`` technicians out of the pool and return ``n``.

        n: number of technicians requested
        who: label of the requester, used only in the printed log line

        Raises Exception('Not enough techs') when fewer than ``n`` are
        available. The lock is held through a ``with`` block so that it is
        released on that error path too; otherwise every later call would
        block forever.
        """
        with self.lock:
            if n > self.available_tech_count:
                raise Exception('Not enough techs')
            self.available_tech_count -= n
            print(env.now, 'TechPool - %d for %s = %d' % (n, who, self.available_tech_count))
        return n
    
    
def process_1(pool):
    """After one time unit, take one technician from ``pool``."""
    print('Start process 1')
    pause = env.timeout(1)
    yield pause
    print('Get 1')
    pool.get_techs(1)
    

def process_2(pool):
    """After one time unit, take two technicians from ``pool``."""
    print('Start process 2')
    pause = env.timeout(1)
    yield pause
    print('Get 2')
    pool.get_techs(2)

# Both processes request technicians after one time unit: 1 + 2 out of the 3 in the pool.
pool = TechPool(3)
env.process(process_1(pool))
env.process(process_2(pool))
env.run(until=100)
    