In [None]:
import numpy as np
import json
from collections import defaultdict, Counter
from typing import List, Dict, Tuple, Set
import re

class HMMTagger:
    """Hidden Markov Model part-of-speech tagger decoded with Viterbi.

    Args:
        order: 1 for a first-order (bigram) tag model, 2 to additionally
            collect tag-trigram statistics used during decoding.
        word_depends_prev: When True, emissions are conditioned on the
            previous word as well as the tag, i.e. P(word | tag, prev_word).

    Raises:
        ValueError: If ``order`` is not 1 or 2.

    Note:
        After ``train`` the ``*_counts`` attributes hold normalized
        probabilities, not raw counts (normalization is done in place), so
        ``train`` should be called once per instance.
    """

    # Probability assigned to any transition/emission never seen in training.
    _UNSEEN_PROB = 1e-10
    # Pseudo tag / pseudo word marking the position before a sentence starts.
    _START = '<START>'

    def __init__(self, order: int = 1, word_depends_prev: bool = False):
        if order not in (1, 2):
            # Only first- and second-order statistics are collected; any other
            # order previously failed later, inside viterbi().
            raise ValueError(f"order must be 1 or 2, got {order!r}")

        self.order = order
        self.word_depends_prev = word_depends_prev

        # emission_counts[tag][word]            when word_depends_prev is False
        # emission_counts[tag][prev_word][word] when word_depends_prev is True
        if word_depends_prev:
            self.emission_counts = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
        else:
            self.emission_counts = defaultdict(lambda: defaultdict(int))

        # transition_counts[prev_tag][tag]
        self.transition_counts = defaultdict(lambda: defaultdict(int))

        if order == 2:
            # second_order_counts[(tag_two_back, prev_tag)][tag]
            self.second_order_counts = defaultdict(lambda: defaultdict(int))

        self.tag_counts = defaultdict(int)
        self.vocabulary = set()
        self.most_frequent_tag = None

    def train(self, sentences: List[List[Tuple[str, str]]]):
        """Estimate transition and emission probabilities from tagged sentences.

        Args:
            sentences: Training corpus; each sentence is a list of
                ``(word, tag)`` pairs. It is iterated twice, so it must be a
                re-iterable container rather than a one-shot generator.

        Raises:
            ValueError: If the corpus contains no ``(word, tag)`` pairs.
        """
        # First pass: vocabulary and tag frequencies.
        for sentence in sentences:
            for word, tag in sentence:
                self.vocabulary.add(word)
                self.tag_counts[tag] += 1

        if not self.tag_counts:
            raise ValueError("Cannot train HMMTagger: training data contains no (word, tag) pairs")

        # Out-of-vocabulary words are later forced towards this tag.
        self.most_frequent_tag = max(self.tag_counts, key=self.tag_counts.get)
        print(f"Most frequent tag (for unseen words): {self.most_frequent_tag}")

        # Second pass: transition and emission counts.
        for sentence in sentences:
            previous_tag = self._START
            previous_tag2 = self._START if self.order == 2 else None
            previous_word = self._START if self.word_depends_prev else None

            for word, tag in sentence:
                if self.word_depends_prev:
                    self.emission_counts[tag][previous_word][word] += 1
                else:
                    self.emission_counts[tag][word] += 1

                self.transition_counts[previous_tag][tag] += 1
                if self.order == 2:
                    self.second_order_counts[(previous_tag2, previous_tag)][tag] += 1

                previous_word = word if self.word_depends_prev else None
                previous_tag2 = previous_tag if self.order == 2 else None
                previous_tag = tag

        self._normalize_probabilities()

    @staticmethod
    def _normalize_in_place(distribution):
        """Divide every value of ``distribution`` by the sum of its values."""
        total = sum(distribution.values())
        if total > 0:
            for key in distribution:
                distribution[key] /= total

    def _normalize_probabilities(self):
        """Convert all collected counts into conditional probabilities, in place."""
        for row in self.transition_counts.values():
            self._normalize_in_place(row)

        if self.order == 2:
            for row in self.second_order_counts.values():
                self._normalize_in_place(row)

        if self.word_depends_prev:
            for by_prev_word in self.emission_counts.values():
                for row in by_prev_word.values():
                    self._normalize_in_place(row)
        else:
            for row in self.emission_counts.values():
                self._normalize_in_place(row)

    def get_emission_probability(self, tag: str, word: str, prev_word: str = None) -> float:
        """Return the emission probability of ``word`` under ``tag``.

        Words outside the training vocabulary get probability 1.0 for the
        most frequent training tag and a small floor for every other tag.
        Known words that were never seen in the requested context get the
        same floor. ``prev_word`` is only consulted when the tagger was
        built with ``word_depends_prev=True``.
        """
        if word not in self.vocabulary:
            return 1.0 if tag == self.most_frequent_tag else self._UNSEEN_PROB

        # Use .get() throughout: indexing the nested defaultdicts would insert
        # empty entries and silently grow the trained model during inference.
        by_tag = self.emission_counts.get(tag, {})
        if self.word_depends_prev:
            return by_tag.get(prev_word, {}).get(word, self._UNSEEN_PROB)
        return by_tag.get(word, self._UNSEEN_PROB)

    def viterbi(self, sentence: List[str]) -> List[str]:
        """Return the highest-scoring tag sequence for ``sentence``.

        Scores are accumulated as sums of log probabilities. Multiplying raw
        probabilities underflows to 0.0 on long sentences, after which every
        path ties and the result is decided by tag-name ordering alone.

        For ``order == 2`` the decoding is approximate: from the third word
        on, the tag two positions back is taken from the best backpointer of
        each candidate previous tag rather than being searched over.

        Args:
            sentence: Words to tag.

        Returns:
            One tag per word; an empty list for an empty sentence.

        Raises:
            ValueError: If the tagger has not been trained.
        """
        import math  # local import so the class does not depend on a new file-level import

        if not sentence:
            return []

        tags = list(self.tag_counts)
        if not tags:
            raise ValueError("HMMTagger must be trained before decoding")

        floor = self._UNSEEN_PROB

        def safe_log(probability):
            return math.log(probability) if probability > 0 else float('-inf')

        # First-order log transitions are position independent: compute once.
        log_transition = {}
        for prev_tag in tags:
            row = self.transition_counts.get(prev_tag, {})
            log_transition[prev_tag] = {tag: safe_log(row.get(tag, floor)) for tag in tags}

        # Initialise with the transition out of the start state.
        start_row = self.transition_counts.get(self._START, {})
        scores = {
            tag: safe_log(start_row.get(tag, floor))
            + safe_log(self.get_emission_probability(tag, sentence[0], self._START))
            for tag in tags
        }
        # backpointers[i][tag] is the best previous tag when word i carries `tag`.
        backpointers = [{}]

        for i in range(1, len(sentence)):
            prev_word = sentence[i - 1] if self.word_depends_prev else None
            # Trigram context needs two preceding tags, so it starts at i == 2.
            use_second_order = self.order == 2 and i >= 2
            new_scores = {}
            pointers = {}

            for tag in tags:
                log_emission = safe_log(self.get_emission_probability(tag, sentence[i], prev_word))
                candidates = []
                for prev_tag in tags:
                    if use_second_order:
                        context = (backpointers[i - 1][prev_tag], prev_tag)
                        probability = self.second_order_counts.get(context, {}).get(tag, floor)
                        log_step = safe_log(probability)
                    else:
                        log_step = log_transition[prev_tag][tag]
                    candidates.append((scores[prev_tag] + log_step + log_emission, prev_tag))
                # Ties on score fall back to comparing tag names.
                new_scores[tag], pointers[tag] = max(candidates)

            scores = new_scores
            backpointers.append(pointers)

        # Follow the backpointers from the best final tag.
        best_sequence = [max(tags, key=lambda tag: scores[tag])]
        for i in range(len(sentence) - 1, 0, -1):
            best_sequence.append(backpointers[i][best_sequence[-1]])
        best_sequence.reverse()

        return best_sequence

    def tag_sentence(self, sentence: List[str]) -> List[str]:
        """Tag ``sentence`` and return one tag per word (``[]`` for empty input)."""
        if not sentence:
            return []
        return self.viterbi(sentence)

