In [None]:
import numpy as np
import pandas as pd
from torch.utils.data import TensorDataset, DataLoader
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
#!pip install lingpy
#import lingpy
#from lingpy import ipa2tokens
import re

# Use the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
#device = torch.device("cpu") #For debugging
print(f"Using device: {device}")
# Tensors and modules created without an explicit device land on `device`.
torch.set_default_device(device)

In [None]:
# One shared seed for both the PyTorch and the NumPy global generators.
seed = 42
for _seed_rng in (torch.manual_seed, np.random.seed):
    _seed_rng(seed)

In [None]:
dat = pd.read_csv('data/ielexData.csv')

In [None]:
dat

In [None]:
dat[dat['Meaning'] == 'few']

In [None]:
# The 'cc' column is split on ':' : everything before the first colon is the
# concept, and the first character after the last colon is the cognate marker.
# NOTE(review): only one character of the cognate part is kept — confirm that
# no cognate identifier in the data is longer than one character.
cc_values = dat['cc']
concepts = [re.sub(':.*', '', cc) for cc in cc_values]
dat['concepts'] = concepts
uniqueconcepts = np.unique(concepts)
cognates = [re.sub('^.*:', '', cc)[0] for cc in cc_values]
dat['cognate_char'] = cognates

In [None]:
# Build every ordered pair of distinct rows that share a concept, keeping only
# words shorter than 11 characters. A pair is labelled 1 when both rows carry
# the same cognate character and 0 otherwise.
source_words, target_words, label = [], [], []
for concept in uniqueconcepts:
    concept_rows = dat[dat['concepts'] == concept]
    forms = concept_rows['ASJP'].tolist()
    classes = concept_rows['cognate_char'].tolist()
    for src_pos, (src_form, src_class) in enumerate(zip(forms, classes)):
        if len(src_form) >= 11:
            continue
        for tgt_pos, (tgt_form, tgt_class) in enumerate(zip(forms, classes)):
            # Skip pairing a row with itself and skip over-long targets.
            if tgt_pos == src_pos or len(tgt_form) >= 11:
                continue
            label.append(1 if src_class == tgt_class else 0)
            source_words.append(src_form)
            target_words.append(tgt_form)


In [None]:
source_words[0]

In [None]:
# Length of every source word, plus the sorted character inventory of the
# source words with a 'PAD' entry placed at index 0.
charlens = [len(word) for word in source_words]
unique_characters = ['PAD'] + np.unique(
    [char for word in source_words for char in word]
).tolist()

In [None]:
maxlen = max(charlens)

In [None]:
# Encode each word as a list of vocabulary indices, right-padded with 0
# (the index of 'PAD') up to maxlen.
#
# A dict gives constant-time lookups; the previous list.index() call rescanned
# the whole vocabulary for every character. setdefault keeps the FIRST position
# of a character, which is what list.index() returned.
# A character missing from the vocabulary now raises KeyError (was ValueError).
char_index = {}
for idx, char in enumerate(unique_characters):
    char_index.setdefault(char, idx)

source_words_tokens = []
target_words_tokens = []
for source_word, target_word in zip(source_words, target_words):
    source_tmp = [char_index[char] for char in source_word]
    target_tmp = [char_index[char] for char in target_word]
    # A negative multiplier yields an empty list, so nothing is appended to a
    # word that is already maxlen or longer.
    source_tmp = source_tmp + [0] * (maxlen - len(source_tmp))
    target_tmp = target_tmp + [0] * (maxlen - len(target_tmp))

    source_words_tokens.append(source_tmp)
    target_words_tokens.append(target_tmp)

In [None]:
source_words_tokens[0], target_words_tokens[0]

In [None]:
unique_characters[25]

In [None]:
#indices = np.arange(len(source_words_tokens))
#np.random.shuffle(indices)
#split_index = int(0.9 * len(indices))
#train_indices = indices[:split_index]
#test_indices = indices[split_index:]

