In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
import random
import os

# Manual GloVe loader
def load_glove(file_path):
    """Load GloVe vectors from a whitespace-separated text file.

    Every non-blank line is parsed as ``token v1 v2 ... vd``. Blank lines
    (for example a trailing newline at the end of the file) are skipped.

    Args:
        file_path: Path to the GloVe ``.txt`` file, read as UTF-8.

    Returns:
        A ``(word2idx, glove_vectors)`` tuple. ``word2idx`` maps each token to
        its row in ``glove_vectors``, a float32 tensor with one row per token
        plus a final all-zero row registered under the key ``"<OOV>"``.

    Raises:
        ValueError: If the file contains no vectors at all.
    """
    word2idx = {}
    vectors = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            if not values:
                # Blank line: nothing to parse, and it must not consume an index.
                continue
            # Index by the number of vectors stored so far (not the file line
            # number) so that skipped lines can never shift rows out of sync.
            word2idx[values[0]] = len(vectors)
            vectors.append(np.array(values[1:], dtype=np.float32))
    if not vectors:
        raise ValueError(f"No embedding vectors found in {file_path!r}")
    # Add OOV token at the end; it is represented by an all-zero vector.
    word2idx["<OOV>"] = len(vectors)
    vectors.append(np.zeros_like(vectors[0]))
    glove_vectors = torch.tensor(np.stack(vectors))
    return word2idx, glove_vectors

# Load GloVe embeddings once for the whole notebook.
# `word2idx` maps each token to its row in `glove_vectors`; load_glove appends
# an all-zero "<OOV>" row as the last row. Later cells read both names as
# module-level globals (my_collate_function binds `word2idx` as a default).
word2idx, glove_vectors = load_glove("glove/glove.6B.100d.txt")


In [10]:
def splitData(file_path="data/data.tsv", seed=40, overfit_size=50):
    """Split the labelled TSV into train/validation/test/overfit files.

    The table is split 80/20 into train+validation and test, then the first
    part is split 80/20 again into train and validation. Both splits are
    stratified on the ``label`` column. A small ``overfit`` sample is drawn
    from the full table for sanity-check training runs.

    Args:
        file_path: Tab-separated input file with (at least) a ``label`` column.
        seed: Random state used for both splits and for the overfit sample, so
            that re-running the cell regenerates identical files.
        overfit_size: Number of rows to draw for ``overfit.tsv``; capped at the
            number of rows available.

    Side effects:
        Writes ``train.tsv``, ``validation.tsv``, ``test.tsv`` and
        ``overfit.tsv`` into the ``data`` directory (created if missing).
    """
    df = pd.read_csv(file_path, sep="\t")
    train, test = train_test_split(df, test_size=0.2, random_state=seed, stratify=df.label)
    train, val = train_test_split(train, test_size=0.2, random_state=seed, stratify=train.label)
    # Seeded and size-capped: an unseeded sample changes on every run, and
    # asking for more rows than exist raises inside DataFrame.sample.
    overfit = df.sample(n=min(overfit_size, len(df)), random_state=seed)

    # Save splits
    os.makedirs("data", exist_ok=True)
    for name, frame in (("train", train), ("validation", val), ("test", test), ("overfit", overfit)):
        frame.to_csv(f"data/{name}.tsv", sep="\t", index=False)


In [11]:
# Write data/train.tsv, data/validation.tsv, data/test.tsv and data/overfit.tsv
# from the default input data/data.tsv; TextDataset reads these files by split name.
splitData()

In [12]:
class TextDataset(torch.utils.data.Dataset):
    """Map-style dataset over one split stored as ``data/<split>.tsv``.

    Each item is a ``(text, label)`` pair taken from the ``text`` and ``label``
    columns of the tab-separated file.
    """

    def __init__(self, split="train"):
        """Read ``data/{split}.tsv`` and keep its two columns as Python lists."""
        frame = pd.read_csv(f"data/{split}.tsv", sep="\t")
        self.texts = frame.text.tolist()
        self.labels = frame.label.tolist()

    def __len__(self):
        """Number of examples in the split."""
        n_examples = len(self.texts)
        return n_examples

    def __getitem__(self, idx):
        """Return the raw ``(text, label)`` pair stored at position ``idx``."""
        text = self.texts[idx]
        label = self.labels[idx]
        return text, label

