In [491]:
# Importing necessary libraries
import torch
import math
from torch import linalg as LA
import torch.nn as nn
import torch.nn.functional as F
import numpy as np 
import time
import random

In [420]:
## Contrastive Loss Module (old/inefficient functions—they are correct but use things like for loops instead of vectorization)

# Define the similarity function: the dot product divided by the product of the vectors' L2 norms, with a temperature term T that defaults to 1

# sim(i, j) = torch.dot(i, j) / (T * |i| * |j|)

# The sim function incorporates a temperature constant T that can be changed. For regular cosine similarity, T = 1.

def sim(i, j, T=1):
    """Temperature-scaled cosine similarity between two 1-D tensors.

    Args:
        i: First vector (1-D tensor, as required by ``torch.dot``).
        j: Second vector (1-D tensor of the same length as ``i``).
        T: Temperature that divides the cosine similarity. ``T=1`` gives the
            plain cosine similarity.

    Returns:
        A 0-dim tensor holding ``dot(i, j) / (T * |i| * |j|)``.
    """
    # Product of the temperature and both L2 norms forms the denominator.
    scale = T * LA.vector_norm(i) * LA.vector_norm(j)
    return torch.dot(i, j) / scale


# pair_loss(i, j) = -log( exp(sim(i, j)) / Σ(k = 0 --> 2N - 1, k != i)[exp(sim(i, k))] )

def pair_loss(images_tensor, i, j):
    """Contrastive loss of the ordered pair ``(i, j)``, computed with a Python loop.

    Evaluates ``-log(exp(sim(i, j)) / sum_{k != i} exp(sim(i, k)))`` using the
    module-level ``sim`` with its default temperature.

    Args:
        images_tensor: Indexable collection of 1-D vectors (a list of tensors
            or a stacked 2-D tensor) supporting ``len`` and integer indexing.
        i: Index of the anchor vector.
        j: Index of the vector paired with the anchor.

    Returns:
        A tensor of shape ``(1,)`` containing the loss for the pair.
    """
    anchor = images_tensor[i]

    # Numerator: similarity between the anchor and its partner.
    positive_term = torch.exp(sim(anchor, images_tensor[j])).float()

    # Denominator: every other vector in the collection except the anchor itself.
    negative_sum = torch.zeros(1)
    for k in range(len(images_tensor)):
        if k != i:
            negative_sum += torch.exp(sim(anchor, images_tensor[k]))

    return -torch.log(positive_term / negative_sum)

def new_pair_loss(sim_matrix, i, j):
    """Contrastive loss of the ordered pair ``(i, j)`` from a precomputed similarity matrix.

    Args:
        sim_matrix: 2-D tensor whose entry ``[a, b]`` is the similarity of
            vectors ``a`` and ``b``.
        i: Row index of the anchor.
        j: Column index of the vector paired with the anchor.

    Returns:
        A 0-dim tensor: ``-log(exp(s[i, j]) / (sum_k exp(s[i, k]) - exp(s[i, i])))``.
    """
    anchor_row_exp = torch.exp(sim_matrix[i])
    positive_term = torch.exp(sim_matrix[i, j]).float()

    # Sum over the anchor's row, then remove the self-similarity term.
    negative_sum = anchor_row_exp.sum() - torch.exp(sim_matrix[i, i])

    return -torch.log(positive_term / negative_sum)


# contrastive_loss = (1/number_of_images) * Σ(i = 0, 2, 4, ..., 2N - 2)[pair_loss(images_tensor, i, i + 1) + pair_loss(images_tensor, i + 1, i)]

def total_loss(images_tensor):
    """Average contrastive loss over consecutive pairs ``(0, 1), (2, 3), ...``.

    Each pair contributes ``pair_loss(i, i + 1) + pair_loss(i + 1, i)``, and the
    sum is divided by the number of vectors.

    Args:
        images_tensor: Indexable collection of 1-D vectors (list of tensors or
            a stacked 2-D tensor); vectors at indices ``2k`` and ``2k + 1``
            are treated as a pair.

    Returns:
        A tensor of shape ``(1,)`` with the averaged loss.
    """
    n_images = len(images_tensor)
    # Dividing by the number of images is the same as dividing by 2 * num_pairs.
    scale = 1 / n_images

    pair_sum = torch.zeros(1)
    for first in range(0, n_images, 2):
        second = first + 1
        pair_sum += pair_loss(images_tensor, first, second) + pair_loss(images_tensor, second, first)

    return scale * pair_sum

