In [None]:
!pip install sentence-transformers

In [None]:
from sentence_transformers import SentenceTransformer,InputExample,losses,evaluation,util
import pandas as pd
import numpy as np
from torch.utils.data import DataLoader
from tqdm import tqdm,trange
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics.pairwise import cosine_similarity, rbf_kernel as gaussian_similarity
from torch import nn
import torch
import torch.nn.functional as F
from sklearn.semi_supervised import LabelPropagation, LabelSpreading
from scipy import sparse as sp
import pickle
from sklearn.metrics import f1_score
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder

  from tqdm.autonotebook import tqdm, trange


In [None]:
# Colab only: mount Google Drive at /content/drive; the dataset path used by the
# loading cell lives under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Read the three CSV splits stored under `data_dir`.
data_dir = '/content/drive/MyDrive/SBERT_experiments/data/MR_toy/'
train = pd.read_csv(data_dir+'train.csv')
val = pd.read_csv(data_dir+'val.csv')
test = pd.read_csv(data_dir+'test.csv')
print('Train size:', train.shape)
print('Val size:', val.shape)
print('Test size:', test.shape)

# The label encoder is fitted on the training split ONLY and then reused for
# val/test. Re-fitting per split (as before) can silently assign different
# integer ids to the same label whenever a split misses a class; `transform`
# instead raises a ValueError on labels that were never seen in training.
label_encoder = LabelEncoder()
X_train = train['text'].values
y_train = label_encoder.fit_transform(train['label'].values)

print("X_train shape:",X_train.shape)
print("y_train shape:",y_train.shape)

X_val = val['text'].values
y_val = label_encoder.transform(val['label'].values)

X_test = test['text'].values
y_test = label_encoder.transform(test['label'].values)

n_classes = len(np.unique(y_train))
print('No of classes:', n_classes)

Train size: (198, 5)
Val size: (444, 5)
Test size: (1110, 5)
X_train shape: (198,)
y_train shape: (198,)
No of classes: 2


In [None]:
# Seed NumPy's global RNG and the PyTorch CPU / CUDA / MPS generators with one value.
seed = 42
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.mps.manual_seed(seed)
import sys
import os
import datetime
# Wall-clock timestamp taken when this cell runs.
now = datetime.datetime.now()

In [None]:
# Backend preference: CUDA first, then Apple MPS, otherwise plain CPU.
device = (
    'cuda' if torch.cuda.is_available()
    else 'mps' if torch.backends.mps.is_available()
    else 'cpu'
)
print('Using device:', device)
# From here on `device` is a torch.device object rather than a string.
device = torch.device(device)

Using device: cuda


In [None]:
def encode(class_label1, class_label2):
    """Pack two class labels into one number as ``100 * first + second``."""
    hundreds = class_label1 * 100
    return hundreds + class_label2

# def make_training_pairs(X,y):
#     train_examples = []
#     # 80 -> p , 20 -> n
#     # for each sentence s (say in class n) ->
#     # select 20 random sentences out of 100, and make pairs accordingly(both labels 1,0) -> 16 np, 4 nn
#     # select 5 sentences from same class as s, label = 1 -> 5 nn
#     for i in range(len(X)):
#         random_indices = np.random.choice(len(X), random_pairs, replace=False)
#         for j in random_indices:
#             train_examples.append(InputExample(texts=[X[i], X[j]], label=encode(y[i],y[j])))
#         same_class_indices = np.where(y==y[i])[0]
#         random_indices = np.random.choice(same_class_indices, positive_pairs, replace=False)
#         for j in random_indices:
#             train_examples.append(InputExample(texts=[X[i], X[j]], label=encode(y[i],y[j])))
#     return train_examples

def make_training_pairs(X,y):
    """Wrap every sentence in ``X`` as a single-text ``InputExample`` labelled with ``y[i]``."""
    return [
        InputExample(texts=[X[idx]], label=y[idx])
        for idx in range(len(X))
    ]

