# Language Identification with Perceptron and Naive Bayes models

## Introduction

Welcome to this Jupyter Notebook, which is dedicated to the fascinating field of Natural Language Processing (NLP), specifically focusing on the task of Language Identification. This project is an exploration into the realm of machine learning, where we employ two distinct models - the Perceptron and Naive Bayes - to accurately identify the language of given text samples.

Language Identification is a fundamental task in NLP that involves determining the language that a piece of text is written in. This is crucial in various applications such as content categorization, filtering, and as a preprocessing step in complex NLP tasks like translation and sentiment analysis.

The language identification dataset is from: https://www.kaggle.com/datasets/zarajamshaid/language-identification-datasst/

From the original dataset, we split it into train-dev-test sets by the proportion of 70-15-15

In [28]:
import os
import sys
import random
import math
from pathlib import Path
from typing import Iterable, Generator, Sequence

import pandas as pd
from sklearn.model_selection import train_test_split
from collections import defaultdict, Counter
from operator import itemgetter
from sklearn.metrics import accuracy_score, f1_score

# Load the dataset
dataset_path = Path('dataset') 
dataset = pd.read_csv(dataset_path / 'lid_dataset.csv')

# Splitting the dataset into training (70%) and the remaining (30%)
train_set, remaining_set = train_test_split(dataset, test_size=0.3, random_state=42, stratify=dataset['language'])

# Split the remaining 30% into validation and test sets (50% each of the remaining, which is 15% each of the total)
validation_set, test_set = train_test_split(remaining_set, test_size=0.5, random_state=42, stratify=remaining_set['language'])

# Save the sets into separate CSV files
train_set.to_csv(os.path.join(dataset_path, 'lid_train.csv'), index=False)
validation_set.to_csv(os.path.join(dataset_path, 'lid_dev.csv'), index=False)
test_set.to_csv(os.path.join(dataset_path, 'lid_test.csv'), index=False)

Define necessary helper classes we need

In [29]:
class ClassificationInstance:
    def __init__(self, label: str, features: Iterable[str]) -> None:
        self.label: str = label
        self.features: tuple[str, ...] = tuple(features)

    def __repr__(self) -> str:
        return f"<ClassificationInstance: {str(self)}>"

    def __str__(self) -> str:
        return f"label={self.label}; features={self.features}"


class LanguageIdentificationInstance:
    def __init__(self, text: str, language: str) -> None:
        self.language: str = language
        self.text: str = text

    def __repr__(self) -> str:
        return f"<LanguageIdentificationInstance: {str(self)}>"

    def __str__(self) -> str:
        return f"label={self.language}; text={self.text}"

    @classmethod
    def from_line(cls, line: str) -> "LanguageIdentificationInstance":
        splits = line.rstrip("\n").split(",")
        return cls(splits[0], splits[1])


def load_lid_instances(path: str) -> Generator[LanguageIdentificationInstance, None, None]:
    with open(path, encoding="utf8") as file:
        next(file)
        for line in file:
            yield LanguageIdentificationInstance.from_line(line)


def max_item(scores: dict[str, float]) -> tuple[str, float]:
    return max(scores.items(), key=itemgetter(1))


def items_descending_value(counts: Counter[str]) -> list[str]:
    return [key for key, value in sorted(counts.items(), key=_items_sort_key)]


def _items_sort_key(item: tuple[str, int]) -> tuple[int, str]:
    return -item[1], item[0]


class CharBigramFeatureExtractor:
    @staticmethod
    def extract_features(instance: LanguageIdentificationInstance) -> ClassificationInstance:
        features = set()
        text = instance.text
        for i in range(len(text)-1):
            features.add(text[i:i+2])
        return ClassificationInstance(instance.language, features)
    
    
