In [None]:
# import the libraries used for preprocessing and for the train/val/test split
import os 
import re
import random
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

In [None]:
# Read the feature table (its 'lemma' and 'tag' columns are used in the cells below).
# The directory can be overridden with the KAT_DATA_DIR environment variable so the
# notebook is not tied to a single machine; the default is the original location.
um = pd.read_csv(
    os.path.join(
        os.environ.get("KAT_DATA_DIR", r"C:\Users\Home\Desktop\Python Scripts\kat-master"),
        "um_features.csv",
    )
)
um.head()

In [None]:
# Flag every row whose lemma contains at least one Latin letter (a-z or A-Z).
latin_pattern = re.compile(r'[a-zA-Z]')
um['contains_latin'] = um['lemma'].apply(
    lambda symbols_in_lemma: any(
        latin_pattern.search(symbol) is not None for symbol in symbols_in_lemma
    )
)

# Print the flagged rows so they can be inspected by eye.
latin_rows = um.loc[um['contains_latin']]
print(latin_rows)

In [None]:
# make a list of Georgian chars
# The 33 letters used here occupy one contiguous Unicode range,
# U+10D0 ('ა') through U+10F0 ('ჰ'), already in code-point order:
#   ა ბ გ დ ე ვ ზ თ ი კ ლ მ ნ ო პ ჟ რ ს ტ უ ფ ქ ღ ყ შ ჩ ც ძ წ ჭ ხ ჯ ჰ
georgian_chars = [chr(code_point) for code_point in range(0x10D0, 0x10F1)]

In [None]:
# Character vocabulary: the special tokens take the lowest indices, followed by
# the Georgian letters in sorted order.
SPECIAL_TOKENS = ['<pad>', '<bos>', '<eos>']
char2idx = {
    symbol: index
    for index, symbol in enumerate(SPECIAL_TOKENS + sorted(georgian_chars))
}

print(char2idx)

In [None]:
# vocab dict with tags from the data as keys and indices as values
## first separate the tag column on the ';' delimiter so every row holds a list of tags
## NOTE: this overwrites um['tag'] in place and uses the .str accessor, i.e. it expects
## the column to still hold the raw ';'-joined strings when the cell is run
um['tag'] = um['tag'].str.split(';')
um["tag"]

In [None]:
# Drop the literal "V" tag from every row's tag list; all other tags are kept unchanged.
um['tag'] = um['tag'].apply(lambda tags: [tag for tag in tags if tag != "V"])
um["tag"]

In [None]:
# Collect the distinct tags that occur anywhere in the tag column.
all_tags = list({tag for row_tags in um['tag'] for tag in row_tags})
# Map every tag to its position in the sorted tag inventory.
tag2idx = dict(zip(sorted(all_tags), range(len(all_tags))))

tag2idx

In [None]:
# Turn every lemma into a list of its characters.
# list() handles a plain string and an already-split list alike, so this cell can be
# re-run without breaking, and it does not depend on how an empty-pattern split
# behaves in the installed pandas / Python versions (no empty strings to strip).
um['lemma'] = um['lemma'].apply(list)
um['lemma']

In [None]:
def tokenize(row):
    """Build the input token sequence for one row.

    The result is '<bos>', then every element of row['lemma'], then every
    element of row['tag'], then '<eos>'. Both values are iterated, so each of
    their elements becomes one token.
    """
    return ["<bos>", *row['lemma'], *row['tag'], "<eos>"]

# Apply tokenize row by row (axis=1) and store each row's token list in a new column.
um['tokens'] = um.apply(tokenize, axis=1)
um['tokens']

In [None]:
# Single input vocabulary for X: special tokens first, then the letters, then the tags.
symbols = [*SPECIAL_TOKENS, *sorted(georgian_chars), *sorted(all_tags)]
feature_vocab_dict = dict(zip(symbols, range(len(symbols))))
feature_vocab_dict

In [None]:
# Read the target table (its 'form' column is used in the cells below).
# The directory can be overridden with the KAT_DATA_DIR environment variable so the
# notebook is not tied to a single machine; the default is the original location.
target = pd.read_csv(
    os.path.join(
        os.environ.get("KAT_DATA_DIR", r"C:\Users\Home\Desktop\Python Scripts\kat-master"),
        "um_target.csv",
    )
)
target.head()

In [None]:
# Replace Latin 'a' with Georgian 'ა' in every form that contains a Latin letter.
# Only 'a' is substituted; any other Latin letter in such a form is left as it is.
target['form'] = target['form'].apply(
    lambda form: form if re.search(r'[a-zA-Z]', form) is None else form.replace('a', 'ა')
)

In [None]:
def tokenize_target(row):
    """Build the output token sequence for one row.

    The result is '<bos>', then every element of row['form'] (for a string,
    one token per character), then '<eos>'.
    """
    return ["<bos>", *row['form'], "<eos>"]