In [None]:
class ModifiedSentenceTransformer(SentenceTransformer):
    """SentenceTransformer that additionally keeps an epoch counter and a running loss total."""

    def __init__(self,model_name):
        super().__init__(model_name)
        # Running loss total for the current epoch; the loss modules add to it.
        self.current_loss = 0
        # Number of epochs closed so far via `update_epoch`.
        self.num_epochs = 0

    def update_epoch(self):
        """Print the loss accumulated for the current epoch, then reset it and advance the counter."""
        print(f"Loss for epoch {self.num_epochs}:{self.current_loss}")
        self.num_epochs += 1
        self.current_loss = 0

In [None]:
from typing import Iterable, Dict
from torch import Tensor

def compute_weight_factor(beta,class_size):
    """Return the weight ``(1 - beta) / (1 - beta ** class_size)`` for a class of the given size."""
    numerator = 1-beta
    denominator = 1-beta**class_size
    return numerator/denominator

def decode_labels(labels):
    """Split a packed label into ``(labels // 100, labels % 100)``."""
    first_label = labels//100
    second_label = labels%100
    return first_label, second_label

class CustomCosineSimilarityLoss(nn.Module):
    """Class-frequency-weighted squared error on the cosine similarity of sentence pairs.

    Each label is a packed pair of class ids that ``decode_labels`` splits into the
    class of the first and of the second sentence. The regression target is ``+1``
    when both classes match and ``-1`` otherwise; every pair is weighted with
    ``compute_weight_factor`` applied to the in-batch frequency of each class.

    Args:
        model: encoder whose output dict contains ``"sentence_embedding"``.
        beta: base passed to ``compute_weight_factor``.
        cos_score_transformation: module applied to the cosine score before the loss.
    """

    def __init__(self, model: SentenceTransformer,beta, cos_score_transformation=nn.Identity()):
        super().__init__()
        self.model = model
        self.cos_score_transformation = cos_score_transformation
        self.beta = beta

    def forward(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor):
        """Return the mean weighted loss for one batch of sentence pairs."""
        embeddings = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
        output = self.cos_score_transformation(torch.cosine_similarity(embeddings[0], embeddings[1]))
        class_labels_1, class_labels_2 = decode_labels(labels)
        # Count every class over both sides of the pairs in a single bincount; this
        # yields the same per-class totals as summing two separate counts but does
        # not need a global class count to align their lengths.
        freq = torch.bincount(torch.cat((class_labels_1, class_labels_2)))
        w1 = compute_weight_factor(self.beta,freq[class_labels_1])
        w2 = compute_weight_factor(self.beta,freq[class_labels_2])
        w = w1*w2
        label = torch.where(class_labels_1==class_labels_2,1.,-1.)
        loss = w*(output-label)**2
        # NOTE(review): the pair weight is applied a second time here, so pairs are
        # effectively weighted by w**2 -- kept as in the original; confirm intent.
        ret = (loss*w).mean()
        # Track the running loss on the wrapped model itself (not on a notebook
        # global) and detach it so the autograd graph of each batch can be freed.
        if hasattr(self.model, "current_loss"):
            self.model.current_loss = self.model.current_loss + ret.detach()
        return ret

In [None]:
# make a classification head on top of SBERT
class ClassificationHead(nn.Module):
    """Small MLP classifier: Linear -> ReLU -> Dropout(0.1) -> Linear -> Softmax over dim 1."""

    def __init__(self, in_features, hidden_size, out_features):
        super().__init__()
        # Sub-modules keep their original names and creation order so that
        # state_dict keys and parameter initialisation are unchanged.
        self.linear1 = nn.Linear(in_features, hidden_size)
        self.relu = nn.ReLU()
        self.linear2 = nn.Linear(hidden_size, out_features)
        self.dropout = nn.Dropout(p=0.1)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, features, **kwargs):
        """Map a batch of feature vectors to per-class probabilities (extra kwargs are ignored)."""
        hidden = self.dropout(self.relu(self.linear1(features)))
        return self.softmax(self.linear2(hidden))

