In [41]:
import torch
import torch.nn as nn
import random
import torch.optim as optim
import pandas as pd
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from sentence_transformers import SentenceTransformer
from scipy.spatial.distance import cosine
import numpy as np

In [None]:
# Mount Google Drive under /content/drive; the CSV files read by the later
# cells live under /content/drive/MyDrive.
# NOTE(review): google.colab is presumably only importable inside a Colab
# runtime, so this cell will not run elsewhere — confirm before reusing locally.
from google.colab import drive
drive.mount('/content/drive')

In [27]:
# Load the training and development splits from the mounted Drive.
# Later cells read the 'Word' and 'Description' columns of both frames.
train_df = pd.read_csv('/content/drive/MyDrive/train.csv')
dev_df = pd.read_csv('/content/drive/MyDrive/dev.csv')

In [28]:
class EncoderLSTM(nn.Module):
    """Embed a batch of token-index sequences and summarise them with an LSTM.

    Args:
        input_size: number of rows in the embedding table (vocabulary size).
        embedding_size: width of each embedding vector.
        hidden_size: width of the LSTM hidden and cell states.
        n_layers: number of stacked LSTM layers (default 1).
    """

    def __init__(self, input_size, embedding_size, hidden_size, n_layers=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.n_layers = n_layers

        # Sub-modules are created in the same order as before (embedding, then
        # LSTM) so seeded weight initialisation is unaffected.
        self.embedding = nn.Embedding(
            num_embeddings=input_size,
            embedding_dim=embedding_size,
        )
        self.lstm = nn.LSTM(
            input_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=n_layers,
            batch_first=True,
        )

    def forward(self, x):
        """Encode ``x`` of shape (batch_size, seq_length).

        Returns:
            The LSTM's final ``(hidden, cell)`` states; the per-step outputs
            are discarded.
        """
        _, final_state = self.lstm(self.embedding(x))
        hidden, cell = final_state
        return hidden, cell

In [29]:
class DecoderLSTM(nn.Module):
    """Single-step LSTM decoder that emits vocabulary logits for one token.

    Args:
        output_size: vocabulary size; used for both the embedding table and
            the width of the output layer.
        embedding_size: width of each embedding vector.
        hidden_size: width of the LSTM hidden and cell states.
        n_layers: number of stacked LSTM layers (default 1).
    """

    def __init__(self, output_size, embedding_size, hidden_size, n_layers=1):
        super().__init__()
        self.hidden_size = hidden_size
        self.n_layers = n_layers

        # Creation order (embedding, LSTM, linear) is kept so seeded weight
        # initialisation is unaffected.
        self.embedding = nn.Embedding(
            num_embeddings=output_size,
            embedding_dim=embedding_size,
        )
        self.lstm = nn.LSTM(
            input_size=embedding_size,
            hidden_size=hidden_size,
            num_layers=n_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

    def forward(self, x, hidden, cell):
        """Advance the decoder by one character.

        Args:
            x: token indices for the current step, one per batch element
                (the callers in this file pass a 1-D tensor).
            hidden, cell: LSTM state carried over from the previous step.

        Returns:
            ``(prediction, hidden, cell)`` where ``prediction`` holds the
            logits produced by the output layer for this step.
        """
        # We generate character by character, so a length-1 time axis is
        # inserted for the batch_first LSTM.
        step_embedding = self.embedding(x).unsqueeze(1)
        step_output, (hidden, cell) = self.lstm(step_embedding, (hidden, cell))
        prediction = self.fc(step_output[:, 0, :])
        return prediction, hidden, cell

In [30]:
class Seq2SeqBaseline(nn.Module):
    """Encoder-decoder wrapper that decodes a target sequence step by step.

    Args:
        encoder: module returning ``(hidden, cell)`` for a source batch.
        decoder: module mapping ``(step_input, hidden, cell)`` to
            ``(logits, hidden, cell)``; its ``fc.out_features`` gives the
            target vocabulary size.
        device: device on which the output buffer is allocated.
    """

    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        """Run the encoder once, then decode ``target.shape[1] - 1`` steps.

        Args:
            source: batch of source sequences (first dimension is the batch).
            target: batch of target sequences of shape (batch, target_len);
                column 0 is fed as the first decoder input.
            teacher_forcing_ratio: probability, drawn independently at every
                step, of feeding the ground-truth token instead of the
                decoder's own argmax prediction.

        Returns:
            Tensor of shape (batch, target_len, vocab) with the logits of each
            step; position 0 along the time axis is never written and stays
            zero.
        """
        n_rows = source.shape[0]
        n_steps = target.shape[1]
        vocab_width = self.decoder.fc.out_features

        outputs = torch.zeros(n_rows, n_steps, vocab_width).to(self.device)

        # Encoder: its final state initialises the decoder.
        hidden, cell = self.encoder(source)

        # The first decoder input is the <SOS> (start of sentence) token.
        step_input = target[:, 0]

        for t in range(1, n_steps):
            logits, hidden, cell = self.decoder(step_input, hidden, cell)
            outputs[:, t, :] = logits

            # Teacher forcing: feed either the true next token or the predicted one.
            greedy_choice = logits.argmax(1)
            if random.random() < teacher_forcing_ratio:
                step_input = target[:, t]
            else:
                step_input = greedy_choice

        return outputs

In [None]:
# Reserved symbols; they are placed first so they receive indices 0, 1 and 2.
special_tokens = ['<PAD>', '<SOS>', '<EOS>']

# Gather every distinct character that occurs in the 'Word' and 'Description'
# columns of both the training and the development frame.
all_chars = set()
for frame in (train_df, dev_df):
    for column in ('Word', 'Description'):
        for text in frame[column]:
            all_chars.update(str(text))

# Special tokens first, followed by the characters in sorted order.
vocab = special_tokens + sorted(all_chars)

# Index <-> symbol lookup tables.
int_to_char = dict(enumerate(vocab))
char_to_int = {symbol: index for index, symbol in int_to_char.items()}


def _longest(column):
    """Length of the longest entry of ``column`` after converting to ``str``."""
    return column.apply(lambda value: len(str(value))).max()


# Longest raw entry across both frames, plus two slots for <SOS> and <EOS>.
max_word_len = max(_longest(frame['Word']) for frame in (train_df, dev_df)) + 2
max_description_len = max(_longest(frame['Description']) for frame in (train_df, dev_df)) + 2

print(f"Vocabulary size: {len(vocab)}")
print(f"Example char_to_int: {list(char_to_int.items())[:5]}")
print(f"Example int_to_char: {list(int_to_char.items())[:5]}")
print(f"Maximum word length (including SOS/EOS): {max_word_len}")
print(f"Maximum description length (including SOS/EOS): {max_description_len}")

def encode_text(text, char_to_int_map, max_len):
    """Encode ``text`` as a fixed-length list of vocabulary indices.

    The result has the layout ``<SOS> chars... <EOS> <PAD>...`` and is exactly
    ``max_len`` items long whenever ``max_len >= 2``.

    Args:
        text: value to encode; it is converted with ``str()`` first.
        char_to_int_map: mapping from character to index. Must contain the
            '<SOS>', '<EOS>' and '<PAD>' keys; an optional '<UNK>' key is used
            for characters that are not in the mapping.
        max_len: total length of the returned list, special tokens included.

    Returns:
        list[int]: the encoded, padded (or truncated) sequence.

    Notes:
        * Text longer than ``max_len - 2`` characters is truncated *before*
          '<EOS>' is appended, so the end marker is kept.
        * Characters missing from ``char_to_int_map`` are mapped to '<UNK>'
          when that key exists and skipped otherwise, instead of raising
          ``KeyError`` (e.g. for test data with unseen characters).
    """
    sos_idx = char_to_int_map['<SOS>']
    eos_idx = char_to_int_map['<EOS>']
    pad_idx = char_to_int_map['<PAD>']
    unk_idx = char_to_int_map.get('<UNK>')  # None when the vocabulary has no <UNK>

    char_indices = []
    for char in str(text):
        index = char_to_int_map.get(char, unk_idx)
        if index is not None:
            char_indices.append(index)

    # Reserve two slots for <SOS>/<EOS> so truncation never drops the end marker.
    char_indices = char_indices[:max(max_len - 2, 0)]

    encoded = [sos_idx] + char_indices + [eos_idx]
    # A non-positive repeat count yields an empty list, so no padding is added
    # when the sequence already fills max_len.
    encoded.extend([pad_idx] * (max_len - len(encoded)))

    # Only has an effect for max_len < 2, where even <SOS>/<EOS> do not fit.
    return encoded[:max_len]


In [None]:
class NeologismDataset(Dataset):
    """Dataset of (encoded description, encoded word) pairs.

    Args:
        dataframe: frame with 'Description' and 'Word' columns.
        char_to_int_map: character-to-index mapping handed to ``encode_text``.
        max_description_len: encoded length of every description.
        max_word_len: encoded length of every word.
    """

    def __init__(self, dataframe, char_to_int_map, max_description_len, max_word_len):
        self.dataframe = dataframe
        self.char_to_int_map = char_to_int_map
        self.max_description_len = max_description_len
        self.max_word_len = max_word_len

    def __len__(self):
        """Number of rows in the underlying frame."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(description_tensor, word_tensor)`` for the row at position ``idx``."""
        row = self.dataframe.iloc[idx]

        description_ids = encode_text(
            row['Description'], self.char_to_int_map, self.max_description_len
        )
        word_ids = encode_text(row['Word'], self.char_to_int_map, self.max_word_len)

        return torch.LongTensor(description_ids), torch.LongTensor(word_ids)

# Mini-batch size shared by both loaders.
batch_size = 64

# One dataset per split; both use the same vocabulary and padded lengths.
train_dataset, dev_dataset = (
    NeologismDataset(frame, char_to_int, max_description_len, max_word_len)
    for frame in (train_df, dev_df)
)

# Only the training loader shuffles; the development loader keeps row order.
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
dev_dataloader = DataLoader(dev_dataset, batch_size=batch_size, shuffle=False)

print(f"Number of training batches: {len(train_dataloader)}")
print(f"Number of development batches: {len(dev_dataloader)}")
print("Custom Dataset and DataLoaders created successfully.")

In [None]:
# Hyper-parameters. Source and target share one character vocabulary, so the
# encoder embedding, decoder embedding and decoder output layer are all sized
# by len(vocab).
INPUT_DIM = len(vocab)
OUTPUT_DIM = len(vocab)
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
HID_DIM = 512
N_LAYERS = 2

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Encoder and decoder use the same hidden size and layer count, which lets the
# encoder's final (hidden, cell) state be passed straight into the decoder.
# NOTE(review): no random seed is set in the visible cells, so the initial
# weights presumably differ between runs — confirm whether that is intended.
encoder = EncoderLSTM(INPUT_DIM, ENC_EMB_DIM, HID_DIM, N_LAYERS)
decoder = DecoderLSTM(OUTPUT_DIM, DEC_EMB_DIM, HID_DIM, N_LAYERS)

model = Seq2SeqBaseline(encoder, decoder, device).to(device)

# Adam with its default settings; positions whose target is <PAD> are excluded
# from the loss via ignore_index.
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=char_to_int['<PAD>'])

print(f"Using device: {device}")

In [None]:
def train(model, dataloader, optimizer, criterion, clip, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: callable as ``model(source, target)`` returning logits of shape
            (batch, target_len, vocab).
        dataloader: iterable of ``(source, target)`` batches; must support ``len()``.
        optimizer: optimiser stepped once per batch.
        criterion: loss applied to the flattened logits and targets.
        clip: maximum gradient norm passed to ``clip_grad_norm_``.
        device: device the batches are moved to.

    Returns:
        float: sum of the per-batch losses divided by the number of batches.
    """
    model.train()
    running_loss = 0

    for source, target in dataloader:
        source, target = source.to(device), target.to(device)

        optimizer.zero_grad()

        # The model is called with its default teacher-forcing ratio.
        logits = model(source, target)

        # Time step 0 corresponds to <SOS>, which is never predicted, so it is
        # dropped from both sides before flattening batch and time together.
        vocab_width = logits.shape[-1]
        flat_logits = logits[:, 1:].reshape(-1, vocab_width)
        flat_target = target[:, 1:].reshape(-1)

        batch_loss = criterion(flat_logits, flat_target)
        batch_loss.backward()

        # Limit the gradient norm to guard against exploding gradients.
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        optimizer.step()
        running_loss += batch_loss.item()

    return running_loss / len(dataloader)

def evaluate(model, dataloader, criterion, device):
    """Compute the mean per-batch loss over ``dataloader`` without updating weights.

    Args:
        model: callable as ``model(source, target, teacher_forcing_ratio)``
            returning logits of shape (batch, target_len, vocab).
        dataloader: iterable of ``(source, target)`` batches; must support ``len()``.
        criterion: loss applied to the flattened logits and targets.
        device: device the batches are moved to.

    Returns:
        float: sum of the per-batch losses divided by the number of batches.
    """
    model.eval()
    running_loss = 0

    with torch.no_grad():
        for source, target in dataloader:
            source, target = source.to(device), target.to(device)

            # Teacher forcing is switched off: the model is fed its own predictions.
            logits = model(source, target, 0)

            # Same slicing as in training: drop the <SOS> position, then flatten.
            vocab_width = logits.shape[-1]
            flat_logits = logits[:, 1:].reshape(-1, vocab_width)
            flat_target = target[:, 1:].reshape(-1)

            running_loss += criterion(flat_logits, flat_target).item()

    return running_loss / len(dataloader)

# Training schedule.
N_EPOCHS = 20
CLIP = 1  # maximum gradient norm handed to train()
BEST_MODEL_PATH = 'best_model.pt'  # checkpoint with the lowest validation loss

best_valid_loss = float('inf')
train_losses = []
eval_losses = []
for epoch in range(N_EPOCHS):
    train_loss = train(model, train_dataloader, optimizer, criterion, CLIP, device)
    eval_loss = evaluate(model, dev_dataloader, criterion, device)

    print(f'Epoch: {epoch+1:02}')
    print(f'\tTrain Loss: {train_loss:.3f}')
    print(f'\t Val. Loss: {eval_loss:.3f}')
    train_losses.append(train_loss)
    eval_losses.append(eval_loss)

    # Model selection: keep the weights of the epoch with the lowest
    # development loss.
    if eval_loss < best_valid_loss:
        best_valid_loss = eval_loss
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        print(f'\tModel saved! New best validation loss: {best_valid_loss:.3f}')

# Previously the checkpoint was written but never read back, so every later
# cell used the last-epoch weights instead of the selected ones. Restore the
# best weights; the guard skips this when no checkpoint was written in this
# run (e.g. N_EPOCHS == 0 or the validation loss was never finite).
if best_valid_loss < float('inf'):
    model.load_state_dict(torch.load(BEST_MODEL_PATH, map_location=device))
    print(f'Restored best checkpoint (validation loss: {best_valid_loss:.3f})')

print("Training complete.")

In [None]:
def plot_training_history(train_loss, eval_loss, save_path='/content/drive/MyDrive/training_history_baseline.png'):
    """Visualise the training history.

    Draws the training and development loss curves in two panels of a single
    figure, saves that figure and shows it. Previously the two curves lived in
    separate figures and only the second one (development loss) ended up in
    the saved file, because ``tight_layout``/``savefig`` act on the current
    figure.

    Args:
        train_loss: sequence of per-epoch training losses.
        eval_loss: sequence of per-epoch development losses.
        save_path: where to write the PNG; pass ``None`` to skip saving.
    """
    fig, (train_ax, eval_ax) = plt.subplots(1, 2, figsize=(12, 4))

    train_ax.plot(train_loss, label='Training Loss', color='blue', linewidth=2)
    train_ax.set_title('Train Loss')

    eval_ax.plot(range(len(eval_loss)), eval_loss, label='Development Loss',
                 color='cyan', linestyle='-.', linewidth=2)
    eval_ax.set_title('Eval Loss')

    # Axis decoration shared by both panels.
    for ax in (train_ax, eval_ax):
        ax.set_xlabel('Epoha')
        ax.set_ylabel('Loss')
        ax.grid(True, alpha=0.3)
        ax.legend()

    fig.tight_layout()
    if save_path is not None:
        fig.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.show()

# Plot the per-epoch losses collected by the training loop.
plot_training_history(train_losses, eval_losses)

In [39]:
def get_char_ngrams(text, n=3):
    """Extract the lower-cased character n-grams of ``text``.

    Args:
        text: input string.
        n: n-gram length; must be at least 1.

    Returns:
        list[str]: every window of ``n`` consecutive characters, in order.
        A non-empty text shorter than ``n`` yields the whole text as its only
        entry. An empty text yields an empty list (previously ``['']``, which
        made empty strings look as if they contained one n-gram).

    Raises:
        ValueError: if ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError(f"n must be >= 1, got {n}")

    text = text.lower()
    if not text:
        return []
    if len(text) < n:
        return [text]
    return [text[i:i + n] for i in range(len(text) - n + 1)]

def calculate_ngram_f1(actual, generated, n=3):
    """F1 score of the overlap between the character n-gram sets of two strings.

    Args:
        actual: reference string.
        generated: candidate string.
        n: n-gram length forwarded to ``get_char_ngrams``.

    Returns:
        float: harmonic mean of precision (shared / candidate n-grams) and
        recall (shared / reference n-grams), computed on *sets* of n-grams.
        1.0 when both sets are empty, 0.0 when exactly one is empty or when
        nothing is shared.
    """
    reference = set(get_char_ngrams(actual, n))
    candidate = set(get_char_ngrams(generated, n))

    if not (reference or candidate):
        return 1.0  # nothing on either side counts as a perfect match
    if not (reference and candidate):
        return 0.0  # exactly one side is empty

    shared = len(reference & candidate)
    if shared == 0:
        # Precision and recall are both zero; avoid dividing by zero below.
        return 0.0

    precision = shared / len(candidate)
    recall = shared / len(reference)
    return 2 * (precision * recall) / (precision + recall)

In [None]:
# Load the sentence-embedding model used by calculate_semantic_similarity.
# NOTE(review): presumably downloads the weights on first use, so this cell
# needs network access on a fresh runtime — confirm.
semantic_model = SentenceTransformer('sentence-transformers/distiluse-base-multilingual-cased-v2')
print("Loaded semantic similarity model: sentence-transformers/distiluse-base-multilingual-cased-v2")

def calculate_semantic_similarity(actual, generated, model):
    """Cosine similarity between the embeddings of two strings.

    Args:
        actual: reference string.
        generated: candidate string.
        model: object whose ``encode(list_of_texts)`` returns one embedding
            per text; it is only called when both strings are non-empty.

    Returns:
        1.0 when both strings are empty, 0.0 when exactly one is empty,
        otherwise ``1 - cosine_distance`` of the two embeddings.
    """
    if not (actual or generated):
        return 1.0  # two empty strings are treated as identical
    if not (actual and generated):
        return 0.0  # exactly one string is empty

    actual_vec, generated_vec = model.encode([actual, generated])
    # scipy's `cosine` is a distance, so similarity is its complement.
    return 1 - cosine(actual_vec, generated_vec)

In [44]:
def calculate_target_cross_entropy(model, char_to_int, description_text, actual_word, max_description_len, max_word_len, device):
    """Cross-entropy of the reference word given its description.

    The decoder is run with full teacher forcing, so the logits at step ``t``
    are conditioned on the true characters before ``t``. Previously the ratio
    was 0.0, which fed the decoder its own greedy predictions; the loss was
    then measured against a prefix the reference word never contained.

    Args:
        model: seq2seq model callable as
            ``model(source, target, teacher_forcing_ratio=...)``.
        char_to_int: character-to-index mapping; must contain '<PAD>'.
        description_text: source description.
        actual_word: reference word to score.
        max_description_len: encoded length of the description.
        max_word_len: encoded length of the word.
        device: device the tensors are moved to.

    Returns:
        float: mean cross-entropy over the non-padding target positions.
    """
    model.eval()

    # Encode both sides and add a batch dimension of size 1.
    encoded_description = encode_text(description_text, char_to_int, max_description_len)
    encoded_actual_word = encode_text(actual_word, char_to_int, max_word_len)

    source = torch.LongTensor(encoded_description).unsqueeze(0).to(device)
    target = torch.LongTensor(encoded_actual_word).unsqueeze(0).to(device)

    with torch.no_grad():
        # Ratio 1.0: always feed the ground-truth character as the next input.
        output = model(source, target, teacher_forcing_ratio=1.0)

        # Drop the <SOS> position, then flatten for the loss.
        output_dim = output.shape[-1]
        output = output[:, 1:].reshape(-1, output_dim)
        target = target[:, 1:].reshape(-1)

        # Same loss configuration as training/evaluation: padding is ignored.
        criterion = nn.CrossEntropyLoss(ignore_index=char_to_int['<PAD>'])
        loss = criterion(output, target)

    return loss.item()

In [None]:
# Held-out test split; the evaluation loop below reads its 'Description' and
# 'Word' columns.
test_df = pd.read_csv('/content/drive/MyDrive/test.csv')

def generate_word(description_text, model, char_to_int, int_to_char, max_description_len, max_word_len, device):
    """Greedily decode a word for ``description_text``.

    Args:
        description_text: source description.
        model: seq2seq model exposing ``encoder`` and ``decoder`` sub-modules.
        char_to_int: character-to-index mapping with '<SOS>' and '<EOS>' keys.
        int_to_char: inverse mapping from index to character.
        max_description_len: encoded length of the description.
        max_word_len: decoding runs for at most ``max_word_len - 1`` steps.
        device: device the tensors are moved to.

    Returns:
        str: the decoded characters with '<SOS>', '<EOS>' and '<PAD>' removed.
        Decoding stops as soon as '<EOS>' is predicted.
    """
    model.eval()

    # Encode the description and add a batch dimension of size 1.
    source_ids = encode_text(description_text, char_to_int, max_description_len)
    source = torch.LongTensor(source_ids).unsqueeze(0).to(device)

    eos_idx = char_to_int['<EOS>']
    special_symbols = ('<SOS>', '<EOS>', '<PAD>')
    pieces = []

    with torch.no_grad():
        # The encoder's final state seeds the decoder.
        hidden, cell = model.encoder(source)

        # Decoding starts from the <SOS> token.
        step_input = torch.LongTensor([char_to_int['<SOS>']]).to(device)

        for _ in range(max_word_len - 1):
            logits, hidden, cell = model.decoder(step_input, hidden, cell)

            # Greedy choice: take the highest-scoring character.
            next_idx = logits.argmax(1).item()
            symbol = int_to_char[next_idx]
            if symbol not in special_symbols:
                pieces.append(symbol)

            if next_idx == eos_idx:
                break

            # Feed the prediction back in as the next input.
            step_input = torch.LongTensor([next_idx]).to(device)

    return ''.join(pieces)

print("\n--- Testing generate_word function with test.csv ---")
num_test_samples = len(test_df)

# One entry per test row for each metric.
all_ngram_f1_scores = []
all_cross_entropy_scores = []
all_semantic_similarity_scores = []

model.to(device)

for i in range(num_test_samples):
    description, actual_word = test_df.loc[i, 'Description'], test_df.loc[i, 'Word']

    # Greedy generation first, then the three metrics for this row.
    generated_word = generate_word(
        description, model, char_to_int, int_to_char,
        max_description_len, max_word_len, device,
    )
    ngram_f1 = calculate_ngram_f1(actual_word, generated_word, n=3)
    cross_entropy_loss = calculate_target_cross_entropy(
        model, char_to_int, description, actual_word,
        max_description_len, max_word_len, device,
    )
    semantic_sim = calculate_semantic_similarity(actual_word, generated_word, semantic_model)

    all_ngram_f1_scores.append(ngram_f1)
    all_cross_entropy_scores.append(cross_entropy_loss)
    all_semantic_similarity_scores.append(semantic_sim)

    # Per-sample report.
    print(f"Description: '{description}'")
    print(f"  Actual Word: '{actual_word}'")
    print(f"  Generated Word: '{generated_word}'")
    print(f"  3-gram F1 Score: {ngram_f1:.4f}")
    print(f"  Target Cross-Entropy Loss: {cross_entropy_loss:.4f}")
    print(f"  Semantic Similarity: {semantic_sim:.4f}")
    print("--------------------------------------------------")

# Aggregate report; skipped when the test frame had no rows.
if not all_ngram_f1_scores:
    print("No test samples processed.")
else:
    avg_ngram_f1 = np.mean(all_ngram_f1_scores)
    avg_cross_entropy = np.mean(all_cross_entropy_scores)
    avg_semantic_sim = np.mean(all_semantic_similarity_scores)

    print(f"\nAverage 3-gram F1 Score over {len(all_ngram_f1_scores)} samples: {avg_ngram_f1:.4f}")
    print(f"Average Target Cross-Entropy Loss over {len(all_cross_entropy_scores)} samples: {avg_cross_entropy:.4f}")
    print(f"Average Semantic Similarity over {len(all_semantic_similarity_scores)} samples: {avg_semantic_sim:.4f}")