In [None]:
#source_words_tokens_train = source_words_tokens[train_indices]
#source_words_tokens_test = source_words_tokens[test_indices]
#target_words_tokens_train = target_words_tokens[train_indices]
#target_words_tokens_test = target_words_tokens[test_indices]
#label_train = label[train_indices]
#label_test = label[test_indices]

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 10% of the pairs for testing. The three parallel lists are split
# with the same shuffled indices, so sources, targets and labels stay aligned.
(
    source_train, source_test,
    target_train, target_test,
    label_train, label_test,
) = train_test_split(
    source_words_tokens,
    target_words_tokens,
    label,
    test_size=0.1,
    shuffle=True,
    random_state=42,
)

In [None]:
# Token lists become torch.long tensors and labels become torch.int tensors,
# all created directly on `device`.
source_train_tensor, source_test_tensor, target_train_tensor, target_test_tensor = (
    torch.tensor(tokens, dtype=torch.long, device=device)
    for tokens in (source_train, source_test, target_train, target_test)
)
label_train_tensor, label_test_tensor = (
    torch.tensor(labels, dtype=torch.int, device=device)
    for labels in (label_train, label_test)
)

In [None]:
# Bundle the (source, target, label) tensors so that indexing a dataset
# returns one aligned triple.
train_dataset = TensorDataset(
    source_train_tensor,
    target_train_tensor,
    label_train_tensor,
)
test_dataset = TensorDataset(
    source_test_tensor,
    target_test_tensor,
    label_test_tensor,
)

In [None]:
batch_size = 512
# Both loaders use identical settings; each one gets its own torch.Generator
# created for the active device type.
train_loader, test_loader = (
    DataLoader(
        split,
        batch_size=batch_size,
        generator=torch.Generator(device=device.type),
    )
    for split in (train_dataset, test_dataset)
)

Next step: build a neural network that takes both words as input, converts them to embeddings, and predicts whether they are cognates (yes = 1, no = 0) using a sigmoid output for binary classification.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

class SimpleCognateDataset(Dataset):
    """Dataset of ``(word1, word2, label)`` triples encoded as fixed-length index tensors.

    Args:
        data: Indexable collection of ``(word1, word2, label)`` triples.
        unique_characters: Iterable of vocabulary characters. The i-th
            character is assigned index ``i + 1``; index 0 is used both for
            padding and for characters that are not in the vocabulary.
        maxlen: Length of every encoded word.
    """

    def __init__(self, data, unique_characters, maxlen):
        self.data = data
        self.char_to_idx = {char: i+1 for i, char in enumerate(unique_characters)}
        self.maxlen = maxlen

    def encode_word(self, word):
        """Return ``word`` as a list of exactly ``maxlen`` vocabulary indices.

        Shorter words are right-padded with 0. Longer words are truncated to
        their first ``maxlen`` characters so every item has the same length
        (previously an over-long word produced an over-long list, which cannot
        be stacked into a batch with the other items).
        """
        # Author's note: building a plain list here was faster than creating and
        # padding a tensor per word; the tensor is only created in __getitem__.
        encoded = [self.char_to_idx.get(c, 0) for c in word[:self.maxlen]]
        return encoded + [0] * (self.maxlen - len(encoded))

    def __len__(self):
        """Number of pairs in the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(word1_indices, word2_indices, label)`` as tensors on the notebook-level ``device``.

        The two words are ``torch.long`` tensors of length ``maxlen``; the
        label is a scalar ``torch.float`` tensor.
        """
        word1, word2, label = self.data[idx]
        return (
            torch.tensor(self.encode_word(word1), dtype=torch.long, device=device),
            torch.tensor(self.encode_word(word2), dtype=torch.long, device=device),
            torch.tensor(label, dtype=torch.float, device=device)
        )

In [None]:
import pandas as pd
from itertools import combinations