def my_collate_function(batch, device, word2idx=word2idx):
    """Collate ``(text, label)`` pairs into padded index and label tensors.

    Texts are split on whitespace. Tokens missing from ``word2idx`` and all
    padding positions are encoded with the ``"<OOV>"`` index.

    Args:
        batch: Sequence of ``(text, label)`` pairs, as produced by TextDataset.
        device: Device both returned tensors are moved to.
        word2idx: Token-to-index mapping containing an ``"<OOV>"`` entry. The
            default is bound to the module-level ``word2idx`` when this
            function is defined.

    Returns:
        ``(tokens, labels)`` where ``tokens`` is a long tensor of shape
        ``(max_len, batch_size)`` and ``labels`` is a float32 tensor of shape
        ``(batch_size,)``. The token tensor is sequence-first because the CNN
        models in this notebook swap dims 0 and 1 of the embedded input before
        convolving; a batch-first tensor would make them convolve over the
        batch axis.
    """
    texts, labels = zip(*batch)
    # Tokenise once and reuse the result for both the length scan and encoding.
    token_lists = [text.split() for text in texts]
    max_len = max(len(tokens) for tokens in token_lists)
    oov_idx = word2idx["<OOV>"]
    tokenized = [
        [word2idx.get(token, oov_idx) for token in tokens] + [oov_idx] * (max_len - len(tokens))
        for tokens in token_lists
    ]
    # Built as (batch, max_len), then transposed to the (max_len, batch) layout
    # the models expect.
    token_tensor = torch.tensor(tokenized, dtype=torch.long).t().contiguous().to(device)
    label_tensor = torch.tensor(labels, dtype=torch.float32).to(device)
    return token_tensor, label_tensor


In [13]:
class CNNModel(nn.Module):
    """Two-branch text CNN over pretrained embeddings for binary classification.

    Each branch applies a convolution spanning ``k`` consecutive tokens and the
    full embedding width, then batch norm, ReLU and a global max pool. The two
    pooled feature vectors are concatenated and fed to a single linear unit
    followed by a sigmoid.

    Args:
        glove_vectors: 2-D float tensor of pretrained embeddings
            ``(vocab_size, embed_dim)``.
        k1, k2: Number of tokens covered by the kernels of branch 1 / branch 2.
        n1, n2: Number of output channels of branch 1 / branch 2.
        freeze: If True the embedding table is not trained.
    """

    def __init__(self, glove_vectors, k1, k2, n1, n2, freeze=True):
        super().__init__()
        # from_pretrained wraps the given tensor as the embedding weight, so a
        # trainable embedding would be updated in place through the caller's
        # tensor. Clone when fine-tuning so the shared table stays pristine.
        weights = glove_vectors if freeze else glove_vectors.clone()
        self.embedding = nn.Embedding.from_pretrained(weights, freeze=freeze)
        embed_dim = glove_vectors.shape[1]
        self.k1 = (k1, embed_dim)
        self.k2 = (k2, embed_dim)
        self.n1 = n1
        self.n2 = n2

        self.conv1 = nn.Conv2d(1, n1, self.k1, bias=False)
        self.bn1 = nn.BatchNorm2d(n1)
        self.pool1 = nn.AdaptiveMaxPool2d((1,1))

        self.conv2 = nn.Conv2d(1, n2, self.k2, bias=False)
        self.bn2 = nn.BatchNorm2d(n2)
        self.pool2 = nn.AdaptiveMaxPool2d((1,1))

        self.out = nn.Linear(n1 + n2, 1)

    def forward(self, x):
        """Return one probability per example.

        Args:
            x: Long tensor of token indices shaped ``(seq_len, batch)``.

        Returns:
            Float tensor of shape ``(batch,)`` holding sigmoid outputs in
            ``[0, 1]``. These are probabilities, not logits.
        """
        e = self.embedding(x)  # (seq_len, batch, embed)
        e = e.transpose(0,1).unsqueeze(1)  # (batch,1,seq_len,embed)
        # A sequence shorter than the widest kernel cannot be convolved; extend
        # it with zero rows along the sequence axis.
        shortfall = max(self.k1[0], self.k2[0]) - e.size(2)
        if shortfall > 0:
            e = F.pad(e, (0, 0, 0, shortfall))
        x1 = self.pool1(F.relu(self.bn1(self.conv1(e))))
        x2 = self.pool2(F.relu(self.bn2(self.conv2(e))))
        # flatten(1) keeps the batch axis even for a batch of one, unlike squeeze().
        concat = torch.cat((x1, x2), dim=1).flatten(1)
        return torch.sigmoid(self.out(concat)).reshape(-1)


