### Import libraries and set up random number generator

In [1]:
import numpy as np
from numpy.random import default_rng
# fixed seed so the sequence of random draws is repeatable from run to run
seed = 23
# module-level generator; TextGenerator.get_next_state() samples through this name
rng = default_rng(seed)

### Text Generator Class
* All the code is in this class
* Everything is explained in the comments

In [42]:
'''
Overview:
        This Class contains all the functions needed for creating a markov chain from a corpus of
        text and generating new text from the created markov chain. It also adds an option for 
        increasing the history to be greater than one so that we consider multiple words when
        generating a new word. While this does break the markov property, it can create more
        realistic text and was an interesting problem to explore.

Usage:  The markov chain must be created from a corpus of text which will be passed
        in through a .txt file. The user can also specify the desired history size to
        consider when generating text. Once the TextGenerator object is created, we must
        first parse the text into tokens using the text_to_list() function. Then we can pass
        these tokens into the gen_word_dist() function to create the markov chain which is
        just a nested dictionary. We don't create a matrix since most of the elements would
        be zero anyway, and a dictionary allows for a more intuitive representation. This step
        can be thought of as 'training': we are filling in all the state transition probabilities.
        Finally, we call the generate_text() function which steps through the markov chain
        using the get_next_state() function. This will output the final generated text in a 
        readable (might be gibberish) form.
'''