# Load the IELex table and keep only rows where meaning, form and cognate
# class are all present.
df = pd.read_csv("data/ielexData.csv")[['Meaning', 'Phonological Form', 'cc']].dropna()
df.columns = ['meaning', 'word', 'cognate_class']

# Every unordered pair of rows sharing a meaning becomes (word1, word2, label),
# with label 1 when both rows have the same cognate class and 0 otherwise.
ilexPairs = [
    (
        str(first['word']),
        str(second['word']),
        int(first['cognate_class'] == second['cognate_class']),
    )
    for _, group in df.groupby('meaning')
    for first, second in combinations(group.to_dict('records'), 2)
]

import csv

# Distinct values of the second CSV column (the language) across all data rows.
languages = set()

# Bytes that are not valid UTF-8 are substituted instead of raising.
with open('data/ielexData.csv', 'r', encoding='utf-8', errors='replace') as file:
    reader = csv.reader(file)
    next(reader)  # the first row is the header
    languages.update(row[1] for row in reader)

print(languages)

df

In [None]:
# Load the GLED table and expose LANGUAGE_NAME under the name 'Language'.
df = pd.read_csv("data/gled.tsv", delimiter="\t").rename(
    columns={'LANGUAGE_NAME': 'Language'}
)

# Relabel 'Modern Greek' as 'Greek' (plain substring replacement, no regex).
df['Language'] = df['Language'].str.replace('Modern Greek', 'Greek', regex=False)

#df = df[df['Language'].isin(languages)]

#df = df[df.apply(lambda row: row['DOCULECT'].lower() in row['Language'].lower(), axis=1)]

#These languages seem to be aight
# Keep only the rows whose DOCULECT contains "_2".
doculect_matches = df["DOCULECT"].str.contains("_2")
df = df[doculect_matches]

#df.dropna(subset=["COGSET"], inplace=True) All of them have a cogset

df

In [None]:
# Reduce the table to complete (meaning, form, cognate class) rows.
df = df[['CONCEPT', 'FORM', 'COGSET']].dropna()
df.columns = ['meaning', 'word', 'cognate_class']

pairs = []

from tqdm import tqdm
# For each meaning, pair up every two rows; pairs whose two forms are the same
# string are dropped. The label is 1 when the cognate classes match, else 0.
for _, group in tqdm(df.groupby('meaning')):
    #if len(pairs) > 100000:
    #    break

    records = group.to_dict('records')
    for first, second in combinations(records, 2):
        form_a, form_b = str(first['word']), str(second['word'])
        if form_a != form_b:
            #if label == 1:
            same_class = int(first['cognate_class'] == second['cognate_class'])
            pairs.append((form_a, form_b, same_class))

In [None]:
len(pairs)

In [None]:
# Append the IELex pairs to the pairs built from the GLED table, in place.
# NOTE(review): this extends the existing list, so running this cell a second
# time appends ilexPairs again and duplicates those pairs.
pairs += ilexPairs

In [None]:
# Preview only: echoing the whole list would write every pair into the
# notebook output.
pairs[:10]

In [None]:
pairs_df = pd.DataFrame(pairs, columns=['Word1', 'Word2', 'Label'])
pairs_df

In [None]:
from sklearn.model_selection import LeaveOneOut, KFold, StratifiedKFold
import numpy as np
from tqdm import tqdm
import time

# Prepare data
# Flatten the pairs: the first two fields of each pair are words, anything
# after them is collected as an answer (label).
all_words, all_answers = [], []
for pair in pairs:
    all_words.extend(pair[:2])
    all_answers.extend(pair[2:])

# Sorted inventory of every character occurring in any word, and the length
# of the longest word.
unique_characters = sorted({char for word in all_words for char in word})
maxlen = max(map(len, all_words))

