In [2]:
from glob import glob
import os
import numpy as np

import random
random.seed(42)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

import cv2
import torch.multiprocessing as mp

import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset

import torch.nn.functional as F
from efficientnet_pytorch import EfficientNet
from tqdm import tqdm


device ='cuda' if torch.cuda.is_available() else 'cpu'

import warnings
warnings.filterwarnings('ignore')


import matplotlib.pyplot as plt
import gc         
mp.set_start_method('spawn', force=True)


In [3]:
train=[]
test=[]
y_train=[]
y_test=[]
fichero='Datasets/GenImage/'
itera=os.walk(fichero)
datasets=next(iter(itera))[1]
for index,dataset in enumerate(datasets):
    direct=fichero+dataset+'/'
    train.append(glob(direct+'train/ai/*.PNG')+glob(direct+'train/ai/*.png'))
    test.append(glob(direct+'val/ai/*.PNG')+glob(direct+'val/ai/*.png'))
    y_train.append([dataset]*len(train[index]))
    y_test.append([dataset]*len(test[index]))

train = [item for sublist in train for item in sublist]
test = [item for sublist in test for item in sublist]
y_train = [item for sublist in y_train for item in sublist]
y_test = [item for sublist in y_test for item in sublist]

label_encoder=LabelEncoder().fit(y_train)
y_train=label_encoder.transform(y_train)
y_test=label_encoder.transform(y_test)


train,val,y_train,y_val = train_test_split(train,y_train,train_size=0.9,stratify=y_train,random_state=5)

len(train),len(val),len(test),len(y_train),len(y_val),len(y_test)


    

(1169999, 130000, 50000, 1169999, 130000, 50000)

In [5]:
for i in tqdm(train, desc=f"Validating Epoch {1 + 1}/{1}"):
    image1 = cv2.imread(i)
    if type(image1) == type(None):
            print(f"Warning: Image at {i}")
            os.remove(i)
    


Validating Epoch 2/1:  18%|█▊        | 205758/1169999 [14:24<1:13:33, 218.49it/s]



Validating Epoch 2/1:  25%|██▍       | 288709/1169999 [20:15<1:00:31, 242.65it/s]



Validating Epoch 2/1:  33%|███▎      | 390098/1169999 [27:19<52:50, 245.96it/s]  



Validating Epoch 2/1:  33%|███▎      | 391316/1169999 [27:24<50:51, 255.16it/s]  



Validating Epoch 2/1:  35%|███▌      | 413209/1169999 [28:56<52:43, 239.22it/s]  



Validating Epoch 2/1:  36%|███▌      | 417375/1169999 [29:13<47:50, 262.19it/s]  



Validating Epoch 2/1:  37%|███▋      | 431953/1169999 [30:14<48:56, 251.30it/s]  



Validating Epoch 2/1:  39%|███▊      | 452264/1169999 [31:39<43:47, 273.17it/s]  



Validating Epoch 2/1:  49%|████▊     | 567738/1169999 [39:43<37:01, 271.06it/s]  



Validating Epoch 2/1:  76%|███████▌  | 883479/1169999 [1:01:51<18:15, 261.61it/s]



Validating Epoch 2/1:  80%|███████▉  | 935248/1169999 [1:05:29<15:51, 246.76it/s]



Validating Epoch 2/1:  82%|████████▏ | 963442/1169999 [1:07:30<16:24, 209.73it/s]



Validating Epoch 2/1:  86%|████████▌ | 1006345/1169999 [1:10:30<11:38, 234.18it/s]



Validating Epoch 2/1:  87%|████████▋ | 1014252/1169999 [1:11:04<10:38, 243.95it/s]



Validating Epoch 2/1:  93%|█████████▎| 1092293/1169999 [1:16:33<05:29, 236.04it/s]



Validating Epoch 2/1: 100%|██████████| 1169999/1169999 [1:22:00<00:00, 237.77it/s]


