# Chapter 04 <b>Markov Chain</b>

## I - <b>Discrete Time Markov Chain (DTMC)</b>

In [2]:
import numpy as np
import networkx as nx

In [3]:
#
def makeGraf(S,P):
    dic={}
    succ={s:set() for s in S}
    for i in range(len(S)):
        for j in range(len(S)):
            if(P[i][j]!=0):
                dic[(S[i],S[j])]=P[i][j]
                succ[S[i]].add(S[j])
    graf=nx.DiGraph(list(dic.keys()))
    return graf,succ

In [4]:
#
def makeAB(S,P):
    n=len(S)
    P1=P.transpose()-np.identity(n)
    A=np.vstack([P1,np.ones(n)])
    B=np.zeros(n+1)
    B[n]=1
    return A,B

In [5]:
#
def is_distribution(v):
    for i in range(len(v)):
        if ((v[i]>1) or (v[i]<0)): return False
    if(sum(v)!=1): return False
    return True

In [6]:
#
def makeAB(S,P):
    n=len(S)
    P1=P.transpose()
    A=np.vstack([P1,[1 for i in range(n)]])
    for i in range(n):
        A[i][i]-=1
    B=np.zeros(n+1)
    B[n]=1
    return A,B

In [7]:
#CMTD.py

import networkx as nx
import numpy as np

# CMTD Class
class CMTD(object):

    # constructor
    def __init__(self,P ,S = None ,pi0 = None):
        n = len(P)
        pi0_default = np.zeros(n); pi0_default[0] = 1

        # attributes of CMTD
        self.P = np.array(P)
        self.S = range(1,n+1) if S == None else S
        self.pi0 = pi0_default if pi0 == None else np.array(pi0)

        # checks if P and pi0 are stochastic
        if((len(self.pi0) != n) or (not is_distribution(self.pi0))):
            print("pi0 should be a stochastic vector")
            sys.exit(0)
        stoc = (len(self.P) == n)
        if(stoc):
            for i in range(n):
                if((len(P[i]) != n) or (not is_distribution(self.P[i]))):
                    stoc = False;break
        if(not stoc):
            print("P should be a stochastic matrix")
            sys.exit(0)

    # nSteps probabilities
    def nSteps(self,n):
        return self.pi0.dot(np.linalg.matrix_power(self.P,n))

    # is_irreducible
    def is_irreducible(self):
        return(nx.is_strongly_connected(makeGraf(self.S,self.P)[0]))

    # is_aperiodic
    def is_aperiodic(self):
        return(nx.is_aperiodic(makeGraf(self.S,self.P)[0]))

    # is_ergodic
    def is_ergodic(self):
        return (self.is_irreducible() and self.is_aperiodic())

    # classify
    def classify(self):
        graf, succ = makeGraf(self.S,self.P)
        cfcs=list(nx.strongly_connected_components(graf))
        classes={"transitoire":[],"reccurente":[]}
        for i in range(len(cfcs)):
            cfc , voisins = cfcs[i] ,set()
            for s in cfc: voisins = voisins|succ[s]-cfc
            classes["transitoire" if(len(voisins)>0) else "reccurente"].append(cfc)
        return classes

    # hitting_time : average time to hit for the first time the state given in the argument
    def hitting_time(self,state):
        n, i = len(self.S), self.S.index(state)
        I = np.identity(n); I[i,i] = 0;
        g = np.ones(n); g[i]=0;
        for k in range(n):
            if(self.P[k,k] == 1): I[k,k], g[k] = 0, 0
        return np.matmul(np.linalg.inv(np.identity(n)-np.matmul(I,self.P)), g)

    # return_time : average return time to the state given in the argument
    def return_time(self,state):
        i = self.S.index(state)
        return 1 + ( 0 if self.P[i,i] == 1 else  np.dot(self.P[i], self.hitting_time(state)))

    # absorption probability
    def absorbing_proba(self,state):
        n, j = len(self.S) , self.S.index(state)
        absorb = [i for i in range(n) if(self.P[i][i] == 1) ]
        if(j not in absorb):
            print("The state must be absorbant")
            sys.exit(0)
        A, B = np.zeros((n,n)), np.zeros(n)
        A[j][j], B[j] = 1 , 1
        for i in range(n):
            if(i != j):
                if(i in absorb): A[i][i] = 1
                else:
                    for k in range(n):
                        A[i][k] = self.P[i][k] if(k != i) else A[i][i]-1
        return np.linalg.inv(A).dot(B)

    # average absorption time
    def absorbing_time(self):
        n = len(self.S)
        absorb = [i  for i in range(n) if(self.P[i][i] == 1)]
        if(len(absorb) == 0):
            print("No absorbant state exists!")
            sys.exit(0)
        A, B = np.zeros((n,n)), np.zeros(n)
        for i in range(n):
            if(i in absorb): A[i][i]=1
            else:
                B[i] = -1
                for j in range(n):
                    if(j not in absorb): A[i][j]=self.P[i][j]
                A[i][i] -= 1
        return np.linalg.inv(A).dot(B)

    # steady state probabilities
    def steady_prob(self):
        if(not self.is_ergodic()):return None
        n = len(self.S)
        A = np.vstack([self.P.T - np.identity(n),np.ones(n)])
        B = np.append(np.zeros(n),1)
        return np.linalg.lstsq(A,B)[0]

