<a href="https://colab.research.google.com/github/BelkissSouayed/machine-learning-nlp-assignments/blob/main/Assignment1_Language_Identification_Solution.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import csv
import re
import string
from collections import defaultdict

# Logistic Regression

In [None]:
# download dataset
!gdown 1QP6YuwdKFNUPpvhOaAcvv2Pcp4JMbIRs # x_train
!gdown 1QVo7PZAdiZKzifK8kwhEr_umosiDCUx6 # x_test
!gdown 1QbBeKcmG2ZyAEFB3AKGTgSWQ1YEMn2jl # y_train
!gdown 1QaZj6bI7_78ymnN8IpSk4gVvg-C9fA6X # y_test

In [None]:
# Load the four split files; each line is one text sample (or one label).
# UTF-8 is requested explicitly: the corpus is multilingual, and without an
# encoding argument open() falls back to the platform's locale encoding.
with open('x_train.txt', encoding='utf-8') as f:
    x_train = f.read().splitlines()
with open('y_train.txt', encoding='utf-8') as f:
    y_train = f.read().splitlines()
with open('x_test.txt', encoding='utf-8') as f:
    x_test = f.read().splitlines()
with open('y_test.txt', encoding='utf-8') as f:
    y_test = f.read().splitlines()

In [None]:
# Pair every training text with its label in one dataframe
train_df = pd.DataFrame(dict(text=x_train, label=y_train))
# Persist the training dataframe as a tab-separated file, without the index column
train_df.to_csv('train_df.csv', sep='\t', index=False)
# Pair every test text with its label in one dataframe
test_df = pd.DataFrame(dict(text=x_test, label=y_test))

In [None]:
train_df.head()

In [None]:
# Distinct labels of the training data, in order of first appearance
labels = pd.unique(train_df['label']).tolist()
print(labels)

In [None]:
# Peek at some examples from the training data.
# random_state pins the sample so a re-run shows the same rows; leaving the
# frame as the cell's last expression uses the notebook's table rendering.
train_df.sample(10, random_state=0)


Challenges Noticed:
Non-Latin Scripts: Texts in scripts like Arabic, Amharic, etc.
Similar Scripts: Languages such as Swedish and Danish use the same script, which makes them harder to tell apart.
Special Characters: Presence of special characters and diacritics.
Short Text Lengths: Some texts may be too short to provide enough context.
Code-Switching: Potential mixing of multiple languages in a single text.

In [None]:
# Number of instances per label, computed the same way for the training
# set and the test set
train_label_counts, test_label_counts = (
    frame['label'].value_counts() for frame in (train_df, test_df)
)

print("Training label counts:\n", train_label_counts)
print("Test label counts:\n", test_label_counts)


Training label counts:
 label
est    500
eng    500
vep    500
sgs    500
uig    500
      ... 
lmo    500
mya    500
ilo    500
csb    500
ltz    500
Name: count, Length: 235, dtype: int64
Test label counts:
 label
mwl    500
uig    500
tat    500
nno    500
new    500
      ... 
frp    500
krc    500
mlg    500
msa    500
ckb    500
Name: count, Length: 235, dtype: int64


The dataset contains 500 instances per label for both the training and test sets, indicating that each label is equally represented in both sets. This is evident from the following output:

- Training Label Counts:
Each label has 500 instances.
Total number of unique labels: 235

- Test Label Counts:
Each label has 500 instances.
Total number of unique labels: 235

- **Balanced dataset:** every label has the same number of instances in both the training and test sets.
- **Appropriate train/test split:** the split appears appropriate because each label is equally represented in both sets.

In [None]:

# Predefined set of mandatory language labels
given_labels = {'eng', 'deu', 'nld', 'dan', 'swe', 'nno', 'jpn'}

# Get all unique labels present in the training dataset
all_labels = set(train_df['label'].unique())

# Randomly select 20 additional labels, excluding the mandatory ones.
# The candidates are sorted first because the iteration order of a set of
# strings can differ between interpreter runs; together with the fixed seed
# this makes the selection (and every result below) reproducible.
candidate_labels = sorted(all_labels - given_labels)
label_rng = np.random.default_rng(0)
target_labels = list(label_rng.choice(candidate_labels, 20, replace=False))