import matplotlib.pyplot as plt
def show_box_plot(data, title="Cross Validation Accuracies"):
    """Draw a small horizontal box plot of ``data`` with the raw points overlaid.

    Args:
        data: Sequence of numeric values; one translucent dot is drawn per value.
        title: Title placed above the plot.
    """
    # 4 x 1 inch figure.
    plt.figure(figsize=(4, 1))

    plt.boxplot(data, vert=False)

    # Overlay the individual values on the y == 1 line.
    ones = [1] * len(data)
    plt.scatter(data, ones, color='blue', alpha=0.3, s=20)

    plt.title(title)
    plt.ylabel('Values')

    plt.show()

def show_confusion_matrix(cm, title="Confusion Matrix", save_path="confusion.svg"):
    """Render a 2x2 confusion matrix as an annotated heat map and save it.

    Args:
        cm: 2-D array of counts; rows are labelled as the true class and
            columns as the predicted class.
        title: Title placed above the plot.
        save_path: File the figure is written to. The default keeps the
            previous output file name; pass ``None`` to skip saving.

    The figure was previously written twice, once to a leftover ``test.svg``
    and once to ``confusion.svg``; only ``save_path`` is written now.
    """
    fig, ax = plt.subplots()
    ax.matshow(cm, cmap=plt.cm.Blues)

    # Write each cell's value on top of its coloured square.
    for (i, j), z in np.ndenumerate(cm):
        ax.text(j, i, '{:0.1f}'.format(z), ha='center', va='center',
                bbox=dict(boxstyle='round', facecolor='white', edgecolor='0.3'))

    ax.set_xticks([0, 1])
    ax.set_yticks([0, 1])
    ax.set_xticklabels(['Predicted Negative', 'Predicted Positive'])
    ax.set_yticklabels(['True Negative', 'True Positive'])
    ax.set_xlabel('Predicted')
    ax.set_ylabel('True')
    ax.set_title(title)

    if save_path is not None:
        fig.savefig(save_path)

def show_roc_curve(y_true, y_scores, title="ROC Curve"):
    """Plot a single ROC curve and return its area under the curve.

    Args:
        y_true: True binary labels.
        y_scores: Scores for the positive class (probabilities or confidences).
        title: Title placed above the plot.

    Returns:
        The AUC computed from the plotted curve.
    """
    false_pos_rate, true_pos_rate, _ = roc_curve(y_true, y_scores)
    roc_auc = auc(false_pos_rate, true_pos_rate)

    plt.figure(figsize=(6, 6))
    curve_label = f'ROC curve (AUC = {roc_auc:.2f})'
    plt.plot(false_pos_rate, true_pos_rate, color='darkorange', lw=2, label=curve_label)
    # Diagonal reference line.
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--',
             label='Random classifier')

    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(title)
    plt.legend(loc="lower right")
    plt.grid(True, alpha=0.3)
    plt.show()

    return roc_auc

def _evaluate_loader(model, loader):
    """Run ``model`` over ``loader`` without gradients and summarise the result.

    Returns:
        ``(conf_matrix, accuracy, probabilities, true_labels)`` where
        ``conf_matrix`` is always 2x2 (labels 0 and 1), ``accuracy`` is the
        fraction of correct predictions at a 0.5 threshold, ``probabilities``
        are the raw model outputs and ``true_labels`` the integer labels.
    """
    from sklearn.metrics import confusion_matrix

    predictions = []
    probabilities = []  # raw model outputs
    true_labels = []

    with torch.no_grad():
        for word1_batch, word2_batch, label_batch in loader:
            # reshape(-1) keeps a 1-D batch axis even when the batch holds a
            # single pair; squeeze() collapsed that case to a 0-d tensor whose
            # tolist() is a bare float that cannot be passed to extend().
            output_batch = model(word1_batch, word2_batch).reshape(-1)
            predicted_batch = (output_batch > 0.5).int()

            predictions.extend(predicted_batch.tolist())
            probabilities.extend(output_batch.tolist())
            true_labels.extend(label_batch.int().tolist())

    accuracy = sum(pred == true for pred, true in zip(predictions, true_labels)) / len(true_labels)

    # Fixing the label set keeps the matrix 2x2 even if a fold's predictions
    # and labels contain only one class, so per-fold matrices can be summed.
    conf_matrix = confusion_matrix(true_labels, predictions, labels=[0, 1])

    return conf_matrix, accuracy, probabilities, true_labels