In [8]:
#Code402.py
#from CMTD import CMTD

# Figurine example
P = [[0.25, 0.75, 0.00, 0.00],
     [0.00, 0.50, 0.50, 0.00],
     [0.00, 0.00, 0.75, 0.25],
     [0.00, 0.00, 0.00, 1.00]]

print(CMTD(P).nSteps(3))

#______________________________   Output  ____________________________________
#[0.015625 0.328125 0.5625   0.09375 ]

[0.015625 0.328125 0.5625   0.09375 ]


In [9]:
#Code403.py

# Classification example
P = [[0.00, 1/2 , 1/2 , 0.00, 0.00, 0.00, 0.00, 0.00],
     [1/3 , 2/3 , 0.00, 0.00, 0.00, 0.00, 0.00, 0.00],
     [0.00, 0.00, 0.00, 1/4 , 0.00, 0.00, 0.00, 3/4 ],
     [0.00, 0.00, 1/2 , 0.00, 1/2 , 0.00, 0.00, 0.00],
     [0.00, 0.00, 0.00, 0.00, 0.00, 1.00, 0.00, 0.00],
     [0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 1.00, 0.00],
     [0.00, 0.00, 0.00, 0.00, 0.00, 2/3 , 0.00, 1/3 ],
     [0.00, 0.00, 0.00, 0.00, 0.00, 1.00, 0.00, 0.00]]

print(CMTD(P).classify())
print(CMTD(P).is_irreducible())
#______________________________   Output  ______________________________________
#{'transitoire': [{5}, {3, 4}, {1, 2}], 'reccurente': [{8, 6, 7}]}

{'transitoire': [{5}, {3, 4}, {1, 2}], 'reccurente': [{8, 6, 7}]}
False


In [12]:
#Code404.py
#from CMTD import CMTD

P = [[0.5, 0.5, 0.0],
     [0.0, 1/3, 2/3],
     [0.5, 0.5, 0.0]]

print(CMTD(P).hitting_time(3))
print(CMTD(P).return_time(1))

#______________________________   Output  ______________________________________
# [3.5 1.5 0. ]
# 3.499999999999999

[3.5 1.5 0. ]
3.499999999999999


In [14]:
#Code405.py
#from CMTD import CMTD

P = [[1.0, 0.0, 0.0, 0.0],
     [1/3, 0.0, 2/3, 0.0],
     [0.0, 1/2, 0.0, 1/2],
     [0.0, 0.0, 0.0, 1.0]]

print(CMTD(P).absorbing_proba(1))
#______________________________   Output  ______________________________________
#[1.   0.5  0.25 0.  ]

[1.   0.5  0.25 0.  ]


In [16]:
#Code406.py
#from CMTD import CMTD