class TextGenerator:
    '''
    Description: Builds a word-level markov chain from a corpus of text and generates
                 new text by stepping through that chain. With a history size greater
                 than one, each state is a sequence of consecutive tokens instead of a
                 single token.

    Usage: get_text() -> text_to_list() -> gen_word_dist() -> generate_text()

    Input: txt_file_path - Relative path to the txt file to generate the markov chain
           history_size - the number of tokens per state (only a history of size 1 is
                          a markov chain over single words).
           rng - optional numpy random Generator used for sampling. When omitted, the
                 module-level `rng` is used.

    Raises: ValueError if history_size is smaller than 1.
    '''

    # tokens that mark the end of a sentence
    _SENTENCE_ENDINGS = ('.', '?', '!')

    # (old, new) pairs applied IN THIS ORDER by text_to_list(). The order matters:
    # e.g. quotes are removed only after the "<punctuation><space>" rules have run.
    _TOKENIZE_RULES = (
        (". ", " . "), ("! ", " ! "), ("? ", " ? "), (", ", " , "),
        ("; ", " ; "), (": ", " : "),
        ("\"", ""),
        (".\n", " . <NL> "), (",\n", " , <NL> "), ("!\n", " ! <NL> "),
        ("?\n", " ? <NL> "), (":\n", " : <NL> "),
        # trailing space keeps the tab marker a token of its own
        ("\t", " <TAB> "),
        ("\n", " <NL> "),
    )

    # (old, new) pairs applied in order by generate_text() to undo the tokenization
    _DETOKENIZE_RULES = (
        (" . ", ". "), (" , ", ", "), (" ; ", "; "),
        (" ! ", "! "), (" ? ", "? "), (" : ", ": "),
        ("<TAB> ", "\t"), ("<NL> ", "\n"),
    )

    def __init__(self,txt_file_path,history_size,rng=None):
        if history_size < 1:
            raise ValueError("history_size must be at least 1")

        self.txt_file_path = txt_file_path
        self.history_size = history_size

        # random generator for get_next_state(); None means "use the module-level rng"
        self.rng = rng

        # list of each text token including punctuation in order they appear
        self.ordered_token_list = []

        # list of each text sequence (for history > 1) in order they appear
        self.ordered_sequence_list = []

        # filled in by get_text() and gen_word_dist()
        self.contents = None
        self.text_dict = {}
        self.all_states = False


    def get_text(self):
        '''
        Description: This function reads the .txt file at self.txt_file_path (as utf-8).

        Input: none

        Returns: - text from the .txt file as a string (also stored in self.contents)
        '''
        with open(self.txt_file_path,encoding='utf-8') as f:
            self.contents = f.read()
        return self.contents


    def text_to_list(self,text):
        '''
        Description: This function takes a string of text and turns it into a list where
                     each element represents a single text 'token' including punctuation
                     (these are the states of the markov chain). Double quotes are dropped,
                     tabs become '<TAB>' and newlines become '<NL>'. Punctuation is only
                     split off when it is followed by a space or (except ';') a newline.
                     For a history size greater than 1 the tokens are also grouped into
                     overlapping sequences of history_size tokens.

        Input: text - A string of text

        Returns: - A list of individual tokens in chronological order,
                 - A list of all the multitoken elements in chronological order
                   (None if history size is 1).
        '''
        # insert spaces so we can use the split function while keeping punctuation
        for old, new in self._TOKENIZE_RULES:
            text = text.replace(old, new)

        # now split the text using whitespace as the delimiter
        self.ordered_token_list = text.split()

        # if each token is a state then we are done
        if self.history_size == 1:
            return self.ordered_token_list, None

        # sliding window over the tokens; rebuilt from scratch on every call so
        # repeated calls do not accumulate sequences from earlier texts
        tokens = self.ordered_token_list
        width = self.history_size
        self.ordered_sequence_list = [
            ' '.join(tokens[start : start + width])
            for start in range(len(tokens) - width + 1)
        ]

        return self.ordered_token_list, self.ordered_sequence_list


    def gen_word_dist(self, token_list, multitoken_list=None,all_states=False):
        '''
        Description: This function takes the ordered text list and creates a markov chain
                     representation from it as a nested dictionary. The first level holds
                     every unique state; the second level holds the tokens that followed
                     that state in the corpus and their relative probabilities. A state
                     that is never followed by anything keeps an empty (or all-zero)
                     second level.

        Input: token_list - An ordered list of individual text tokens.
               multitoken_list - List of multitoken elements (required for history_size > 1)
               all_states - Bool of whether to include all possible outgoing states (even
                            with prob = 0). Only used when history_size is 1.

        Returns: A nested dictionary representing the Markov Chain (also stored in
                 self.text_dict). When the history size is greater than 1, the first
                 level holds the unique multitoken elements and the second level the
                 possible next individual words.
        '''
        # save in case we want to visualize the state graph
        self.all_states = all_states

        if self.history_size == 1:
            # every token is a state and is followed by the token right after it
            states = token_list
            followers = token_list[1:]
            unique_tokens = set(token_list)
            if all_states:
                chain = {token: dict.fromkeys(unique_tokens,0) for token in unique_tokens}
            else:
                chain = {token: {} for token in unique_tokens}
        else:
            # sequence i covers tokens i .. i+history_size-1, so it is followed
            # by token i+history_size
            states = multitoken_list
            followers = token_list[self.history_size:]
            chain = {sequence: {} for sequence in set(multitoken_list)}

        # count how often each token follows each state; zip stops at the shorter
        # list, so the final state (which has no follower) is skipped
        for state, follower in zip(states, followers):
            counts = chain[state]
            counts[follower] = counts.get(follower, 0) + 1

        # now we convert the counts to probabilities
        for counts in chain.values():
            total = sum(counts.values())
            if total == 0:
                # no outgoing transitions: nothing to normalize (avoids dividing by zero)
                continue
            for out_state in counts:
                counts[out_state] = counts[out_state]/total

        self.text_dict = chain
        return self.text_dict


    def get_next_state(self,current_state):
        '''
        Description: This function will take a given state in the markov chain and
                     select the next state probabilistically.

        Input: The current state as a dictionary {next token: weight}. The weights
               are rescaled to sum to 1 before sampling.

        Returns: A numpy array of size 1 holding the chosen next token.

        Raises: ValueError if the state has no outgoing transition with a positive weight.
        '''
        # get next states that branch from current one, and their relative probabilities
        out_states = list(current_state.keys())
        probs = np.fromiter(current_state.values(), dtype=float, count=len(out_states))

        total = probs.sum()
        if total <= 0:
            raise ValueError("current_state has no outgoing transitions to choose from")

        # use the generator given to the constructor, else the module-level one
        own_rng = getattr(self, "rng", None)
        generator = own_rng if own_rng is not None else rng

        # dividing by the total makes the probabilities sum to 1 without ever
        # pushing a single entry below zero
        return generator.choice(a=out_states,size=1,p=probs/total)


    @staticmethod
    def _is_dead_end(state):
        '''Return True when the state has no outgoing transition with non-zero probability.'''
        return not any(state.values())


    def _format_tokens(self, tokens):
        '''Join generated tokens into readable text, undoing the tokenizer's spacing.'''
        text_string = " ".join(tokens)
        for old, new in self._DETOKENIZE_RULES:
            text_string = text_string.replace(old, new)

        # the closing punctuation has no space after it, so the rules above miss it;
        # only touch the string when it really ends in " <punctuation>"
        if (len(text_string) >= 2 and text_string[-2] == " "
                and text_string[-1] in self._SENTENCE_ENDINGS):
            text_string = text_string[:-2]+text_string[-1]
        return text_string


    def generate_text(self,first_word,num_sentences,markov_chain,max_tokens=None):
        '''
        Description: This function will step through the generated markov chain
                     to produce sentences based on the input parameters. It will
                     format the text as well. Whenever a state without outgoing
                     transitions is reached, generation continues from the state of
                     first_word (first_word itself is not emitted again).

        Input: first_word - the starting state; must be a key of markov_chain (for a
                            history size > 1 this is history_size tokens joined by spaces)
               num_sentences - the number of desired sentences (a sentence ends at a
                               '.', '?' or '!' token)
               markov_chain - the markov chain dictionary from gen_word_dist()
               max_tokens - optional upper bound on the number of generated tokens.
                            With the default (None) generation only stops once
                            num_sentences is reached, which never happens if no
                            sentence-ending token is reachable from first_word.

        Returns: The formatted output text.

        Raises: KeyError if first_word is not a key of markov_chain.
                ValueError if the state of first_word has no outgoing transitions.
        '''
        # get the state from the desired first word
        start_state = markov_chain[first_word]
        if self.history_size == 1:
            start_window = [first_word]
        else:
            start_window = first_word.split()

        # start the text sequence
        text = list(start_window)

        # tokens that make up the key of the current state; tracked separately from
        # the text so that a restart also resets the key
        window = list(start_window)
        curr_state = start_state

        sentence_count = 0
        generated = 0
        # keep adding words until we reach the sentence count
        while sentence_count < num_sentences:
            if max_tokens is not None and generated >= max_tokens:
                break

            # check if there is no next state; if so start over from the first word
            if self._is_dead_end(curr_state):
                window = list(start_window)
                curr_state = start_state

            # get the next word (only get the string)
            next_word = self.get_next_state(curr_state).tolist()[0]
            text.append(next_word)
            generated += 1

            # slide the window and move to the matching state; an unknown key is
            # treated like a state without outgoing transitions
            window = window[1:] + [next_word]
            curr_state = markov_chain.get(" ".join(window), {})

            # check if the sentence ended
            if next_word in self._SENTENCE_ENDINGS:
                sentence_count += 1

        # format the text sequence into a sentence
        return self._format_tokens(text)


    def generate_transition_graph(self,state_dict):
        '''
        Description: This function will generate a state transition matrix from the
                     dictionary. This only works for a history size of 1 since this is
                     the only case where rows and columns are the same set of states.
                     state_dict may be sparse or contain all states; missing
                     transitions count as probability 0.

        Input: state_dict - output of gen_word_dist()

        Returns: State Transition Matrix (row = current state, column = next state),
                 state_labels (the order of both rows and columns)

        Raises: ValueError if history_size is not 1.
        '''
        if self.history_size != 1:
            raise ValueError("a transition matrix is only defined for history_size == 1")

        states = list(state_dict.keys())
        dim = len(states)

        # look every (row, column) pair up by name so the columns line up with
        # `states` regardless of the key order inside each nested dictionary
        rows = [[state_dict[src].get(dst, 0.0) for dst in states] for src in states]
        state_transition_matrix = np.array(rows, dtype=float).reshape(dim, dim)

        return state_transition_matrix, states