# Combine the mandatory labels with the randomly selected additional labels
target_labels += sorted(given_labels)

# Filter the training dataframe to include only the rows with labels in the target_labels list
train_df = train_df[train_df['label'].isin(target_labels)]

# Filter the test dataframe to include only the rows with labels in the target_labels list
test_df = test_df[test_df['label'].isin(target_labels)]

# Print the size of the filtered training and test datasets
print("Filtered training data size:", train_df.shape)
print("Filtered test data size:", test_df.shape)


In [None]:
train_df.head(20)

In [None]:
from sklearn.preprocessing import LabelEncoder
# Fit the label encoder on the label column of the training dataframe
le_fitted = LabelEncoder().fit(train_df['label'])

In [None]:
le_fitted.classes_

In [None]:
# Integer-encode the labels of both splits with the encoder fitted on the
# training labels. The *_encoded names are the ones the training and
# evaluation cells further down read.
y_train_encoded = le_fitted.transform(train_df['label'])
y_test_encoded = le_fitted.transform(test_df['label'])
# The names this cell originally assigned are kept as aliases.
y_train_dev, y_test = y_train_encoded, y_test_encoded

### Create a suitable sklearn pipeline to preprocess the data (think about extending the feature space), train a LogisticRegression classifier, and find its optimal hyperparameter settings with sklearn's GridSearchCV

In [None]:

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import MinMaxScaler
from sklearn.feature_extraction.text import TfidfVectorizer

# Custom transformer for normalizing text
class TextNormalizer(BaseEstimator, TransformerMixin):
    """Stateless transformer that strips punctuation and newlines from texts.

    Only the characters in ``string.punctuation`` are removed. Case is left
    untouched (FeatureExtractor's capitalization ratio depends on it).
    """

    # Translation table that deletes every character of string.punctuation;
    # built once here instead of once per text.
    _PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """Nothing is learned from the data.

        :param X: array-like of strings (ignored)
        :return: self
        """
        return self

    def _normalize_text(self, text):
        """Remove punctuation and newline characters from one text.

        :param text: string
        :return: normalized string
        """
        text = text.translate(self._PUNCTUATION_TABLE)  # Remove punctuation
        return text.replace('\n', '')  # Remove newlines (no replacement character)

    def transform(self, X, y=None):
        """Apply normalization to each text in the input array.

        :param X: array-like of strings
        :return: numpy array of normalized strings
        """
        return np.array([self._normalize_text(text) for text in X])