P = [[1/4, 3/4, 0.0, 0.0],
     [0.0, 1/2, 1/2, 0.0],
     [0.0, 0.0, 3/4, 1/4],
     [0.0, 0.0, 0.0, 1.0]]

print(CMTD(P).absorbing_time())

#______________________________   Output  ______________________________________
#[7.33333333 6.         4.         0.        ]

[7.33333333 6.         4.         0.        ]


In [17]:
#Code407.py
#from CMTD import CMTD

P = [[0.70, 0.10, 0.20],
     [0.50, 0.25, 0.25],
     [0.40, 0.30, 0.30]]

print(CMTD(P).steady_prob())

#______________________________   Output  ______________________________________
#[0.59602649 0.17218543 0.23178808]

[0.59602649 0.17218543 0.23178808]


  return np.linalg.lstsq(A,B)[0]


## II - <b>Continuous Time Markov Chain (CTMC)</b>

In [20]:
#CMTC.py

import sys
import numpy as np
#from CMTD import CMTD

# CMTD subclass of CMTD
class CMTC(CMTD):

    # constructor
    def __init__(self,P, lambdas, S = None, pi0 = None ):
        super().__init__(P ,S ,pi0)
        n = len(P)
        self.lambdas=np.array(lambdas)
        if((len(self.lambdas) != n)or(len(list(filter(lambda x:x<0,self.lambdas))) != 0)):
            print("lambdas should have ",n," positive values")
            sys.exit(0)
        copyP = self.P.copy()
        np.fill_diagonal(copyP,-1)
        self.Q = np.matmul(self.lambdas * np.identity(n),copyP)

    # steady state probabilities
    def steady_prob(self):
        n = len(self.S)
        A = np.vstack([self.Q.T,np.ones(n)])
        B = np.append(np.zeros(n),1)
        return np.linalg.lstsq(A,B)[0]

In [21]:
#Code408.py
#from CMTC import CMTC

p = [[0.0, 1.0, 0.0],
     [0.0, 0.0, 1.0],
     [1/2, 1/2, 0.0]]
lambdas = [2,1,3]

continuous = CMTC(p, lambdas)
print(continuous.steady_prob())

#______________________________   Output  ______________________________________
#[0.15789474 0.63157895 0.21052632]

[0.15789474 0.63157895 0.21052632]


  return np.linalg.lstsq(A,B)[0]


In [22]:
#Code409.py
#from CMTC import CMTC

p = [[0.0, 0.5, 0.5],
     [0.5, 0.0, 0.5],
     [0, 1, 0.0]]
lambdas = [2,2,2]

continuous = CMTC(p, lambdas)
print("Hitting time: ",continuous.hitting_time(3))

#______________________________   Output  ______________________________________
# Hitting time:  [1. 1.]

Hitting time:  [2. 2. 0.]


In [24]:
#Code410.py

import numpy as np
#from CMTD import CMTD

def getRankPage(G,d):
    N = len(G)
    _output = list( map(lambda x: 0 if list(x).count(1) == 0 else  1/list(x).count(1) , G))
    M = np.identity(N) - d * np.multiply(np.array(_output), G.T).T
    M = np.vstack([M.T ,np.ones(N)])
    B = np.hstack([np.ones(N)*(1-d), N])
    return np.linalg.lstsq(M,B)[0]

# Test with getRankPage
d = 0.85
A = np.array([[0, 1, 1],
              [1, 0, 0],
              [0, 1, 0]])
print('R : ', getRankPage(A,d))

# Test with the steady state computation
G = [[0.050, 0.475, 0.475],
     [0.900, 0.050, 0.050],
     [0.050, 0.900, 0.050]]
print( 'R : ', CMTD(G).steady_prob()*3)

#______________________________   Output  ______________________________________
# R : [1.16336914 1.19219898 0.64443188]
# R : [1.16336914 1.19219898 0.64443188]

R :  [1.16336914 1.19219898 0.64443188]
R :  [1.16336914 1.19219898 0.64443188]


  return np.linalg.lstsq(M,B)[0]
  return np.linalg.lstsq(A,B)[0]
