In [1]:
#Read in the extracted brown files
import glob

#Read in the files and create a list of their paths
#glob returns paths in arbitrary (OS-dependent) order; sort them so the corpus order,
#and therefore the order-based train/test split later on, is the same on every run
tagged_files = sorted(glob.glob("_extracted_brown/*.txt"))
print(type(tagged_files))
print(len(tagged_files)) #Should be 500

<class 'list'>
500


In [2]:
'''
Turn the tagged files into a list of sentences.
Each sentence is a list of (word, tags) tuples, where word is a str and
tags is a set of str. It is a set because some words in the files carry
more than one tag, written as TAG1|TAG2.
'''
#I got help from the website where we download the extracted brown text files
#https://kristopherkyle.github.io/Corpus-Linguistics-Working-Group/pos_tagging_1.html

full_data: list = []
for path in tagged_files:
    #Sentences are separated by a blank line
    with open(path, 'r') as handle:
        raw_sentences = handle.read().split("\n\n")

    for raw_sentence in raw_sentences:
        tokens = []
        for line in raw_sentence.split("\n"):
            #A usable line is "<word> <tags>" once surrounding whitespace is removed;
            #empty lines and lines without a space have no separator and are skipped
            word_, separator, pos = line.strip().partition(" ")
            if separator:
                tokens.append((word_, set(pos.split("|"))))

        #Only keep sentences that ended up with at least one token
        if len(tokens) > 0:
            full_data.append(tokens)

In [3]:
#Sanity check: print the structure of full_data one nesting level at a time
#(list of sentences -> list of tokens -> (word, tag-set) tuple)
print(f"full_data type: {type(full_data)}")
print(f"Number of sentences: {len(full_data)}")

#Only index into full_data when it is non-empty, so [0] cannot raise IndexError
if full_data:
    first_sentence = full_data[0]
    print(f"First sentence type: {type(first_sentence)}")
    print(f"First sentence length: {len(first_sentence)}")
    
    #Same guard one level down before looking at the first token
    if first_sentence:
        first_item = first_sentence[0]
        print(f"First item type: {type(first_item)}")
        print(f"First item: {first_item}")

full_data type: <class 'list'>
Number of sentences: 52108
First sentence type: <class 'list'>
First sentence length: 17
First item type: <class 'tuple'>
First item: ('In', {'IN'})


