In [1]:
pip install nltk

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.3.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [14]:
import numpy as np
import pandas as pd
import random
from sklearn.model_selection import train_test_split, KFold
import pprint, time
import nltk  # pip install nltk

# Download the required NLTK corpora
nltk.download('brown')
nltk.download('universal_tagset')

# Load the Brown corpus with universal POS tags
pos_tagged_sentences = nltk.corpus.brown.tagged_sents(tagset='universal')

# Convert to a list for easier manipulation and inspection
tagged_corpus = list(pos_tagged_sentences)

# Preprocessing function for tagged sentences
def preprocess_tagged_sentences(tagged_corpus):
    processed_sentences = []
    for sentence in tagged_corpus:
        modified_sentence = [('SOS', 'SOS')]
        for word, tag in sentence:
            if word == '.':
                modified_sentence.append((word, 'EOS'))
            else:
                modified_sentence.append((word, tag))
        if not (sentence and sentence[-1][0] == '.'):
            modified_sentence.append(('EOS', 'EOS'))
        processed_sentences.append(modified_sentence)
    return processed_sentences

[nltk_data] Downloading package brown to
[nltk_data]     C:\Users\annan\AppData\Roaming\nltk_data...
[nltk_data]   Package brown is already up-to-date!
[nltk_data] Downloading package universal_tagset to
[nltk_data]     C:\Users\annan\AppData\Roaming\nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


In [15]:
processed_corpus = preprocess_tagged_sentences(tagged_corpus)

In [16]:
processed_corpus[:1]

[[('SOS', 'SOS'),
  ('The', 'DET'),
  ('Fulton', 'NOUN'),
  ('County', 'NOUN'),
  ('Grand', 'ADJ'),
  ('Jury', 'NOUN'),
  ('said', 'VERB'),
  ('Friday', 'NOUN'),
  ('an', 'DET'),
  ('investigation', 'NOUN'),
  ('of', 'ADP'),
  ("Atlanta's", 'NOUN'),
  ('recent', 'ADJ'),
  ('primary', 'NOUN'),
  ('election', 'NOUN'),
  ('produced', 'VERB'),
  ('``', '.'),
  ('no', 'DET'),
  ('evidence', 'NOUN'),
  ("''", '.'),
  ('that', 'ADP'),
  ('any', 'DET'),
  ('irregularities', 'NOUN'),
  ('took', 'VERB'),
  ('place', 'NOUN'),
  ('.', 'EOS')]]

In [17]:
def train_HMM(train_data):
    # Flatten the training dataset into a list of word-tag pairs
    train_word_tag_pairs = [pair for sentence in train_data for pair in sentence]

    # Extract unique tags and words
    unique_tags = list({tag for _, tag in train_word_tag_pairs})
    unique_words = list({word for word, _ in train_word_tag_pairs})

    # Precompute tag and word-tag frequencies
    tag_freq = {}
    word_tag_freq = {}
    for word, tag in train_word_tag_pairs:
        if tag not in tag_freq:
            tag_freq[tag] = 0
        if (word, tag) not in word_tag_freq:
            word_tag_freq[(word, tag)] = 0
        tag_freq[tag] += 1
        word_tag_freq[(word, tag)] += 1

    # Precompute transition probabilities
    tag_list = list(unique_tags)
    tag_index = {tag: idx for idx, tag in enumerate(tag_list)}
    transition_matrix = np.zeros((len(unique_tags), len(unique_tags)), dtype='float32')

    prev_tag = train_word_tag_pairs[0][1]
    for i in range(1, len(train_word_tag_pairs)):
        current_tag = train_word_tag_pairs[i][1]
        if prev_tag in tag_index and current_tag in tag_index:
            transition_matrix[tag_index[prev_tag], tag_index[current_tag]] += 1
        prev_tag = current_tag

    for i in range(len(tag_list)):
        total = np.sum(transition_matrix[i, :])
        if total > 0:
            transition_matrix[i, :] /= total

    transition_df = pd.DataFrame(transition_matrix, columns=tag_list, index=tag_list)

    # Observation probabilities
    word_index = {word: idx for idx, word in enumerate(unique_words)}
    tag_index = {tag: idx for idx, tag in enumerate(unique_tags)}

    observation_matrix = np.zeros((len(unique_tags), len(unique_words)), dtype='float32')
    for word, tag in train_word_tag_pairs:
        if word in word_index and tag in tag_index:
            observation_matrix[tag_index[tag], word_index[word]] += 1

    for i in range(len(unique_tags)):
        total = np.sum(observation_matrix[i, :])
        if total > 0:
            observation_matrix[i, :] /= total

    observation_df = pd.DataFrame(observation_matrix, columns=unique_words, index=unique_tags)

    return transition_df, observation_df

