<a href="https://colab.research.google.com/github/domingues100/IEEE---Water_Level/blob/main/SN2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **IMPORTS**

In [None]:
# Colab-only: mount Google Drive at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.nn.functional as F
from torch.optim.lr_scheduler import ReduceLROnPlateau
import pandas as pd
from PIL import Image
from torch.utils.data import Dataset, DataLoader, random_split
from skimage import io
from torchvision import transforms, models
import matplotlib.pyplot as plt
import os
import numpy as np
import torchvision
import random
import torchvision.models as models
import torch.optim as optim
from torch.autograd import Variable
import torch.hub
from itertools import product, combinations
from sklearn.model_selection import train_test_split
import gc
import shutil
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import json
import statistics

# **PARAMETERS**

In [None]:
# Training hyperparameters (read as globals by the training code below).
batch_size = 16
learning_rate = 0.001
num_epochs = 250
# Fraction of the training pairs held out for validation.
validation_ratio = 0.2
# Margin read by contrastive_loss().
margin = 1.4
# Seeds only Python's `random` module (used by create_pairs); the
# train_test_split and random_split calls in this file are not seeded.
random.seed(42)


# Placeholders: fill in the four paths below before running.
# NOTE(review): create_images_pairs() builds file names with `+`
# (e.g. save_path + "train_df"), so the folder values presumably need a
# trailing "/" -- confirm against your paths.
input_image = #images path
input_csv = #csv path

output_folder = #desired output folder for pairs
save_path =  #saving path


# The pair CSVs reference images by file name inside output_folder, so the
# same directory is used for both the training and the test datasets.
train_dir = output_folder
test_dir = train_dir

# **CREATE PAIRS**

In [None]:
def create_pairs(df, digit_indices, num_classes):
    """Build alternating same-class / different-class image pairs.

    Args:
        df: DataFrame with a 'pic' column, looked up by index label.
        digit_indices: one list of df index labels per class.
        num_classes: number of classes to iterate over.

    Returns:
        (pairs, labels) as numpy arrays. Label 0 marks a pair taken from the
        same class, label 1 a pair whose second image comes from a randomly
        chosen different class.
    """
    # Every class contributes the same number of pairs, bounded by the
    # smallest class (consecutive samples are paired, hence the -1).
    steps = min(len(digit_indices[cls]) for cls in range(num_classes)) - 1

    pairs, labels = [], []
    for cls in range(num_classes):
        for pos in range(steps):
            anchor_idx = digit_indices[cls][pos]

            # Same-class pair: the sample and its successor in the class list.
            partner_idx = digit_indices[cls][pos + 1]
            pairs.append([df['pic'][anchor_idx], df['pic'][partner_idx]])
            labels.append(0)

            # Different-class pair: a non-zero offset guarantees other != cls.
            other = (cls + random.randrange(1, num_classes)) % num_classes
            rival_idx = digit_indices[other][pos]
            pairs.append([df['pic'][anchor_idx], df['pic'][rival_idx]])
            labels.append(1)

    return np.array(pairs), np.array(labels)


def copiar_imagens(origem, destino):
    """Copy every PNG/JPEG image from `origem` into a fresh `destino` folder.

    `destino` is deleted (if it exists) and recreated, so it only ever
    contains the images copied by this call.

    Args:
        origem: directory holding the source images.
        destino: directory to (re)create and fill.

    Raises:
        ValueError: if `origem` and `destino` are the same directory, since
            wiping the destination would delete the source images.
    """
    if os.path.abspath(origem) == os.path.abspath(destino):
        raise ValueError("origem and destino must be different directories")

    # Operate on the `destino` argument rather than a module-level global so
    # the function does what its signature says.
    if os.path.exists(destino):
        shutil.rmtree(destino)
    os.makedirs(destino, exist_ok=True)

    for arquivo in os.listdir(origem):
        if arquivo.lower().endswith(('.png', '.jpg', '.jpeg')):
            shutil.copyfile(os.path.join(origem, arquivo),
                            os.path.join(destino, arquivo))