# apply the function to the target data
target['tokens'] = target.apply(tokenize_target, axis=1)
target['tokens']
# Output vocabulary. '<pad>' has to be part of it and sit at index 0: target batches
# are padded with 0 and the loss ignores the '<pad>' index (0), so without a dedicated
# pad entry the padding would share its id with '<bos>'.
symbols = SPECIAL_TOKENS + sorted(georgian_chars)
target_vocab_dict = {sym: i for i, sym in enumerate(symbols)}

In [None]:
# size of the input vocabulary (special tokens + Georgian letters + tags)
len(feature_vocab_dict)

In [None]:
# model variables: inputs are the feature token lists, outputs the target token lists
# NOTE(review): pairing X[i] with y[i] assumes um and target are row-aligned — confirm
X = um['tokens'].to_numpy()
y = target['tokens'].to_numpy()
# check the data
print(X[0])
print(y[0])

In [None]:
# train, val, test split: hold out 15% for testing, then take another 15% of the
# full data out of the remainder for validation
X_temp, X_test, y_temp, y_test = train_test_split(
    X,
    y,
    test_size=0.15,
    random_state=42,
)
# 15% of the whole set, expressed relative to the 85% that is left after the test split
val_size = 0.15 / (1 - 0.15)
X_train, X_val, y_train, y_val = train_test_split(
    X_temp,
    y_temp,
    test_size=val_size,
    random_state=42,
)
# check the data
print("Train size X:", len(X_train), "Validation size:", len(X_val), "Test size:", len(X_test))
print("Train size y:", len(y_train), "Validation size:", len(y_val), "Test size:", len(y_test))


In [None]:
# visualize train, val, test split
import matplotlib.pyplot as plt

# One bar per split; the bar height is the number of samples in that split
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(
    ['Train', 'Validation', 'Test'],
    [len(X_train), len(X_val), len(X_test)],
    color=['blue', 'orange', 'green'],
)
ax.set_xlabel('Dataset Split')
ax.set_ylabel('Number of Samples')
ax.set_title('Distribution of Dataset Splits')
plt.setp(ax.get_xticklabels(), rotation=45)
fig.tight_layout()
plt.show()

In [None]:
def _encode(sequences, vocab):
    """Replace every token of every sequence by its index in `vocab`."""
    return [[vocab[token] for token in sequence] for sequence in sequences]


# Encode the datasets into their index representations
X_train_idx = _encode(X_train, feature_vocab_dict)
X_val_idx = _encode(X_val, feature_vocab_dict)
X_test_idx = _encode(X_test, feature_vocab_dict)

# Print the first two examples of each split for verification
print("Encoded X_train_idx:", X_train_idx[:2])
print("Encoded X_val_idx:", X_val_idx[:2])
print("Encoded X_test_idx:", X_test_idx[:2])

# Encode the target datasets into their index representations
y_train_idx = _encode(y_train, target_vocab_dict)
y_val_idx = _encode(y_val, target_vocab_dict)
y_test_idx = _encode(y_test, target_vocab_dict)

# Print the first two examples of each split for verification
print("Encoded y_train_idx:", y_train_idx[:2])
print("Encoded y_val_idx:", y_val_idx[:2])
print("Encoded y_test_idx:", y_test_idx[:2])

In [None]:
# Report whether an encoded dataset still contains anything that is not an int
def check_for_strings(encoded_dataset):
    """Return True if any token in any sequence is not an `int`, otherwise False."""
    return any(
        not isinstance(token, int)
        for sequence in encoded_dataset
        for token in sequence
    )

# Run the check on every encoded split (True means a non-int token was found)
has_strings_in_X_train, has_strings_in_X_val, has_strings_in_X_test = (
    check_for_strings(split) for split in (X_train_idx, X_val_idx, X_test_idx)
)
has_strings_in_y_train, has_strings_in_y_val, has_strings_in_y_test = (
    check_for_strings(split) for split in (y_train_idx, y_val_idx, y_test_idx)
)

# Report the result for each split
print("Strings in X_train_idx:", has_strings_in_X_train)
print("Strings in X_val_idx:", has_strings_in_X_val)
print("Strings in X_test_idx:", has_strings_in_X_test)
print("Strings in y_train_idx:", has_strings_in_y_train)
print("Strings in y_val_idx:", has_strings_in_y_val)
print("Strings in y_test_idx:", has_strings_in_y_test)

In [None]:
# import the necessary libraries for the model
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt


In [None]:
# implement the dataset class
class CustomDataset(Dataset):
    """Pairs index-encoded input sequences with their target sequences.

    X and y are indexable collections of int sequences; item i is returned as
    a pair of tensors built from X[i] and y[i].
    """

    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        # the dataset length is defined by the inputs
        return len(self.X)

    def __getitem__(self, idx):
        source_ids, target_ids = self.X[idx], self.y[idx]
        return torch.tensor(source_ids), torch.tensor(target_ids)