# Custom transformer for extracting additional linguistic features
class FeatureExtractor(BaseEstimator, TransformerMixin):
    """Derive three hand-crafted numeric features per text and min-max scale them.

    The feature columns are, in order: vowel/consonant ratio, share of
    uppercase characters, and count of immediately repeated characters.
    ``transform`` returns the texts together with the scaled features so that
    a later pipeline step can still work on the text itself.
    """

    vowels = set('aeiouäöüàéèëï')
    # Every basic Latin consonant letter, each listed once ('j' included).
    consonants = set('bcdfghjklmnpqrstvwxyz')

    def __init__(self):
        self.scaler = MinMaxScaler()

    def _to_bigrams(self, text):
        """Generate character bigrams from the input text.

        :param text: string
        :return: list of two-character strings (empty for texts shorter than 2)
        """
        return [first + second for first, second in zip(text, text[1:])]

    def _get_vowel_consonant_ratio(self, text):
        """Calculate the vowel to consonant ratio in the text.

        Characters belonging to neither class attribute set are ignored.

        :param text: string
        :return: float, vowels / (consonants + 1)
        """
        vowel_count, consonant_count = 0, 0
        for char in text.lower():
            if char in self.vowels:
                vowel_count += 1
            elif char in self.consonants:
                consonant_count += 1
        # +1 keeps the ratio defined for texts without any consonant
        return vowel_count / (consonant_count + 1)

    def _get_capitalization_ratio(self, text):
        """Calculate the ratio of uppercase to total characters in the text.

        :param text: string
        :return: float, uppercase characters / (length + 1)
        """
        uppercase_count = sum(1 for char in text if char.isupper())
        # +1 keeps the ratio defined for the empty string
        return uppercase_count / (len(text) + 1)

    def _get_double_char_freq(self, text):
        """Count positions where a character is immediately repeated.

        :param text: string
        :return: int number of bigrams made of two identical characters
        """
        return sum(1 for bigram in self._to_bigrams(text) if bigram[0] == bigram[1])

    def _extract_num_features(self, texts):
        """Extract numerical features from a list of texts.

        :param texts: iterable of strings
        :return: numpy array of shape (number of texts, 3)
        """
        rows = [
            [
                self._get_vowel_consonant_ratio(text),
                self._get_capitalization_ratio(text),
                self._get_double_char_freq(text),
            ]
            for text in texts
        ]
        # reshape keeps the result two-dimensional even when there are no texts
        return np.array(rows, dtype=float).reshape(-1, 3)

    def fit(self, X, y=None):
        """Fit the scaler on the extracted numerical features.

        :param X: array-like of strings
        :return: self
        """
        self.scaler.fit(self._extract_num_features(X))
        return self

    def transform(self, X, y=None):
        """Transform the input texts into scaled numerical features.

        :param X: array-like of strings
        :return: tuple (original texts, scaled numerical features)
        """
        numerical_features = self._extract_num_features(X)
        return X, self.scaler.transform(numerical_features)

# Wrapper to combine vectorizer and additional features
class VectorizerWrapper(BaseEstimator, TransformerMixin):
    """Vectorize the text half of a (texts, numerical features) pair.

    The numerical half is passed through unchanged. The vectorizer is a
    TF-IDF model over 1- and 2-grams limited to 500 features.
    """

    def __init__(self):
        self.countvec = TfidfVectorizer(max_features=500, ngram_range=(1, 2))

    def fit(self, X, y=None):
        """Learn the TF-IDF vocabulary from the texts.

        :param X: tuple (texts, numerical features)
        :return: self
        """
        raw_texts, _numeric = X
        self.countvec.fit(raw_texts)
        return self

    def transform(self, X, y=None):
        """Replace the texts with their TF-IDF representation.

        :param X: tuple (texts, numerical features)
        :return: tuple (TF-IDF features, numerical features)
        """
        raw_texts, numeric = X
        tfidf = self.countvec.transform(raw_texts)
        return tfidf, numeric

# Transformer to convert sparse matrix to dense array
class MatrixToArrayConverter(BaseEstimator, TransformerMixin):
    """Stateless step that densifies the first element of a feature pair."""

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """Nothing to learn; returns self."""
        return self

    def transform(self, X, y=None):
        """Call ``toarray()`` on the first element and pass the second through.

        :param X: tuple (sparse TF-IDF matrix, numerical features)
        :return: tuple (dense TF-IDF array, numerical features)
        """
        sparse_tfidf, numeric = X[0], X[1]
        return sparse_tfidf.toarray(), numeric

# Transformer to unify text and numerical features into a single array
class MatrixUnifier(BaseEstimator, TransformerMixin):
    """Stateless step that joins the two halves of a feature pair column-wise."""

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """Nothing to learn; returns self."""
        return self

    def transform(self, X, y=None):
        """Place the numerical feature columns after the text feature columns.

        :param X: tuple (TF-IDF features, numerical features)
        :return: numpy array of combined features
        """
        text_part, numeric_part = X[0], X[1]
        return np.concatenate((text_part, numeric_part), axis=1)



In [None]:
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression

# Define the hyperparameter grid for GridSearchCV
clf_param_grid = {
    'CLF__penalty': ['l2', 'l1'],  # Regularization types
    'CLF__solver': ['liblinear', 'saga'],  # Solvers for optimization
    'CLF__max_iter': [50, 100]  # Maximum number of iterations
}