def create_images_pairs():
    """Copy the images, split the labels and write the pair CSVs.

    Reads the module-level `input_image`, `input_csv`, `output_folder` and
    `save_path`. Writes `train_df` / `teste_df` into `save_path` and
    `formated_train.csv` / `formated_test.csv` into `output_folder`.

    Raises:
        ValueError: if a split has a class with fewer than two samples, in
            which case no pairs can be built for that split.
    """
    copiar_imagens(input_image, output_folder)

    df = pd.read_csv(input_csv)
    df = df.rename(columns={'id': 'pic', 'label': 'Bars_above'})
    train_df, test_df = train_test_split(df, test_size=0.2, stratify=df['Bars_above'])

    # Work on explicit copies: assigning into the frames returned by the
    # split triggers pandas' SettingWithCopy warning.
    train_df = train_df.copy()
    test_df = test_df.copy()
    # Shift the labels by 3 so they can be matched against range(num_classes).
    train_df["Bars_above"] = train_df["Bars_above"] - 3
    test_df["Bars_above"] = test_df["Bars_above"] - 3

    # os.path.join works whether or not the folder ends with a separator,
    # matching the f'{save_path}/...' style used by the readers of these files.
    train_df.to_csv(os.path.join(save_path, "train_df"))
    test_df.to_csv(os.path.join(save_path, "teste_df"))

    num_classes = 5

    train_digit_indices = [train_df.index[train_df['Bars_above'] == i].tolist() for i in range(num_classes)]
    test_digit_indices = [test_df.index[test_df['Bars_above'] == i].tolist() for i in range(num_classes)]

    train_pairs, train_labels = create_pairs(train_df, train_digit_indices, num_classes)
    test_pairs, test_labels = create_pairs(test_df, test_digit_indices, num_classes)

    # With no pairs the arrays are 1-D and the column slicing below would
    # fail with an obscure IndexError; fail early with a clear message.
    if len(train_pairs) == 0 or len(test_pairs) == 0:
        raise ValueError(
            "Could not build image pairs: every class needs at least two "
            "samples in both the train and the test split")

    df_to_save = pd.DataFrame({'Grupo 1': train_pairs[:, 0], 'Grupo 2': train_pairs[:, 1], 'Match': train_labels})
    df_to_save_teste = pd.DataFrame({'Grupo 1': test_pairs[:, 0], 'Grupo 2': test_pairs[:, 1], 'Match': test_labels})

    df_to_save.to_csv(os.path.join(output_folder, 'formated_train.csv'), index=False)
    df_to_save_teste.to_csv(os.path.join(output_folder, 'formated_test.csv'), index=False)

# **NETWORK TRAINING**

In [None]:
class SiameseDataset(Dataset):
    """Dataset of image pairs described by a CSV file.

    Rows are read positionally: columns 0 and 1 hold the two image file
    names (relative to `training_dir`) and column 2 holds the pair label.
    """

    def __init__(self, training_csv=None, training_dir=None, transform=None):
        self.train_df = pd.read_csv(training_csv)
        self.train_dir = training_dir
        self.transform = transform

    def __len__(self):
        return len(self.train_df)

    def __getitem__(self, index):
        """Return (first image, second image, label tensor) for row `index`."""
        # Open both images first, then apply the (possibly random) transform
        # to the first image and afterwards to the second one.
        images = [
            Image.open(
                os.path.join(self.train_dir, str(self.train_df.iloc[index, column]))
            ).convert('RGB')
            for column in (0, 1)
        ]

        if self.transform is not None:
            images = [self.transform(image) for image in images]

        first, second = images
        pair_label = torch.tensor(int(self.train_df.iloc[index, 2]))
        return first, second, pair_label

class BaseNetwork(nn.Module):
    """Frozen pretrained ResNet-50 with a trainable 128-d embedding head."""

    def __init__(self, dropout_rate):
        super().__init__()

        backbone = models.resnet50(pretrained=True)

        # Freeze every pretrained parameter; the head created below is new
        # and therefore stays trainable.
        backbone.requires_grad_(False)

        # Replace the classifier with the embedding head (in_features -> 256 -> 128).
        backbone.fc = nn.Sequential(
            nn.Linear(backbone.fc.in_features, 256),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(256, 128),
        )

        self.resnet = backbone

    def forward(self, x):
        return self.resnet(x)


class SiameseNetwork(nn.Module):
    """Wrapper that embeds one input at a time with a shared base network.

    Both images of a pair are embedded by calling the module twice, so the
    two branches share the same weights.
    """

    def __init__(self, base_network):
        super().__init__()
        self.base_network = base_network

    def forward_one(self, x):
        """Embed a single batch with the shared base network."""
        embedding = self.base_network(x)
        return embedding

    def forward(self, input1):
        return self.forward_one(input1)