def load_and_split_data(json_data: List[List], train_ratio: float = 0.8, seed: int = 42) -> Tuple[List[List[Tuple[str, str]]], List[List[Tuple[str, str]]]]:
    """Turn raw ``[text, tags]`` records into tagged sentences and split them.

    Each record's text has all commas and its last period removed and is then
    split on whitespace. Records whose word count does not match their tag
    count are dropped.

    Args:
        json_data: Records where item 0 is the sentence text and item 1 is
            the sequence of tags.
        train_ratio: Fraction of the kept sentences placed in the training set.
        seed: Seed for the shuffle. The default reproduces the split that
            ``np.random.seed(42)`` followed by ``np.random.shuffle`` gives.

    Returns:
        ``(train_data, test_data)``, each a list of sentences, each sentence
        a list of ``(word, tag)`` pairs.
    """
    processed_data = []

    for sentence_data in json_data:
        text, tags = sentence_data[0], sentence_data[1]

        text = text.replace(',', '')
        # Drop only the last period in the text (the lookahead requires that
        # no further period follows).
        text = re.sub(r'\.(?=[^\.]*$)', '', text)
        words = text.split()

        if len(words) == len(tags):
            processed_data.append(list(zip(words, tags)))

    # A private RandomState gives the same permutation as seeding the global
    # generator, without resetting random state for the rest of the program.
    rng = np.random.RandomState(seed)
    indices = list(range(len(processed_data)))
    rng.shuffle(indices)
    split_idx = int(len(processed_data) * train_ratio)

    train_data = [processed_data[i] for i in indices[:split_idx]]
    test_data = [processed_data[i] for i in indices[split_idx:]]

    return train_data, test_data