In [4]:
#HMM Model
import numpy as np
class HiddenMarkovModel:
    """
    First-order Hidden Markov Model used as a part-of-speech tagger.
    The states are the POS tags and the observations are the words. Both are
    collected from the training data and every probability is estimated by
    (fractional) counting, without smoothing.
    """
    def __init__(self):
        #Everything is learned in train_HMM, so the model starts empty
        self.states = None
        self.observations = None

        #numpy understands integer indices, NOT strings, so keep
        #tag -> row and word -> column lookups next to the matrices
        self.state_to_idx = None
        self.obs_to_idx = None

        #Initial/transition/emission probabilities, filled in by train_HMM
        self.initial_probs = None
        self.transition_probs = None
        self.emission_probs = None

    def train_HMM(self, training_data: list):
        """
        Trains the HMM on tagged data
        Calculates the initial, transition, and emission probabilities
        Args:
            training_data (list): a list of sentences; each sentence is a list of
                (word, tags) tuples where tags is a set of POS tags
        """
        #Collect the tag and word vocabularies (sets drop the duplicates)
        all_states = set()
        all_observations = set()
        for sentence in training_data:
            for word, tags in sentence:
                all_observations.add(word)
                all_states.update(tags)

        #Sort so that the index of every tag/word does not depend on set
        #iteration order (which changes between Python processes)
        self.states = sorted(all_states)
        self.observations = sorted(all_observations)

        self.state_to_idx: dict = {state: i for i, state in enumerate(self.states)}
        self.obs_to_idx: dict = {obs: i for i, obs in enumerate(self.observations)}

        #Start from empty matrices; the calculate_* methods replace them
        n_states = len(self.states)
        n_observations = len(self.observations)
        self.initial_probs = np.zeros(n_states)
        self.transition_probs = np.zeros((n_states, n_states))
        self.emission_probs = np.zeros((n_states, n_observations))

        self.calculate_initial_probabilities(training_data)
        self.calculate_transition_probabilities(training_data)
        self.calculate_emission_probabilities(training_data)

    def calculate_initial_probabilities(self, training_data: list) -> None:
        """
        Calculate the initial state probabilities P(tag | start) and store
        them in self.initial_probs
        Args:
            training_data (list): a list of a list of tuples with the words and POS tags
        """
        initial_counts = np.zeros(len(self.states))
        for sentence in training_data:
            #Empty sentences have no first tag
            if not sentence:
                continue
            _, first_tags = sentence[0]
            #A word with several tags shares one count equally between them
            for tag in first_tags:
                if tag in self.state_to_idx:
                    initial_counts[self.state_to_idx[tag]] += 1 / len(first_tags)

        #Normalize the counts so they sum to 1 (left as zeros if nothing was counted)
        total = initial_counts.sum()
        self.initial_probs = initial_counts / total if total > 0 else initial_counts

    def calculate_transition_probabilities(self, training_data: list) -> None:
        """
        Calculate the transition probabilities P(tag i | tag i-1) and store
        them in self.transition_probs (row = previous tag, column = current tag)
        Args:
            training_data (list): a list of a list of tuples with the words and POS tags
        """
        n_states = len(self.states)
        transition_counts = np.zeros((n_states, n_states))

        for sentence in training_data:
            for i in range(1, len(sentence)):
                _, prev_tags = sentence[i-1]
                _, current_tags = sentence[i]
                for previous_tag in prev_tags:
                    for current_tag in current_tags:
                        if previous_tag in self.state_to_idx and current_tag in self.state_to_idx:
                            prev_idx = self.state_to_idx[previous_tag]
                            curr_idx = self.state_to_idx[current_tag]
                            #One count per word pair, split over every tag combination
                            transition_counts[prev_idx, curr_idx] += 1 / (len(prev_tags) * len(current_tags))

        #Normalize each row; rows without any count stay all zero
        row_sums = transition_counts.sum(axis=1, keepdims=True)
        self.transition_probs = np.divide(transition_counts, row_sums,
                                    out=np.zeros_like(transition_counts),
                                    where=row_sums!=0)

    def calculate_emission_probabilities(self, training_data: list) -> None:
        """
        Calculate the emission probabilities P(word | tag) and store them in
        self.emission_probs (row = tag, column = word)
        Args:
            training_data (list): a list of a list of tuples with the words and POS tags
        """
        emission_counts = np.zeros((len(self.states), len(self.observations)))

        for sentence in training_data:
            for word, tags in sentence:
                if word not in self.obs_to_idx:
                    continue
                word_idx = self.obs_to_idx[word]
                for tag in tags:
                    if tag in self.state_to_idx:
                        #Fractional count if the word has multiple tags
                        emission_counts[self.state_to_idx[tag], word_idx] += 1 / len(tags)

        #Normalize each row; rows without any count stay all zero
        row_sums = emission_counts.sum(axis=1, keepdims=True)
        self.emission_probs = np.divide(emission_counts, row_sums,
                                    out=np.zeros_like(emission_counts),
                                    where=row_sums!=0)

    def _emission_scores(self, word) -> np.ndarray:
        """
        Return the emission score of `word` for every state as a 1-D array
        Known words use their column of self.emission_probs. Unknown words
        get the same score (1.0) for every state, so only the initial and
        transition probabilities decide their tag.
        """
        word_idx = self.obs_to_idx.get(word)
        if word_idx is None:
            return np.ones(len(self.states))
        return self.emission_probs[:, word_idx]

    def viterbi(self, sentence: list) -> np.ndarray:
        """ My implementation of the viterbi algorithm from the textbook
        Args:
            sentence (list): a list of words
        Returns:
            np.ndarray: integer array with one state index per word (the best
                path); it is empty for an empty sentence
        Raises:
            RuntimeError: if the model has not been trained yet
        """
        #Debug to see how the input is
        print(f"Input sentence: {sentence}")

        if self.states is None:
            raise RuntimeError("train_HMM must be called before viterbi/predict")

        n_words = len(sentence)
        n_states = len(self.states)
        bestpath = np.zeros(n_words, dtype=int)
        if n_words == 0:
            return bestpath

        #viterbi[t][s]: score of the best path ending in state s at word t
        #backpointer[t][s]: previous state on that path (-1 for the first word)
        viterbi = np.zeros((n_words, n_states))
        backpointer = np.full((n_words, n_states), -1, dtype=int)

        viterbi[0] = self.initial_probs * self._emission_scores(sentence[0])

        for t in range(1, n_words):
            emission = self._emission_scores(sentence[t])
            #scores[prev, cur] = viterbi[t-1][prev] * transition[prev][cur] * emission[cur]
            scores = viterbi[t-1][:, np.newaxis] * self.transition_probs * emission[np.newaxis, :]
            #argmax returns the first maximum, like a strict ">" scan over prev states
            backpointer[t] = scores.argmax(axis=0)
            viterbi[t] = scores.max(axis=0)

            #Rescale the row so long sentences do not underflow to 0;
            #dividing by a positive constant does not change any argmax
            peak = viterbi[t].max()
            if peak > 0:
                viterbi[t] /= peak

        #Backtracking: best state for the last word, then follow the backpointers
        bestpath[-1] = np.argmax(viterbi[-1])
        for t in range(n_words - 2, -1, -1):
            bestpath[t] = backpointer[t+1][bestpath[t+1]]

        return bestpath

    def predict(self, sentence: list) -> list:
        """
        Predict the part-of-speech tags for each word in the sentence
        Args:
            sentence (list): a list of words the HMM predicts
        Returns:
            a list of tuples (word, and predicted tag)
        """
        #Use the viterbi function
        tag_indices = self.viterbi(sentence)

        #Convert indices to actual tag names
        predicted_tags = [self.states[idx] for idx in tag_indices]

        #Pair words with predicted tags
        return list(zip(sentence, predicted_tags))
        

