Skip to content

Commit

Permalink
Merge 286f77d into 06e9edb
Browse files Browse the repository at this point in the history
  • Loading branch information
vagvaz authored Jul 25, 2018
2 parents 06e9edb + 286f77d commit 39aa104
Show file tree
Hide file tree
Showing 5 changed files with 352 additions and 2 deletions.
208 changes: 208 additions & 0 deletions desmod/pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
"""Pool class for modeling a container of resources.
A pool models a container of items or resources. Pool is similar to the :class:
`simpy.resources.Container`, but with additional events when the Container is
empty or full. Users can put or get items in the pool with a certain amount as
a parameter.
"""

from simpy import Event
from simpy.core import BoundClass


class PoolPutEvent(Event):
def __init__(self, pool, amount=1):
super(PoolPutEvent, self).__init__(pool.env)
self.pool = pool
self.amount = amount
self.callbacks.append(pool._trigger_get)
pool._putters.append(self)
pool._trigger_put()

def cancel(self):
if not self.triggered:
self.pool._putters.remove(self)
self.callbacks = None


class PoolGetEvent(Event):
def __init__(self, pool, amount=1):
super(PoolGetEvent, self).__init__(pool.env)
self.pool = pool
self.amount = amount
self.callbacks.append(pool._trigger_put)
pool._getters.append(self)
pool._trigger_get()

def cancel(self):
if not self.triggered:
self.pool._getters.remove(self)
self.callbacks = None


class PoolWhenNewEvent(Event):
def __init__(self, pool):
super(PoolWhenNewEvent, self).__init__(pool.env)
self.pool = pool
pool._new_waiters.append(self)
pool._trigger_when_new()

def cancel(self):
if not self.triggered:
self.pool._new_waiters.remove(self)
self.callbacks = None


class PoolWhenAnyEvent(Event):
def __init__(self, pool):
super(PoolWhenAnyEvent, self).__init__(pool.env)
self.pool = pool
pool._any_waiters.append(self)
pool._trigger_when_any()

def cancel(self):
if not self.triggered:
self.pool._any_waiters.remove(self)
self.callbacks = None


class PoolWhenFullEvent(Event):
def __init__(self, pool):
super(PoolWhenFullEvent, self).__init__(pool.env)
self.pool = pool
pool._full_waiters.append(self)
pool._trigger_when_full()

def cancel(self):
if not self.triggered:
self.pool._full_waiters.remove(self)
self.callbacks = None


class Pool(object):
"""Simulation pool of arbitrary items.
`Pool` is similar to :class:`simpy.resources.Container`.
It provides a simulation-aware container for managing a pool of objects
needed by multiple processes.
Resources are added and removed using :meth:`put()` and :meth:`get()`.
:param env: Simulation environment.
:param capacity: Capacity of the pool; infinite by default.
:param hard_cap:
If specified, the pool overflows when the `capacity` is reached.
:param init_level: Initial level of the pool.
:param name: Optional name to associate with the queue.
"""

def __init__(self, env, capacity=float('inf'), hard_cap=False,
init_level=0, name=None):
self.env = env
#: Capacity of the queue (maximum number of items).
self.capacity = capacity
self._hard_cap = hard_cap
self.level = init_level
self.name = name
self._putters = []
self._getters = []
self._new_waiters = []
self._any_waiters = []
self._full_waiters = []
self._put_hook = None
self._get_hook = None
BoundClass.bind_early(self)

@property
def size(self):
"""Number of items in pool."""
return self.level

@property
def remaining(self):
"""Remaining pool capacity."""
return self.capacity - self.level

@property
def is_empty(self):
"""Indicates whether the pool is empty."""
return self.level == 0

@property
def is_full(self):
"""Indicates whether the pool is full."""
return self.level >= self.capacity

#: Put amount items in the pool.
put = BoundClass(PoolPutEvent)

#: Get amount items from the queue.
get = BoundClass(PoolGetEvent)

#: Return an event triggered when the pool is non-empty.
when_any = BoundClass(PoolWhenAnyEvent)

#: Return an event triggered when items are put in pool
when_new = BoundClass(PoolWhenNewEvent)

#: Return an event triggered when the pool becomes full.
when_full = BoundClass(PoolWhenFullEvent)

def _add_items(self, item_count=1):
self.level += item_count

def _remove_items(self, item_count=1):
self.level -= item_count
return item_count

def _trigger_put(self, _=None):
if self._putters:
put_ev = self._putters.pop(0)
put_ev.succeed()
self._add_items(put_ev.amount)
self._trigger_when_new()
self._trigger_when_any()
self._trigger_when_full()
if self._put_hook:
self._put_hook()
if self.level > self.capacity and self._hard_cap:
raise OverflowError()

def _trigger_get(self, _=None):
if self._getters and self.level:
for index, get_ev in enumerate(self._getters):
get_ev = self._getters[index]
assert get_ev.amount <= self.capacity, (
"Amount {} greater than pool's {} capacity {}".format(
get_ev.amount, str(self.name), self.capacity))
if get_ev.amount <= self.level:
self._getters.remove(get_ev)
item = self._remove_items(get_ev.amount)
get_ev.succeed(item)
if self._get_hook:
self._get_hook()
break