In [7]:
for i in tqdm(val, desc=f"Validating Epoch {1 + 1}/{1}"):
    image1 = cv2.imread(i)
    if type(image1) == type(None):
            print(f"Warning: Image at {i}")
            os.remove(i)
    del image1

Validating Epoch 2/1:  61%|██████    | 79144/130000 [05:43<03:41, 229.87it/s]



Validating Epoch 2/1: 100%|██████████| 130000/130000 [09:26<00:00, 229.41it/s]


In [8]:
for i in tqdm(test, desc=f"Validating Epoch {1 + 1}/{1}"):
    image1 = cv2.imread(i)
    if type(image1) == type(None):
            print(f"Warning: Image at {i}")
            os.remove(i)
    del image1

Validating Epoch 2/1: 100%|██████████| 50000/50000 [03:41<00:00, 225.44it/s]


In [3]:
def make_pairs(x, y):
    """Creates a tuple containing image pairs with corresponding label.

    Arguments:
        x: List containing images, each index in this list corresponds to one image.
        y: List containing labels, each label with datatype of `int`.

    Returns:
        Tuple containing two numpy arrays as (pairs_of_samples, labels),
        where pairs_of_samples' shape is (2len(x), 2,n_features_dims) and
        labels are a binary array of shape (2len(x)).
    """

    num_classes = max(y) + 1
    digit_indices = [np.where(y == i)[0] for i in range(num_classes)] # 10 vectores con los índices de cada número

    pairs = []
    labels = []
    
    for idx1 in range(len(x)):  # por cada muestra de x_train (o de x_test)
        # add a matching example
        x1 = x[idx1]   # muestra 0,1,2,... 
        label1 = y[idx1] # etiqueta correspondiente a la muestra 0,1,2,....
        idx2 = random.choice(digit_indices[label1]) # escogemos al azar una muestra con la misma etiqueta (label1)
        x2 = x[idx2]

        pairs += [[x1, x2]]  # una muestra para Siamese Network, con un par de imágenes que representan el mismo número
        labels += [0] # 0 aquí indica muestra con imágenes de la misma categoría

        # add a non-matching example
        label2 = random.randint(0, num_classes - 1)
        while label2 == label1: # hasta que salga un número diferente a label1
            label2 = random.randint(0, num_classes - 1)

        idx2 = random.choice(digit_indices[label2]) # tomamos una muestra al azar de entre las imágenes del número label2
        x2 = x[idx2]

        pairs += [[x1, x2]] # una muestra para Siamese Network, con un par de imágenes que representan el mismo número
        labels += [1] # 1 aquí indica muestra con imágenes de la misma categoría

    return np.array(pairs), np.array(labels).astype("float32") # vectores de muestras (de entrenamiento o testeo)

In [4]:
train,y_train=make_pairs(train, y_train)
val,y_val=make_pairs(val, y_val)
test,y_test=make_pairs(test, y_test)

train.shape,y_train.shape,val.shape,y_val.shape,test.shape,y_test.shape


((2339968, 2), (2339968,), (259998, 2), (259998,), (100000, 2), (100000,))

In [5]:
class images_Dataset(Dataset):
    def __init__(self,route_images,classes,device):
        self.route_images=route_images
        self.classes=classes
        self.device=device
    def __len__(self):
        return len(self.route_images)
    def __getitem__(self, index):
        im1,im2=self.route_images[index]
        
        y=self.classes[index]

        #load images
        image1 = cv2.imread(im1)
        image2 = cv2.imread(im2)


        if type(image1) == type(None):
            print(f"Warning: Image at {im1}")
            os.remove(im1)
        
        if  type(image2) == type(None):
            print(f"Warning: Image at {im2}")
            os.remove(im2)

        #to RGB
        try:
            image1 = cv2.cvtColor(image1, cv2.COLOR_BGR2RGB)
        except Exception as e:
            print(im1)

        try:
            image2 = cv2.cvtColor(image2, cv2.COLOR_BGR2RGB)
        except Exception as e:
            print(im2)


        
        #scale images to [0,1]
        image1=image1/255.0
        image2=image2/255.0

        #Rescale resolution to 256,256
        image1 = cv2.resize(image1, (256, 256))
        image2 = cv2.resize(image2, (256, 256))

        #Permute dimensions
        image1=np.transpose(image1,[2,0,1])
        image2=np.transpose(image2,[2,0,1])

        image1=torch.from_numpy(image1).to(self.device)
        image2=torch.from_numpy(image2).to(self.device)

        image1=image1.to(torch.float32)
        image2=image2.to(torch.float32)

        y=torch.tensor(y).to(torch.float32).to(self.device)

        return image1,image2, y #devuelve los datos cargados y las clases

