<a href="https://colab.research.google.com/github/hidrochin/IT4772-NLP/blob/main/notebooks/Brown_Corpus_POS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import nltk
nltk.download('brown')

[nltk_data] Downloading package brown to /root/nltk_data...
[nltk_data]   Unzipping corpora/brown.zip.


True

In [None]:
from nltk.corpus import brown

# Get sentences from the 'news' category
sentences = brown.sents(categories='news')

# Print the first 5 sentences
for i, sentence in enumerate(sentences[:5]):
    print(f"Sentence {i+1}: {' '.join(sentence)}")

Sentence 1: The Fulton County Grand Jury said Friday an investigation of Atlanta's recent primary election produced `` no evidence '' that any irregularities took place .
Sentence 2: The jury further said in term-end presentments that the City Executive Committee , which had over-all charge of the election , `` deserves the praise and thanks of the City of Atlanta '' for the manner in which the election was conducted .
Sentence 3: The September-October term jury had been charged by Fulton Superior Court Judge Durwood Pye to investigate reports of possible `` irregularities '' in the hard-fought primary which was won by Mayor-nominate Ivan Allen Jr. .
Sentence 4: `` Only a relative handful of such reports was received '' , the jury said , `` considering the widespread interest in the election , the number of voters and the size of this city '' .
Sentence 5: The jury said it did find that many of Georgia's registration and election laws `` are outmoded or inadequate and often ambiguous '

# Class

## Task
Use the Brown corpus to construct a first-order hidden markov model for POS tagging.

## Load and prepare data

### Subtask:
Load the Brown corpus tagged sentences and prepare the data for training the HMM, including creating start and end tags.


**Reasoning**:
Load the tagged sentences from the brown corpus, add start and end tags to each sentence, and store them in a list.



In [None]:
from nltk.corpus import brown

tagged_sentences = brown.tagged_sents(categories='news')

prepared_sentences = []
for sentence in tagged_sentences:
    prepared_sentence = [('START', 'START')] + sentence + [('END', 'END')]
    prepared_sentences.append(prepared_sentence)

print(f"Number of prepared sentences: {len(prepared_sentences)}")
print(f"First prepared sentence: {prepared_sentences[0]}")

Number of prepared sentences: 4623
First prepared sentence: [('START', 'START'), ('The', 'AT'), ('Fulton', 'NP-TL'), ('County', 'NN-TL'), ('Grand', 'JJ-TL'), ('Jury', 'NN-TL'), ('said', 'VBD'), ('Friday', 'NR'), ('an', 'AT'), ('investigation', 'NN'), ('of', 'IN'), ("Atlanta's", 'NP$'), ('recent', 'JJ'), ('primary', 'NN'), ('election', 'NN'), ('produced', 'VBD'), ('``', '``'), ('no', 'AT'), ('evidence', 'NN'), ("''", "''"), ('that', 'CS'), ('any', 'DTI'), ('irregularities', 'NNS'), ('took', 'VBD'), ('place', 'NN'), ('.', '.'), ('END', 'END')]


In [None]:
print(prepared_sentences[:10])