def _predict_labels(head, dataloader):
    """Run `head` over `dataloader` and return ``(true_labels, predicted_labels)`` as lists."""
    y_true = []
    y_pred = []
    with torch.no_grad():  # inference only: skip autograd bookkeeping
        for X, y in dataloader:
            probabilities = head(X.to(device))
            y_true.extend(y.cpu().numpy())
            y_pred.extend(torch.argmax(probabilities, dim=1).cpu().numpy())
    return y_true, y_pred


def train_head(train_emb,val_emb,test_emb,y_train,y_val,y_test):
    """Train a fresh ``ClassificationHead`` on fixed embeddings and score it on all splits.

    Args:
        train_emb, val_emb, test_emb: per-split embedding matrices (one row per sample).
        y_train, y_val, y_test: integer class ids aligned with the embeddings.

    Returns:
        ``(head, train_acc, val_acc, test_acc, y_true, y_pred)`` where ``y_true`` and
        ``y_pred`` are the true / predicted labels of the TEST split.
    """
    train_dataloader = DataLoader(list(zip(train_emb,y_train)),shuffle=True,batch_size=16)
    # Evaluation does not need shuffling; the metrics are order independent.
    val_dataloader = DataLoader(list(zip(val_emb,y_val)),shuffle=False,batch_size=16)
    test_dataloader = DataLoader(list(zip(test_emb,y_test)),shuffle=False,batch_size=16)

    # Size the head from the data instead of hard-coding 768 inputs / 2 outputs
    # (never fewer than the 2 outputs used originally).
    in_features = len(train_emb[0])
    num_classes = max(2, int(max(y_train)) + 1)
    head = ClassificationHead(in_features, 32, num_classes)
    head.to(device)

    optimizer = torch.optim.Adam(head.parameters(),lr=1e-2)
    # The head already ends in a softmax, so feeding its output to
    # CrossEntropyLoss would apply softmax twice. Use the negative
    # log-likelihood of the predicted probabilities instead.
    loss_func = nn.NLLLoss()

    head.train()
    for epoch in range(100):
        for X, y in train_dataloader:
            X = X.to(device)
            y = y.to(device)
            optimizer.zero_grad()
            log_probs = torch.log(head(X).clamp_min(1e-12))  # clamp avoids log(0)
            loss = loss_func(log_probs, y.long())
            loss.backward()
            optimizer.step()

    head.eval()
    y_true, y_pred = _predict_labels(head, train_dataloader)
    train_acc = accuracy_score(y_true,y_pred)

    y_true, y_pred = _predict_labels(head, val_dataloader)
    val_acc = accuracy_score(y_true,y_pred)

    y_true, y_pred = _predict_labels(head, test_dataloader)
    test_acc = accuracy_score(y_true,y_pred)

    return head,train_acc,val_acc,test_acc,y_true,y_pred