In [421]:
# Sanity Checks
# Each case below feeds `sim` a pair of vectors; the results are displayed together at the end of the cell.

# Identical vectors
t = torch.ones(5) 
u = torch.ones(5)
test = sim(t, u)               

# Opposite vectors
i = torch.ones(5) * -1
j = torch.ones(5)
test1 = sim(i, j)

# Orthogonal vectors
h = torch.tensor((0,1)).float()
k = torch.tensor((1, 0)).float()
test2 = sim(h, k)

# Random vectors (unseeded, so this value changes from run to run)
r = torch.randn(10).float()
s = torch.randn(10).float()
test3 = sim(r, s)

test, test1, test2, test3

(tensor(1.), tensor(-1.), tensor(0.), tensor(0.0156))

In [422]:
# Comparing with PyTorch CosineSimilarity
cos = torch.nn.CosineSimilarity(dim=0, eps=1e-6) # For 1-D tensors, we compute cosine similarity across dim=0
testPy = cos(t, u)
test1Py = cos(i, j)
test2Py = cos(h, k)
test3Py = cos(r, s)
testPy, test1Py, test2Py, test3Py

(tensor(1.0000), tensor(-1.0000), tensor(0.), tensor(0.0156))

In [423]:
# Testing out pair_loss. First I make a list of tensors to pass into the pair_loss function
ls = [t, u, i, j] # Putting tensors all into one list
ls_tensor = torch.stack(ls, dim=0) # Stacking the tensors into a new tensor

In [424]:
contr_test = pair_loss(ls, 0, 1)
contr_test_tensor = pair_loss(ls_tensor, 0, 1)
contr_test, contr_test_tensor

(tensor([0.7586]), tensor([0.7586]))

In [425]:
# Testing out the total_loss function. 
tot_loss = total_loss(ls)
tot_loss

tensor([1.3436])

In [426]:
# Doing more sanity checks on the total_loss function. First case will be high loss case, second case will be low loss case
a = torch.tensor((-1, -1)).float()
b = torch.tensor((1, 1)).float()
c = torch.tensor((-1, -1)).float()
d = torch.tensor((1, 1)).float()

w = torch.tensor((1, 1)).float()
x = torch.tensor((1, 1)).float()
y = torch.tensor((-1, -1)).float()
z = torch.tensor((-1, -1)).float()

In [427]:
ls_high_loss = [a, b, c, d]
ls_low_loss = [w, x, y, z]

In [428]:
high_loss = total_loss(ls_high_loss)
low_loss = total_loss(ls_low_loss)

In [429]:
high_loss

tensor([2.2395])

In [430]:
low_loss

tensor([0.2395])

In [431]:
# We get the expected result that the low_loss case, which has positive pairs that are similar to each other and 
# dissimilar from the rest, has a significantly lower loss than the high_loss case. 

In [432]:
ls_high_loss_tensor = torch.stack(ls_high_loss, dim=0)
ls_low_loss_tensor = torch.stack(ls_low_loss, dim=0)

In [433]:
high_loss_tensor = total_loss(ls_high_loss_tensor)
low_loss_tensor = total_loss(ls_low_loss_tensor)

In [434]:
high_loss_tensor

tensor([2.2395])

In [435]:
low_loss_tensor

tensor([0.2395])

In [436]:
# Final versions of Contrastive Loss Functions

# Pre-compute all similarities to index into later. T = temperature term 
def sim_matrix(images_tensor, T=1):
    """Pairwise temperature-scaled cosine similarities between the rows of a 2-D tensor.

    Args:
        images_tensor: 2-D tensor with one vector per row.
        T: Temperature that divides every similarity. ``T=1`` gives the plain
            cosine similarity.

    Returns:
        A square tensor whose entry ``[a, b]`` is
        ``dot(row_a, row_b) / (T * |row_a| * |row_b|)``.
    """
    transposed = images_tensor.T

    # Column vector of row norms and row vector of the same norms; their
    # matrix product is the outer product |row_a| * |row_b|.
    norms_as_column = LA.vector_norm(images_tensor, dim=1, ord=2, keepdim=True)
    norms_as_row = LA.vector_norm(transposed, dim=0, ord=2, keepdim=True)
    scale = T * torch.matmul(norms_as_column, norms_as_row)

    dot_products = torch.matmul(images_tensor, transposed)
    return dot_products / scale