In [18]:
def Viterbi(words, transition_df, observation_df):
    unique_tags = transition_df.index.tolist()
    n = len(words)
    viterbi = [{}]
    backpointer = [{}]
    total_tags = len(unique_tags)
    smoothing = 1

    for tag in unique_tags:
        transition_p = transition_df.loc['SOS', tag] 
        emission_p = observation_df.loc[tag, words[1]] if words[1] in observation_df.columns else 0.001
        viterbi[0][tag] = transition_p * emission_p
        backpointer[0][tag] = 'SOS'

    for t in range(2, n):
        viterbi.append({})
        backpointer.append({})
        for tag in unique_tags:
            #laplace smoothing to take care of unseen words
            max_prob, best_prev_tag = max(
                (viterbi[t-2][prev_tag] * ((transition_df.loc[prev_tag, tag] + smoothing) / (transition_df.loc[prev_tag].sum() + smoothing * total_tags)) * 
                (observation_df.loc[tag, words[t]] if words[t] in observation_df.columns else 0.001), prev_tag)
                for prev_tag in unique_tags
            )
            viterbi[t-1][tag] = max_prob
            backpointer[t-1][tag] = best_prev_tag

    best_sequence = []
    last_tag = max(viterbi[-1], key=viterbi[-1].get)
    best_sequence.append(last_tag)

    for t in range(n-1, 0, -1):
        last_tag = backpointer[t-1][last_tag]
        best_sequence.append(last_tag)

    best_sequence.reverse()
    return list(zip(words, best_sequence))

In [19]:
def evaluate_viterbi_on_test_data(test_data, transition_df, observation_df):
    correct_tags = 0
    total_tags = 0
    all_true_tags = []
    all_predicted_tags = []
    num_sentences = len(test_data)
    print_interval = 2000

    for i, sentence in enumerate(test_data):
        words = [word for word, _ in sentence]
        true_tags = [tag for _, tag in sentence]
        tagged_seq = Viterbi(words, transition_df, observation_df)
        predicted_tags = [tag for _, tag in tagged_seq]
        correct_tags += sum(p == t for p, t in zip(predicted_tags, true_tags))
        total_tags += len(true_tags)

        all_true_tags.extend(true_tags)
        all_predicted_tags.extend(predicted_tags)

        if (i + 1) % print_interval == 0 or (i + 1) == num_sentences:
            print(f'Processed {i + 1}/{num_sentences} sentences')

    accuracy = correct_tags / total_tags if total_tags > 0 else 0
    return accuracy * 100, all_true_tags, all_predicted_tags

In [20]:
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold
from sklearn.metrics import precision_recall_fscore_support