In [None]:
class SupConLossNew(nn.Module): #Multiple anchors
    """Supervised contrastive loss over single-text batches, using every sample as an anchor.

    For anchor ``i`` only the samples that come AFTER it in the batch (``j > i``) are
    considered: later samples of another class form the denominator
    ``sum_j exp(<e_i, e_j> / temperature)`` and later samples of the same class are the
    positives. Each positive contributes ``-log(exp(<e_i, e_j> / temperature) / denominator)``
    divided by the number of batch samples sharing the anchor's class.

    Args:
        model: encoder whose output dict contains ``"sentence_embedding"``.
        temperature: softmax temperature applied to the dot products.
        base_temperature: stored for interface compatibility; not used by ``forward``.
    """

    def __init__(self, model: "SentenceTransformer",temperature=0.1,base_temperature=0.07):
        super().__init__()
        self.model = model
        # self.class_len = class_len
        self.temperature = temperature
        self.base_temperature = base_temperature

    def forward(self,sentence_features:Iterable[Dict[str, Tensor]], labels):
        '''
             labels:labels corresponding to each embeddings
        '''
        embeddings = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
        embeddings = embeddings[0]
        labels = torch.as_tensor(labels, device=embeddings.device).view(-1)
        batch = embeddings.shape[0]

        scaled_sim = embeddings @ embeddings.T / self.temperature
        same_class = labels.unsqueeze(0) == labels.unsqueeze(1)
        # Strict upper triangle: pair (i, j) is used only when j > i.
        later = torch.ones(batch, batch, dtype=torch.bool, device=embeddings.device).triu(diagonal=1)
        neg_mask = later & ~same_class
        pos_mask = later & same_class

        # exp(-inf) == 0, so masked-out pairs add nothing to the denominator.
        denominator = torch.exp(scaled_sim.masked_fill(~neg_mask, float("-inf"))).sum(dim=1)
        # Anchors without any later negative fall back to a small constant.
        denominator = torch.where(denominator == 0, torch.full_like(denominator, 1e-3), denominator)

        # In-batch size of the anchor's class (anchor included). Counting directly
        # avoids indexing `torch.unique` output by label value, which fails or picks
        # the wrong count unless the batch labels are exactly 0..k-1.
        cardinality = same_class.sum(dim=1)

        # log(exp(s) / d) written as s - log(d).
        log_ratio = scaled_sim - torch.log(denominator).unsqueeze(1)
        per_anchor = -log_ratio.masked_fill(~pos_mask, 0.0).sum(dim=1) / cardinality
        main_loss = per_anchor.sum()

        # Track the running loss on the wrapped model itself (not on a notebook
        # global); detach so the autograd graph of the batch is not kept alive.
        if hasattr(self.model, "current_loss"):
            self.model.current_loss = self.model.current_loss + main_loss.detach()
        return main_loss

In [None]:
class SupConLossHardNegative(nn.Module): #Multiple anchors, mine
    """Supervised contrastive loss with re-weighted ("hard") in-batch negatives.

    For every anchor ``i`` (all other batch samples are considered):

    * ``denominator``      = sum over negatives of ``exp(<e_i, e_j> / temperature)``
    * ``denominator_new``  = sum over negatives of ``exp(.)**2 * n_negatives / denominator``
    * ``p``                = sum over positives of the raw dot product ``<e_i, e_j>``
    * each positive adds ``-log(exp(<e_i, e_j> / temperature) / (p + denominator_new))``
      divided by the number of positives.

    Args:
        model: encoder whose output dict contains ``"sentence_embedding"``.
        temperature: softmax temperature applied to the dot products.
        base_temperature: stored for interface compatibility; not used by ``forward``.
    """

    def __init__(self, model: "SentenceTransformer",temperature=0.1,base_temperature=0.07):
        # Was `super(SupConLossNew, self)`, which raises TypeError on construction.
        super().__init__()
        self.model = model
        # self.class_len = class_len
        self.temperature = temperature
        self.base_temperature = base_temperature

    def forward(self,sentence_features:Iterable[Dict[str, Tensor]], labels):
        '''
             labels:labels corresponding to each embeddings
        '''
        embeddings = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
        embeddings = embeddings[0]
        labels = torch.as_tensor(labels, device=embeddings.device).view(-1)
        batch = embeddings.shape[0]

        dots = embeddings @ embeddings.T
        scaled_sim = dots / self.temperature
        same_class = labels.unsqueeze(0) == labels.unsqueeze(1)
        off_diagonal = ~torch.eye(batch, dtype=torch.bool, device=embeddings.device)
        pos_mask = same_class & off_diagonal
        neg_mask = ~same_class  # the diagonal is always "same class", so it is excluded

        # exp(-inf) == 0, so non-negatives contribute nothing.
        exp_neg = torch.exp(scaled_sim.masked_fill(~neg_mask, float("-inf")))
        denominator = exp_neg.sum(dim=1)
        # Number of in-batch negatives per anchor. The original expression
        # `embeddings[0].shape - count[...]` subtracted a tensor from a torch.Size;
        # batch size minus class count is what this count equals.
        num_negatives = neg_mask.sum(dim=1)
        # Anchors without negatives get denominator_new == 0 (guard the 0/0).
        safe_denominator = torch.where(denominator == 0, torch.ones_like(denominator), denominator)
        denominator_new = (exp_neg * exp_neg).sum(dim=1) * num_negatives / safe_denominator

        # NOTE(review): `p` sums RAW dot products (no temperature, no exp) while every
        # other term lives in exp-space -- kept as written; confirm this is intended.
        p = dots.masked_fill(~pos_mask, 0.0).sum(dim=1)

        num_positives = pos_mask.sum(dim=1)
        has_positive = num_positives > 0
        # Anchors without positives contribute nothing; give them a benign
        # normaliser so log() cannot inject NaN gradients.
        normaliser = torch.where(has_positive, p + denominator_new, torch.ones_like(p))
        log_ratio = scaled_sim - torch.log(normaliser).unsqueeze(1)
        per_anchor = -log_ratio.masked_fill(~pos_mask, 0.0).sum(dim=1) / num_positives.clamp(min=1)
        main_loss = per_anchor.sum()

        # Track the running loss on the wrapped model itself (not on a notebook
        # global); detach so the autograd graph of the batch is not kept alive.
        if hasattr(self.model, "current_loss"):
            self.model.current_loss = self.model.current_loss + main_loss.detach()
        return main_loss

