diff --git a/ciw/dists/distributions.py b/ciw/dists/distributions.py index 26713d06..c6c13e2d 100644 --- a/ciw/dists/distributions.py +++ b/ciw/dists/distributions.py @@ -1,6 +1,9 @@ -'''Distributions available in Ciw.''' +'''Distributions available in Ciw.''' import copy +import math +import random +from math import sqrt, exp, pi, erf from itertools import cycle from operator import add, mul, sub, truediv from random import ( @@ -10,10 +13,11 @@ gammavariate, lognormvariate, weibullvariate, -) +) from typing import List, NoReturn import numpy as np +import statistics as st from ciw.auxiliary import * from ciw.individual import Individual @@ -22,7 +26,6 @@ class Distribution(object): """ A general distribution from which all other distirbutions will inherit. """ - def __repr__(self): return "Distribution" @@ -62,6 +65,36 @@ def __truediv__(self, dist): Divide two distributions such that sampling is the ratio of the samples. """ return CombinedDistribution(self, dist, truediv) + + @property + def mean(self): + """default mean of a distribution""" + return float('nan') + + @property + def variance(self): + """default variance of a distribution""" + return float('nan') + + @property + def sd(self): + """default standard deviation of a distribution""" + return math.sqrt(self.variance) + + @property + def median(self): + """default median of a distribution""" + return float('nan') + + @property + def upper_limit(self): + """default upper range of a distribution""" + return float('nan') + + @property + def lower_limit(self): + """default lower range of a distribution""" + return float('nan') class CombinedDistribution(Distribution): @@ -69,7 +102,6 @@ class CombinedDistribution(Distribution): A distribution that combines the samples of two other distributions, `dist1` and `dist2`, using `operator`. 
""" - def __init__(self, dist1, dist2, operator): self.d1 = copy.deepcopy(dist1) self.d2 = copy.deepcopy(dist2) @@ -83,6 +115,53 @@ def sample(self, t=None, ind=None): s2 = self.d2.sample(t, ind) return self.operator(s1, s2) + @property + def mean(self): + m1 = self.d1.mean + m2 = self.d2.mean + if self.operator == add: + return m1 + m2 + elif self.operator == sub: + return m1 - m2 + elif self.operator == mul: + return m1 * m2 + elif self.operator == truediv: + if m2 == 0: + return float('nan') + return m1 / m2 # delta-method mean + + @property + def variance(self): + m1 = self.d1.mean + m2 = self.d2.mean + v1 = self.d1.variance + v2 = self.d2.variance + + if self.operator in (add, sub): + # Var(A + B) = Var(A) + Var(B), assuming independence + # Var(A - B) = Var(A) + Var(B), assuming independence + return v1 + v2 + elif self.operator == mul: + # Var(A * B) = v1*v2 + m2^2*v1 + m1^2*v2, assuming independence + return v1 * v2 + (m2 ** 2) * v1 + (m1 ** 2) * v2 + elif self.operator == truediv: + # delta-method approximation for Var(A/B) + if m2 == 0: + return float('nan') + return (v1 / (m2 ** 2)) + ((m1 ** 2) * v2) / (m2 ** 4) + + @property + def upper_limit(self): + if self.operator == add: + return self.d1.upper_limit + self.d2.upper_limit + return float('nan') + + @property + def lower_limit(self): + if self.operator == add: + return self.d1.lower_limit + self.d2.lower_limit + return float('nan') + class Uniform(Distribution): """ @@ -92,7 +171,6 @@ class Uniform(Distribution): - `lower` the lower bound - `upper` the upper bound """ - def __init__(self, lower, upper): if lower < 0.0 or upper < 0.0: raise ValueError("Uniform distribution must sample positive numbers only.") @@ -109,6 +187,28 @@ def __repr__(self): def sample(self, t=None, ind=None): return uniform(self.lower, self.upper) + @property + def mean(self): + """Returns the mean of the Uniform distribution.""" + return (self.lower + self.upper) / 2 + + @property + def variance(self): + """Returns the variance of the Uniform distribution.""" + return ((self.upper - self.lower) ** 2) / 12 + + @property + def median(self): + return (self.lower + self.upper) / 2 + + @property + def upper_limit(self): + return self.upper + + @property + def lower_limit(self): + return self.lower + class Deterministic(Distribution): """ @@ -117,7 +217,6 @@ class Deterministic(Distribution): Takes: - `value` the value to return """ - def __init__(self, value): if value < 0.0: raise ValueError( @@ -131,6 +230,28 @@ def __repr__(self): def sample(self, t=None, ind=None): return self.value + @property + def mean(self): + """Returns the mean of the Deterministic distribution.""" + return self.value + + @property + def variance(self): + """Variance of a Deterministic distribution is always 0.""" + return 0.0 + + @property + def median(self): + return self.value + + @property + def upper_limit(self): + return self.value + + @property + def lower_limit(self): + return self.value + class Triangular(Distribution): """ @@ -139,9 +260,8 @@ class Triangular(Distribution): Takes: - `lower` the lower bound - `upper` the upper bound - - `mode` the modal value + - `mode` the modal value """ - def __init__(self, lower, mode, upper): if lower < 0.0 or upper < 0.0 or mode < 0.0: raise ValueError( @@ -161,6 +281,34 @@ def __repr__(self): def sample(self, t=None, ind=None): return triangular(self.lower, self.upper, self.mode) + @property + def mean(self): + """Returns the mean of the Triangular distribution.""" + return (self.lower + self.mode + self.upper) / 3 + + @property 
+ def variance(self): + """Returns the variance of the Triangular distribution.""" + a, b, c = self.lower, self.upper, self.mode + return (a**2 + b**2 + c**2 - a*b - a*c - b*c) / 18 + + @property + def median(self): + # True median of a Triangular(a, c, b) + a, c, b = self.lower, self.mode, self.upper + mid = (a + b) / 2 + if c >= mid: + return a + (((b - a) * (c - a) / 2) ** 0.5) + return b - (((b - a) * (b - c) / 2) ** 0.5) + + @property + def upper_limit(self): + return self.upper + + @property + def lower_limit(self): + return self.lower + class Exponential(Distribution): """ @@ -169,7 +317,6 @@ class Exponential(Distribution): Takes: - `rate` the rate parameter, lambda """ - def __init__(self, rate): if rate <= 0.0: raise ValueError( @@ -183,6 +330,29 @@ def __repr__(self): def sample(self, t=None, ind=None): return expovariate(self.rate) + @property + def mean(self): + """Returns the mean of the Exponential distribution.""" + return 1 / self.rate + + @property + def variance(self): + """Returns the variance of the Exponential distribution.""" + return 1 / (self.rate ** 2) + + + @property + def median(self): + return math.log(2) / self.rate + + @property + def upper_limit(self): + return float('inf') + + @property + def lower_limit(self): + return 0 + class Gamma(Distribution): """ @@ -192,7 +362,6 @@ class Gamma(Distribution): - `shape` the shape parameter, alpha - `scale` the scale parameter, beta """ - def __init__(self, shape, scale): self.shape = shape self.scale = scale @@ -203,25 +372,73 @@ def __repr__(self): def sample(self, t=None, ind=None): return gammavariate(self.shape, self.scale) + @property + def mean(self): + """Returns the mean of the Gamma distribution.""" + return self.shape * self.scale + + @property + def variance(self): + """Returns the variance of the Gamma distribution.""" + return self.shape * (self.scale ** 2) + + @property + def upper_limit(self): + return float('inf') + + @property + def lower_limit(self): + return 0 + class Normal(Distribution): """ - The Truncated Normal distribution. + Truncated Normal distribution (truncated below at 0). - Takes: - - `mean` the mean of the Normal, mu - - `sd` the standard deviation of the Normal, sigma + Parameters: + mean (float): Mean of the original normal distribution. + sd (float): Standard deviation of the original normal distribution. 
""" - def __init__(self, mean, sd): - self.mean = mean - self.sd = sd + self._mean = mean + self._sd = sd def __repr__(self): - return f"Normal(mean={self.mean}, sd={self.sd})" + return f"Normal(mean={self._mean}, sd={self._sd})" def sample(self, t=None, ind=None): - return truncated_normal(self.mean, self.sd) + return truncated_normal(self._mean, self._sd) + + @property + def mean(self): + z = self._mean / self._sd + phi = (1 / sqrt(2 * pi)) * exp(-0.5 * z ** 2) + Phi = 0.5 * (1 + erf(z / sqrt(2))) + return self._mean + self._sd * (phi / Phi) + + @property + def variance(self): + z = self._mean / self._sd + phi = (1 / sqrt(2 * pi)) * exp(-0.5 * z ** 2) + Phi = 0.5 * (1 + erf(z / sqrt(2))) + term = phi / Phi + return (self._sd ** 2) * (1 - z * term - (term ** 2)) + + @property + def median(self): + # Truncated below at 0 + z = self._mean / self._sd + Norm = st.NormalDist(0, 1) + target = 1.0 - 0.5 * Norm.cdf(z) + return self._mean + self._sd * Norm.inv_cdf(target) + + @property + def upper_limit(self): + return float('inf') + + @property + def lower_limit(self): + return 0 class Lognormal(Distribution): @@ -230,18 +447,37 @@ class Lognormal(Distribution): Takes: - `mean` the mean of the Normal, mu - - `sd` the standard deviation of the Normal, sigma + - `sd` the standard deviation of the Normal, sigma """ - def __init__(self, mean, sd): - self.mean = mean - self.sd = sd + self._mean = mean + self._sd = sd def __repr__(self): - return f"Lognormal(mean={self.mean}, sd={self.sd})" + return f"Lognormal(mean={self._mean}, sd={self._sd})" def sample(self, t=None, ind=None): - return lognormvariate(self.mean, self.sd) + return lognormvariate(self._mean, self._sd) + + @property + def mean(self): + return math.exp(self._mean + (self._sd ** 2) / 2) + + @property + def variance(self): + return (math.exp(self._sd ** 2) - 1) * math.exp(2 * self._mean + self._sd ** 2) + + @property + def median(self): + return math.exp(self._mean) + + @property + def upper_limit(self): + return float('inf') + + @property + def lower_limit(self): + return 0 class Weibull(Distribution): @@ -252,7 +488,6 @@ class Weibull(Distribution): - `scale` the scale parameter, alpha - `shape` the shape parameter, beta """ - def __init__(self, scale, shape): self.scale = scale self.shape = shape @@ -263,6 +498,30 @@ def __repr__(self): def sample(self, t=None, ind=None): return weibullvariate(self.scale, self.shape) + @property + def mean(self): + """Returns the mean of the Weibull distribution.""" + return self.scale * math.gamma(1 + 1 / self.shape) + + @property + def variance(self): + """Returns the variance of the Weibull distribution.""" + g1 = math.gamma(1 + 1 / self.shape) + g2 = math.gamma(1 + 2 / self.shape) + return (self.scale ** 2) * (g2 - (g1 ** 2)) + + @property + def median(self): + return self.scale * (math.log(2) ** (1 / self.shape)) + + @property + def upper_limit(self): + return float ('inf') + + @property + def lower_limit(self): + return 0 + class Empirical(Distribution): """ @@ -271,7 +530,6 @@ class Empirical(Distribution): Takes: - `observations` the observations from which to sample """ - def __init__(self, observations): if any(o < 0 for o in observations): raise ValueError( @@ -285,6 +543,38 @@ def __repr__(self): def sample(self, t=None, ind=None): return random_choice(self.observations) + @property + def mean(self): + """Returns the mean of the Empirical distribution.""" + return sum(self.observations) / len(self.observations) + + @property + def variance(self): + """Returns the variance of the Empirical 
distribution.""" + m = self.mean + return sum((x - m) ** 2 for x in self.observations) / len(self.observations) + + @property + def median(self): + # standard sample-median: + # - odd n: middle element + # - even n: average of the two middle elements + xs = sorted(self.observations) + n = len(xs) + mid = n // 2 + if n % 2 == 1: + return xs[mid] + else: + return (xs[mid - 1] + xs[mid]) / 2 + + @property + def upper_limit(self): + return max(self.observations) + + @property + def lower_limit(self): + return min(self.observations) + class Sequential(Distribution): """ @@ -293,7 +583,6 @@ class Sequential(Distribution): Takes: - `sequence` the sequence to cycle through """ - def __init__(self, sequence): if any(o < 0 for o in sequence): raise ValueError( @@ -311,6 +600,38 @@ def __repr__(self): def sample(self, t=None, ind=None): return next(self.generator) + @property + def mean(self): + """Returns the mean of the Sequential distribution.""" + return sum(self.sequence) / len(self.sequence) + + @property + def variance(self): + """Returns the variance of the Sequential distribution.""" + m = self.mean + return sum((x - m) ** 2 for x in self.sequence) / len(self.sequence) + + @property + def median(self): + # sample median of the fixed sequence + # - odd n: middle element + # - even n: average of the two middle elements + xs = sorted(self.sequence) + n = len(xs) + mid = n // 2 + if n % 2 == 1: + return xs[mid] + else: + return (xs[mid - 1] + xs[mid]) / 2 + + @property + def upper_limit(self): + return max(self.sequence) + + @property + def lower_limit(self): + return min(self.sequence) + class Pmf(Distribution): """ @@ -318,9 +639,8 @@ class Pmf(Distribution): Takes: - `values` the values to sample - - `probs` the associated probabilities + - `probs` the associated probabilities """ - def __init__(self, values, probs): if any(o < 0 for o in values): raise ValueError("Pmf must sample positive numbers only.") @@ -337,73 +657,130 @@ def __repr__(self): def sample(self, t=None, ind=None): return random_choice(self.values, self.probs) - -class PhaseType(Distribution): + @property + def mean(self): + """Returns the mean of the PMF distribution.""" + return sum(v * p for v, p in zip(self.values, self.probs)) + + @property + def variance(self): + """Returns the variance of the PMF distribution.""" + m = self.mean + return sum(p * (v - m) ** 2 for v, p in zip(self.values, self.probs)) + + @property + def median(self): + pairs = sorted(zip(self.values, self.probs), key=lambda t: t[0]) + cum = 0.0 + for v, p in pairs: + cum += p + if cum >= 0.5: + return v + return pairs[-1][0] + + @property + def upper_limit(self): + return max(self.values) + + @property + def lower_limit(self): + return min(self.values) + + +class PhaseType(Distribution): """ A distribution defined by an initial vector and an absorbing Markov chain Takes: - - `initial_state` the intial probabilities of being in each state + - `initial_state` the intial probabilities of being in each state - `absorbing_matrix` the martix representation of the absorbing Markov chain, with the final state the absorbing state """ - def __init__(self, initial_state, absorbing_matrix): - if any(p < 0 for p in initial_state): - raise ValueError("Initial state vector must have positive probabilities.") - if any(p > 1.0 for p in initial_state): - raise ValueError("Initial state vector probabilities must be no more than 1.0.") - if sum(initial_state) > 1.0 + 10 ** (-10) or sum(initial_state) < 1.0 - 10 ** ( - -10 - ): - raise ValueError("Initial state vector 
probabilities must sum to 1.0.") - if any(len(absorbing_matrix) != len(row) for row in absorbing_matrix): - raise ValueError("Matrix of the absorbing Markov chain must be square.") - if len(initial_state) != len(absorbing_matrix): + def __init__(self, initial_state, absorbing_matrix): + if any(p < 0 for p in initial_state): + raise ValueError("Initial state vector must have positive probabilities.") + if any(p > 1.0 for p in initial_state): + raise ValueError("Initial state vector probabilities must be no more than 1.0.") + if sum(initial_state) > 1.0 + 10 ** (-10) or sum(initial_state) < 1.0 - 10 ** (-10): + raise ValueError("Initial state vector probabilities must sum to 1.0.") + if any(len(absorbing_matrix) != len(row) for row in absorbing_matrix): + raise ValueError("Matrix of the absorbing Markov chain must be square.") + if len(initial_state) != len(absorbing_matrix): raise ValueError( "Initial state vector must have same number of states as absorbing Markov chain matrix." - ) - if any( - row[j] < 0 - for i, row in enumerate(absorbing_matrix) - for j in range(len(absorbing_matrix)) - if i != j - ): - raise ValueError("Transition rates must be positive.") - if not all( - -(10 ** (-10)) < sum(row) < 10 ** (-10) - for i, row in enumerate(absorbing_matrix) - ): - raise ValueError("Matrix rows must sum to 0.") - if not all(r == 0 for r in absorbing_matrix[-1]): - raise ValueError("Final state must be the absorbing state.") - if not any(row[-1] > 0 for row in absorbing_matrix): - raise ValueError("Must be possible to reach the absorbing state.") - self.initial_state = initial_state - self.states = tuple(range(len(initial_state))) - self.absorbing_matrix = absorbing_matrix - - def __repr__(self): - return "PhaseType" - - def sample_transition(self, rate): - if rate <= 0.0: - return float("Inf") - return expovariate(rate) - - def sample(self, t=None, ind=None): - cumulative_time = 0 - current_state = random_choice(self.states, probs=self.initial_state) - while current_state != self.states[-1]: - potential_transitions = [ - self.sample_transition(r) for r in self.absorbing_matrix[current_state] - ] - time, idx = min( - (time, idx) for (idx, time) in enumerate(potential_transitions) - ) - cumulative_time += time - current_state = idx - return cumulative_time + ) + if any( + row[j] < 0 + for i, row in enumerate(absorbing_matrix) + for j in range(len(absorbing_matrix)) + if i != j + ): + raise ValueError("Transition rates must be positive.") + if not all( + -(10 ** (-10)) < sum(row) < 10 ** (-10) + for i, row in enumerate(absorbing_matrix) + ): + raise ValueError("Matrix rows must sum to 0.") + if not all(r == 0 for r in absorbing_matrix[-1]): + raise ValueError("Final state must be the absorbing state.") + if not any(row[-1] > 0 for row in absorbing_matrix): + raise ValueError("Must be possible to reach the absorbing state.") + self.initial_state = initial_state + self.states = tuple(range(len(initial_state))) + self.absorbing_matrix = absorbing_matrix + + def __repr__(self): + return "PhaseType" + + def sample_transition(self, rate): + if rate <= 0.0: + return float("Inf") + return expovariate(rate) + + def sample(self, t=None, ind=None): + cumulative_time = 0 + current_state = random_choice(self.states, probs=self.initial_state) + while current_state != self.states[-1]: + potential_transitions = [ + self.sample_transition(r) for r in self.absorbing_matrix[current_state] + ] + time, idx = min( + (time, idx) for (idx, time) in enumerate(potential_transitions) + ) + cumulative_time += time + 
current_state = idx + return cumulative_time + + @property + def mean(self): + Q = np.array(self.absorbing_matrix)[:-1, :-1] + alpha = np.array(self.initial_state[:-1]) + ones = np.ones(len(Q)) + return float(alpha @ np.linalg.inv(-Q) @ ones) + + @property + def variance(self): + Q = np.array(self.absorbing_matrix)[:-1, :-1] + alpha = np.array(self.initial_state)[:-1] + ones = np.ones(len(Q)) + invQ = np.linalg.inv(-Q) + + # E[T] and E[T^2] + mean = float(alpha @ invQ @ ones) + second_moment = float(2 * alpha @ invQ @ invQ @ ones) + + # Var(T) = E[T^2] - (E[T])^2 (with tinynegative clamp) + v = second_moment - (mean ** 2) + return v + + @property + def upper_limit(self): + return float('inf') + + @property + def lower_limit(self): + return 0 class Erlang(PhaseType): @@ -414,9 +791,8 @@ class Erlang(PhaseType): - `rate` the rate spent in each phase - `num_phases` the number of phases in series """ - def __init__(self, rate, num_phases): - if rate <= 0.0: + if rate <= 0.0: raise ValueError("Rate must be positive.") if num_phases < 1: raise ValueError("At least one phase is required.") @@ -432,6 +808,14 @@ def __init__(self, rate, num_phases): def __repr__(self): return f"Erlang(rate={self.rate}, k={self.num_phases})" + @property + def mean(self): + return self.num_phases / self.rate + + @property + def variance(self): + return self.num_phases / (self.rate ** 2) + class HyperExponential(PhaseType): """ @@ -441,10 +825,11 @@ class HyperExponential(PhaseType): - `rates` a vector of rates for each phase - `probs` a probability vector for starting in each phase """ - def __init__(self, rates, probs): if any(r <= 0.0 for r in rates): raise ValueError("Rates must be positive.") + self.rates = rates + self.probs = probs initial_state = probs + [0] num_phases = len(probs) absorbing_matrix = [[0] * (num_phases + 1) for _ in range(num_phases + 1)] @@ -456,6 +841,15 @@ def __init__(self, rates, probs): def __repr__(self): return "HyperExponential" + @property + def mean(self): + return sum(p / r for p, r in zip(self.probs, self.rates)) + + @property + def variance(self): + mean = self.mean + return sum(2 * p / (r ** 2) for p, r in zip(self.probs, self.rates)) - (mean ** 2) + class HyperErlang(PhaseType): """ @@ -466,7 +860,6 @@ class HyperErlang(PhaseType): - `probs` a probability vector for starting in each phase - `phase_lengths` the number of sub-phases in each phase """ - def __init__(self, rates, probs, phase_lengths): if any(r <= 0.0 for r in rates): raise ValueError("Rates must be positive.") @@ -488,12 +881,32 @@ def __init__(self, rates, probs, phase_lengths): absorbing_matrix[offset + subphase][offset + subphase + 1] = r else: absorbing_matrix[offset + subphase][-1] = r + self.rates = rates + self.probs = probs + self.phase_lengths = phase_lengths super().__init__(initial_state, absorbing_matrix) def __repr__(self): return "HyperErlang" + @property + def mean(self): + return sum( + p * k / r for p, r, k in zip(self.probs, self.rates, self.phase_lengths) + ) + + @property + def variance(self): + mean = self.mean # ∑ p * k / r + # second moment for Erlang(k, r) is k*(k+1)/r^2 + second_moment = sum( + p * (k * (k + 1)) / (r ** 2) + for p, r, k in zip(self.probs, self.rates, self.phase_lengths) + ) + v = second_moment - (mean ** 2) + return v + class Coxian(PhaseType): """ @@ -503,7 +916,6 @@ class Coxian(PhaseType): - `rates` a vector of rates for each phase - `probs` a vector of the probability of absorption at each phase """ - def __init__(self, rates, probs): if any(r <= 0.0 for r in rates): 
raise ValueError("Rates must be positive.") @@ -542,7 +954,6 @@ class PoissonIntervals(Sequential): inter-arrival times, and use Ciw's Sequential distribution to output them. """ - def __init__(self, rates, endpoints, max_sample_date): if any(r < 0.0 for r in rates): raise ValueError("All rates must be positive.") @@ -599,8 +1010,39 @@ def get_dates(self): ] interval_dates = sorted(interval_dates) self.dates += interval_dates - - + + @property + def overall_rate(self): + deltas = [self.endpoints[0]] + [ + self.endpoints[i] - self.endpoints[i - 1] + for i in range(1, len(self.endpoints)) + ] + P = self.endpoints[-1] + LambdaP = sum(r * d for r, d in zip(self.rates, deltas)) + return LambdaP / P + + @property + def mean(self): + O_R = self.overall_rate + return 1 / O_R if O_R > 0 else float('inf') + + @property + def variance(self): + return float('nan') + + @property + def median(self): + return float('nan') + + @property + def upper_limit(self): + return float('inf') + + @property + def lower_limit(self): + return 0 + + class Poisson(Distribution): """ The Poisson distribution. @@ -609,7 +1051,6 @@ class Poisson(Distribution): Takes: - `rate` the rate parameter, lambda """ - def __init__(self, rate): if rate <= 0.0: raise ValueError("Poisson distribution must sample positive numbers only.") @@ -621,6 +1062,26 @@ def sample(self, t=None, ind=None): def __repr__(self): return f"Poisson(rate={self.rate})" + @property + def mean(self): + return self.rate + + @property + def variance(self): + return self.rate + + @property + def median(self): + """this is a well known approximation of the median of a Poisson distribution""" + return math.floor(self.rate + (1 / 3) - (0.02 / self.rate)) + + @property + def upper_limit(self): + return float('inf') + + @property + def lower_limit(self): + return 0 class Geometric(Distribution): """ @@ -630,7 +1091,6 @@ class Geometric(Distribution): Takes: - `prob` the probability parameter """ - def __init__(self, prob): if prob <= 0.0 or prob >= 1: raise ValueError( @@ -644,6 +1104,26 @@ def sample(self, t=None, ind=None): def __repr__(self): return f"Geometric(prob={self.prob})" + @property + def mean(self): + return 1 / self.prob + + @property + def variance(self): + return (1 - self.prob) / (self.prob ** 2) + + @property + def median(self): + return math.ceil(-1 / math.log(1 - self.prob, 2)) + + @property + def upper_limit(self): + return float('inf') + + @property + def lower_limit(self): + return 0 + class Binomial(Distribution): """ @@ -651,10 +1131,9 @@ class Binomial(Distribution): Note that this is a discrete integer distribution, for use with Batching. Takes: - - `n` the parameter representing the total number of experiments + - `n` the total number of experiments - `prob` the probability parameter """ - def __init__(self, n, prob): if prob <= 0.0 or prob >= 1: raise ValueError( @@ -673,6 +1152,26 @@ def sample(self, t=None, ind=None): def __repr__(self): return f"Binomial(n={self.n}, prob={self.prob})" + @property + def mean(self): + return self.n * self.prob + + @property + def variance(self): + return self.n * self.prob * (1 - self.prob) + + @property + def median(self): + return round(self.n * self.prob) + + @property + def upper_limit(self): + return float('inf') + + @property + def lower_limit(self): + return 0 + class MixtureDistribution(Distribution): """ @@ -697,13 +1196,7 @@ class MixtureDistribution(Distribution): ------- sample(t: float, ind: Individual = None) -> float: Generate a random sample from the mixture distribution. 
-
-    Notes
-    -----
-    The weights in `probs` should sum to 1, indicating the relative importance of each distribution
-    in the mixture. The distributions in `dists` should be instances of `ciw.dists.Distribution`.
     """
-
     def __init__(self, dists: List[Distribution], probs: List[float]) -> NoReturn:
         """
         Initialize the MixtureDistribution.
@@ -722,25 +1215,40 @@ def __init__(self, dists: List[Distribution], probs: List[float]) -> NoReturn:
     def sample(self, t: float = None, ind: Individual = None) -> float:
         """
         Generate a random sample from the mixture distribution.
-
-        Parameters
-        ----------
-        t : float
-            The time parameter for the sample generation.
-        inds : Individual, optional
-            The individual to sample a time for, if applicable.
-
-        Returns
-        -------
-        float
-            A random sample from the mixture distribution.
         """
         chosen_dist = random.choices(
             population=self.dists,
             weights=self.probs,
-            k=1)[0]
+            k=1
+        )[0]
         return chosen_dist.sample(t, ind)
-
+
     def __repr__(self):
         return "MixtureDistribution"
+
+    @property
+    def mean(self):
+        return sum(
+            p * dist.mean for p, dist in zip(self.probs, self.dists)
+        )
+
+    @property
+    def variance(self):
+        mu = self.mean
+        var = sum(
+            p * (dist.variance + dist.mean ** 2) for p, dist in zip(self.probs, self.dists)
+        ) - mu ** 2
+        return max(var, 0)
+
+    @property
+    def upper_limit(self):
+        if any(math.isnan(dist.upper_limit) for dist in self.dists):
+            return float('nan')
+        return max(dist.upper_limit for dist in self.dists)
+
+    @property
+    def lower_limit(self):
+        if any(math.isnan(dist.lower_limit) for dist in self.dists):
+            return float('nan')
+        return min(dist.lower_limit for dist in self.dists)

diff --git a/docs/Guides/Distributions/index.rst b/docs/Guides/Distributions/index.rst
index 6375c0b2..8f988e09 100644
--- a/docs/Guides/Distributions/index.rst
+++ b/docs/Guides/Distributions/index.rst
@@ -9,4 +9,5 @@ Contents:
     set_distributions.rst
     phasetype.rst
     combining_distributions.rst
+    summary_stats.rst
     time_dependent.rst
diff --git a/docs/Guides/Distributions/summary_stats.rst b/docs/Guides/Distributions/summary_stats.rst
new file mode 100644
index 00000000..eb9df9d2
--- /dev/null
+++ b/docs/Guides/Distributions/summary_stats.rst
@@ -0,0 +1,75 @@
+.. _dist-stats:
+
+===============================================
+How to Access Distributions' Summary Statistics
+===============================================
+
+Every distribution object in Ciw has attributes giving summary statistics of the distribution. These are:
+
++ Mean
++ Median
++ Variance
++ Standard deviation
++ Upper limit
++ Lower limit
+
+They can be accessed as follows::
+
+    >>> import ciw
+    >>> D = ciw.dists.Exponential(rate=5)
+    >>> D.mean
+    0.2
+    >>> D.median
+    0.1386294361...
+    >>> D.variance
+    0.04
+    >>> D.sd
+    0.2
+    >>> D.upper_limit
+    inf
+    >>> D.lower_limit
+    0
+
+
+In some cases where a closed-form expression does not exist, the attribute will return :code:`nan`. For example::
+
+    >>> G = ciw.dists.Gamma(shape=0.6, scale=1.2)
+    >>> G.median
+    nan
+
+
+And where possible, summary statistics are calculated for :ref:`combined distributions `. For example::
+
+
+    >>> D1 = ciw.dists.Uniform(lower=2, upper=6)
+    >>> D2 = ciw.dists.Uniform(lower=3, upper=9)
+    >>> D = D1 + D2
+    >>> D1.mean, D2.mean, D.mean
+    (4.0, 6.0, 10.0)
+    >>> D1.sd, D2.sd, D.sd
+    (1.154..., 1.732..., 2.081...)
+ >>> D1.lower_limit, D2.lower_limit, D.lower_limit + (2, 3, 5) + >>> D1.upper_limit, D2.upper_limit, D.upper_limit + (6, 9, 15) + +and :ref:`mixture distributions `:: + + >>> D1 = ciw.dists.Uniform(lower=2, upper=6) + >>> D2 = ciw.dists.Uniform(lower=3, upper=9) + >>> D = ciw.dists.MixtureDistribution( + ... dists=[D1, D2], + ... probs=[0.5, 0.5] + ... ) + >>> D.mean + 5.0 + >>> D.sd + 1.7795... + >>> D.upper_limit + 9 + >>> D.lower_limit + 2 + + + + diff --git a/tests/test_sampling.py b/tests/test_sampling.py index 736d2048..f3dea446 100644 --- a/tests/test_sampling.py +++ b/tests/test_sampling.py @@ -1,11 +1,15 @@ import unittest import ciw +import math +from math import sqrt, exp, pi, erf +import numpy as np +import statistics as st from csv import reader from random import random, choice from hypothesis import given from hypothesis.strategies import floats, integers, lists, random_module, text import os -import copy +import tqdm def import_empirical(dist_file): @@ -80,6 +84,22 @@ def sample(self, t=None, ind=None): return max((-0.05 * n) + 0.2, 0) +def compare_theoretical_to_observed(D, n, places, self): + samples = [D.sample() for _ in range(n)] + if D.mean is not None: + self.assertAlmostEqual(np.mean(samples), D.mean, places=places) + if not math.isnan(D.variance): + self.assertAlmostEqual(np.var(samples), D.variance, places=places) + if not math.isnan(D.sd): + self.assertAlmostEqual(np.std(samples), D.sd, places=places) + if not math.isnan(D.median): + self.assertAlmostEqual(np.median(samples), D.median, places=places) + if not math.isnan(D.lower_limit): + self.assertTrue(min(samples) >= D.lower_limit) + if not math.isnan(D.upper_limit): + self.assertTrue(max(samples) <= D.upper_limit) + + class TestSampling(unittest.TestCase): def test_dists_repr(self): Di = ciw.dists.Distribution() @@ -188,6 +208,20 @@ def test_sampling_uniform_dist_hypothesis(self, u, rm): self.assertTrue( ul <= Nu.simulation.inter_arrival_times[Nu.id_number]['Customer']._sample() <= uh ) + + def test_uniform_summary_stats(self): + """ + Test that the mean of a Uniform distribution is computed correctly. 
+ """ + U = ciw.dists.Uniform(2.2, 3.3) + expected_mean = (2.2 + 3.3) / 2 + self.assertEqual(U.mean, expected_mean) + expected_variance = ((3.3 - 2.2) ** 2) / 12 + self.assertEqual(U.variance, expected_variance) + self.assertEqual(U.sd, math.sqrt(((3.3 - 2.2) ** 2) / 12)) + self.assertEqual(U.median, (2.2 + 3.3) / 2) + self.assertEqual(U.lower_limit, 2.2) + self.assertEqual(U.upper_limit, 3.3) def test_deterministic_dist_object(self): D = ciw.dists.Deterministic(4.4) @@ -195,7 +229,6 @@ def test_deterministic_dist_object(self): samples = [round(D._sample(), 2) for _ in range(10)] expected = [4.40, 4.40, 4.40, 4.40, 4.40, 4.40, 4.40, 4.40, 4.40, 4.40] self.assertEqual(samples, expected) - self.assertRaises(ValueError, ciw.dists.Deterministic, -4.4) def test_sampling_deterministic_dist(self): @@ -237,6 +270,15 @@ def test_sampling_deterministic_dist_hypothesis(self, d, rm): Nd.simulation.inter_arrival_times[Nd.id_number]['Customer']._sample(), d ) + def test_deterministic_summary_stats(self): + D = ciw.dists.Deterministic(4.4) + self.assertEqual(D.mean, 4.4) + self.assertEqual(D.variance, 0.0) + self.assertEqual(D.sd, 0.0) + self.assertEqual(D.median, 4.4) + self.assertEqual(D.upper_limit, 4.4) + self.assertEqual(D.lower_limit, 4.4) + def test_triangular_dist_object(self): Tr = ciw.dists.Triangular(1.1, 1.5, 6.6) ciw.seed(5) @@ -288,6 +330,29 @@ def test_sampling_triangular_dist_hypothesis(self, t, rm): self.assertTrue(tl <= Nt.simulation.service_times[Nt.id_number]['Customer']._sample() <= th) self.assertTrue(tl <= Nt.simulation.inter_arrival_times[Nt.id_number]['Customer']._sample() <= th) + def test_triangular_summary_stats(self): + a, b, c = 1.1, 6.6, 2.2 + T = ciw.dists.Triangular(a, c, b) + expected_mean = (1.1 + 2.2 + 6.6) / 3 + self.assertEqual(T.mean, expected_mean) + expected_variance = (a**2 + b**2 + c**2 - a*b - a*c - b*c) / 18 + self.assertEqual(T.variance, expected_variance) + self.assertEqual(T.sd, math.sqrt(expected_variance)) + mid = (a + b) / 2.0 + expected_median = b - math.sqrt((b - a) * (b - c) / 2.0) + self.assertEqual(T.median, expected_median) + self.assertEqual(T.upper_limit, b) + self.assertEqual(T.lower_limit, a) + + def test_triangular_median_right_mode_branch(self): + a, b, c = 1.0, 8.0, 7.0 # mode c is to the right of the midpoint + T = ciw.dists.Triangular(a, c, b) + mid = (a + b) / 2.0 + self.assertTrue(c >= mid) + expected = a + math.sqrt((b - a) * (c - a) / 2.0) + self.assertEqual(T.median, expected) + + def test_exponential_dist_object(self): E = ciw.dists.Exponential(4.4) ciw.seed(5) @@ -338,6 +403,19 @@ def test_sampling_exponential_dist_hypothesis(self, e, rm): self.assertTrue( Ne.simulation.inter_arrival_times[Ne.id_number]['Customer']._sample() >= 0.0 ) + def test_exponential_summary_stats(self): + """ + Test that the mean of the Exponential distribution is computed correctly. 
+ """ + E = ciw.dists.Exponential(4.4) + expected_mean = 1 / 4.4 + self.assertEqual(E.mean, expected_mean) + expected_variance = 1 / (4.4 ** 2) + self.assertEqual(E.variance, expected_variance) + self.assertEqual(E.sd, 1.0 / 4.4) + self.assertEqual(E.median, math.log(2.0) / 4.4) + self.assertTrue(math.isinf(E.upper_limit)) + self.assertEqual(E.lower_limit, 0.0) def test_gamma_dist_object(self): G = ciw.dists.Gamma(0.6, 1.2) @@ -390,6 +468,15 @@ def test_sampling_gamma_dist_hypothesis(self, ga, gb, rm): self.assertTrue( Ng.simulation.inter_arrival_times[Ng.id_number]['Customer']._sample() >= 0.0 ) + + def test_gamma_summary_stats(self): + G = ciw.dists.Gamma(0.6, 1.2) + expected_mean = 0.6 * 1.2 + expected_variance = 0.6 * (1.2 ** 2) + self.assertEqual(G.sd, math.sqrt(G.variance)) + self.assertTrue(math.isnan(G.median)) + self.assertTrue(math.isinf(G.upper_limit)) + self.assertEqual(G.lower_limit, 0.0) def test_lognormal_dist_object(self): Ln = ciw.dists.Lognormal(0.8, 0.2) @@ -439,6 +526,20 @@ def test_sampling_lognormal_dist_hypothesis(self, lm, lsd, rm): self.assertTrue(Nl.simulation.service_times[Nl.id_number]['Customer']._sample() >= 0.0) self.assertTrue(Nl.simulation.inter_arrival_times[Nl.id_number]['Customer']._sample() >= 0.0) + def test_lognormal_summary_stats(self): + mu = 0.7 + sigma = 0.4 + L = ciw.dists.Lognormal(mu, sigma) + + expected_mean = math.exp(mu + (sigma ** 2) / 2) + expected_variance = (math.exp(sigma ** 2) - 1) * math.exp(2 * mu + sigma ** 2) + self.assertEqual(L.mean, expected_mean) + self.assertEqual(L.variance, expected_variance) + self.assertEqual(L.sd, math.sqrt(L.variance)) + self.assertEqual(L.median, math.exp(0.7)) + self.assertTrue(math.isinf(L.upper_limit)) + self.assertEqual(L.lower_limit, 0.0) + def test_weibull_dist_object(self): W = ciw.dists.Weibull(0.9, 0.8) ciw.seed(5) @@ -491,6 +592,18 @@ def test_sampling_weibull_dist_hypothesis(self, wa, wb, rm): Nw.simulation.inter_arrival_times[Nw.id_number]['Customer']._sample() >= 0.0 ) + def test_weibull_summary_stats(self): + W = ciw.dists.Weibull(0.8, 0.9) + expected_mean = 0.8 * math.gamma(1 + 1 / 0.9) + g1 = math.gamma(1 + 1 / 0.9) + g2 = math.gamma(1 + 2 / 0.9) + expected_variance = (0.8 ** 2) * (g2 - g1 ** 2) + self.assertEqual(W.variance, expected_variance) + self.assertEqual(W.sd, math.sqrt(W.variance)) + self.assertEqual(W.median, 0.8 * (math.log(2) ** (1.0 / 0.9))) + self.assertTrue(math.isinf(W.upper_limit)) + self.assertEqual(W.lower_limit, 0.0) + def test_normal_dist_object(self): N = ciw.dists.Normal(0.5, 0.1) ciw.seed(5) @@ -540,6 +653,25 @@ def test_sampling_normal_dist_hypothesis(self, nm, ns, rm): for itr in range(10): # Because repition happens in the simulation self.assertTrue(Nw.simulation.service_times[Nw.id_number]['Customer']._sample() >= 0.0) self.assertTrue(Nw.simulation.inter_arrival_times[Nw.id_number]['Customer']._sample() >= 0.0) + + def test_normal_truncated_summary_stats(self): + N = ciw.dists.Normal(5.0, 1.0) + mu = N._mean + sd = N._sd + z = mu / sd + phi = (1 / sqrt(2 * pi)) * exp(-0.5 * z**2) + Phi = 0.5 * (1 + erf(z / sqrt(2))) + expected_mean = mu + sd * (phi / Phi) + expected_variance = sd**2 * (1 - z * (phi / Phi) - (phi / Phi)**2) + self.assertEqual(N.mean, expected_mean) + self.assertEqual(N.variance, expected_variance) + self.assertEqual(N.sd, math.sqrt(N.variance)) + Norm = st.NormalDist(0, 1) + target = 1.0 - 0.5 * Norm.cdf(z) + expected_med = mu + sd * Norm.inv_cdf(target) + self.assertEqual(N.median, expected_med) + self.assertTrue(math.isinf(N.upper_limit)) + 
self.assertEqual(N.lower_limit, 0.0) def test_empirical_dist_object(self): Em = ciw.dists.Empirical([8.0, 8.0, 8.0, 8.8, 8.8, 12.3]) @@ -597,6 +729,30 @@ def test_sampling_empirical_dist_hypothesis(self, dist, rm): in set(my_empirical_dist) ) + def test_empirical_summary_stats(self): + values = [8.0, 8.0, 8.0, 8.8, 8.8, 12.3] + Em = ciw.dists.Empirical(values) + expected_mean = sum(values) / len(values) + self.assertEqual(Em.mean, expected_mean) + mean = sum(values) / len(values) + expected_variance = sum((x - mean) ** 2 for x in values) / len(values) + self.assertEqual(Em.variance, expected_variance) + self.assertEqual(Em.sd, math.sqrt(Em.variance)) + self.assertEqual(Em.median, (8.0 + 8.8) / 2.0) + self.assertEqual(Em.upper_limit, 12.3) + self.assertEqual(Em.lower_limit, 8.0) + + def test_empirical_median(self): + # even + Em = ciw.dists.Empirical([1, 2, 3, 6]) + self.assertEqual(Em.median, 2.5) + self.assertEqual(Em.mean, 3) + # odd + values = [5.0, 7.0, 9.0] + E = ciw.dists.Empirical(values) + self.assertEqual(E.median, 7.0) + self.assertEqual(E.mean, 7.0) + def test_pmf_object(self): Pmf = ciw.dists.Pmf([3.7, 3.8, 4.1], [0.2, 0.5, 0.3]) ciw.seed(5) @@ -654,6 +810,28 @@ def test_sampling_pmf_dist_hypothesis(self, custs, rm): self.assertTrue(Nc.simulation.service_times[Nc.id_number]['Customer']._sample() in set(cust_vals)) self.assertTrue(Nc.simulation.inter_arrival_times[Nc.id_number]['Customer']._sample() in set(cust_vals)) + + def test_pmf_summary_stats(self): + values = [3.7, 3.8, 4.1] + probs = [0.2, 0.5, 0.3] + P = ciw.dists.Pmf(values, probs) + expected_mean = sum(v * p for v, p in zip(values, probs)) + self.assertEqual(P.mean, expected_mean) + expected_variance = sum(p * (v - expected_mean) ** 2 for v, p in zip(values, probs)) + self.assertEqual(P.variance, expected_variance) + self.assertEqual(P.sd, math.sqrt(P.variance)) + self.assertEqual(P.median, 3.8) + self.assertEqual(P.upper_limit, 4.1) + self.assertEqual(P.lower_limit, 3.7) + + def test_pmf_median_fallback(self): + # Force sum of probs < 1.0 to trigger fallback + P = ciw.dists.Pmf.__new__(ciw.dists.Pmf) + P.values = [1.0, 2.0, 3.0] + P.probs = [0.1, 0.1, 0.1] # sum = 0.3, so cumulative never reaches 0.5 + self.assertEqual(P.median, 3.0) + + def test_custom_dist_object(self): CD = CustomDist() ciw.seed(5) @@ -876,6 +1054,30 @@ def test_sampling_sequential_dist_hypothesis(self, dist1, dist2): self.assertEqual(inter_arrivals, expected_inter_arrival_times[1:]) self.assertEqual(services, expected_service_times) + def test_sequential_summary_stats(self): + values = [0.9, 0.7, 0.5, 0.3, 0.1] + S = ciw.dists.Sequential(values) + expected_mean = sum(values) / len(values) + self.assertEqual(S.mean, expected_mean) + expected_variance = sum((x - expected_mean) ** 2 for x in values) / len(values) + self.assertEqual(S.variance, expected_variance) + self.assertEqual(S.sd, math.sqrt(S.variance)) + self.assertEqual(S.median, 0.5) + self.assertEqual(S.upper_limit, 0.9) + self.assertEqual(S.lower_limit, 0.1) + + values = [0.7, 0.5, 0.3, 0.1] + S = ciw.dists.Sequential(values) + expected_mean = sum(values) / len(values) + self.assertEqual(S.mean, expected_mean) + expected_variance = sum((x - expected_mean) ** 2 for x in values) / len(values) + self.assertEqual(S.variance, expected_variance) + self.assertEqual(S.sd, math.sqrt(S.variance)) + self.assertEqual(S.median, 0.4) + self.assertEqual(S.upper_limit, 0.7) + self.assertEqual(S.lower_limit, 0.1) + + def test_combining_distributions(self): Dt = ciw.dists.Deterministic(5.0) Sq = 
ciw.dists.Sequential([1.0, 2.0, 3.0, 4.0])
@@ -992,6 +1194,71 @@ def test_combining_distributions(self):
         self.assertEqual(str(Ex_div_Dt), "CombinedDistribution")
         self.assertEqual(str(Ex_div_Sq), "CombinedDistribution")

+    def test_combined_add_summary_stats(self):
+        # A: N(5, sd=1) -> mean=5, var=1
+        # B: N(2, sd=0.5) -> mean=2, var=0.25
+        A = ciw.dists.Normal(5.0, 1.0)
+        B = ciw.dists.Normal(2.0, 0.5)
+
+        C = A + B
+        expected_mean = A.mean + B.mean  # 5 + 2 = 7
+        expected_var = A.variance + B.variance  # 1 + 0.25 = 1.25
+        self.assertEqual(C.mean, expected_mean)
+        self.assertEqual(C.variance, expected_var)
+        self.assertEqual(C.upper_limit, float('inf'))
+        self.assertEqual(C.lower_limit, 0)
+
+    def test_combined_sub_summary_stats(self):
+        A = ciw.dists.Normal(5.0, 1.0)
+        B = ciw.dists.Normal(2.0, 0.5)
+
+        C = A - B
+        expected_mean = A.mean - B.mean  # 5 - 2 = 3
+        expected_var = A.variance + B.variance  # 1 + 0.25 = 1.25
+        self.assertEqual(C.mean, expected_mean)
+        self.assertEqual(C.variance, expected_var)
+        self.assertTrue(math.isnan(C.upper_limit))
+        self.assertTrue(math.isnan(C.lower_limit))
+
+    def test_combined_mul_summary_stats(self):
+        # Product moments (independent):
+        # E[AB] = mA mB
+        # Var(AB) = vA vB + mB^2 vA + mA^2 vB
+        A = ciw.dists.Normal(5.0, 1.0)  # mA=5, vA=1
+        B = ciw.dists.Normal(2.0, 0.5)  # mB=2, vB=0.25
+
+        C = A * B
+        expected_mean = A.mean * B.mean  # 10
+        expected_var = (
+            A.variance * B.variance  # 1 * 0.25 = 0.25
+            + (B.mean ** 2) * A.variance  # 4 * 1 = 4
+            + (A.mean ** 2) * B.variance  # 25 * 0.25 = 6.25
+        )  # total = 10.5
+        self.assertEqual(C.mean, expected_mean)
+        self.assertEqual(C.variance, expected_var)
+
+    def test_combined_div_summary_stats(self):
+        # Division uses the delta-method approximation implemented in CombinedDistribution:
+        # E[A/B] = mA / mB
+        # Var(A/B) = vA / mB^2 + (mA^2 * vB) / mB^4   (requires mB != 0)
+        A = ciw.dists.Normal(5.0, 1.0)  # mA=5, vA=1
+        B = ciw.dists.Normal(2.0, 0.5)  # mB=2, vB=0.25
+
+        C = A / B
+        expected_mean = A.mean / B.mean  # 2.5
+        expected_var = (
+            A.variance / (B.mean ** 2)  # 1 / 4 = 0.25
+            + (A.mean ** 2) * B.variance / (B.mean ** 4)  # 25 * 0.25 / 16 = 0.390625
+        )  # total = 0.640625
+        self.assertEqual(C.mean, expected_mean)
+        self.assertEqual(C.variance, expected_var)
+
+    def test_combined_div_by_zero(self):
+        A = ciw.dists.Normal(5, 1)
+        B = ciw.dists.Deterministic(0)
+        C = A / B
+        self.assertTrue(math.isnan(C.mean))
+
     def test_mixture_distributions(self):
         ciw.seed(0)
         D1 = ciw.dists.Deterministic(1.0)
@@ -1022,6 +1289,27 @@ def test_mixture_distributions(self):
         self.assertEqual(str(Mixted_eq), 'MixtureDistribution')
         self.assertEqual(meq_samples, meq_expected)

+    def test_mixture_summary_stats(self):
+        c1 = ciw.dists.Normal(0.0, 1.0)
+        c2 = ciw.dists.Normal(3.0, 2.0)
+        probs = [0.6, 0.4]
+        M = ciw.dists.MixtureDistribution([c1, c2], probs)
+
+        expected_mean = probs[0] * c1.mean + probs[1] * c2.mean
+        self.assertEqual(M.mean, expected_mean)
+        expected_variance = (
+            probs[0] * (c1.variance + c1.mean ** 2)
+            + probs[1] * (c2.variance + c2.mean ** 2)
+        ) - expected_mean ** 2
+        self.assertEqual(M.variance, expected_variance)
+
+        D = ciw.dists.Deterministic(3.0)
+        U = ciw.dists.Uniform(2.0, 4.0)
+        M = ciw.dists.MixtureDistribution([D, U], [0.5, 0.5])
+        self.assertEqual(M.sd, math.sqrt(M.variance))
+        self.assertTrue(math.isnan(M.median))
+        self.assertEqual(M.upper_limit, 4.0)
+        self.assertEqual(M.lower_limit, 2.0)

     def test_state_dependent_distribution(self):
         N = ciw.create_network(
@@ -1142,6 +1430,33 @@ def test_erlang_dist_object(self):
         # 
We would expect this to be `num_phases` / `rate` = 7 / 5 = 1.4 self.assertEqual(round(sum(many_samples) / 1000, 4), 1.4057) + def test_phasetype_summary_stats(self): + initial_state = [0.3, 0.7, 0.0] + absorbing_matrix = [ + [-5, 3, 2], + [0, -4, 4], + [0, 0, 0] + ] + Ph = ciw.dists.PhaseType(initial_state, absorbing_matrix) + # Extract transient generator Q and initial vector alpha + Q = np.array(absorbing_matrix)[:-1, :-1] # Top-left 2x2 submatrix + alpha = np.array(initial_state[:-1]) # First 2 entries + invQ = np.linalg.inv(-Q) + ones = np.ones(len(Q)) + + # First moment: E[T] = α (-Q)^-1 1 + expected_mean = float(alpha @ invQ @ ones) + + # Second moment: E[T^2] = 2 α (-Q)^-2 1 + expected_second_moment = float(2 * alpha @ invQ @ invQ @ ones) + + # Variance: Var(T) = E[T^2] - (E[T])^2 + expected_variance = expected_second_moment - expected_mean ** 2 + + # Assertions + self.assertEqual(Ph.mean, expected_mean) + self.assertEqual(Ph.variance, expected_variance) + def test_sampling_erlang_dist(self): N = ciw.create_network( arrival_distributions=[ciw.dists.Erlang(5, 7)], @@ -1185,6 +1500,18 @@ def test_sampling_erlang_dist_hypothesis(self, sizes, rates, rm): self.assertTrue(Nw.simulation.service_times[Nw.id_number]['Customer']._sample() >= 0.0) self.assertTrue(Nw.simulation.inter_arrival_times[Nw.id_number]['Customer']._sample() >= 0.0) + def test_erlang_summary_stats(self): + rate = 5.0 + k = 7 + Er = ciw.dists.Erlang(rate, k) + expected_mean = k / rate + expected_variance = k / (rate ** 2) + self.assertEqual(Er.mean, expected_mean) + self.assertEqual(Er.variance, expected_variance) + self.assertTrue(math.isnan(Er.median)) + self.assertEqual(Er.lower_limit, 0) + self.assertEqual(Er.upper_limit, float('inf')) + def test_hyperexponential_dist_object(self): Hx = ciw.dists.HyperExponential([5, 7, 2, 1], [0.4, 0.1, 0.3, 0.2]) ciw.seed(5) @@ -1257,6 +1584,20 @@ def test_sampling_hyperexponential_dist(self): expected = [0.12, 0.04, 0.43, 0.05, 0.5] self.assertEqual(samples, expected) + def test_hyperexponential_summary_stats(self): + rates = [5, 2, 10] + probs = [0.3, 0.5, 0.2] + Hx = ciw.dists.HyperExponential(rates, probs) + + expected_mean = sum(p / r for p, r in zip(probs, rates)) + expected_variance = sum(2 * p / (r ** 2) for p, r in zip(probs, rates)) - expected_mean ** 2 + + self.assertEqual(Hx.mean, expected_mean) + self.assertEqual(Hx.variance, expected_variance) + self.assertTrue(math.isnan(Hx.median)) + self.assertEqual(Hx.lower_limit, 0) + self.assertEqual(Hx.upper_limit, float('inf')) + def test_hypererlang_dist_object(self): Hg = ciw.dists.HyperErlang([5, 7, 2], [0.5, 0.3, 0.2], [3, 2, 5]) ciw.seed(5) @@ -1347,6 +1688,24 @@ def test_sampling_hypererlang_dist(self): expected = [0.25, 0.22, 3.46, 3.07, 1.69] self.assertEqual(samples, expected) + def test_hypererlang_summary_stats(self): + rates = [5, 2, 10] + probs = [0.3, 0.5, 0.2] + lengths = [3, 2, 4] # k_i (number of Erlang phases per branch) + He = ciw.dists.HyperErlang(rates=rates, probs=probs, phase_lengths=lengths) + expected_mean = sum(p * k / r for p, r, k in zip(probs, rates, lengths)) + expected_second_moment = sum( + p * (k * (k + 1)) / (r ** 2) for p, r, k in zip(probs, rates, lengths) + ) + expected_variance = expected_second_moment - expected_mean ** 2 + self.assertEqual(He.mean, expected_mean) + self.assertEqual(He.variance, expected_variance) + self.assertTrue(math.isnan(He.median)) + self.assertEqual(He.lower_limit, 0) + self.assertEqual(He.upper_limit, float('inf')) + + + def test_coxian_dist_object(self): Cx = 
ciw.dists.Coxian([5, 4, 7, 2], [0.2, 0.5, 0.3, 1.0]) ciw.seed(5) @@ -1422,6 +1781,28 @@ def test_sampling_coxian_dist(self): expected = [1.09, 0.77, 0.81, 0.08, 0.43] self.assertEqual(samples, expected) + def test_coxian_summary_stats(self): + rates = [5, 4, 7, 2] + probs = [0.2, 0.5, 0.3, 1.0] # Prob of absorbing at each phase + Cx = ciw.dists.Coxian(rates, probs) + + # Recompute mean and variance using matrix-based method + + Q = np.array(Cx.absorbing_matrix)[:-1, :-1] + alpha = np.array(Cx.initial_state[:-1]) + ones = np.ones(len(Q)) + invQ = np.linalg.inv(-Q) + + expected_mean = float(alpha @ invQ @ ones) + second_moment = float(2 * alpha @ invQ @ invQ @ ones) + expected_variance = second_moment - expected_mean ** 2 + + self.assertEqual(Cx.mean, expected_mean) + self.assertEqual(Cx.variance, expected_variance) + self.assertTrue(math.isnan(Cx.median)) + self.assertEqual(Cx.lower_limit, 0) + self.assertEqual(Cx.upper_limit, float('inf')) + def test_poissoninterval_dist_object(self): ciw.seed(5) Pi = ciw.dists.PoissonIntervals( @@ -1699,7 +2080,6 @@ def test_poisson_dist_object(self): samples = [Po._sample() for _ in range(10)] expected = [3, 0, 1, 0, 0, 2, 4, 2, 1, 1] self.assertEqual(samples, expected) - self.assertRaises(ValueError, ciw.dists.Poisson, -1.5) def test_sampling_poisson_dist(self): @@ -1718,6 +2098,27 @@ def test_sampling_poisson_dist(self): expected = [3, 0, 1, 0, 0, 2, 4, 2, 1, 1] self.assertEqual(samples, expected) + def test_poisson_summary_stats(self): + P = ciw.dists.Poisson(1.5) + self.assertEqual(P.mean, 1.5) + self.assertEqual(P.variance, 1.5) + self.assertEqual(P.sd, math.sqrt(P.variance)) + self.assertEqual(P.median, 1) + self.assertEqual(P.lower_limit, 0) + self.assertEqual(P.upper_limit, float("inf")) + + def test_poissonintervals_summary_stats(self): + Pi = ciw.dists.PoissonIntervals( + rates=[5, 1.5, 3], + endpoints=[3.2, 7.9, 10], + max_sample_date=15 + ) + sd = Pi.sd + self.assertTrue((sd == sd) or math.isnan(sd)) + self.assertTrue(math.isnan(Pi.median)) + self.assertTrue(math.isinf(Pi.upper_limit)) + self.assertEqual(Pi.lower_limit, 0.0) + def test_geometric_dist_object(self): Ge = ciw.dists.Geometric(0.3) ciw.seed(5) @@ -1743,6 +2144,16 @@ def test_sampling_geometric_dist(self): expected = [6, 3, 4, 2, 1, 2, 2, 1, 1, 3] self.assertEqual(samples, expected) + def test_geometric_summary_stats(self): + G = ciw.dists.Geometric(0.3) + expected_mean = 1 / 0.3 + self.assertEqual(G.mean, expected_mean) + expected_variance = (1 - 0.3) / (0.3 ** 2) + self.assertEqual(G.variance, expected_variance) + self.assertTrue(math.isinf(G.upper_limit)) + self.assertEqual(G.lower_limit, 0) + self.assertEqual(G.median, 2) + def test_binomial_dist_object(self): Bi = ciw.dists.Binomial(20, 0.4) ciw.seed(5) @@ -1770,3 +2181,95 @@ def test_sampling_binomial_dist(self): samples = [len([r for r in recs if r.arrival_date == t]) for t in range(1, 11)] expected = [10, 10, 8, 7, 5, 7, 7, 4, 4, 15] self.assertEqual(samples, expected) + + def test_binomial_summary_stats(self): + B = ciw.dists.Binomial(20, 0.4) + expected_mean = 20 * 0.4 + self.assertEqual(B.mean, expected_mean) + expected_variance = 20 * 0.4 * (1 - 0.4) + self.assertEqual(B.variance, expected_variance) + self.assertEqual(B.median, 8) + self.assertEqual(B.lower_limit, 0) + self.assertTrue(math.isinf(B.upper_limit)) + + def test_base_distribution_properties(self): + """Test base Distribution class default properties""" + base = ciw.dists.Distribution() + self.assertTrue(math.isnan(base.mean)) + 
self.assertTrue(math.isnan(base.variance)) + self.assertTrue(math.isnan(base.upper_limit)) + self.assertTrue(math.isnan(base.lower_limit)) + + def test_combined_division_by_zero(self): + """Test division variance when denominator mean is zero""" + d1 = ciw.dists.Deterministic(5.0) + d2 = ciw.dists.Deterministic(0.0) + combined = d1 / d2 + self.assertTrue(math.isnan(combined.variance)) + + def test_mixture_nan_limits(self): + """Test MixtureDistribution NaN limit handling""" + class NaNDist(ciw.dists.Distribution): + @property + def upper_limit(self): + return float('nan') + @property + def lower_limit(self): + return float('nan') + + mixture = ciw.dists.MixtureDistribution([NaNDist(), ciw.dists.Deterministic(1)], [0.5, 0.5]) + self.assertTrue(math.isnan(mixture.median)) + self.assertTrue(math.isnan(mixture.upper_limit)) + self.assertTrue(math.isnan(mixture.lower_limit)) + + def test_comparing_theoretical_to_observed(self): + ciw.seed(0) + Un = ciw.dists.Uniform(0.41, 2.75) + Dt = ciw.dists.Deterministic(1.1) + Tr = ciw.dists.Triangular(1.1, 2.2, 3.3) + Ex = ciw.dists.Exponential(0.8) + Ga = ciw.dists.Gamma(3.1, 0.2) + No = ciw.dists.Normal(50.5, 2.6) + No_trunc = ciw.dists.Normal(1, 2) + Ln = ciw.dists.Lognormal(0.35, 0.16) + Wb = ciw.dists.Weibull(88.6, 99.1) + Em = ciw.dists.Empirical([3.3, 3.3, 4.4, 3.3, 4.4]) + Sq = ciw.dists.Sequential([3.3, 3.3, 4.4, 3.3, 4.4]) + Pf = ciw.dists.Pmf([1.1, 2.2, 3.3], [0.31, 0.21, 0.48]) + Ph = ciw.dists.PhaseType([1, 0, 0], [[-3, 2, 1], [1, -5, 4], [0, 0, 0]]) + Er = ciw.dists.Erlang(4.5, 8) + Hx = ciw.dists.HyperExponential([7, 12, 5], [0.3, 0.1, 0.6]) + He = ciw.dists.HyperErlang([8, 15, 9], [0.3, 0.1, 0.6], [2, 2, 7]) + Cx = ciw.dists.Coxian([4, 7, 2], [0.3, 0.2, 1.0]) + Pi = ciw.dists.PoissonIntervals(rates=[10, 4.5, 7], endpoints=[3.2, 7.9, 10], max_sample_date=4000) + Po = ciw.dists.Poisson(rate=0.6) + Ge = ciw.dists.Geometric(prob=0.49) + Bi = ciw.dists.Binomial(n=12, prob=0.8) + + params_set = [ + (Un, 10000, 2), + (Dt, 500, 5), + (Tr, 10000, 2), + (Ex, 15000, 1), + (Ga, 10000, 1), + (No, 10000, 1), + (No_trunc, 10000, 1), + (Ln, 10000, 2), + (Wb, 10000, 1), + (Em, 10000, 2), + (Sq, 10000, 2), + (Pf, 10000, 1), + (Ph, 10000, 2), + (Er, 10000, 1), + (Hx, 13000, 2), + (He, 12000, 2), + (Cx, 15000, 2), + (Pi, 15000, 2), + (Po, 20000, 2), + (Ge, 17000, 1), + (Bi, 10000, 1) + ] + for params in tqdm.tqdm(params_set): + ciw.seed(0) + D, n, places = params + compare_theoretical_to_observed(D=D, n=n, places=places, self=self)
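
As an illustrative sketch (not part of the patch), the new properties can be sanity-checked against a Monte Carlo estimate, in the same spirit as the `compare_theoretical_to_observed` helper above. The distribution, sample size and tolerances below are arbitrary choices for illustration only::

    import math
    import ciw

    ciw.seed(0)
    dist = ciw.dists.Exponential(rate=2.0)   # mean = 1/rate = 0.5, variance = 1/rate**2 = 0.25
    samples = [dist.sample() for _ in range(100_000)]

    # Empirical moments from the simulated samples
    sample_mean = sum(samples) / len(samples)
    sample_var = sum((x - sample_mean) ** 2 for x in samples) / len(samples)

    # The new summary-statistic properties should agree with the simulation
    assert abs(sample_mean - dist.mean) < 0.01
    assert abs(sample_var - dist.variance) < 0.01
    assert min(samples) >= dist.lower_limit   # lower_limit is 0 for Exponential
    assert math.isinf(dist.upper_limit)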