def cv_test_model(model_maker, has_history=False):
    """Train and evaluate a fresh model on each fold of a 5-fold stratified split.

    Reads the notebook-level ``pairs``, ``pairs_df``, ``unique_characters``,
    ``maxlen`` and ``device``. Each fold trains for up to 15 epochs with Adam
    (lr 0.001) and binary cross-entropy, in batches of 3200.

    Args:
        model_maker: Zero-argument callable returning a new model. The model is
            called as ``model(word1_batch, word2_batch)`` and must return one
            probability per pair.
        has_history: When true, train and test metrics are recorded after
            every epoch. When false, only the test metrics after the final
            epoch are recorded (in ``history["test"][-1]``).

    Returns:
        ``(history, model)``. ``history["train"]`` and ``history["test"]`` are
        indexed ``[epoch][fold]`` and hold 4-tuples
        ``(confusion_matrix, accuracy, probabilities, true_labels)``; entries
        that were never measured are ``(None, None, None, None)``. ``model``
        is the model trained on the last fold.

    Early stopping: with ``has_history=True`` a fold stops training once its
    training accuracy reaches 0.99. The remaining epochs of that fold are
    filled with the last measurement so that ``history`` stays rectangular and
    every fold is still evaluated. (Previously the whole function returned at
    that point, dropping the triggering epoch and leaving later folds empty.)
    """
    cv = StratifiedKFold(n_splits = 5)
    epochs = 15
    n_folds = cv.get_n_splits()

    # Placeholders have the same arity as real results so consumers can unpack
    # every entry the same way.
    unmeasured = (None, None, None, None)
    history = {
        split: [[unmeasured for _ in range(n_folds)] for _ in range(epochs)]
        for split in ("train", "test")
    }

    for fold, (train_idx, test_idx) in enumerate(tqdm(cv.split(pairs_df.iloc[:, :2], pairs_df.iloc[:, 2:]), total=n_folds, desc="CV Progress")):
        train_data = [pairs[i] for i in train_idx]
        test_data = [pairs[i] for i in test_idx]

        # Fresh model, loss and optimizer for every fold.
        model = model_maker()
        criterion = nn.BCELoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        train_dataset = SimpleCognateDataset(train_data, unique_characters, maxlen)
        test_dataset = SimpleCognateDataset(test_data, unique_characters, maxlen)

        train_loader = DataLoader(train_dataset, batch_size=3200, shuffle=True,
                                  generator=torch.Generator(device=device.type))
        test_loader = DataLoader(test_dataset, batch_size=3200,
                                 generator=torch.Generator(device=device.type))

        model.train()
        for epoch in range(epochs):
            for word1, word2, label in train_loader:
                optimizer.zero_grad()
                # reshape(-1) so a final batch of one pair still matches the
                # shape of `label`, which BCELoss requires.
                output = model(word1, word2).reshape(-1)
                loss = criterion(output, label)
                loss.backward()
                optimizer.step()

            if not has_history:
                continue

            model.eval()
            train_result = _evaluate_loader(model, train_loader)
            test_result = _evaluate_loader(model, test_loader)
            model.train()

            history["train"][epoch][fold] = train_result
            history["test"][epoch][fold] = test_result

            # Early cutoff if overfitting: stop this fold only, and carry the
            # last measurement forward over the epochs that were not run.
            if train_result[1] >= .99:
                for skipped_epoch in range(epoch + 1, epochs):
                    history["train"][skipped_epoch][fold] = train_result
                    history["test"][skipped_epoch][fold] = test_result
                break

        if not has_history:
            # Evaluate in eval mode; the model was previously still in train
            # mode at this point.
            model.eval()
            history["test"][-1][fold] = _evaluate_loader(model, test_loader)

    return history, model