# Testing the functionality of each function

### Testing the text_to_list() function

In [38]:
# create a dummy object for testing (the file path is empty because
# get_text() is never called in this cell)
history = 1
gen = TextGenerator("",history)

# sample corpus; the trailing space matters: text_to_list() only splits off
# punctuation that is followed by a space or a newline
text = "My name is Geffen Cooper. What is yours? "

# show the list output: (token list, None) since history is 1
gen.text_to_list(text)

(['My', 'name', 'is', 'Geffen', 'Cooper', '.', 'What', 'is', 'yours', '?'],
 None)

### Testing the gen_word_dist() function

In [44]:
# create a dummy object for testing (the file path is empty because
# get_text() is never called in this cell)
history = 1
gen = TextGenerator("",history)

# sample corpus: 'very' is followed once by 'tall' and once by 'happy'
text = "He is very tall. He is very happy. "
# tl is the token list; ml is None because history is 1
tl, ml = gen.text_to_list(text)

# show the nested {state: {next token: probability}} dictionary
gen.gen_word_dist(tl,ml)

{'very': {'tall': 0.5, 'happy': 0.5},
 '.': {'He': 1.0},
 'He': {'is': 1.0},
 'tall': {'.': 1.0},
 'is': {'very': 1.0},
 'happy': {'.': 1.0}}