class InstanceCounter:
    """Holds counts of the labels and features seen during training.

    See the assignment for an explanation of each method."""

    def __init__(self) -> None:
        self._label_counts = Counter()
        self._total_labels = 0
        self._feature_label_joint_counts = defaultdict(Counter)
        self._label_list = []
        self._feature_vocab_size = 0
        self._feature_set = set()
        self._total_feature_count_for_label = Counter()

    def count_instances(self, instances: Iterable[ClassificationInstance]) -> None:
        for instance in instances:
            label = instance.label
            features = instance.features

            self._label_counts[label] += 1
            self._total_labels += 1
            self._feature_label_joint_counts[label].update(features)

            self._feature_set.update(features)
            self._total_feature_count_for_label[label] += len(features)

        self._label_list = list(self._label_counts.keys())
        self._feature_vocab_size = len(self._feature_set)

    def label_count(self, label: str) -> int:
        return self._label_counts[label]

    def total_labels(self) -> int:
        return self._total_labels

    def feature_label_joint_count(self, feature: str, label: str) -> int:
        return self._feature_label_joint_counts[label][feature]

    def unique_labels(self) -> list[str]:
        return self._label_list

    def feature_vocab_size(self) -> int:
        return self._feature_vocab_size

    def feature_set(self) -> set[str]:
        return self._feature_set

    def total_feature_count_for_label(self, label: str) -> int:
        return self._total_feature_count_for_label[label]


Prepare out data to be used for both models

In [30]:
# Paths
TRAIN_PATH = dataset_path / 'lid_train.csv'
DEV_PATH = dataset_path / 'lid_dev.csv'
TEST_PATH = dataset_path / 'lid_test.csv'

# Feature extraction
feature_extractor = CharBigramFeatureExtractor()
train_instances = [feature_extractor.extract_features(inst) for inst in load_lid_instances(TRAIN_PATH)]
dev_instances = [feature_extractor.extract_features(inst) for inst in load_lid_instances(DEV_PATH)]
test_instances = [feature_extractor.extract_features(inst) for inst in load_lid_instances(TEST_PATH)]

# Count instances
instance_counter = InstanceCounter()
instance_counter.count_instances(train_instances)
labels = instance_counter.unique_labels()

# Labels
dev_labels = [instance.label for instance in dev_instances]
test_labels = [instance.label for instance in test_instances]

## Perceptron & Averaged Perceptron models

Define our perceptron and averaged perceptron model

In [31]:
class Perceptron:
    def __init__(self, labels: list[str]) -> None:
        self.labels: list[str] = labels
        self.weights: dict[str, defaultdict[str, float]] = {}
        for label in labels:
            self.weights[label] = defaultdict(float)
        self.step: int = 0

    def classify(self, features: Iterable[str]) -> str:
        scores = {}
        for label in self.labels:
            scores[label] = 0
            for feature in features:
                scores[label] += self.weights[label][feature]
        return max_item(scores)[0]

    def learn(self, instance: ClassificationInstance, lr: float) -> None:
        self.step += 1
        features = instance.features
        label_pred = self.classify(features)
        label_true = instance.label

        if label_pred != label_true:
            for feature in features:
                self.pre_update(feature, label_true)
                self.pre_update(feature, label_pred)
                self.weights[label_true][feature] += lr
                self.weights[label_pred][feature] -= lr

    def pre_update(self, feature: str, label: str) -> None:
        # Do nothing
        pass

    def finalize_weights(self) -> None:
        # Do nothing
        pass


class AveragedPerceptron(Perceptron):
    def __init__(self, labels: list[str]) -> None:
        super().__init__(labels)
        self.sums: dict[str, defaultdict[str, float]] = {}
        self.last_updated: dict[str, defaultdict[str, int]] = {}
        for label in labels:
            self.sums[label] = defaultdict(float)
            self.last_updated[label] = defaultdict(int)

    def pre_update(self, feature: str, label: str) -> None:
        self.sums[label][feature] += (self.step - self.last_updated[label][feature]) * self.weights[label][feature]
        self.last_updated[label][feature] = self.step

    def finalize_weights(self) -> None:
        self.step += 1
        for label in self.labels:
            for feature, weight_sum in self.sums[label].items():
                self.sums[label][feature] += (self.step-self.last_updated[label][feature])*self.weights[label][feature]
                self.weights[label][feature] = self.sums[label][feature] / self.step


def train_perceptron(
    model: Perceptron, data: list[ClassificationInstance], epochs: int, lr: float
) -> None:
    for _ in range(epochs):
        for instance in data:
            model.learn(instance, lr)
        random.shuffle(data)
    model.finalize_weights()


Train the perceptron model with different hyperparameters and tune with the dev set