def contrastive_loss(out1, out2, label, margin_value=None):
    """Contrastive loss over a batch of embedding pairs.

    With D the Euclidean distance between the two embeddings of a pair:

        loss = mean((1 - label) * D**2 + label * max(0, margin - D)**2)

    Label 0 marks a same-class pair (pulled together) and label 1 a
    different-class pair (pushed at least `margin` apart).

    The previous version squared the already-squared distance for label-0
    pairs and compared the squared distance against the margin for label-1
    pairs. The margin is now applied to the true Euclidean distance, the
    same metric the test step uses with np.linalg.norm.

    Args:
        out1, out2: embeddings with one row per pair.
        label: 0/1 tensor with one entry per pair.
        margin_value: margin to use; defaults to the module-level `margin`.

    Returns:
        Scalar tensor with the mean loss over the batch.
    """
    if margin_value is None:
        margin_value = margin

    label = label.to(out1.dtype)
    squared_distance = torch.sum(torch.pow(out2 - out1, 2), 1)
    # Small epsilon keeps the sqrt gradient finite when both embeddings coincide.
    distance = torch.sqrt(squared_distance + 1e-12)

    loss_contrastive = torch.mean(
        (1 - label) * squared_distance +
        label * torch.pow(torch.clamp(margin_value - distance, min=0.0), 2))
    return loss_contrastive

def train_siamese_network(model, train_loader, val_loader):
    """Train `model` with the contrastive loss and report per-epoch losses.

    Uses the module-level `num_epochs`, `device` and `optimizer`.

    Args:
        model: network that maps an image batch to an embedding batch.
        train_loader: loader yielding (img0, img1, label) training batches.
        val_loader: loader yielding (img0, img1, label) validation batches.

    Returns:
        (train_losses, val_losses): lists with one average loss (Python
        float) per epoch. An empty loader yields NaN for that epoch.
    """
    train_losses = []
    val_losses = []

    for epoch in range(num_epochs):
        # Train the model that was passed in, not a module-level global.
        model.train()
        total_loss = 0.0

        for img0, img1, label in train_loader:
            img0, img1, label = img0.to(device), img1.to(device), label.to(device)

            optimizer.zero_grad()

            output1 = model(img0)
            output2 = model(img1)

            loss = contrastive_loss(output1, output2, label)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader) if len(train_loader) else float('nan')
        train_losses.append(avg_loss)

        model.eval()
        val_loss = 0.0

        with torch.no_grad():
            for val_img0, val_img1, val_label in val_loader:
                val_img0, val_img1, val_label = val_img0.to(device), val_img1.to(device), val_label.to(device)

                val_output1 = model(val_img0)
                val_output2 = model(val_img1)

                # .item() keeps the running sum a plain float instead of
                # collecting device tensors in val_losses.
                val_loss += contrastive_loss(val_output1, val_output2, val_label).item()

        avg_val_loss = val_loss / len(val_loader) if len(val_loader) else float('nan')
        val_losses.append(avg_val_loss)

        print(f"Epoch [{epoch + 1}/{num_epochs}], Training Loss: {avg_loss:.4f}, Validation Loss: {avg_val_loss:.4f}")

    return train_losses, val_losses

In [None]:
results = []

# Pair CSVs written by create_images_pairs(). The previous f-strings
# contained a literal "+" ("<folder>+/formated_train.csv"), so the files
# were never found.
train_csv = os.path.join(output_folder, 'formated_train.csv')
test_csv = os.path.join(output_folder, 'formated_test.csv')

create_images_pairs()

gc.collect()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Resize + light augmentation + ImageNet normalisation (the backbone is an
# ImageNet-pretrained ResNet-50).
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomAffine(
        degrees=0.4,
        translate=(0.3, 0.3),
        scale=(1 - 0.4, 1 + 0.4)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225]),
])

# Datasets
# NOTE(review): the validation and test sets share the augmenting transform
# above, so their losses are computed on randomly perturbed images.
train_dataset = SiameseDataset(training_csv=train_csv, training_dir=train_dir, transform=transform)
validation_size = int(validation_ratio * len(train_dataset))
train_size = len(train_dataset) - validation_size
train_dataset, val_dataset = random_split(train_dataset, [train_size, validation_size])
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