def cross_validate_hmm(tagged_corpus, n_splits=5):
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    accuracies = []
    all_precision = []
    all_recall = []
    all_f1 = []
    all_true_tags = []
    all_predicted_tags = []

    for fold, (train_idx, test_idx) in enumerate(kf.split(tagged_corpus)):
        print(f"Fold {fold + 1}")
        train_data = [tagged_corpus[i] for i in train_idx]
        test_data = [tagged_corpus[i] for i in test_idx]

        # Train HMM
        transition_df, observation_df = train_HMM(train_data)

        # Evaluate on test data
        accuracy, true_tags, predicted_tags = evaluate_viterbi_on_test_data(test_data, transition_df, observation_df)

        # Calculate precision, recall, f1-score
        precision, recall, f1, _ = precision_recall_fscore_support(true_tags, predicted_tags, average=None, labels=sorted(set(true_tags)))
        # Accumulate scores
        all_precision.append(precision)
        all_recall.append(recall)
        all_f1.append(f1)

        # Accumulate true and predicted tags for confusion matrix
        all_true_tags.extend(true_tags)
        all_predicted_tags.extend(predicted_tags)

        print(f'Accuracy for fold {fold + 1}: {accuracy:.2f}%')
        accuracies.append(accuracy)
     
    avg_precision = np.mean(all_precision, axis=0)
    avg_recall = np.mean(all_recall, axis=0)
    avg_f1 = np.mean(all_f1, axis=0)
    avg_accuracy = np.mean(accuracies)
    print(f'\nAverage Accuracy over {n_splits} folds: {avg_accuracy:.2f}%')

    labels = sorted(set(all_true_tags))  # Get all unique tags
    conf_matrix = confusion_matrix(all_true_tags, all_predicted_tags, labels=labels)

    # Convert to DataFrame for better readability
    conf_matrix_df = pd.DataFrame(conf_matrix, index=labels, columns=labels)

    # Calculate per-POS accuracy
    per_pos_accuracy = {}
    for i, tag in enumerate(labels):
        true_positives = conf_matrix[i, i]
        total_relevant = np.sum(conf_matrix[i, :])
        per_pos_accuracy[tag] = true_positives / total_relevant if total_relevant > 0 else 0

    # Calculate average precision, recall, F1-score
    overall_precision, overall_recall, overall_f1, _ = precision_recall_fscore_support(
        all_true_tags, all_predicted_tags, average='macro'
    )
    
    # Calculate F2 score (beta=2) and F0.5 score (beta=0.5)
    _, _, f2_score, _ = precision_recall_fscore_support(
        all_true_tags, all_predicted_tags, average='macro', beta=2
    )

    _, _, f0_5_score, _ = precision_recall_fscore_support(
        all_true_tags, all_predicted_tags, average='macro', beta=0.5
    )

    return conf_matrix_df, per_pos_accuracy, avg_precision, avg_recall, avg_f1, overall_precision, overall_recall, overall_f1, f2_score, f0_5_score

# Run 5-fold cross-validation

In [21]:
conf_matrix_df, per_pos_accuracy, avg_precision, avg_recall, avg_f1, overall_precision, overall_recall, overall_f1, f2_score, f0_5_score =cross_validate_hmm(processed_corpus)
print("Average Confusion Matrix:\n", conf_matrix_df)
print("Per-POS Accuracy:\n", per_pos_accuracy)
print("\nAverage Precision per POS tag:\n", dict(zip(conf_matrix_df.index, avg_precision)))
print("\nAverage Recall per POS tag:\n", dict(zip(conf_matrix_df.index, avg_recall)))
print("\nAverage F1-Score per POS tag:\n", dict(zip(conf_matrix_df.index, avg_f1)))
print(f"\nOverall Precision: {overall_precision:.4f}")
print(f"Overall Recall: {overall_recall:.4f}")
print(f"Overall F1-Score: {overall_f1:.4f}")
print(f"Overall F2-Score: {f2_score:.4f}")
print(f"Overall F0.5-Score: {f0_5_score:.4f}")

Fold 1
Processed 2000/11468 sentences
Processed 4000/11468 sentences
Processed 6000/11468 sentences
Processed 8000/11468 sentences
Processed 10000/11468 sentences
Processed 11468/11468 sentences
Accuracy for fold 1: 94.36%
Fold 2
Processed 2000/11468 sentences
Processed 4000/11468 sentences
Processed 6000/11468 sentences
Processed 8000/11468 sentences
Processed 10000/11468 sentences
Processed 11468/11468 sentences
Accuracy for fold 2: 94.40%
Fold 3
Processed 2000/11468 sentences
Processed 4000/11468 sentences
Processed 6000/11468 sentences
Processed 8000/11468 sentences
Processed 10000/11468 sentences
Processed 11468/11468 sentences
Accuracy for fold 3: 94.45%
Fold 4
Processed 2000/11468 sentences
Processed 4000/11468 sentences
Processed 6000/11468 sentences
Processed 8000/11468 sentences
Processed 10000/11468 sentences
Processed 11468/11468 sentences
Accuracy for fold 4: 94.36%
Fold 5
Processed 2000/11468 sentences
Processed 4000/11468 sentences
Processed 6000/11468 sentences
Processe

In [5]:
transition_df, observation_df = train_HMM(processed_corpus)



In [6]:
import pandas as pd

# Assuming transition_df and observation_df are computed as before
# Save the transition and observation matrices to CSV files
transition_df.to_csv('transition_matrix.csv')
observation_df.to_csv('observation_matrix.csv')