<a href="https://colab.research.google.com/github/Mohamed-Devp/language-identification-with-CNNs/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Overview
This notebook demonstrates the training of a deep learning model capable of identifying six distinct languages with 99% accuracy.

# Data Loading

## Import necessary modules

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.optim import SGD

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

import re
from collections import Counter

sns.set_theme(style = "whitegrid")

## Load the dataset
```wili_subset.csv``` is a subset of the Wili-2018 dataset from kaggle. you can find it [here](https://www.kaggle.com/datasets/mexwell/wili-2018)

In [3]:
# Load the dataset
df = pd.read_csv('/content/drive/MyDrive/Datasets/Wili Dataset/wili_subset.csv')
df.head()

Unnamed: 0,Sentence,Language
0,エノが行きがかりでバスに乗ってしまい、気分が悪くなった際に助けるが、今すぐバスを降りたいと運...,Japanese
1,シャーリー・フィールドは、サン・ベルナルド・アベニュー沿い市民センターとR&Tマーティン高校...,Japanese
2,Barocco (pt: Escândalo de 1ª página) é um film...,Portuguese
3,Association de recherche et de sauvegarde de l...,French
4,"한국에서 성씨가 사용되기 시작한 정확한 시기는 알 수 없으나, 한자(漢字) 등 중국...",Korean


In [None]:
# Clean the data
df = df.drop_duplicates()
print(f"Number of duplicate rows: {df.duplicated().sum()}")

# Data preprocessing


## Tokenization
the code cells below convert each sentence in the dataset into a sequence of characters, since character-level models perform the best at this task.

In [None]:
# Encode the "Language" column
unique_langs = np.unique(df['Language'])

df['Encoded'] = df['Language'].apply(lambda lang: np.where(unique_langs == lang)[0][0])
df.head()

In [None]:
max_len = 128

sequences = []
labels = []

for row in range(df.shape[0]):
  sentence = df.iloc[row]['Sentence']
  label = df.iloc[row]['Encoded']

  # remove punctuation and specail characters
  sentence = re.sub(r"[^\w\s]", "", sentence.lower())
  tokenized = [char for char in sentence if char != " "] # Tokenize the text

  # Convert sentence into sequences of characters
  for start in range(0, len(tokenized), max_len):
    chars_seq = tokenized[start:start + max_len]

    if len(chars_seq) == max_len:
      sequences.append(chars_seq)
      labels.append(label)

print(f"Sequence: {sequences[0][:8]} - label: {labels[0]}")

## Convert characters into indices

In [8]:
# Convert tokens into indices
chars = [char for sequence in sequences for char in sequence]
counts = Counter(chars)

num_chars = 3000
char_to_index = {char: index for index, (char, _) in enumerate(counts.most_common(num_chars - 2), start = 2)}
char_to_index['<OOV>'] = 0
char_to_index['<PAD>'] = 1

inputs = torch.empty((len(sequences), max_len), dtype = torch.int32)
targets = torch.tensor(labels, dtype = torch.long)

for i, sequence in enumerate(sequences):
  inputs[i] = torch.tensor([char_to_index.get(char, char_to_index['<OOV>']) for char in sequence])

device = 'cuda' if torch.cuda.is_available() else 'cpu'

if device == 'cuda':
  torch.cuda.empty_cache()

  inputs = inputs.to(device)
  targets = targets.to(device)

## Split the data

In [9]:
# Split the data into training, validation and testing sets
def split(tensor, perc = .8):
  idx = int(len(tensor) * perc)
  return tensor[:idx], tensor[idx:]

X_train, X_test = split(inputs)
X_valid, X_test = split(X_test, perc = .5)

y_train, y_test = split(targets)
y_valid, y_test = split(y_test, perc = .5)

# Train the model

## Define the model architecture


In [10]:
# Define the model architecture
class CharCNN(nn.Module):
  def __init__(
      self,
      vocab_size,
      embedding_dim,
      num_classes,
  ):
    super().__init__()

    self.embedding = nn.Embedding(vocab_size, embedding_dim)

    num_kernels = 128
    self.conv = nn.Conv1d(in_channels=embedding_dim, out_channels=num_kernels, kernel_size=4)

    self.output = nn.Linear(num_kernels, num_classes)

  def configure(self, criterion, optimizer):
    """Configure the model loss function and optimizer."""
    self.criterion = criterion
    self.optimizer = optimizer

  def forward(self, inputs, probs = False):
    """Peform a forward pass in the network."""
    embedded = self.embedding(inputs).permute(0, 2, 1) # -> (batch_size, embed_dim, seq_len)

    conv_out = F.relu(self.conv(embedded))

    # apply max pooling to the convolutional layer output
    pooled = F.max_pool1d(conv_out, kernel_size=conv_out.shape[-1]).squeeze(-1)
    output = self.output(pooled)

    return F.softmax(output) if probs else output

  def backward(self, inputs, targets):
    """Peform a backward pass in the network."""
    self.optimizer.zero_grad() # reset the gradients

    outputs = self.forward(inputs)
    loss = self.criterion(outputs, targets)

    loss.backward()
    self.optimizer.step()

    return loss.item()

# Build the model
model = CharCNN(
    vocab_size=len(char_to_index),
    embedding_dim=64,
    num_classes=len(unique_langs)
).to(device)

optimizer = SGD(model.parameters(), lr = .01)
criterion = nn.CrossEntropyLoss()

model.configure(criterion, optimizer)

# Store the training & validation losses
losses = {
    'train': [],
    'valid': []
}

## train a validate the model

In [None]:
# Fit the model to the data
batch_size = 128
num_epochs = 1

for epoch in range(num_epochs):
  epoch_loss = num_batchs = 0

  # Shuffle the data before each epoch
  shuffled = torch.randperm(len(X_train))

  for start in range(0, len(X_train), batch_size):
    end = min(start + batch_size, len(X_train))

    X_batch = X_train[shuffled][start:end]
    y_batch = y_train[shuffled][start:end]

    loss = model.backward(X_batch, y_batch)
    epoch_loss += loss
    num_batchs += 1

  # Compute validation losses
  with torch.no_grad():
      outputs = model.forward(X_valid)
      loss = criterion(outputs, y_valid).item()

  losses['train'].append(epoch_loss / num_batchs)
  losses['valid'].append(loss)

  # Log the training progress
  if epoch == 0 or (epoch + 1) % 5 == 0:
    print(f"Epoch: {epoch + 1} - Training: {losses['train'][-1]:.2f} - Validation: {losses['valid'][-1]:.2f}")

In [None]:
# Plot the loss curve
sns.lineplot(losses['train'], label = 'training')
sns.lineplot(losses['valid'], label = 'validation')
plt.title('Loss curve')
plt.show()

In [13]:
# Define the metrics functions
def accuracy(targets, preds):
  """Computes the accuracy score."""
  return torch.sum(targets == preds) / len(targets)

def recall_per_class(targets, preds, cl = 0):
  """Computes the recall score per class."""
  true = torch.sum((targets == cl) & (preds == cl))
  false = torch.sum((targets == cl) & (preds != cl))

  return true / (true + false)

def precision_per_class(targets, preds, cl = 0):
  """Computes the precision score per class."""
  true = torch.sum((targets == cl) & (preds == cl))
  total = torch.sum(preds == cl)

  return true / total

def f1_per_class(targets, preds, cl = 0):
  """Computes the F1 score per class."""
  recall = recall_per_class(targets, preds, cl)
  precision = precision_per_class(targets, preds, cl)

  return 2 * (precision * recall) / (precision + recall)

## Evaluate the model
Compute the recall, precision and F1 scores for each distint language.

In [14]:
def evaluate(x, y, classes):
  """Computes the recall, precision and F1 score."""
  metrics = [
      ('Recall', recall_per_class),
      ('Precision', precision_per_class),
      ('F1 score', f1_per_class)
  ]

  results = {
      'lang': [],
      'metric': [],
      'value': []
  }

  # Model predictions
  probs = model.forward(x)
  preds = torch.argmax(probs, axis = 1)

  # Compute each metric for per each given class
  for metric, func in metrics:
    for lang, cl in classes.items():
      results['lang'].append(lang)
      results['metric'].append(metric)
      results['value'].append(func(y, preds, cl).item())

  # Wrapp the results in a dataframe
  return pd.DataFrame(results)

In [None]:
# Compute and log the model accuracy
probs = model.forward(X_test)
preds = torch.argmax(probs, axis = 1)

acc = accuracy(y_test, preds)
print(f"Accuracy score: {acc:.2f}")

# Visualize the model performance across different languages
languages = [('English', 'French'), ('Spanish', 'Portuguese'), ('Japanese', 'Korean')]

for lang1, lang2 in languages:
  classes = {}

  classes[lang1] = np.where(unique_langs == lang1)[0][0]
  classes[lang2] = np.where(unique_langs == lang2)[0][0]

  res = evaluate(X_test, y_test, classes)

  g = sns.catplot(
      data=res, kind='bar',
      x="metric", y="value", hue="lang",
      alpha=.7, height=5
  )
  g.set_axis_labels("", "Value")
  g.legend.set_title("")