# Lots of un-needed information here--only care about pair_loss for the positive pairs(0,1 2,3 4,5 etc)
def pair_loss_matrix(sim_matrix):
    """Contrastive pair loss for every ordered pair, computed in one shot.

    Entry ``[a, b]`` of the result is
    ``-log(exp(s[a, b]) / sum_{k != a} exp(s[a, k]))``, i.e. the loss of the
    ordered pair ``(a, b)`` with ``a`` as the anchor. Only the entries for the
    positive pairs ``(0, 1), (1, 0), (2, 3), (3, 2), ...`` are used by
    ``total_contrastive_loss``; the remaining entries (including the diagonal)
    are by-products.

    The denominator for anchor ``a`` is broadcast along row ``a``, so the
    result is correct for any square similarity matrix. For a symmetric
    matrix this matches the previous implementation, which divided
    column-wise and then transposed the result.

    Args:
        sim_matrix: Square 2-D tensor of pairwise similarities.

    Returns:
        A tensor with the same shape as ``sim_matrix`` holding the pair losses.
    """
    # exp() is needed for the numerator, the row sums and the diagonal: compute it once.
    exp_sim = torch.exp(sim_matrix)
    numer = exp_sim.float()

    # Row sum minus the self-similarity term gives sum_{k != a} exp(s[a, k]).
    denom = exp_sim.sum(dim=1) - torch.diagonal(exp_sim, 0).float()

    # Column-vector shape so denom[a] divides every entry of row a.
    return -torch.log(numer / denom.reshape(-1, 1))

def total_contrastive_loss(pair_loss_matrix):
    """Average the losses of the positive pairs ``(0, 1), (2, 3), ...``.

    For every even index ``i`` the entries ``[i, i + 1]`` and ``[i + 1, i]`` of
    ``pair_loss_matrix`` are added, and the total is divided by the number of
    rows. The sum is taken on the input tensor itself, so the result stays on
    the input's device and keeps its dtype.

    Args:
        pair_loss_matrix: Square 2-D tensor of pair losses with an even,
            non-zero number of rows.

    Returns:
        A tensor of shape ``(1,)`` containing the averaged loss.

    Raises:
        ValueError: If the number of rows is zero or odd, since the rows
            cannot then be grouped into consecutive pairs.
    """
    batch_size = len(pair_loss_matrix)
    if batch_size == 0 or batch_size % 2 != 0:
        raise ValueError(
            "pair_loss_matrix must have an even, non-zero number of rows, got %d" % batch_size
        )

    # The super-diagonal holds [i, i + 1] and the sub-diagonal holds [i + 1, i];
    # taking every second element keeps only the even anchors i = 0, 2, 4, ...
    upper = torch.diagonal(pair_loss_matrix, offset=1)[::2]
    lower = torch.diagonal(pair_loss_matrix, offset=-1)[::2]

    # reshape(1) keeps the historical shape-(1,) return value.
    summation_term = (upper + lower).sum().reshape(1)
    return (1 / batch_size) * summation_term

In [437]:
# Put in matrix form, get it working with a small CNN(grayscale), 

# 4 images, global average pool to get vector representation
# 4 x 1 x 12 x 12 (2D convolutions expect a channel representation)
# unsqueeze for input
# .squeeze
# pytorch.conv2D
# 2 convolutions then pool 


In [438]:
# Initializing my test images/tensors

a = torch.randn(4, 1, 32, 32) # This represents a tensor with 4 images, each
# is 32x32, with one channel (Conv2d expects a channel dimension).
# Overwrite the first two images so that image 1 is a scaled copy of image 0.
a[0] = torch.ones(1, 32, 32)
a[1] = a[0] * 1.5

In [439]:
# Using tensors coming from normal distribution 
c0 = torch.normal(mean=0, std=1, size=(32,32))
c1 = torch.normal(mean=0, std=1, size=(32,32))
c2 = torch.normal(mean=1, std=1, size=(32,32))
c3 = torch.normal(mean=1, std=1, size=(32,32))
c = torch.stack((c0, c1, c2, c3), dim=0)
c = torch.unsqueeze(c, 1) # Adding in a dimension because the ConvNet expects a dimension for channel


In [440]:
c.shape # This is the correct shape

torch.Size([4, 1, 32, 32])

In [441]:
test0 = c0.view(1, -1).squeeze()
test1 = c1.view(1, -1).squeeze()
test2 = c2.view(1, -1).squeeze()
test3 = c3.view(1, -1).squeeze()

In [450]:
sim(test0, test3)

tensor(-0.0481)