# Create the pipeline with custom transformers and logistic regression
pipe = Pipeline(steps=[
    ('TextNormalizer', TextNormalizer()),  # Normalize the text
    ('FeatureExtractor', FeatureExtractor()),  # Extract additional linguistic features
    ('Vectorizer', VectorizerWrapper()),  # Vectorize text and combine with additional features
    ('MatrixToArrayConverter', MatrixToArrayConverter()),  # Convert sparse matrix to dense array
    ('MatrixUnifier', MatrixUnifier()),  # Unify text and numerical features
    # Both solvers in the grid shuffle the data, so the seed is fixed to make
    # the search results repeatable.
    ('CLF', LogisticRegression(random_state=0))  # Logistic Regression classifier
], verbose=True)

# Set up GridSearchCV for hyperparameter tuning
# (2-fold cross-validation, micro-averaged F1 as the selection metric)
grid = GridSearchCV(pipe, n_jobs=1, param_grid=clf_param_grid, scoring='f1_micro', cv=2)
grid.fit(train_df['text'].to_numpy(), y_train_encoded)

# Best model from GridSearchCV
best_model = grid.best_estimator_
print("Best hyperparameters:", grid.best_params_)
print("Best cross-validation score:", grid.best_score_)

# Predict on the test set using the best model
y_pred = best_model.predict(test_df['text'].to_numpy())



In [None]:
# Tabulate every hyperparameter combination tried by the search, best rank first
grid_results = pd.DataFrame(grid.cv_results_)
grid_results.sort_values(by="rank_test_score")


In [None]:
from sklearn.metrics import f1_score

# Predict on the test set using the best model
preds = grid.best_estimator_.predict(test_df['text'].to_numpy())

# Calculate F1 scores
# f1_score takes the ground truth first and the predictions second
f1_micro = f1_score(y_test_encoded, preds, average='micro')
f1_macro = f1_score(y_test_encoded, preds, average='macro')
print(f'F1-micro-score on the test set: {f1_micro}')
print(f'F1-macro-score on the test set: {f1_macro}')

In [None]:
# Define the number of classes
num_classes = len(le_fitted.classes_)

# Function to create a confusion matrix manually
def create_confusion_matrix(num_classes, preds, y_test, class_names=None):
    """Create confusion matrix 'by hand' since test set does not contain all labels.

    Rows are indexed by the predicted class and columns by the true class.

    :param num_classes: number of classes (side length of the matrix)
    :param preds: sequence of predicted integer class indices
    :param y_test: sequence of true integer class indices, aligned with preds
    :param class_names: row/column labels; defaults to ``le_fitted.classes_``
    :return: DataFrame of shape (num_classes, num_classes) holding the counts
    """
    if class_names is None:
        class_names = le_fitted.classes_
    counts = np.zeros((num_classes, num_classes), dtype=int)
    pred_idx = np.asarray(preds, dtype=int)
    true_idx = np.asarray(y_test, dtype=int)
    # np.add.at accumulates repeated (pred, true) pairs, which plain fancy
    # indexing with += would count only once.
    np.add.at(counts, (pred_idx, true_idx), 1)
    return pd.DataFrame(counts, index=class_names, columns=class_names)

# Create confusion matrix
# (rows are predicted labels, columns are true labels: the helper increments
# the cell at [prediction, true label] for every test sample)
confusion_df = create_confusion_matrix(num_classes, preds, y_test_encoded)
print(confusion_df)

# Identify the best model for interpretation
model_to_interpret = grid.best_estimator_

In [None]:
!pip install eli5


In [None]:
import eli5
from eli5 import show_weights
# Extract the logistic regression model from the pipeline
lr_model = model_to_interpret.named_steps['CLF']

# Get the vectorizer model from the pipeline
vec_model = model_to_interpret.named_steps['Vectorizer'].countvec

# Define the target indices for specific languages
# NOTE(review): target_indices is not passed to the show_weights call below,
# and it covers 'jpn' whereas that call's targets list contains 'nno' --
# confirm which set of languages is intended.
target_indices = [np.where(le_fitted.classes_ == 'eng')[0][0],
                  np.where(le_fitted.classes_ == 'jpn')[0][0],
                  np.where(le_fitted.classes_ == 'swe')[0][0]]

