In [None]:
# %pip install optuna datasets

In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.optim import AdamW
import pandas as pd
import numpy as np
from datasets import load_dataset, Dataset
from collections import Counter
from tqdm.cli import tqdm
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import classification_report
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
from sklearn.metrics import PrecisionRecallDisplay, precision_recall_curve, roc_curve
import matplotlib.pyplot as plt
import optuna

In [None]:
# set torch device
# Prefer CUDA, then Apple-silicon MPS (Mac M1 and above), otherwise stay on CPU.
# torch.has_cuda / torch.has_mps are (deprecated) boolean build flags, not
# functions: calling them raises TypeError, and they only say what the binary
# was compiled with. Runtime availability is what matters here.
device = torch.device("cpu")
_mps_backend = getattr(torch.backends, "mps", None)  # missing on older torch builds
if torch.cuda.is_available():
    device = torch.device("cuda")
elif _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps")


In [None]:
# gender classification dataset
# Loads the Hugging Face dataset "mingiryu/name_gender_inference" and keeps only
# its 'train' split. Later cells read the 'full_name' and 'gender' columns.
dataset = load_dataset("mingiryu/name_gender_inference")['train']

In [None]:
# Materialise the dataset as a DataFrame and drop rows whose gender is 'u'.
# .copy() makes the filtered frame independent of the original, so adding the
# 'label' column below does not trigger pandas' SettingWithCopyWarning.
dataset_df = pd.DataFrame(dataset)
dataset_df = dataset_df[dataset_df['gender'] != 'u'].copy()
# coding labels, male = 0, female = 1
# (every value other than 'm' is coded 1, exactly as the previous lambda did)
dataset_df['label'] = (dataset_df['gender'] != 'm').astype('int64')

In [None]:
# longest name (in characters) in the corpus; a sanity check for MAX_LENGTH below
max(map(len, dataset_df['full_name']))

In [None]:
# peek at a single row (position 4) to eyeball the columns, including 'label'
dataset_df.iloc[4]

In [None]:
# Character-level vocabulary with four reserved special tokens.

# how often does each character occur across all names?
char_counter = Counter(ch for name in dataset_df['full_name'] for ch in name)

# ids 0-3 are reserved: padding, unknown, start-of-sequence, end-of-sequence
char_dict = {'<pad>': 0, '<unk>': 1, '<s>': 2, '</s>': 3}

# Characters seen at least 5 times get the next free id, in first-seen order.
# Rarer characters are left out and encoded as <unk> by the tokenizer.
for ch, freq in char_counter.items():
    if freq >= 5:
        char_dict[ch] = len(char_dict)

# reverse lookup: id -> character
id_to_char = {idx: ch for ch, idx in char_dict.items()}

# char_dict
# id_to_char

In [None]:
# Characters missing from the vocabulary become <unk>;
# every name is truncated / padded to MAX_LENGTH characters.
MAX_LENGTH = 30

def tokenize_function(e):
    """Encode a batch of names as fixed-length lists of character ids.

    Each name becomes ``<s> + chars[:MAX_LENGTH] + </s> + <pad>...``, so every
    encoded sequence has exactly ``MAX_LENGTH + 2`` ids. For names longer than
    MAX_LENGTH the pad count is negative, which yields an empty padding list.

    Args:
        e: mapping with a 'full_name' sequence of strings and, optionally, a
           'label' entry that is passed through unchanged.

    Returns:
        dict with 'input_ids' (list of id lists) and, if given, 'label'.
    """
    bos_id = char_dict.get("<s>")
    eos_id = char_dict.get("</s>")
    pad_id = char_dict.get('<pad>')
    unk_id = char_dict['<unk>']

    encoded_names = []
    for name in e['full_name']:
        char_ids = [char_dict.get(ch, unk_id) for ch in name[:MAX_LENGTH]]
        padding = [pad_id] * (MAX_LENGTH - len(name))
        encoded_names.append([bos_id] + char_ids + [eos_id] + padding)

    tokenized_batch = {'input_ids': encoded_names}
    if 'label' in e:
        tokenized_batch['label'] = e['label']
    return tokenized_batch

