# Markov Chains

### Markov Chain

In [18]:
import numpy as np

class MarkovChain:
    def __init__(self, A, labs = None):
        
        import numpy as np
        
        l = len(A)
        
        if labs == None:
            self.labs = [str(i) for i in range(l)]
        else:
            self.labs = labs
            
        self.A = np.array(A)
        
        if not all(map(lambda x: x == 1, self.A.sum(axis=0))):
            raise ValueError('The matrix is not column stochastic.')
        
        self.lab_dic = dict(zip(self.labs, [i for i in range(l)]))

In [19]:
A = [[0.7, 0.6], [0.3, 0.4]]

In [20]:
weather_chain = MarkovChain(A, labs=["hot", "cold"])

In [21]:
weather_chain.lab_dic

{'hot': 0, 'cold': 1}

In [22]:
weather_chain.lab_dic['hot']

0

In [23]:
weather_chain.A

array([[0.7, 0.6],
       [0.3, 0.4]])

### Adding transition

In [24]:
class MarkovChain:
    def __init__(self, A, labs = None):
        
        import numpy as np
        
        l = len(A)
        
        if labs == None:
            self.labs = [str(i) for i in range(l)]
        else:
            self.labs = labs
            
        self.A = np.array(A)
        
        if not all(map(lambda x: x == 1, self.A.sum(axis=0))):
            raise ValueError('The matrix is not column stochastic.')
        
        self.lab_dict = dict(zip(self.labs, [i for i in range(l)]))
        
    def transition(self, lab_el):
        
        #lab_el is the current state
        _col = self.lab_dict[lab_el]
        _new_state = np.argmax(np.random.multinomial(1, (self.A[:, int(_col)].flatten())))
        return [k for k,v in self.lab_dict.items() if v == _new_state][0]

In [25]:
A = [[0.7, 0.6], [0.3, 0.4]]
weather_chain = MarkovChain(A, labs=["hot", "cold"])
weather_chain.transition("hot")

'cold'

### Adding walk and path

In [26]:
class MarkovChain:
    
    def __init__(self, A, labs = None):
        
        l = len(A)
        
        if labs == None:
            self.labs = [str(i) for i in range(l)]
        else:
            self.labs = labs
            
        self.A = np.array(A)
        
        if not all(map(lambda x: x == 1, self.A.sum(axis=0))):
            raise ValueError('The matrix is not column stochastic.')
        
        self.lab_dict = dict(zip(self.labs, [i for i in range(l)]))
        
    def transition(self, lab_el):
        
        #lab_el is the current state
        _col = self.lab_dict[lab_el]
        _new_state = np.argmax(np.random.multinomial(1, (self.A[:, int(_col)].flatten())))
        return [k for k,v in self.lab_dict.items() if v == _new_state][0]
    
    def walk(self, start_lab, integer):
        state_list = [start_lab]
        i = 0
        while i < integer:
            next_state = self.transition(start_lab)
            state_list.append(next_state)
            start_lab = next_state
            i += 1
            
        return state_list
    
    def path(self, start_lab, stop_lab):

        state_list = [start_lab]
        next_state = None
        i = 0
        
        while next_state != stop_lab:
            next_state = self.transition(start_lab)
            state_list.append(next_state)
            start_lab = next_state
            i += 1
            
        return state_list

In [27]:
A = [[0.7, 0.6], [0.3, 0.4]]
weather_chain = MarkovChain(A, labs=["hot", "cold"])
weather_chain.walk("cold", 10)

['cold',
 'hot',
 'cold',
 'cold',
 'cold',
 'hot',
 'hot',
 'hot',
 'hot',
 'hot',
 'cold']

In [28]:
A = [[0.7, 0.6], [0.3, 0.4]]
weather_chain = MarkovChain(A, labs=["hot", "cold"])
weather_chain.path("cold", "cold")

['cold',
 'hot',
 'hot',
 'hot',
 'hot',
 'hot',
 'hot',
 'hot',
 'hot',
 'hot',
 'hot',
 'hot',
 'hot',
 'hot',
 'cold']

### Adding a steady state finding method

In [29]:
from scipy.linalg import norm

class MarkovChain:
    
    def __init__(self, A, labs = None):
        
        self.l = len(A)
        
        if labs == None:
            self.labs = [str(i) for i in range(self.l)]
        else:
            self.labs = labs
            
        self.A = np.array(A)
        
        if not all(map(lambda x: x == 1, self.A.sum(axis=0))):
            raise ValueError('The matrix is not column stochastic.')
        
        self.lab_dict = dict(zip(self.labs, [i for i in range(self.l)]))
        
    def transition(self, lab_el):
        
        #lab_el is the current state
        _col = self.lab_dict[lab_el]
        _new_state = np.argmax(np.random.multinomial(1, (self.A[:, int(_col)].flatten())))
        return [k for k,v in self.lab_dict.items() if v == _new_state][0]
    
    def walk(self, start_lab, integer):
        state_list = [start_lab]
        i = 0
        while i < integer:
            next_state = self.transition(start_lab)
            state_list.append(next_state)
            start_lab = next_state
            i += 1
            
        return state_list
    
    def path(self, start_lab, stop_lab):

        state_list = [start_lab]
        next_state = None
        i = 0
        
        while next_state != stop_lab:
            next_state = self.transition(start_lab)
            state_list.append(next_state)
            start_lab = next_state
            i += 1
            
        return state_list
    
    def steady_state(self, tol, maxiter):
        
        x0 = np.random.rand(self.l)
        x1 = self.A @ x0
        k = 0
        

        while norm(x1 - x0, 1) > tol or k < maxiter:

            x1 = self.A @ x0
            x0 = x1
            k += 1

        if k == maxiter-1:
            raise ValueError('The Markov Chain does not converge!')
        else:
            return x1

In [30]:
A = np.array([[.7, .6],[.3, .4]])
markov = MarkovChain(A, labs=["hot", "cold"])

In [31]:
ss = markov.steady_state(0.0001, 100000)

In [32]:
A @ ss == ss # Steady state found

array([ True,  True])

In [33]:
ss

array([1.0204292, 0.5102146])

In [34]:
A @ ss # Cool!

array([1.0204292, 0.5102146])