# Create feature names including the additional features
# The three extra names stand for the numeric columns that MatrixUnifier
# appends after the TF-IDF columns; FeatureExtractor produces them in the
# order vowel/consonant ratio, capitalization ratio, double-character count.
make_new_feature_names = np.concatenate([
    vec_model.get_feature_names_out(),
    np.array(["extra_feature_" + str(i) for i in range(3)])
], axis=-1)

# Display the model weights using ELI5 for specified target languages
show_weights(lr_model, top=(10, 10), feature_names=make_new_feature_names,
             target_names=le_fitted.classes_, targets=['eng', 'swe', 'nno'])


## Ablation study

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.metrics import f1_score

# Custom transformer to reduce text length
class TextReducer(BaseEstimator, TransformerMixin):
    """Stateless transformer that caps every text at ``max_len`` characters."""

    def __init__(self, max_len):
        self.max_len = max_len

    def fit(self, X, y=None):
        """Nothing to learn; returns self."""
        return self

    def _reduce_text(self, text):
        """Return the first max_len characters of text, or text itself if it is not longer."""
        limit = self.max_len
        return text[:limit] if len(text) > limit else text

    def transform(self, X, y=None):
        """Shorten each text of the input; returns a list of strings."""
        return list(map(self._reduce_text, X))

from sklearn.base import clone

# Re-create the best model using the best parameters from the grid search
best_model = LogisticRegression(
    solver=grid.best_params_['CLF__solver'],
    penalty=grid.best_params_['CLF__penalty'],
    max_iter=grid.best_params_['CLF__max_iter']
)


def build_ablation_pipeline(max_len=None):
    """Build the preprocessing + classifier pipeline for one ablation setting.

    Each pipeline gets its own unfitted copy of ``best_model``. Sharing one
    estimator object between pipelines would let every fit overwrite the
    classifier of the pipelines fitted before it.

    :param max_len: truncate texts to this many characters; None keeps them whole
    :return: unfitted sklearn Pipeline
    """
    steps = []
    if max_len is not None:
        steps.append(('TextReducer', TextReducer(max_len=max_len)))
    steps.extend([
        ('TextNormalizer', TextNormalizer()),
        ('FeatureExtractor', FeatureExtractor()),
        ('Vectorizer', VectorizerWrapper()),
        ('MatrixToArrayConverter', MatrixToArrayConverter()),
        ('MatrixUnifier', MatrixUnifier()),
        ('CLF', clone(best_model)),
    ])
    return Pipeline(steps=steps, verbose=True)


# Pipeline for text reduced to 500 characters
pipe500 = build_ablation_pipeline(max_len=500)

# Pipeline for text reduced to 100 characters
pipe100 = build_ablation_pipeline(max_len=100)


# Function to fit model and evaluate F1 score
def fit_and_evaluate(pipeline, X_train, y_train, X_test, y_test):
    """Fit the pipeline on the training data and score it on the test data.

    :param pipeline: unfitted estimator with fit/predict
    :param X_train: training texts
    :param y_train: training labels
    :param X_test: test texts
    :param y_test: true test labels
    :return: micro-averaged F1 score on the test data
    """
    pipeline.fit(X_train, y_train)
    preds = pipeline.predict(X_test)
    # f1_score takes the ground truth first and the predictions second
    return f1_score(y_test, preds, average='micro')


# Fit and evaluate the model with text reduced to 500 characters
f1_micro_500 = fit_and_evaluate(pipe500, train_df['text'].to_numpy(), y_train_encoded, test_df['text'].to_numpy(), y_test_encoded)
print(f'F1-micro-score with text reduced to 500 characters: {f1_micro_500}')

# Fit and evaluate the model with text reduced to 100 characters
f1_micro_100 = fit_and_evaluate(pipe100, train_df['text'].to_numpy(), y_train_encoded, test_df['text'].to_numpy(), y_test_encoded)
print(f'F1-micro-score with text reduced to 100 characters: {f1_micro_100}')

# Evaluate the model with all characters (no reduction)
pipe_all = build_ablation_pipeline()