In [5]:
#Create the HMM tagger and train it on the parsed corpus
#NOTE(review): this fits on all of full_data, so a later evaluation on any slice of full_data would be on sentences the HMM has already seen - confirm this is intended
hmm = HiddenMarkovModel()
hmm.train_HMM(full_data)

In [6]:
#A sample test set for the HMM
#A few short sentences
test_sentence1 = ["The", "cat", "sat"]
test_sentence2 = ["Mark", "will", "pay", "the", "bill", "soon"]
test_sentence3 = ["I", "know", "how", "watch", "after", "a", "dog"]
test_sentence4 = ["I", "am", "so", "tired", "."]

#Sentences that may contain words the model never saw (defined but not tagged below)
possible_unknown1 = ["The", "hidden", "markov", "model", "is", "working", "well", "."]
possible_unknown2 = ["Computer", "science", "is", "cool", "but", "very", "hard", "."]

#Two long ones
test_sentence_long = [
    "The", "police", "department", "said", "that", "the", "suspect", "has", "been",
    "apprehended", "today", ",", "they", "hope", "justice", "will", "be", "served", ".",
]
test_sentence_long2 = [
    "Today", "the", "studio", "announced", "that", "the", "new", "film", "will", "be",
    "about", "a", "girl", "who", "is", "transported", "to", "another", "world", ".",
]

#(text printed in front of the prediction, sentence to tag), in the order they are run
#An earlier version of the model tagged every word of the first sentence as NPS
#because of an error in the probability matrices
labelled_sentences = [
    ("HMM prediction of first sentence: ", test_sentence1),
    ("HMM prediction of second sentence: ", test_sentence2),
    ("HMM prediction of third sentence: ", test_sentence3),
    ("HMM prediction of fourth sentence: ", test_sentence4),
    ("HMM prediction of first long sentence: ", test_sentence_long),
    ("HMM prediction of second long sentence: ", test_sentence_long2),
]

hmm_predictions = []
for label, words in labelled_sentences:
    tagged_words = hmm.predict(words)
    print(label, tagged_words)
    hmm_predictions.append(tagged_words)

#Keep the individual result names available for later cells
(predicted_tags1, predicted_tags2, predicted_tags3, predicted_tags4,
 predicted_long_tags1, predicted_long_tags2) = hmm_predictions

