diff --git a/.travis.yml b/.travis.yml
index 9ef39ee..ef83c88 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,6 +5,8 @@ python:
- 2.7
- 3.4
- 3.5
+ - 3.6
+ - 3.7
cache: pip
env:
global:
@@ -30,6 +32,6 @@ before_script:
- "sh -e /etc/init.d/xvfb start"
- sleep 3 # give xvfb some time to start
script:
- - py.test --cov=queueing_tool --cov-report term-missing --doctest-modules
+ - pytest --cov=queueing_tool --cov-report term-missing --doctest-modules -k "not slow"
after_success:
- coveralls
diff --git a/README.rst b/README.rst
index 3eba2fe..654f195 100644
--- a/README.rst
+++ b/README.rst
@@ -37,7 +37,7 @@ Installation
------------
-**Prerequisites:** Queueing-tool runs on Python 2.7 and 3.4-3.5 and it
+**Prerequisites:** Queueing-tool runs on Python 2.7 and 3.4-3.7 and it
requires `networkx `__ and
`numpy `__. If you want to plot, you will need
to install `matplotlib `__ as well.
@@ -73,7 +73,7 @@ The issue tracker is at https://github.com/djordon/queueing-tool/issues. Please
Copyright and license
---------------------
-Code and documentation Copyright 2014-2016 Daniel Jordon. Code released
+Code and documentation Copyright 2014-2019 Daniel Jordon. Code released
under the `MIT
license `__.
diff --git a/VERSION b/VERSION
index 23aa839..f0bb29e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.2.2
+1.3.0
diff --git a/queueing_tool/__init__.py b/queueing_tool/__init__.py
index 8e1d802..cc75220 100644
--- a/queueing_tool/__init__.py
+++ b/queueing_tool/__init__.py
@@ -1,20 +1,24 @@
from __future__ import absolute_import
-from queueing_tool.queues import *
-import queueing_tool.queues as queues
+from queueing_tool.common import *
+import queueing_tool.common as common
+
+from queueing_tool.graph import *
+import queueing_tool.graph as graph
from queueing_tool.network import *
import queueing_tool.network as network
-from queueing_tool.graph import *
-import queueing_tool.graph as graph
+from queueing_tool.queues import *
+import queueing_tool.queues as queues
+
__all__ = []
-__version__ = '1.2.2'
+__version__ = '1.3.0'
-__all__.extend(['__version__'])
-__all__.extend(queues.__all__)
-__all__.extend(network.__all__)
+__all__.extend(['__version__', 'EdgeID', 'AgentID'])
__all__.extend(graph.__all__)
+__all__.extend(network.__all__)
+__all__.extend(queues.__all__)
# del queues, network, generation, graph
diff --git a/queueing_tool/common.py b/queueing_tool/common.py
new file mode 100644
index 0000000..eab7b89
--- /dev/null
+++ b/queueing_tool/common.py
@@ -0,0 +1,12 @@
+import collections
+
+
+EdgeID = collections.namedtuple(
+ typename='EdgeID',
+ field_names=['source', 'target', 'edge_index', 'edge_type']
+)
+
+AgentID = collections.namedtuple(
+ typename='AgentID',
+ field_names=['edge_index', 'agent_qid']
+)
diff --git a/queueing_tool/graph/__init__.py b/queueing_tool/graph/__init__.py
index 4d8c013..9f3c948 100644
--- a/queueing_tool/graph/__init__.py
+++ b/queueing_tool/graph/__init__.py
@@ -7,6 +7,7 @@
generate_pagerank_graph
generate_transition_matrix
graph2dict
+ matrix2dict
minimal_random_graph
set_types_rank
set_types_random
@@ -15,7 +16,8 @@
"""
from queueing_tool.graph.graph_functions import (
- graph2dict
+ graph2dict,
+ matrix2dict
)
from queueing_tool.graph.graph_generation import (
generate_random_graph,
@@ -42,6 +44,7 @@
'generate_pagerank_graph',
'generate_transition_matrix',
'graph2dict',
+ 'matrix2dict',
'minimal_random_graph',
'set_types_rank',
'set_types_random',
diff --git a/queueing_tool/graph/graph_functions.py b/queueing_tool/graph/graph_functions.py
index f45eed0..8c00e0a 100644
--- a/queueing_tool/graph/graph_functions.py
+++ b/queueing_tool/graph/graph_functions.py
@@ -86,3 +86,56 @@ def graph2dict(g, return_dict_of_dict=True):
return dict_of_dicts
else:
return {k: list(val.keys()) for k, val in dict_of_dicts.items()}
+
+
+def matrix2dict(matrix, graph):
+ """Takes an adjacency matrix and returns an adjacency list.
+
+ Parameters
+ ----------
+ graph : :any:`networkx.DiGraph`, :any:`networkx.Graph`, etc.
+ Any object that networkx can turn into a
+ :any:`DiGraph`.
+ matrix : :class:`~numpy.ndarray`
+ An matrix related to an adjacency list. The ``matrix[i, j]``
+ represents some relationship between the vertex ``i`` and the
+ vertex ``j``.
+
+ Returns
+ -------
+ adj : dict
+ An adjacency representation of the matrix for the graph.
+
+ Examples
+ --------
+ >>> import queueing_tool as qt
+ >>> import networkx as nx
+ >>> adj = {0: [1, 2], 1: [0], 2: [0, 3], 3: [2]}
+ >>> g = nx.DiGraph(adj)
+ >>> mat = qt.generate_transition_matrix(g, seed=123)
+ >>> mat # doctest: +ELLIPSIS
+ ... # doctest: +NORMALIZE_WHITESPACE
+ array([[ 0. , 0.70707071, 0.29292929, 0. ],
+ [ 1. , 0. , 0. , 0. ],
+ [ 0.29113924, 0. , 0. , 0.70886076],
+ [ 0. , 0. , 1. , 0. ]])
+ >>> qt.matrix2dict(mat, g) # doctest: +ELLIPSIS
+ ... # doctest: +NORMALIZE_WHITESPACE
+ {0: {1: 0.707..., 2: 0.292...},
+ 1: {0: 1.0},
+ 2: {0: 0.291..., 3: 0.708...},
+ 3: {2: 1.0}}
+ """
+ num_columns, num_rows = matrix.shape
+
+ if num_columns != num_rows or num_columns != graph.number_of_nodes():
+ msg = "Matrix has wrong shape, must be {} x {}"
+ non = graph.number_of_nodes()
+ raise ValueError(msg.format(non, non))
+
+ result = {}
+ adjacency_graph = graph2dict(graph, return_dict_of_dict=False)
+ for source, adjacents in adjacency_graph.items():
+ result[source] = {dest: matrix[source, dest] for dest in adjacents}
+
+ return result
diff --git a/queueing_tool/graph/graph_generation.py b/queueing_tool/graph/graph_generation.py
index e027c79..b742be2 100644
--- a/queueing_tool/graph/graph_generation.py
+++ b/queueing_tool/graph/graph_generation.py
@@ -1,14 +1,13 @@
-import numbers
-
import networkx as nx
import numpy as np
+from numpy.random import RandomState
from queueing_tool.graph.graph_functions import _test_graph, _calculate_distance
from queueing_tool.graph.graph_wrapper import QueueNetworkDiGraph
from queueing_tool.union_find import UnionFind
-def generate_transition_matrix(g, seed=None):
+def generate_transition_matrix(g, seed=None, random_state=None):
"""Generates a random transition matrix for the graph ``g``.
Parameters
@@ -18,6 +17,10 @@ def generate_transition_matrix(g, seed=None):
seed : int (optional)
An integer used to initialize numpy's psuedo-random number
generator.
+ random_state : :class:`~numpy.random.RandomState` (optional)
+ Used to initialize numpy's psuedo-random number generator. If
+ present, ``seed`` is ignored. If this is missing then the seed is
+ used to create a :class:`~numpy.random.RandomState`.
Returns
-------
@@ -29,28 +32,29 @@ def generate_transition_matrix(g, seed=None):
"""
g = _test_graph(g)
- if isinstance(seed, numbers.Integral):
- np.random.seed(seed)
+ if random_state is None:
+ random_state = RandomState(seed)
nV = g.number_of_nodes()
mat = np.zeros((nV, nV))
- for v in g.nodes():
- ind = [e[1] for e in g.out_edges(v)]
+ for v in sorted(g.nodes()):
+ ind = [e[1] for e in sorted(g.out_edges(v))]
deg = len(ind)
if deg == 1:
mat[v, ind] = 1
elif deg > 1:
- probs = np.ceil(np.random.rand(deg) * 100) / 100.
+ probs = np.ceil(random_state.rand(deg) * 100) / 100.
if np.isclose(np.sum(probs), 0):
- probs[np.random.randint(deg)] = 1
+ probs[random_state.randint(deg)] = 1
mat[v, ind] = probs / np.sum(probs)
return mat
-def generate_random_graph(num_vertices=250, prob_loop=0.5, **kwargs):
+def generate_random_graph(num_vertices=250, prob_loop=0.5,
+ seed=None, random_state=None, **kwargs):
"""Creates a random graph where the edges have different types.
This method calls :func:`.minimal_random_graph`, and then adds
@@ -63,6 +67,13 @@ def generate_random_graph(num_vertices=250, prob_loop=0.5, **kwargs):
The number of vertices in the graph.
prob_loop : float (optional, default: 0.5)
The probability that a loop gets added to a vertex.
+ seed : int (optional)
+ An integer used to initialize numpy's psuedo-random number
+ generator.
+ random_state : :class:`~numpy.random.RandomState` (optional)
+ Used to initialize numpy's psuedo-random number generator. If
+ present, ``seed`` is ignored. If this is missing then the seed is
+ used to create a :class:`~numpy.random.RandomState`.
**kwargs :
Any parameters to send to :func:`.minimal_random_graph` or
:func:`.set_types_random`.
@@ -85,13 +96,13 @@ def generate_random_graph(num_vertices=250, prob_loop=0.5, **kwargs):
>>> non_loops = [e for e in g.edges() if e[0] != e[1]]
>>> p1 = np.sum([g.ep(e, 'edge_type') == 1 for e in non_loops])
>>> float(p1) / len(non_loops) # doctest: +ELLIPSIS
- 0.486...
+ 0.498...
>>> p2 = np.sum([g.ep(e, 'edge_type') == 2 for e in non_loops])
>>> float(p2) / len(non_loops) # doctest: +ELLIPSIS
- 0.249...
+ 0.260...
>>> p3 = np.sum([g.ep(e, 'edge_type') == 3 for e in non_loops])
>>> float(p3) / len(non_loops) # doctest: +ELLIPSIS
- 0.264...
+ 0.241...
To make an undirected graph with 25 vertices where there are 4
different edge types with random proportions:
@@ -105,17 +116,21 @@ def generate_random_graph(num_vertices=250, prob_loop=0.5, **kwargs):
recommended use edge type indices starting at 1, since 0 is
typically used for terminal edges.
"""
- g = minimal_random_graph(num_vertices, **kwargs)
- for v in g.nodes():
+ if random_state is None:
+ random_state = RandomState(seed)
+
+ g = minimal_random_graph(num_vertices, random_state=random_state, **kwargs)
+ for v in sorted(g.nodes()):
e = (v, v)
+
if not g.is_edge(e):
- if np.random.uniform() < prob_loop:
+ if random_state.uniform() < prob_loop:
g.add_edge(*e)
- g = set_types_random(g, **kwargs)
- return g
+ return set_types_random(g, random_state=random_state, **kwargs)
-def generate_pagerank_graph(num_vertices=250, **kwargs):
+
+def generate_pagerank_graph(num_vertices=250, seed=None, random_state=None, **kwargs):
"""Creates a random graph where the vertex types are
selected using their pagerank.
@@ -127,6 +142,13 @@ def generate_pagerank_graph(num_vertices=250, **kwargs):
----------
num_vertices : int (optional, the default is 250)
The number of vertices in the graph.
+ seed : int (optional)
+ An integer used to initialize numpy's psuedo-random number
+ generator.
+ random_state : :class:`~numpy.random.RandomState` (optional)
+ Used to initialize numpy's psuedo-random number generator. If
+ present, ``seed`` is ignored. If this is missing then the seed is
+ used to create a :class:`~numpy.random.RandomState`.
**kwargs :
Any parameters to send to :func:`.minimal_random_graph` or
:func:`.set_types_rank`.
@@ -149,15 +171,18 @@ def generate_pagerank_graph(num_vertices=250, **kwargs):
loops then have edge types that correspond to the vertices type.
The rest of the edges are set to type 1.
"""
- g = minimal_random_graph(num_vertices, **kwargs)
+ if random_state is None:
+ random_state = RandomState(seed)
+
+ g = minimal_random_graph(num_vertices, random_seed=random_state, **kwargs)
r = np.zeros(num_vertices)
for k, pr in nx.pagerank(g).items():
r[k] = pr
- g = set_types_rank(g, rank=r, **kwargs)
- return g
+ return set_types_rank(g, rank=r, random_seed=random_state, **kwargs)
-def minimal_random_graph(num_vertices, seed=None, **kwargs):
+
+def minimal_random_graph(num_vertices, seed=None, random_state=None, **_kwargs):
"""Creates a connected graph with random vertex locations.
Parameters
@@ -165,10 +190,12 @@ def minimal_random_graph(num_vertices, seed=None, **kwargs):
num_vertices : int
The number of vertices in the graph.
seed : int (optional)
- An integer used to initialize numpy's psuedorandom number
- generators.
- **kwargs :
- Unused.
+ An integer used to initialize numpy's psuedo-random number
+ generator.
+ random_state : :class:`~numpy.random.RandomState` (optional)
+ Used to initialize numpy's psuedo-random number generator. If
+ present, ``seed`` is ignored. If this is missing then the seed is
+ used to create a :class:`~numpy.random.RandomState`.
Returns
-------
@@ -184,10 +211,10 @@ def minimal_random_graph(num_vertices, seed=None, **kwargs):
``r`` are connect by an edge --- where ``r`` is the smallest number
such that the graph ends up connected.
"""
- if isinstance(seed, numbers.Integral):
- np.random.seed(seed)
+ if random_state is None:
+ random_state = RandomState(seed)
- points = np.random.random((num_vertices, 2)) * 10
+ points = random_state.uniform(size=(num_vertices, 2)) * 10
edges = []
for k in range(num_vertices - 1):
@@ -215,7 +242,7 @@ def minimal_random_graph(num_vertices, seed=None, **kwargs):
def set_types_random(g, proportions=None, loop_proportions=None, seed=None,
- **kwargs):
+ random_state=None, **_kwargs):
"""Randomly sets ``edge_type`` (edge type) properties of the graph.
This function randomly assigns each edge a type. The probability of
@@ -237,10 +264,12 @@ def set_types_random(g, proportions=None, loop_proportions=None, seed=None,
that are expected to be of that type. The values can must sum
to one.
seed : int (optional)
- An integer used to initialize numpy's psuedorandom number
+ An integer used to initialize numpy's psuedo-random number
generator.
- **kwargs :
- Unused.
+ random_state : :class:`~numpy.random.RandomState` (optional)
+ Used to initialize numpy's psuedo-random number generator. If
+ present, ``seed`` is ignored. If this is missing then the seed is
+ used to create a :class:`~numpy.random.RandomState`.
Returns
-------
@@ -266,8 +295,8 @@ def set_types_random(g, proportions=None, loop_proportions=None, seed=None,
"""
g = _test_graph(g)
- if isinstance(seed, numbers.Integral):
- np.random.seed(seed)
+ if random_state is None:
+ random_state = RandomState(seed)
if proportions is None:
proportions = {k: 1. / 3 for k in range(1, 4)}
@@ -287,13 +316,13 @@ def set_types_random(g, proportions=None, loop_proportions=None, seed=None,
eTypes = {}
types = list(proportions.keys())
- values = np.random.choice(types, size=len(edges), replace=True, p=props)
+ values = random_state.choice(types, size=len(edges), replace=True, p=props)
for k, e in enumerate(edges):
eTypes[e] = values[k]
types = list(loop_proportions.keys())
- values = np.random.choice(types, size=len(loops), replace=True, p=lprops)
+ values = random_state.choice(types, size=len(loops), replace=True, p=lprops)
for k, e in enumerate(loops):
eTypes[e] = values[k]
@@ -305,7 +334,8 @@ def set_types_random(g, proportions=None, loop_proportions=None, seed=None,
return g
-def set_types_rank(g, rank, pType2=0.1, pType3=0.1, seed=None, **kwargs):
+def set_types_rank(g, rank, pType2=0.1, pType3=0.1, seed=None,
+ random_state=None, **_kwargs):
"""Creates a stylized graph. Sets edge and types using `pagerank`_.
This function sets the edge types of a graph to be either 1, 2, or
@@ -334,8 +364,10 @@ def set_types_rank(g, rank, pType2=0.1, pType3=0.1, seed=None, **kwargs):
seed : int (optional)
An integer used to initialize numpy's psuedo-random number
generator.
- **kwargs :
- Unused.
+ random_state : :class:`~numpy.random.RandomState` (optional)
+ Used to initialize numpy's psuedo-random number generator. If
+ present, ``seed`` is ignored. If this is missing then the seed is
+ used to create a :class:`~numpy.random.RandomState`.
Returns
-------
@@ -350,8 +382,8 @@ def set_types_rank(g, rank, pType2=0.1, pType3=0.1, seed=None, **kwargs):
"""
g = _test_graph(g)
- if isinstance(seed, numbers.Integral):
- np.random.seed(seed)
+ if random_state is None:
+ random_state = RandomState(seed)
tmp = np.sort(np.array(rank))
nDests = int(np.ceil(g.number_of_nodes() * pType2))
@@ -365,7 +397,8 @@ def set_types_rank(g, rank, pType2=0.1, pType3=0.1, seed=None, **kwargs):
min_g_dist = np.ones(nFCQ) * np.infty
ind_g_dist = np.ones(nFCQ, int)
- r, theta = np.random.random(nFCQ) / 500., np.random.random(nFCQ) * 360.
+ r = random_state.uniform(size=nFCQ) / 500.0
+ theta = random_state.uniform(size=nFCQ) * 360.0
xy_pos = np.array([r * np.cos(theta), r * np.sin(theta)]).transpose()
g_pos = xy_pos + dest_pos[np.array(np.mod(np.arange(nFCQ), nDests), int)]
@@ -380,7 +413,7 @@ def set_types_rank(g, rank, pType2=0.1, pType3=0.1, seed=None, **kwargs):
dests = set(dests)
g.new_vertex_property('loop_type')
- for v in g.nodes():
+ for v in sorted(g.nodes()):
if v in dests:
g.set_vp(v, 'loop_type', 3)
if not g.is_edge((v, v)):
@@ -397,9 +430,6 @@ def set_types_rank(g, rank, pType2=0.1, pType3=0.1, seed=None, **kwargs):
for v in g.nodes():
if g.vp(v, 'loop_type') in [2, 3]:
e = (v, v)
- if g.vp(v, 'loop_type') == 2:
- g.set_ep(e, 'edge_type', 2)
- else:
- g.set_ep(e, 'edge_type', 3)
+ g.set_ep(e, 'edge_type', g.vp(v, 'loop_type'))
return g
diff --git a/queueing_tool/graph/graph_preparation.py b/queueing_tool/graph/graph_preparation.py
index e751d00..138d661 100644
--- a/queueing_tool/graph/graph_preparation.py
+++ b/queueing_tool/graph/graph_preparation.py
@@ -116,7 +116,7 @@ def _prepare_graph(g, g_colors, q_cls, q_arg, adjust_graph):
ans = nx.to_dict_of_dicts(g)
g = adjacency2graph(ans, adjust=2, is_directed=g.is_directed())
g = QueueNetworkDiGraph(g)
- if len(pos) > 0:
+ if pos:
g.set_pos(pos)
g.new_vertex_property('vertex_color')
@@ -134,7 +134,7 @@ def _prepare_graph(g, g_colors, q_cls, q_arg, adjust_graph):
if 'pos' not in g.vertex_properties():
g.set_pos()
- for k, e in enumerate(g.edges()):
+ for k, e in enumerate(sorted(g.edges())):
g.set_ep(e, 'edge_pen_width', 1.25)
g.set_ep(e, 'edge_marker_size', 8)
if e[0] == e[1]:
@@ -142,7 +142,7 @@ def _prepare_graph(g, g_colors, q_cls, q_arg, adjust_graph):
else:
g.set_ep(e, 'edge_color', queues[k].colors['edge_color'])
- for v in g.nodes():
+ for v in sorted(g.nodes()):
g.set_vp(v, 'vertex_pen_width', 1)
g.set_vp(v, 'vertex_size', 8)
e = (v, v)
@@ -159,7 +159,7 @@ def _prepare_graph(g, g_colors, q_cls, q_arg, adjust_graph):
def _set_queues(g, q_cls, q_arg, has_cap):
queues = [0 for k in range(g.number_of_edges())]
- for e in g.edges():
+ for e in sorted(g.edges()):
eType = g.ep(e, 'edge_type')
qedge = (e[0], e[1], g.edge_index[e], eType)
diff --git a/queueing_tool/graph/graph_wrapper.py b/queueing_tool/graph/graph_wrapper.py
index 582180c..4112416 100644
--- a/queueing_tool/graph/graph_wrapper.py
+++ b/queueing_tool/graph/graph_wrapper.py
@@ -1,5 +1,8 @@
+import itertools
+
import networkx as nx
import numpy as np
+from numpy.random import RandomState
try:
import matplotlib.pyplot as plt
@@ -54,7 +57,7 @@ def _adjacency_adjust(adjacency, adjust, is_directed):
null_nodes = set()
for k, adj in adjacency.items():
- if len(adj) == 0:
+ if not adj:
null_nodes.add(k)
for k, adj in adjacency.items():
@@ -64,13 +67,13 @@ def _adjacency_adjust(adjacency, adjust, is_directed):
else:
for k, adj in adjacency.items():
- if len(adj) == 0:
+ if not adj:
adj[k] = {'edge_type': 0}
return adjacency
-def adjacency2graph(adjacency, edge_type=None, adjust=1, **kwargs):
+def adjacency2graph(adjacency, edge_type=None, adjust=1, **_kwargs):
"""Takes an adjacency list, dict, or matrix and returns a graph.
The purpose of this function is take an adjacency list (or matrix)
@@ -114,6 +117,7 @@ def adjacency2graph(adjacency, edge_type=None, adjust=1, **kwargs):
a loop is added with edge type 0.
>>> import queueing_tool as qt
+ >>> import pprint
>>> adj = {
... 0: {1: {}},
... 1: {2: {},
@@ -123,10 +127,9 @@ def adjacency2graph(adjacency, edge_type=None, adjust=1, **kwargs):
>>> # A loop will be added to vertex 2
>>> g = qt.adjacency2graph(adj, edge_type=eTy)
>>> ans = qt.graph2dict(g)
- >>> ans # doctest: +NORMALIZE_WHITESPACE
+ >>> pprint.pprint(ans) # doctest: +NORMALIZE_WHITESPACE
{0: {1: {'edge_type': 1}},
- 1: {2: {'edge_type': 2},
- 3: {'edge_type': 4}},
+ 1: {2: {'edge_type': 2}, 3: {'edge_type': 4}},
2: {2: {'edge_type': 0}},
3: {0: {'edge_type': 1}}}
@@ -135,10 +138,9 @@ def adjacency2graph(adjacency, edge_type=None, adjust=1, **kwargs):
>>> adj = {0 : [1], 1: [2, 3], 3: [0]}
>>> g = qt.adjacency2graph(adj, edge_type=eTy)
>>> ans = qt.graph2dict(g)
- >>> ans # doctest: +NORMALIZE_WHITESPACE
+ >>> pprint.pprint(ans) # doctest: +NORMALIZE_WHITESPACE
{0: {1: {'edge_type': 1}},
- 1: {2: {'edge_type': 2},
- 3: {'edge_type': 4}},
+ 1: {2: {'edge_type': 2}, 3: {'edge_type': 4}},
2: {2: {'edge_type': 0}},
3: {0: {'edge_type': 1}}}
@@ -148,12 +150,11 @@ def adjacency2graph(adjacency, edge_type=None, adjust=1, **kwargs):
>>> # The graph is unaltered
>>> g = qt.adjacency2graph(adj, edge_type=eTy, adjust=2)
>>> ans = qt.graph2dict(g)
- >>> ans # doctest: +NORMALIZE_WHITESPACE
+ >>> pprint.pprint(ans) # doctest: +NORMALIZE_WHITESPACE
{0: {1: {'edge_type': 1}},
- 1: {2: {'edge_type': 0},
- 3: {'edge_type': 4}},
- 2: {},
- 3: {0: {'edge_type': 1}}}
+ 1: {2: {'edge_type': 0}, 3: {'edge_type': 4}},
+ 2: {},
+ 3: {0: {'edge_type': 1}}}
"""
if isinstance(adjacency, np.ndarray):
@@ -209,6 +210,13 @@ class QueueNetworkDiGraph(nx.DiGraph):
data : :any:`networkx.DiGraph`, :class:`numpy.ndarray`, dict, etc.
Any object that networkx can turn into a
:any:`DiGraph`.
+ seed : int (optional)
+ An integer used to initialize numpy's psuedo-random number
+ generator.
+ random_state : :class:`~numpy.random.RandomState` (optional)
+ Used to initialize numpy's psuedo-random number generator. If
+ present, ``seed`` is ignored. If this is missing then the seed is
+ used to create a :class:`~numpy.random.RandomState`.
kwargs :
Any additional arguments for :any:`networkx.DiGraph`.
@@ -232,38 +240,47 @@ class QueueNetworkDiGraph(nx.DiGraph):
Not suitable for stand alone use; only use with a
:class:`.QueueNetwork`.
"""
- def __init__(self, data=None, **kwargs):
+ def __init__(self, data=None, seed=None, random_state=None, **kwargs):
if isinstance(data, dict):
data = adjacency2graph(data, **kwargs)
super(QueueNetworkDiGraph, self).__init__(data, **kwargs)
- edges = sorted(self.edges())
- self.edge_index = {e: k for k, e in enumerate(edges)}
+ if random_state is None:
+ random_state = RandomState(seed)
+
+ self.random_state = random_state
+ self.edge_index = {e: k for k, e in enumerate(self.edges())}
pos = nx.get_node_attributes(self, name='pos')
if len(pos) == self.number_of_nodes():
- self.pos = np.array([pos[v] for v in self.nodes()])
+ self.pos = pos
else:
self.pos = None
- self.edge_color = None
- self.vertex_color = None
- self.vertex_fill_color = None
- self._nE = self.number_of_edges()
+ @property
+ def edge_color(self):
+ if 'edge_color' not in self.edge_properties():
+ return None
+ return np.array([self.adj[e[0]][e[1]].get('edge_color') for e in self.edges()])
+
+ @property
+ def vertex_color(self):
+ if 'vertex_color' not in self.edge_properties():
+ return None
+ return np.array([self.node[n].get('vertex_color') for n in self.nodes()])
+
+ @property
+ def vertex_fill_color(self):
+ if 'vertex_fill_color' not in self.edge_properties():
+ return None
+ return np.array([self.node[n].get('vertex_fill_color') for n in self.nodes()])
def freeze(self):
nx.freeze(self)
def is_edge(self, e):
- return e in self.edge_index
-
- def add_edge(self, *args, **kwargs):
- super(QueueNetworkDiGraph, self).add_edge(*args, **kwargs)
- e = (args[0], args[1])
- if e not in self.edge_index:
- self.edge_index[e] = self._nE
- self._nE += 1
+ return e[1] in self.adj[e[0]]
def out_neighbours(self, v):
return [e[1] for e in self.out_edges(v)]
@@ -276,47 +293,38 @@ def vp(self, v, vertex_property):
def set_ep(self, e, edge_property, value):
self.adj[e[0]][e[1]][edge_property] = value
- if hasattr(self, edge_property):
- attr = getattr(self, edge_property)
- attr[self.edge_index[e]] = value
def set_vp(self, v, vertex_property, value):
self.node[v][vertex_property] = value
- if hasattr(self, vertex_property):
- attr = getattr(self, vertex_property)
- attr[v] = value
def vertex_properties(self):
props = set()
- for v in self.nodes():
+ for v in itertools.islice(self.nodes(), 1):
props.update(self.node[v].keys())
return props
def edge_properties(self):
props = set()
- for e in self.edges():
+ for e in itertools.islice(self.edges(), 1):
props.update(self.adj[e[0]][e[1]].keys())
return props
def new_vertex_property(self, name):
values = {v: None for v in self.nodes()}
nx.set_node_attributes(self, name=name, values=values)
- if name == 'vertex_color':
- self.vertex_color = [0 for v in range(self.number_of_nodes())]
- if name == 'vertex_fill_color':
- self.vertex_fill_color = [0 for v in range(self.number_of_nodes())]
def new_edge_property(self, name):
values = {e: None for e in self.edges()}
nx.set_edge_attributes(self, name=name, values=values)
- if name == 'edge_color':
- self.edge_color = np.zeros((self.number_of_edges(), 4))
- def set_pos(self, pos=None):
+ def set_node_positions(self, pos=None):
if pos is None:
- pos = nx.spring_layout(self)
+ pos = nx.spring_layout(self, seed=self.random_state)
nx.set_node_attributes(self, name='pos', values=pos)
- self.pos = np.array([pos[v] for v in self.nodes()])
+ self.pos = {v: pos[v] for v in self.nodes()}
+
+ def set_pos(self, pos=None):
+ self.set_node_positions(pos=pos)
def get_edge_type(self, edge_type):
"""Returns all edges with the specified edge type.
@@ -458,15 +466,10 @@ def lines_scatter_args(self, line_kwargs=None, scatter_kwargs=None, pos=None):
If a specific keyword argument is not passed then the defaults
are used.
"""
- if pos is not None:
- self.set_pos(pos)
- elif self.pos is None:
- self.set_pos()
+ self.set_pos(pos)
- edge_pos = [0 for e in self.edges()]
- for e in self.edges():
- ei = self.edge_index[e]
- edge_pos[ei] = (self.pos[e[0]], self.pos[e[1]])
+ edge_pos = [(self.pos[e[0]], self.pos[e[1]]) for e in self.edges()]
+ node_pos = np.array([self.pos[v] for v in self.nodes()])
line_collecton_kwargs = {
'segments': edge_pos,
@@ -485,8 +488,8 @@ def lines_scatter_args(self, line_kwargs=None, scatter_kwargs=None, pos=None):
'hatch': None,
}
scatter_kwargs_ = {
- 'x': self.pos[:, 0],
- 'y': self.pos[:, 1],
+ 'x': node_pos[:, 0],
+ 'y': node_pos[:, 1],
's': 50,
'c': self.vertex_fill_color,
'alpha': None,
diff --git a/queueing_tool/network/multiclass_network.py b/queueing_tool/network/multiclass_network.py
new file mode 100644
index 0000000..8a3e2f4
--- /dev/null
+++ b/queueing_tool/network/multiclass_network.py
@@ -0,0 +1,163 @@
+import collections
+import copy
+from heapq import heappush, heappop
+
+import numpy as np
+from numpy import infty
+
+from queueing_tool.network.queue_network import QueueNetwork
+from queueing_tool.queues.agents import Agent
+from queueing_tool.queues.choice import _choice
+from queueing_tool.queues.queue_servers import QueueServer
+
+
+class MultiClassQueueNetwork(QueueNetwork):
+
+ def __init__(self, *args, **kwargs):
+ super(MultiClassQueueNetwork, self).__init__(*args, **kwargs)
+
+ def default_factory():
+ return copy.deepcopy(self._route_probs)
+
+ self._routing_transitions = collections.defaultdict(default_factory)
+
+ def set_transitions(self, mat, category=None):
+ for key, value in mat.items():
+ probs = list(value.values())
+
+ if key not in self.g.node:
+ msg = "One of the keys don't correspond to a vertex."
+ raise ValueError(msg)
+ elif self.out_edges[key] and not np.isclose(sum(probs), 1):
+ msg = "Sum of transition probabilities at a vertex was not 1."
+ raise ValueError(msg)
+ elif (np.array(probs) < 0).any():
+ msg = "Some transition probabilities were negative."
+ raise ValueError(msg)
+
+ for k, e in enumerate(self.g.out_edges(key)):
+ self._routing_transitions[category][key][k] = value.get(e[1], 0)
+
+ def set_categorical_transitions(self, adjacency_list):
+ for category, mat in adjacency_list.items():
+ self.set_transitions(mat, category)
+
+ def transitions(self):
+ mat = {
+ category: {
+ node: {e[1]: p for e, p in zip(self.g.out_edges(node), value)}
+ for node, value in routing_probs.values()
+ }
+ for category, routing_probs in self._routing_transitions.items()
+ }
+ return mat
+
+ def routing_transition(self, destination, category=None):
+ if category not in self._routing_transitions:
+ category = None
+
+ return self._routing_transitions[category][destination]
+
+ def copy(self):
+ network = super(MultiClassQueueNetwork, self).copy()
+ network._routing_transitions = copy.deepcopy(self._routing_transitions)
+
+
+class MultiClassQueueServer(QueueServer):
+
+ def next_event(self):
+ """Simulates the queue forward one event.
+
+ Use :meth:`.simulate` instead.
+
+ Returns
+ -------
+ out : :class:`.Agent` (sometimes)
+ If the next event is a departure then the departing agent
+ is returned, otherwise nothing is returned.
+
+ See Also
+ --------
+ :meth:`.simulate` : Simulates the queue forward.
+ """
+ if self._departures[0]._time < self._arrivals[0]._time:
+ new_depart = heappop(self._departures)
+ self._current_t = new_depart._time
+ self._num_total -= 1
+ self.num_system -= 1
+ self.num_departures += 1
+
+ if self.collect_data and new_depart.agent_id in self.data:
+ self.data[new_depart.agent_id][-1][2] = self._current_t
+
+ if self.queue:
+ agent = self.queue.popleft()
+ if self.collect_data and agent.agent_id in self.data:
+ self.data[agent.agent_id][-1][1] = self._current_t
+
+ agent._time = self.service_f(self._current_t, agent, self)
+ agent.queue_action(self, 1)
+ heappush(self._departures, agent)
+
+ new_depart.queue_action(self, 2)
+ self._update_time()
+ return new_depart
+
+ elif self._arrivals[0]._time < infty:
+ arrival = heappop(self._arrivals)
+ self._current_t = arrival._time
+
+ if self._active:
+ self._add_arrival()
+
+ self.num_system += 1
+ self._num_arrivals += 1
+
+ if self.collect_data:
+ b = 0 if self.num_system <= self.num_servers else 1
+ if arrival.agent_id not in self.data:
+ self.data[arrival.agent_id] = \
+ [[arrival._time, 0, 0, len(self.queue) + b, self.num_system]]
+ else:
+ self.data[arrival.agent_id]\
+ .append([arrival._time, 0, 0, len(self.queue) + b, self.num_system]) # noqa: E501
+
+ arrival.queue_action(self, 0)
+
+ if self.num_system <= self.num_servers:
+ if self.collect_data:
+ self.data[arrival.agent_id][-1][1] = arrival._time
+
+ arrival._time = self.service_f(arrival._time, arrival, self)
+ arrival.queue_action(self, 1)
+ heappush(self._departures, arrival)
+ else:
+ self.queue.append(arrival)
+
+ self._update_time()
+
+ return None
+
+
+ClassAgentID = collections.namedtuple(
+ typename='ClassAgentID',
+ field_names=['edge_index', 'agent_qid', 'category']
+)
+
+
+class ClassedAgent(Agent):
+ def __init__(self, agent_id=(0, 0), category=None, **kwargs):
+ self.category = category or self.__class__.__name__
+ super(ClassedAgent, self).__init__(agent_id=agent_id, **kwargs)
+ self.agent_id = ClassAgentID(agent_id[0], agent_id[1], self.category)
+
+ def desired_destination(self, network, edge):
+
+ n = len(network.out_edges[edge[1]])
+ if n <= 1:
+ return network.out_edges[edge[1]][0]
+
+ pr = network.routing_transition(edge[1], self.category)
+ u = np.random.uniform()
+ k = _choice(pr, u, n)
+ return network.out_edges[edge[1]][k]
diff --git a/queueing_tool/network/queue_network.py b/queueing_tool/network/queue_network.py
index b993fea..3c1dcec 100644
--- a/queueing_tool/network/queue_network.py
+++ b/queueing_tool/network/queue_network.py
@@ -4,7 +4,7 @@
import array
import numpy as np
-from numpy.random import uniform
+from numpy.random import RandomState
try:
import matplotlib.pyplot as plt
@@ -17,7 +17,7 @@
except ImportError:
HAS_MATPLOTLIB = False
-from queueing_tool.graph import _prepare_graph
+from queueing_tool.graph import _prepare_graph, matrix2dict
from queueing_tool.queues import (
NullQueue,
QueueServer,
@@ -73,6 +73,10 @@ class QueueNetwork(object):
seed : int (optional)
An integer used to initialize numpy's psuedo-random number
generator.
+ random_state : :class:`~numpy.random.RandomState` (optional)
+ Used to initialize numpy's psuedo-random number generator. If
+ present, ``seed`` is ignored. If this is missing then the seed is
+ used to create a :class:`~numpy.random.RandomState`.
colors : dict (optional)
A dictionary of RGBA colors used to color the graph. The keys
are specified in the Notes section. If this parameter is
@@ -236,11 +240,12 @@ class QueueNetwork(object):
>>> import queueing_tool as qt
>>> import networkx as nx
>>> import numpy as np
- >>>
+
+ >>> rs = np.random.RandomState(seed=13)
>>> g = nx.moebius_kantor_graph()
>>> q_cl = {1: qt.QueueServer}
- >>> def arr(t): return t + np.random.gamma(4, 0.0025)
- >>> def ser(t): return t + np.random.exponential(0.025)
+ >>> def arr(t): return t + rs.gamma(4, 0.0025)
+ >>> def ser(t): return t + rs.exponential(0.025)
>>> q_ar = {
... 1: {
... 'arrival_f': arr,
@@ -248,7 +253,7 @@ class QueueNetwork(object):
... 'num_servers': 5
... }
... }
- >>> net = qt.QueueNetwork(g, q_classes=q_cl, q_args=q_ar, seed=13)
+ >>> net = qt.QueueNetwork(g=g, q_classes=q_cl, q_args=q_ar, random_state=rs)
To specify that arrivals enter from type 1 edges and simulate run:
@@ -259,15 +264,16 @@ class QueueNetwork(object):
>>> nA = [(q.num_system, q.edge[2]) for q in net.edge2queue if q.edge[3] == 1]
>>> nA.sort(reverse=True)
- >>> nA[:5]
- [(4, 37), (4, 34), (3, 43), (3, 32), (3, 30)]
+ >>> nA
+ ... # doctest: +SKIP
To view the state of the network do the following (note, you need
to have pygraphviz installed and your graph may be rotated):
>>> net.simulate(n=500)
- >>> pos = nx.nx_agraph.graphviz_layout(g.to_undirected(), prog='neato') # doctest: +SKIP
- >>> net.draw(pos=pos) # doctest: +SKIP
+ >>> pos = nx.nx_agraph.graphviz_layout(g.to_undirected(), prog='neato')
+ ... # doctest: +SKIP
+ >>> net.draw(pos=pos) # doctest: +SKIP
<...>
.. figure:: my_network1.png
@@ -303,11 +309,14 @@ class QueueNetwork(object):
}
def __init__(self, g, q_classes=None, q_args=None, seed=None, colors=None,
- max_agents=1000, blocking='BAS', adjust_graph=True):
+ max_agents=1000, blocking='BAS', adjust_graph=True, random_state=None):
if not isinstance(blocking, str):
raise TypeError("blocking must be a string")
+ # Used for testing
+ self._qkey = None
+
self._t = 0
self.num_events = 0
self.max_agents = max_agents
@@ -315,7 +324,7 @@ def __init__(self, g, q_classes=None, q_args=None, seed=None, colors=None,
self._initialized = False
self._prev_edge = None
self._fancy_heap = PriorityQueue()
- self._blocking = True if blocking.lower() != 'rs' else False
+ self._blocking = blocking.lower() != 'rs'
if colors is None:
colors = {}
@@ -338,13 +347,18 @@ def __init__(self, g, q_classes=None, q_args=None, seed=None, colors=None,
for k in set(q_classes.keys()) - set(q_args.keys()):
q_args[k] = {}
+ if random_state is None:
+ random_state = RandomState(seed)
+
+ for kw in q_args.values():
+ kw.setdefault('random_state', random_state)
+
+ self.random_state = random_state
+
for key, args in q_args.items():
if 'colors' not in args:
args['colors'] = self.default_q_colors.get(key, self.default_q_colors[1])
- if isinstance(seed, numbers.Integral):
- np.random.seed(seed)
-
if g is not None:
g, qs = _prepare_graph(g, self.colors, q_classes, q_args, adjust_graph)
@@ -353,15 +367,15 @@ def __init__(self, g, q_classes=None, q_args=None, seed=None, colors=None,
self.edge2queue = qs
self.num_agents = np.zeros(g.number_of_edges(), int)
- self.out_edges = [0 for v in range(self.nV)]
- self.in_edges = [0 for v in range(self.nV)]
- self._route_probs = [0 for v in range(self.nV)]
+ self.out_edges = {}
+ self.in_edges = {}
+ self._route_probs = {}
- for v in g.nodes():
+ for v in sorted(g.nodes()):
vod = g.out_degree(v)
probs = array.array('d', [1. / vod for i in range(vod)])
- self.out_edges[v] = [g.edge_index[e] for e in g.out_edges(v)]
- self.in_edges[v] = [g.edge_index[e] for e in g.in_edges(v)]
+ self.out_edges[v] = [g.edge_index[e] for e in sorted(g.out_edges(v))]
+ self.in_edges[v] = [g.edge_index[e] for e in sorted(g.in_edges(v))]
self._route_probs[v] = probs
g.freeze()
@@ -406,7 +420,7 @@ def time(self):
t = np.infty
return t
- def animate(self, out=None, t=None, line_kwargs=None,
+ def animate(self, filename=None, t=None, line_kwargs=None,
scatter_kwargs=None, **kwargs):
"""Animates the network as it's simulating.
@@ -419,7 +433,7 @@ def animate(self, out=None, t=None, line_kwargs=None,
Parameters
----------
- out : str (optional)
+ filename : str (optional)
The location where the frames for the images will be saved.
If this parameter is not given, then the animation is shown
in interactive mode.
@@ -527,7 +541,7 @@ def animate(self, out=None, t=None, line_kwargs=None,
t = np.infty if t is None else t
now = self._t
- def update(frame_number):
+ def update(_frame_number):
if t is not None:
if self._t > now + t:
return False
@@ -535,6 +549,7 @@ def update(frame_number):
lines.set_color(line_args['colors'])
scatt.set_edgecolors(scat_args['edgecolors'])
scatt.set_facecolor(scat_args['c'])
+ return None
if hasattr(ax, 'set_facecolor'):
ax.set_facecolor(kwargs['bgcolor'])
@@ -562,12 +577,12 @@ def update(frame_number):
animation_args[key] = value
animation = FuncAnimation(**animation_args)
- if 'filename' not in kwargs:
+ if filename is None:
plt.ioff()
plt.show()
else:
save_args = {
- 'filename': None,
+ 'filename': filename,
'writer': None,
'fps': None,
'dpi': None,
@@ -877,15 +892,16 @@ def get_queue_data(self, queues=None, edge=None, edge_type=None, return_header=F
To get data from an edge connecting two vertices do the
following:
- >>> data = net.get_queue_data(edge=(1, 50))
+ >>> edges = list(g.edges())
+ >>> data = net.get_queue_data(edge=edges[0])
To get data from several edges do the following:
- >>> data = net.get_queue_data(edge=[(1, 50), (10, 91), (99, 99)])
+ >>> data = net.get_queue_data(edge=edges[:3])
You can specify the edge indices as well:
- >>> data = net.get_queue_data(queues=(20, 14, 0, 4))
+ >>> data = net.get_queue_data(queues=qt.EdgeID(20, 14, 0, 4))
"""
queues = _get_queues(self.g, queues, edge, edge_type)
@@ -893,7 +909,7 @@ def get_queue_data(self, queues=None, edge=None, edge_type=None, return_header=F
for q in queues:
dat = self.edge2queue[q].fetch_data()
- if len(dat) > 0:
+ if dat.size > 0:
data = np.vstack((data, dat))
if return_header:
@@ -955,7 +971,7 @@ def initialize(self, nActive=1, queues=None, edges=None, edge_type=None):
if nActive >= 1 and isinstance(nActive, numbers.Integral):
qs = [q.edge[2] for q in self.edge2queue if q.edge[3] != 0]
n = min(nActive, len(qs))
- queues = np.random.choice(qs, size=n, replace=False)
+ queues = self.random_state.choice(qs, size=n, replace=False)
elif not isinstance(nActive, numbers.Integral):
msg = "If queues is None, then nActive must be an integer."
raise TypeError(msg)
@@ -968,7 +984,7 @@ def initialize(self, nActive=1, queues=None, edges=None, edge_type=None):
queues = [e for e in queues if self.edge2queue[e].edge[3] != 0]
- if len(queues) == 0:
+ if not queues:
raise QueueingToolError("There were no queues to initialize.")
if len(queues) > self.max_agents:
@@ -1048,10 +1064,12 @@ def set_transitions(self, mat):
likely:
>>> import queueing_tool as qt
+ >>> import pprint
>>> g = qt.generate_random_graph(5, seed=10)
>>> net = qt.QueueNetwork(g)
- >>> net.transitions(False) # doctest: +ELLIPSIS
- ... # doctest: +NORMALIZE_WHITESPACE
+ >>> ans = net.transitions(False)
+ >>> pprint.pprint(ans) # doctest: +ELLIPSIS
+ ... # doctest: +NORMALIZE_WHITESPACE
{0: {2: 1.0},
1: {2: 0.5, 3: 0.5},
2: {0: 0.25, 1: 0.25, 2: 0.25, 4: 0.25},
@@ -1062,8 +1080,9 @@ def set_transitions(self, mat):
probabilities, you can do so with the following:
>>> net.set_transitions({1 : {2: 0.75, 3: 0.25}})
- >>> net.transitions(False) # doctest: +ELLIPSIS
- ... # doctest: +NORMALIZE_WHITESPACE
+ >>> ans = net.transitions(False)
+ >>> pprint.pprint(ans) # doctest: +ELLIPSIS
+ ... # doctest: +NORMALIZE_WHITESPACE
{0: {2: 1.0},
1: {2: 0.75, 3: 0.25},
2: {0: 0.25, 1: 0.25, 2: 0.25, 4: 0.25},
@@ -1074,13 +1093,14 @@ def set_transitions(self, mat):
:func:`.generate_transition_matrix`. You can change all
transition probabilities with an :class:`~numpy.ndarray`:
- >>> mat = qt.generate_transition_matrix(g, seed=10)
+ >>> mat = qt.generate_transition_matrix(g, seed=1234)
>>> net.set_transitions(mat)
- >>> net.transitions(False) # doctest: +ELLIPSIS
- ... # doctest: +NORMALIZE_WHITESPACE
+ >>> ans = net.transitions(False)
+ >>> pprint.pprint(ans) # doctest: +ELLIPSIS
+ ... # doctest: +NORMALIZE_WHITESPACE
{0: {2: 1.0},
- 1: {2: 0.962..., 3: 0.037...},
- 2: {0: 0.301..., 1: 0.353..., 2: 0.235..., 4: 0.108...},
+ 1: {2: 0.240..., 3: 0.759...},
+ 2: {0: 0.192..., 1: 0.344..., 2: 0.340..., 4: 0.122...},
3: {1: 1.0},
4: {2: 1.0}}
@@ -1091,40 +1111,24 @@ def set_transitions(self, mat):
:func:`.generate_transition_matrix` : Generate a random routing
matrix.
"""
- if isinstance(mat, dict):
- for key, value in mat.items():
- probs = list(value.values())
-
- if key not in self.g.node:
- msg = "One of the keys don't correspond to a vertex."
- raise ValueError(msg)
- elif len(self.out_edges[key]) > 0 and not np.isclose(sum(probs), 1):
- msg = "Sum of transition probabilities at a vertex was not 1."
- raise ValueError(msg)
- elif (np.array(probs) < 0).any():
- msg = "Some transition probabilities were negative."
- raise ValueError(msg)
-
- for k, e in enumerate(self.g.out_edges(key)):
- self._route_probs[key][k] = value.get(e[1], 0)
-
- elif isinstance(mat, np.ndarray):
- non_terminal = np.array([self.g.out_degree(v) > 0 for v in self.g.nodes()])
- if mat.shape != (self.nV, self.nV):
- msg = ("Matrix is the wrong shape, should "
- "be {0} x {1}.").format(self.nV, self.nV)
- raise ValueError(msg)
- elif not np.allclose(np.sum(mat[non_terminal, :], axis=1), 1):
+ if isinstance(mat, np.ndarray):
+ mat = matrix2dict(mat, self.g)
+
+ for key, value in mat.items():
+ probs = list(value.values())
+
+ if key not in self.g.node:
+ msg = "One of the keys ({0}) doesn't correspond to a vertex."
+ raise ValueError(msg.format(key))
+ elif self.out_edges[key] and not np.isclose(sum(probs), 1):
msg = "Sum of transition probabilities at a vertex was not 1."
raise ValueError(msg)
- elif (mat < 0).any():
- raise ValueError("Some transition probabilities were negative.")
+ elif (np.array(probs) < 0).any():
+ msg = "Some transition probabilities were negative."
+ raise ValueError(msg)
- for k in range(self.nV):
- for j, e in enumerate(self.g.out_edges(k)):
- self._route_probs[k][j] = mat[k, e[1]]
- else:
- raise TypeError("mat must be a numpy array or a dict.")
+ for k, e in enumerate(sorted(self.g.out_edges(key))):
+ self._route_probs[key][k] = value.get(e[1], 0)
def show_active(self, **kwargs):
"""Draws the network, highlighting active queues.
@@ -1213,7 +1217,7 @@ def show_type(self, edge_type, **kwargs):
if self.g.is_edge(e) and self.g.ep(e, 'edge_type') == edge_type:
ei = self.g.edge_index[e]
self.g.set_vp(v, 'vertex_fill_color', self.colors['vertex_highlight'])
- self.g.set_vp(v, 'vertex_color', self.edge2queue[ei].colors['vertex_color'])
+ self.g.set_vp(v, 'vertex_color', self.edge2queue[ei].colors['vertex_color']) # noqa: E501
else:
self.g.set_vp(v, 'vertex_fill_color', self.colors['vertex_inactive'])
self.g.set_vp(v, 'vertex_color', [0, 0, 0, 0.9])
@@ -1314,7 +1318,8 @@ def _simulate_next_event(self, slow=True):
q2.num_blocked += 1
q1._departures[0].blocked += 1
if self._blocking:
- t = q2._departures[0]._time + EPS * uniform(0.33, 0.66)
+ jitter = EPS * self.random_state.uniform(0.33, 0.66)
+ t = q2._departures[0]._time + jitter
q1.delay_service(t)
else:
q1.delay_service()
@@ -1456,12 +1461,15 @@ def transitions(self, return_matrix=True):
>>> import queueing_tool as qt
>>> import networkx as nx
+ >>> import pprint
>>> g = nx.sedgewick_maze_graph()
>>> net = qt.QueueNetwork(g)
Below is an adjacency list for the graph ``g``.
- >>> qt.graph2dict(g, False)
+ >>> ans = qt.graph2dict(g, False)
+ >>> ans = dict([(k, sorted(v)) for k, v in ans.items()])
+ >>> pprint.pprint(ans)
... # doctest: +NORMALIZE_WHITESPACE
{0: [2, 5, 7],
1: [7],
@@ -1490,8 +1498,9 @@ def transitions(self, return_matrix=True):
>>> mat = qt.generate_transition_matrix(g, seed=96)
>>> net.set_transitions(mat)
- >>> net.transitions(False) # doctest: +ELLIPSIS
- ... # doctest: +NORMALIZE_WHITESPACE
+ >>> ans = net.transitions(False)
+ >>> pprint.pprint(ans) # doctest: +ELLIPSIS
+ ... # doctest: +NORMALIZE_WHITESPACE
{0: {2: 0.112..., 5: 0.466..., 7: 0.420...},
1: {7: 1.0},
2: {0: 0.561..., 6: 0.438...},
@@ -1511,42 +1520,40 @@ def transitions(self, return_matrix=True):
if return_matrix:
mat = np.zeros((self.nV, self.nV))
for v in self.g.nodes():
- ind = [e[1] for e in self.g.out_edges(v)]
+ ind = [e[1] for e in sorted(self.g.out_edges(v))]
mat[v, ind] = self._route_probs[v]
else:
mat = {
- k: {e[1]: p for e, p in zip(self.g.out_edges(k), value)}
- for k, value in enumerate(self._route_probs)
+ k: {e[1]: p for e, p in zip(sorted(self.g.out_edges(k)), value)}
+ for k, value in self._route_probs.items()
}
return mat
def _update_all_colors(self):
- do = [True for v in range(self.nV)]
+ do = {v: True for v in self.g.nodes()}
for q in self.edge2queue:
- e = q.edge[:2]
- v = q.edge[1]
- if q.edge[0] == q.edge[1]:
- self.g.set_ep(e, 'edge_color', q._current_color(1))
- self.g.set_vp(v, 'vertex_color', q._current_color(2))
- if q.edge[3] != 0:
- self.g.set_vp(v, 'vertex_fill_color', q._current_color())
- do[v] = False
+ if q.edge.source == q.edge.target:
+ self.g.set_ep(q.edge, 'edge_color', q._current_color(1))
+ self.g.set_vp(q.edge.target, 'vertex_color', q._current_color(2))
+ if q.edge.edge_type != 0:
+ self.g.set_vp(q.edge.target, 'vertex_fill_color', q._current_color())
+ do[q.edge.target] = False
else:
- self.g.set_ep(e, 'edge_color', q._current_color())
- if do[v]:
- self._update_vertex_color(v)
- do[v] = False
- if do[q.edge[0]]:
- self._update_vertex_color(q.edge[0])
- do[q.edge[0]] = False
+ self.g.set_ep(q.edge, 'edge_color', q._current_color())
+ if do[q.edge.target]:
+ self._update_vertex_color(q.edge.target)
+ do[q.edge.target] = False
+ if do[q.edge.source]:
+ self._update_vertex_color(q.edge.source)
+ do[q.edge.source] = False
def _update_vertex_color(self, v):
ee = (v, v)
ee_is_edge = self.g.is_edge(ee)
eei = self.g.edge_index[ee] if ee_is_edge else 0
- if not ee_is_edge or (ee_is_edge and self.edge2queue[eei].edge[3] == 0):
+ if not ee_is_edge or (ee_is_edge and self.edge2queue[eei].edge.edge_type == 0):
nSy = 0
cap = 0
for ei in self.in_edges[v]:
@@ -1563,33 +1570,33 @@ def _update_vertex_color(self, v):
self.g.set_vp(v, 'vertex_color', self.colors['vertex_color'])
def _update_graph_colors(self, qedge):
- e = qedge[:2]
- v = qedge[1]
if self._prev_edge is not None:
- pe = self._prev_edge[:2]
- pv = self._prev_edge[1]
- q = self.edge2queue[self._prev_edge[2]]
-
- if pe[0] == pe[1]:
- self.g.set_ep(pe, 'edge_color', q._current_color(1))
- self.g.set_vp(pv, 'vertex_color', q._current_color(2))
- if q.edge[3] != 0:
- self.g.set_vp(v, 'vertex_fill_color', q._current_color())
-
+ q = self.edge2queue[self._prev_edge.edge_index]
+
+ if self._prev_edge.source == self._prev_edge.target:
+ self.g.set_ep(self._prev_edge, 'edge_color', q._current_color(1))
+ self.g.set_vp(self._prev_edge.target, 'vertex_color', q._current_color(2))
+
+ if q.edge.edge_type != 0:
+ self.g.set_vp(
+ v=self._prev_edge.target,
+ vertex_property='vertex_fill_color',
+ value=q._current_color()
+ )
else:
- self.g.set_ep(pe, 'edge_color', q._current_color())
- self._update_vertex_color(pv)
+ self.g.set_ep(self._prev_edge, 'edge_color', q._current_color())
+ self._update_vertex_color(self._prev_edge.target)
- q = self.edge2queue[qedge[2]]
- if qedge[0] == qedge[1]:
- self.g.set_ep(e, 'edge_color', q._current_color(1))
- self.g.set_vp(v, 'vertex_color', q._current_color(2))
- if q.edge[3] != 0:
- self.g.set_vp(v, 'vertex_fill_color', q._current_color())
+ q = self.edge2queue[qedge.edge_index]
+ if qedge.source == qedge.target:
+ self.g.set_ep(qedge, 'edge_color', q._current_color(1))
+ self.g.set_vp(qedge.target, 'vertex_color', q._current_color(2))
+ if q.edge.edge_type != 0:
+ self.g.set_vp(qedge.target, 'vertex_fill_color', q._current_color())
else:
- self.g.set_ep(e, 'edge_color', q._current_color())
- self._update_vertex_color(v)
+ self.g.set_ep(qedge, 'edge_color', q._current_color())
+ self._update_vertex_color(qedge.target)
def _get_queues(g, queues, edge, edge_type):
@@ -1618,7 +1625,7 @@ def _get_queues(g, queues, edge, edge_type):
if g.ep(e, 'edge_type') in edge_type:
tmp.append(g.edge_index[e])
- queues = np.array(tmp, int)
+ queues = np.array(sorted(tmp), int)
if queues is None:
queues = range(g.number_of_edges())
diff --git a/queueing_tool/queues/agents.py b/queueing_tool/queues/agents.py
index 74435bc..22ad55a 100644
--- a/queueing_tool/queues/agents.py
+++ b/queueing_tool/queues/agents.py
@@ -1,6 +1,7 @@
from numpy import infty
from numpy.random import uniform
+from queueing_tool.common import AgentID
from queueing_tool.queues.choice import _choice, _argmin
@@ -29,19 +30,19 @@ class Agent(object):
Attributes
----------
- agent_id : tuple
+ agent_id : namedtuple
A unique identifier for an agent.
blocked : int
Specifies how many times an agent has been blocked by a finite
capacity queue.
"""
- def __init__(self, agent_id=(0, 0), **kwargs):
- self.agent_id = agent_id
+ def __init__(self, agent_id=(0, 0), **_kwargs):
+ self.agent_id = AgentID(*agent_id)
self.blocked = 0
self._time = 0 # The agents arrival or departure time
def __repr__(self):
- return "Agent; agent_id:{0}. time: {1}".format(self.agent_id, round(self._time, 3))
+ return "Agent; {0}. time: {1}".format(self.agent_id, round(self._time, 3))
def __lt__(self, b):
return self._time < b._time
@@ -58,13 +59,14 @@ def __le__(self, b):
def __ge__(self, b):
return self._time >= b._time
- def add_loss(self, *args, **kwargs):
+ def add_loss(self, *_args, **_kwargs):
"""Adds one to the number of times the agent has been blocked
from entering a queue.
"""
self.blocked += 1
- def desired_destination(self, network, edge):
+ @staticmethod
+ def desired_destination(network, edge):
"""Returns the agents next destination given their current
location on the network.
@@ -114,8 +116,7 @@ def queue_action(self, queue, *args, **kwargs):
``args[0] == 0``), when service starts for the Agent (where
``args[0] == 1``), and when the Agent departs from the queue
(where ``args[0] == 2``). By default, this method does nothing
- to the queue, but is here if the Agent class is extended and
- this method is overwritten.
+ to the queue.
"""
pass
@@ -130,9 +131,6 @@ class GreedyAgent(Agent):
with the shortest line (where the ordering is given by
:class:`QueueNetwork's<.QueueNetwork>` ``out_edges`` attribute).
"""
- def __init__(self, agent_id=(0, 0)):
- Agent.__init__(self, agent_id)
-
def __repr__(self):
msg = "GreedyAgent; agent_id:{0}. time: {1}"
return msg.format(self.agent_id, round(self._time, 3))
diff --git a/queueing_tool/queues/queue_extentions.py b/queueing_tool/queues/queue_extentions.py
index 2a5150f..83f6e12 100644
--- a/queueing_tool/queues/queue_extentions.py
+++ b/queueing_tool/queues/queue_extentions.py
@@ -19,8 +19,8 @@ class ResourceAgent(Agent):
a resource to that queue by increasing the number of
servers there by one; the ``ResourceAgent`` is then deleted.
"""
- def __init__(self, agent_id=(0, 0)):
- super(ResourceAgent, self).__init__(agent_id)
+ def __init__(self, agent_id=(0, 0), **kwargs):
+ super(ResourceAgent, self).__init__(agent_id, **kwargs)
self._has_resource = False
self._had_resource = False
@@ -238,7 +238,7 @@ def __init__(self, agent_id=(0, 0), net_size=1, **kwargs):
def __repr__(self):
return "InfoAgent; agent_id:{0}. Time: {1}".format(self.agent_id, round(self._time, 3))
- def add_loss(self, qedge, *args, **kwargs): # qedge[2] is the edge_index of the queue
+ def add_loss(self, qedge, *_args, **_kwargs): # qedge[2] is the edge_index of the queue
self.stats[qedge[2], 2] += 1
def get_beliefs(self):
@@ -317,7 +317,7 @@ def _add_arrival(self, agent=None):
return
self._num_total += 1
- new_agent = self.AgentFactory((self.edge[2], self._oArrivals), len(self.net_data))
+ new_agent = self._agent_factory((self.edge[2], self._oArrivals), len(self.net_data))
new_agent._time = self._next_ct
heappush(self._arrivals, new_agent)
diff --git a/queueing_tool/queues/queue_servers.py b/queueing_tool/queues/queue_servers.py
index b2626bc..2c4cca0 100644
--- a/queueing_tool/queues/queue_servers.py
+++ b/queueing_tool/queues/queue_servers.py
@@ -3,10 +3,11 @@
import numbers
from heapq import heappush, heappop
-from numpy.random import uniform, exponential
+from numpy.random import uniform, exponential, RandomState
from numpy import infty
import numpy as np
+from queueing_tool.common import EdgeID
from queueing_tool.queues.agents import Agent, InftyAgent
@@ -119,6 +120,8 @@ class QueueServer(object):
the edge type for this queue. This is automatically created
when a :class:`.QueueNetwork` instance is created.
AgentFactory : class (optional, default: the :class:`~Agent` class)
+ Deprecated. Use the :meth:`~QueueServer.agent_factory` method
+ instead.
Any function that can create agents. Note that the function
must take one parameter.
active_cap : int (optional, default: ``infty``)
@@ -273,11 +276,11 @@ class QueueServer(object):
'vertex_color': [0.0, 0.5, 1.0, 1.0]
}
- def __init__(self, num_servers=1, arrival_f=None,
+ def __init__(self, num_servers=1, arrival_f=None, # pylint: disable=R0914
service_f=None, edge=(0, 0, 0, 1),
AgentFactory=Agent, collect_data=False, active_cap=infty,
deactive_t=infty, colors=None, seed=None,
- coloring_sensitivity=2, **kwargs):
+ coloring_sensitivity=2, random_state=None, **kwargs):
if not isinstance(num_servers, numbers.Integral) and num_servers is not infty:
msg = "num_servers must be an integer or infinity."
@@ -286,43 +289,43 @@ def __init__(self, num_servers=1, arrival_f=None,
msg = "num_servers must be a positive integer or infinity."
raise ValueError(msg)
- self.edge = edge
+ self.edge = EdgeID(*edge)
self.num_servers = kwargs.get('nServers', num_servers)
self.num_departures = 0
self.num_system = 0
self.data = {} # times; agent_id : [arrival, service start, departure]
self.queue = collections.deque()
+ if random_state is None:
+ random_state = RandomState(seed)
+
if arrival_f is None:
- def arrival_f(t):
- return t + exponential(1.0)
+ def arrival_f(t): # pylint: disable=E0102
+ return t + random_state.exponential(1.0)
if service_f is None:
- def service_f(t):
- return t + exponential(0.9)
+ def service_f(t): # pylint: disable=E0102
+ return t + random_state.exponential(0.9)
self.arrival_f = arrival_f
self.service_f = service_f
- self.AgentFactory = AgentFactory
+ self._agent_factory = AgentFactory
self.collect_data = collect_data
self.active_cap = active_cap
self.deactive_t = deactive_t
inftyAgent = InftyAgent()
- self._arrivals = [inftyAgent] # A list of arriving agents.
- self._departures = [inftyAgent] # A list of departing agents.
+ self._arrivals = [inftyAgent] # A list of arriving agents.
+ self._departures = [inftyAgent] # A list of departing agents.
self._num_arrivals = 0
self._oArrivals = 0
- self._num_total = 0 # The number of agents scheduled to arrive + num_system
+ self._num_total = 0 # noqa E501 The number of agents scheduled to arrive + num_system
self._active = False
- self._current_t = 0 # The time of the last event.
- self._time = infty # The time of the next event.
- self._next_ct = 0 # The next time an arrival from outside the network can arrive.
+ self._current_t = 0 # The time of the last event.
+ self._time = infty # The time of the next event.
+ self._next_ct = 0 # noqa E501 The next time an arrival from outside the network can arrive.
self.coloring_sensitivity = coloring_sensitivity
- if isinstance(seed, numbers.Integral):
- np.random.seed(seed)
-
if colors is not None:
self.colors = colors
for col in set(self._default_colors.keys()) - set(self.colors.keys()):
@@ -346,6 +349,11 @@ def current_time(self):
def num_arrivals(self):
return [self._num_arrivals, self._oArrivals]
+ @property
+ def AgentFactory(self):
+ # Deprecated
+ return self._agent_factory
+
def __repr__(self):
my_str = ("QueueServer:{0}. Servers: {1}, queued: {2}, arrivals: {3}, "
"departures: {4}, next time: {5}")
@@ -366,7 +374,7 @@ def _add_arrival(self, agent=None):
return
self._num_total += 1
- new_agent = self.AgentFactory((self.edge[2], self._oArrivals))
+ new_agent = self.agent_factory()
new_agent._time = self._next_ct
heappush(self._arrivals, new_agent)
@@ -378,6 +386,9 @@ def _add_arrival(self, agent=None):
if self._arrivals[0]._time < self._departures[0]._time:
self._time = self._arrivals[0]._time
+ def agent_factory(self):
+ return self._agent_factory((self.edge[2], self._oArrivals), queue=self)
+
def at_capacity(self):
"""Returns whether the queue is at capacity or not.
@@ -518,7 +529,7 @@ def fetch_data(self, return_header=False):
qdata.extend(d)
dat = np.zeros((len(qdata), 6))
- if len(qdata) > 0:
+ if qdata:
dat[:, :5] = np.array(qdata)
dat[:, 5] = self.edge[2]
@@ -577,7 +588,7 @@ def next_event(self):
if self.collect_data and new_depart.agent_id in self.data:
self.data[new_depart.agent_id][-1][2] = self._current_t
- if len(self.queue) > 0:
+ if self.queue:
agent = self.queue.popleft()
if self.collect_data and agent.agent_id in self.data:
self.data[agent.agent_id][-1][1] = self._current_t
@@ -672,11 +683,11 @@ def set_num_servers(self, n):
If ``n`` is not positive.
"""
if not isinstance(n, numbers.Integral) and n is not infty:
- the_str = "n must be an integer or infinity.\n{0}"
- raise TypeError(the_str.format(str(self)))
+ the_str = "n ({0}) must be an integer or infinity.\n{1}"
+ raise TypeError(the_str.format(n, str(self)))
elif n <= 0:
- the_str = "n must be a positive integer or infinity.\n{0}"
- raise ValueError(the_str.format(str(self)))
+ the_str = "n ({0}) must be a positive integer or infinity.\n{1}"
+ raise ValueError(the_str.format(n, str(self)))
else:
self.num_servers = n
@@ -704,6 +715,7 @@ def simulate(self, n=1, t=None, nA=None, nD=None):
>>> import queueing_tool as qt
>>> import numpy as np
+ >>> np.random.seed(15)
>>> rate = lambda t: 2 + 16 * np.sin(np.pi * t / 8)**2
>>> arr = lambda t: qt.poisson_random_measure(t, rate, 18)
>>> ser = lambda t: t + np.random.gamma(4, 0.1)
@@ -722,23 +734,23 @@ def simulate(self, n=1, t=None, nA=None, nD=None):
>>> t0 = q.time
>>> q.simulate(t=75)
>>> round(float(q.time - t0), 1)
- 75.1
+ 75.2
>>> q.num_arrivals[1] + q.num_departures - num_events
- 1597
+ 1594
To simulate forward until 1000 new departures are observed run:
>>> nA0, nD0 = q.num_arrivals[1], q.num_departures
>>> q.simulate(nD=1000)
>>> q.num_departures - nD0, q.num_arrivals[1] - nA0
- (1000, 983)
+ (1000, 1019)
To simulate until 1000 new arrivals are observed run:
>>> nA0, nD0 = q.num_arrivals[1], q.num_departures
>>> q.simulate(nA=1000)
>>> q.num_departures - nD0, q.num_arrivals[1] - nA0,
- (987, 1000)
+ (1010, 1000)
"""
if t is None and nD is None and nA is None:
for dummy in range(n):
@@ -885,7 +897,7 @@ class NullQueue(QueueServer):
'vertex_color': [0.5, 0.5, 0.5, 0.5]
}
- def __init__(self, *args, **kwargs):
+ def __init__(self, *_args, **kwargs):
if 'edge' not in kwargs:
kwargs['edge'] = (0, 0, 0, 0)
@@ -898,7 +910,7 @@ def __repr__(self):
def initialize(self, *args, **kwargs):
pass
- def set_num_servers(self, *args, **kwargs):
+ def set_num_servers(self, n):
pass
def number_queued(self):
@@ -911,7 +923,7 @@ def _add_arrival(self, agent=None):
else:
self.data[agent.agent_id].append([agent._time, 0, 0, 0, 0])
- def delay_service(self, *args, **kwargs):
+ def delay_service(self, t=None):
pass
def next_event_description(self):
diff --git a/requirements.txt b/requirements.txt
index fafae50..43d6849 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
Cython==0.23.4
-matplotlib==1.5.1
+matplotlib==2.3.3
mock==1.0.1
networkx==1.11
numpy==1.10.4
diff --git a/setup.cfg b/setup.cfg
index 58f8c99..48a12d5 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -2,3 +2,6 @@
addopts = --doctest-modules --color=yes --capture=no
doctest_optionflags = NORMALIZE_WHITESPACE
testpaths = tests queueing_tool
+
+[flake8]
+max-line-length = 90
diff --git a/setup.py b/setup.py
index 3ed100b..bac9a4f 100644
--- a/setup.py
+++ b/setup.py
@@ -48,6 +48,8 @@
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
+ 'Programming Language :: Python :: 3.6',
+ 'Programming Language :: Python :: 3.7',
'Programming Language :: Cython',
'Topic :: Scientific/Engineering :: Information Analysis',
'Topic :: Scientific/Engineering :: Mathematics'
diff --git a/tests/test_graph_generation.py b/tests/test_graph_generation.py
index b6abfd1..0873beb 100644
--- a/tests/test_graph_generation.py
+++ b/tests/test_graph_generation.py
@@ -1,44 +1,29 @@
-import unittest
-
-from numpy.random import randint
import networkx as nx
import numpy as np
+import pytest
import queueing_tool as qt
def generate_adjacency(a=3, b=25, c=6, n=12):
ans = {}
- for k in np.unique(randint(a, b, n)):
- ans[k] = {j: {} for j in randint(a, b, randint(1, c))}
+ for k in np.unique(np.random.randint(a, b, n)):
+ ans[k] = {j: {} for j in np.random.randint(a, b, np.random.randint(1, c))}
return ans
-class TestGraphFunctions(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
- cls.expected_response0 = {
- 0: {1: {'edge_type': 5}},
- 1: {2: {'edge_type': 9}, 3: {'edge_type': 14}},
- 2: {0: {'edge_type': 1}},
- 3: {3: {'edge_type': 0}}
- }
- cls.expected_response1 = {
- 0: {1: {'edge_type': 5}},
- 1: {2: {'edge_type': 9}, 3: {'edge_type': 0}},
- 2: {0: {'edge_type': 1}},
- 3: {}
- }
+class TestGraphFunctions(object):
- def test_graph2dict(self):
+ @staticmethod
+ def test_graph2dict():
adj = generate_adjacency()
g1 = qt.adjacency2graph(adj, adjust=2)
aj1 = qt.graph2dict(g1)
g2 = qt.adjacency2graph(aj1, adjust=2)
- self.assertTrue(nx.is_isomorphic(g1, g2))
+ assert nx.is_isomorphic(g1, g2)
- def test_add_edge_lengths(self):
+ @staticmethod
+ def test_add_edge_lengths():
g1 = qt.generate_pagerank_graph(10)
g2 = qt.add_edge_lengths(g1)
@@ -46,20 +31,22 @@ def test_add_edge_lengths(self):
for key in g2.edge_properties():
edge_props.add(key)
- self.assertTrue('edge_length' in edge_props)
+ assert 'edge_length' in edge_props
- def test_generate_transition(self):
+ @staticmethod
+ def test_generate_transition():
g = qt.generate_random_graph(20)
mat = qt.generate_transition_matrix(g)
ans = np.sum(mat, axis=1)
- self.assertTrue(np.allclose(ans, 1))
+ np.testing.assert_allclose(ans, 1)
mat = qt.generate_transition_matrix(g, seed=10)
ans = np.sum(mat, axis=1)
- self.assertTrue(np.allclose(ans, 1))
+ np.testing.assert_allclose(ans, 1)
- def test_adjacency2graph_matrix_adjacency(self):
+ @staticmethod
+ def test_adjacency2graph_matrix_adjacency():
# Test adjacency argument using ndarray work
adj = np.array([[0, 1, 0, 0],
@@ -69,11 +56,17 @@ def test_adjacency2graph_matrix_adjacency(self):
ety = {0: {1: 5}, 1: {2: 9, 3: 14}}
g = qt.adjacency2graph(adj, edge_type=ety, adjust=2)
- ans = qt.graph2dict(g)
-
- self.assertTrue(ans == self.expected_response1)
+ actual = qt.graph2dict(g)
+ expected = {
+ 0: {1: {'edge_type': 5}},
+ 1: {2: {'edge_type': 9}, 3: {'edge_type': 0}},
+ 2: {0: {'edge_type': 1}},
+ 3: {}
+ }
+ assert actual == expected
- def test_adjacency2graph_matrix_etype(self):
+ @staticmethod
+ def test_adjacency2graph_matrix_etype():
# Test adjacency argument using ndarrays work
adj = {0: {1: {}}, 1: {2: {}, 3: {}}, 2: {0: {}}, 3: {}}
ety = np.array([[0, 5, 0, 0],
@@ -82,14 +75,22 @@ def test_adjacency2graph_matrix_etype(self):
[0, 0, 0, 0]])
g = qt.adjacency2graph(adj, edge_type=ety, adjust=1)
- ans = qt.graph2dict(g)
- self.assertEqual(ans, self.expected_response0)
+ actual = qt.graph2dict(g)
+ expected = {
+ 0: {1: {'edge_type': 5}},
+ 1: {2: {'edge_type': 9}, 3: {'edge_type': 14}},
+ 2: {0: {'edge_type': 1}},
+ 3: {3: {'edge_type': 0}}
+ }
+ assert expected == actual
- def test_adjacency2graph_errors(self):
- with self.assertRaises(TypeError):
+ @staticmethod
+ def test_adjacency2graph_errors():
+ with pytest.raises(TypeError):
qt.adjacency2graph([])
- def test_set_types_random(self):
+ @staticmethod
+ def test_set_types_random():
nV = 1200
nT = np.random.randint(5, 10)
@@ -107,16 +108,17 @@ def test_set_types_random(self):
props = (np.array(mat).sum(1) + 0.0) / len(non_loops)
ps = np.array([pType[k] for k in eType])
- self.assertTrue(np.allclose(props, ps, atol=0.01))
+ np.testing.assert_allclose(props, ps, atol=0.01)
prob[-1] = 2
pType = {eType[k]: prob[k] for k in range(nT)}
- with self.assertRaises(ValueError):
+ with pytest.raises(ValueError):
g = qt.set_types_random(g, proportions=pType, seed=10)
- with self.assertRaises(ValueError):
+ with pytest.raises(ValueError):
g = qt.set_types_random(g, loop_proportions=pType, seed=10)
- def test_test_graph_importerror(self):
- with self.assertRaises(TypeError):
+ @staticmethod
+ def test_test_graph_importerror():
+ with pytest.raises(TypeError):
qt.generate_transition_matrix(1)
diff --git a/tests/test_network.py b/tests/test_network.py
index 941c0f3..4bfadde 100644
--- a/tests/test_network.py
+++ b/tests/test_network.py
@@ -1,5 +1,3 @@
-import os
-import unittest
try:
import unittest.mock as mock
except ImportError:
@@ -13,44 +11,50 @@
import networkx as nx
import numpy as np
+import pytest
import queueing_tool as qt
-TRAVIS_TEST = os.environ.get('TRAVIS_TEST', False)
+@pytest.fixture(name='queue_network', scope='module')
+def fixture_queue_network():
+ g = qt.generate_pagerank_graph(200)
+ qn = qt.QueueNetwork(g)
+ qn.g.draw_graph = mock.MagicMock()
+ qn.max_agents = 2000
+ qn.initialize(50)
+ return qn
-class TestQueueNetwork(unittest.TestCase):
+@pytest.fixture
+def clear_queue_network(queue_network):
+ yield
+ queue_network.clear()
+ queue_network.initialize(50)
- @classmethod
- def setUpClass(cls):
- cls.g = qt.generate_pagerank_graph(200)
- cls.qn = qt.QueueNetwork(cls.g)
- cls.qn.g.draw_graph = mock.MagicMock()
- cls.qn.max_agents = 2000
- cls.qn.initialize(50)
- def tearDown(self):
- self.qn.clear()
- self.qn.initialize(50)
+@pytest.mark.usefixtures('clear_queue_network')
+class TestQueueNetwork(object):
- def test_QueueNetwork_accounting(self):
+ @staticmethod
+ def test_accounting(queue_network):
- num_events = 2500
+ num_events = 1500
ans = np.zeros(num_events, bool)
- na = np.zeros(self.qn.nE, int)
- for q in self.qn.edge2queue:
+ na = np.zeros(queue_network.nE, int)
+ for q in queue_network.edge2queue:
na[q.edge[2]] = len(q._arrivals) + len(q._departures) + len(q.queue) - 2
for k in range(num_events):
- ans[k] = (self.qn.num_agents == na).all()
- self.qn.simulate(n=1)
- for q in self.qn.edge2queue:
+ ans[k] = (queue_network.num_agents == na).all()
+ queue_network.simulate(n=1)
+ for q in queue_network.edge2queue:
na[q.edge[2]] = len(q._arrivals) + len(q._departures) + len(q.queue) - 2
- self.assertTrue(ans.all())
+ assert ans.all()
- def test_QueueNetwork_add_arrival(self):
+ @staticmethod
+ def test_add_arrival():
adj = {0: [1], 1: [2, 3]}
g = qt.adjacency2graph(adj)
@@ -61,7 +65,7 @@ def test_QueueNetwork_add_arrival(self):
qn.initialize(edges=(0, 1))
qn.start_collecting_data(edge=[(1, 2), (1, 3)])
- qn.simulate(150000)
+ qn.simulate(15000)
data = qn.get_queue_data(edge=[(1, 2), (1, 3)])
e0, e1 = qn.out_edges[1]
@@ -71,18 +75,20 @@ def test_QueueNetwork_add_arrival(self):
trans = qn.transitions(False)
- self.assertAlmostEqual(trans[1][2], p0, 2)
- self.assertAlmostEqual(trans[1][3], p1, 2)
+ assert np.isclose(trans[1][2], p0, atol=1e-1)
+ assert np.isclose(trans[1][3], p1, atol=1e-1)
- def test_QueueNetwork_animate(self):
+ @staticmethod
+ def test_animate(queue_network):
if not HAS_MATPLOTLIB:
with mock.patch('queueing_tool.network.queue_network.plt.show'):
- self.qn.animate(frames=5)
+ queue_network.animate(frames=5)
else:
plt.switch_backend('Agg')
- self.qn.animate(frames=5)
+ queue_network.animate(frames=5)
- def test_QueueNetwork_blocking(self):
+ @staticmethod
+ def test_blocking():
g = nx.random_geometric_graph(100, 0.2).to_directed()
g = qt.set_types_random(g, proportions={k: 1.0 / 6 for k in range(1, 7)})
@@ -103,35 +109,38 @@ def test_QueueNetwork_blocking(self):
qn = qt.QueueNetwork(g, q_classes=q_cls, q_args=q_arg, seed=17)
qn.blocking = 'RS'
- self.assertEqual(qn.blocking, 'RS')
- self.assertEqual(qn._blocking, False)
+ assert qn.blocking == 'RS'
+ assert qn._blocking is False
qn.clear()
- self.assertEqual(qn._initialized, False)
+ assert qn._initialized is False
- def test_QueueNetwork_blocking_setter_error(self):
- self.qn.blocking = 'RS'
- with self.assertRaises(TypeError):
- self.qn.blocking = 2
+ @staticmethod
+ def test_blocking_setter_error(queue_network):
+ queue_network.blocking = 'RS'
+ with pytest.raises(TypeError):
+ queue_network.blocking = 2
- def test_QueueNetwork_closedness(self):
+ @staticmethod
+ def test_closedness(queue_network):
num_events = 2500
ans = np.zeros(num_events, bool)
- na = np.zeros(self.qn.nE, int)
- for q in self.qn.edge2queue:
+ na = np.zeros(queue_network.nE, int)
+ for q in queue_network.edge2queue:
na[q.edge[2]] = len(q._arrivals) + len(q._departures) + len(q.queue) - 2
for k in range(num_events):
- ans[k] = np.sum(self.qn.num_agents) >= np.sum(na)
- for q in self.qn.edge2queue:
+ ans[k] = np.sum(queue_network.num_agents) >= np.sum(na)
+ for q in queue_network.edge2queue:
na[q.edge[2]] = len(q._arrivals) + len(q._departures) + len(q.queue) - 2
- self.qn.simulate(n=1)
+ queue_network.simulate(n=1)
- self.assertTrue(ans.all())
+ assert ans.all()
- def test_QueueNetwork_copy(self):
+ @staticmethod
+ def test_copy():
g = nx.random_geometric_graph(100, 0.2).to_directed()
g = qt.set_types_random(g, proportions={k: 0.2 for k in range(1, 6)})
@@ -144,69 +153,74 @@ def test_QueueNetwork_copy(self):
}
q_arg = {3: {'net_size': g.number_of_edges()},
- 4: {'num_servers': 500}}
+ 4: {'num_servers': 50}}
qn = qt.QueueNetwork(g, q_classes=q_cls, q_args=q_arg, seed=17)
qn.max_agents = np.infty
qn.initialize(queues=range(g.number_of_edges()))
- qn.simulate(n=50000)
+ qn.simulate(n=5000)
qn2 = qn.copy()
stamp = [(q.num_arrivals, q.time) for q in qn2.edge2queue]
- qn2.simulate(n=25000)
+ qn2.simulate(n=2500)
- self.assertFalse(qn.current_time == qn2.current_time)
- self.assertFalse(qn.time == qn2.time)
+ assert qn.current_time != qn2.current_time
+ assert qn.time != qn2.time
ans = []
for k, q in enumerate(qn2.edge2queue):
if stamp[k][1] != q.time:
ans.append(q.time != qn.edge2queue[k].time)
- self.assertTrue(np.array(ans).all())
+ assert np.array(ans).all()
+ @staticmethod
@mock.patch('queueing_tool.network.queue_network.HAS_MATPLOTLIB', True)
- def test_QueueNetwork_drawing(self):
+ def test_drawing(queue_network):
scatter_kwargs = {'c': 'b'}
kwargs = {'bgcolor': 'green'}
- self.qn.draw(scatter_kwargs=scatter_kwargs, **kwargs)
- self.qn.g.draw_graph.assert_called_with(scatter_kwargs=scatter_kwargs,
- line_kwargs=None, **kwargs)
+ queue_network.draw(scatter_kwargs=scatter_kwargs, **kwargs)
+ queue_network.g.draw_graph.assert_called_with(scatter_kwargs=scatter_kwargs,
+ line_kwargs=None, **kwargs)
- self.qn.draw(scatter_kwargs=scatter_kwargs)
- bgcolor = self.qn.colors['bgcolor']
- self.qn.g.draw_graph.assert_called_with(scatter_kwargs=scatter_kwargs,
- line_kwargs=None, bgcolor=bgcolor)
+ queue_network.draw(scatter_kwargs=scatter_kwargs)
+ bgcolor = queue_network.colors['bgcolor']
+ queue_network.g.draw_graph.assert_called_with(scatter_kwargs=scatter_kwargs,
+ line_kwargs=None, bgcolor=bgcolor)
+ @staticmethod
@mock.patch('queueing_tool.network.queue_network.HAS_MATPLOTLIB', False)
- def test_QueueNetwork_drawing_importerror(self):
- with self.assertRaises(ImportError):
- self.qn.draw()
+ def test_drawing_importerror(queue_network):
+ with pytest.raises(ImportError):
+ queue_network.draw()
- def test_QueueNetwork_drawing_animation_error(self):
- self.qn.clear()
- with self.assertRaises(qt.QueueingToolError):
- self.qn.animate()
+ @staticmethod
+ def test_drawing_animation_error(queue_network):
+ queue_network.clear()
+ with pytest.raises(qt.QueueingToolError):
+ queue_network.animate()
- self.qn.initialize()
+ queue_network.initialize()
with mock.patch('queueing_tool.network.queue_network.HAS_MATPLOTLIB', False):
- with self.assertRaises(ImportError):
- self.qn.animate()
+ with pytest.raises(ImportError):
+ queue_network.animate()
- def test_QueueNetwork_init_error(self):
+ @staticmethod
+ def test_init_error():
g = qt.generate_pagerank_graph(7)
- with self.assertRaises(TypeError):
+ with pytest.raises(TypeError):
qt.QueueNetwork(g, blocking=2)
- def test_QueueNetwork_get_agent_data(self):
+ @staticmethod
+ def test_get_agent_data(queue_network):
- self.qn.clear()
- self.qn.initialize(queues=1)
- self.qn.start_collecting_data()
- self.qn.simulate(n=20000)
+ queue_network.clear()
+ queue_network.initialize(queues=1)
+ queue_network.start_collecting_data()
+ queue_network.simulate(n=2000)
- data = self.qn.get_agent_data()
+ data = queue_network.get_agent_data()
dat0 = data[(1, 0)]
a = dat0[:, 0]
@@ -217,33 +231,35 @@ def test_QueueNetwork_get_agent_data(self):
b.sort()
c.sort()
- self.assertTrue((a == dat0[:, 0]).all())
- self.assertTrue((b == dat0[dat0[:, 1] > 0, 1]).all())
- self.assertTrue((c == dat0[dat0[:, 2] > 0, 2]).all())
- self.assertTrue((dat0[1:, 0] == dat0[dat0[:, 2] > 0, 2]).all())
+ assert (a == dat0[:, 0]).all()
+ assert (b == dat0[dat0[:, 1] > 0, 1]).all()
+ assert (c == dat0[dat0[:, 2] > 0, 2]).all()
+ assert (dat0[1:, 0] == dat0[dat0[:, 2] > 0, 2]).all()
- def test_QueueNetwork_get_queue_data(self):
+ @staticmethod
+ def test_get_queue_data():
g = nx.random_geometric_graph(50, 0.5).to_directed()
q_cls = {1: qt.QueueServer}
qn = qt.QueueNetwork(g, q_classes=q_cls, seed=17)
- k = np.random.randint(10000, 20000)
+ k = np.random.randint(1000, 2000)
- qn.max_agents = 4000
+ qn.max_agents = 400
qn.initialize(queues=range(qn.nE))
qn.start_collecting_data()
qn.simulate(n=k)
data = qn.get_queue_data()
- self.assertEqual(data.shape, (k, 6))
+ assert data.shape == (k, 6)
qn.stop_collecting_data()
qn.clear_data()
ans = np.array([q.data == {} for q in qn.edge2queue])
- self.assertTrue(ans.all())
+ assert ans.all()
- def test_QueueNetwork_greedy_routing(self):
+ @staticmethod
+ def test_greedy_routing():
lam = np.random.randint(1, 10) + 0.0
rho = np.random.uniform(0.75, 1)
@@ -284,9 +300,9 @@ def ser_id(t):
qn = qt.QueueNetwork(g, q_classes=qcl, q_args=arg)
qn.initialize(edges=(0, 1))
- qn.max_agents = 5000
+ qn.max_agents = 500
- num_events = 1000
+ num_events = 100
ans = np.zeros(num_events, bool)
e01 = qn.g.edge_index[(0, 1)]
edg = qn.edge2queue[e01].edge
@@ -296,161 +312,143 @@ def ser_id(t):
qn.simulate(n=1)
if qn.next_event_description() == ('Departure', e01):
d0 = qn.edge2queue[e01]._departures[0].desired_destination(qn, edg)
- a1 = np.argmin([qn.edge2queue[e].number_queued() for e in qn.out_edges[1]])
+ a1 = np.argmin([qn.edge2queue[e].number_queued() for e in qn.out_edges[1]]) # noqa: E501
d1 = qn.out_edges[1][a1]
ans[c] = d0 == d1
c += 1
- self.assertTrue(ans.all())
+ assert ans.all()
- def test_QueueNetwork_initialize_Error(self):
- self.qn.clear()
- with self.assertRaises(ValueError):
- self.qn.initialize(nActive=0)
+ @staticmethod
+ def test_initialize_error(queue_network):
+ queue_network.clear()
+ with pytest.raises(ValueError):
+ queue_network.initialize(nActive=0)
- with self.assertRaises(TypeError):
- self.qn.initialize(nActive=1.6)
+ with pytest.raises(TypeError):
+ queue_network.initialize(nActive=1.6)
_get_queues_mock = mock.Mock()
_get_queues_mock.return_value = []
mock_location = 'queueing_tool.network.queue_network._get_queues'
with mock.patch(mock_location, _get_queues_mock):
- with self.assertRaises(qt.QueueingToolError):
- self.qn.initialize(edge_type=1)
+ with pytest.raises(qt.QueueingToolError):
+ queue_network.initialize(edge_type=1)
- def test_QueueNetwork_initialization(self):
+ @staticmethod
+ def test_initialization_single_edge_index(queue_network):
# Single edge index
- k = np.random.randint(0, self.qn.nE)
- self.qn.clear()
- self.qn.initialize(queues=k)
+ k = np.random.randint(0, queue_network.nE)
+ queue_network.clear()
+ queue_network.initialize(queues=k)
- ans = [q.edge[2] for q in self.qn.edge2queue if q.active]
- self.assertEqual(ans, [k])
+ ans = [q.edge[2] for q in queue_network.edge2queue if q.active]
+ assert ans == [k]
+ @staticmethod
+ def test_initialization_multiple_edge_index(queue_network):
# Multiple edge indices
- k = np.unique(np.random.randint(0, self.qn.nE, 5))
- self.qn.clear()
- self.qn.initialize(queues=k)
+ k = np.unique(np.random.randint(0, queue_network.nE, 5))
+ queue_network.clear()
+ queue_network.initialize(queues=k)
- ans = np.array([q.edge[2] for q in self.qn.edge2queue if q.active])
+ ans = np.array([q.edge[2] for q in queue_network.edge2queue if q.active])
ans.sort()
- self.assertTrue((ans == k).all())
+ assert (ans == k).all()
+ @staticmethod
+ def test_initialization_single_edge(queue_network):
# Single edge as edge
- k = np.random.randint(0, self.qn.nE)
- e = self.qn.edge2queue[k].edge[:2]
- self.qn.clear()
- self.qn.initialize(edges=e)
-
- ans = [q.edge[2] for q in self.qn.edge2queue if q.active]
- self.assertEqual(ans, [k])
+ k = np.random.randint(0, queue_network.nE)
+ e = queue_network.edge2queue[k].edge[:2]
+ queue_network.clear()
+ queue_network.initialize(edges=e)
- # Single edge as tuple
- k = np.random.randint(0, self.qn.nE)
- e = self.qn.edge2queue[k].edge[:2]
- self.qn.clear()
- self.qn.initialize(edges=e)
-
- ans = [q.edge[2] for q in self.qn.edge2queue if q.active]
- self.assertEqual(ans, [k])
+ ans = [q.edge[2] for q in queue_network.edge2queue if q.active]
+ assert ans == [k]
+ @staticmethod
+ def test_initialization_multiple_edges(queue_network):
# Multiple edges as tuples
- k = np.unique(np.random.randint(0, self.qn.nE, 5))
- es = [self.qn.edge2queue[i].edge[:2] for i in k]
- self.qn.clear()
- self.qn.initialize(edges=es)
-
- ans = [q.edge[2] for q in self.qn.edge2queue if q.active]
- self.assertTrue((ans == k).all())
+ k = np.unique(np.random.randint(0, queue_network.nE, 5))
+ es = [queue_network.edge2queue[i].edge[:2] for i in k]
+ queue_network.clear()
+ queue_network.initialize(edges=es)
- # Multple edges as edges
- k = np.unique(np.random.randint(0, self.qn.nE, 5))
- es = [self.qn.edge2queue[i].edge[:2] for i in k]
- self.qn.clear()
- self.qn.initialize(edges=es)
-
- ans = [q.edge[2] for q in self.qn.edge2queue if q.active]
- self.assertTrue((ans == k).all())
+ ans = [q.edge[2] for q in queue_network.edge2queue if q.active]
+ assert (ans == k).all()
+ @staticmethod
+ def test_initialization_single_edge_type(queue_network):
# Single edge_type
k = np.random.randint(1, 4)
- self.qn.clear()
- self.qn.initialize(edge_type=k)
+ queue_network.clear()
+ queue_network.initialize(edge_type=k)
- ans = np.array([q.edge[3] == k for q in self.qn.edge2queue if q.active])
- self.assertTrue(ans.all())
+ ans = np.array([q.edge[3] == k for q in queue_network.edge2queue if q.active])
+ assert ans.all()
+ @staticmethod
+ def test_initialization_multiple_edge_types(queue_network):
# Multiple edge_types
k = np.unique(np.random.randint(1, 4, 3))
- self.qn.clear()
- self.qn.initialize(edge_type=k)
+ queue_network.clear()
+ queue_network.initialize(edge_type=k)
- ans = np.array([q.edge[3] in k for q in self.qn.edge2queue if q.active])
- self.assertTrue(ans.all())
+ ans = np.array([q.edge[3] in k for q in queue_network.edge2queue if q.active])
+ assert ans.all()
- self.qn.clear()
- self.qn.max_agents = 3
- self.qn.initialize(nActive=self.qn.num_edges)
- ans = np.array([q.active for q in self.qn.edge2queue])
- self.assertEqual(ans.sum(), 3)
+ @staticmethod
+ def test_initialization_num_active_edges(queue_network):
+ queue_network.clear()
+ queue_network.max_agents = 3
+ queue_network.initialize(nActive=queue_network.num_edges)
+ ans = np.array([q.active for q in queue_network.edge2queue])
+ assert ans.sum() == 3
- def test_QueueNetwork_max_agents(self):
+ @staticmethod
+ def test_max_agents(queue_network):
num_events = 1500
- self.qn.max_agents = 200
+ queue_network.max_agents = 200
ans = np.zeros(num_events, bool)
for k in range(num_events // 2):
- ans[k] = np.sum(self.qn.num_agents) <= self.qn.max_agents
- self.qn.simulate(n=1)
+ ans[k] = np.sum(queue_network.num_agents) <= queue_network.max_agents
+ queue_network.simulate(n=1)
- self.qn.simulate(n=20000)
+ queue_network.simulate(n=20000)
for k in range(num_events // 2, num_events):
- ans[k] = np.sum(self.qn.num_agents) <= self.qn.max_agents
- self.qn.simulate(n=1)
-
- self.assertTrue(ans.all())
-
- def test_QueueNetwork_properties(self):
- self.qn.clear()
- self.assertEqual(self.qn.time, np.infty)
- self.assertEqual(self.qn.num_edges, self.qn.nE)
- self.assertEqual(self.qn.num_vertices, self.qn.nV)
- self.assertEqual(self.qn.num_nodes, self.qn.nV)
-
- def test_QueueNetwork_set_transitions_Error(self):
- with self.assertRaises(ValueError):
- self.qn.set_transitions({-1: {0: 0.75, 1: 0.25}})
-
- with self.assertRaises(ValueError):
- self.qn.set_transitions({self.qn.nV: {0: 0.75, 1: 0.25}})
-
- with self.assertRaises(ValueError):
- self.qn.set_transitions({0: {0: 0.75, 1: -0.25}})
-
- with self.assertRaises(ValueError):
- self.qn.set_transitions({0: {0: 1.25, 1: -0.25}})
-
- mat = np.zeros((2, 2))
- with self.assertRaises(ValueError):
- self.qn.set_transitions(mat)
-
- mat = np.zeros((self.qn.nV, self.qn.nV))
- with self.assertRaises(ValueError):
- self.qn.set_transitions(mat)
-
- mat[0, 0] = -1
- mat[0, 1] = 2
- with self.assertRaises(ValueError):
- self.qn.set_transitions(mat)
-
- mat = 1
- with self.assertRaises(TypeError):
- self.qn.set_transitions(mat)
-
- def test_QueueNetwork_simulate(self):
+ ans[k] = np.sum(queue_network.num_agents) <= queue_network.max_agents
+ queue_network.simulate(n=1)
+
+ assert ans.all()
+
+ @staticmethod
+ def test_properties(queue_network):
+ queue_network.clear()
+ assert queue_network.time == np.infty
+ assert queue_network.num_edges == queue_network.nE
+ assert queue_network.num_vertices == queue_network.nV
+ assert queue_network.num_nodes == queue_network.nV
+
+ @staticmethod
+ @pytest.mark.parametrize('mat', [
+ {-1: {0: 0.75, 1: 0.25}},
+ {200: {0: 0.75, 1: 0.25}},
+ {0: {0: 0.75, 1: -0.25}},
+ {0: {0: 1.25, 1: -0.25}},
+ np.zeros((2, 2)),
+ np.zeros((200, 200)),
+ ])
+ def test_set_transitions_error(mat, queue_network):
+ with pytest.raises(ValueError):
+ queue_network.set_transitions(mat)
+
+ @staticmethod
+ def test_simulate():
g = qt.generate_pagerank_graph(50)
qn = qt.QueueNetwork(g)
@@ -460,113 +458,120 @@ def test_QueueNetwork_simulate(self):
qn.max_agents = 2000
qn.simulate(t=t0)
- self.assertGreater(qn.current_time, t0)
+ assert qn.current_time > t0
- def test_QueueNetwork_simulate_error(self):
- self.qn.clear()
- with self.assertRaises(qt.QueueingToolError):
- self.qn.simulate()
+ @staticmethod
+ def test_simulate_error(queue_network):
+ queue_network.clear()
+ with pytest.raises(qt.QueueingToolError):
+ queue_network.simulate()
- def test_QueueNetwork_simulate_slow(self):
- e = self.qn._fancy_heap.array_edges[0]
- edge = self.qn.edge2queue[e].edge
+ @staticmethod
+ def test_simulate_slow(queue_network):
+ e = queue_network._fancy_heap.array_edges[0]
+ edge = queue_network.edge2queue[e].edge
if edge[0] == edge[1]:
- for q in self.qn.edge2queue:
+ for q in queue_network.edge2queue:
if q.edge[0] != q.edge[1]:
break
- self.qn._simulate_next_event(slow=True)
+ queue_network._simulate_next_event(slow=True)
else:
- for q in self.qn.edge2queue:
+ for q in queue_network.edge2queue:
if q.edge[0] == q.edge[1]:
break
- self.qn._simulate_next_event(slow=True)
+ queue_network._simulate_next_event(slow=True)
- self.qn.clear()
- self.qn.initialize(queues=[q.edge[2]])
- e = self.qn._fancy_heap.array_edges[0]
- edge = self.qn.edge2queue[e].edge
+ queue_network.clear()
+ queue_network.initialize(queues=[q.edge[2]])
+ e = queue_network._fancy_heap.array_edges[0]
+ edge = queue_network.edge2queue[e].edge
loop = edge[0] == edge[1]
- self.qn._simulate_next_event(slow=True)
+ queue_network._simulate_next_event(slow=True)
while True:
- e = self.qn._fancy_heap.array_edges[0]
- edge = self.qn.edge2queue[e].edge
+ e = queue_network._fancy_heap.array_edges[0]
+ edge = queue_network.edge2queue[e].edge
if (edge[0] != edge[1]) == loop:
- self.qn._simulate_next_event(slow=True)
+ queue_network._simulate_next_event(slow=True)
break
else:
- self.qn._simulate_next_event(slow=False)
+ queue_network._simulate_next_event(slow=False)
+ @staticmethod
@mock.patch('queueing_tool.network.queue_network.HAS_MATPLOTLIB', True)
- def test_QueueNetwork_show_type(self):
+ def test_show_type(queue_network):
args = {'c': 'b', 'bgcolor': 'green'}
- self.qn.show_type(edge_type=2, **args)
- self.qn.g.draw_graph.assert_called_with(scatter_kwargs=None,
- line_kwargs=None, **args)
+ queue_network.show_type(edge_type=2, **args)
+ queue_network.g.draw_graph.assert_called_with(scatter_kwargs=None,
+ line_kwargs=None, **args)
+ @staticmethod
@mock.patch('queueing_tool.network.queue_network.HAS_MATPLOTLIB', True)
- def test_QueueNetwork_show_active(self):
+ def test_show_active(queue_network):
args = {
'fname': 'types.png',
'figsize': (3, 3),
'bgcolor': 'green'
}
- self.qn.show_active(**args)
- self.qn.g.draw_graph.assert_called_with(scatter_kwargs=None,
- line_kwargs=None, **args)
+ queue_network.show_active(**args)
+ queue_network.g.draw_graph.assert_called_with(scatter_kwargs=None,
+ line_kwargs=None, **args)
- def test_QueueNetwork_sorting(self):
+ @staticmethod
+ def test_sorting(queue_network):
num_events = 2000
ans = np.zeros(num_events, bool)
for k in range(num_events // 2):
- queue_times = [q.time for q in self.qn.edge2queue]
+ queue_times = [q.time for q in queue_network.edge2queue]
queue_times.sort()
tmp = queue_times[0]
- self.qn.simulate(n=1)
- ans[k] = (tmp == self.qn._qkey[0])
+ queue_network.simulate(n=1)
+ ans[k] = (tmp == queue_network._qkey[0])
- self.qn.simulate(n=10000)
+ queue_network.simulate(n=10000)
for k in range(num_events // 2, num_events):
- queue_times = [q.time for q in self.qn.edge2queue]
+ queue_times = [q.time for q in queue_network.edge2queue]
queue_times.sort()
tmp = queue_times[0]
- self.qn.simulate(n=1)
- ans[k] = (tmp == self.qn._qkey[0])
+ queue_network.simulate(n=1)
+ ans[k] = (tmp == queue_network._qkey[0])
- self.assertTrue(ans.all())
+ assert ans.all()
- def test_QueueNetwork_transitions(self):
+ @staticmethod
+ def test_transitions(queue_network):
- degree = [len(self.qn.out_edges[k]) for k in range(self.qn.nV)]
+ degree = [len(queue_network.out_edges[k]) for k in range(queue_network.nV)]
v, deg = np.argmax(degree), max(degree)
trans = np.random.uniform(size=deg)
trans = trans / sum(trans)
- probs = {v: {e[1]: p for e, p in zip(self.qn.g.out_edges(v), trans)}}
+ edges = sorted(queue_network.g.out_edges(v))
+ probs = {v: {e[1]: p for e, p in zip(edges, trans)}}
- self.qn.set_transitions(probs)
- mat = self.qn.transitions()
- tra = mat[v, [e[1] for e in self.qn.g.out_edges(v)]]
+ queue_network.set_transitions(probs)
+ mat = queue_network.transitions()
+ tra = mat[v, [e[1] for e in edges]]
- self.assertTrue((tra == trans).all())
+ assert (tra == trans).all()
- tra = self.qn.transitions(return_matrix=False)
- tra = np.array([tra[v][e[1]] for e in self.qn.g.out_edges(v)])
- self.assertTrue((tra == trans).all())
+ tra = queue_network.transitions(return_matrix=False)
+ tra = np.array([tra[v][e[1]] for e in edges])
+ assert (tra == trans).all()
- mat = qt.generate_transition_matrix(self.g)
- self.qn.set_transitions(mat)
- tra = self.qn.transitions()
+ mat = qt.generate_transition_matrix(queue_network.g)
+ queue_network.set_transitions(mat)
+ tra = queue_network.transitions()
- self.assertTrue(np.allclose(tra, mat))
+ assert np.allclose(tra, mat)
- mat = qt.generate_transition_matrix(self.g)
- self.qn.set_transitions({v: {e[1]: mat[e] for e in self.qn.g.out_edges(v)}})
- tra = self.qn.transitions()
+ mat = qt.generate_transition_matrix(queue_network.g)
+ queue_network.set_transitions({v: {e[1]: mat[e] for e in edges}}) # noqa: E501
+ tra = queue_network.transitions()
- self.assertTrue(np.allclose(tra[v], mat[v]))
+ assert np.allclose(tra[v], mat[v])
diff --git a/tests/test_qndigraph.py b/tests/test_qndigraph.py
index 0acae68..9e6d971 100644
--- a/tests/test_qndigraph.py
+++ b/tests/test_qndigraph.py
@@ -1,5 +1,4 @@
import os
-import unittest
try:
import unittest.mock as mock
except ImportError:
@@ -7,6 +6,7 @@
import networkx as nx
import numpy as np
+import pytest
import matplotlib
import matplotlib.image
@@ -29,36 +29,41 @@
}
-class TestQueueNetworkDiGraph(unittest.TestCase):
+@pytest.fixture(name='queue_network_graph', scope='module')
+def fixture_queue_network_graph():
+ np.random.seed(10)
+ return qt.QueueNetworkDiGraph(nx.krackhardt_kite_graph())
- @classmethod
- def setUpClass(cls):
- cls.g = qt.QueueNetworkDiGraph(nx.krackhardt_kite_graph())
- np.random.seed(10)
+class TestQueueNetworkDiGraph(object):
+
+ @staticmethod
@mock.patch.dict('sys.modules', matplotlib_mock)
- def testlines_scatter_args(self):
+ def test_lines_scatter_args(queue_network_graph):
+ np.random.seed(10)
ax = mock.Mock()
ax.transData = mock.Mock()
line_args = {'linewidths': 77, 'vmax': 107}
scat_args = {'vmax': 107}
- kwargs = {'pos': {v: (910, 10) for v in self.g.nodes()}}
+ kwargs = {'pos': {v: (910, 10) for v in queue_network_graph.nodes()}}
- a, b = self.g.lines_scatter_args(line_args, scat_args, **kwargs)
+ a, b = queue_network_graph.lines_scatter_args(line_args, scat_args, **kwargs)
- self.assertEqual(a['linewidths'], 77)
- self.assertEqual(b['vmax'], 107)
- self.assertTrue('beefy' not in a and 'beefy' not in b)
+ assert a['linewidths'] == 77
+ assert b['vmax'] == 107
+ assert 'beefy' not in a and 'beefy' not in b
- def test_draw_graph(self):
- pos = np.random.uniform(size=(self.g.number_of_nodes(), 2))
+ @staticmethod
+ def test_draw_graph(queue_network_graph):
+ np.random.seed(10)
+ pos = np.random.uniform(size=(queue_network_graph.number_of_nodes(), 2))
kwargs = {
'fname': 'test1.png',
'pos': pos
}
- self.g.draw_graph(scatter_kwargs={'s': 100}, **kwargs)
+ queue_network_graph.draw_graph(scatter_kwargs={'s': 100}, **kwargs)
- version = 1 if matplotlib.__version__.startswith('1') else 2
+ version = matplotlib.__version__[0]
filename = 'test-mpl-{version}.x.png'.format(version=version)
img0 = matplotlib.image.imread('tests/img/{filename}'.format(filename=filename))
@@ -69,15 +74,15 @@ def test_draw_graph(self):
pixel_diff = (img0 != img1).flatten()
num_pixels = pixel_diff.shape[0] + 0.0
- self.assertLess(pixel_diff.sum() / num_pixels, 0.0001)
+ assert pixel_diff.sum() / num_pixels < 0.0001
with mock.patch('queueing_tool.graph.graph_wrapper.HAS_MATPLOTLIB', False):
- with self.assertRaises(ImportError):
- self.g.draw_graph()
+ with pytest.raises(ImportError):
+ queue_network_graph.draw_graph()
kwargs = {'pos': 1}
- self.g.set_pos = mock.MagicMock()
+ queue_network_graph.set_pos = mock.MagicMock()
with mock.patch.dict('sys.modules', matplotlib_mock):
- self.g.draw_graph(**kwargs)
+ queue_network_graph.draw_graph(**kwargs)
- self.g.set_pos.assert_called_once_with(1)
+ queue_network_graph.set_pos.assert_called_once_with(1)
diff --git a/tests/test_queue_server.py b/tests/test_queue_server.py
index 2448b5a..e62dda1 100644
--- a/tests/test_queue_server.py
+++ b/tests/test_queue_server.py
@@ -1,26 +1,34 @@
import functools
-import unittest
import networkx as nx
import numpy as np
+import pytest
import queueing_tool as qt
-class TestQueueServers(unittest.TestCase):
+@pytest.fixture(name='lam')
+def fixture_lam():
+ return float(np.random.randint(1, 10))
- def setUp(self):
- self.lam = np.random.randint(1, 10) + 0.0
- self.rho = np.random.uniform(0.5, 1)
- def test_QueueServer_init_errors(self):
- with self.assertRaises(TypeError):
+@pytest.fixture(name='rho')
+def fixture_rho():
+ return np.random.uniform(0.5, 1)
+
+
+class TestQueueServer(object):
+
+ @staticmethod
+ def test_init_errors():
+ with pytest.raises(TypeError):
qt.QueueServer(num_servers=3.0)
- with self.assertRaises(ValueError):
+ with pytest.raises(ValueError):
qt.QueueServer(num_servers=0)
- def test_QueueServer_set_num_servers(self):
+ @staticmethod
+ def test_set_num_servers():
nSe = np.random.randint(1, 10)
q = qt.QueueServer(num_servers=nSe)
@@ -31,31 +39,34 @@ def test_QueueServer_set_num_servers(self):
Se2 = q.num_servers
q.set_num_servers(np.infty)
- self.assertEqual(Se1, nSe)
- self.assertEqual(Se2, 2 * nSe)
- self.assertTrue(q.num_servers is np.inf)
+ assert Se1 == nSe
+ assert Se2 == 2 * nSe
+ assert q.num_servers is np.inf
- def test_QueueServer_set_num_servers_errors(self):
+ @staticmethod
+ def test_set_num_servers_errors():
q = qt.QueueServer(num_servers=3)
- with self.assertRaises(TypeError):
+ with pytest.raises(TypeError):
q.set_num_servers(3.0)
- with self.assertRaises(ValueError):
+ with pytest.raises(ValueError):
q.set_num_servers(0)
- def test_QueueServer_set_inactive(self):
+ @staticmethod
+ def test_set_inactive():
q = qt.QueueServer()
q.set_active()
- a = q.active
+ was_active = q.active
q.set_inactive()
- self.assertTrue(a)
- self.assertTrue(not q.active)
+ assert was_active
+ assert not q.active
- def test_QueueServer_copy(self):
+ @staticmethod
+ def test_copy():
q1 = qt.QueueServer(seed=15)
q1.set_active()
@@ -65,9 +76,10 @@ def test_QueueServer_copy(self):
t = q1.time
q2.simulate(t=20)
- self.assertTrue(t < q2.time)
+ assert t < q2.time
- def test_QueueServer_active_cap(self):
+ @staticmethod
+ def test_active_cap():
def r(t):
return 2 + np.sin(t)
@@ -78,16 +90,17 @@ def r(t):
q.set_active()
q.simulate(n=3000)
- self.assertEqual(q.num_departures, 1000)
- self.assertEqual(q.num_arrivals, [1000, 1000])
+ assert q.num_departures == 1000
+ assert q.num_arrivals == [1000, 1000]
- def test_QueueServer_accounting(self):
+ @staticmethod
+ def test_accounting(lam, rho):
nSe = np.random.randint(1, 10)
- mu = self.lam / (self.rho * nSe)
+ mu = lam / (rho * nSe)
def arr(t):
- return t + np.random.exponential(1 / self.lam)
+ return t + np.random.exponential(1 / lam)
def ser(t):
return t + np.random.exponential(1 / mu)
@@ -106,22 +119,26 @@ def ser(t):
ans[k, 2] = len(q._departures) - 1 <= q.num_servers
q.simulate(n=1)
- self.assertTrue(ans.all())
+ assert ans.all()
- def test_QueueServer_deactivate(self):
+ @staticmethod
+ def test_deactivate():
q = qt.QueueServer(num_servers=3, deactive_t=10)
q.set_active()
- self.assertTrue(q.active)
+
+ assert q.active
+
q.simulate(t=10)
- self.assertFalse(q.active)
+ assert not q.active
- def test_QueueServer_simulation(self):
+ @staticmethod
+ def test_simulation(lam, rho):
nSe = np.random.randint(1, 10)
- mu = self.lam / (self.rho * nSe)
+ mu = lam / (rho * nSe)
def arr(t):
- return t + np.random.exponential(1 / self.lam)
+ return t + np.random.exponential(1 / lam)
def ser(t):
return t + np.random.exponential(1 / mu)
@@ -153,15 +170,19 @@ def ser(t):
q.simulate(t=t)
ans[3] = q.current_time - t0 >= t
- self.assertTrue(ans.all())
+ assert ans.all()
+
+
+class TestLossQueueServer(object):
- def test_LossQueue_accounting(self):
+ @staticmethod
+ def test_accounting(lam, rho):
nSe = np.random.randint(1, 10)
- mu = self.lam / (self.rho * nSe)
+ mu = lam / (rho * nSe)
def arr(t):
- return t + np.random.exponential(1 / self.lam)
+ return t + np.random.exponential(1 / lam)
def ser(t):
return t + np.random.exponential(1 / mu)
@@ -180,17 +201,18 @@ def ser(t):
ans[k, 2] = len(q._departures) - 1 <= q.num_servers
q.simulate(n=1)
- self.assertTrue(ans.all())
+ assert ans.all()
- def test_LossQueue_blocking(self):
+ @staticmethod
+ def test_blocking(lam, rho):
nSe = np.random.randint(1, 10)
- mu = self.lam / (self.rho * nSe)
+ mu = lam / (rho * nSe)
k = np.random.randint(5, 15)
scl = 1 / (mu * k)
def arr(t):
- return t + np.random.exponential(1 / self.lam)
+ return t + np.random.exponential(1 / lam)
def ser(t):
return t + np.random.gamma(k, scl)
@@ -210,9 +232,13 @@ def ser(t):
else:
q.simulate(n=1)
- self.assertTrue(ans.all())
+ assert ans.all()
- def test_NullQueue_data_collection(self):
+
+class TestNullQueueServer(object):
+
+ @staticmethod
+ def test_data_collection():
adj = {
0: {1: {'edge_type': 1}},
1: {2: {'edge_type': 2},
@@ -226,16 +252,19 @@ def test_NullQueue_data_collection(self):
qn = qt.QueueNetwork(g, q_classes=qcl)
qn.initialize(edges=(0, 1))
qn.start_collecting_data(edge_type=2)
- qn.max_agents = 5000
- qn.simulate(n=10000)
+ qn.max_agents = 500
+ qn.simulate(n=1000)
data = qn.get_queue_data()
# Data collected by NullQueues do not have departure and
# service start times in the data
+ assert not data[:, (1, 2)].any()
+
- self.assertFalse(data[:, (1, 2)].any())
+class TestResourceQueueServer(object):
- def test_ResourceQueue_network(self):
+ @staticmethod
+ def test_network():
g = nx.random_geometric_graph(100, 0.2).to_directed()
q_cls = {1: qt.ResourceQueue, 2: qt.ResourceQueue}
@@ -248,9 +277,10 @@ def test_ResourceQueue_network(self):
nServ = {1: 50, 2: 500}
ans = np.array([q.num_servers != nServ[q.edge[3]] for q in qn.edge2queue])
- self.assertTrue(ans.any())
+ assert ans.any()
- def test_ResourceQueue_network_data_collection(self):
+ @staticmethod
+ def test_network_data_collection():
g = qt.generate_random_graph(100)
q_cls = {1: qt.ResourceQueue, 2: qt.ResourceQueue}
q_arg = {1: {'num_servers': 500},
@@ -264,27 +294,31 @@ def test_ResourceQueue_network_data_collection(self):
qn.simulate(n=5000)
data = qn.get_queue_data()
- self.assertTrue(len(data) > 0)
+ assert data.size > 0
- def test_ResourceQueue_network_current_color(self):
+ @staticmethod
+ def test_network_current_color():
q = qt.ResourceQueue(num_servers=50)
ans = q._current_color(0)
col = q.colors['vertex_fill_color']
col = [i * (0.9 - 1. / 6) / 0.9 for i in col]
col[3] = 1.0
- self.assertEqual(ans, col)
+ assert ans == col
ans = q._current_color(1)
col = q.colors['edge_loop_color']
col = [i * (0.9 - 1. / 6) / 0.9 for i in col]
col[3] = 0
- self.assertEqual(ans, col)
+ assert ans == col
ans = q._current_color(2)
col = q.colors['vertex_pen_color']
- self.assertEqual(ans, col)
+ assert ans == col
- def test_InfoQueue_network(self):
+
+class TestInfoQueueServer(object):
+ @staticmethod
+ def test_network():
g = nx.random_geometric_graph(100, 0.2).to_directed()
q_cls = {1: qt.InfoQueue}
@@ -296,18 +330,22 @@ def test_InfoQueue_network(self):
qn.simulate(n=2000)
# Finish this
- self.assertTrue(True)
+ assert True
+
+
+class TestAgents(object):
- def test_Agent_compare(self):
+ @staticmethod
+ def test_compare():
a0 = qt.Agent()
a1 = qt.Agent()
- self.assertEqual(a0, a1)
+ assert a0 == a1
a1._time = 10
- self.assertLessEqual(a0, a1)
- self.assertLess(a0, a1)
+ assert a0 <= a1
+ assert a0 < a1
a0._time = 20
- self.assertGreaterEqual(a0, a1)
- self.assertGreater(a0, a1)
+ assert a0 >= a1
+ assert a0 > a1
diff --git a/tests/test_statistical_properties.py b/tests/test_statistical_properties.py
index f8c979b..bec1502 100644
--- a/tests/test_statistical_properties.py
+++ b/tests/test_statistical_properties.py
@@ -1,42 +1,44 @@
import functools
import math
-import os
-import unittest
import numpy as np
+import pytest
import queueing_tool as qt
-TRAVIS_TEST = os.environ.get('TRAVIS_TEST', False)
-
-
def empirical_cdf0(x, z, n):
return np.sum(z <= x) / n
+
empirical_cdf = np.vectorize(empirical_cdf0, excluded={1, 2})
def chi2_cdf(q, k, n=1000000, ns=1):
return np.mean([empirical_cdf(q, np.random.chisquare(k, n), n) for i in range(ns)])
-reason = "Test takes long."
+@pytest.fixture(name='lam')
+def fixture_lam():
+ return float(np.random.randint(1, 10))
-class TestQueueServers(unittest.TestCase):
- def setUp(self):
- self.lam = np.random.randint(1, 10) + 0.0
- self.rho = np.random.uniform(0.5, 1)
+@pytest.fixture(name='rho')
+def fixture_rho():
+ return np.random.uniform(0.5, 1)
- @unittest.skipIf(TRAVIS_TEST, reason)
- def test_Markovian_QueueServer(self):
+
+@pytest.mark.slow
+class TestQueueServerStatistically(object):
+
+ @staticmethod
+ def test_markovian_property(lam, rho): # pylint: disable=R0914
nSe = np.random.randint(1, 10)
- mu = self.lam / (self.rho * nSe)
+ mu = lam / (rho * nSe)
def arr(t):
- return t + np.random.exponential(1 / self.lam)
+ return t + np.random.exponential(1 / lam)
def ser(t):
return t + np.random.exponential(1 / mu)
@@ -70,23 +72,23 @@ def ser(t):
x, y = dep[1:], dep[:-1]
cc = np.corrcoef(x, y)[0, 1]
- self.assertAlmostEqual(cc, 0, 1)
- self.assertGreater(p1, 0.05)
+ assert np.isclose(cc, 0, atol=1e-1)
+ assert p1 > 0.05
- @unittest.skipIf(TRAVIS_TEST, reason)
- def test_QueueServer_Littleslaw(self):
+ @staticmethod
+ def test_littles_law(lam, rho):
nSe = np.random.randint(1, 10)
- mu = self.lam / (self.rho * nSe)
+ mu = lam / (rho * nSe)
def arr(t):
- return t + np.random.exponential(1 / self.lam)
+ return t + np.random.exponential(1 / lam)
def ser(t):
return t + np.random.exponential(1 / mu)
q = qt.QueueServer(num_servers=nSe, arrival_f=arr, service_f=ser)
- n = 500000
+ n = 250000
q.set_active()
q.simulate(n=n) # Burn in period
@@ -97,20 +99,20 @@ def ser(t):
ind = data[:, 2] > 0
wait = data[ind, 1] - data[ind, 0]
- ans = np.mean(wait) * self.lam - np.mean(data[:, 3]) * self.rho
+ ans = np.mean(wait) * lam - np.mean(data[:, 3]) * rho
- self.assertAlmostEqual(ans, 0, 1)
+ assert np.isclose(ans, 0, atol=1e-1)
- @unittest.skipIf(TRAVIS_TEST, reason)
- def test_LossQueue_blocking(self):
+ @staticmethod
+ def test_queue_blocking(lam, rho): # pylint: disable=R0914
nSe = np.random.randint(1, 10)
- mu = self.lam / (self.rho * nSe)
+ mu = lam / (rho * nSe)
k = np.random.randint(5, 15)
scl = 1 / (mu * k)
def arr(t):
- return t + np.random.exponential(1 / self.lam)
+ return t + np.random.exponential(1 / lam)
def ser(t):
return t + np.random.gamma(k, scl)
@@ -127,62 +129,61 @@ def ser(t):
nA1 = q2.num_arrivals[1]
nB1 = q2.num_blocked
- a = self.lam / mu
+ a = lam / mu
f = np.array([math.factorial(j) for j in range(nSe + 1)])
pois_pmf = np.exp(-a) * a**nSe / math.factorial(nSe)
pois_cdf = np.sum(np.exp(-a) * a**np.arange(nSe + 1) / f)
p_block = (nB1 - nB0 + 0.0) / (nA1 - nA0)
- self.assertAlmostEqual(pois_pmf / pois_cdf, p_block, 2)
-
-
-class TestRandomMeasure(unittest.TestCase):
-
- @unittest.skipIf(TRAVIS_TEST, reason)
- def test_poisson_random_measure(self):
- # This function tests to make sure the poisson_random_measure function
- # actually simulates a Poisson random measure. It does so looking for
- # Poisson random variables using a chi-squared test (testing the
- # composite null hypothesis). It does not look for independence of the
- # random variables.
- # This test should fail some percentage of the time
-
- def rate(t):
- return 0.5 + 4 * np.sin(np.pi * t / 12)**2
-
- arr_f = functools.partial(qt.poisson_random_measure, rate=rate, rate_max=4.5)
-
- nSamp = 15000
- nArr = 1000
- arrival_times = np.zeros((nSamp, nArr))
- for k in range(nSamp):
- t = 0
- for j in range(nArr):
- t = arr_f(t)
- arrival_times[k, j] = t
- if t > 12:
- break
-
- mu1 = 5 * np.sum(rate(np.linspace(3, 8, 200))) / 200 # or 2*(5 + (sqrt(3) + 2) * 3/pi) + 2.5
- mu2 = 4 * np.sum(rate(np.linspace(8, 12, 200))) / 200 # or 2*(4 - 3*sqrt(3)/pi) + 2
- mus = [mu1, mu2]
-
- rv1 = np.sum(np.logical_and(3 < arrival_times, arrival_times < 8), axis=1)
- rv2 = np.sum(np.logical_and(8 < arrival_times, arrival_times < 12), axis=1)
- rvs = [rv1, rv2]
- df = [max(rv1) + 2, max(rv2) + 2]
-
- Q = np.zeros((max(df), len(rvs)))
-
- for i, sample in enumerate(rvs):
- for k in range(df[i] - 1):
- pi_hat = nSamp * np.exp(-mus[i]) * mus[i]**k / math.factorial(k)
- Q[k, i] = (np.sum(sample == k) - pi_hat)**2 / pi_hat
-
- ans = np.array([math.factorial(j) for j in range(k + 1)])
- pois_cdf = np.sum(np.exp(-mus[i]) * mus[i]**np.arange(k + 1) / ans)
- Q[k + 1, i] = nSamp * (1 - pois_cdf)
-
- Qs = np.sum(Q, axis=0)
- p = np.array([1 - chi2_cdf(Qs[i], df[i] - 2) for i in range(len(rvs))])
- self.assertTrue((p > 0.1).any())
+
+ assert np.isclose(pois_pmf / pois_cdf, p_block, atol=1e-2)
+
+
+@pytest.mark.slow
+def test_poisson_random_measure(): # pylint: disable=R0914
+ # This function tests to make sure the poisson_random_measure function
+ # actually simulates a Poisson random measure. It does so looking for
+ # Poisson random variables using a chi-squared test (testing the
+ # composite null hypothesis). It does not look for independence of the
+ # random variables.
+ # This test should fail some percentage of the time
+
+ def rate(t):
+ return 0.5 + 4 * np.sin(np.pi * t / 12)**2
+
+ arr_f = functools.partial(qt.poisson_random_measure, rate=rate, rate_max=4.5)
+
+ nSamp = 15000
+ nArr = 1000
+ arrival_times = np.zeros((nSamp, nArr))
+ for k in range(nSamp):
+ t = 0
+ for j in range(nArr):
+ t = arr_f(t)
+ arrival_times[k, j] = t
+ if t > 12:
+ break
+
+ mu1 = 5 * np.sum(rate(np.linspace(3, 8, 200))) / 200 # or 2*(5 + (sqrt(3) + 2) * 3 / pi) + 2.5
+ mu2 = 4 * np.sum(rate(np.linspace(8, 12, 200))) / 200 # or 2*(4 - 3*sqrt(3)/pi) + 2
+ mus = [mu1, mu2]
+
+ rv1 = np.sum(np.logical_and(arrival_times > 3, arrival_times < 8), axis=1)
+ rv2 = np.sum(np.logical_and(arrival_times > 8, arrival_times < 12), axis=1)
+ rvs = [rv1, rv2]
+ df = [max(rv1) + 2, max(rv2) + 2]
+
+ Q = np.zeros((max(df), len(rvs)))
+
+ for i, sample in enumerate(rvs):
+ for k in range(df[i] - 1):
+ pi_hat = nSamp * np.exp(-mus[i]) * mus[i]**k / math.factorial(k)
+ Q[k, i] = (np.sum(sample == k) - pi_hat)**2 / pi_hat
+
+ ans = np.array([math.factorial(j) for j in range(k + 1)])
+ pois_cdf = np.sum(np.exp(-mus[i]) * mus[i]**np.arange(k + 1) / ans)
+ Q[k + 1, i] = nSamp * (1 - pois_cdf)
+
+ Qs = np.sum(Q, axis=0)
+ p = np.array([1 - chi2_cdf(Qs[i], df[i] - 2) for i in range(len(rvs))])
+ assert (p > 0.1).any()