In [32]:
def perceptron_lid_model():
    # Hyperparameters to tune
    epoch_choices = [1, 3, 5]
    lr_choices = [0.01, 0.1, 1]
    average_choices = [False, True]

    best_accuracy = 0.0
    best_hyperparameters = {}
    best_perceptron_model = None

    for average in average_choices:
        for n_epochs in epoch_choices:
            for lr in lr_choices:
                # Training
                perceptron_model = AveragedPerceptron(labels) if average else Perceptron(labels)
                train_perceptron(perceptron_model, train_instances, n_epochs, lr)

                # Evaluation on development set
                dev_predictions = [perceptron_model.classify(instance.features) for instance in dev_instances]
                accuracy = accuracy_score(dev_labels, dev_predictions)

                print(f"Epochs={n_epochs}, LR={lr}, Average={average}, Accuracy: {accuracy}")

                if accuracy > best_accuracy:
                    best_accuracy = accuracy
                    best_hyperparameters = {
                        'n_epochs': n_epochs,
                        'lr': lr,
                        'average': average,
                    }
                    best_perceptron_model = perceptron_model

    print("Best Hyperparameters for Perceptron:")
    print(best_hyperparameters)

    # Testing with the best hyperparameters on the test set
    test_predictions = [best_perceptron_model.classify(instance.features) for instance in test_instances]
    test_accuracy = accuracy_score(test_labels, test_predictions)
    test_micro_f1 = f1_score(test_labels, test_predictions, average='micro')
    test_macro_f1 = f1_score(test_labels, test_predictions, average='macro')

    print("Testing with Best Hyperparameters for Perceptron:")
    print(f"Accuracy: {test_accuracy}")
    print(f"Macro F1: {test_macro_f1}")

perceptron_lid_model()

Epochs=1, LR=0.01, Average=False, Accuracy: 0.9678787878787879
Epochs=1, LR=0.1, Average=False, Accuracy: 0.9557575757575758
Epochs=1, LR=1, Average=False, Accuracy: 0.9675757575757575
Epochs=3, LR=0.01, Average=False, Accuracy: 0.9733333333333334
Epochs=3, LR=0.1, Average=False, Accuracy: 0.9766666666666667
Epochs=3, LR=1, Average=False, Accuracy: 0.9772727272727273
Epochs=5, LR=0.01, Average=False, Accuracy: 0.9760606060606061
Epochs=5, LR=0.1, Average=False, Accuracy: 0.98
Epochs=5, LR=1, Average=False, Accuracy: 0.9775757575757575
Epochs=1, LR=0.01, Average=True, Accuracy: 0.9796969696969697
Epochs=1, LR=0.1, Average=True, Accuracy: 0.9827272727272728
Epochs=1, LR=1, Average=True, Accuracy: 0.9796969696969697
Epochs=3, LR=0.01, Average=True, Accuracy: 0.980909090909091
Epochs=3, LR=0.1, Average=True, Accuracy: 0.9821212121212122
Epochs=3, LR=1, Average=True, Accuracy: 0.9806060606060606
Epochs=5, LR=0.01, Average=True, Accuracy: 0.983030303030303
Epochs=5, LR=0.1, Average=True, Acc

Pros:
-   Simplicity and Speed: The Perceptron is a simple linear classifier that is generally fast to train, making it suitable for large datasets and real-time prediction.

-   Online Learning: It can be easily adapted for online learning, where the model is updated continuously as new data arrives.
Robust to Noisy Data: Perceptrons can be quite robust to noise in the input data.

-   Scalability: Works well with high-dimensional data and scales nicely with the size of the dataset.

Cons:
-   Linear Decision Boundary: The Perceptron can only classify data that is linearly separable. It struggles with complex patterns and non-linear relationships.

-   Prone to Overfitting: Without proper regularization or early stopping, it can overfit to the training data, especially in the case of noisy or unbalanced datasets.

-   Lack of Probabilistic Interpretation: It does not output probabilities for classifications, making it less informative for certain applications where understanding the uncertainty is crucial.

-   Convergence Issues: The algorithm may fail to converge if the data is not linearly separable.

Next, we define our Naive Bayes Classifier model