Input sentence: ['The', 'cat', 'sat']
HMM prediction of first sentence:  [('The', 'DT'), ('cat', 'NN'), ('sat', 'VBD')]
Input sentence: ['Mark', 'will', 'pay', 'the', 'bill', 'soon']
HMM prediction of second sentence:  [('Mark', 'NNP'), ('will', 'MD'), ('pay', 'VB'), ('the', 'DT'), ('bill', 'NN'), ('soon', 'RB')]
Input sentence: ['I', 'know', 'how', 'watch', 'after', 'a', 'dog']
HMM prediction of third sentence:  [('I', 'PRP'), ('know', 'VBP'), ('how', 'WRB'), ('watch', 'NN'), ('after', 'IN'), ('a', 'DT'), ('dog', 'NN')]
Input sentence: ['I', 'am', 'so', 'tired', '.']
HMM prediction of fourth sentence:  [('I', 'PRP'), ('am', 'VBP'), ('so', 'RB'), ('tired', 'VBN'), ('.', '.')]
Input sentence: ['The', 'police', 'department', 'said', 'that', 'the', 'suspect', 'has', 'been', 'apprehended', 'today', ',', 'they', 'hope', 'justice', 'will', 'be', 'served', '.']
HMM prediction of first long sentence:  [('The', 'DT'), ('police', 'NN'), ('department', 'NN'), ('said', 'VBD'), ('that', 'IN'), ('th

In [7]:
#Feature engineering for the extracted brown files
#I got help from the GeeksforGeeks website
#https://www.geeksforgeeks.org/nlp/conditional-random-fields-crfs-for-pos-tagging-in-nlp/
def word_features(sentence, i):
    """
    Build the feature dictionary for the token at position i of a sentence.
    Args:
        sentence (list): a list of (word, tags) tuples
        i (int): index of the token to describe
    Returns:
        dict: surface features of the word (casing, prefixes, suffixes, ...)
            plus the neighbouring words and tags; neighbours that do not
            exist are represented by ''
    """
    word = sentence[i][0]
    pos_tag = sentence[i][1]
    at_start = i == 0
    at_end = i == len(sentence) - 1
    features = {
        'word': word,
        #NOTE(review): 'pos' is the token's own tag(s), the same value that is
        #used as the label when y is built - do not feed this key to a model
        'pos' : pos_tag,
        'is_first': at_start, #if the word is a first word
        'is_last': at_end,  #if the word is a last word
        #str.isupper/islower need at least one cased character, so digits and
        #punctuation ("." or "1990") no longer count as capitalized/upper/lower
        'is_capitalized': word[:1].isupper(),
        'is_all_caps': word.isupper(),      #word is in uppercase
        'is_all_lower': word.islower(),      #word is in lowercase
         #prefix of the word (slices instead of indexing, so '' does not raise)
        'prefix-1': word[:1],
        'prefix-2': word[:2],
        'prefix-3': word[:3],
         #suffix of the word
        'suffix-1': word[-1:],
        'suffix-2': word[-2:],
        'suffix-3': word[-3:],
         #extracting previous word
        'prev_word': '' if at_start else sentence[i-1][0],
         #extracting next word
        'next_word': '' if at_end else sentence[i+1][0],
        'prev_pos': '' if at_start else sentence[i-1][1],  # Previous word's POS tag
        'next_pos': '' if at_end else sentence[i+1][1],  # Next word's POS tag
        'has_hyphen': '-' in word,    #if word has hyphen
        'is_numeric': word.isdigit(),  #if word is made of digits only
        'capitals_inside': word[1:].lower() != word[1:]
    }
    return features

In [8]:
#Build one feature list (X) and one label list (y) per sentence
X, y = [], []
for sentence in full_data:
    #Features of every token in the sentence, in order
    X.append([word_features(sentence, i) for i in range(len(sentence))])
    #The label of a token is its tag set, the second element of the tuple
    y.append([token[1] for token in sentence])

#Split the extracted files by position (80% training, 20% testing)
split = int(0.8 * len(X))
#Everything before the split point is training data, the rest is test data
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]

In [9]:
#Check the size of the training and test sets
#Each X split should contain as many sentences as the matching y split
print(f"The length of the x_train is : {len(X_train)}")
print(f"The length of the y_train is : {len(y_train)}")
print(f"The length of the X_test is : {len(X_test)}")
print(f"The length of the y_test is : {len(y_test)}")

The length of the x_train is : 41686
The length of the y_train is : 41686
The length of the X_test is : 10422
The length of the y_test is : 10422


In [None]:
#CRF
import numpy as np
import math
from collections import defaultdict

#Need to modify to handle more than one tag
class LinearChainConditionalRandomField():
    '''
    My implementation of linear-chain Conditional Random Field
    '''
    def __init__(self, learning_rate=0.01, max_iter = 50, l2_penalty = 0.1):
        self.learning_rate = learning_rate
        self.max_iter = max_iter
        self.l2_penalty = l2_penalty
        self.weights = defaultdict(float) #Use the defaultdict to make a dictionary fo floats for the weights
        self.all_labels = set() #A set for all the possible POS tags

    def log_sum_exp(self, log_values):
        """
        Compute log(sum(exp(v) for v in log_values)) without overflowing
        Args:
            log_values (list): the values in log-space
        Returns:
            float: the log of the summed exponentials; -inf when the list is
                empty or when every value is -inf
        """
        neg_inf = -float('inf')
        if not log_values:
            return neg_inf

        #Shift by the largest value so every exponent is <= 0
        peak = max(log_values)
        if peak == neg_inf:
            return neg_inf

        shifted_total = sum([math.exp(value - peak) for value in log_values])
        return peak + math.log(shifted_total)
    
    def convert_features(self, features_dict, label) -> dict:
        """
        Turn a token's feature dictionary into a sparse feature vector for one label
        Args:
            features_dict (dict): the features of a single token
            label: the candidate label the features are paired with
        Returns:
            dict: maps "<feature>_<value>_<label>" style names to 1
        """
        #Only the most important features are used to avoid a feature explosion:
        #the word, its neighbours and the neighbours' tags
        important_features = ('word', 'prev_word', 'next_word', 'prev_pos', 'next_pos')

        feature_vector = {}
        for feature_name in important_features:
            value = features_dict.get(feature_name)
            #Missing, empty and non-string values produce no feature
            if not value or not isinstance(value, str):
                continue
            feature_vector[f"{feature_name}_{value}_{label}"] = 1

        #A few boolean features, added only when the flag is set
        flag_features = (
            ('is_capitalized', f"cap_{label}"),
            ('is_numeric', f"num_{label}"),
            ('has_hyphen', f"hyphen_{label}"),
        )
        for flag, name in flag_features:
            if features_dict.get(flag):
                feature_vector[name] = 1

        return feature_vector
    def compute_transition_features(self, prev_label, current_label):
        """
        Build the feature vector for moving from one label to the next
        Args:
            prev_label: the label of the previous token
            current_label: the label of the current token

        Returns:
            dictionary: a single entry, keyed by the previous and current label, with value 1
        """
        feature_name = f"trans_{prev_label}_{current_label}"
        return {feature_name: 1}
    
    def compute_score(self, sequence_features, labels):
        """
        Compute score using the precomputed features
        Args:
            sequence_features ():
            labels ():
        Returns:
            float: the score of the emission and transition features
        """
        score = 0.0
        
        # First word
        first_features = self.convert_features(sequence_features[0], labels[0])
        for feat, value in first_features.items():
            score += self.weights[feat] * value
            
        # Remaining words with transitions
        for i in range(1, len(sequence_features)):
            # Emission features
            emission_features = self.convert_features(sequence_features[i], labels[i])
            for feat, value in emission_features.items():
                score += self.weights[feat] * value
                
            # Transition features
            transition_features = self.compute_transition_features(labels[i-1], labels[i])
            for feat, value in transition_features.items():
                score += self.weights[feat] * value
                
        return score
    
    def forward_algorithm_log(self, sequence_features, possible_labels):
        """
        Forward algorithm for computing partition function
        This is for the training part of the model
        Args:
            sequence_features ():
            possible_labels ():
        Returns:
            list[defaultdict]: contains alpha[observations][labels]
            float: the partition function or sum of all the alpha values
        """
        T = len(sequence_features)
        alpha_log = [defaultdict(float) for _ in range(T)]
        
        # Initialize first position
        for label in possible_labels[0]:
            features = self.convert_features(sequence_features[0], label)
            score = sum(self.weights[feat] * value for feat, value in features.items())
            alpha_log[0][label] = score  # Store log-scores directly
        
        # Recursion in log-space
        for t in range(1, T):
            for current_label in possible_labels[t]:
                log_scores = []
                for prev_label in possible_labels[t-1]:
                    # Emission + transition score
                    emission_features = self.convert_features(sequence_features[t], current_label)
                    emission_score = sum(self.weights[feat] * value for feat, value in emission_features.items())
                    
                    transition_features = self.compute_transition_features(prev_label, current_label)
                    transition_score = sum(self.weights[feat] * value for feat, value in transition_features.items())
                    
                    total_score = alpha_log[t-1][prev_label] + emission_score + transition_score
                    log_scores.append(total_score)
                
                alpha_log[t][current_label] = self.log_sum_exp(log_scores)
        
        # Partition function is log_sum_exp of final alphas
        log_Z = self.log_sum_exp(list(alpha_log[T-1].values()))
        return alpha_log, log_Z
    
    def backward_algorithm_log(self, sequence_features, possible_labels) -> list:
        """
        Backward algorithm
        Args:
            sequence_features ():
            possible_labels ():
        Returns:
            list[defaultdict]: a list of the observation and labels
        """
        T = len(sequence_features)
        beta_log = [defaultdict(float) for _ in range(T)]
        
        # Initialize last position
        for label in possible_labels[T-1]:
            beta_log[T-1][label] = 0.0  # log(1) = 0
        
        # Recursion backwards
        for t in range(T-2, -1, -1):
            for current_label in possible_labels[t]:
                log_scores = []
                for next_label in possible_labels[t+1]:
                    emission_features = self.convert_features(sequence_features[t+1], next_label)
                    emission_score = sum(self.weights[feat] * value for feat, value in emission_features.items())
                    
                    transition_features = self.compute_transition_features(current_label, next_label)
                    transition_score = sum(self.weights[feat] * value for feat, value in transition_features.items())
                    
                    total_score = beta_log[t+1][next_label] + emission_score + transition_score
                    log_scores.append(total_score)
                
                beta_log[t][current_label] = self.log_sum_exp(log_scores)
                
        return beta_log
    
    def compute_marginals_log(self, sequence_features, possible_labels):
        """
        Compute marginal probabilities using forward-backward
        Args:
            sequence_features ():
            possible_labels ():
        Returns:
            list[defaultdict] : the node marginals, probability of each label at each position P(y_t | X)
            list[defaultdict] : the edge marginals, probability of each label transition P(y_{t-1}, y_t | X)
        """
        T = len(sequence_features)
        alpha_log, log_Z = self.forward_algorithm_log(sequence_features, possible_labels)
        beta_log = self.backward_algorithm_log(sequence_features, possible_labels)
        
        # Node marginals in log-space, then convert to probabilities
        node_marginals = [defaultdict(float) for _ in range(T)]
        for t in range(T):
            for label in possible_labels[t]:
                log_prob = alpha_log[t][label] + beta_log[t][label] - log_Z
                node_marginals[t][label] = math.exp(log_prob) if math.isfinite(log_prob) else 0.0
        
        # Edge marginals
        edge_marginals = [defaultdict(lambda: defaultdict(float)) for _ in range(T-1)]
        for t in range(T-1):
            for prev_label in possible_labels[t]:
                for current_label in possible_labels[t+1]:
                    emission_features = self.convert_features(sequence_features[t+1], current_label)
                    emission_score = sum(self.weights[feat] * value for feat, value in emission_features.items())
                    
                    transition_features = self.compute_transition_features(prev_label, current_label)
                    transition_score = sum(self.weights[feat] * value for feat, value in transition_features.items())
                    
                    log_prob = (alpha_log[t][prev_label] + emission_score + 
                            transition_score + beta_log[t+1][current_label] - log_Z)
                    edge_marginals[t][prev_label][current_label] = math.exp(log_prob) if math.isfinite(log_prob) else 0.0
        
        return node_marginals, edge_marginals
    
    def viterbi_decode(self, sequence_features, possible_labels):
        """Find the highest-scoring label sequence with the Viterbi algorithm.

        Args:
            sequence_features (list): one feature object per position; each is
                passed unchanged to ``self.convert_features``.
            possible_labels (list): ``possible_labels[t]`` is the iterable of
                candidate labels for position ``t``.

        Returns:
            tuple: ``(best_path, best_score)``. ``best_path`` holds one label
            per position and ``best_score`` is the accumulated weight of that
            path. An empty sequence yields ``([], 0.0)``.
        """
        T = len(sequence_features)
        if T == 0:
            # Nothing to decode; indexing position 0 below would raise IndexError.
            return [], 0.0

        delta = [defaultdict(float) for _ in range(T)]  # best score ending in label at t
        psi = [defaultdict(str) for _ in range(T)]      # back-pointer to the best previous label

        # Initialize: only emission features contribute at the first position
        for label in possible_labels[0]:
            features = self.convert_features(sequence_features[0], label)
            delta[0][label] = sum(self.weights[feat] * value for feat, value in features.items())
            psi[0][label] = None

        # Recursion
        for t in range(1, T):
            for current_label in possible_labels[t]:
                # The emission score does not depend on the previous label, so
                # compute it once per (t, current_label) instead of once per pair.
                emission_features = self.convert_features(sequence_features[t], current_label)
                emission_score = sum(self.weights[feat] * value for feat, value in emission_features.items())

                best_score = -float('inf')
                best_prev_label = None

                for prev_label in possible_labels[t-1]:
                    # Transition features
                    transition_features = self.compute_transition_features(prev_label, current_label)
                    transition_score = sum(self.weights[feat] * value for feat, value in transition_features.items())

                    score = delta[t-1][prev_label] + emission_score + transition_score

                    if score > best_score:
                        best_score = score
                        best_prev_label = prev_label

                delta[t][current_label] = best_score
                psi[t][current_label] = best_prev_label

        # Backtrack
        best_path = [None] * T
        best_score = -float('inf')

        # Find best final label
        for label in possible_labels[T-1]:
            if delta[T-1][label] > best_score:
                best_score = delta[T-1][label]
                best_path[T-1] = label

        # Follow the back-pointers from the last position to the first
        for t in range(T-2, -1, -1):
            best_path[t] = psi[t+1][best_path[t+1]]

        return best_path, best_score
    
    def fit(self, X_train:list, y_train:list):
        """Train the CRF weights by batch gradient ascent on the log-likelihood.

        Args:
            X_train (list): training sequences; each is a list of per-token
                feature objects accepted by ``self.convert_features``.
            y_train (list): for each sequence, a list holding one set of
                part-of-speech tag(s) per token.

        Side effects:
            Extends ``self.all_labels``, updates ``self.weights`` in place,
            multiplies ``self.learning_rate`` by 0.95 after every completed
            iteration, and prints progress.
        """
        # Collect all possible labels
        for sentence_labels in y_train:
            for label_set in sentence_labels:
                self.all_labels.update(label_set)

        print(f"Training with {len(self.all_labels)} unique labels")

        # Reduce every tag set to a single gold tag for training. min() picks
        # the same tag on every run; next(iter(...)) depends on set hash order.
        y_train_single = [
            [min(tag_set) for tag_set in sentence_labels]
            for sentence_labels in y_train
        ]

        print(f"Using {len(X_train)} sequences for stable training")

        for iteration in range(self.max_iter):
            total_loss = 0.0
            grad = defaultdict(float)

            # Train against the single-tag sequences: passing the raw tag sets
            # as labels would make the empirical features differ from the
            # per-label features used for the expectations.
            for sequence_features, true_labels in zip(X_train, y_train_single):
                if len(sequence_features) == 0:
                    # An empty sentence contributes no features and no loss.
                    continue

                possible_labels = [self.all_labels for _ in range(len(sequence_features))]

                try:
                    # Use log-space algorithms for stability
                    node_marginals, edge_marginals = self.compute_marginals_log(sequence_features, possible_labels)

                    # Compute empirical features from true labels
                    empirical_features = defaultdict(float)

                    # First position (emission only)
                    first_features = self.convert_features(sequence_features[0], true_labels[0])
                    for feat, value in first_features.items():
                        empirical_features[feat] += value

                    # Remaining positions with transitions
                    for i in range(1, len(sequence_features)):
                        emission_features = self.convert_features(sequence_features[i], true_labels[i])
                        for feat, value in emission_features.items():
                            empirical_features[feat] += value

                        transition_features = self.compute_transition_features(true_labels[i-1], true_labels[i])
                        for feat, value in transition_features.items():
                            empirical_features[feat] += value

                    # Compute expected features from marginals
                    expected_features = defaultdict(float)

                    # Node features (emission)
                    for t in range(len(sequence_features)):
                        for label, prob in node_marginals[t].items():
                            features = self.convert_features(sequence_features[t], label)
                            for feat, value in features.items():
                                expected_features[feat] += prob * value

                    # Edge features (transition); edge_marginals[t] couples t and t+1
                    for t in range(len(sequence_features)-1):
                        for prev_label in possible_labels[t]:
                            for current_label in possible_labels[t+1]:
                                prob = edge_marginals[t][prev_label][current_label]
                                trans_features = self.compute_transition_features(prev_label, current_label)
                                for feat, value in trans_features.items():
                                    expected_features[feat] += prob * value

                    # Quantities needed for the loss
                    true_score = self.compute_score(sequence_features, true_labels)
                    _, log_Z = self.forward_algorithm_log(sequence_features, possible_labels)

                except Exception as e:
                    # Best effort: report the failing sequence and move on.
                    print(f"Error in sequence: {e}")
                    continue

                # Only reached when every step succeeded, so a failing sequence
                # never leaves a partially applied gradient behind.
                for feat in set(empirical_features.keys()) | set(expected_features.keys()):
                    grad[feat] += empirical_features[feat] - expected_features[feat]

                if math.isfinite(log_Z):
                    log_likelihood = true_score - log_Z
                    total_loss -= log_likelihood
                else:
                    total_loss += 100.0  # Penalty for numerical issues

            # Gradient clipping
            # Necessary to handle vanishing/exploding gradients. The norm is
            # taken over the accumulated gradient so the clipped step has norm 1.
            grad_norm = math.sqrt(sum(g * g for g in grad.values()))
            if grad_norm > 1.0:
                clip_factor = 1.0 / grad_norm
                for feat in grad:
                    grad[feat] *= clip_factor

            # Update weights with regularization and clipping
            for feat in grad:
                # L2 regularization
                grad[feat] -= self.l2_penalty * self.weights[feat]
                self.weights[feat] += self.learning_rate * grad[feat]

                # Weight clipping to prevent explosion
                if abs(self.weights[feat]) > 10.0:
                    self.weights[feat] = math.copysign(10.0, self.weights[feat])

            avg_loss = total_loss / len(X_train) if X_train else 0.0
            print(f"Iteration {iteration}, Loss: {avg_loss:.4f}")

            # Early stopping if loss becomes NaN
            # (symptom of an exploding/vanishing gradient)
            if math.isnan(avg_loss):
                print("Loss became NaN - stopping early")
                break

            # Reduce learning rate
            self.learning_rate *= 0.95
    
    def predict(self, X_test):
        """Decode the best label path for every sequence in ``X_test``.

        Args:
            X_test (list): sequences of per-token features
        Returns:
            list: one Viterbi-decoded label path per input sequence
        """
        def decode(sequence_features):
            # Every position may take any label seen during training.
            candidates = [self.all_labels] * len(sequence_features)
            path, _score = self.viterbi_decode(sequence_features, candidates)
            return path

        return [decode(sequence_features) for sequence_features in X_test]
    
    def evaluate(self, X_test, y_test):
        """Report token-level accuracy of the model on a test set.

        A prediction counts as correct when the predicted label is a member
        of the gold tag set for that token.

        Args:
            X_test (list): test sequences, forwarded to ``self.predict``
            y_test (list): for each sequence, one collection of acceptable
                tags per token
        Return:
            float: the accuracy of the CRF model (0 when nothing was scored)
        """
        # One boolean per scored token, in sequence order.
        outcomes = [
            guess in gold_tags
            for guessed_seq, gold_seq in zip(self.predict(X_test), y_test)
            for guess, gold_tags in zip(guessed_seq, gold_seq)
        ]
        total = len(outcomes)
        correct = sum(1 for hit in outcomes if hit)

        if total > 0:
            accuracy = correct / total
        else:
            accuracy = 0

        print(f"Accuracy: {accuracy:.4f} ({correct}/{total})")
        return accuracy


In [14]:

# Build the POS-tagging CRF with learning_rate=0.05 and max_iter=15.
crf = LinearChainConditionalRandomField(learning_rate=0.05, max_iter=15)
# Train on the full training split.
# NOTE(review): the recorded run of this cell stopped with a KeyboardInterrupt
# right after the "Using 41686 sequences" message, so no accuracy was produced.
# Consider fitting on a slice of X_train / y_train first to gauge the runtime.
crf.fit(X_train, y_train)
# Print and return the token-level accuracy on the test split.
crf.evaluate(X_test, y_test)


Training with 48 unique labels
Using 41686 sequences for stable training


KeyboardInterrupt: 

In [None]:
#Read in the GMB dataset as raw lines
#TODO: parse the lines into sentences once the file layout is confirmed
file_path = './GMB_dataset.txt'
with open(file_path, 'r') as gmb_data:
    # Keep the unparsed lines so a later cell can split them into records.
    gmb_lines = gmb_data.read().splitlines()

In [None]:
#Feature engineering necessary for the CRF
#Modified to capture the previous/next word plus the POS tags up to two positions back and two positions ahead
def word_features(sentence, i):
    """Build the feature dict for the token at position ``i`` of ``sentence``.

    Args:
        sentence: sequence of token records in which index 0 is the word and
            index 1 is its POS tag.
        i (int): position of the token to describe.

    Returns:
        dict: surface features of the word (case flags, prefixes/suffixes of
        up to three characters, hyphen/digit flags), its POS tag, the
        neighbouring words, and the POS tags up to two positions on either
        side. Missing neighbours are represented by ``''``.
    """
    word = sentence[i][0]
    pos_tag = sentence[i][1]
    last_index = len(sentence) - 1
    prev_pos = '' if i == 0 else sentence[i-1][1]

    features = {
        'word': word,
        'pos': pos_tag,
        'is_first': i == 0,
        'is_last': i == last_index,
        # True when upper() leaves the first character unchanged (so digits and
        # punctuation count too). An empty token yields False, not IndexError.
        'is_capitalized': bool(word) and word[0].upper() == word[0],
        'is_all_caps': word.upper() == word,
        'is_all_lower': word.lower() == word,
        # Slices instead of word[0] / word[-1] so an empty token gives ''.
        'prefix-1': word[:1],
        'prefix-2': word[:2],
        'prefix-3': word[:3],
        'suffix-1': word[-1:],
        'suffix-2': word[-2:],
        'suffix-3': word[-3:],
        'prev_word': '' if i == 0 else sentence[i-1][0],
        'next_word': '' if i == last_index else sentence[i+1][0],
        'prev_pos': prev_pos,
        'next_pos': '' if i == last_index else sentence[i+1][1],
        'prev_prev_pos': '' if i <= 1 else sentence[i-2][1],  # Two words back POS
        'next_next_pos': '' if i >= len(sentence)-2 else sentence[i+2][1],  # Two words ahead POS
        'pos_bigram': f"{prev_pos}_{pos_tag}",  # POS bigram with previous word
        'has_hyphen': '-' in word,
        'is_numeric': word.isdigit(),
        'capitals_inside': word[1:].lower() != word[1:]
    }
    return features

In [None]:
#CRF for Named Entity Recognition (NER)

class ConditionalRandomFieldNer:
    """Skeleton of a CRF for named entity recognition.

    Only the constructor does real work so far: ``fit`` is not implemented,
    and ``predict`` / ``model_evaluation`` return fixed placeholder values.
    """

    def __init__(self, learning_rate=0.01, max_iterations=50):
        """Store the training hyperparameters.

        Args:
            learning_rate (float): step size for the (future) weight updates.
            max_iterations (int): upper bound on training iterations.
        """
        self.learning_rate = learning_rate
        self.max_iterations = max_iterations

    def fit(self, X_train, y_train):
        """Train the model (not implemented yet).

        Raises:
            NotImplementedError: always, until training is written.
        """
        # Fail loudly instead of silently "training" nothing.
        raise NotImplementedError("ConditionalRandomFieldNer.fit is not implemented yet")

    def predict(self, X_train):
        """Return predictions for the given sequences.

        Placeholder: currently ignores its input and returns an empty list.
        """
        predictions = []
        return predictions

    def model_evaluation(self, X_test, y_test):
        """Return ``(precision, recall, accuracy)`` for a test set.

        Placeholder: currently ignores its inputs and returns zeros.
        """
        precision = 0.0
        recall = 0.0
        accuracy = 0.0
        return precision, recall, accuracy

In [None]:
#Hand-written sentences for spot-checking the CRFs
CRF_test_sentence1 = [
    "She",
    "likes",
    "to",
    "read",
    "books",
]
#Sentences 2-4 are still empty placeholders (each its own list)
CRF_test_sentence2, CRF_test_sentence3, CRF_test_sentence4 = [], [], []