In [14]:
def train_model(glove_vectors, split_names, epochs, batch_size, lr, k1,k2,n1,n2, freeze=True, seed=None):
    """Train a CNNModel and record per-epoch losses and test accuracy.

    Args:
        glove_vectors: Pretrained embedding matrix passed to CNNModel.
        split_names: Three split names, in the order (train, validation, test).
            The same name may be repeated, e.g. ``["overfit"] * 3``.
        epochs: Number of passes over the training split.
        batch_size: Batch size for all three loaders.
        lr: Adam learning rate.
        k1, k2, n1, n2: Kernel heights and channel counts forwarded to CNNModel.
        freeze: Whether the embedding table is kept fixed.
        seed: Optional value for ``torch.manual_seed``; ``None`` leaves the
            random state untouched.

    Returns:
        ``(model, history)`` where ``history`` has the keys ``"train_loss"``,
        ``"val_loss"``, ``"test_loss"`` (mean batch loss per epoch) and
        ``"accuracy"`` (test-split accuracy per epoch).

    Raises:
        ValueError: If ``split_names`` does not contain exactly three names.
    """
    if len(split_names) != 3:
        raise ValueError("split_names must list the train, validation and test splits, in that order")
    # Splits are addressed by position, not by name: keying the loaders by name
    # and then looking up "train" fails when a differently named split
    # (e.g. "overfit") is used for every role.
    train_name, val_name, test_name = split_names

    if seed is not None:
        torch.manual_seed(seed)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Read each distinct TSV once, even if one split fills several roles.
    datasets = {name: TextDataset(name) for name in set(split_names)}

    def make_loader(name, shuffle):
        """Build a DataLoader over the named split with padded batches on ``device``."""
        return torch.utils.data.DataLoader(
            datasets[name], batch_size=batch_size, shuffle=shuffle,
            collate_fn=lambda batch: my_collate_function(batch, device)
        )

    # Only the training loader is shuffled; evaluation order does not matter.
    train_loader = make_loader(train_name, shuffle=True)
    val_loader = make_loader(val_name, shuffle=False)
    test_loader = make_loader(test_name, shuffle=False)

    model = CNNModel(glove_vectors, k1,k2,n1,n2, freeze=freeze).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    # CNNModel.forward already ends in a sigmoid, so the matching loss is BCELoss.
    # BCEWithLogitsLoss would apply a second sigmoid to the probabilities.
    criterion = nn.BCELoss()

    def evaluate(loader):
        """Return (mean batch loss, accuracy) over ``loader`` without gradients."""
        loss_sum, n_correct, n_seen = 0.0, 0, 0
        with torch.no_grad():
            for X, y in loader:
                probs = model(X)
                loss_sum += criterion(probs, y).item()
                # Threshold the probabilities directly; no further sigmoid.
                n_correct += ((probs >= 0.5).float() == y).sum().item()
                n_seen += len(y)
        return loss_sum / len(loader), n_correct / n_seen

    history = {"train_loss":[], "val_loss":[], "test_loss":[], "accuracy":[]}

    for epoch in range(epochs):
        model.train()
        train_loss = 0.0
        for X, y in train_loader:
            optimizer.zero_grad()
            probs = model(X)
            loss = criterion(probs, y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        history["train_loss"].append(train_loss/len(train_loader))

        model.eval()
        val_loss, _ = evaluate(val_loader)
        test_loss, test_acc = evaluate(test_loader)
        history["val_loss"].append(val_loss)
        history["test_loss"].append(test_loss)
        history["accuracy"].append(test_acc)

    return model, history


In [16]:
# Sanity check: train on the small "overfit" split only, using it for the
# train, validation and test roles alike, then plot loss and accuracy.
overfit_model, overfit_history = train_model(
    glove_vectors,
    split_names=["overfit"] * 3,
    epochs=1500,
    batch_size=4,
    lr=0.001,
    k1=2,
    k2=4,
    n1=50,
    n2=50,
)

for series_key, series_label in (("train_loss", "Overfit Loss"), ("accuracy", "Overfit Accuracy")):
    plt.plot(overfit_history[series_key], label=series_label)
plt.legend()
plt.show()
# Accuracy after the final epoch.
print(overfit_history["accuracy"][-1])


ImportError: cannot import name 'Config' from 'torch.utils._config_module' (c:\Users\goran\anaconda3\envs\ece1786\Lib\site-packages\torch\utils\_config_module.py)

In [None]:
# Baseline run on the full splits; `freeze` is left at train_model's default.
model, history = train_model(
    glove_vectors,
    split_names=["train", "validation", "test"],
    epochs=50,
    batch_size=4,
    lr=0.001,
    k1=2,
    k2=4,
    n1=20,
    n2=20,
)

# One curve per recorded history series.
for series_key, series_label in (
    ("train_loss", "Train Loss"),
    ("val_loss", "Validation Loss"),
    ("test_loss", "Test Loss"),
    ("accuracy", "Accuracy"),
):
    plt.plot(history[series_key], label=series_label)
plt.legend()
plt.show()

# Persist only the weights (state dict), not the whole module object.
torch.save(model.state_dict(), "CNNModel.pt")


In [None]:
# Same configuration as the baseline run, but with trainable embeddings
# (freeze=False).
model_ft, history_ft = train_model(
    glove_vectors,
    split_names=["train", "validation", "test"],
    epochs=50,
    batch_size=4,
    lr=0.001,
    k1=2,
    k2=4,
    n1=20,
    n2=20,
    freeze=False,
)

# Save the fine-tuned weights, then report the accuracy of the last epoch.
torch.save(model_ft.state_dict(), "CNNModel_FT.pt")
print(history_ft["accuracy"][-1])


In [None]:
# When running in Colab, create a folder called 'data' and put 'overfit.tsv',
# 'train.tsv', 'test.tsv' and 'validation.tsv' into it first.
# NOTE(review): `main` is not defined in any cell visible here; confirm where
# it comes from before running this cell on a fresh kernel.
embedding, totalLoss, totalValidationLoss, totalTestLoss, totalAccuray = main(50,4,0.001,2,4,20,20)
plt.plot(totalLoss, label='Train Loss')
plt.plot(totalValidationLoss, label='Validation Loss')
plt.plot(totalTestLoss, label='Test Loss')
# Legend label spelling fixed ('Accuray' -> 'Accuracy'); the variable name is
# kept because a later cell reads `totalAccuray`.
plt.plot(totalAccuray, label='Accuracy')
plt.legend()
plt.show()

In [None]:
# NOTE(review): this cell repeats an earlier fine-tuning cell with identical
# arguments; it rebinds `model_ft` / `history_ft` and overwrites CNNModel_FT.pt.
model_ft, history_ft = train_model(
    glove_vectors,
    split_names=["train", "validation", "test"],
    epochs=50,
    batch_size=4,
    lr=0.001,
    k1=2,
    k2=4,
    n1=20,
    n2=20,
    freeze=False,
)

# Save the weights and show the last epoch's accuracy.
torch.save(model_ft.state_dict(), "CNNModel_FT.pt")
print(history_ft["accuracy"][-1])


In [None]:
# Largest value in `totalAccuray`, the fifth value returned by the `main(...)` call.
# Presumably one accuracy per epoch, by analogy with mainNoFreeze — TODO confirm,
# since `main` is not defined in the visible cells.
print(max(totalAccuray))

In [None]:
class CNNModelNoFreeze(torch.nn.Module):
  """Two-branch text CNN whose pretrained embedding table is fine-tuned.

  Each branch applies a convolution over ``k`` consecutive tokens and the full
  embedding width, then ReLU, batch norm and a global max pool. The pooled
  features of both branches are concatenated and passed through a linear unit
  and a sigmoid.

  Args:
    vocab: Either an object exposing the embedding matrix as ``.vectors`` or
      the 2-D embedding tensor itself.
    k1, k2: Number of tokens covered by the kernels of branch 1 / branch 2.
    n1, n2: Number of output channels of branch 1 / branch 2.
  """

  def __init__(self, vocab,k1,k2,n1,n2):
    super().__init__()
    # Accept a plain tensor as well as an object with a `.vectors` attribute.
    vectors = getattr(vocab, "vectors", vocab)
    # Kernel width follows the actual embedding size instead of assuming 100.
    embed_dim = vectors.shape[1]
    self.k1 = (k1, embed_dim)
    self.k2 = (k2, embed_dim)
    self.n1 = n1
    self.n2 = n2
    self.probabilityFunction = torch.nn.Sigmoid()

    # from_pretrained wraps the given tensor as the trainable weight; clone it
    # so that fine-tuning does not write into the caller's embedding matrix.
    self.embedding = torch.nn.Embedding.from_pretrained(vectors.clone(),freeze=False)

    self.conv1 = torch.nn.Conv2d(in_channels=1, out_channels=self.n1, kernel_size=self.k1, bias=False)
    self.bn1 = torch.nn.BatchNorm2d(self.n1)
    self.maxpool1 = torch.nn.AdaptiveMaxPool2d(output_size=(1, 1))

    self.conv2 = torch.nn.Conv2d(in_channels=1, out_channels=self.n2, kernel_size=self.k2, bias=False)
    self.bn2 = torch.nn.BatchNorm2d(self.n2)
    self.maxpool2 = torch.nn.AdaptiveMaxPool2d(output_size=(1, 1))

    self.out = torch.nn.Linear(self.n1+self.n2, 1)

  def forward(self, x):
    """Return one probability per example.

    Args:
      x: Long tensor of token indices; dims 0 and 1 are swapped before the
        convolution, so the expected layout is ``(seq_len, batch)``.

    Returns:
      Float tensor of shape ``(batch,)`` with sigmoid outputs in ``[0, 1]``.
      These are probabilities, not logits.
    """
    e = self.embedding(x)
    features = torch.transpose(e, 0, 1).unsqueeze(1)  # (batch, 1, seq_len, embed)
    # Sequences shorter than the widest kernel cannot be convolved; extend them
    # with zero rows along the sequence axis.
    shortfall = max(self.k1[0], self.k2[0]) - features.size(2)
    if shortfall > 0:
      features = F.pad(features, (0, 0, 0, shortfall))

    x1 = self.conv1(features)
    x1 = F.relu(x1)
    x1 = self.bn1(x1)
    x1 = self.maxpool1(x1)

    x2 = self.conv2(features)
    x2 = F.relu(x2)
    x2 = self.bn2(x2)
    x2 = self.maxpool2(x2)

    # flatten(1) keeps the batch axis even for a batch of one.
    concatenate = torch.cat((x1, x2), dim=1).flatten(1)
    output = self.out(concatenate)
    probabilities = self.probabilityFunction(output)

    return probabilities.reshape([-1])

In [None]:
def mainNoFreeze(epochs,batchSize,learningRate,k1,k2,n1,n2, savePath='/content/CNNNoFreezeModel'):
    """Train CNNModelNoFreeze (trainable embeddings) and track per-epoch metrics.

    Args:
        epochs: Number of passes over the training split.
        batchSize: Batch size for the train, validation and test loaders.
        learningRate: Adam learning rate.
        k1, k2, n1, n2: Kernel heights and channel counts for the model.
        savePath: Where the final state dict is written; its parent directory
            is created if it does not exist.

    Returns:
        ``(model, totalLoss, totalValidationLoss, totalTestLoss, totalAccuray)``:
        the trained model plus one float per epoch for the mean train,
        validation and test batch loss and for the test accuracy.
    """
    # Local import: only used to expose the embedding matrix through `.vectors`.
    from types import SimpleNamespace

    #   fix seed
    torch.manual_seed(2)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print ("Using device:", device)

    ### 3.3 Processing of the data ###

    def makeLoader(split, shuffle):
        """Build a DataLoader over ``data/<split>.tsv`` with padded batches on ``device``."""
        # TextDataset takes only the split name.
        return torch.utils.data.DataLoader(
            dataset=TextDataset(split),
            batch_size=batchSize,
            shuffle=shuffle,
            collate_fn=lambda batch: my_collate_function(batch, device))

    # 3.3.2 / 3.3.3 — only the training data is shuffled between epochs.
    train_dataloader = makeLoader("train", shuffle=True)
    validation_dataloader = makeLoader("validation", shuffle=False)
    test_dataloader = makeLoader("test", shuffle=False)

    #4.3
    # Token indices come from `word2idx`, so the embedding table has to be the
    # matching `glove_vectors`. It is cloned so fine-tuning cannot modify the
    # shared tensor, and wrapped because the model reads `vocab.vectors`.
    vocab = SimpleNamespace(vectors=glove_vectors.clone())
    # The batches live on `device`, so the model must be moved there as well.
    model = CNNModelNoFreeze(vocab,k1=k1,k2=k2,n1=n1,n2=n2).to(device)
    optimizer = torch.optim.Adam(params=model.parameters(), lr=learningRate)
    # The model's forward already applies a sigmoid, so use BCELoss on its
    # output rather than BCEWithLogitsLoss (which would apply a second sigmoid).
    lossFunction = torch.nn.BCELoss()

    def evaluate(dataloader):
        """Return (mean batch loss, accuracy) over ``dataloader`` without gradients."""
        lossSum, correct, seen = 0.0, 0, 0
        with torch.no_grad():
            for X, y in dataloader:
                y = y.float()
                probabilities = model(x=X)
                lossSum += lossFunction(probabilities, y).item()
                # Threshold the probabilities directly; vectorised count.
                correct += ((probabilities >= 0.5).float() == y).sum().item()
                seen += len(y)
        return lossSum/len(dataloader), correct/seen

    totalLoss = []
    totalTestLoss = []
    totalValidationLoss = []
    totalAccuray = []

    for i in range(epochs):
        print(i)
        # Re-enable training mode every epoch; evaluation below switches it off.
        model.train()
        currentEpochLoss = 0.0
        for X_train, y_train in train_dataloader:
            optimizer.zero_grad()
            probabilities = model(x=X_train)
            currentLoss = lossFunction(probabilities, y_train.float())
            currentLoss.backward()
            optimizer.step()
            # .item() detaches the value so the autograd graph is not retained.
            currentEpochLoss += currentLoss.item()
        totalLoss.append(currentEpochLoss/len(train_dataloader))

        model.eval()
        validationLoss, _ = evaluate(validation_dataloader)
        totalValidationLoss.append(validationLoss)

        #4.5 — test loss is averaged over the test loader's own batch count.
        testLoss, testAccuracy = evaluate(test_dataloader)
        totalTestLoss.append(testLoss)
        totalAccuray.append(testAccuracy)

    #4.7
    saveDir = os.path.dirname(savePath)
    if saveDir:
        # Avoid losing a finished run because the target folder is missing.
        os.makedirs(saveDir, exist_ok=True)
    torch.save(model.state_dict(), savePath)

    return model, totalLoss, totalValidationLoss, totalTestLoss, totalAccuray

In [None]:
# Fine-tuned-embedding run.
# Original note: "n1,2-20 0.903" — presumably the accuracy obtained with
# n1 = n2 = 20; TODO confirm.
embeddingNoFreeze, totalLossNoFreeze, totalValidationLossNoFreeze, totalTestLossNoFreeze, totalAccurayNoFreeze = mainNoFreeze(50,4,0.001,2,4,20,20)
plt.plot(totalLossNoFreeze, label='Train Loss')
plt.plot(totalValidationLossNoFreeze, label='Validation Loss')
plt.plot(totalTestLossNoFreeze, label='Test Loss')
# Legend label spelling fixed ('Accuray' -> 'Accuracy').
plt.plot(totalAccurayNoFreeze, label='Accuracy')
plt.legend()
plt.show()