In [48]:
import os
import re
import nltk
import string
import random
import numpy as np
import scipy.linalg
import pandas as pd
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
nltk.download('punkt')

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/franciscofurey/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

# Differents approaches of Markov Chains

### Using as example the menu of a restauran (burger, pizza and hotdog)

In [49]:
# State representation
def get_states() -> dict:
    """Return the mapping of state indices to state names."""
    return {
        0: "Burger",
        1: "Pizza",
        2: "Hotdog"
    }

# Transition Matrix
def get_transition_matrix() -> np.ndarray:
    """Return the transition matrix A."""
    return np.array([[0.2, 0.6, 0.2], [0.3, 0.0, 0.7], [0.5, 0.0, 0.5]])


In [50]:
def random_walk(start_state: int, steps: int) -> None:
    """Perform a random walk on the Markov chain given a start state and number of steps."""
    
    A = get_transition_matrix()  # Retrieve the transition matrix for the Markov chain
    state = get_states()  # Get the possible states in the Markov chain
    curr_state = start_state  # Initialize the current state with the start state
    print(state[curr_state], "--->", end=" ")  # Print the start state
    
    while steps - 1:  # Continue the walk until the desired number of steps is reached
        curr_state = np.random.choice([0, 1, 2], p=A[curr_state])  # Choose the next state based on the current state's transition probabilities
        print(state[curr_state], "--->", end=" ")  # Print the current state after each step
        steps -= 1  # Decrement the steps counter
    print("stop")  # Indicate that the walk has stopped



In [51]:
def monte_carlo(start_state: int, steps: int) -> None:
    """Estimate steady state probabilities using the Monte Carlo approach."""
    
    A = get_transition_matrix()  # Retrieve the transition matrix for the Markov chain
    curr_state = start_state  # Initialize the current state with the start state
    pi = np.array([0, 0, 0])  # Initialize a vector to count the visits to each state
    pi[start_state] = 1  # Start by marking the initial state as visited once

    for i in range(steps):  # Iterate through the number of steps
        curr_state = np.random.choice([0, 1, 2], p=A[curr_state])  # Choose the next state based on current state's transition probabilities
        pi[curr_state] += 1  # Increment the count for the visited state

    print("π = ", pi / steps)  # Calculate and print the estimated steady state probabilities by normalizing the counts



In [52]:
def repeated_matrix_multiplication(steps: int) -> None:
    """Calculate steady state probabilities using repeated matrix multiplication."""
    
    A = get_transition_matrix()  # Retrieve the transition matrix for the Markov chain
    A_n = A  # Initialize A_n with the transition matrix A for multiplication

    for i in range(steps):  # Loop through the specified number of steps
        A_n = np.matmul(A_n, A)  # Multiply the current A_n by A to get the next A_n

    print("A^n = \n", A_n, "\n")  # Print the matrix after n multiplications
    print("π = ", A_n[0])  # The first row of A^n approximates the steady state probabilities



In [53]:
def find_left_eigen_vectors() -> None:
    """Find and display left eigen vectors and their normalized steady state probabilities."""
    
    A = get_transition_matrix()  # Retrieve the transition matrix for the Markov chain
    values, left = scipy.linalg.eig(A, right=False, left=True)  # Compute the left eigenvectors and eigenvalues of the transition matrix A

    print("left eigen vectors = \n", left, "\n")  # Print the left eigenvectors
    print("eigen values = \n", values)  # Print the eigenvalues

    pi = left[:, 0]  # Extract the first left eigenvector, which corresponds to the steady state
    pi_normalized = [(x / np.sum(pi)).real for x in pi]  # Normalize the first left eigenvector to get steady state probabilities
    
    print("Normalized π = ", pi_normalized)  # Print the normalized steady state probabilities
    return pi_normalized  # Return the normalized steady state probabilities



