<a href="https://colab.research.google.com/github/Nuwantha97/Sinhala_spell_and_grammer_checker/blob/Notebooks/tokenize_POS_tagging.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
!pip install sinling

Collecting sinling
  Downloading sinling-0.3.6-py3-none-any.whl.metadata (3.0 kB)
Collecting emoji (from sinling)
  Downloading emoji-2.14.0-py3-none-any.whl.metadata (5.7 kB)
Collecting pygtrie (from sinling)
  Downloading pygtrie-2.5.0-py3-none-any.whl.metadata (7.5 kB)
Collecting sklearn-crfsuite (from sinling)
  Downloading sklearn_crfsuite-0.5.0-py2.py3-none-any.whl.metadata (4.9 kB)
Collecting python-crfsuite>=0.9.7 (from sklearn-crfsuite->sinling)
  Downloading python_crfsuite-0.9.11-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.3 kB)
Downloading sinling-0.3.6-py3-none-any.whl (20.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m20.0/20.0 MB[0m [31m72.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading emoji-2.14.0-py3-none-any.whl (586 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m586.9/586.9 kB[0m [31m31.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pygtrie-2.5.0-py3-none-any.whl (25 kB)
Downloading sklearn_

In [None]:
# Mount Google Drive at /content/drive so later cells can read the POS data
# stored under /content/drive/MyDrive.
# (Cell originally generated from the prompt: "mount to google drive".)

from google.colab import drive
drive.mount('/content/drive')

In [13]:
import re
from collections import defaultdict, Counter
from typing import List, Tuple, Set, Dict
import logging
import math
import json

class SinhalaPOSTagger:
    """Part-of-Speech tagger for Sinhala built on a bigram hidden Markov model.

    Training estimates log transition probabilities between consecutive tags
    (with synthetic ``START``/``END`` markers around every sentence) and log
    emission probabilities of words given tags. Decoding uses the Viterbi
    algorithm; any transition or emission that was never seen in training
    scores ``unknown_prob``.
    """

    # Separator used to flatten 2-tuple keys into JSON object keys.
    _KEY_SEP = "|"

    def __init__(self, tokenizer=None):
        """Initialize an untrained Sinhala POS tagger.

        Args:
            tokenizer: Optional object exposing ``tokenize(str) -> list`` that
                ``tag_sentence`` applies to every input word. When omitted,
                a module-level variable named ``tokenizer`` is used if one
                exists at call time; otherwise words are used as given.
        """
        self.tokenizer = tokenizer
        self.unknown_prob = math.log(1e-10)
        self.bigram_cnt: Dict[Tuple[str, str], int] = defaultdict(int)
        self.unigram_cnt: Dict[str, int] = defaultdict(int)
        self.tag_count: Dict[str, int] = defaultdict(int)
        self.tag_word_count: Counter = Counter()
        self.transition_probabilities: Dict[Tuple[str, str], float] = defaultdict(lambda: self.unknown_prob)
        self.emission_probabilities: Dict[Tuple[str, str], float] = defaultdict(lambda: self.unknown_prob)
        self.states: Set[str] = set()

        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    @classmethod
    def _join_key(cls, key: Tuple[str, str]) -> str:
        """Flatten a 2-tuple key into a single string for JSON storage."""
        return cls._KEY_SEP.join(key)

    @classmethod
    def _split_key(cls, key: str) -> Tuple[str, ...]:
        """Invert ``_join_key``.

        Only the first separator is split on, so the second element (a word,
        for emission keys) may itself contain the separator. This assumes
        tags never contain the separator.
        """
        return tuple(key.split(cls._KEY_SEP, 1))

    def ngrams(self, text: List[str], n: int) -> List[Tuple[str, ...]]:
        """Return all contiguous n-grams of ``text`` as tuples, in order."""
        return [tuple(text[i:i + n]) for i in range(len(text) - n + 1)]

    def clean_sinhala(self, word: str) -> str:
        """Strip whitespace and zero-width (non-)joiners from a word.

        Args:
            word: Input Sinhala word

        Returns:
            The word with all whitespace, U+200D (ZWJ) and U+200C (ZWNJ)
            characters removed.
        """
        word = re.sub(r'\s+', '', word)
        word = re.sub(r'[\u200D\u200C]', '', word)
        return word

    def load_corpus(self, corpus_file: str) -> List[List[Tuple[str, str]]]:
        """Load Sinhala tagged corpus from file.

        Expected format (JSON):
        [
            [["word1", "tag1"], ["word2", "tag2"]],  # Sentence 1
            [["word3", "tag3"], ["word4", "tag4"]]   # Sentence 2
        ]
        """
        with open(corpus_file, 'r', encoding='utf-8') as f:
            return json.load(f)

    def train(self, corpus_file: str) -> None:
        """Train the POS tagger on Sinhala corpus.

        Entries whose tag is empty, ``'NIL'`` or not a string (for example a
        NaN produced by a missing CSV cell) are skipped. Non-string words are
        converted with ``str()``, matching ``tag_sentence``.

        Args:
            corpus_file: Path to the JSON file containing tagged Sinhala corpus
        """
        self.logger.info("Starting training process...")

        corpus = self.load_corpus(corpus_file)
        tagged_words = []
        all_tags = []

        for sentence in corpus:
            all_tags.append("START")
            for word, tag in sentence:
                if not isinstance(tag, str) or not tag or tag == 'NIL':
                    continue
                if not isinstance(word, str):
                    word = str(word)
                all_tags.append(tag)
                tagged_words.append((tag, self.clean_sinhala(word)))
            all_tags.append("END")

        self._calculate_probabilities(tagged_words, all_tags)

        self.logger.info("Training complete. Found %d unique tags.", len(self.states))

    def _calculate_probabilities(self, tagged_words: List[Tuple[str, str]], all_tags: List[str]) -> None:
        """Update counts with new data and recompute every log probability.

        Args:
            tagged_words: ``(tag, word)`` pairs observed in the corpus.
            all_tags: Flat tag sequence including ``START``/``END`` markers.
        """
        for tag, word in tagged_words:
            self.tag_count[tag] += 1
            self.tag_word_count[(tag, word)] += 1

        for bigram in self.ngrams(all_tags, 2):
            self.bigram_cnt[bigram] += 1
        for tag in all_tags:
            self.unigram_cnt[tag] += 1

        # P(tag_i | tag_{i-1}) = count(tag_{i-1}, tag_i) / count(tag_{i-1})
        for bigram, count in self.bigram_cnt.items():
            total = self.unigram_cnt.get(bigram[0], 0)
            if total > 0 and count > 0:
                self.transition_probabilities[bigram] = math.log(count / total)

        # P(word | tag) = count(tag, word) / count(tag). Iterating over the
        # accumulated counter (rather than only the newest batch) keeps all
        # emissions consistent with the accumulated tag totals.
        for (tag, word), count in self.tag_word_count.items():
            total = self.tag_count.get(tag, 0)
            if total > 0 and count > 0:
                self.emission_probabilities[(tag, word)] = math.log(count / total)

        self.states = set(self.tag_count.keys())

    def viterbi(self, observable: List[str], states: Set[str]) -> List[Tuple[str, str]]:
        """Return the most likely tag sequence for ``observable``.

        Args:
            observable: Words to tag, already cleaned.
            states: Candidate tags.

        Returns:
            ``(word, tag)`` pairs in input order; an empty list when either
            ``states`` or ``observable`` is empty. Score ties are resolved in
            favour of the greatest tag name, because ``(score, tag)`` tuples
            are compared.
        """
        if not states:
            self.logger.error("No states provided for Viterbi algorithm")
            return []
        if not observable:
            return []

        # Use .get() instead of indexing: indexing a defaultdict inserts an
        # entry on every miss, which would bloat the model while tagging.
        unknown = self.unknown_prob
        trans_get = self.transition_probabilities.get
        emit_get = self.emission_probabilities.get

        V = [{}]  # V[t][state] = best log score of any path ending in state at t
        path = {}

        for state in states:
            V[0][state] = (trans_get(("START", state), unknown) +
                           emit_get((state, observable[0]), unknown))
            path[state] = [state]

        for t in range(1, len(observable)):
            V.append({})
            newpath = {}

            for state in states:
                emit_p = emit_get((state, observable[t]), unknown)
                (prob, state0) = max(
                    (V[t - 1][y0] + trans_get((y0, state), unknown) + emit_p, y0)
                    for y0 in states
                )
                V[t][state] = prob
                newpath[state] = path[state0] + [state]
            path = newpath

        (prob, state) = max((V[len(observable) - 1][y], y) for y in states)
        return list(zip(observable, path[state]))

    def _tokenize_word(self, word: str) -> str:
        """Return the first token of ``word``, or ``word`` if none is produced.

        Uses the tokenizer given to ``__init__``; falls back to a module-level
        ``tokenizer`` variable when present, and otherwise returns ``word``
        unchanged.
        """
        tok = getattr(self, "tokenizer", None)
        if tok is None:
            tok = globals().get("tokenizer")
        if tok is None:
            return word
        tokens = tok.tokenize(word)
        return tokens[0] if tokens else word

    def tag_sentence(self, sentence: List[str]) -> List[Tuple[str, str]]:
        """Tag a Sinhala sentence with POS tags.

        Args:
            sentence: Words of one sentence. String items are tokenized (first
                token kept) and cleaned; non-string items are converted with
                ``str()``.

        Returns:
            ``(cleaned_word, tag)`` pairs, or an empty list when the model is
            untrained or the sentence is empty.
        """
        if not self.states:
            self.logger.error("Model not trained. Please run train() first.")
            return []

        tokenized_words = [
            self._tokenize_word(word) if isinstance(word, str) else str(word)
            for word in sentence
        ]
        cleaned_words = [self.clean_sinhala(w) for w in tokenized_words]
        return self.viterbi(cleaned_words, self.states)

    def save_model(self, file_path: str) -> None:
        """Save the trained model to a JSON file.

        Tuple keys are flattened to ``"first|second"`` strings because JSON
        object keys must be strings.
        """
        join = self._join_key
        model_data = {
            'bigram_cnt': {join(k): v for k, v in self.bigram_cnt.items()},
            'unigram_cnt': dict(self.unigram_cnt),
            'tag_count': dict(self.tag_count),
            'tag_word_count': {join(k): v for k, v in self.tag_word_count.items()},
            'transition_probabilities': {join(k): v for k, v in self.transition_probabilities.items()},
            'emission_probabilities': {join(k): v for k, v in self.emission_probabilities.items()},
            'states': list(self.states)
        }

        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(model_data, f, ensure_ascii=False, indent=2)

    def load_model(self, file_path: str) -> None:
        """Load a trained model from a file written by ``save_model``."""
        with open(file_path, 'r', encoding='utf-8') as f:
            model_data = json.load(f)

        split = self._split_key
        self.bigram_cnt = defaultdict(int, {split(k): v for k, v in model_data['bigram_cnt'].items()})
        self.unigram_cnt = defaultdict(int, model_data['unigram_cnt'])
        self.tag_count = defaultdict(int, model_data['tag_count'])
        self.tag_word_count = Counter({split(k): v for k, v in model_data['tag_word_count'].items()})
        self.transition_probabilities = defaultdict(
            lambda: self.unknown_prob,
            {split(k): v for k, v in model_data['transition_probabilities'].items()}
        )
        self.emission_probabilities = defaultdict(
            lambda: self.unknown_prob,
            {split(k): v for k, v in model_data['emission_probabilities'].items()}
        )
        self.states = set(model_data['states'])

In [14]:
import pandas as pd
import csv

# Source CSV of (word, tag) rows on the mounted Google Drive.
pos_csv_path = "/content/drive/MyDrive/Projects/Sinhala Spell and Grammer checker/POS data/pos_nod.csv"

# Copy only rows with at most two fields into a temporary file, dropping
# malformed rows that contain extra commas. The csv module recommends opening
# files with newline='' so quoted fields containing line breaks parse
# correctly.
# NOTE(review): rows with fewer than two fields are also kept and will load
# with a missing tag -- confirm this is intended.
with open(pos_csv_path, 'r', encoding='utf-8', newline='') as infile, \
        open('temp.csv', 'w', encoding='utf-8', newline='') as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile)
    writer.writerows(row for row in reader if len(row) <= 2)

# Load the filtered CSV file into a pandas DataFrame. The first row of the
# file is consumed as the header and then replaced by the names below.
df = pd.read_csv('temp.csv')

# Name the two columns 'word' and 'tag'.
df.columns = ['word', 'tag']
df

Unnamed: 0,word,tag
0,මිසයිල,NNJ
1,ප්‍රහාර,NNC
2,වලින්,CM
3,පලස්තීනුවෝ,NNP
4,4,NUM
...,...,...
94371,වීරඹුගෙදර,NNP
94372,පොතුහැර,NNP
94373,බංගලාවත්ත,NNP
94374,ලතීෆ්,NNP


In [15]:
# Split df into training and test sets.
# (Cell originally generated from the prompt: "df split to training and test data".)

from sklearn.model_selection import train_test_split

# 'df' has one (word, tag) pair per row. random_state fixes the shuffle so the
# split is reproducible.
# NOTE(review): the split is over individual tokens and is shuffled, so the
# original word order of the corpus is not preserved in either set -- confirm
# that is acceptable for a sequence tagger.
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42) # 80% training, 20% testing

print("Training data size:", len(train_df))
print("Testing data size:", len(test_df))

Training data size: 75500
Testing data size: 18876


In [16]:
from sinling import SinhalaTokenizer

# Tokenizer shared by corpus preparation and by the tagger at tagging time.
tokenizer = SinhalaTokenizer()

# Single location for the corpus file, used for both writing and training so
# the two steps cannot drift apart when the working directory changes.
corpus_path = 'sinhala_corpus.json'

# Build the training corpus in the JSON shape SinhalaPOSTagger.load_corpus
# expects. Every row becomes its own one-word sentence: [[word, tag]].
training_data = []
for i, (word, tag) in enumerate(zip(train_df['word'], train_df['tag'])):
    # Skip rows with a missing/non-text word or tag (e.g. NaN from the CSV).
    if not isinstance(word, str) or not isinstance(tag, str):
        continue
    try:
        # Keep the first token; fall back to the raw word if none is produced.
        tokens = tokenizer.tokenize(word)
    except (KeyError, IndexError) as e:
        print(f"Skipping row {i}: {e}")
        continue
    tokenized_word = tokens[0] if tokens else word
    training_data.append([[tokenized_word, tag]])

# Save training data to file
with open(corpus_path, 'w', encoding='utf-8') as f:
    json.dump(training_data, f, ensure_ascii=False, indent=2)

# Create and train the tagger
tagger = SinhalaPOSTagger()
tagger.train(corpus_path)

In [17]:
# Save the trained model as JSON in the current working directory so it can be
# reloaded later with load_model().
tagger.save_model('sinhala_pos_model.json')

In [18]:
# Tag a new sentence, given as a list of already-separated words.
sentence = ["මිනිත්තුවකට" ,"තත්පර" ,"කීයක්" ,"තිබේද", "ශ්ස්"]
tagged_sentence = tagger.tag_sentence(sentence)

# Print one "word: tag" line per input word.
for word, tag in tagged_sentence:
    print(f"{word}: {tag}")

මිනිත්තුවකට: NNC
තත්පර: VP
කීයක්: NNC
තිබේද: QBE
ශ්ස්: VP


In [19]:
# Test data evaluation

import json
from sklearn.metrics import classification_report

# Load the saved model into a fresh tagger so the evaluation exercises the
# persisted file rather than the in-memory object.
tagger = SinhalaPOSTagger()
tagger.load_model('sinhala_pos_model.json')

# Prepare test data: one single-word sentence per row, the same shape used
# for the training corpus.
test_data = [[[word, tag]] for word, tag in zip(test_df['word'], test_df['tag'])]

true_tags = []
predicted_tags = []

# Evaluate on the test set
for sentence in test_data:
    for word, tag in sentence:
        # Rows with a missing/non-text word or tag cannot be scored.
        if not isinstance(word, str) or not isinstance(tag, str):
            continue
        # tag_sentence() tokenizes and cleans the word itself, so the raw
        # word is passed through instead of being tokenized twice.
        tagged_words = tagger.tag_sentence([word])
        if tagged_words:
            predicted_word, predicted_tag = tagged_words[0]
            true_tags.append(tag)
            predicted_tags.append(predicted_tag)

print(classification_report(true_tags, predicted_tags))

              precision    recall  f1-score   support

         ABB       0.93      0.85      0.89        46
         AUX       1.00      1.00      1.00        39
          CC       1.00      1.00      1.00        28
          CM       1.00      0.84      0.91        19
         DET       0.96      0.92      0.94        49
          FS       1.00      1.00      1.00         2
         JCV       0.98      0.92      0.95       159
          JJ       1.00      0.94      0.97      1082
         NCV       0.96      0.97      0.97       158
         NDT       1.00      1.00      1.00        15
         NIP       1.00      0.82      0.90        40
         NNC       0.94      0.99      0.97      8281
        NNC‍       0.99      1.00      0.99        72
         NNJ       0.98      0.93      0.96       298
        NNNP       1.00      1.00      1.00         1
         NNP       0.99      0.95      0.97      3722
         NUM       1.00      0.95      0.98       604
         NVB       1.00    