train_dataset=images_Dataset(train[:50000],y_train[:50000],device)
train_dataloader=DataLoader(train_dataset,batch_size=128,shuffle=True,num_workers=4)

val_dataset=images_Dataset(val[:10000],y_val[:10000],device)
val_dataloader=DataLoader(val_dataset,batch_size=64,shuffle=True,num_workers=4)

test_dataset=images_Dataset(test[:5000],y_test[:5000],device)
test_dataloader=DataLoader(test_dataset,batch_size=64,shuffle=True)

del train,val,test,y_train,y_test,y_val,train_dataset,val_dataset,test_dataset
gc.collect()
torch.cuda.empty_cache() 

In [6]:
def visualize(images, labels):
    """Creates a plot of pairs and labels, and prediction if it's test dataset.

    Arguments:
        pairs: Numpy Array, of pairs to visualize, having shape
               (Number of pairs, 2, 28, 28).
        to_show: Int, number of examples to visualize (default is 6)
                `to_show` must be an integral multiple of `num_col`.
                 Otherwise it will be trimmed if it is greater than num_col,
                 and incremented if if it is less then num_col.
        num_col: Int, number of images in one row - (default is 3)
                 For test and train respectively, it should not exceed 3 and 7.
        predictions: Numpy Array of predictions with shape (to_show, 1) -
                     (default is None)
                     Must be passed when test=True.
        test: Boolean telling whether the dataset being visualized is
              train dataset or test dataset - (default False).

    Returns:
        None.
    """
    batch_size = images.shape[0]
    fig, axes = plt.subplots(batch_size, 2, figsize=(10, 5 * batch_size))

    # Loop through the batch and plot each pair of images
    for i in range(batch_size):
        # Get the pair of images for this batch
        for j in range(2):  # Two images in each pair
            image = images[i, j].transpose(1, 2, 0)  # Change shape from (channels, height, width) to (height, width, channels)

            # Plot the image
            ax = axes[i, j]
            ax.imshow(image)
            ax.axis('off')  # Remove axis for better visualization

        # Add title with label at the top of each pair (above the two images)
        ax_center = axes[i, 0]  # You can choose either of the axes in the pair, here I used the left image (axes[i, 0])
        ax_center.text(1.07, 1, f'Label: {labels[i]}', color='black', fontsize=19, ha='center', va='bottom', transform=ax_center.transAxes)


    # Adjust layout and show the plot
    plt.tight_layout()
    plt.show()

a,b,c=next(iter(train_dataloader))
a=np.array(a.to('cpu'))
b=np.array(b.to('cpu'))
c=np.array(c.to('cpu'))

visualize(np.array([a[:4,:,:,:],b[:4,:,:,:]]).transpose(1,0,2,3,4), c[:4])

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'images_Dataset' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>


KeyboardInterrupt: 

In [7]:
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self,out1, out2, labels, margin=1.0):
        """
        Contrastive loss function.
        
        Args:
        - out1: The embedding of the first image in the pair.
        - out2: The embedding of the second image in the pair.
        - labels: A tensor of labels (0 for related, 1 for unrelated).
        - margin: The margin that separates positive and negative pairs.
        
        Returns:
        - The contrastive loss.
        """
        # Compute the Euclidean distance between the embeddings

        euclidean_distance = F.pairwise_distance(out1, out2, p=2)
        
        # Calculate the contrastive loss
        loss = torch.mean((1 - labels) * torch.pow(euclidean_distance, 2) + 
                        (labels) * torch.pow(torch.clamp(margin - euclidean_distance, min=0.0), 2))
        
        return loss
    