In [54]:
def find_prob(seq: list, A: np.ndarray, pi: list) -> float:
    """
    Calculate the probability of a given sequence of states in a Markov chain.

    Parameters:
    - seq: The sequence of states.
    - A: The transition matrix of the Markov chain.
    - pi: The initial state probabilities.

    Returns:
    - The probability of observing the given sequence of states.
    """
    
    start_state = seq[0]  # Extract the starting state from the sequence
    prob = pi[start_state]  # Initialize the probability with the initial probability of the start state
    
    prev_state = start_state  # Set the previous state as the start state for iteration
    for i in range(1, len(seq)):  # Iterate through the sequence starting from the second state
        curr_state = seq[i]  # Current state in the sequence
        prob *= A[prev_state][curr_state]  # Multiply the current probability by the transition probability from the previous state to the current state
        prev_state = curr_state  # Update the previous state for the next iteration
    
    return prob  # Return the calculated probability of the sequence


In [55]:
# Initial setup
states = get_states()
transition_matrix = get_transition_matrix()

# Perform a random walk
random_walk(start_state=0, steps=15)

# Monte Carlo approach
monte_carlo(start_state=0, steps=10**6)

# Repeated matrix multiplication
repeated_matrix_multiplication(steps=10**3)

# Find left eigen vectors and their normalized steady state probabilities
pi_normalized = find_left_eigen_vectors()

# Calculate probability for a sequence
sequence_prob = find_prob(seq=[1, 2, 2, 0], A=transition_matrix, pi=pi_normalized)
print("Sequence Probability: ", sequence_prob)


Burger ---> Pizza ---> Burger ---> Pizza ---> Hotdog ---> Burger ---> Burger ---> Burger ---> Pizza ---> Hotdog ---> Hotdog ---> Burger ---> Pizza ---> Burger ---> Pizza ---> stop
π =  [0.352207 0.211071 0.436723]
A^n = 
 [[0.35211268 0.21126761 0.43661972]
 [0.35211268 0.21126761 0.43661972]
 [0.35211268 0.21126761 0.43661972]] 

π =  [0.35211268 0.21126761 0.43661972]
left eigen vectors = 
 [[-0.58746336+0.j         -0.16984156-0.35355339j -0.16984156+0.35355339j]
 [-0.35247801+0.j          0.67936622+0.j          0.67936622-0.j        ]
 [-0.72845456+0.j         -0.50952467+0.35355339j -0.50952467-0.35355339j]] 

eigen values = 
 [ 1.  +0.j        -0.15+0.3122499j -0.15-0.3122499j]
Normalized π =  [0.3521126760563379, 0.2112676056338029, 0.43661971830985913]
Sequence Probability:  0.036971830985915506


# Sherlock Holmes Example ussing Markov Chains

In [56]:
# Define the path to the directory containing the stories.
story_path = './data/sherlock/sherlock/'

def read_all_stories(story_path: str) -> list:
    """
    Reads all text files from a specified directory, stopping at a delimiter or an empty line.
    
    Parameters:
        story_path (str): The path to the directory containing story files.
    
    Returns:
        list: A list of lines from all files, excluding delimiter lines and empty lines.
    """
    lines = []
    for _, _, files in os.walk(story_path):
        for file in files:
            with open(os.path.join(story_path, file)) as f:
                for line in f:
                    line = line.strip()
                    if line == '----------': break
                    if line:
                        lines.append(line)
    return lines
        
stories = read_all_stories(story_path=story_path)
print("Number of lines = ", len(stories))

def clean_txt(txt: list) -> list:
    """
    Cleans a list of text lines by converting to lowercase, removing punctuation, and tokenizing.
    
    Parameters:
        txt (list): A list of text lines to be cleaned.
    
    Returns:
        list: A list of cleaned and tokenized words.
    """
    cleaned_words = []
    for line in txt:
        line = line.lower()
        line = re.sub(r"[,.\"\'!@#$%^&*(){}?/;`~:<>+=-\\\[\]]", "", line)
        tokens = word_tokenize(line)
        # Filter out tokens that are not alphabetic
        words = [word for word in tokens if word.isalpha()]
        cleaned_words += words
    return cleaned_words

cleaned_stories = clean_txt(txt=stories)
print("Number of words = ", len(cleaned_stories))


Number of lines =  215021
Number of words =  2332247