def evaluate_tagger(tagger: HMMTagger, test_data: List[List[Tuple[str, str]]]) -> Tuple[float, Dict[str, float]]:
    """Score ``tagger`` on gold-tagged sentences.

    Returns the overall token accuracy and a dict of per-tag accuracy keyed
    by gold tag. When the test data contains words missing from
    ``tagger.vocabulary``, their count and accuracy are printed as well.
    """
    hits_by_tag = defaultdict(int)
    seen_by_tag = defaultdict(int)
    n_hits = 0
    n_tokens = 0
    oov_hits = 0
    oov_tokens = 0

    for tagged_sentence in test_data:
        tokens = []
        gold = []
        for token, label in tagged_sentence:
            tokens.append(token)
            gold.append(label)

        predicted = tagger.tag_sentence(tokens)

        for token, expected, guess in zip(tokens, gold, predicted):
            is_hit = expected == guess

            n_tokens += 1
            seen_by_tag[expected] += 1
            if is_hit:
                n_hits += 1
                hits_by_tag[expected] += 1

            # Words the tagger never saw in training are tracked separately.
            if token in tagger.vocabulary:
                continue
            oov_tokens += 1
            if is_hit:
                oov_hits += 1

    if n_tokens > 0:
        overall_accuracy = n_hits / n_tokens
    else:
        overall_accuracy = 0

    tag_accuracy = {}
    for label, count in seen_by_tag.items():
        if count > 0:
            tag_accuracy[label] = hits_by_tag.get(label, 0) / count

    if oov_tokens > 0:
        print("\nUnknown word performance:")
        print(f"Total unknown words: {oov_tokens}")
        print(f"Unknown word accuracy: {oov_hits / oov_tokens:.4f}")

    return overall_accuracy, tag_accuracy

