In [None]:
# Mount Google Drive at /content/drive for this Colab runtime.
import os, sys
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install sentence_transformers

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting sentence_transformers
  Downloading sentence-transformers-2.2.2.tar.gz (85 kB)
[K     |████████████████████████████████| 85 kB 3.1 MB/s 
[?25hCollecting transformers<5.0.0,>=4.6.0
  Downloading transformers-4.25.1-py3-none-any.whl (5.8 MB)
[K     |████████████████████████████████| 5.8 MB 28.6 MB/s 
Collecting sentencepiece
  Downloading sentencepiece-0.1.97-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[K     |████████████████████████████████| 1.3 MB 65.0 MB/s 
[?25hCollecting huggingface-hub>=0.4.0
  Downloading huggingface_hub-0.11.1-py3-none-any.whl (182 kB)
[K     |████████████████████████████████| 182 kB 90.9 MB/s 
Collecting tokenizers!=0.11.3,<0.14,>=0.11.1
  Downloading tokenizers-0.13.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (7.6 MB)
[K     |████████████████████████████████| 7.6 MB 73.0 MB/s 
Building wheels for collected pa

In [None]:
from torch.utils.data import DataLoader
import math
from sentence_transformers import models, losses
from sentence_transformers import LoggingHandler, SentenceTransformer, util, InputExample
from sentence_transformers.evaluation import EmbeddingSimilarityEvaluator
import logging
from datetime import datetime
import sys
import os
import gzip
import csv
import random
import torch
from torch import nn, Tensor
from typing import Iterable, Dict

LOADING DATASETS

In [None]:
#### Print debug information to stdout
logging.basicConfig(
    format='%(asctime)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
    handlers=[LoggingHandler()],
)
#### /print debug information to stdout

# Local locations of the two datasets used by this notebook
nli_dataset_path = 'data/AllNLI.tsv.gz'
sts_dataset_path = 'data/stsbenchmark.tsv.gz'

# Check if each dataset exists locally. If not, download it.
for dataset_path, dataset_url in (
    (nli_dataset_path, 'https://sbert.net/datasets/AllNLI.tsv.gz'),
    (sts_dataset_path, 'https://sbert.net/datasets/stsbenchmark.tsv.gz'),
):
    if not os.path.exists(dataset_path):
        util.http_get(dataset_url, dataset_path)

  0%|          | 0.00/40.8M [00:00<?, ?B/s]

  0%|          | 0.00/392k [00:00<?, ?B/s]

Dataset Preparation

In [None]:
# Build the training set from the 'train' rows of AllNLI.tsv.gz
logging.info("Read AllNLI train dataset")

label2int = {"contradiction": 0, "entailment": 1, "neutral": 2}
train_samples = []
positive_samples = []   # pairs labelled 1 (entailment)
other_samples = []      # pairs with any other label
with gzip.open(nli_dataset_path, 'rt', encoding='utf8') as fIn:
    for row in csv.DictReader(fIn, delimiter='\t', quoting=csv.QUOTE_NONE):
        if row['split'] != 'train':
            continue
        label_id = label2int[row['label']]
        example = InputExample(texts=[row['sentence1'], row['sentence2']], label=label_id)
        bucket = positive_samples if label_id == 1 else other_samples
        bucket.append(example)

# Positives first, then the rest, then one seeded shuffle of the combined list
for bucket in (positive_samples, other_samples):
    train_samples.extend(bucket)
random.seed(4)
random.shuffle(train_samples)



Optional: train on a fraction of the training data

In [None]:
'''
uncomment to get fraction of training data
'''
# NOTE(review): the dataset-preparation cell has already filled train_samples
# with the full training set; if only the fraction is wanted, train_samples
# presumably has to be reset to [] before the extend calls below — confirm intent.
# # select fraction of samples
# def get_sample(x, sample_list):
#   total = math.ceil(len(sample_list)*x)
#   return sample_list[:total]

# random.shuffle(positive_samples)
# random.shuffle(other_samples)
# positive_samples_05 = get_sample(0.05, positive_samples)
# other_samples_05 = get_sample(0.05, other_samples)
# train_samples.extend(positive_samples_05)
# train_samples.extend(other_samples_05)
# random.shuffle(train_samples)


Preparing dev and test splits of STSb

In [None]:
logging.info("Read STSbenchmark dev and test dataset")

dev_samples = []
test_samples = []
# Split name -> list collecting its examples; rows of any other split are dropped
samples_by_split = {'dev': dev_samples, 'test': test_samples}

with gzip.open(sts_dataset_path, 'rt', encoding='utf8') as fIn:
    for row in csv.DictReader(fIn, delimiter='\t', quoting=csv.QUOTE_NONE):
        # Normalize score to range 0 ... 1
        normalized_score = float(row['score']) / 5.0
        example = InputExample(texts=[row['sentence1'], row['sentence2']], label=normalized_score)

        target = samples_by_split.get(row['split'])
        if target is not None:
            target.append(example)
    

In [None]:
# Report dataset sizes. `positive_samples` is used because `positive_samples_05`
# only exists when the optional subsampling cell is uncommented, so referencing
# it here raised NameError on a fresh kernel.
print('Number of training samples: ', len(train_samples))
print('Number of positive samples: ', len(positive_samples))
print('Number of dev samples: ', len(dev_samples))
print('Number of test samples: ', len(test_samples))

Number of training samples:  942069
Number of positive samples:  15716
Number of dev samples:  1500
Number of test samples:  1379


NLI test split

In [None]:
'''
uncomment to get NLI test data
'''
# # Read the AllNLI.tsv.gz file and collect the 'test' split (only labels 0 and 1 are kept)
# NOTE(review): the log message below still says "train" although the split read is 'test'.
# logging.info("Read AllNLI train dataset")

# label2int = {"contradiction": 0, "entailment": 1, "neutral": 2}
# test_s1 = []
# test_s2 = []
# test_labels = []

# with gzip.open(nli_dataset_path, 'rt', encoding='utf8') as fIn:
#     reader = csv.DictReader(fIn, delimiter='\t', quoting=csv.QUOTE_NONE)
#     for row in reader:
#         if row['split'] == 'test':
#             label_id = label2int[row['label']]
#             if label_id == 1 or label_id == 0:
#               test_s1.append(row['sentence1'])
#               test_s2.append(row['sentence2'])
#               test_labels.append(label_id)



Lorentz distance function

In [None]:
def lorentz_dist(u, v, beta = 0.1):
  """Lorentz distance between the rows of ``u`` and ``v`` along the last dim.

  Each input gets one extra trailing coordinate ``sqrt(sum(x**2) + beta)``
  (negated for ``v``); the result is ``-2*beta - 2 * sum(u_ext * v_ext)``
  taken over the last dimension, which is therefore reduced away.

  Args:
    u: tensor of vectors (last dimension is the feature dimension).
    v: tensor of vectors, combined elementwise with ``u``.
    beta: constant added under the square root and scaled into the offset.

  Returns:
    Tensor with the last dimension removed, one value per pair.
  """
  def _extra_coordinate(x):
    return torch.sqrt(torch.pow(x, 2).sum(-1, keepdim=True) + beta)

  u_ext = torch.cat((u, _extra_coordinate(u)), -1)
  # The sign flip on v's extra coordinate makes that coordinate enter the sum negatively
  v_ext = torch.cat((v, -_extra_coordinate(v)), -1)
  return - 2 * beta - 2 * torch.sum(u_ext * v_ext, dim=-1)

Custom loss - our implementation of Uniformity and Alignment losses

In [None]:

class customLoss(nn.Module):
  """Weighted sum of an alignment term and a uniformity term over sentence pairs.

  The alignment term is computed only from pairs whose label equals 1; the
  uniformity term is computed from every pair in the batch. Per-pair distances
  come from ``distance_metric`` (``lorentz_dist`` by default).

  Args:
    model: SentenceTransformer whose output dict provides 'sentence_embedding'.
    distance_metric: callable ``(u, v) -> Tensor`` giving one distance per pair.
    align_alpha: exponent applied to the distance magnitude in the alignment term.
    unif_t: scale ``t`` used as ``exp(-t * d**2)`` in the uniformity term.
    w_align: weight of the alignment term in the total loss.
    w_unif: weight of the uniformity term in the total loss.
  """

  def __init__(self, model: SentenceTransformer, distance_metric = lorentz_dist, align_alpha = 2, unif_t = 1e-10, w_align = 3, w_unif = 1):
    super().__init__()
    self.distance_metric = distance_metric
    self.model = model
    self.align_alpha = align_alpha
    self.unif_t = unif_t
    self.w_align = w_align
    self.w_unif = w_unif

  def align_loss(self, distance, alpha):
    """Mean over rows of ``||row||_2 ** alpha``.

    Returns zero (still attached to ``distance``) when there are no rows;
    the mean of an empty tensor would otherwise be NaN.
    """
    if distance.shape[0] == 0:
      # A batch without any label-1 pair contributes no alignment loss.
      return distance.sum()
    return distance.norm(p=2, dim=1).pow(alpha).mean()

  def uniform_loss(self, distance, t):
    """Mean over rows of ``exp(-t * ||row||_2 ** 2)``."""
    return distance.norm(p=2, dim=1).pow(2).mul(-t).exp().mean()

  def forward(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor):
    """Compute ``w_align * align_loss + w_unif * uniform_loss`` for one batch.

    Args:
      sentence_features: exactly two feature dicts (anchor side, other side).
      labels: per-pair labels; pairs with label 1 feed the alignment term.

    Returns:
      Scalar loss tensor.
    """
    reps = [self.model(sentence_feature)['sentence_embedding'] for sentence_feature in sentence_features]
    assert len(reps) == 2
    rep_anchor, rep_other = reps

    # Row indices of the pairs labelled 1
    indices = (labels == 1).nonzero().squeeze(1)
    pos_rep_anchor, pos_rep_other = rep_anchor[indices], rep_other[indices]

    distances = self.distance_metric(pos_rep_anchor, pos_rep_other)
    a_dist = self.distance_metric(rep_anchor, rep_other)

    # unsqueeze turns each per-pair distance into a one-element row, because
    # the loss helpers take the norm over dim=1
    a_loss = self.align_loss(distances.unsqueeze(dim = 1), self.align_alpha)
    u_loss = self.uniform_loss(a_dist.unsqueeze(dim = 1), self.unif_t)

    # Uncomment to use euclidean distance instead of lorentz distance
    # a_loss = self.align_loss((pos_rep_anchor - pos_rep_other), self.align_alpha)
    # u_loss = self.uniform_loss((rep_anchor - rep_other), self.unif_t)

    return (self.w_align * a_loss) + (self.w_unif * u_loss)


In [None]:
train_batch_size = 32

# Training batches are drawn in shuffled order
train_dataloader = DataLoader(train_samples, batch_size=train_batch_size, shuffle=True)

# The STS dev and test evaluators differ only in their samples and name
dev_evaluator, test_evaluator = [
    EmbeddingSimilarityEvaluator.from_input_examples(samples, batch_size=train_batch_size, name=evaluator_name)
    for samples, evaluator_name in ((dev_samples, 'sts-dev'), (test_samples, 'sts-test'))
]

'''uncomment to create test evaluator for NLI dataset '''
# from sentence_transformers.evaluation import EmbeddingSimilarityEvaluator, BinaryClassificationEvaluator
# test_evaluator = BinaryClassificationEvaluator(test_s1, test_s2, test_labels, batch_size = train_batch_size)

Basic architecture: Bert-base-uncased + Mean pooling

In [None]:
# Token-level encoder
word_embedding_model = models.Transformer('bert-base-uncased')

# Mean pooling over tokens (CLS and max pooling disabled)
embedding_dimension = word_embedding_model.get_word_embedding_dimension()
pooling_model = models.Pooling(
    embedding_dimension,
    pooling_mode_mean_tokens=True,
    pooling_mode_cls_token=False,
    pooling_mode_max_tokens=False,
)

model = SentenceTransformer(modules=[word_embedding_model, pooling_model])



Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.weight', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [None]:
# Training objective: exactly one `train_loss` assignment must be active.
# With both commented out, `train_loss` was undefined and model.fit raised
# NameError. The custom loss is enabled by default to match the
# 'training_nli_custom-' output name used below.
train_loss = customLoss(model = model)
# Alternative: uncomment to set contrastive loss as objective instead
#train_loss = losses.ContrastiveLoss(model = model)

num_epochs = 1
warmup_steps = math.ceil(len(train_dataloader) * num_epochs  * 0.1) #10% of train data for warm-up
logging.info("Warmup-steps: {}".format(warmup_steps))

# Timestamped output directory so repeated runs do not overwrite each other
model_save_path = 'output/training_nli_custom-'+datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

model.fit(train_objectives=[(train_dataloader, train_loss)],
          evaluator = dev_evaluator,
          epochs = num_epochs,
          evaluation_steps=int(len(train_dataloader)*0.1),  # evaluate on dev every 10% of an epoch
          warmup_steps = warmup_steps,
          optimizer_class = torch.optim.SGD, 
          optimizer_params = {'lr': 1e-3, 'momentum': 0.9},
          output_path = model_save_path
          )

Epoch:   0%|          | 0/1 [00:00<?, ?it/s]

Iteration:   0%|          | 0/29440 [00:00<?, ?it/s]

In [None]:
# Reload the model stored under model_save_path and score it on the STS test split.
# The trailing label is now a comment: a bare string after the call was a SyntaxError.
model = SentenceTransformer(model_save_path)
model.evaluate(test_evaluator)  # testing on STS

0.2500456713804727

In [None]:
'uncomment to download the model and outputs to local drive - paste appropriate path in the second argument'
#!zip -r /content/bert_customNLI.zip /content/output/training_nli_custom-2022-12-04_15-45-34
# from google.colab import files
# files.download('bert_customNLI.zip')

  adding: content/output/training_nli_custom-2022-12-04_15-45-34/ (stored 0%)
  adding: content/output/training_nli_custom-2022-12-04_15-45-34/vocab.txt (deflated 53%)
  adding: content/output/training_nli_custom-2022-12-04_15-45-34/1_Pooling/ (stored 0%)
  adding: content/output/training_nli_custom-2022-12-04_15-45-34/1_Pooling/config.json (deflated 47%)
  adding: content/output/training_nli_custom-2022-12-04_15-45-34/modules.json (deflated 53%)
  adding: content/output/training_nli_custom-2022-12-04_15-45-34/pytorch_model.bin (deflated 7%)
  adding: content/output/training_nli_custom-2022-12-04_15-45-34/config_sentence_transformers.json (deflated 26%)
  adding: content/output/training_nli_custom-2022-12-04_15-45-34/sentence_bert_config.json (deflated 4%)
  adding: content/output/training_nli_custom-2022-12-04_15-45-34/eval/ (stored 0%)
  adding: content/output/training_nli_custom-2022-12-04_15-45-34/eval/similarity_evaluation_sts-dev_results.csv (deflated 51%)
  adding: content/outpu

In [None]:
'uncomment to save the model in personal Gdrive instead'
# model_name = 'bertNLI-customloss_customdist_norm'
# path = F"/content/drive/My Drive/{model_name}" 
# torch.save(model.state_dict(), path)

In [None]:
'uncomment to load the saved model'
# NOTE(review): `path` is only assigned in the commented-out Gdrive save cell;
# it must be defined before the load_state_dict line below can run.
# word_embedding_model = models.Transformer('bert-base-uncased')

# # Apply mean pooling to get one fixed sized sentence vector
# NOTE(review): the flags below select max pooling, while the comment above and
# the training architecture cell use mean pooling — confirm which is intended.
# pooling_model = models.Pooling(word_embedding_model.get_word_embedding_dimension(),
#                                pooling_mode_mean_tokens=False,
#                                pooling_mode_cls_token=False,
#                                pooling_mode_max_tokens=True)


# model = SentenceTransformer(modules=[word_embedding_model, pooling_model])
#model.load_state_dict(torch.load(path))

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.bias', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.bias', 'cls.predictions.decoder.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [None]:
'uncomment to test the model on NLI test'
# model_test_path = 'output/'
# test_evaluator(model, output_path=model_test_path)