In [57]:
def make_markov_model(cleaned_stories: list, n_gram: int = 2) -> dict:
    """
    Constructs a Markov model from a list of words.

    This function creates a dictionary representing the transitions between n-gram states
    to the next possible states along with their probabilities.

    Parameters:
        cleaned_stories (list): A list of cleaned and tokenized words from stories.
        n_gram (int): The number of words in the state used for the Markov model.

    Returns:
        dict: A dictionary representing the Markov model where keys are current states
              and values are dictionaries of next possible states and their probabilities.
    """
    markov_model = {}  # Initialize an empty dictionary for the Markov model

    # Loop through the list of words to populate the Markov model
    for i in range(len(cleaned_stories) - n_gram - 1):
        curr_state, next_state = "", ""
        # Construct the current and next state by concatenating words
        for j in range(n_gram):
            curr_state += cleaned_stories[i+j] + " "
            next_state += cleaned_stories[i+j+n_gram] + " "
        curr_state = curr_state[:-1]  # Remove the trailing space
        next_state = next_state[:-1]

        # If the current state is not in the model, add it
        if curr_state not in markov_model:
            markov_model[curr_state] = {next_state: 1}
        else:
            # If the next state exists, increment; otherwise, add it with a count of 1
            markov_model[curr_state].setdefault(next_state, 0)
            markov_model[curr_state][next_state] += 1
    
    # Calculate transition probabilities for each state
    for curr_state, transition in markov_model.items():
        total = sum(transition.values())
        for state, count in transition.items():
            markov_model[curr_state][state] = count / total  # Convert counts to probabilities
        
    return markov_model

# Example usage
markov_model = make_markov_model(cleaned_stories=cleaned_stories)
print("Number of states = ", len(markov_model.keys()))
print("All possible transitions from 'the game' state: \n")
if 'the game' in markov_model:
    print(markov_model['the game'])
else:
    print("The state 'the game' does not exist in the model.")

Number of states =  208717
All possible transitions from 'the game' state: 

{'is up': 0.06306306306306306, 'is and': 0.036036036036036036, 'was afoot': 0.036036036036036036, 'for the': 0.036036036036036036, 'was whist': 0.036036036036036036, 'would have': 0.036036036036036036, 'in their': 0.036036036036036036, 'was up': 0.09009009009009009, 'in that': 0.036036036036036036, 'the lack': 0.036036036036036036, 'for all': 0.06306306306306306, 'is afoot': 0.036036036036036036, 'was in': 0.02702702702702703, 'is hardly': 0.02702702702702703, 'may wander': 0.02702702702702703, 'now a': 0.02702702702702703, 'my own': 0.02702702702702703, 'at any': 0.02702702702702703, 'mr holmes': 0.02702702702702703, 'ay whats': 0.02702702702702703, 'my friend': 0.02702702702702703, 'fairly by': 0.02702702702702703, 'is not': 0.02702702702702703, 'was not': 0.02702702702702703, 'worth it': 0.02702702702702703, 'you are': 0.02702702702702703, 'i am': 0.02702702702702703, 'now count': 0.02702702702702703, 'your

In [58]:
def generate_story(markov_model: dict, limit: int = 100, start: str = 'my god') -> str:
    """
    Generates a story based on a given Markov model, starting from a specified state.

    Parameters:
        markov_model (dict): The Markov model to use for generating the story.
        limit (int): The maximum number of words in the generated story.
        start (str): The starting state (words) for the story.

    Returns:
        str: A string representing the generated story.
    """
    n = 0  # Initialize word counter
    curr_state = start  # Set the current state to the starting words
    story = curr_state + " "  # Initialize the story with the starting state

    # Generate the story up to the word limit
    while n < limit and curr_state in markov_model:
        # Choose the next state based on the distribution in the Markov model
        next_state = random.choices(
            population=list(markov_model[curr_state].keys()),
            weights=list(markov_model[curr_state].values()),
            k=1  # Choose one next state
        )[0]

        story += next_state + " "  # Append the next state to the story
        curr_state = next_state  # Update the current state to the next state
        n += 1  # Increment word counter

    return story.strip()  # Return the story, removing any trailing space

# Example usage: Generate and print 20 short stories
for i in range(20):
    story = generate_story(markov_model=markov_model, start="dear holmes", limit=8)
    print(f"{i}. {story}")

print("-------------------")
print("A longer story:")
print(generate_story(markov_model, start="the case", limit=100))


0. dear holmes it is again he had lit his pipe to emphasize each curious episode in comparison with
1. dear holmes said i it is curious is it not been for my dear fellow he is when
2. dear holmes my previous letters and having applied the rules of this said he in a cab if
3. dear holmes i ejaculated oh there can be little doubt that the faintest shadow of a sentence he
4. dear holmes what do you deduce that this good gentleman mr holmes i wished to live in order
5. dear holmes i fear that youll let me advise you as to the admiralty chief secretary of the
6. dear holmes he has gone come with me you must admit that what you were after two days
7. dear holmes i exclaimed you will send for your friend this was raving insanity he shuddered and again
8. dear holmes am i allowed to cooperate with him and wasnt it natural also that i should study
9. dear holmes oh yes you have heard of there you may be of some club friends opposite to
10. dear holmes that i can add anything which will cover th

# DATACAMP example

In [3]:
# The statespace
states = ["Sleep","Icecream","Run"]

# Possible sequences of events
transitionName = [["SS","SR","SI"],["RS","RR","RI"],["IS","IR","II"]]

# Probabilities matrix (transition matrix)
transitionMatrix = [[0.2,0.6,0.2],[0.1,0.6,0.3],[0.2,0.7,0.1]]

In [4]:
if sum(transitionMatrix[0]) + sum(transitionMatrix[1]) + sum(transitionMatrix[1]) != 3:
    print("Somewhere, something went wrong. Transition matrix, perhaps?")
else: 
    print("All is gonna be okay, you should move on!! ;)")    

All is gonna be okay, you should move on!! ;)