In [None]:
from torch import Tensor
from typing import Iterable, Dict

class SupConLoss(nn.Module): ##Saurabh's
    """Contrastive loss for multi-text examples with randomly sampled in-batch negatives.

    Every example is expected to carry several texts: the LAST text is the anchor and
    all preceding texts are its positives. For each anchor, ``num_negatives`` anchors
    of other-class examples are drawn (with replacement) from the batch and the term
    ``-log(sum(exp(cos(anchor, pos) / T)) / sum(exp(cos(anchor, neg) / T)))`` is added.

    Anchors that have no positive text, or for which the batch contains no example of
    another class, are skipped (the original looped forever / returned ``inf`` there).

    Args:
        model: encoder whose output dict contains ``"sentence_embedding"``.
        temperature: temperature applied to the cosine similarities.
        base_temperature: stored for interface compatibility; not used by ``forward``.
        num_negatives: how many negatives to sample per anchor (default 3, as before).
    """

    def __init__(self, model: "SentenceTransformer",temperature=0.1,base_temperature=0.07,num_negatives=3):
        super(SupConLoss, self).__init__()
        self.model = model
        # self.class_len = class_len
        self.temperature = temperature
        self.base_temperature = base_temperature
        self.num_negatives = num_negatives

    def forward(self,sentence_features:Iterable[Dict[str, Tensor]], labels):
        '''embeddings: Tensor containing the embeddings
           labels:labels corresponding to each embeddings
        '''
        import warnings

        embeddings = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
        # (n_texts, batch, dim) -> (batch, n_texts, dim)
        embeddings = torch.stack(embeddings).permute(1,0,2)
        batch_size = embeddings.size(0)
        anchors = embeddings[:, -1]
        label_ids = torch.as_tensor(labels).detach().cpu().numpy().reshape(-1)

        # Zero that stays attached to the graph so backward() works even when
        # every anchor has to be skipped.
        loss = embeddings.sum() * 0.0
        used = 0
        for i in range(batch_size):
            positives = embeddings[i][:-1]
            candidates = np.flatnonzero(label_ids != label_ids[i])
            if positives.shape[0] == 0 or candidates.size == 0:
                continue
            # Uniform draw with replacement among other-class examples: the same
            # distribution as the original rejection loop, but guaranteed to stop.
            picked = np.random.choice(candidates, size=self.num_negatives, replace=True)
            negatives = anchors[torch.as_tensor(picked, dtype=torch.long, device=anchors.device)]
            anchor = anchors[i]
            sim_pos = torch.nn.functional.cosine_similarity(anchor.unsqueeze(0).expand_as(positives), positives)
            sim_pos = torch.exp(sim_pos / self.temperature)
            sim_neg = torch.nn.functional.cosine_similarity(anchor.unsqueeze(0).expand_as(negatives), negatives)
            sim_neg = torch.exp(sim_neg / self.temperature)
            loss = loss + -torch.log(sim_pos.sum() / sim_neg.sum())
            used += 1

        if batch_size > 0 and used == 0:
            warnings.warn(
                "SupConLoss skipped every anchor in this batch: examples need at least "
                "two texts (positives + anchor) and the batch needs at least two classes."
            )
        return loss