# Tokenize every name, then drop the raw text so only ids and labels remain.
ds_all = (
    Dataset
    .from_pandas(dataset_df[["full_name", "label"]], preserve_index=False)
    .map(tokenize_function, batched=True)
)
ds_all = ds_all.remove_columns(['full_name'])

# One stratified 99% / 1% split with a fixed seed; with n_splits=1 the
# generator yields exactly one (train, test) pair of index arrays.
splitter = StratifiedShuffleSplit(n_splits=1, test_size=0.01, random_state=1331)
train_ids, test_ids = next(splitter.split(ds_all, ds_all['label']))
ds_train = Dataset.from_dict(ds_all[train_ids])
ds_test = Dataset.from_dict(ds_all[test_ids])


In [None]:
# inspect one tokenized test example ('input_ids' and 'label')
ds_test[1]

In [None]:
# have both splits return torch tensors when indexed / iterated
for split in (ds_train, ds_test):
    split.set_format('torch')

In [None]:
class TextCNN(nn.Module):
    """CNN classifier over embedded token ids with max-over-time pooling.

    Several convolutions of different window heights run in parallel over the
    embedded sequence. Each is max-pooled over the sequence axis, and the
    pooled features are concatenated, passed through dropout, and fed to a
    linear layer.

    Args:
        kernel_num: output channels of each convolution branch.
        embed_dim: size of each embedding vector.
        vocab_size: number of rows in the embedding table (id 0 is padding).
        class_num: number of output classes.
        kernel_sizes: window heights (in tokens) of the parallel convolutions.
            Defaults to (2, 3, 4), the values previously hard-coded.
        dropout: dropout probability on the pooled features. Defaults to 0.5,
            the value previously hard-coded.

    Raises:
        ValueError: if ``kernel_sizes`` is empty.

    ``forward`` returns log-probabilities (``log_softmax``), so pair it with
    ``F.nll_loss``. Submodule names (embed, convs, dropout, fc1) are unchanged,
    which keeps previously saved state dicts loadable.
    """

    def __init__(self, kernel_num, embed_dim, vocab_size, class_num,
                 kernel_sizes=(2, 3, 4), dropout=0.5):
        super().__init__()
        kernel_sizes = tuple(kernel_sizes)
        if not kernel_sizes:
            raise ValueError("kernel_sizes must contain at least one window size")
        ci = 1  # input channel size

        self.embed = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        # each kernel spans the full embedding width, so the conv only slides
        # along the sequence axis
        self.convs = nn.ModuleList(
            [nn.Conv2d(ci, kernel_num, (k, embed_dim)) for k in kernel_sizes]
        )
        self.dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(len(kernel_sizes) * kernel_num, class_num)

        self.init_weight()

    def conv_and_pool(self, x, conv):
        """Apply one conv branch, ReLU, and max-pool over the sequence axis."""
        # x: (batch, 1, sentence_length, embed_dim)
        x = conv(x)
        # x: (batch, kernel_num, H_out, 1)
        x = F.relu(x.squeeze(3))
        # x: (batch, kernel_num, H_out)
        x = F.max_pool1d(x, x.size(2)).squeeze(2)
        #  (batch, kernel_num)
        return x

    def forward(self, x):
        """Map a (batch, sentence_length) id tensor to (batch, class_num) log-probs."""
        # x: (batch, sentence_length)
        x = self.embed(x)
        # x: (batch, sentence_length, embed_dim)
        x = x.unsqueeze(1)
        # x: (batch, 1, sentence_length, embed_dim)
        x = torch.cat([self.conv_and_pool(x, conv) for conv in self.convs], 1)
        # x: (batch, len(kernel_sizes) * kernel_num)
        x = self.dropout(x)
        logit = F.log_softmax(self.fc1(x), dim=1)
        return logit

    def init_weight(self):
        """Re-initialise conv, batch-norm and linear layers found in the module tree."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                # this class creates no BatchNorm2d layers itself; the branch
                # only matters if a subclass adds one
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight)
                nn.init.constant_(m.bias, 0)

In [None]:
# mini-batch sizes; only the training loader reshuffles every epoch
train_batch_size = test_batch_size = 16
train_dataloader = torch.utils.data.DataLoader(
    dataset=ds_train,
    batch_size=train_batch_size,
    shuffle=True,
)
test_dataloader = torch.utils.data.DataLoader(
    dataset=ds_test,
    batch_size=test_batch_size,
    shuffle=False,
)

In [None]:
# per-class loss weights used by train(); [1., 1.] treats both classes equally,
# change the entries to re-weight a class
weight_pt = torch.tensor([1., 1.]).to(device)

def train(model, device, train_dataloader, optim, epoch,
          accumulation_steps=4, log_every=1000):
    """Run one training epoch with gradient accumulation.

    Args:
        model: network returning log-probabilities of shape (batch, classes).
        device: torch device the batches are moved to.
        train_dataloader: yields dicts with 'input_ids' and 'label' tensors.
        optim: optimizer over ``model``'s parameters.
        epoch: epoch number, used only in the progress log.
        accumulation_steps: number of batches whose gradients are combined
            before each optimizer step (default 4, as before).
        log_every: print the running loss every this many batches
            (default 1000, as before).

    Reads the module-level ``weight_pt`` tensor for the class weights.
    """
    model.train()
    total_training_data = len(train_dataloader.dataset)
    total_batches = len(train_dataloader)
    samples_seen = 0

    optim.zero_grad()

    for b_i, b in enumerate(train_dataloader):
        X = b['input_ids'].to(device)
        y = b['label'].to(device)

        log_probs = model(X)
        loss = F.nll_loss(log_probs, y, weight=weight_pt)  # negative log-likelihood loss
        # Scale so the accumulated gradient is the mean over the window, not
        # the sum; the logged loss below stays unscaled.
        (loss / accumulation_steps).backward()
        samples_seen += y.size(0)

        is_last_batch = b_i == total_batches - 1
        # Step after every `accumulation_steps`-th batch. The previous test
        # `b_i % 4 == 0` fired on the very first batch, so the first update
        # used one batch and every later window was shifted by one. The final
        # partial window is flushed as well.
        if (b_i + 1) % accumulation_steps == 0 or is_last_batch:
            optim.step()
            optim.zero_grad()
        if b_i % log_every == 0 or is_last_batch:
            # samples_seen counts actual rows, so a short last batch no longer
            # makes the progress counter exceed the dataset size
            print('Epoch: {} [{}/{} ({:.0f}%)] training loss: {:.5f}'.format(
                    epoch,
                    samples_seen,
                    total_training_data,
                    100. * (b_i + 1) / total_batches,
                    loss.item()
                    ))

def test(model, device, test_dataloader):
    model.eval()
    total_test_data = len(test_dataloader.dataset)

    loss = 0.
    success = 0.
    with torch.no_grad():
        for b in test_dataloader:
            X = b['input_ids']
            y = b['label']
            X, y = X.to(device), y.to(device)
            pred_prob = model(X)
            loss += F.nll_loss(pred_prob, y, reduction='sum').item()  # loss summed across the batch
            pred = pred_prob.argmax(dim=1, keepdim=True)  # use argmax to get the most likely prediction
            success += pred.eq(y.view_as(pred)).sum().item()

    loss /= total_test_data
    
    accuracy = 100. * success / total_test_data

    print('\nTest set loss: {:.4f}, Accuracy: {:.0f}/{} ({:.0f}%)\n'.format(
        loss, 
        success, 
        total_test_data, 
        accuracy))
    
    return accuracy

def objective(trial):
    """Optuna objective: train a TextCNN for 3 epochs and return its accuracy.

    Samples ``kernel_num`` (128..384, step 64) and ``lr`` (log-uniform in
    [1e-4, 1e-3]). After each epoch the accuracy measured on
    ``test_dataloader`` is reported to the trial so the study's pruner can
    stop it early.

    Raises:
        optuna.exceptions.TrialPruned: when the trial should be pruned.
    """
    train_epochs = 3
    embed_dim = 128
    # To tune the embedding size too, replace the constant above with e.g.:
    # embed_dim = trial.suggest_int("embed_dim", 128, 384, step=64) # word embedding size

    # output channel size of each convolution branch
    num_kernels = trial.suggest_int("kernel_num", 128, 384, step=64)
    model = TextCNN(
        kernel_num=num_kernels,
        embed_dim=embed_dim,
        vocab_size=len(char_dict),
        class_num=2,
    )
    model.to(device)

    learning_rate = trial.suggest_float("lr", 1e-4, 1e-3, log=True)
    optimizer = AdamW(model.parameters(), lr=learning_rate)

    for epoch in range(1, train_epochs + 1):
        train(model, device, train_dataloader, optimizer, epoch)
        accuracy = test(model, device, test_dataloader)
        trial.report(accuracy, epoch)

        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    # accuracy after the last epoch is the value the study maximises
    return accuracy

In [None]:
# Hyper-parameter search: maximise accuracy over at most 100 trials or
# 86400 seconds (24 hours), whichever comes first.
study = optuna.create_study(direction="maximize", study_name="gender_from_name")
study.optimize(objective, timeout=86400, n_trials=100)

# bucket the finished trials by their final state
pruned_trials = []
complete_trials = []
for t in study.trials:
    if t.state == optuna.trial.TrialState.PRUNED:
        pruned_trials.append(t)
    elif t.state == optuna.trial.TrialState.COMPLETE:
        complete_trials.append(t)

print("results: ")
for caption, count in (
    ("num_trials_conducted: ", len(study.trials)),
    ("num_trials_pruned: ", len(pruned_trials)),
    ("num_trials_completed: ", len(complete_trials)),
):
    print(caption, count)

print("results from best trial:")
trial = study.best_trial

print("accuracy: ", trial.value)
print("hyperparameters: ")
for key in trial.params:
    print("{}: {}".format(key, trial.params[key]))

In [None]:
# hyper-parameters of the best trial; final_train() reads 'kernel_num' and 'lr' from here
study.best_trial.params
# presumably the output of an earlier run, kept for reference:
# {'kernel_num': 256, 'lr': 0.00044156544406933923}

In [None]:
# Retrain from scratch with the best hyper-parameters found by the study.

# module-level handle, filled in by final_train() and used by later cells
model = None

def final_train():
    """Train a fresh TextCNN with the study's best params and store it in ``model``.

    Runs 3 epochs and evaluates on ``test_dataloader`` after each one.
    The trained network is published through the module-level ``model``.
    """
    global model
    kern_num = study.best_trial.params['kernel_num']
    lr = study.best_trial.params['lr']

    model = TextCNN(
        kernel_num=kern_num,
        embed_dim=128,
        vocab_size=len(char_dict),
        class_num=2,
    )
    model.to(device)
    optimizer = AdamW(model.parameters(), lr=lr)

    train_epochs = 3
    epoch = 1
    while epoch <= train_epochs:
        train(model, device, train_dataloader, optimizer, epoch)
        test(model, device, test_dataloader)
        epoch += 1

final_train()

In [None]:
# persist only the weights (state dict); reloading requires rebuilding TextCNN
# with the same constructor arguments first
torch.save(model.state_dict(), "final_model.pt")

In [None]:
# when loading saved model for testing
# kern_num = study.best_trial.params['kernel_num']
# model = TextCNN(kernel_num=kern_num, embed_dim=128, vocab_size=len(char_dict), class_num=2)
# model.load_state_dict(torch.load("final_model.pt"))
# model.eval()

In [None]:
def final_test(model, device, test_dataloader):
    """Return per-class probabilities for every example in the dataloader.

    Args:
        model: network returning log-probabilities of shape (batch, classes).
        device: torch device the batches are moved to.
        test_dataloader: yields dicts with an 'input_ids' tensor.

    Returns:
        numpy array of shape (n_examples, classes) in dataloader order.
    """
    model.eval()
    with torch.no_grad():
        # exp() turns the model's log-probabilities back into probabilities
        batch_probs = [
            torch.exp(model(batch['input_ids'].to(device))).cpu()
            for batch in test_dataloader
        ]
    # stack the per-batch results along the example axis
    return torch.cat(batch_probs, dim=0).numpy()

# class probabilities for the held-out split; column 1 is the 'female' class
final_predictions = final_test(model, device, test_dataloader)
# Named pr_display rather than `display`, so IPython's built-in display()
# helper is not shadowed for the rest of the notebook session.
pr_display = PrecisionRecallDisplay.from_predictions(ds_test['label'].numpy(), final_predictions[:, 1], name="TextCNN")
_ = pr_display.ax_.set_title("Precision-Recall curve")

In [None]:
# class names in label order: 0 = male, 1 = female
target_names = ['Male', 'Female']
y_true = ds_test['label'].numpy()
# hard predictions: the class with the highest probability in each row
y_pred_argmax = np.argmax(final_predictions, axis=-1)
print(classification_report(y_true, y_pred_argmax, target_names=target_names))

In [None]:
def visualise_precision_recall_curve(precision=None, recall=None, thresholds=None):
    """Plot recall against precision and label points with their threshold.

    Args:
        precision: precision values from ``precision_recall_curve``.
        recall: matching recall values.
        thresholds: decision thresholds; ``precision_recall_curve`` returns one
            fewer threshold than precision/recall values.

    Any argument left as ``None`` falls back to the notebook-level variable of
    the same name, so existing zero-argument calls keep working. Passing the
    arrays explicitly is preferred because it removes the hidden dependency on
    cell execution order.
    """
    if precision is None:
        precision = globals()['precision']
    if recall is None:
        recall = globals()['recall']
    if thresholds is None:
        thresholds = globals()['thresholds']

    plt.rcParams['figure.figsize'] = [6, 4]
    plt.rcParams['figure.dpi'] = 100
    # axis limits leave room for the threshold labels right of / above the curve
    plt.xlim([0.0, 1.2])
    plt.ylim([0.0, 1.1])
    plt.xticks([0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
    plt.yticks([0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
    plt.plot(precision, recall)
    plt.title('Threshold / Precision-Recall Curve')
    plt.xlabel('Precision')
    plt.ylabel('Recall')
    plt.grid(alpha=0.2)
    last_y = 0.0
    # Label the first point, then only points whose recall dropped by more
    # than 0.05 since the last label, to keep the labels readable.
    for i in range(0, len(precision) - 1):
        if i == 0 or last_y - recall[i] > 0.05:
            plt.text(precision[i] + 0.02,
                     recall[i] + 0.02,
                     "{:.2f}".format(thresholds[i]),
                     alpha=0.8)
            last_y = recall[i]
            plt.plot(precision[i], recall[i], '-bo', markersize=3)

precision, recall, thresholds = precision_recall_curve(y_true, final_predictions[:, 1])

visualise_precision_recall_curve(precision, recall, thresholds)

In [None]:
# predict helper

def predict_one(text_in):
    """Return the model's probability of class 1 ('female') for a single name.

    The name is lower-cased, tokenized with ``tokenize_function`` as a batch
    of one, and scored by the module-level ``model`` on ``device``.
    """
    batch = tokenize_function({'full_name': [text_in.lower()]})
    input_ids = torch.tensor(batch['input_ids']).to(device)

    model.eval()
    with torch.no_grad():
        log_probs = model(input_ids)

    # log-probabilities -> probabilities, then take column 1 of the single row
    probs = torch.exp(log_probs).cpu().numpy()
    return probs[:, 1].item()

predict_one("Aaaaaaaaa")

In [None]:
def visualise_ROC_with_reference(fpr, tpr, thresholds_roc):
    """Plot the ROC curve, label thresholds, and mark the best operating point.

    The best point is the index maximising ``sqrt(tpr * (1 - fpr))``, i.e. the
    geometric mean of sensitivity and specificity.

    Args:
        fpr: false-positive rates from ``roc_curve``.
        tpr: true-positive rates from ``roc_curve``.
        thresholds_roc: decision thresholds from ``roc_curve``.

    Returns:
        The threshold at the best operating point. It was previously only
        printed; returning it lets later cells use it instead of a copied
        constant.
    """
    plt.rcParams['figure.figsize'] = [6, 4]
    plt.rcParams['figure.dpi'] = 100
    plt.xlim([-0.2, 1.02])
    plt.ylim([-0.01, 1.1])
    plt.grid()
    plt.plot(fpr, tpr, label='current classifier')

    plt.xlabel('False Positive Rate / 1-Specificity')
    plt.ylabel('True Positive Rate / Sensitivity')
    last_y = 0.0
    # Indices start at 1, so the first ROC point is never labelled; the old
    # `i == 0` test inside this loop could not fire and has been removed.
    # A label is drawn whenever tpr rose by at least 0.055 since the last one.
    for i in range(1, len(tpr) - 1):
        if tpr[i] - last_y >= 0.055:
            plt.text(fpr[i] - 0.1,
                     tpr[i] + 0.021,
                     "{:.2f}".format(thresholds_roc[i]),
                     alpha=0.8)
            last_y = tpr[i]
            plt.plot(fpr[i], tpr[i], '-ro', markersize=3)
    optimal_i = np.argmax(np.sqrt(tpr * (1 - fpr)))
    optimal_threshold = thresholds_roc[optimal_i]
    print("Decision threshold: {:.2f}, tpr: {:.2f}, fpr: {:.2f}"
          .format(optimal_threshold, tpr[optimal_i], fpr[optimal_i]))
    # random classifier curve
    # plt.plot([0.0, 1.0], [0.0, 1.0], color='blue', linestyle='--', alpha=0.3, label="random classifier")
    # perfect classifier
    plt.plot(0.0, 1.0, 'o', color='blue', label="perfect classifier")
    # most optimal threshold
    # ("{:2f}" is width 2 with the default six decimals, so small thresholds
    # stay readable in the legend even though the print above rounds to two)
    plt.plot(fpr[optimal_i], tpr[optimal_i], ' x', color='purple', markersize=12, label="threshold: {:2f}".format(optimal_threshold))
    plt.legend()
    plt.show()
    return optimal_threshold

fpr, tpr, thresholds_roc = roc_curve(y_true, final_predictions[:, 1])

# assigned so the returned value is not echoed as the cell's output
optimal_threshold = visualise_ROC_with_reference(fpr, tpr, thresholds_roc)


In [None]:
# set threshold as optimal and run eval
# Derive the threshold from the ROC arrays computed above, using the same
# criterion as the ROC plot (max of sqrt(tpr * (1 - fpr))). The hard-coded
# 0.00203 used before goes stale every time the model is retrained.
thr = thresholds_roc[np.argmax(np.sqrt(tpr * (1 - fpr)))]

female_prob = final_predictions[:, 1]
# predict 1 (female) unless the probability is strictly below the threshold
probs_after_threshold = np.where(female_prob < thr, 0, 1)

cls_rep = classification_report(y_pred=probs_after_threshold,
                                y_true=y_true,
                                target_names=target_names, zero_division=0)
print(cls_rep)
        