class Contrastive_loss(nn.Module):
    """Provides 'contrastive_loss' an enclosing scope with variable 'margin'.

    Arguments:
        margin: Integer, defines the baseline for distance for which pairs
                should be classified as dissimilar. - (default is 1).

    Returns:
        'contrastive_loss' function with data ('margin') attached.
    """

    # Contrastive loss = mean( (1-true_value) * square(prediction) +
    #                         true_value * square( max(1-prediction, 0) ))
    def __init__(self,margin=1):
        super(Contrastive_loss,self).__init__()
        self.margin=margin

        
    def forward(self,y_true, y_pred):
        """Calculates the contrastive loss.

        Arguments:
            y_true: List of labels, each label is of type float32.
            y_pred: List of predictions of same length as of y_true,
                    each label is of type float32.

        Returns:
            A tensor containing contrastive loss as floating point value.
        """

        # square_pred = torch.square(y_pred)

        # # Calculate the margin squared term
        # margin_square = torch.square(torch.maximum(self.margin - y_pred, torch.tensor(0.0)))

        # # Contrastive loss formula
        # loss = torch.mean((1 - y_true) * square_pred + y_true * margin_square)

        # return loss
        loss = torch.mean((1 - y_true) * torch.square(y_pred) + y_true * torch.square(torch.maximum(self.margin - y_pred, torch.zeros_like(y_pred))))
        return loss
class CLIP_Loss(nn.Module):
    def __init__(self):
        super().__init__()
    def forward(emb_im1, emb_im2,temperature=1.0):
        # logits[i][j] is the dot_similarity(caption_i, image_j).
        logits = torch.matmul(emb_im1, emb_im2.T) / temperature

        # images_similarity[i][j] is the dot_similarity(image_i, image_j).
        images_similarity = torch.matmul(emb_im2, emb_im2.T)

        # captions_similarity[i][j] is the dot_similarity(caption_i, caption_j).
        captions_similarity = torch.matmul(emb_im1, emb_im1.T)

        # targets[i][j] = average dot_similarity(caption_i, caption_j) and dot_similarity(image_i, image_j).
        targets = F.softmax((captions_similarity + images_similarity) / (2 * temperature), dim=-1)

        # Compute the loss for the captions using crossentropy
        captions_loss = F.cross_entropy(logits, targets.argmax(dim=-1), reduction='mean')

        # Compute the loss for the images using crossentropy
        images_loss = F.cross_entropy(logits.T, targets.argmax(dim=-1), reduction='mean')

        # Return the mean of the loss over the batch.
        return (captions_loss + images_loss) / 2


    
class siamese_model(nn.Module):
    def __init__(self,type,device):
        super().__init__()
        self.device=device
        self.type=type
        self.efficientNet = EfficientNet.from_name(type,3)  # Initialize from scratch
        # Modify the classifier (output layer) of EfficientNet

        self.efficientNet._fc = nn.Linear(1280,512)
        self.norm=nn.BatchNorm1d(512)

        # self.linear=nn.Linear(1280*2,2)
        # self.softmax=nn.Softmax(dim=1)
        

    def euclidean_distance(self,vect1,vect2):
        """Find the Euclidean distance between two vectors.

        Arguments:
            vects: List containing two tensors of same length.

        Returns:
            Tensor containing euclidean distance
            (as floating point value) between vectors.
        """

        sum_square = torch.sum(torch.square(vect1 - vect2), dim=1, keepdim=True)
        
        # Return the square root of the sum of squares with numerical stability
        epsilon = torch.tensor(torch.finfo(vect1.dtype).eps)  # Small value to avoid sqrt(0)
        return torch.sqrt(torch.maximum(sum_square, epsilon))

    def forward(self,im1,im2):
        emb1=self.efficientNet(im1)
        emb2=self.efficientNet(im2)
        del im2, im1
        gc.collect()
        torch.cuda.empty_cache()

        emb1=self.norm(emb1)
        emb2=self.norm(emb2)

        # embs=torch.concat([emb1,emb2],dim=1).to(self.device)
        # out=self.linear(embs)
        # out=self.softmax(out)
        # del embs,emb1,emb2
        # gc.collect()
        # torch.cuda.empty_cache()
        return emb1,emb2
    
    



