In [76]:
import math
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import scipy.stats as st
import random as rd
from importlib import reload  
import statsmodels.api as sm
from typing import Union

import TidySimStat as tss
tss = reload(tss)

`TidySimStat.py` by Edward J. Xu is imported. Copyright all reserved. 
Last modifed date is June 2, 2020.


In [51]:

class Node:

    def __init__(self, index):
        self._index = index
        self._next = None


class Head:

    def __init__(self):
        "Initiate an empty linked list."
        self._next = None

    def insert(self, node_new):
        "Insert the given node according to indices."
        if self.last._index <= node_new._index:
            ## If the index of the last node is smaller than the index of the
            ## given node, there is no need to check all nodes.
            self.last._next = node_new
        else:
            cur = self._next
            while True:
                if cur._next._index < node_new._index:
                    cur = cur._next
                else:
                    node_new._next = cur._next
                    cur._next = node_new
                    break

    def undock(self):
        """Undock the first node and return it. The second node become the
        first one."""
        undocked = self._next
        self._next = self._next._next

        return undocked

    def collect(self):
        cur = self._next
        dict_indices = {}
        i = 0
        while cur:
            dict_indices[i] = cur._index
            cur = cur._next
            i += 1
        n = len(dict_indices)
        indices = [dict_indices[i] for i in range(n)]
        return indices

    @property
    def last(self):
        cur = self._next
        while cur._next:
            cur = cur._next
        return cur


class Arrival(Node):

    def __init__(self, time:float):
        super().__init__(time)
        self.whe_block = 0
        self._next_arrived = None


class Leave(Node):

    def __init__(self, time:float, whi_server:int):
        super().__init__(time)
        self.whi_server = whi_server
        self._arrival = None


class Servers(Head):

    def __init__(self, f_serve, f_arrive, num:int=5):
        """Queueing system modelled by linked lists.

        Notes
        =====
        There is no waiting room in this setting.
        """
        if not callable(f_serve):
            raise ValueError("Function to simulate service time is "
                "not callable.")
        if not callable(f_arrive):
            raise ValueError("Function to simulate arrival sojourn time is "
                "not callable.")

        super().__init__()
        self.states = [0 for i in range(num)]
        self.num_arrival = 0
        self.num_block = 0
        self.f_serve = f_serve
        self.f_arrive = f_arrive
        self.clock = 0
        self.arriveds = {0: None}

        self.warmup()

    def warmup(self):
        """Warm up the scheduled arrivals.

        Notes
        =====
        There is no need to schedule multiple arrivals in warming up stage.
        """
        t = self.f_arrive()
        self._next = Arrival(t)

        ## An arbitrage `Arrival` must be added, or there is no way to build
        ## a linked list for
        # self._next_arrived = Arrival(0)

    def schedule_arrival(self):
        """Schedule a new arrival, insert it to the event list, and return
        the index.
        """
        new = Arrival(time=self._next._index + self.f_arrive())
        self.insert(new)
        return new

    def schedule_leave(self, whi_server):
        new = Leave(self._next._index + self.f_serve(), whi_server)
        self.insert(new)
        return new

    def arrive(self):
        "Event routine triggered when a new customer arrives."
        self.num_arrival += 1
        whi_server = self.first_idle + 0
        if whi_server == self.num:  # There is no idle server.
            ## If the customer is blocked, there is no need to set a
            ## `leave` event.
            self.num_block += 1
            self._next.whe_block = 1
        else:
            ## To assign the customer to the first idle server and simulate
            ## his/her leaving time.
            self.states[whi_server] = 1
            # print(self.states)
            self.schedule_leave(whi_server)

        ## Next schedule
        self.schedule_arrival()

    def leave(self):
        "Event routine triggered when an existing customer leaves."
        self.states[self._next.whi_server] = 0  # To set the server idle.

    def advance(self):
        "Invoke next event and advance the clock time."
        self.clock = self._next._index + 0

        if isinstance(self._next, Leave):
            self.leave()
            self.undock()
        elif isinstance(self._next, Arrival):
            self.arrive()
            docked = self.undock()
            self.arriveds[len(self.arriveds)+1] = docked
            # self.last_arrived._next_arrived = docked

        return self.clock

    @property
    def num(self):
        return len(self.states)

    @property
    def first_idle(self):
        """Return the index of the first idle server.

        Attentions
        ==========
        The indices are 0, 1, 2, ..., self.num-1
        """
        result = 0
        i = 0
        while i <= self.num - 1:
            if self.states[i] == 0:
                break
            else:
                i += 1
        return i

    def collect_event_list(self):
        cur = self._next
        dict_whe_leave = {}
        i = 0
        while cur:
            dict_whe_leave[i] = isinstance(cur, Leave)
            cur = cur._next
            i += 1
        n = len(dict_whe_leave)
        whe_leaves = [dict_whe_leave[i] for i in range(n)]
        return whe_leaves

    # @property
    # def last_arrived(self):
    #     cur = self._next_arrived
    #     while cur._next_arrived:
    #         cur = cur._next_arrived
    #     return cur

    # def collect_arriveds(self):
    #     cur = self._next_arrived
    #     indices = {}
    #     i = 0
    #     while cur:
    #         indices[i] = cur._index
    #         cur = cur._next_arrived
    #         i += 1
    #     n = len(indices)
    #     li_indices = [indices[i] for i in range(n)]
    #     return li_indices