In [None]:
from transformer_stuff import TransformerCognateModel, UnbatchedWrapper
#from lstm_stuff import SimplePairNN
def simple_nn_model_maker():
    """Return a newly constructed TransformerCognateModel sized for ``unique_characters``.

    NOTE(review): the vocabulary size passed here is ``len(unique_characters)``
    while SimpleCognateDataset produces indices from 0 up to and including
    ``len(unique_characters)`` — confirm that the model reserves the extra slot.
    """
    #return SimplePairNN(vocab_size, embedding_dim, hidden_dim)
    return TransformerCognateModel(
        len(unique_characters),  # vocab_size
        64,                      # embedding_dim
        128,                     # hidden_dim
    )


history, model = cv_test_model(simple_nn_model_maker, has_history=True)

In [None]:
# Calculate overall statistics
last_epoch_test_data = history["test"][-1] #[fold, (pred, labels, accuracies, accuracy)]

all_confusion = np.sum(np.array([fold_data[0] for fold_data in last_epoch_test_data]), axis=0)
all_accuracies = [fold_data[1] for fold_data in last_epoch_test_data]

overall_accuracy = np.mean(all_accuracies)
correct_predictions = all_confusion[1][1] + all_confusion[0][0]
total_predictions = np.sum(all_confusion)

print(f"\nCross-Validation Results:")
print(f"Overall Accuracy: {overall_accuracy * 100:.2f}%")
print(f"Correct Predictions: {correct_predictions}/{total_predictions}")

fold_accuracies = [fold_data[1] for fold_data in history["test"][-1]]
show_box_plot(fold_accuracies)

show_confusion_matrix(all_confusion)

# Initialize lists to store accuracies for each fold
train_fold_accuracies = np.zeros((len(history["train"][-1]), len(history["train"])))
test_fold_accuracies = np.zeros((len(history["train"][-1]), len(history["train"])))

# Extract training and testing accuracies for each fold across all epochs
for epoch_index, epoch_data in enumerate(history["train"]):
    for fold_index, fold_data in enumerate(epoch_data):
        train_fold_accuracies[fold_index][epoch_index] = fold_data[1]

for epoch_index, epoch_data in enumerate(history["test"]):
    for fold_index, fold_data in enumerate(epoch_data):
        test_fold_accuracies[fold_index][epoch_index] = fold_data[1]

# Plot training and testing accuracies for each fold on one graph
plt.figure(figsize=(10, 5))
for fold_index, accuracies in enumerate(train_fold_accuracies):
    plt.plot(accuracies, color='orange', alpha=.4)
for fold_index, accuracies in enumerate(test_fold_accuracies):
    plt.plot(accuracies, color='blue', alpha=.4)

# Add custom legend
leg = plt.legend(['Training', 'Testing'], loc='upper left')
leg.legend_handles[0].set_color('orange')
leg.legend_handles[1].set_color('blue')

plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.xticks(np.arange(1, len(history["train"]), step=1))
plt.title('Training and Testing Accuracies Over Epochs')
plt.show()

In [None]:
# Compact summary: per-fold accuracy of the last recorded epoch for each split.
# Echoing `history` itself would print every stored probability and label list.
{
    split: [fold_data[1] for fold_data in epochs_data[-1]] if epochs_data else []
    for split, epochs_data in history.items()
}

In [None]:
from sklearn.metrics import roc_curve, auc