In [None]:
# implement encoder
class Encoder(nn.Module):
    """Embeds an index sequence and summarises it with a single-layer LSTM.

    Args:
        input_dim: size of the input vocabulary.
        emb_dim: width of the token embeddings.
        hidden_dim: width of the LSTM hidden state.
        pad_idx: index used to right-pad batched input. Defaults to 0, the value
            the batching code pads with. It must not occur inside a real sequence,
            because sequence lengths are derived by counting non-pad positions.
    """

    def __init__(self, input_dim, emb_dim, hidden_dim, pad_idx=0):
        super(Encoder, self).__init__()
        self.pad_idx = pad_idx
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hidden_dim, batch_first=True)

    def forward(self, x):
        """Encode a (batch, seq_len) tensor of token indices.

        Returns:
            (hidden, cell): the LSTM state after the last non-padding token of
            every sequence, each of shape (1, batch, hidden_dim).
        """
        embedded = self.embedding(x)
        # Without packing, the LSTM would keep stepping over the padding and the
        # final state of every shorter sequence in a batch would depend on how much
        # padding the batch happened to need.
        # clamp(min=1): an all-padding row would otherwise make packing fail.
        lengths = (x != self.pad_idx).sum(dim=1).clamp(min=1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False
        )
        _, (hidden, cell) = self.rnn(packed)
        return hidden, cell

In [None]:
# implement decoder
class Decoder(nn.Module):
    """Single-step LSTM decoder: one input token per sequence in, vocabulary scores out."""

    def __init__(self, output_dim, emb_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.LSTM(emb_dim, hidden_dim)
        self.fc_out = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, hidden, cell):
        """Advance the decoder by one time step.

        x holds one token index per batch element; hidden and cell are the LSTM
        state from the previous step. Returns the scores over the output
        vocabulary together with the updated hidden and cell state.
        """
        # add a leading time axis of length 1: the LSTM here is not batch_first
        step_input = self.embedding(x)[None]
        step_output, state = self.rnn(step_input, (hidden, cell))
        hidden, cell = state
        # drop the length-1 time axis again before projecting to the vocabulary
        scores = self.fc_out(step_output[0])
        return scores, hidden, cell

In [None]:
# implement seq2seq model
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with optional teacher forcing."""

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Produce output-vocabulary scores for every target position.

        src is passed to the encoder unchanged; trg is indexed as (batch, trg_len)
        and its first column is the first decoder input. At every later step the
        next decoder input is the gold token with probability
        teacher_forcing_ratio, otherwise the decoder's own argmax.

        Returns a (batch, trg_len, output_dim) tensor; position 0 stays all zeros
        because no prediction is made for the first target token.
        """
        n_sequences, n_steps = trg.shape[0], trg.shape[1]
        vocab_size = self.decoder.fc_out.out_features

        outputs = torch.zeros(n_sequences, n_steps, vocab_size, device=self.device)
        hidden, cell = self.encoder(src)

        decoder_input = trg[:, 0]
        for step in range(1, n_steps):
            scores, hidden, cell = self.decoder(decoder_input, hidden, cell)
            outputs[:, step] = scores
            # one random draw per step decides for the whole batch
            if random.random() < teacher_forcing_ratio:
                decoder_input = trg[:, step]
            else:
                decoder_input = scores.argmax(1)

        return outputs

In [None]:
### Training the model
# reproducibility: seed every RNG the training code draws from
# (Python's `random` decides teacher forcing; torch drives weight init and shuffling)
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

# model hyperparameters
num_epochs = 30
batch_size = 64
# optimizer hyperparameters
learning_rate = 0.001

# training hyperparameters
load_model = False
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
input_dim_encoder = len(feature_vocab_dict)   # encoder vocabulary size
output_dim_decoder = len(target_vocab_dict)   # decoder vocabulary size
encoder_emb_dim = 100 # refer to guriel et al. 
decoder_emb_dim = 100 # refer to guriel et al.
hidden_dim = 64
# NOTE(review): num_layers, enc_dropout and dec_dropout are defined here, but the
# Encoder/Decoder constructor calls visible in this notebook do not receive them
num_layers = 2
enc_dropout = 0.5
dec_dropout = 0.5

# tensorboard logging
writer = SummaryWriter('runs/seq2seq_experiment_1')
steps = 0