In [8]:
model=siamese_model('efficientnet-b0',device).to(device)
criterion = ContrastiveLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)


In [10]:
EPOCHS=100
train_loss=[]
train_accuracy=[]
best=9999999.0
val_loss=[]
val_accuracy=[]
for epoch in range(EPOCHS):
    # Set model to training mode
    model.train()

    running_loss = 0.0
    correct = 0
    total = 0

    # Training loop
    for image1, image2, label in tqdm(train_dataloader, desc=f"Training Epoch {epoch + 1}/{EPOCHS}"):

        optimizer.zero_grad()

        # Forward pass
        pred1,pred2 = model(image1, image2)

        del image1,image2
        gc.collect()
        torch.cuda.empty_cache()

        # _,pred=torch.max(pred,1)
        # pred = pred.detach().to(torch.float).requires_grad_(True).to(device)


        # Calculate loss
        loss = criterion(pred1,pred2,label)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Track running loss and accuracy
        running_loss += loss.item()

        # total += label.size(0)
        # correct += (pred == label).sum().item()
        del label, loss,pred1,pred2
        gc.collect()
        torch.cuda.empty_cache()

    # Calculate training loss and accuracy
    train_loss_value = running_loss / len(train_dataloader)
    # train_accuracy_value = 100 * correct / total

    train_loss.append(train_loss_value)
    # train_accuracy.append(train_accuracy_value)

    # Set model to evaluation mode
    model.eval()

    # Initialize validation stats
    running_loss = 0.0
    val_correct = 0
    val_total = 0

    # Validation loop
    with torch.no_grad():
        for image1, image2, label in tqdm(val_dataloader, desc=f"Validating Epoch {epoch + 1}/{EPOCHS}"):

            # Forward pass
            pred1,pred2 = model(image1, image2)
            del image1,image2
            gc.collect()
            torch.cuda.empty_cache()

            # _,pred=torch.max(pred,1)
            # pred = pred.detach().to(torch.float).requires_grad_(True).to(device)


            # Calculate loss
            loss = criterion(pred1,pred2,label)

            # Track running loss and accuracy
            running_loss += loss.item()

            val_total += label.size(0)
            # val_correct += (pred == label).sum().item()
            del pred1,pred2,label,loss
            gc.collect()
            torch.cuda.empty_cache()

    # Calculate validation loss and accuracy
    val_loss_value = running_loss / len(val_dataloader)
    # val_accuracy_value = 100 * val_correct / val_total

    val_loss.append(val_loss_value)
    # val_accuracy.append(val_accuracy_value)

    if val_loss_value <= best:
        best=val_loss_value
        checkpoint = {
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
            }
        torch.save(checkpoint, 'Contrastive_model.pth')

    print()
    print('-'*60)
    # Print results for the epoch
    print(f"Epoch [{epoch + 1}/{EPOCHS}]")
    print(f"Train Loss: {train_loss_value:.4f}")
    print(f"Val Loss: {val_loss_value:.4f}")
    print('-'*60)
    print()



Training Epoch 1/100:   0%|          | 0/391 [00:00<?, ?it/s]Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'images_Dataset' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
Training Epoch 1/100:   0%|          | 0/391 [2:13:50<?, ?it/s]


KeyboardInterrupt: 