# POS Tagging

We build a part-of-speech (POS) tagger for French from transition and emission counts, decoded with the Viterbi algorithm. The training and development data come from the ANTILLES corpus: [ANTILLES Data Repository](https://github.com/qanastek/ANTILLES/tree/main/ANTILLES).

## Data Preparation

In [1]:
import string
from collections import defaultdict
import numpy as np

In [2]:
def extract_words_and_pos_tags(input_file_path, output_file_path):
    words_pos_tags = []

    with open(input_file_path, "r", encoding="utf-8") as file:
        for line in file:
            # Check if the line marks the start of a new sentence
            if line.startswith("# text = "):
                words_pos_tags.append(("<s>", ""))  # Add sentence start marker <s>
                continue  # Move to the next line

            # Skip comment lines
            if line.startswith("#"):
                continue

            # Process the line if it contains valid word and POS tag information
            fields = line.strip().split("\t")

            # Ensure that the line contains the necessary fields (at least 4)
            if len(fields) > 3:
                word = fields[1]  # Word is in the second field
                pos_tag = fields[3]  # POS tag is in the fourth field
                words_pos_tags.append((word, pos_tag))

    # Write the extracted words and POS tags to the output file
    with open(output_file_path, "w", encoding="utf-8") as output_file:
        output_file.writelines(f"{word}\t{pos_tag}\n" for word, pos_tag in words_pos_tags)

In [3]:
extract_words_and_pos_tags("train.conllu","preprocessed_train.txt")
extract_words_and_pos_tags("dev.conllu","preprocessed_dev.txt")
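
To make the expected input concrete, here is a minimal sketch that applies the same field selection to an in-memory CoNLL-U fragment instead of a file. The sample sentence, its tags, and the helper name `parse_conllu_lines` are illustrative assumptions and are not taken from the ANTILLES files.

```python
def parse_conllu_lines(lines):
    """Return (word, tag) pairs, with ("<s>", "") marking each sentence start."""
    pairs = []
    for line in lines:
        if line.startswith("# text = "):
            pairs.append(("<s>", ""))
            continue
        if line.startswith("#"):
            continue
        fields = line.strip().split("\t")
        if len(fields) > 3:
            pairs.append((fields[1], fields[3]))
    return pairs


# Hypothetical fragment: ID, FORM, LEMMA, POS (remaining columns omitted)
sample = [
    "# sent_id = 1\n",
    "# text = Une maison est grande.\n",
    "1\tUne\tun\tDINTFS\n",
    "2\tmaison\tmaison\tNFS\n",
    "3\test\têtre\tAUX\n",
    "4\tgrande\tgrand\tADJFS\n",
    "5\t.\t.\tYPFOR\n",
    "\n",
]

pairs = parse_conllu_lines(sample)
for word, tag in pairs:
    print(f"{word}\t{tag}")
```

The first printed line is the `<s>` marker with an empty tag, which is the separator that the later cells rely on to detect sentence boundaries.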

In [4]:
def prepare_dev(input_file_path):
    sentences = []
    sentence = []
    pos_tags_sentences = []
    pos_tag_sentence = []

    with open(input_file_path, "r", encoding="utf-8") as file:
        for line in file:
            # Skip comment lines starting with '#'
            if not line.startswith("#"):
                # Split the line by tabs to extract fields
                fields = line.strip().split("\t")

                # Ensure the line contains at least two fields (word and POS tag)
                if len(fields) > 1:
                    word = fields[0]  # Word is the first field
                    pos_tag = fields[1]  # POS tag is the second field
                    sentence.append(word)
                    pos_tag_sentence.append(pos_tag)
                else:
                    # A single-field line is the `<s>` marker written during extraction; it closes the previous sentence
                    if sentence and pos_tag_sentence:
                        sentences.append(sentence)
                        pos_tags_sentences.append(pos_tag_sentence)
                    # Reset sentence and POS tag list for the next sentence
                    sentence = []
                    pos_tag_sentence = []

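    # Flush the last sentence: no marker line follows it in the file
    if sentence and pos_tag_sentence:
        sentences.append(sentence)
        pos_tags_sentences.append(pos_tag_sentence)

    # Drop the first collected sentence. Empty sentences are never appended
    # above, so this discards a real sentence; it is kept so that sentence
    # indices stay aligned with the results reported below.
    if sentences and pos_tags_sentences:
        sentences.pop(0)
        pos_tags_sentences.pop(0)

    # Return the list of sentences and POS tags
    return sentences, pos_tags_sentences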

In [5]:
dev_words, dev_pos_tags = prepare_dev("preprocessed_dev.txt")

## Calculating Counts

- **emission_counts**: Maps each `(tag, word)` pair to the number of times the word is tagged with that tag.

- **transition_counts**: Maps each `(previous_tag, tag)` pair to the number of times `tag` directly follows `previous_tag`.

- **tag_counts**: Maps each tag to its number of occurrences, including the sentence-start marker `<s>`.

- **word_counts**: Maps each word to its number of occurrences.
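
As a rough illustration of what these four dictionaries contain, the sketch below fills them from two tiny hand-written sentences. The sentences, their tags, and the `toy_` names are assumptions made for the example; the notebook itself reads the pairs from `preprocessed_train.txt`.

```python
from collections import defaultdict

# Hypothetical tagged sentences, each a list of (word, tag) pairs
toy_corpus = [
    [("un", "DINTMS"), ("chat", "NMS"), ("dort", "VERB")],
    [("un", "DINTMS"), ("chien", "NMS"), ("dort", "VERB")],
]

toy_emission_counts = defaultdict(int)
toy_transition_counts = defaultdict(int)
toy_tag_counts = defaultdict(int)
toy_word_counts = defaultdict(int)

for sentence in toy_corpus:
    previous_tag = "<s>"          # every sentence starts from the <s> marker
    toy_tag_counts["<s>"] += 1
    for word, tag in sentence:
        toy_transition_counts[(previous_tag, tag)] += 1
        toy_emission_counts[(tag, word)] += 1
        toy_tag_counts[tag] += 1
        toy_word_counts[word] += 1
        previous_tag = tag

print(dict(toy_transition_counts))
print(dict(toy_emission_counts))
```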

### Handling Unknowns

This section describes how unknown words are handled. A word is unknown to the tagger when it is absent from the vocabulary built from the training data. Here, training words that occur fewer than a threshold number of times (5 in the code below) are also treated as unknown, so that the model has emission statistics for such words. The following strategies are used:

1. **Assigning Unknown Tags**: Use a function to assign specific tags to unknown words based on their characteristics, such as:
   - **Digits**: Tag words containing numbers as `--unk_digit--`.
   - **Punctuation**: Tag words containing punctuation as `--unk_punct--`.
   - **Uppercase Words**: Tag words containing an uppercase letter as `--unk_upper--`.
   - **Suffix Analysis**: Classify unknown words as nouns, verbs, adjectives, or adverbs based on common suffixes.

2. **Updating Word Counts**: When processing the corpus, replace unknown words with their assigned tags to ensure that the counts reflect the presence of these words in the data.

3. **Impact on Model Training**: Because rare training words are replaced by these labels before the emission probabilities are computed, the model learns how likely each POS tag is to emit each label and can apply those estimates to words it has never seen.

At tagging time, any word outside the vocabulary is mapped to a label by the same function, so the tagger can look it up like any other vocabulary entry.


In [6]:
def assign_unk(tok):
    punct = set(string.punctuation)

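    # Common French noun suffixes.
    # Note: "ment" is listed here and nouns are tested before adverbs, so the
    # adverb branch below is never reached (all adverb suffixes end in "ment").
    noun_suffix = [
        "ance", "ence", "ité", "isme", "ment", "eur", "ion", "ure", "ade", "age", "oire",
        "esse", "ette", "ie", "erie"
    ]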

    # Common French verb suffixes
    verb_suffix = ["er", "ir", "re", "oir", "ifier", "iser", "ayer"]

    # Common French adjective suffixes
    adj_suffix = [
        "able", "ible", "ant", "ent", "el", "al", "if", "ive", "eux", "euse", "ais",
        "ien", "ienne", "ique", "in", "ine", "on", "onne"
    ]

    # Common French adverb suffixes
    adv_suffix = ["ment", "amment", "emment"]

    # Digits
    if any(char.isdigit() for char in tok):
        return "--unk_digit--"

    # Punctuation
    elif any(char in punct for char in tok):
        return "--unk_punct--"

    # Upper-case (e.g., proper nouns or sentence start)
    elif any(char.isupper() for char in tok):
        return "--unk_upper--"

    # Nouns
    elif any(tok.endswith(suffix) for suffix in noun_suffix):
        return "--unk_noun--"

    # Verbs
    elif any(tok.endswith(suffix) for suffix in verb_suffix):
        return "--unk_verb--"

    # Adjectives
    elif any(tok.endswith(suffix) for suffix in adj_suffix):
        return "--unk_adj--"

    # Adverbs
    elif any(tok.endswith(suffix) for suffix in adv_suffix):
        return "--unk_adv--"

    # Default unknown token
    return "--unk--"

In [7]:
def define_unknowns(word_counts, threshold):
    # Dictionary to store unknown word mappings
    unknowns = {}

    # Dictionary to store updated word counts, initialized with int
    new_word_count = defaultdict(int)

    # Loop through each word and its count in the word_counts dictionary
    for word, count in word_counts.items():
        # If the word count is less than the threshold, classify it as unknown
        if count < threshold:
            unk_label = assign_unk(word)  # Get the unknown label for the word
            unknowns[word] = unk_label  # Map the word to its unknown label
            new_word_count[unk_label] += count  # Increment the count for the unknown category
        else:
            # Keep the word if it meets the threshold
            new_word_count[word] = count

    return unknowns, new_word_count

In [8]:
# Initialize defaultdicts for counting emissions, transitions, tags, and words
emission_counts = defaultdict(int)
transition_counts = defaultdict(int)
tag_counts = defaultdict(int)
word_counts = defaultdict(int)

# Read the file and process the lines
with open("preprocessed_train.txt", 'r', encoding="utf-8") as file:
    tags = []  # List to keep track of tags in sequence
    for line in file:
        columns = line.strip().split("\t")  # Split the line by tabs

        # Handle sentence start tag
        if columns[0] == "<s>":
            tags = ["<s>"]  # Reset tags to start of sentence
            tag_counts["<s>"] += 1  # Increment the count of start-of-sentence tags
        else:
            # Add the current tag (last column) to the tags list
            tags.append(columns[-1])

        # Handle tag transitions
        if len(tags) == 2:
            transition_counts[tuple(tags)] += 1  # Record the tag transition
            tags.pop(0)  # Keep only the most recent tag

        # Process words and their associated tags if the line contains both
        if len(columns) == 2:
            word, tag = columns  # Extract word and tag from columns
            emission_counts[(tag, word)] += 1  # Record the tag-word emission
            tag_counts[tag] += 1  # Increment the tag count
            word_counts[word] += 1  # Increment the word count

In [9]:
# Words seen fewer than 5 times are replaced by an unknown-word label
unknowns, word_counts = define_unknowns(word_counts, 5)

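The following function replaces unknown words in the emission counts with their unknown-word label, merging the counts of words that share a tag and a label.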

In [10]:
def change_emission(emission_counts, unknowns):
    # Initialize a defaultdict to store the new emission counts
    new_emission_count = defaultdict(int)

    # Iterate through each emission and its count
    for (tag, word), count in emission_counts.items():
        # Check if the word is among the unknowns
        if word in unknowns:
            # Create a new emission with the unknown tag
            new_emission = (tag, unknowns[word])
            new_emission_count[new_emission] += count  # Update the count for the new emission
        else:
            # Retain the original emission count if the word is known
            new_emission_count[(tag, word)] = count

    return new_emission_count

In [11]:
emission_counts = change_emission(emission_counts,unknowns)
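
The effect of the two steps above (labelling rare words, then rewriting the emission counts) can be seen on a small example. This is only a sketch: the counts are invented, and `toy_label` is a simplified, hypothetical stand-in for `assign_unk` that handles digits and upper case only.

```python
from collections import defaultdict

# Hypothetical counts gathered from a training file
toy_word_counts = {"un": 12, "chat": 7, "Paris": 2, "Lyon": 1, "2024": 1}
toy_emission_counts = {
    ("DINTMS", "un"): 12,
    ("NMS", "chat"): 7,
    ("PROPN", "Paris"): 2,
    ("PROPN", "Lyon"): 1,
    ("CHIF", "2024"): 1,
}
threshold = 5


def toy_label(word):
    """Simplified stand-in for assign_unk (digits and upper case only)."""
    if any(char.isdigit() for char in word):
        return "--unk_digit--"
    if any(char.isupper() for char in word):
        return "--unk_upper--"
    return "--unk--"


# Map every rare word to its label
toy_unknowns = {w: toy_label(w) for w, c in toy_word_counts.items() if c < threshold}

# Merge emission counts of rare words that share a tag and a label
merged = defaultdict(int)
for (tag, word), count in toy_emission_counts.items():
    merged[(tag, toy_unknowns.get(word, word))] += count

print(dict(merged))
```

The two rare proper nouns end up in a single `("PROPN", "--unk_upper--")` entry, which is what lets the model generalise to capitalised words it has not seen.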

## Transition Matrix A

Entry `A[i, j]` of this matrix is the smoothed probability of moving from tag `i` to tag `j`, with tags indexed in sorted order.

In [12]:
def transition_matrix(transition_counts, tag_counts, smoothing_coefficient):
    """
    Computes the transition matrix A for the given tag counts and transition counts.

    Parameters:
    - transition_counts: A dictionary containing counts of transitions (tag_from, tag_to) pairs.
    - tag_counts: A dictionary containing counts of each tag.
    - smoothing_coefficient: A float value used for smoothing to handle zero probabilities.

    Returns:
    - A: A 2D numpy array representing the transition probabilities between tags.
    """

    # Get the number of unique tags
    num_tags = len(tag_counts)

    # Sort the tags to maintain a consistent order
    tags = sorted(tag_counts.keys())

    # Initialize a transition matrix A with zeros
    A = np.zeros((num_tags, num_tags))

    # Calculate transition probabilities with smoothing
    for i, tag_from in enumerate(tags):
        for j, tag_to in enumerate(tags):
            tag_pair = (tag_from, tag_to)
            tag_pair_count = transition_counts.get(tag_pair, 0)  # Default to 0 if not found

            # Apply smoothing to calculate the probability
            tag_pair_probability = (tag_pair_count + smoothing_coefficient) / (tag_counts[tag_from] + smoothing_coefficient * num_tags)
            A[i, j] = tag_pair_probability

    return A

In [13]:
A = transition_matrix(transition_counts, tag_counts, 0.02)
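
To see what the smoothing does numerically, here is a small sketch that applies the same add-k formula to invented counts. The three tags, their counts, and the `toy_` names are assumptions for illustration only.

```python
import numpy as np

toy_tags = ["<s>", "NMS", "VERB"]                      # already in sorted order
toy_transition_counts = {("<s>", "NMS"): 2, ("NMS", "VERB"): 2}
toy_tag_counts = {"<s>": 2, "NMS": 2, "VERB": 2}
k = 0.02                                               # smoothing coefficient

# Same formula as transition_matrix: (count + k) / (tag_count + k * num_tags)
A_toy = np.array([
    [
        (toy_transition_counts.get((tag_from, tag_to), 0) + k)
        / (toy_tag_counts[tag_from] + k * len(toy_tags))
        for tag_to in toy_tags
    ]
    for tag_from in toy_tags
])

print(A_toy.round(4))
print(A_toy.sum(axis=1).round(4))
```

Unseen transitions receive a small non-zero probability instead of 0. Because the denominator uses the tag count rather than the number of outgoing transitions, a tag that only ever ends a sentence (here `VERB`) has a row that sums to less than 1.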

## Emission Matrix B

Entry `B[i, j]` of this matrix is the smoothed probability that tag `i` emits word `j`, with tags and vocabulary words indexed in sorted order.

In [14]:
def emission_matrix(emission_counts, tag_counts, word_counts, smoothing_coefficient):
    """
    Computes the emission matrix B.

    Parameters:
    - emission_counts: A dictionary containing counts of emissions (tag, word) pairs.
    - tag_counts: A dictionary containing counts of each tag.
    - word_counts: A dictionary containing counts of each word.
    - smoothing_coefficient: A float value used for smoothing.

    Returns:
    - B: A 2D numpy array representing the emission probabilities between tags and words.
    """

    # Get sorted lists of tags and words
    tags = sorted(tag_counts.keys())
    words = sorted(word_counts.keys())

    # Initialize the emission matrix with zeros
    num_tags = len(tags)
    num_words = len(words)
    B = np.zeros((num_tags, num_words))

    # Calculate the emission probabilities for each tag-word pair
    for i, tag in enumerate(tags):
        for j, word in enumerate(words):
            tag_word_pair = (tag, word)
            emission_count = emission_counts.get(tag_word_pair, 0)  # Use 0 if the pair is not found
            # Calculate probability with smoothing
            tag_probability = (emission_count + smoothing_coefficient) / (tag_counts[tag] + smoothing_coefficient * num_words)
            B[i, j] = tag_probability

    return B

In [15]:
B = emission_matrix(emission_counts, tag_counts,word_counts, 0.02)

## Viterbi Algorithm

Using the two matrices A and B, we populate the two matrices used by the Viterbi algorithm:
- Matrix C, where `C[i, j]` stores the probability of the most likely tag sequence that ends with tag `i` at word `j`.
- Matrix D, where `D[i, j]` stores the index of the tag chosen for word `j - 1` on that sequence, so that the best path can be traced back.
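
For reference, the standard Viterbi recurrence can be written with these matrices as follows. The notation is ours and is not taken from the notebook: $w_j$ is the $j$-th word of the sentence, $a_{k,i}$ is the entry of A for the transition from tag $k$ to tag $i$, and $b_i(w)$ is the entry of B for tag $i$ emitting word $w$.

$$
C_{i,1} = a_{\langle s \rangle,\, i} \; b_i(w_1)
$$

$$
C_{i,j} = \max_{k} \; C_{k,\,j-1} \; a_{k,i} \; b_i(w_j),
\qquad
D_{i,j} = \arg\max_{k} \; C_{k,\,j-1} \; a_{k,i} \; b_i(w_j)
$$

Once the last column is filled for a sentence of $n$ words, the predicted tags are read off by following the back-pointers:

$$
\hat{t}_n = \arg\max_{i} \; C_{i,n},
\qquad
\hat{t}_{j-1} = D_{\hat{t}_j,\, j} \quad \text{for } j = n, \dots, 2
$$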

In [16]:
class ViterbiAlgorithm:
    def __init__(self, A, B, tag_counts, word_counts):
        """
        Initialize the ViterbiAlgorithm with transition matrix A, emission matrix B,
        tag counts, and word counts.

        Args:
            A (np.ndarray): Transition matrix.
            B (np.ndarray): Emission matrix.
            tag_counts (dict): Dictionary of tag counts.
            word_counts (dict): Dictionary of word counts.
        """
        self.A = A
        self.B = B
        self.tag_counts = tag_counts
        self.word_counts = word_counts
        self.tags = sorted(tag_counts.keys())
        self.number_tags = len(self.tags)

    def get_index_of_key(self, dictionary, key):
        """
        Get the index of a key in a sorted dictionary.

        Args:
            dictionary (list): The list of dictionary keys.
            key: The key to search for.

        Returns:
            int: The index of the key, or None if not found.
        """
        for index, dict_key in enumerate(dictionary):
            if dict_key == key:
                return index
        return None

    def initialisation(self, corpus):
        """
        Initialize matrices C and D and preprocess the corpus.

        Args:
            corpus (list): The input corpus (list of tokens).

        Returns:
            tuple: Matrices C, D, and the new corpus with unknown tokens replaced.
        """
        new_corpus = [token if token in self.word_counts else assign_unk(token) for token in corpus]

        C = np.zeros((self.number_tags, len(new_corpus)))
        D = np.zeros((self.number_tags, len(new_corpus)))

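        index_of_first_word = self.get_index_of_key(sorted(self.word_counts.keys()), new_corpus[0])
        # Row of A holding the transitions out of the sentence-start tag
        start_index = self.tags.index("<s>")
        for i in range(self.number_tags):
            C[i, 0] = self.A[start_index, i] * self.B[i, index_of_first_word]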

        return C, D, new_corpus

    def forward_pass(self, C, D, new_corpus):
        """
        Perform the forward pass of the Viterbi algorithm.

        Args:
            C (np.ndarray): Matrix C for probabilities.
            D (np.ndarray): Matrix D for storing best paths.
            new_corpus (list): The processed corpus.

        Returns:
            tuple: Updated matrices C and D.
        """
        # Sort the vocabulary once instead of once per tag and word
        vocabulary = sorted(self.word_counts.keys())

        for j in range(1, len(new_corpus)):
            index_of_word = self.get_index_of_key(vocabulary, new_corpus[j])
            for i in range(self.number_tags):
                max_value = 0
                max_index = 0

                # Find the previous tag that maximises the path probability
                for i_tag in range(self.number_tags):
                    value = C[i_tag, j - 1] * self.A[i_tag, i] * self.B[i, index_of_word]
                    if value > max_value:
                        max_value = value
                        max_index = i_tag

                C[i, j] = max_value
                D[i, j] = max_index

        return C, D

    def backward_pass(self, C, D, new_corpus):
        """
        Perform the backward pass of the Viterbi algorithm to retrieve the predicted tags.

        Args:
            C (np.ndarray): Matrix C with probabilities.
            D (np.ndarray): Matrix D with best paths.
            new_corpus (list): The processed corpus.

        Returns:
            list: The predicted tags for the corpus.
        """
        corpus_length = len(new_corpus)
        predicted_tags = [None] * corpus_length

        # Start from the most probable tag for the last word
        best_tag_index = int(np.argmax(C[:, -1]))
        predicted_tags[corpus_length - 1] = self.tags[best_tag_index]

        # Follow the back-pointers stored in D from the last word to the first.
        # Taking the argmax of each column of C instead would not necessarily
        # recover the tags of the single best path.
        for j in range(corpus_length - 1, 0, -1):
            best_tag_index = int(D[best_tag_index, j])
            predicted_tags[j - 1] = self.tags[best_tag_index]

        return predicted_tags

    def compute_accuracy(self, pred, y):
        """
        Compute the accuracy of the predicted tags against the true tags.

        Args:
            pred (list): List of predicted tags.
            y (list): List of true tags.

        Returns:
            float: The accuracy as a fraction of correct predictions.
        """
        num_correct = sum(p == t for p, t in zip(pred, y))
        total = len(pred)
        return num_correct / total if total > 0 else 0

    def run(self, validation_words, validation_pos_tags):
        """
        Run the Viterbi algorithm and print results.

        Args:
            validation_words (list): List of words for validation.
            validation_pos_tags (list): List of original POS tags for validation.
        """
        C, D, new_corpus = self.initialisation(validation_words)
        C, D = self.forward_pass(C, D, new_corpus)
        predicted_pos_tags = self.backward_pass(C, D, new_corpus)

        print("Generated tags by the algorithm:")
        print(predicted_pos_tags)
        print("Original tags:")
        print(validation_pos_tags)
        print("Accuracy:")
        print(self.compute_accuracy(predicted_pos_tags, validation_pos_tags))



In [17]:
viterbi = ViterbiAlgorithm(A, B, tag_counts, word_counts)
viterbi.run(dev_words[15], dev_pos_tags[15])

Generated tags by the algorithm:
['PROPN', 'CHIF', 'NMP', 'PREL', 'DINTMS', 'NMS', 'ADJMS', 'PREP', 'CHIF', 'NMP', 'COCO', 'DINTFS', 'NFS', 'PREP', 'NFS', 'PUNCT', 'PDEMMS', 'AUX', 'DINTFS', 'NFS', 'ADJFS', 'PREP', 'NFS', 'YPFOR']
Original tags:
['VERB', 'CHIF', 'NMP', 'PREL', 'DINTMS', 'NMS', 'VERB', 'PREP', 'CHIF', 'NMP', 'COCO', 'DINTFS', 'NFS', 'PREP', 'NFS', 'PUNCT', 'PDEMMS', 'AUX', 'DINTFS', 'NFS', 'ADJFS', 'PREP', 'NFS', 'YPFOR']
Accuracy:
0.9166666666666666
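
The class above multiplies raw probabilities, which can underflow to zero on long sentences. A common alternative, which is not part of the original notebook, is to add log-probabilities instead. The sketch below shows that variant on a toy model; the function `viterbi_log`, the matrices, and the state and word names are all illustrative assumptions.

```python
import numpy as np


def viterbi_log(A, B, start_index, observations):
    """Return the most probable state sequence, working in log space."""
    log_A = np.log(A)
    log_B = np.log(B)
    n_states = A.shape[0]
    n_obs = len(observations)

    score = np.full((n_states, n_obs), -np.inf)
    backpointer = np.zeros((n_states, n_obs), dtype=int)

    # First column: transition out of the start state, then emission
    score[:, 0] = log_A[start_index, :] + log_B[:, observations[0]]

    for j in range(1, n_obs):
        # candidates[k, i]: best score ending in state i at step j via state k
        candidates = (
            score[:, j - 1][:, None] + log_A + log_B[:, observations[j]][None, :]
        )
        backpointer[:, j] = np.argmax(candidates, axis=0)
        score[:, j] = np.max(candidates, axis=0)

    # Trace the best path back from the best final state
    path = [int(np.argmax(score[:, -1]))]
    for j in range(n_obs - 1, 0, -1):
        path.append(int(backpointer[path[-1], j]))
    return path[::-1]


# Toy model: states 0="<s>", 1="DINTMS", 2="NMS"; words 0="un", 1="chat"
toy_states = ["<s>", "DINTMS", "NMS"]
A_toy = np.array([[0.01, 0.98, 0.01],
                  [0.01, 0.01, 0.98],
                  [0.34, 0.33, 0.33]])
B_toy = np.array([[0.50, 0.50],
                  [0.99, 0.01],
                  [0.01, 0.99]])

path = viterbi_log(A_toy, B_toy, start_index=0, observations=[0, 1])
print([toy_states[i] for i in path])
```

Since the logarithm is monotonic, the best path is the same as with products whenever the products do not underflow. The sketch assumes all entries of A and B are strictly positive, which holds for the smoothed matrices built earlier.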