f1_micro_all = fit_and_evaluate(pipe_all, train_df['text'].to_numpy(), y_train_encoded, test_df['text'].to_numpy(), y_test_encoded)
print(f'F1-micro-score with all characters: {f1_micro_all}')


# Neural Network


In [None]:
!pip install skorch


In [None]:
import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.metrics import f1_score
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import MinMaxScaler
from skorch import NeuralNetClassifier
import torch
from torch import nn
import torch.nn.functional as F

# Ensure reproducibility
# (only PyTorch is seeded in this cell; NumPy's global generator is not)
torch.manual_seed(0)
torch.cuda.manual_seed(0)

# Custom text normalizer to preprocess text
class TextNormalizer(BaseEstimator, TransformerMixin):
    """Stateless transformer that strips punctuation and newlines from texts.

    Only the characters in ``string.punctuation`` are removed; case is left
    untouched and no padding is added.
    """

    # Translation table that deletes every character of string.punctuation;
    # built once here instead of once per text.
    _PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """Nothing to learn; returns self."""
        return self

    def _normalize_text(self, text):
        """Remove punctuation and newline characters from one text."""
        text = text.translate(self._PUNCTUATION_TABLE)
        return text.replace('\n', '')

    def transform(self, X, y=None):
        """Normalize every text of the input; returns a list of strings."""
        return [self._normalize_text(text) for text in X]

# Custom feature extractor to add linguistic features
class FeatureExtractor(BaseEstimator, TransformerMixin):
    """Derive three hand-crafted numeric features per text and min-max scale them.

    Feature columns, in order: vowel/consonant ratio, share of uppercase
    characters, count of immediately repeated characters. ``transform``
    returns the texts together with the scaled features.
    """

    vowels = set('aeiouäöüàéèëï')
    # Every basic Latin consonant letter, each listed once ('j' included).
    consonants = set('bcdfghjklmnpqrstvwxyz')

    def __init__(self):
        self.scaler = MinMaxScaler()

    def _to_bigrams(self, text):
        """Return the list of character bigrams of text."""
        return [first + second for first, second in zip(text, text[1:])]

    def _get_vowel_consonant_ratio(self, text):
        """Return vowels / (consonants + 1); other characters are ignored."""
        vowel_count, consonant_count = 0, 0
        for char in text.lower():
            if char in self.vowels:
                vowel_count += 1
            elif char in self.consonants:
                consonant_count += 1
        # +1 keeps the ratio defined for texts without any consonant
        return vowel_count / (consonant_count + 1)

    def _get_capitalization_ratio(self, text):
        """Return uppercase characters / (length + 1)."""
        uppercase_count = sum(1 for char in text if char.isupper())
        return uppercase_count / (len(text) + 1)

    def _get_double_char_freq(self, text):
        """Count bigrams made of two identical characters."""
        return sum(1 for bigram in self._to_bigrams(text) if bigram[0] == bigram[1])

    def _extract_num_features(self, texts):
        """Return an array of shape (number of texts, 3) with the raw features."""
        rows = [
            [
                self._get_vowel_consonant_ratio(text),
                self._get_capitalization_ratio(text),
                self._get_double_char_freq(text),
            ]
            for text in texts
        ]
        # reshape keeps the result two-dimensional even when there are no texts
        return np.array(rows, dtype=float).reshape(-1, 3)

    def fit(self, X, y=None):
        """Fit the scaler on the features extracted from X; returns self."""
        self.scaler.fit(self._extract_num_features(X))
        return self

    def transform(self, X, y=None):
        """Return the tuple (original texts, scaled numerical features)."""
        numerical_features = self._extract_num_features(X)
        return X, self.scaler.transform(numerical_features)