In [36]:
class NaiveBayesClassifier:

    def __init__(self, k: float):
        self.k: float = k
        self.instance_counter: InstanceCounter = InstanceCounter()

    def train(self, instances: Iterable[ClassificationInstance]) -> None:
        self.instance_counter.count_instances(instances)

    def prior_prob(self, label: str) -> float:
        return self.instance_counter.label_count(label)/self.instance_counter.total_labels()

    def feature_prob(self, feature: str, label) -> float:
        count_feature_given_label = self.instance_counter.feature_label_joint_count(feature, label)
        big_n = self.instance_counter.total_feature_count_for_label(label)
        big_v = self.instance_counter.feature_vocab_size()
        return (count_feature_given_label + self.k) / (big_n + big_v * self.k)

    def log_posterior_prob(self, features: Sequence[str], label: str) -> float:
        log_class_prior_probability = math.log(self.prior_prob(label))
        log_posterior_probability = log_class_prior_probability
        for feature in features:
            if feature in self.instance_counter.feature_set():
                likelihood = self.feature_prob(feature, label)
                log_posterior_probability += math.log(likelihood)
        return log_posterior_probability

    def classify(self, features: Sequence[str]) -> str:
        probs = []
        for label in self.instance_counter.unique_labels():
            probs.append((self.log_posterior_prob(features, label), label))
        return max(probs)[1]

    def test(
        self, instances: Iterable[ClassificationInstance]
    ) -> tuple[list[str], list[str]]:
        predicted_labels = []
        true_labels = []
        for instance in instances:
            true_labels.append(instance.label)
            prediction = self.classify(instance.features)
            predicted_labels.append(prediction)
        return predicted_labels, true_labels

Train the naive bayes model with different k values and tune with the dev set

In [37]:
def naive_bayes_lid_model():
    
    # Hyperparameters to tune
    k_choices = [0.0001, 0.001, 0.01, 0.1]

    best_accuracy = 0.0
    best_k = None
    best_nb_model = None

    for k in k_choices:
        # Training Naive Bayes
        nb_model = NaiveBayesClassifier(k)
        nb_model.train(train_instances)

        # Evaluation on development set
        dev_predictions, _ = nb_model.test(dev_instances)
        accuracy = accuracy_score(dev_labels, dev_predictions)

        print(f"k={k}, Accuracy: {accuracy}")

        if accuracy > best_accuracy:
            best_accuracy = accuracy
            best_k = k
            best_nb_model = nb_model

    print("Best Hyperparameter for k:", best_k)

    # Testing with the best hyperparameters on the test set
    test_predictions, _ = best_nb_model.test(test_instances)
    test_accuracy = accuracy_score(test_labels, test_predictions)
    test_macro_f1 = f1_score(test_labels, test_predictions, average='macro')

    print("Testing with Best Hyperparameters:")
    print(f"Accuracy: {test_accuracy}")
    print(f"Macro F1: {test_macro_f1}")

naive_bayes_lid_model()


k=0.0001, Accuracy: 0.9763636363636363
k=0.001, Accuracy: 0.9769696969696969
k=0.01, Accuracy: 0.9766666666666667
k=0.1, Accuracy: 0.9763636363636363
Best Hyperparameter for k: 0.001
Testing with Best Hyperparameters:
Accuracy: 0.9781818181818182
Macro F1: 0.9785765401597825


Pros:

-   Probabilistic Approach: It provides a probabilistic understanding of classifications, which can be insightful for many applications.

-   Good with Small Data: Often performs well even with a relatively small amount of training data, thanks to its probabilistic nature.

-   Handling Categorical Data: Naturally handles categorical data and can be adapted for text classification problems.

-   Efficiency: Naive Bayes classifiers are computationally efficient, especially in their training phase.

Cons:

-   Naive Assumption: The assumption of feature independence is often unrealistic in real-world scenarios, leading to potentially suboptimal performance.

-   Poor Estimation for Rare Features: It can struggle with features that did not appear in the training set (zero-frequency problem) unless specifically addressed (e.g., with smoothing techniques).

-   Sensitive to Imbalanced Data: The performance can be significantly impacted by imbalanced datasets.

-   Less Effective for Correlated Features: Due to the independence assumption, it may not perform well when features are correlated.

Summary

Perceptron is suitable for problems where a linear decision boundary is sufficient, and the focus is on speed and scalability over interpretability. It's more suitable for online learning scenarios.

Naive Bayes excels in probabilistic classification, especially in text-related tasks and scenarios where training data is limited. Its performance can suffer when the assumption of independent features does not hold.

In practice, the choice between these models would depend on the specific characteristics of the dataset and the requirements of the task at hand. Often, trying both and comparing their performance on a validation set is the best way to determine which is more suitable for a particular problem.