# setup data loaders with collate function
def collate_fn(batch):
    """Right-pad the inputs and the targets of a batch with 0 to a common length.

    batch is a sequence of (input_tensor, target_tensor) pairs. Returns two
    batch-first tensors, one for the inputs and one for the targets.
    """
    sources = [source for source, _ in batch]
    targets = [target_seq for _, target_seq in batch]
    pad = nn.utils.rnn.pad_sequence
    return (
        pad(sources, batch_first=True, padding_value=0),
        pad(targets, batch_first=True, padding_value=0),
    )
# create datasets and dataloaders
train_dataset = CustomDataset(X_train_idx, y_train_idx)
val_dataset = CustomDataset(X_val_idx, y_val_idx)
test_dataset = CustomDataset(X_test_idx, y_test_idx)
# all loaders share batch size and collate function; only the training data is shuffled
loader_options = {'batch_size': batch_size, 'collate_fn': collate_fn}
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)

In [None]:
# initialize encoder, decoder, and seq2seq model (all moved to the selected device)
encoder = Encoder(input_dim_encoder, encoder_emb_dim, hidden_dim).to(device)
decoder = Decoder(output_dim_decoder, decoder_emb_dim, hidden_dim).to(device)
model = Seq2Seq(encoder, decoder, device).to(device)

# define optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# The ignored index is looked up in the *feature* vocab but applied to *target* indices;
# collate_fn pads targets with 0, so this is only right while feature_vocab_dict['<pad>'] == 0.
criterion = nn.CrossEntropyLoss(ignore_index=feature_vocab_dict['<pad>'])  # assuming 0 is the padding index

# training loop: one pass over train_loader per epoch, logging the mean batch loss
for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0

    for batch_idx, (src, trg) in enumerate(train_loader):
        src, trg = src.to(device), trg.to(device)

        optimizer.zero_grad()
        # default teacher_forcing_ratio of the model is used during training
        output = model(src, trg)

        # reshape output and target for loss calculation
        # position 0 is dropped on both sides: the model makes no prediction for the
        # first target token, which only serves as the first decoder input
        output_dim = output.shape[-1]
        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)

        loss = criterion(output, trg)
        loss.backward()

        # gradient clipping (optional)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)

        optimizer.step()
        epoch_loss += loss.item()

    # mean of the per-batch losses (each batch counts equally, whatever its size)
    avg_loss = epoch_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}")

    # log to tensorboard
    writer.add_scalar('Training Loss', avg_loss, global_step=epoch)

In [None]:
# Evaluate the trained model on the validation set: loss, token-level accuracy,
# and a few decoded example sequences.
PAD_IDX = feature_vocab_dict['<pad>']  # padding index; target batches are padded with the same value
target_idx_to_token = {idx: tok for tok, idx in target_vocab_dict.items()}
MAX_PRINTED_EXAMPLES = 5  # total number of example sequences to print

model.eval()
val_loss = 0
val_predictions = []
val_targets = []
printed_examples = 0

with torch.no_grad():
    for src, trg in val_loader:
        src, trg = src.to(device), trg.to(device)
        # kept in its own name so the global `batch_size` hyperparameter is not overwritten
        n_sequences = src.size(0)

        output = model(src, trg, teacher_forcing_ratio=0)  # no teacher forcing

        # drop position 0 (no prediction is made for it) and flatten for the loss
        output_dim = output.shape[-1]
        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)

        loss = criterion(output, trg)
        # weight the batch loss by the number of sequences; divided by the dataset size below
        val_loss += loss.item() * n_sequences

        pred = output.argmax(1)
        mask = trg != PAD_IDX

        # Filter out pad positions for accuracy
        val_predictions.extend(pred[mask].cpu().numpy())
        val_targets.extend(trg[mask].cpu().numpy())

        # Print example sequences until MAX_PRINTED_EXAMPLES have been shown in total
        pred_seq = pred.view(n_sequences, -1)
        trg_seq = trg.view(n_sequences, -1)

        for i in range(min(MAX_PRINTED_EXAMPLES - printed_examples, n_sequences)):
            pred_tokens = [target_idx_to_token[idx.item()] for idx in pred_seq[i] if idx.item() != PAD_IDX]
            target_tokens = [target_idx_to_token[idx.item()] for idx in trg_seq[i] if idx.item() != PAD_IDX]

            print(f"Target   : {' '.join(target_tokens)}")
            print(f"Predicted: {' '.join(pred_tokens)}")
            print('-' * 40)
            printed_examples += 1

    avg_val_loss = val_loss / len(val_dataset)
    # accuracy over individual non-pad tokens, not over whole sequences
    val_accuracy = accuracy_score(val_targets, val_predictions)

    print(f"Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

    # `epoch` is the loop variable left over from the training cell, so the training
    # cell must have been run before this one
    writer.add_scalar('Validation Loss', avg_val_loss, global_step=epoch)
    writer.add_scalar('Validation Accuracy', val_accuracy, global_step=epoch)
    # make sure the scalars reach the event file even if the writer is never closed
    writer.flush()