def collapse_tags(tag: str) -> str:
    """Map a fine-grained tag onto one of four coarse labels.

    Noun tags become 'N', verb tags 'V', adjective and adverb tags 'A';
    every other tag becomes 'O'.
    """
    coarse_groups = (
        ('N', {'NN', 'NNS', 'NNP', 'NNPS'}),
        ('V', {'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ'}),
        ('A', {'JJ', 'JJR', 'JJS', 'RB', 'RBR', 'RBS'}),
    )
    for coarse_label, members in coarse_groups:
        if tag in members:
            return coarse_label
    return 'O'

def convert_to_collapsed_tags(data: List[List[Tuple[str, str]]]) -> List[List[Tuple[str, str]]]:
    """Return a copy of ``data`` with every tag replaced by ``collapse_tags(tag)``."""
    collapsed = []
    for sentence in data:
        relabelled = []
        for word, tag in sentence:
            relabelled.append((word, collapse_tags(tag)))
        collapsed.append(relabelled)
    return collapsed

def main(data_path: str = '/content/penn-data.json'):
    """Train and evaluate every HMM configuration on both tag sets.

    Loads the JSON corpus at ``data_path``, splits it into train and test
    sets, and for each tagger configuration prints overall and per-tag
    accuracy, first with the original tags and then with the collapsed
    4-category tags.

    Args:
        data_path: Location of the JSON corpus. Defaults to the path the
            script has always used.

    Any exception is reported as a single ``Error: ...`` line instead of
    being propagated.
    """
    try:
        with open(data_path, 'r', encoding='utf-8') as f:
            json_data = json.load(f)

        train_data, test_data = load_and_split_data(json_data)

        # The collapsed datasets do not depend on the tagger configuration,
        # so build them once instead of once per configuration.
        datasets = [
            ("36 tags", train_data, test_data),
            ("4 tags", convert_to_collapsed_tags(train_data), convert_to_collapsed_tags(test_data)),
        ]

        # (order, word_depends_prev, display name)
        configs = [
            (1, False, "First Order HMM"),
            (2, False, "Second Order HMM"),
            (1, True, "First Order HMM with word dependency")
        ]

        for order, word_depends_prev, name in configs:
            print(f"\n{'-'*50}")
            print(f"Testing {name}")
            print(f"{'-'*50}")

            for label, train_set, test_set in datasets:
                # A fresh tagger per dataset: training accumulates into the instance.
                tagger = HMMTagger(order=order, word_depends_prev=word_depends_prev)
                tagger.train(train_set)
                accuracy, tag_accuracy = evaluate_tagger(tagger, test_set)
                print(f"\n{name} - {label}:")
                print(f"Overall accuracy: {accuracy:.4f}")
                print("Tag-wise accuracy:", tag_accuracy)

    except Exception as e:
        # Top-level boundary of the script: report the failure and return.
        print(f"Error: {str(e)}")

# Run the experiments only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    main()


--------------------------------------------------
Testing First Order HMM
--------------------------------------------------
Most frequent tag (for unseen words): NN

Unknown word performance:
Total unknown words: 1698
Unknown word accuracy: 0.2067

First Order HMM - 36 tags:
Overall accuracy: 0.8766
Tag-wise accuracy: {'NNP': 0.7803242034656233, 'DT': 0.9891238670694864, 'NN': 0.9611046285492026, 'WP': 0.9811320754716981, 'VBZ': 0.884514435695538, 'NNS': 0.8404605263157895, 'VBD': 0.8412162162162162, 'RB': 0.7775590551181102, 'JJ': 0.739247311827957, 'IN': 0.9770171149144254, 'CD': 0.7414285714285714, 'PRP$': 1.0, 'VBP': 0.7741935483870968, 'VBN': 0.6930232558139535, 'JJS': 0.8636363636363636, 'WDT': 0.8111111111111111, 'RBS': 0.7, 'TO': 0.985781990521327, 'PRP': 0.9886039886039886, 'VBG': 0.6491803278688525, 'RP': 0.6904761904761905, 'CC': 0.9933035714285714, 'MD': 0.9073170731707317, 'VB': 0.8148148148148148, 'WRB': 0.9473684210526315, 'RBR': 0.38461538461538464, 'JJR': 0.875, ':'