test_dataset = SiameseDataset(training_csv=test_csv, training_dir=test_dir, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Networks
base_network = BaseNetwork(dropout_rate=0.2)

siamese_network = SiameseNetwork(base_network)

optimizer = optim.SGD(siamese_network.parameters(), lr=learning_rate, momentum=0.9)

# Training step
train_siamese_network(siamese_network.to(device), train_loader, val_loader)

torch.save(siamese_network.state_dict(), f"{save_path}/modelo1.pth")

Remember to train 5 times and save the five sets of weights (models) under different file names.

Every run saves a "train_df" and a "teste_df" file in save_path. Rename them before running again so the train and test CSVs of the previous run are not overwritten; they are used in the TEST STEP.

# TEST STEP

In [None]:
import cv2
from PIL import Image

# Rebuild the architecture and load the trained weights for inference.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

base_network = BaseNetwork(dropout_rate=0.2)
siamese_network = SiameseNetwork(base_network)
# map_location lets weights saved on a GPU load on a CPU-only runtime.
siamese_network.load_state_dict(
    torch.load(f"{save_path}/modelo1.pth", map_location=device))
siamese_network.to(device)
# eval() disables dropout so the embeddings are deterministic.
siamese_network.eval()

# Euclidean distance
def distancia_euclidiana(vec1, vec2):
    """Return the Euclidean (L2) distance between two vectors."""
    diferenca = vec1 - vec2
    return np.linalg.norm(diferenca)

# Preprocess images
def load_and_preprocess_image(image_path):
    """Load an image as a normalised 1 x 3 x 224 x 224 tensor on `device`.

    Applies resize, tensor conversion and ImageNet normalisation, without
    any random augmentation.
    """
    steps = [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225]),
    ]
    pipeline = transforms.Compose(steps)

    rgb_image = Image.open(image_path).convert('RGB')
    # unsqueeze(0) adds the batch dimension the network expects.
    batch = pipeline(rgb_image).unsqueeze(0)
    return batch.to(device)

def generate_feature_vector(img_path):
    """Return the embedding of the image at `img_path` as a 1-D numpy array.

    Uses the module-level `siamese_network`.
    """
    img = load_and_preprocess_image(img_path)
    # Inference only: skip building the autograd graph for every image.
    with torch.no_grad():
        features = siamese_network(img)
    feature_vector = features.flatten().cpu().detach().numpy()
    return feature_vector


def list_features_classes(images_df, path):
    """Embed every image listed in `images_df`.

    Args:
        images_df: DataFrame with 'pic' (file name) and 'Bars_above' (class).
        path: directory that contains the image files.

    Returns:
        (class_name, feature_vectors): two parallel lists, one entry per row.
    """
    class_name = []
    feature_vectors = []

    for _, sample in images_df.iterrows():
        file_name = sample['pic']
        sample_class = sample['Bars_above']

        embedding = generate_feature_vector(os.path.join(path, file_name))
        class_name.append(sample_class)
        feature_vectors.append(embedding)

    return class_name, feature_vectors


def create_csv(classe, feature, feature_csv_name):
    """Write the classes and their feature vectors to a CSV in `save_path`.

    Returns:
        The path of the written CSV file.
    """
    vetores_path = f'{save_path}/{feature_csv_name}'
    tabela = pd.DataFrame({'Classe': classe, 'Feature_vector': feature})
    tabela.to_csv(vetores_path, index=False)
    return vetores_path

def comparar_features(new_feature_vector, features_csv):
    """Return the class of the stored vector closest to `new_feature_vector`.

    Args:
        new_feature_vector: embedding to classify.
        features_csv: CSV with 'Classe' and 'Feature_vector' columns, the
            latter holding the bracketed, space-separated text of each vector.

    Returns:
        The 'Classe' value of the nearest stored vector (the first one on
        ties), or None when the CSV has no rows.
    """
    stored = pd.read_csv(features_csv)

    # Strip the surrounding brackets and parse the numbers back into arrays.
    stored['Feature_vector'] = stored['Feature_vector'].apply(
        lambda text: np.fromstring(text[1:-1], sep=" "))

    closest_label = None
    closest_distance = float('inf')

    for candidate_label, candidate_vector in zip(stored['Classe'], stored['Feature_vector']):
        gap = distancia_euclidiana(new_feature_vector, np.array(candidate_vector))
        if gap < closest_distance:
            closest_distance = gap
            closest_label = candidate_label

    return closest_label


def generate_metrics(results, true_label):
    """Return (accuracy, precision, recall, f1) for the predictions.

    Precision, recall and F1 are macro-averaged over the classes.
    """
    macro_scorers = (precision_score, recall_score, f1_score)

    accuracy = accuracy_score(true_label, results)
    precision, recall, f1 = [
        scorer(true_label, results, average='macro') for scorer in macro_scorers
    ]
    return accuracy, precision, recall, f1