# Wrapper to combine vectorizer and additional features
class VectorizerWrapper(BaseEstimator, TransformerMixin):
    """Vectorize the text half of a (texts, numerical features) pair.

    The numerical half is passed through unchanged. The vectorizer is a
    TF-IDF model over 1- and 2-grams limited to 500 features.
    """

    def __init__(self):
        self.countvec = TfidfVectorizer(max_features=500, ngram_range=(1, 2))

    def fit(self, X, y=None):
        """Learn the TF-IDF vocabulary from the texts; returns self."""
        raw_texts, _numeric = X
        self.countvec.fit(raw_texts)
        return self

    def transform(self, X, y=None):
        """Return the tuple (TF-IDF features, numerical features)."""
        raw_texts, numeric = X
        tfidf = self.countvec.transform(raw_texts)
        return tfidf, numeric

# Transformer to convert sparse matrix to dense array
class MatrixToArrayConverter(BaseEstimator, TransformerMixin):
    """Stateless step that densifies the first element of a feature pair."""

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """Nothing to learn; returns self."""
        return self

    def transform(self, X, y=None):
        """Call ``toarray()`` on the first element and pass the second through."""
        sparse_tfidf, numeric = X[0], X[1]
        return sparse_tfidf.toarray(), numeric

# Transformer to unify text and numerical features into a single array
class MatrixUnifier(BaseEstimator, TransformerMixin):
    """Stateless step that joins a feature pair column-wise as float32."""

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """Nothing to learn; returns self."""
        return self

    def transform(self, X, y=None):
        """Concatenate both halves along axis 1 and cast the result to float32."""
        text_part, numeric_part = X[0], X[1]
        combined = np.concatenate((text_part, numeric_part), axis=1)
        return combined.astype(np.float32)

# Define the neural network module
class ClassifierModule(nn.Module):
    """Feed-forward network: input_size -> num_units -> 50 -> num_classes.

    :param input_size: number of input features
    :param num_units: width of the first hidden layer
    :param num_classes: number of output units
    :param nonlin: activation applied after the first layer
    :param dropout: dropout probability applied after the first activation
    """

    def __init__(self, input_size=600, num_units=200, num_classes=2, nonlin=F.relu, dropout=0.5):
        super().__init__()
        self.num_units = num_units
        self.nonlin = nonlin

        # Sub-modules are registered in the order dense0, dropout, dense1, output.
        self.dense0 = nn.Linear(input_size, num_units)
        self.dropout = nn.Dropout(dropout)
        self.dense1 = nn.Linear(num_units, 50)
        self.output = nn.Linear(50, num_classes)

    def forward(self, X, **kwargs):
        """Return the output-layer activations for a batch X."""
        hidden = self.dropout(self.nonlin(self.dense0(X)))
        # The second hidden layer always uses ReLU, independent of ``nonlin``
        hidden = F.relu(self.dense1(hidden))
        logits = self.output(hidden)
        # squeeze(dim=1) drops axis 1 only when that axis has size 1
        return logits.squeeze(dim=1)

# Define input size and number of classes
# 503 = 500 TF-IDF columns (the vectorizer's max_features) + 3 numeric features
input_size = 503  # Adjusted to match the number of features
num_classes = len(le_fitted.classes_)

# Initialize the neural network classifier with Skorch
net = NeuralNetClassifier(
    ClassifierModule(input_size=input_size, num_classes=num_classes),
    max_epochs=30,
    criterion=nn.CrossEntropyLoss(),
    lr=0.1,
    # device='cuda',  # Uncomment this to train with CUDA
)

# Create the pipeline
pipe = Pipeline(steps=[
    ('TextNormalizer', TextNormalizer()),
    ('FeatureExtractor', FeatureExtractor()),
    ('Vectorizer', VectorizerWrapper()),
    ('MatrixToArrayConverter', MatrixToArrayConverter()),
    ('MatrixUnifier', MatrixUnifier()),
    ('net', net)
], verbose=True)

# Fit the model
pipe.fit(train_df['text'].to_numpy(), y_train_encoded)

# Predict on the test set
preds = pipe.predict(test_df['text'].to_numpy())

# Calculate F1 scores
# f1_score takes the ground truth first and the predictions second
f1_micro = f1_score(y_test_encoded, preds, average='micro')
f1_macro = f1_score(y_test_encoded, preds, average='macro')
print(f'F1-micro-score on the test set: {f1_micro}')
print(f'F1-macro-score on the test set: {f1_macro}')
