<a href="https://colab.research.google.com/github/LindaSekhoasha/POS_Tagging/blob/main/POS_Tagging.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Meet Our Group



*   Linda Sekhoasha (Student Number)
*   Wandile Ngobese (222056013)
*   Khonzinkosi Mkhize (Student Number)
*   Samukelo Mkhize (Student Number)

# Imports

In [1]:
!pip install datasets
!pip install pomegranate

Collecting datasets
  Downloading datasets-3.5.0-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.12.0,>=2023.1.0 (from fsspec[http]<=2024.12.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.12.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.5.0-py3-none-any.whl (491 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m491.2/491.2 kB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.12.0-py3-none-any.wh

In [3]:
import pandas as pd
from datasets import Dataset
from collections import Counter, defaultdict, namedtuple
# from pomegranate import HiddenMarkovModel, DiscreteDistribution

# Data Pre-processing

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [5]:
# --- Function to handle bad lines ---
def handle_bad_lines(line):
    fields = line.strip().split('\t')
    if len(fields) == 3:  # Expected format
        return fields
    elif len(fields) == 4: # Handle 4-field lines
        # This handles the cases you specified
        # You might need to adjust based on your data structure
        return [fields[0], fields[1], ' '.join(fields[2:])]
    else:
        return None  # Skip lines with other formats

# --- Read the CSV, handling bad lines with a custom function ---
with open("/content/drive/MyDrive/zu.gold.seg.data", 'r', encoding='utf-8') as file:
    data_lines = [handle_bad_lines(line) for line in file]
    data_lines = [line for line in data_lines if line is not None] # Remove skipped lines

# --- Create DataFrame ---
df = pd.DataFrame(data_lines, columns=['TOKEN', 'UPOS', 'MORPH ANALYSIS'])
df.head(3)

Unnamed: 0,TOKEN,UPOS,MORPH ANALYSIS
0,TOKEN,MORPH ANALYSIS,UPOS
1,Ukwengeza,u-kw-engez-a,V
2,kulokhu,ku-lokhu,CDEM


In [6]:
# drop/remove the MORPH ANALYSIS column
df = df.drop(columns=['MORPH ANALYSIS'])

# convert all tokens to lowercase for a consistent vocab
df['TOKEN'] = [token.lower() for token in df['TOKEN']]
df.head(10)

Unnamed: 0,TOKEN,UPOS
0,token,MORPH ANALYSIS
1,ukwengeza,u-kw-engez-a
2,kulokhu,ku-lokhu
3,",",","
4,imibandela,i-mi-bandela
5,iyenziwa,i-ye-enz-iw-a
6,ukwakha,u-kw-akh-a
7,amakomiti,a-ma-komiti
8,amawadi,a-ma-wadi
9,",",","


In [7]:
# convert panda dataframe to HuggingFace dataset
dataset = Dataset.from_pandas(df)
print(dataset.features)

{'TOKEN': Value(dtype='string', id=None), 'UPOS': Value(dtype='string', id=None)}


In [8]:
data = dataset.train_test_split(test_size=0.2)

print(f"There are {len(data['train'])} tokens in the training set.")
print(f"There are {len(data['test'])} tokens in the testing set.\n")

print(f"First 3 training tokens:\n{data['train'][:3]}\n")
print(f"First 3 test tokens:\n{data['test'][:3]}")

There are 39278 tokens in the training set.
There are 9820 tokens in the testing set.

First 3 training tokens:
{'TOKEN': ['kumele', 'nemisebenzi', 'kanye'], 'UPOS': ['ku-mel-e', 'na-i-mi-sebenzi', 'ka-nye']}

First 3 test tokens:
{'TOKEN': ['umpompi', 'ndlela', 'ekhaya'], 'UPOS': ['u-m-pompi', 'n-dlela', 'e-khaya']}


# Most Frequent Class Tagger (Base)

In [9]:
"""A function to map either the tags to the count of words with said tag or
the words to the count of tags corresponding to the word."""
# keys -> list of either tags or words
# values -> is a similar, corresponding list
def pair_counts(keys, values):
    out = defaultdict(Counter)

    for key, value in zip(keys, values):
        out[key][value] += 1

    return {key: dict(value_counts) for key, value_counts in out.items()}

In [10]:
subset = data['train'].select(range(7))
tags = [example['UPOS'] for example in subset]
words = (example['TOKEN'] for example in subset)

emission_counts = pair_counts(tags, words)
print(emission_counts.keys())

dict_keys(['ku-mel-e', 'na-i-mi-sebenzi', 'ka-nye', 'i-wadi-ABC', '.', 'na-kho', 'nga-a-ba-ntu'])


In [11]:
"""MFC model class.
FakeState is used as the 'states' are actually strings to mimmick a complete tagger like HMM tagger.
FakeState has a named field 'name' which could be a Tag or word."""
FakeState = namedtuple("FakeState", "name")

class MFCTagger:
    missing = FakeState(name="<UNK>")

    def __init__(self, table):
        self.table = defaultdict(lambda: MFCTagger.missing)
        self.table.update({word: FakeState(name=tag) for word, tag in table.items()})

    def viterbi(self, seq):
        """This method simplifies predictions by matching the Pomegranate viterbi() interface"""
        return 0., list(enumerate(["<s>"] + [self.table[w] for w in seq] + ["</s>"]))

In [12]:
train_set = data['train']
test_set = data['test']

tags = [example['UPOS'] for example in train_set]
words = (example['TOKEN'] for example in train_set)

word_counts = pair_counts(words, tags)

mfc_table = dict()
for word, tags in word_counts.items():
    mfc_table[word] = max(tags.keys(), key = lambda key: tags[key])

mfc_model = MFCTagger(mfc_table) # Create a Most Frequent Class tagger instance

In [13]:
# function that returns the vocab of a given dataset
def vocab(data_s):
    vocab = set()
    for token in data_s['TOKEN']:
        vocab.add(token)
    return sorted(vocab)

vocab_list = vocab(train_set)

# show vocab starting from 50 to hide punct
print(vocab_list[50:60])

['aba-10', 'aba-28', 'aba-4', 'ababacabangeli', 'ababamba', 'ababambe', 'ababambi', 'ababambiqhaza', 'ababamele', 'ababandakanyekayo']


In [14]:
def replace_unknown(sequence):
    """Return a copy of the input sequence where each unknown word is replaced
    by the literal string value 'nan'. Pomegranate will ignore these values
    during computation.
    """
    return [w if w in vocab(train_set) else 'nan' for w in sequence]

def simplify_decoding(observations, model):
    """observations are the sequences (words) for the model to predict"""
    _, state_path = model.viterbi(replace_unknown(observations))
    return [state[1].name for state in state_path[1:-1]]  # do not show the start/end state predictions

In [15]:
print("Predicted labels:\n-----------------")
print(simplify_decoding(test_set[:20], mfc_model))
print()
print("Actual labels:\n--------------")
print(test_set['UPOS'][:10])
print("\n")

Predicted labels:
-----------------
['<UNK>', '<UNK>']

Actual labels:
--------------
['u-m-pompi', 'n-dlela', 'e-khaya', ',', 'u-lu-cwaningo', 'i-nge-ngeza', ',', 'aba-hluk-an-e', 'oku-ba', 'u-ku-phil-a']




##  **MODELS**

##  CRF Model

**1. Feature Extraction**

In [None]:
import string

def word_shape(word):
  shape = ''
  for character in word:
    if character.isupper():
      shape += 'X'
    elif character.islower():
      shape += 'x'
    elif character in string.digits:
      shape += 'd'
    else:
      shape += character
  return shape

In [None]:
def is_noun_class_prefix(word):
    noun_prefixes = ['u', 'um', 'aba', 'ama', 'isi', 'izi', 'in', 'izin', 'imi', 'abe', 'umu', 'i', 'ili', 'ama']
    return any(word.startswith(prefix) for prefix in noun_prefixes)

def vowel_count(word):
    return sum(1 for c in word.lower() if c in 'aeiou')


def word2features(sentence, i):
    current_word = sentence[i][0]

    features = {
        'bias': 1.0,
        'word.lower()': current_word.lower(),
        'word.isupper()': current_word.isupper(),
        'word.istitle()': current_word.istitle(),
        'word.isdigit()': current_word.isdigit(),
        'prefix-1': current_word[:1],
        'prefix-2': current_word[:2],
        'prefix-3': current_word[:3],
        'prefix-4': current_word[:4],
        'suffix-3': current_word[-3:],
        'suffix-4': current_word[-4:],
        'word.shape': word_shape(current_word),
        'has_noun_class_prefix': is_noun_class_prefix(current_word),
        'vowel_count': vowel_count(current_word),
        'word.length': len(current_word),
        # handle current_word being None:
        'starts_with_vowel': current_word[0].lower() in 'aeiou' if current_word and current_word[0] else False,
    }

    # Handle current_word being None or empty:
    features = {k: v for k, v in features.items() if v is not None}  # Remove features with None values

    if not current_word:
        features['word.is_empty'] = True  # Add a feature for empty words if needed

    if i > 0:
        prev_word = sentence[i-1][0]
        features.update({
            '-1:word.lower()': prev_word.lower(),
            '-1:prefix-3': prev_word[:3],
            '-1:suffix-3': prev_word[-3:],
        })
    else:
        features['BOS'] = True  # Beginning of sentence

    if i < len(sentence)-1:
        next_word = sentence[i+1][0]
        features.update({
            '+1:word.lower()': next_word.lower(),
            '+1:prefix-3': next_word[:3],
            '+1:suffix-3': next_word[-3:],
        })
    else:
        features['EOS'] = True  # End of sentence

    return features

In [None]:
def sentence2features(sentence):
  return [word2features(sentence, i) for i in range(len(sentence))]

The following is a test case for word2features() and sentence2features()

In [None]:
# Define the test sentence as list of tuples (word, dummy_label)
test_sentence = [('uJohn', 'X'), ('Doe', 'X'), (',', 'X'), ('ukwengeza', 'X'), ('imibandela', 'X'), ('kubasebenzi', 'X')]

# Run feature extraction
features_of_sentence = [word2features(test_sentence, idx) for idx in range(len(test_sentence))]

# Print nicely
for idx, features in enumerate(features_of_sentence):
    print(f"Word {idx}: '{test_sentence[idx][0]}' ➔ Features:")
    for key, value in features.items():
        print(f"   {key}: {value}")
    print("\n")


Word 0: 'uJohn' ➔ Features:
   bias: 1.0
   word.lower(): ujohn
   word.isupper(): False
   word.istitle(): False
   word.isdigit(): False
   prefix-1: u
   prefix-2: uJ
   prefix-3: uJo
   prefix-4: uJoh
   suffix-3: ohn
   suffix-4: John
   word.shape: xXxxx
   has_noun_class_prefix: True
   vowel_count: 2
   word.length: 5
   starts_with_vowel: True
   BOS: True
   +1:word.lower(): doe
   +1:prefix-3: Doe
   +1:suffix-3: Doe


Word 1: 'Doe' ➔ Features:
   bias: 1.0
   word.lower(): doe
   word.isupper(): False
   word.istitle(): True
   word.isdigit(): False
   prefix-1: D
   prefix-2: Do
   prefix-3: Doe
   prefix-4: Doe
   suffix-3: Doe
   suffix-4: Doe
   word.shape: Xxx
   has_noun_class_prefix: False
   vowel_count: 2
   word.length: 3
   starts_with_vowel: False
   -1:word.lower(): ujohn
   -1:prefix-3: uJo
   -1:suffix-3: ohn
   +1:word.lower(): ,
   +1:prefix-3: ,
   +1:suffix-3: ,


Word 2: ',' ➔ Features:
   bias: 1.0
   word.lower(): ,
   word.isupper(): False
   word.ist

In [15]:
!pip install datasets
!pip install sklearn-crfsuite


Collecting sklearn-crfsuite
  Downloading sklearn_crfsuite-0.5.0-py2.py3-none-any.whl.metadata (4.9 kB)
Collecting python-crfsuite>=0.9.7 (from sklearn-crfsuite)
  Downloading python_crfsuite-0.9.11-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.3 kB)
Downloading sklearn_crfsuite-0.5.0-py2.py3-none-any.whl (10 kB)
Downloading python_crfsuite-0.9.11-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m19.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: python-crfsuite, sklearn-crfsuite
Successfully installed python-crfsuite-0.9.11 sklearn-crfsuite-0.5.0


In [3]:
import pandas as pd
from datasets import Dataset
import sklearn_crfsuite
from sklearn_crfsuite import metrics


Code to split into sentences

In [16]:
# --- Extract tokens and labels from the training and testing sets ---
train_tokens = data['train']['TOKEN']
train_labels = data['train']['UPOS']

test_tokens = data['test']['TOKEN']
test_labels = data['test']['UPOS']

# --- Now split the tokens into real sentences ---
def split_into_sentences(tokens, labels):
    sentences = []
    current_tokens = []
    current_labels = []

    for token, label in zip(tokens, labels):
        current_tokens.append(token)
        current_labels.append(label)

        if token in ['.', '!', '?']:  # Sentence boundary
            sentences.append((current_tokens, current_labels))
            current_tokens = []
            current_labels = []

    # Catch any leftover tokens (in case no final punctuation)
    if current_tokens:
        sentences.append((current_tokens, current_labels))

    return sentences

train_sentences = split_into_sentences(train_tokens, train_labels)
test_sentences = split_into_sentences(test_tokens, test_labels)

# --- Prepare for CRF ---
X_train = [sentence2features(sentence) for sentence, _ in train_sentences]
y_train = [[label for label in labels] for _, labels in train_sentences]

X_test = [sentence2features(sentence) for sentence, _ in test_sentences]
y_test = [[label for label in labels] for _, labels in test_sentences]


NameError: name 'sentence2features' is not defined

Training of the crf model

In [None]:
# --- Create and configure the CRF model ---
crf = sklearn_crfsuite.CRF(
    algorithm='lbfgs',
    c1=0.1,
    c2=0.1,
    max_iterations=30,  # Reduce from 100 -> 30 to prevent long training
    all_possible_transitions=False,  # Only allow seen transitions
    verbose=True  # Print progress so you know it's working
)

# --- Train the model ---
crf.fit(X_train, y_train)

# --- Predict on the test set ---
y_pred = crf.predict(X_test)

# --- Evaluate the model ---
print(metrics.flat_classification_report(y_test, y_pred, digits=3))



loading training data to CRFsuite: 100%|██████████| 2513/2513 [00:03<00:00, 646.80it/s]



Feature generation
type: CRF1d
feature.minfreq: 0.000000
feature.possible_states: 0
feature.possible_transitions: 0
0....1....2....3....4....5....6....7....8....9....10

## **Maximum Entropy Markov Model**

In [16]:
df = pd.DataFrame(data_lines[:10000], columns=['TOKEN', 'UPOS', 'MORPH ANALYSIS'])

In [17]:
#Group data into sentences for MEMM training
def group_by_sentences(dataset, sentence_length=7):
    grouped = []
    temp = []
    for i, ex in enumerate(dataset):
        temp.append(ex)
        if len(temp) == sentence_length:
            grouped.append(temp)
            temp = []
    if temp:
        grouped.append(temp)
    return grouped

#Apply grouping
#train_sentences = group_by_sentences(train_set)
train_sentences = group_by_sentences(train_set.select(range(7000)))  # Or even 500
test_sentences = group_by_sentences(test_set)

In [20]:
# --- Feature extraction for MEMM ---
def extract_features(sentence, i, prev_tag):
    word = sentence[i]['TOKEN']
    features = {
        'word': word,
        'prev_word': sentence[i-1]['TOKEN'] if i > 0 else '<s>',
        'next_word': sentence[i+1]['TOKEN'] if i < len(sentence)-1 else '</s>'#,
        #'is_upper': word[0].isupper(),
        #'is_digit': word.isdigit(),
        #'suffix': word[-3:],
        #'prefix': word[:3],
        #'prev_tag': prev_tag,'''
    }
    return features


In [18]:
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction import DictVectorizer
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score

In [21]:
X, y = [], []
for sentence in train_sentences:
    prev_tag = '<s>'  # Start of sentence tag
    for i in range(len(sentence)):
        feats = extract_features(sentence, i, prev_tag)
        X.append(feats)
        y.append(sentence[i]['UPOS'])
        prev_tag = sentence[i]['UPOS']

# Convert features into numerical vectors
'''vec = DictVectorizer()
X_vec = vec.fit_transform(X)'''
vec = DictVectorizer(sparse=True)  # This is default behavior
X_vec = vec.fit_transform(X)       # Avoid converting to .toarray()


# Encode string labels into integers
lbl = LabelEncoder()
y_enc = lbl.fit_transform(y)

# Train a logistic regression classifier (MEMM)
clf = LogisticRegression(max_iter=700)
#clf = LogisticRegression(max_iter=400, solver='liblinear')  # Use lightweight solver
clf.fit(X_vec, y_enc)

In [22]:
#Greedy decoder: predict tags for a sentence
def predict_tags(sentence):
    predicted_tags = []
    prev_tag = '<s>'
    for i in range(len(sentence)):
        feats = extract_features(sentence, i, prev_tag)
        feats_vec = vec.transform([feats])
        pred_idx = clf.predict(feats_vec)[0]
        pred_tag = lbl.inverse_transform([pred_idx])[0]
        predicted_tags.append(pred_tag)
        prev_tag = pred_tag
    return predicted_tags

In [None]:
#Evaluate MEMM on the test set
y_true, y_pred = [], []
for sentence in test_sentences:
    true_tags = [tok['UPOS'] for tok in sentence]
    predicted = predict_tags(sentence)
    y_true.extend(true_tags)
    y_pred.extend(predicted)

print("MEMM POS Tagging Accuracy:", accuracy_score(y_true, y_pred))