In [None]:
class SupConLossFirstImplementation(nn.Module):
    """Pairwise contrastive loss: positive pairs are scored against the summed negatives.

    Each example is a sentence pair with label 1 (positive pair) or 0 (negative pair).
    With ``exp_k = exp(<a_k, b_k> / temperature)`` the loss is
    ``-sum_k(labels_k * log(exp_k / D)) / n_positive`` where ``D`` is the sum of
    ``exp_k`` over the negative pairs (``D = 1`` when the batch has no negative pair).

    Args:
        model: encoder whose output dict contains ``"sentence_embedding"``.
        temperature: temperature applied to the dot products.
        base_temperature: stored for interface compatibility; not used by ``forward``.
    """

    def __init__(self, model: "SentenceTransformer",temperature=0.1,base_temperature=0.07):
        # Was `super(SupConLoss, self)`, which raises TypeError on construction.
        super().__init__()
        self.model = model
        # self.class_len = class_len
        self.temperature = temperature
        self.base_temperature = base_temperature

    def forward(self,sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor):
        """Return the loss for one batch of labelled sentence pairs."""
        embeddings = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
        exp = torch.exp(torch.sum(embeddings[0]*embeddings[1],dim=1)/self.temperature)
        # Sum exp over the NEGATIVE pairs. The original loop indexed `exp` with the
        # label value itself (always 0), i.e. it added exp[0] once per negative.
        negative_pairs = labels == 0
        if bool(negative_pairs.any()):
            denominator = exp[negative_pairs].sum()
        else:
            denominator = 1
        output = exp/denominator
        # At least 1 so that a batch without positive pairs yields 0 instead of NaN.
        num_positive = max(int(torch.sum(labels == 1).item()), 1)
        return -torch.sum(torch.log(output)*labels.long())/num_positive

In [None]:
import os
from sklearn.impute import SimpleImputer
model_name = 'all-mpnet-base-v2'
model = ModifiedSentenceTransformer(model_name)
model.to(device)

epochs = 20
batch_size = 16
random_pairs = 20
positive_pairs = 5

# All per-epoch artefacts (embedding CSVs, model checkpoints) are written here.
output_dir = '/content/drive/MyDrive/new/'
os.makedirs(output_dir, exist_ok=True)