def generate_results(teste_df, vetores):
    """Classify every test image by nearest stored vector and score the result.

    Reads the images from the module-level `path`.

    Args:
        teste_df: DataFrame with 'pic' (file name) and 'Bars_above' (class).
        vetores: path of the feature-vector CSV to compare against.

    Returns:
        (results, true_label, accuracy, precision, recall, f1)
    """
    results, true_label = [], []

    for _, sample in teste_df.iterrows():
        image_path = os.path.join(path, sample['pic'])
        expected = sample['Bars_above']

        embedding = generate_feature_vector(image_path)
        results.append(comparar_features(embedding, vetores))
        true_label.append(expected)

    metrics = generate_metrics(results, true_label)
    return (results, true_label, *metrics)

def generate_random_samples(csv, n_samples=5):
    """Draw `n_samples` random rows per 'Bars_above' class from a CSV.

    Args:
        csv: path (or file-like object) of a CSV with a 'Bars_above' column.
        n_samples: rows to draw from each class. Defaults to 5; pass 1 or 3
            for the other experiments instead of editing this function.

    Returns:
        DataFrame with the sampled rows. All original columns, including
        'Bars_above', are kept and rows keep their original index labels.

    Raises:
        ValueError: if a class has fewer than `n_samples` rows.
    """
    df = pd.read_csv(csv)
    # GroupBy.sample always keeps the grouping column in the result, which
    # the callers read back via row['Bars_above'].
    insert_df = df.groupby('Bars_above').sample(n=n_samples)
    return insert_df

def write_txt(i, accuracy, precision, recall, f1, path):
    """Append one block of metrics to the text file at `path`.

    Args:
        i: iteration identifier written on the first line of the block.
        accuracy, precision, recall, f1: metric values to record.
        path: output file; created if missing, appended to otherwise.
    """
    # Mode "a" creates the file when it does not exist, so no separate
    # "w" branch is needed. UTF-8 is explicit because the text is non-ASCII.
    with open(path, "a", encoding="utf-8") as arquivo:
        arquivo.write(f"Iteração: {i} \n")
        arquivo.write(f"Accuracy: {accuracy} \n")
        arquivo.write(f"Precision: {precision} \n")
        arquivo.write(f"Recall: {recall} \n")
        arquivo.write(f"F1: {f1} \n")

**Testing model**

In [None]:
acc = []
pre = []
rec = []
f = []
results1 = []
true_label1 = []

path = #same images path
save_path = #same saving path


insert_df = pd.read_csv(save_path + "train_df")
test_df = pd.read_csv(f'{save_path}/teste_df')


classe, feature = list_features_classes(insert_df, path)

#choose a name
vetores_path = create_csv(classe, feature, feature_csv_name = "vetores_teste2_5.csv")

results, true_label, accuracy, precision, recall, f1 = generate_results(test_df, vetores_path)


acc.append(accuracy)
pre.append(precision)
rec.append(recall)
f.append(f1)
results1.append(results)
true_label1.append(true_label)

df = pd.DataFrame({'results': results1, 'true_label': true_label1})
df.to_csv(f'{save_path}/SN2/df1.csv', index=False)
#remember to change df1 in each training step that you made and change train_df/test_df/model name for each training

In [None]:
acc = []
pre = []
rec = []
f = []
results1 = []
true_label1 = []

path = #same images path
save_path = #same saving path

#this will run 50 times. 3 TESTS were made, you need to run this for each test. FOR 1 sample, 3 samples or 5 samples. And you need to
#repeat the process for each model that was trained. REMEMBER TO CHANGE IN generate_random_samples() the x.sample(n=5), this will produce 5 samples.
#you need to change for 1, 3 or 5. And do it again for each model.


for i in range(50):
  insert_df = generate_random_samples(f'{save_path}/train_df')
  test_df = pd.read_csv(f'{save_path}/teste_df')

  #create features csv
  classe, feature = list_features_classes(insert_df, path)

  #choose the name that you want
  vetores_path = create_csv(classe, feature, feature_csv_name = "vetores_teste2_1.csv")

  #gerar os resultados
  results, true_label, accuracy, precision, recall, f1 = generate_results(test_df, vetores_path)


  acc.append(accuracy)
  pre.append(precision)
  rec.append(recall)
  f.append(f1)
  results1.append(results)
  true_label1.append(true_label)

  write_txt(i, accuracy, precision, recall, f1, (save_path+"/teste5_1.txt"))

df = pd.DataFrame({'results': results1, 'true_label': true_label1})
df.to_csv(f'{save_path}/SN2/df5_{i}.csv', index=False)

#here you need to change the test5_1.txt for the name that you want.
write_txt("média", sum(acc)/len(acc), sum(pre)/len(pre), sum(rec)/len(rec), sum(f)/len(f), (save_path+"/teste5_1.txt"))