In [451]:
class ConvNet(nn.Module):
    """Small CNN that maps single-channel 32x32 images to 40-dimensional vectors.

    Layout: conv(1->3, 5x5) -> ReLU -> 2x2 max-pool -> conv(3->16, 5x5) -> ReLU
    -> 2x2 max-pool -> linear(400->80) -> ReLU -> linear(80->40).
    With a 32x32 input the second pooling stage yields a 16x5x5 feature map,
    which is what the first linear layer is sized for.
    """

    # Shape (channels, height, width) of the feature map fed to the linear layers.
    _FEATURE_SHAPE = (16, 5, 5)

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 3, 5) # 1 input channel, 3 output, 5x5 kernel_size
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(3, 16, 5) # input channel = 3, output = 16, kernel_size = 5x5
        self.fc1 = nn.Linear(16*5*5, 80) # Flattened 16x5x5 feature map -> 80 features
        self.fc2 = nn.Linear(80, 40)

    def forward(self, x):
        """Embed a batch of images.

        Args:
            x: Image tensor whose spatial size produces a 16x5x5 feature map
                after both conv/pool stages (32x32 images do).

        Returns:
            A 2-D tensor with one 40-dimensional row per image.

        Raises:
            ValueError: If the feature map is not 16x5x5. Without this check
                ``view(-1, 400)`` would silently fold the extra features into
                the batch dimension for some input sizes.
        """
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        if tuple(x.shape[-3:]) != self._FEATURE_SHAPE:
            raise ValueError(
                "expected a feature map of shape %s after the conv stages, got %s; "
                "ConvNet is sized for 32x32 inputs" % (self._FEATURE_SHAPE, tuple(x.shape[-3:]))
            )
        x = x.view(-1, 16*5*5)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [452]:
# Initializing the ConvNet Model
model = ConvNet()

In [453]:
# Initializing hyperparameters and choice of optimizer, etc
learning_rate = 0.1
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

In [469]:
# Do a training loop example, update the network, and see how loss changes over time.
# Use random normal distributions. Have 2 with mean = 1, and 2 with mean = 0

# Forward pass: embed the batch `c` with the ConvNet.
outputs = model(c)
# Loss: pairwise similarities -> per-pair losses -> averaged contrastive loss.
simi_matrix = sim_matrix(outputs)
pair_matrix = pair_loss_matrix(simi_matrix)
loss = total_contrastive_loss(pair_matrix)
# Backward pass: clear old gradients, backpropagate, and apply one SGD update.
optimizer.zero_grad()
loss.backward()
optimizer.step()

In [470]:
loss

tensor([0.2989], grad_fn=<MulBackward0>)

In [471]:
simi_matrix

tensor([[ 1.0000,  0.9936, -0.7556, -0.7555],
        [ 0.9936,  1.0000, -0.7518, -0.7513],
        [-0.7556, -0.7518,  1.0000,  0.9948],
        [-0.7555, -0.7513,  0.9948,  1.0000]], grad_fn=<DivBackward0>)

In [457]:
outputs