def show_multiple_roc_curves(history, title="ROC Curves - Last Epoch by Fold", dataset_type="test"):
    """Plot one ROC curve per fold, using the last recorded epoch only.

    Args:
        history: The history dictionary returned by cv_test_model.
        title: Title placed above the plot.
        dataset_type: Which split of ``history`` to plot ("test" or "train").

    Fold entries without stored probabilities and labels (placeholders) are
    skipped.
    """
    plt.figure(figsize=(8, 6))

    last_epoch = history[dataset_type][-1]

    for fold_data in last_epoch:
        # Placeholder entries may be shorter than four fields or hold None, so
        # check before reading the probabilities and labels.
        if len(fold_data) < 4 or fold_data[2] is None or fold_data[3] is None:
            continue
        probabilities, true_labels = fold_data[2], fold_data[3]
        fpr, tpr, _ = roc_curve(true_labels, probabilities)
        plt.plot(fpr, tpr, color='blue', alpha=0.4, linewidth=2)

    # Diagonal reference line.
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', alpha=0.8)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(title)
    plt.xlim([0, 1])
    plt.ylim([0, 1])
    plt.tight_layout()
    plt.show()

def show_roc_curves_over_epochs(history, title="ROC"):
    """Plot the ROC AUC of every fold against the epoch, for train and test data.

    Args:
        history: The history dictionary returned by cv_test_model.
        title: Title placed above the plot.

    Returns:
        ``(train_fold_aucs, test_fold_aucs)``, two arrays shaped
        ``(num_folds, num_epochs)``. Entries without stored probabilities and
        labels (placeholders) are left at 0.
    """
    num_epochs = len(history["train"])
    num_folds = len(history["train"][0]) if num_epochs else 0

    def fold_aucs(split):
        """AUC matrix (fold, epoch) for one split of ``history``."""
        scores = np.zeros((num_folds, num_epochs))
        for epoch_index, epoch_data in enumerate(history[split]):
            for fold_index, fold_data in enumerate(epoch_data):
                # Placeholder entries may be shorter than four fields or hold
                # None; unpacking them blindly would raise.
                if len(fold_data) < 4 or fold_data[2] is None or fold_data[3] is None:
                    continue
                fpr, tpr, _ = roc_curve(fold_data[3], fold_data[2])
                scores[fold_index][epoch_index] = auc(fpr, tpr)
        return scores

    train_fold_aucs = fold_aucs("train")
    test_fold_aucs = fold_aucs("test")

    # One line per fold; only the first line of each group carries a legend
    # label (labels starting with an underscore are left out of the legend).
    plt.figure(figsize=(10, 5))
    for fold_index, aucs in enumerate(train_fold_aucs):
        plt.plot(aucs, color='orange', alpha=0.4,
                 label='Training' if fold_index == 0 else '_nolegend_')
    for fold_index, aucs in enumerate(test_fold_aucs):
        plt.plot(aucs, color='blue', alpha=0.4,
                 label='Testing' if fold_index == 0 else '_nolegend_')

    plt.legend(loc='upper left')

    plt.xlabel('Epochs')
    plt.ylabel('ROC AUC')
    plt.xticks(np.arange(0, num_epochs, step=1))
    plt.title(title)
    plt.grid(True, alpha=0.3)
    plt.ylim([0.0, 1.0])  # AUC is always between 0 and 1
    plt.show()

    return train_fold_aucs, test_fold_aucs

show_multiple_roc_curves(history)

# Show ROC AUC progression over epochs (similar to accuracy plot)
train_fold_aucs, test_fold_aucs = show_roc_curves_over_epochs(history, "ROC AUC Over Epochs")

In [None]:
def show_classification_report(history):
    """Print a classification report for the last recorded test epoch, pooled over all folds.

    Predictions are the stored probabilities thresholded at 0.5.
    """
    from sklearn.metrics import classification_report

    last_epoch_folds = history["test"][-1]
    all_labels = [
        true_label
        for _, _, _, true_labels in last_epoch_folds
        for true_label in true_labels
    ]
    all_predictions = [
        int(prob > .5)
        for _, _, probabilities, _ in last_epoch_folds
        for prob in probabilities
    ]
    print(classification_report(all_labels, all_predictions))

show_classification_report(history)

https://github.com/pytorch/examples/tree/main/word_language_model