In [52]:
block_rates = [0 for i in range(10)]
for i in range(10):
    ser = Servers(lambda: tss.sim_exp(8), lambda: tss.sim_exp(1), 10)
    while ser.num_arrival <= 10000:
        ser.advance()
    block_rates[i] = ser.num_block / 10000

In [53]:
block_rates

[0.1276,
 0.1253,
 0.1119,
 0.1253,
 0.1159,
 0.1075,
 0.1235,
 0.1271,
 0.1192,
 0.1197]

In [77]:
def cal_mean_sample(sample:list, var_pop:int=None) -> Union[list, float]:
    """Calculate sample mean and its variance if the population variance
    is given.

    Attentions
    ==========
    - Usually, the population variance `var_pop` is represented by
      `\sigma^2`.
    """
    mean_sample = np.mean(sample)
    var_mean_sample = None
    if var_pop is not None:
        n = len(sample)  # size of the sample
        var_mean_sample = var_pop / n

    if var_pop is None:
        return mean_sample
    else:
        return [mean_sample, var_mean_sample]

In [79]:
cal_mean_sample([1, 2, 3])

2.0

In [85]:
alpha = 0.05
z_alpha2 = st.norm.ppf(1 - alpha / 2)
z_alpha2

1.959963984540054

In [92]:

import numpy as np
from typing import Union
import scipy.stats as st
import math


def cal_mean_sample(sample:list, var_pop:int=None) -> Union[list, float]:
    """Calculate sample mean and its variance if the population variance
    is given.

    Attentions
    ==========
    - Usually, the population variance `var_pop` is represented by
      `\sigma^2`.
    """
    mean_sample = np.mean(sample)
    var_mean_sample = None
    if var_pop is not None:
        n = len(sample)  # size of the sample
        var_mean_sample = var_pop / n

    if var_pop is None:
        return mean_sample
    else:
        return [mean_sample, var_mean_sample]


def cal_var_sample(sample:list) -> float:
    """Calculate the sample variance."""
    mean_sample = cal_mean_sample(sample)
    n = len(sample)
    var_sample = sum([(i - mean_sample)**2 for i in sample]) / (n-1)
    return var_sample


def update_mean_sample(mean_sample:float, n:float,
        sample_new:Union[float, list]) -> float:
    """Update the sample recursively.

    Keyword Arguments
    =================
    mean_sample: existing sample mean
    n: size of sample for the existing sample mean
    sample_new: a new single sample
    """
    mean_sample_new = mean_sample + (sample_new - mean_sample) / (n+1)
    return mean_sample_new


def update_var_sample():
    pass


def est_interval(sample:list, alpha:float=0.05) -> list:
    """Interval estimation of the population mean based on the given
    sample and significance level alpha.
    """
    mean = cal_mean_sample(sample)
    var = cal_var_sample(sample)
    n = len(sample)

    z_alpha2 = st.norm.ppf(1 - alpha / 2)

    interval = [mean - z_alpha2 * math.sqrt(var / n),
        mean + z_alpha2 * math.sqrt(var / n)]

    return interval


In [120]:
def est_interval(sample:list, alpha:float=0.05,
        var_pop:int=None) -> list:
    """Interval estimation of the population mean based on the given
    sample and significance level alpha.

    Attentions
    ==========
    - Usually, the population variance `var_pop` is represented by
      `\sigma^2`. Sometimes, the population standard deviance `\sigma`
      is given.
    """
    mean = cal_mean_sample(sample)
    n = len(sample)

    if var_pop is not None:
        z_alpha2 = st.norm.ppf(1 - alpha / 2)
        interval = [mean - z_alpha2 * math.sqrt(var_pop / n),
            mean + z_alpha2 * math.sqrt(var_pop / n)]
    else:
        var = cal_var_sample(sample)
        print(var)
        t_alpha2 = st.t.ppf(1 - alpha / 2, df=n-1)
        interval = [mean - t_alpha2 * math.sqrt(var / n),
            mean + t_alpha2 * math.sqrt(var / n)]

    return interval

In [112]:
li = [17, 21, 20, 18, 19, 22, 20, 21, 16, 19]
interval = est_interval(li, var_pop=9)
[round(i, 2) for i in interval]

[17.44, 21.16]

In [113]:
19.3 - 1.86

17.44

In [105]:
st.t.ppf(1 - alpha, df=8)

1.396815309743419

In [115]:
5.8 + 2.38

8.18

In [121]:
li = [16,0,0,2,3,6,8,2,5,0,12,10,5,7,2,3,8,17,9,1]
interval = est_interval(li)
[round(i, 2) for i in interval]

25.852631578947364


[3.42, 8.18]