tensor([[-0.2629,  0.0681, -0.0458,  0.2901,  0.0620, -0.1511, -0.1972, -0.1514,
          0.0325, -0.2544, -0.0863, -0.1450, -0.0695, -0.0087, -0.2168,  0.0933,
         -0.0544, -0.2334, -0.2493, -0.0604,  0.1137,  0.0458, -0.0370,  0.1339,
         -0.1678, -0.0253,  0.1009,  0.1556, -0.0202,  0.1524, -0.1143,  0.0019,
          0.1305, -0.0919,  0.2155,  0.0052,  0.1132,  0.0717,  0.1444,  0.0371],
        [-0.2642,  0.0097, -0.0591,  0.2470,  0.0571, -0.2071, -0.2574, -0.2841,
         -0.0575, -0.3616, -0.0820, -0.1122, -0.1293, -0.0222, -0.1608,  0.0834,
         -0.0235, -0.2164, -0.1427, -0.0820,  0.1317,  0.0004,  0.0323,  0.1402,
         -0.1589, -0.0373,  0.0077,  0.1627, -0.0200,  0.1851, -0.1127, -0.0360,
          0.1606, -0.1819,  0.2623,  0.1147,  0.1427,  0.0270,  0.1122,  0.0993],
        [-0.2402,  0.0808, -0.0065,  0.2973, -0.0603, -0.2012, -0.2092, -0.2340,
         -0.0126, -0.2514,  0.0758, -0.0837, -0.1252, -0.0264, -0.2593,  0.0217,
          0.0435, -0.1101,

In [458]:
c

tensor([[[[-1.4442e+00,  1.7939e+00, -1.6226e+00,  ..., -7.4788e-01,
           -1.3616e+00,  1.9252e-01],
          [-7.3228e-01, -7.7949e-01,  2.3209e-01,  ...,  4.4246e-01,
            1.3686e+00, -8.8945e-01],
          [-2.8290e-02,  1.6922e+00,  9.5502e-01,  ...,  1.2939e+00,
            1.1230e+00, -3.1070e-01],
          ...,
          [-6.9299e-01, -7.1403e-01,  1.0624e+00,  ..., -2.4580e-01,
            1.7204e-01, -1.6623e+00],
          [ 1.4962e+00, -1.1881e+00,  1.4085e+00,  ...,  2.4769e-02,
           -1.1320e+00, -1.1743e+00],
          [ 1.2182e+00, -1.7938e+00,  6.4066e-01,  ..., -7.4276e-01,
            1.0765e+00,  3.2862e-01]]],


        [[[ 2.5773e-01,  2.2596e+00,  2.4385e-01,  ...,  2.1024e+00,
            1.3358e-01,  1.2560e+00],
          [ 3.4708e-01, -9.8906e-02,  7.0650e-01,  ...,  1.0119e+00,
            9.2819e-01, -3.7837e-01],
          [ 3.4117e-01, -1.7430e-02, -2.5657e-02,  ..., -1.1083e+00,
            7.9414e-01,  1.9632e+00],
          ...,
   

In [474]:
pair_matrix

tensor([[0.2921, 0.2985, 2.0477, 2.0476],
        [0.2995, 0.2932, 2.0450, 2.0444],
        [2.0491, 2.0453, 0.2935, 0.2987],
        [2.0491, 2.0448, 0.2988, 0.2935]], grad_fn=<PermuteBackward0>)

In [411]:
# Turn into proper training loop(prioritize)

# Functions: predict(calls model(x)), train_one_step, train_one_epoch, train

# Switch gears into dataloader (pseudocode first)

In [630]:
def predict(data, model):
    """Apply ``model`` to ``data`` and return the model's output unchanged.

    Args:
        data: Input passed straight to ``model``.
        model: Callable (e.g. an ``nn.Module``) that is invoked as ``model(data)``.

    Returns:
        Whatever ``model(data)`` returns.
    """
    return model(data)

def train_one_step(batch, model, optimizer):
    """Run one optimisation step on a single batch and return its loss.

    Args:
        batch: Input passed to ``predict`` together with ``model``.
        model: Model that produces one embedding per item in the batch.
        optimizer: Optimizer whose ``zero_grad`` and ``step`` are called
            around the backward pass.

    Returns:
        The contrastive loss tensor computed before the parameter update.
    """
    embeddings = predict(batch, model)

    # Similarities -> per-pair losses -> averaged contrastive loss.
    loss = total_contrastive_loss(pair_loss_matrix(sim_matrix(embeddings)))

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss
 
def train_one_epoch(dataloader, model, optimizer): # Take in dataloader
    """Train on every batch of ``dataloader`` once and return the last batch's loss.

    When ``dataloader`` is a list or tuple, the batches are visited in a fresh
    random order each epoch. The order is drawn once, on a copy, before
    iterating: shuffling the list in place while looping over it (as was done
    previously) can repeat some batches and skip others within the same epoch.
    Any other iterable is consumed in the order it yields.

    Args:
        dataloader: Iterable of batches; a list or tuple is shuffled per epoch.
        model: Model passed through to ``train_one_step``.
        optimizer: Optimizer passed through to ``train_one_step``.

    Returns:
        The loss returned by ``train_one_step`` for the last batch processed,
        or ``None`` if ``dataloader`` yields no batches.
    """
    if isinstance(dataloader, (list, tuple)):
        # random.sample with k == len returns a shuffled copy; the caller's
        # sequence is left untouched.
        batches = random.sample(dataloader, k=len(dataloader))
    else:
        batches = dataloader

    current_loss = None
    for batch in batches:
        current_loss = train_one_step(batch, model, optimizer)
    return current_loss

def train(dataloader, model, optimizer, num_epochs):
    """Run ``train_one_epoch`` ``num_epochs`` times and return the final epoch's loss.

    Args:
        dataloader: Batches passed through to ``train_one_epoch``.
        model: Model being trained.
        optimizer: Optimizer used for the parameter updates.
        num_epochs: Number of passes over ``dataloader``.

    Returns:
        The value returned by the last call to ``train_one_epoch``, or ``None``
        when ``num_epochs`` is zero or negative (no epoch is run).
    """
    # Initialised up front so that zero epochs returns None instead of
    # raising UnboundLocalError.
    training_loss = None
    for _ in range(num_epochs):
        training_loss = train_one_epoch(dataloader, model, optimizer)
    return training_loss

In [631]:
# Construct batch, copy it and put into list, then this is dataloader 

In [676]:
# Testing predict function (NOTE: `dataloader` is defined in a later cell; run that cell first on a fresh kernel)
predict(dataloader[0], model)

tensor([[-0.0899,  0.0754,  0.2073,  0.1371, -0.2054,  0.1405, -0.1768, -0.1879,
          0.1793, -0.0022,  0.0676,  0.0168,  0.1717,  0.0535,  0.0563,  0.1734,
         -0.0268,  0.2787, -0.0393, -0.0943, -0.1901,  0.0461, -0.0285, -0.0453,
         -0.2318,  0.0895, -0.0535, -0.0448,  0.0019, -0.0919,  0.1602, -0.1038,
         -0.1382, -0.2871, -0.2087, -0.1387,  0.0241,  0.0600,  0.1741,  0.2057],
        [-0.1115,  0.1133,  0.2121,  0.1573, -0.1732,  0.1522, -0.1369, -0.1641,
          0.1176, -0.0495,  0.0854,  0.0180,  0.1427,  0.1012,  0.0474,  0.2207,
         -0.0076,  0.2919, -0.0223, -0.0317, -0.1766,  0.0648,  0.0331, -0.0787,
         -0.1565,  0.0978, -0.0820, -0.1387, -0.0033, -0.0718,  0.1993, -0.0794,
         -0.0733, -0.2748, -0.1965, -0.0796,  0.0931,  0.1266,  0.1559,  0.1854],
        [-0.0582,  0.0375,  0.1223,  0.0973, -0.1881,  0.1248, -0.0802, -0.1989,
          0.0731, -0.0773,  0.1191, -0.0032,  0.0825,  0.0677,  0.0333,  0.1184,
          0.0268,  0.2249,

In [677]:
# Using tensors coming from normal distribution 
t0 = torch.normal(mean=0, std=1, size=(32,32))
t1 = torch.normal(mean=0, std=1, size=(32,32))
t2 = torch.normal(mean=1, std=1, size=(32,32))
t3 = torch.normal(mean=1, std=1, size=(32,32))
t = torch.stack((t0, t1, t2, t3), dim=0)
t = torch.unsqueeze(t, 1)

In [678]:
u0 = torch.normal(mean=0, std=1, size=(32,32))
u1 = torch.normal(mean=0, std=1, size=(32,32))
u2 = torch.normal(mean=1, std=1, size=(32,32))
u3 = torch.normal(mean=1, std=1, size=(32,32))
u = torch.stack((u0, u1, u2, u3), dim=0)
u = torch.unsqueeze(u, 1)

In [679]:
v0 = torch.normal(mean=0, std=1, size=(32,32))
v1 = torch.normal(mean=0, std=1, size=(32,32))
v2 = torch.normal(mean=1, std=1, size=(32,32))
v3 = torch.normal(mean=1, std=1, size=(32,32))
v = torch.stack((v0, v1, v2, v3), dim=0)
v = torch.unsqueeze(v, 1)

In [680]:
w0 = torch.normal(mean=0, std=1, size=(32,32))
w1 = torch.normal(mean=0, std=1, size=(32,32))
w2 = torch.normal(mean=1, std=1, size=(32,32))
w3 = torch.normal(mean=1, std=1, size=(32,32))
w = torch.stack((w0, w1, w2, w3), dim=0)
w = torch.unsqueeze(w, 1)

In [681]:
dataloader = [t, u, v, w]

In [682]:
testModel = ConvNet()
test_learning_rate = 0.1
testOptimizer = torch.optim.SGD(testModel.parameters(), lr=test_learning_rate)

In [683]:
train_one_step(dataloader[0], testModel, testOptimizer)

tensor([1.0799], grad_fn=<MulBackward0>)

In [684]:
train_one_epoch(dataloader, testModel, testOptimizer)

tensor([0.6643], grad_fn=<MulBackward0>)

In [685]:
train(dataloader, testModel, testOptimizer, 100)

tensor([0.2396], grad_fn=<MulBackward0>)