# Testing the text generation

In [20]:
# --- parameters ---
# number of tokens per state (2 => each state is a pair of consecutive tokens)
history = 2
file = "1789-04-30-first-inaugural-address.txt"
# NOTE: this second assignment overrides the one above, so email.txt is the corpus used
file = "email.txt"
# starting state; generate_text() looks it up directly as a key of the chain,
# so it has to be a token sequence that occurs in the corpus
first_word = "Hi all"
num_sentences = 2

# create the generator from a certain corpus
gen = TextGenerator(file,history)

# get the raw text
raw_text = gen.get_text()

# parse the text into single tokens and into overlapping token sequences
token_list, sequence_list = gen.text_to_list(raw_text)

# generate the markov chain
dist = gen.gen_word_dist(token_list,sequence_list)

# generate the text
print(gen.generate_text(first_word,num_sentences,dist))


Hi all, 

---- 
Hi all, 

Hope that you can now submit it by Tuesday 11:59 PM. As you know, there can be at the level of labs 2-4 so basically design a new lab with report; 2) read and study a related machine learning research paper and write a summary in a report or survey paper discussing the algorithm, results, strengths and weaknesses.


In [79]:
def print_array(arr):
    """
    Print a 2-D array one row per line.

    Every element is rendered followed by a single space and right-justified
    to a minimum width of three characters; the cells of a row are written
    back to back.
    """
    for row in arr:
        cells = ["{} ".format(value).rjust(3) for value in row]
        print("".join(cells))


In [116]:
# create a dummy object for testing (the file path is empty because
# get_text() is never called in this cell)
history = 1
gen = TextGenerator("",history)

# sample corpus
text = "Hello! How is it going? My name is Geffen Cooper. What is your name? "
tl, ml = gen.text_to_list(text)

# all_states=True: every state lists every token as a possible next state
d = gen.gen_word_dist(tl,ml,True)

# row = current state, column = next state, both in the order given by labels
mat,labels = gen.generate_transition_graph(d)
print_array(mat)

# print the labels as a bracketed, comma-separated list of double-quoted names;
# join() also yields "[]" for an empty label list
label_str = "[" + ",".join('"{}"'.format(n) for n in labels) + "]"
print(label_str)

0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 
1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 
0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 
0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 
0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 
0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 
0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 0.0 
0.0 0.0 0.0 0.0 0.0 0.0 0.5 0.0 0.0 0.0 0.0 0.5 0.0 0.0 
0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 
0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 
0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 
0.0 0.0 0.3333333333333333 0.0 0.3333333333333333 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.3333333333333333 
0.0 0.0 0.0 0.0 0.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 
0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1.0 0.0 
["!","Hello","it","going","your",".","?","name","My","How","What","is","Cooper","Geffen"]