results = []
train_examples = make_training_pairs(X_train,y_train)
train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=batch_size)
for epoch in range(epochs):
    # 1) Embed every split with the current encoder and dump the embeddings.
    train_emb = model.encode(X_train, batch_size=batch_size, show_progress_bar=True, convert_to_numpy=True)
    val_emb = model.encode(X_val, batch_size=batch_size, show_progress_bar=True, convert_to_numpy=True)
    test_emb = model.encode(X_test, batch_size=batch_size, show_progress_bar=True, convert_to_numpy=True)
    df_train = pd.DataFrame(train_emb)
    df_val = pd.DataFrame(val_emb)
    df_test = pd.DataFrame(test_emb)
    df_train.to_csv(f"{output_dir}train{epoch}.csv", index=False)
    df_test.to_csv(f"{output_dir}test{epoch}.csv", index=False)
    df_val.to_csv(f"{output_dir}val{epoch}.csv", index=False)

    # 2) t-SNE projection of all embeddings (the plotting itself is disabled below).
    # The stacked matrix must NOT be called `F`: that name is the notebook-wide
    # alias for torch.nn.functional and rebinding it breaks every later `F.` call.
    all_emb = np.vstack([train_emb, val_emb, test_emb])
    Y = np.hstack((y_train, y_val, y_test))
    X_embedded = TSNE(init='random', n_iter=500).fit_transform(all_emb)

    train_mask = np.arange(0, len(y_train), 1)
    test_mask = np.arange(len(y_train), len(Y), 1)
    L = np.unique(Y)
    Xe_train = X_embedded[train_mask, :]
    Xe_test = X_embedded[test_mask, :]
    Y_train = Y[train_mask]
    Y_test = Y[test_mask]
    # No figure is opened while the plot is disabled, otherwise one empty figure
    # per epoch stays open.
    # plt.figure()
    # color = ['#2ca02c', '#1f77b4', '#d62728', '#ff7f0e']
    # for k in L:
    #     plt.scatter(Xe_test[Y_test == k, 0], Xe_test[Y_test == k, 1], s=5, label=k, marker='*', c=color[int(2*k+1)])
    #     plt.scatter(Xe_train[Y_train == k, 0], Xe_train[Y_train == k, 1], s=1, label=k, marker='o', c=color[int(2*k)])
    # plt.legend(['test-label-0', 'train-label-0', 'test-label-1', 'train-label-1'])
    # plt.savefig('/content/drive/MyDrive/new/' + str(epoch) + '.png')

    # 3) Train a fresh classification head on the frozen embeddings and report metrics.
    head,train_acc,val_acc,test_acc,y_true,y_pred = train_head(train_emb,val_emb,test_emb,y_train,y_val,y_test)
    macro_f1 = f1_score(y_true,y_pred,average='macro')
    results.append([macro_f1,train_acc,val_acc])
    print("EPOCH",epoch,"MACRO F1",macro_f1,"Train acc",train_acc,"Val acc",val_acc, "Test acc",test_acc)

    # 4) Fine-tune the encoder for one epoch, checkpoint it and close the epoch.
    model.fit(train_objectives=[(train_dataloader, SupConLoss(model))], epochs=1)
    model.save(f"{output_dir}{epoch}.pth")
    model.update_epoch()



Batches:   0%|          | 0/13 [00:00<?, ?it/s]

Batches:   0%|          | 0/28 [00:00<?, ?it/s]

Batches:   0%|          | 0/70 [00:00<?, ?it/s]

tensor([0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
tensor([1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1])
tensor([0, 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1])
tensor([0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 1, 0])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])
tensor([0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1])
tensor([0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 1])
tensor([1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
tensor([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0])
tensor([0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0])
tensor([0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1])
tensor([0, 0, 0, 0, 0, 0])
tensor([1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0])
tensor([0, 0, 0, 0, 1, 1, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0])
tensor([0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])
tens

NameError: name 'Dataset' is not defined

<Figure size 640x480 with 0 Axes>

In [None]:
class ContrastiveLoss(nn.Module):
    """Contrastive loss on the cosine distance between two sentence embeddings.

    Pairs labelled 1 are penalised by their squared distance; pairs labelled 0 are
    penalised by the squared amount by which their distance falls short of ``margin``.
    """

    def __init__(
        self,
        model: SentenceTransformer,
        margin: float = 0.5,
        size_average: bool = True,
    ):
        super().__init__()
        self.model = model
        self.margin = margin
        self.size_average = size_average

    def forward(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor):
        """Return the mean (``size_average=True``) or summed loss over the batch."""
        reps = [self.model(sentence_feature)["sentence_embedding"] for sentence_feature in sentence_features]
        assert len(reps) == 2
        rep_anchor, rep_other = reps
        # Cosine distance = 1 - cosine similarity.
        distances = 1 - F.cosine_similarity(rep_anchor, rep_other)
        pull_term = labels.float() * distances.pow(2)
        push_term = (1 - labels).float() * F.relu(self.margin - distances).pow(2)
        per_pair = 0.5 * (pull_term + push_term)
        if self.size_average:
            return per_pair.mean()
        return per_pair.sum()