In [5]:
# A function that implements the Markov model to forecast the state/mood. 
def activity_forecast(days):
    # Choose the starting state
    activityToday = "Sleep"
    print("Start state: " + activityToday)
    # Shall store the sequence of states taken. So, this only has the starting state for now.
    activityList = [activityToday]
    i = 0
    # To calculate the probability of the activityList
    prob = 1
    while i != days:
        if activityToday == "Sleep":
            change = np.random.choice(transitionName[0],replace=True,p=transitionMatrix[0])
            if change == "SS":
                prob = prob * 0.2
                activityList.append("Sleep")
                pass
            elif change == "SR":
                prob = prob * 0.6
                activityToday = "Run"
                activityList.append("Run")
            else:
                prob = prob * 0.2
                activityToday = "Icecream"
                activityList.append("Icecream")
        elif activityToday == "Run":
            change = np.random.choice(transitionName[1],replace=True,p=transitionMatrix[1])
            if change == "RR":
                prob = prob * 0.5
                activityList.append("Run")
                pass
            elif change == "RS":
                prob = prob * 0.2
                activityToday = "Sleep"
                activityList.append("Sleep")
            else:
                prob = prob * 0.3
                activityToday = "Icecream"
                activityList.append("Icecream")
        elif activityToday == "Icecream":
            change = np.random.choice(transitionName[2],replace=True,p=transitionMatrix[2])
            if change == "II":
                prob = prob * 0.1
                activityList.append("Icecream")
                pass
            elif change == "IS":
                prob = prob * 0.2
                activityToday = "Sleep"
                activityList.append("Sleep")
            else: 
                prob = prob * 0.7
                activityToday = "Run"
                activityList.append("Run")
        i += 1  
    print("Possible states: " + str(activityList))
    print("End state after "+ str(days) + " days: " + activityToday)
    print("Probability of the possible sequence of states: " + str(prob))

# Function that forecasts the possible state for the next 2 days
activity_forecast(2)

Start state: Sleep
Possible states: ['Sleep', 'Run', 'Icecream']
End state after 2 days: Icecream
Probability of the possible sequence of states: 0.18