def _trigger_when_new(self):
for when_new_ev in self._new_waiters:
when_new_ev.succeed()
del self._new_waiters[:]

def _trigger_when_any(self):
if self.level:
for when_any_ev in self._any_waiters:
when_any_ev.succeed()
del self._any_waiters[:]

def _trigger_when_full(self):
if self.level >= self.capacity:
for when_full_ev in self._full_waiters:
when_full_ev.succeed()
del self._full_waiters[:]

def __str__(self):
return ('Pool: name={0.name}'
' level={0.level}'
' capacity={0.capacity}'
')'.format(self))
22 changes: 22 additions & 0 deletions desmod/probe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import six

from desmod.queue import Queue
from desmod.pool import Pool


def attach(scope, target, callbacks, **hints):
Expand All @@ -23,6 +24,11 @@ def attach(scope, target, callbacks, **hints):
_attach_queue_remaining(target, callbacks)
else:
_attach_queue_size(target, callbacks)
elif isinstance(target, Pool):
if hints.get('trace_remaining', False):
_attach_pool_remaining(target, callbacks)
else:
_attach_pool_size(target, callbacks)
else:
raise TypeError(
'Cannot probe {} of type {}'.format(scope, type(target)))
Expand Down Expand Up @@ -124,3 +130,19 @@ def hook():
callback(queue.remaining)

queue._put_hook = queue._get_hook = hook


def _attach_pool_size(pool, callbacks):
def hook():
for callback in callbacks:
callback(pool.size)

pool._put_hook = pool._get_hook = hook


def _attach_pool_remaining(pool, callbacks):
def hook():
for callback in callbacks:
callback(pool.remaining)

pool._put_hook = pool._get_hook = hook
5 changes: 3 additions & 2 deletions desmod/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .util import partial_format
from .timescale import parse_time, scale_time
from .queue import Queue
from .pool import Pool


class Tracer(object):
Expand Down Expand Up @@ -205,7 +206,7 @@ def activate_probe(self, scope, target, **hints):
assert self.enabled
var_type = hints.get('var_type')
if var_type is None:
if isinstance(target, simpy.Container):
if isinstance(target, (simpy.Container, Pool)):
if isinstance(target.level, float):
var_type = 'real'
else:
Expand All @@ -221,7 +222,7 @@ def activate_probe(self, scope, target, **hints):
if k in hints}

if 'init' not in kwargs:
if isinstance(target, simpy.Container):
if isinstance(target, (simpy.Container, Pool)):
kwargs['init'] = target.level
elif isinstance(target, simpy.Resource):
kwargs['init'] = len(target.users) if target.users else 'z'
Expand Down
116 changes: 116 additions & 0 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from pytest import raises

from desmod.pool import Pool


def test_pool(env):
pool = Pool(env, capacity=2)

def producer(amount, wait):
yield env.timeout(wait)
yield pool.put(amount)

def consumer(expected_amount, wait):
yield env.timeout(wait)
msg = yield pool.get(expected_amount)
assert msg == expected_amount

env.process(producer(1, 0))
env.process(producer(2, 1))
env.process(consumer(1, 0))
env.process(consumer(2, 1))
env.run()


def test_pool_when_full_any(env):
pool = Pool(env, capacity=9)
result = []

def producer(env):
yield env.timeout(1)
for i in range(1, 6):
yield pool.put(i)
yield env.timeout(1)

def consumer(env):
yield env.timeout(5)
for i in range(1, 3):
msg = yield pool.get(i)
assert msg == i

def full_waiter(env):
yield pool.when_full()
assert env.now == 4
assert pool.level == 10
result.append('full')

def any_waiter(env):
yield pool.when_any()
assert env.now == 1
result.append('any')

env.process(producer(env))
env.process(consumer(env))
env.process(full_waiter(env))
env.process(any_waiter(env))
env.process(any_waiter(env))
env.run()
assert pool.level
assert pool.is_full
assert pool.remaining == pool.capacity - pool.level
assert not pool.is_empty
assert pool.size == 12
assert 'full' in result
assert result.count('any') == 2


def test_pool_overflow(env):
pool = Pool(env, capacity=5, hard_cap=True)

def producer(env):
yield env.timeout(1)
for i in range(5):
yield pool.put(i)
yield env.timeout(1)

env.process(producer(env))
with raises(OverflowError):
env.run()


def test_pool_get_more(env):
pool = Pool(env, capacity=6, name='foo')

def producer(env):
yield pool.put(1)
yield env.timeout(1)
yield pool.put(1)

def consumer(env, amount1, amount2):
amount = yield pool.get(amount1)
assert amount == amount1
amount = yield pool.get(amount2) # should fail

env.process(producer(env))
env.process(consumer(env, 1, 10))
with raises(AssertionError,
message="Amount {} greater than pool's {} capacity {}".format(
10, 'foo', 6)):
env.run()


def test_pool_cancel(env):
pool = Pool(env)

event_cancel = pool.get(2)
event_cancel.cancel()
event_full = pool.when_full()
event_full.cancel()
event_any = pool.when_any()
event_any.cancel()

env.run()
assert pool.level == 0
assert not event_cancel.triggered
assert not event_full.triggered
assert not event_any.triggered
Loading

0 comments on commit 39aa104

Please sign in to comment.