[[('START', 'START'), ('Cocktails', 'NNS'), ('will', 'MD'), ('be', 'BE'), ('served', 'VBN'), ('from', 'IN'), ('6', 'CD'), ('to', 'IN'), ('7', 'CD'), ('p.m.', 'RB'), (',', ','), ('with', 'IN'), ('dinner', 'NN'), ('at', 'IN'), ('7', 'CD'), ('and', 'CC'), ('entertainment', 'NN'), ('in', 'IN'), ('the', 'AT'), ('main', 'JJS'), ('dining', 'NN'), ('room', 'NN'), ('immediately', 'RB'), ('following', 'VBG'), ('.', '.'), ('END', 'END')], [('START', 'START'), ('How', 'WRB'), ('effective', 'JJ'), ('have', 'HV'), ('Kennedy', 'NP'), ('administration', 'NN'), ('first', 'OD'), ('foreign', 'JJ'), ('policy', 'NN'), ('decisions', 'NNS'), ('been', 'BEN'), ('in', 'IN'), ('dealing', 'VBG'), ('with', 'IN'), ('Communist', 'NN-TL'), ('aggression', 'NN'), ('?', '.'), ('?', '.'), ('END', 'END')], [('START', 'START'), ('Terror', 'NN'), ('engulfed', 'VBD'), ('the', 'AT'), ('thousands', 'NNS'), ('of', 'IN'), ('Belgian', 'JJ'), ('civilians', 'NNS'), ('who', 'WPS'), ('had', 'HVD'), ('remained', 'VBN'), ('in', 'IN'), 

## Calculate probabilities

### Subtask:
Calculate the transition probabilities between POS tags and the emission probabilities of words given POS tags.


**Reasoning**:
Initialize dictionaries and iterate through the prepared sentences to calculate transition and emission counts.



In [None]:
transition_counts = {}
emission_counts = {}

for sentence in prepared_sentences:
    for i in range(len(sentence)):
        word, tag = sentence[i]

        # Calculate emission counts
        if tag not in emission_counts:
            emission_counts[tag] = {}
        if word not in emission_counts[tag]:
            emission_counts[tag][word] = 0
        emission_counts[tag][word] += 1

        # Calculate transition counts
        if i < len(sentence) - 1:
            next_word, next_tag = sentence[i+1]
            if tag not in transition_counts:
                transition_counts[tag] = {}
            if next_tag not in transition_counts[tag]:
                transition_counts[tag][next_tag] = 0
            transition_counts[tag][next_tag] += 1

**Reasoning**:
Calculate the transition and emission probabilities from the counts and store them in new dictionaries.



In [None]:
transition_probabilities = {}
for current_tag, next_tags in transition_counts.items():
    total_transitions = sum(next_tags.values())
    transition_probabilities[current_tag] = {}
    for next_tag, count in next_tags.items():
        transition_probabilities[current_tag][next_tag] = count / total_transitions

emission_probabilities = {}
for tag, words in emission_counts.items():
    total_emissions = sum(words.values())
    emission_probabilities[tag] = {}
    for word, count in words.items():
        emission_probabilities[tag][word] = count / total_emissions

print("Transition Probabilities (first 5 entries):")
for tag, probs in list(transition_probabilities.items())[:5]:
    print(f"{tag}: {list(probs.items())[:5]}...")

print("\nEmission Probabilities (first 5 tags and their first 5 words):")
for tag, probs in list(emission_probabilities.items())[:5]:
    print(f"{tag}: {list(probs.items())[:5]}...")

Transition Probabilities (first 5 entries):
START: [('AT', 0.18883841661258924), ('``', 0.05970149253731343), ('PPS', 0.07051698031581224), ('NN-HL', 0.014925373134328358), ('WRB', 0.010815487778498811)]...
AT: [('NP-TL', 0.019565950747779153), ('NN', 0.4476554593500506), ('NN-TL', 0.04284268525806814), ('NP', 0.04824018891262791), ('JJ', 0.1711458450466659)]...
NP-TL: [('NN-TL', 0.5236167341430499), ("''", 0.009446693657219974), ('JJ-TL', 0.059379217273954114), ('NNS-TL', 0.05263157894736842), ('VBD', 0.004048582995951417)]...
NN-TL: [('JJ-TL', 0.012872083668543845), ('VBD', 0.035398230088495575), (',', 0.10257441673370878), ('IN-TL', 0.057924376508447305), ('NN-TL', 0.19308125502815768)]...
JJ-TL: [('NN-TL', 0.432510885341074), ('NNS-TL', 0.06966618287373004), ('JJ-TL', 0.02177068214804064), ('RB', 0.002902757619738752), ('NP-TL', 0.24383164005805516)]...

Emission Probabilities (first 5 tags and their first 5 words):
START: [('START', 1.0)]...
AT: [('The', 0.08714719442257955), ('an

## Implement the viterbi algorithm

### Subtask:
Implement the Viterbi algorithm to find the most likely sequence of POS tags for a given sentence.


**Reasoning**:
Implement the Viterbi algorithm as described in the instructions, including initialization, iteration, and reconstruction of the tag sequence.



In [None]:
def viterbi(sentence, transition_probabilities, emission_probabilities):
    """
    Implements the Viterbi algorithm to find the most likely sequence of POS tags.

    Args:
        sentence: A list of words (the sentence to tag).
        transition_probabilities: A dictionary of transition probabilities.
        emission_probabilities: A dictionary of emission probabilities.

    Returns:
        A list of the most likely POS tags for the sentence.
    """
    num_words = len(sentence)
    dp = {}
    backpointer = {}
    tags = list(emission_probabilities.keys())

    # Step 4: Initialization for the first word
    first_word = sentence[0]
    for tag in tags:
        if 'START' in transition_probabilities and tag in transition_probabilities['START'] and \
           tag in emission_probabilities and first_word in emission_probabilities[tag]:
            dp[(0, tag)] = transition_probabilities['START'][tag] * emission_probabilities[tag][first_word]
            backpointer[(0, tag)] = 'START'
        else:
            # Handle zero probabilities with a small value
            dp[(0, tag)] = 1e-10
            backpointer[(0, tag)] = 'START'


    # Step 5 & 6: Iteration through the remaining words
    for i in range(1, num_words):
        current_word = sentence[i]
        for current_tag in tags:
            max_prob = 0
            best_prev_tag = None

            for prev_tag in tags:
                transition_prob = transition_probabilities.get(prev_tag, {}).get(current_tag, 1e-10)
                emission_prob = emission_probabilities.get(current_tag, {}).get(current_word, 1e-10)

                prob = dp.get((i - 1, prev_tag), 0) * transition_prob * emission_prob

                if prob > max_prob:
                    max_prob = prob
                    best_prev_tag = prev_tag

            dp[(i, current_tag)] = max_prob
            backpointer[(i, current_tag)] = best_prev_tag

    # Step 7: Find the best tag sequence ending with 'END'
    best_last_tag = None
    max_final_prob = 0

    for tag in tags:
        if tag in transition_probabilities and 'END' in transition_probabilities[tag]:
             final_prob = dp.get((num_words - 1, tag), 0) * transition_probabilities[tag]['END']
             if final_prob > max_final_prob:
                max_final_prob = final_prob
                best_last_tag = tag

    # Reconstruct the best tag sequence
    best_tag_sequence = []
    current_tag = best_last_tag
    for i in range(num_words - 1, -1, -1):
        best_tag_sequence.insert(0, current_tag)
        current_tag = backpointer.get((i, current_tag), None)
        if current_tag is None and i > 0:
            # Handle cases where backpointer is missing (should not happen with proper initialization)
            print(f"Warning: Backpointer missing at index {i} for tag {best_tag_sequence[0]}.")
            break


    return best_tag_sequence

# Example usage (using a sentence from the Brown corpus for demonstration)
example_sentence = [word for word, tag in prepared_sentences[0][1:-1]] # Exclude START and END
predicted_tags = viterbi(example_sentence, transition_probabilities, emission_probabilities)

print("Sentence:", example_sentence)
print("Predicted Tags:", predicted_tags)

Sentence: ['The', 'Fulton', 'County', 'Grand', 'Jury', 'said', 'Friday', 'an', 'investigation', 'of', "Atlanta's", 'recent', 'primary', 'election', 'produced', '``', 'no', 'evidence', "''", 'that', 'any', 'irregularities', 'took', 'place', '.']
Predicted Tags: ['AT', 'NP-TL', 'NN-TL', 'JJ-TL', 'NN-TL', 'VBD', 'NR', 'AT', 'NN', 'IN', 'NP$', 'JJ', 'NN', 'NN', 'VBD', '``', 'AT', 'NN', "''", 'CS', 'DTI', 'NNS', 'VBD', 'NN', '.']


## Evaluate the model

### Subtask:
Evaluate the performance of the HMM on a test set.


**Reasoning**:
The subtask requires splitting the data, retraining the HMM on the training data, evaluating the performance on the test data using the Viterbi algorithm, and calculating the overall accuracy. This can all be done in a single code block.



In [None]:
import random

# 1. Split the prepared_sentences data into training and test sets
random.shuffle(prepared_sentences)
train_size = int(len(prepared_sentences) * 0.8)
train_sentences = prepared_sentences[:train_size]
test_sentences = prepared_sentences[train_size:]

# 2. Retrain the transition and emission probabilities using only the training data
train_transition_counts = {}
train_emission_counts = {}

for sentence in train_sentences:
    for i in range(len(sentence)):
        word, tag = sentence[i]

        # Calculate emission counts
        if tag not in train_emission_counts:
            train_emission_counts[tag] = {}
        if word not in train_emission_counts[tag]:
            train_emission_counts[tag][word] = 0
        train_emission_counts[tag][word] += 1

        # Calculate transition counts
        if i < len(sentence) - 1:
            next_word, next_tag = sentence[i+1]
            if tag not in train_transition_counts:
                train_transition_counts[tag] = {}
            if next_tag not in train_transition_counts[tag]:
                train_transition_counts[tag][next_tag] = 0
            train_transition_counts[tag][next_tag] += 1

train_transition_probabilities = {}
for current_tag, next_tags in train_transition_counts.items():
    total_transitions = sum(next_tags.values())
    train_transition_probabilities[current_tag] = {}
    for next_tag, count in next_tags.items():
        train_transition_probabilities[current_tag][next_tag] = count / total_transitions

train_emission_probabilities = {}
for tag, words in train_emission_counts.items():
    total_emissions = sum(words.values())
    train_emission_probabilities[tag] = {}
    for word, count in words.items():
        train_emission_probabilities[tag][word] = count / total_emissions

# 3. Iterate through the sentences in the test set and evaluate
total_correct_tags = 0
total_tags = 0

for sentence in test_sentences:
    words = [word for word, tag in sentence[1:-1]]  # Exclude START and END
    true_tags = [tag for word, tag in sentence[1:-1]] # Exclude START and END

    if not words:  # Skip empty sentences
        continue

    predicted_tags = viterbi(words, train_transition_probabilities, train_emission_probabilities)

    # Compare predicted and true tags
    correct_tags_in_sentence = 0
    for i in range(len(true_tags)):
        if i < len(predicted_tags) and predicted_tags[i] == true_tags[i]:
            correct_tags_in_sentence += 1

    total_correct_tags += correct_tags_in_sentence
    total_tags += len(true_tags)

# 4. Calculate the overall accuracy
overall_accuracy = total_correct_tags / total_tags if total_tags > 0 else 0

# 5. Print the overall accuracy
print(f"Overall HMM accuracy on the test set: {overall_accuracy:.4f}")

Overall HMM accuracy on the test set: 0.8978


## Summary:

### Data Analysis Key Findings

*   The Brown corpus 'news' category contains 4623 sentences.
*   The first sentence in the prepared data is `[('START', 'START'), ('The', 'AT'), ('Fulton', 'NP-TL'), ('County', 'NN-TL'), ('Grand', 'JJ-TL'), ('Jury', 'NN-TL'), ('said', 'VBD'), ('Friday', 'NR'), ('an', 'AT'), ('investigation', 'NN'), ('of', 'IN'), ("Atlanta's", 'NP$'), ('recent', 'JJ'), ('primary', 'NN'), ('election', 'NN'), ('produced', 'VBD'), ('``', '``'), ('no', 'AT'), ('evidence', 'NN'), ("''", "''"), ('that', 'CS'), ('any', 'DTI'), ('irregularities', 'NNS'), ('took', 'VBD'), ('place', 'NN'), ('.', '.'), ('END', 'END')]`.
*   The calculated transition and emission probabilities appear plausible based on initial entries (e.g., 'START' transitioning to determiners like 'AT', 'AT' emitting words like 'the' and 'a').
*   The implemented Viterbi algorithm successfully processes a sentence and outputs a predicted tag sequence.
*   The first-order Hidden Markov Model for POS tagging achieved an overall accuracy of approximately 89.78% on the test set of the Brown corpus 'news' category.

### Insights or Next Steps

*   The model's accuracy of around 89.78% indicates a reasonably good performance for a first-order HMM on this dataset.
*   Further improvements could explore higher-order HMMs or smoothing techniques to handle unseen words or tag transitions, potentially increasing accuracy.


# Home

In [19]:
from collections import defaultdict
import numpy as np
import nltk

nltk.download("brown")
nltk.download("universal_tagset")

tagged_sentences = nltk.corpus.brown.tagged_sents(tagset="universal")



[nltk_data] Downloading package brown to /root/nltk_data...
[nltk_data]   Package brown is already up-to-date!
[nltk_data] Downloading package universal_tagset to /root/nltk_data...
[nltk_data]   Package universal_tagset is already up-to-date!


In [20]:
print(type(tagged_sentences))

<class 'nltk.corpus.reader.util.ConcatenatedCorpusView'>


In [21]:
print(tagged_sentences[0])

[('The', 'DET'), ('Fulton', 'NOUN'), ('County', 'NOUN'), ('Grand', 'ADJ'), ('Jury', 'NOUN'), ('said', 'VERB'), ('Friday', 'NOUN'), ('an', 'DET'), ('investigation', 'NOUN'), ('of', 'ADP'), ("Atlanta's", 'NOUN'), ('recent', 'ADJ'), ('primary', 'NOUN'), ('election', 'NOUN'), ('produced', 'VERB'), ('``', '.'), ('no', 'DET'), ('evidence', 'NOUN'), ("''", '.'), ('that', 'ADP'), ('any', 'DET'), ('irregularities', 'NOUN'), ('took', 'VERB'), ('place', 'NOUN'), ('.', '.')]


In [22]:
split_ratio = 0.9
split_index = int(len(tagged_sentences) * split_ratio)

train_dataset = tagged_sentences[:split_index]
test_dataset = tagged_sentences[split_index:]

print(len(train_dataset))
print(len(test_dataset))


51606
5734


In [23]:
# intial for count
emission_counts = defaultdict(lambda: defaultdict(int))
transition_counts = defaultdict(lambda: defaultdict(int))
tag_counts = defaultdict(int)

# start counting on training dataset
for sentence in train_dataset:
    prev_tag = None # reset for new sentence

    for word, tag in sentence:
      #emision count: (word, tag)
      emission_counts[tag][word.lower()] += 1

      # tag counts
      tag_counts[tag] += 1

      # counting transition (previous tag, current tag)
      if prev_tag:
        transition_counts[prev_tag][tag] += 1

      prev_tag = tag

In [24]:
print(emission_counts)



In [25]:
# Emission possibility - P(word | tag)
emission_probs = defaultdict(lambda: defaultdict(float))
for tag, word_counts in emission_counts.items():
  total_tag_count = tag_counts[tag]

  for word, count in word_counts.items():
    emission_probs[tag][word] = count / total_tag_count
# Transition Probabilities - P(tag_i|tag_{i-1})
transition_probs = defaultdict(lambda: defaultdict(float))
for prev_tag, tag_counts in transition_counts.items():
  total_prev_tag_count = tag_counts[prev_tag]
  for next_tag, count in tag_counts.items():
    transition_probs[prev_tag][next_tag] = count / total_prev_tag_count

In [26]:
# Lấy danh sách tất cả các tag duy nhất
all_tags = list(tag_counts.keys())

In [27]:
print(f"Ví dụ Emission P('the'|'DET'): {emission_probs['DET']['the']:.4f}")
print(f"Ví dụ Transition P('NOUN'|'ADJ'): {transition_probs['ADJ']['NOUN']:.4f}")

Ví dụ Emission P('the'|'DET'): 0.5179
Ví dụ Transition P('NOUN'|'ADJ'): 11.5965


In [28]:
def viterbi_tagger(sentence, tags, emission_probs, transition_probs):
    """
    Gán nhãn cho một câu sử dụng thuật toán Viterbi.
    """
    # Xử lý từ chưa biết (Unknown words)
    # Gán một xác suất rất nhỏ để tránh lỗi
    unknown_word_prob = 1e-6

    # Khởi tạo bảng Viterbi và backpointer
    viterbi = defaultdict(lambda: defaultdict(float))
    backpointer = defaultdict(lambda: defaultdict(str))

    # Bước khởi tạo (từ đầu tiên)
    first_word = sentence[0].lower()
    for tag in tags:
        # Giả định xác suất bắt đầu của các tag là như nhau
        # (Một cách cải tiến là tính P(tag|'<s>') - xác suất tag ở đầu câu)
        start_prob = 1.0 / len(tags)

        emission_p = emission_probs[tag].get(first_word, unknown_word_prob)
        viterbi[0][tag] = start_prob * emission_p

    # Bước đệ quy (từ thứ hai trở đi)
    for t in range(1, len(sentence)):
        word = sentence[t].lower()
        for current_tag in tags:
            max_prob = 0.0
            best_prev_tag = None

            emission_p = emission_probs[current_tag].get(word, unknown_word_prob)

            for prev_tag in tags:
                # P(current|prev) * P(path_đến_prev)
                prob = transition_probs[prev_tag].get(current_tag, 0) * viterbi[t-1][prev_tag]

                if prob > max_prob:
                    max_prob = prob
                    best_prev_tag = prev_tag

            viterbi[t][current_tag] = emission_p * max_prob
            backpointer[t][current_tag] = best_prev_tag

    # Bước kết thúc và truy vết
    best_path = []

    # Tìm tag cuối cùng có xác suất cao nhất
    max_prob_final = 0.0
    best_final_tag = None

    last_step = len(sentence) - 1
    for tag in tags:
        if viterbi[last_step][tag] > max_prob_final:
            max_prob_final = viterbi[last_step][tag]
            best_final_tag = tag

    best_path.append(best_final_tag)

    # Truy vết ngược từ cuối về đầu
    for t in range(last_step, 0, -1):
        best_final_tag = backpointer[t][best_final_tag]
        best_path.insert(0, best_final_tag)

    return best_path

# Thử nghiệm với một câu đơn giản
test_sentence = "The man saw the dog".split()
predicted_tags = viterbi_tagger(test_sentence, all_tags, emission_probs, transition_probs)
print(f"\nCâu: {test_sentence}")
print(f"Nhãn dự đoán: {predicted_tags}")


Câu: ['The', 'man', 'saw', 'the', 'dog']
Nhãn dự đoán: ['DET', 'NOUN', 'VERB', 'DET', 'NOUN']


In [30]:
correct_predictions = 0
total_predictions = 0

for sentence_tagged in test_dataset:
    # Tách câu và nhãn thực tế
    words = [word for word, tag in sentence_tagged]
    true_tags = [tag for word, tag in sentence_tagged]

    # Bỏ qua các câu trống
    if not words:
        continue

    # Dự đoán nhãn bằng Viterbi
    predicted_tags = viterbi_tagger(words, all_tags, emission_probs, transition_probs)

    # So sánh và đếm
    for i in range(len(true_tags)):
        if predicted_tags[i] == true_tags[i]:
            correct_predictions += 1
        total_predictions += 1

accuracy = (correct_predictions / total_predictions) * 100
print(f"\nĐộ chính xác của mô hình trên tập test: {accuracy:.2f}%")


Độ chính xác của mô hình trên tập test: 90.02%
