In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import os
import pandas as pd
import numpy as np
from PIL import Image
import torch
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import Dataset, DataLoader
from datetime import datetime
from torchvision import models

In [2]:
!pwd

/workspace


In [3]:
import cv2
import matplotlib.pyplot as plt
targetsize =(224,224)


In [4]:
import math

def euler_to_quaternion(roll, pitch, yaw):
    # Convert degrees to radians
    roll = math.radians(roll)
    pitch = math.radians(pitch)
    yaw = math.radians(yaw)

    cy = math.cos(yaw * 0.5)
    sy = math.sin(yaw * 0.5)
    cp = math.cos(pitch * 0.5)
    sp = math.sin(pitch * 0.5)
    cr = math.cos(roll * 0.5)
    sr = math.sin(roll * 0.5)

    w = cr * cp * cy + sr * sp * sy
    x = sr * cp * cy - cr * sp * sy
    y = cr * sp * cy + sr * cp * sy
    z = cr * cp * sy - sr * sp * cy

    return w, x, y, z


In [5]:
  def preprocess_input_img(test_image_path):
    '''
    This function takes the path to the input test image
    and returns a preprocessed image (which can be used as a input to the model)
    (Input): Single test image path
    (Output): Preprocessed image
    '''

    # Read the original test image
    orig_sample_test_img = cv2.cvtColor(cv2.imread(test_image_path), cv2.COLOR_BGR2RGB)
    x = 50
    y = 0
    width = 200
    height = 200
#     print("preprocess input done")
    if orig_sample_test_img is None :
            print("Failed to load images at preprocess")
            return None
# Crop the image
    orig_sample_test_img = orig_sample_test_img[y:y+height, x:x+width]
    # Convert image to gray scale
    gray_sample_test_img = cv2.cvtColor(orig_sample_test_img, cv2.COLOR_RGB2GRAY)

    # Resizing image to desired input size
    gray_resized_test_img = cv2.resize(gray_sample_test_img, targetsize,
                        interpolation = cv2.INTER_AREA)   # To shrink an image

    # Remove blemishes from image (if any)
    (thresh, black_n_white_sample_img) = cv2.threshold(gray_resized_test_img, 70,255, cv2.THRESH_BINARY_INV)
#     black_n_white_sample_img =cv2.GaussianBlur(black_n_white_sample_img , (3, 3), 0)
    black_n_white_sample_img= cv2.dilate(black_n_white_sample_img, kernel, iterations=1)

    _, black_n_white_sample_img = cv2.threshold(black_n_white_sample_img, 50, 255, cv2.THRESH_BINARY)
    black_n_white_sample_img = black_n_white_sample_img/255
    return orig_sample_test_img, black_n_white_sample_img

In [6]:

def load_image(image_path, target_size=targetsize):
    try:
        _,img = preprocess_input_img(image_path)
        transform = transforms.Compose([
#         transforms.Resize(target_size),
        transforms.ToTensor(),
#         transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ])
        img = transform(img)
        return img
    except Exception as e:
        print(f"Error loading image {image_path}: {e}")
        return None

In [7]:
import os
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import torch


class CustomDataset(Dataset):
    def __init__(self, main_folder, target_size=(25, 285), step_size=10):
        self.main_folder = main_folder
        self.target_size = target_size
        self.step_size = step_size
        self.data = []

        for subdir, _, files in os.walk(main_folder):
#             print(subdir, _)
            if 'image_data.csv' in files:
                csv_path = os.path.join(subdir, 'image_data.csv')
                df = pd.read_csv(csv_path)
                target_image_path = os.path.join(subdir, 'target.jpg')
                for _, row in df.iterrows():
                    X2_image_path = os.path.join(subdir, row['Image_Name'])
                    self.data.append((target_image_path, X2_image_path, row['x'], row['y']))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        target_image_path, X2_image_path, x, y = self.data[idx]
        
        # Debugging print statements
#         print(f"Loading target image from: {target_image_path}")
#         print(f"Loading X2 image from: {X2_image_path}")
        
        target_image = load_image(target_image_path, self.target_size)
        X2_image = load_image(X2_image_path, self.target_size)

        # Check if load_image returns None
        if target_image is None or X2_image is None:
            print(f"Failed to load images: {target_image_path}, {X2_image_path}")
            return None

        # Convert roll, pitch, yaw to quaternion
#         roll *= self.step_size
#         pitch *= self.step_size
#         yaw *= self.step_size
#         w, p, q, r = euler_to_quaternion(roll, pitch, yaw)
        poses = torch.tensor([x, y], dtype=torch.float32)
        return (target_image, X2_image), poses


# Initialize dataset and dataloaders
main_folder = '/workspace/processed/'  # Replace with your main folder path
dataset = CustomDataset(main_folder)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)



In [8]:
from torch.autograd import Variable

__all__ = ['PoseNet', 'posenet_v1', 'PoseLoss']

class InceptionV1(nn.Module):
    def __init__(self, in_channels, n1x1, n3x3red, n3x3, n5x5red, n5x5, pool_planes):
        super(InceptionV1, self).__init__()
        # 1x1 conv branch
        self.b1 = nn.Sequential(
            nn.Conv2d(in_channels, n1x1, kernel_size=1),
            nn.ReLU(True),
        )

        # 1x1 -> 3x3 conv branch
        self.b2 = nn.Sequential(
            nn.Conv2d(in_channels, n3x3red, kernel_size=1),
            nn.ReLU(True),
            nn.Conv2d(n3x3red, n3x3, kernel_size=3, padding=1),
            nn.ReLU(True),
        )

        # 1x1 -> 5x5 conv branch
        self.b3 = nn.Sequential(
            nn.Conv2d(in_channels, n5x5red, kernel_size=1),
            nn.ReLU(True),
            nn.Conv2d(n5x5red, n5x5, kernel_size=5, padding=2),
            nn.ReLU(True),
        )

        # 3x3 pool -> 1x1 conv branch
        self.b4 = nn.Sequential(
            nn.MaxPool2d(kernel_size=3, stride=1, padding=1),
            nn.Conv2d(in_channels, pool_planes, kernel_size=1),
            nn.ReLU(True),
        )

    def forward(self, x):
        y1 = self.b1(x)
        y2 = self.b2(x)
        y3 = self.b3(x)
        y4 = self.b4(x)
        return torch.cat([y1, y2, y3, y4], 1)


# PoseNet
class PoseNet(nn.Module):
    def __init__(self):
        super(PoseNet, self).__init__()

        self.pre_layers = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=2, stride=1, padding=1),
            nn.Conv2d(8, 16, kernel_size=2, stride=1, padding=1),
            nn.ReLU(True),
            nn.Conv2d(16, 64, kernel_size=7, stride=2, padding=3),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.LocalResponseNorm(5, 0.0001, 0.75),
            nn.Conv2d(64, 128, kernel_size=2, stride=1),
            nn.ReLU(True),
            nn.Conv2d(128, 192, kernel_size=2, padding=1),
            nn.ReLU(True),
            nn.LocalResponseNorm(5, 0.0001, 0.75),
            nn.MaxPool2d(kernel_size=3, stride=2)
        )
        
        
        self.lastlayers= nn.Sequential(  nn.Dropout(p=0.5),     
            nn.Linear(512,256),
            nn.ReLU(True),
                                       nn.Dropout(p=0.5),
         nn.Linear(256, 128),
            nn.ReLU(True), nn.Dropout(p=0.5),
         nn.Linear(128, 64),
            nn.ReLU(True),nn.Dropout(p=0.5),
         nn.Linear(64, 32),
            nn.ReLU(True),nn.Dropout(p=0.5),
        nn.Linear(32, 16),
            nn.ReLU(True),nn.Dropout(p=0.5),
        nn.Linear(16, 8),
            nn.ReLU(True),nn.Dropout(p=0.5),
        nn.Linear(8, 2),
            nn.Tanh()
            
        )

        self.a3 = InceptionV1(192, 64, 96, 128, 16, 32, 32)
        self.b3 = InceptionV1(256, 128, 128, 192, 32, 96, 64)

        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        
        
        self.a4 = InceptionV1(960, 192, 96, 208, 16, 48, 64)
        self.b4 = InceptionV1(512, 160, 112, 224, 24, 64, 64)
        self.c4 = InceptionV1(512, 128, 128, 256, 24, 64, 64)
        self.d4 = InceptionV1(512, 112, 144, 288, 32, 64, 64)
        self.e4 = InceptionV1(528, 256, 160, 320, 32, 128, 128)

        self.a5 = InceptionV1(832, 256, 160, 320, 32, 128, 128)
        self.b5 = InceptionV1(832, 384, 192, 384, 48, 128, 128)

        self.avg_pool = nn.AvgPool2d(kernel_size=7, stride=1)
        self.avg_pool5x5 = nn.AvgPool2d(kernel_size=5, stride=3)
        self.conv1x1 = nn.Conv2d(512, 128, kernel_size=1, stride=1)
        self.conv1x12 = nn.Conv2d(528, 128, kernel_size=1, stride=1)
        self.fc = nn.Linear(1024, 2048)
        self.fc2048 = nn.Linear(2048, 1024)

        self.dropout5 = nn.Dropout(p=0.5)
        self.dropout7 = nn.Dropout(p=0.7)
        self.relu = nn.ReLU()
        self.cls_fc_pose_xyz = nn.Linear(2048, 1024)
#         self.cls_fc_pose_wpqr = nn.Linear(2048, 4)
        self.cls_fc_pose_xyz_1024 = nn.Linear(1024, 512)
#         self.cls_fc_pose_wpqr_1024 = nn.Linear(1024, 4)
        self.tanh = nn.Tanh()

    def forward(self, x1,x2):
        out = self.pre_layers(x1)
        out = self.a3(out)
        out = self.b3(out)
        out = self.max_pool(out)
#         out = self.a4(out)
        
        out2 = self.pre_layers(x1)
        out2 = self.a3(out2)
        out2 = self.b3(out2)
        out2 = self.max_pool(out2)
            
        
        out = torch.cat((out, out2), dim=1)
        out = self.a4(out)   
#         out = self.max_pool(out)

        cls1_pool = self.avg_pool5x5(out)
        cls1_reduction = self.conv1x1(cls1_pool)
        cls1_reduction = F.relu(cls1_reduction)
        cls1_reduction = cls1_reduction.view(cls1_reduction.size(0), -1)
        cls1_fc1 = self.fc2048(cls1_reduction)
        cls1_fc1 = self.relu(cls1_fc1)
        cls1_fc1 = self.dropout7(cls1_fc1)
        cls1_fc_pose_xyz = self.cls_fc_pose_xyz_1024(cls1_fc1)
        cls1_fc_pose_xyz = self.lastlayers(cls1_fc_pose_xyz)
#         cls1_pose_wpqr = self.cls_fc_pose_wpqr_1024(cls1_fc1)

        out = self.b4(out)
        out = self.c4(out)
        out = self.d4(out)
        cls2_pool = self.avg_pool5x5(out)
        cls2_reduction = self.conv1x12(cls2_pool)
        cls2_reduction = F.relu(cls2_reduction)
        cls2_reduction = cls2_reduction.view(cls2_reduction.size(0), -1)
        cls2_fc1 = self.fc2048(cls2_reduction)
        cls2_fc1 = self.relu(cls2_fc1)
        cls2_fc1 = self.dropout7(cls2_fc1)
        cls2_fc_pose_xyz = self.cls_fc_pose_xyz_1024(cls2_fc1)
        cls2_fc_pose_xyz = self.lastlayers(cls2_fc_pose_xyz)

#         cls2_pose_wpqr = self.cls_fc_pose_wpqr_1024(cls2_fc1)
        out = self.e4(out)

        out = self.max_pool(out)

        out = self.a5(out)
        out = self.b5(out)
        cls3_pool = self.avg_pool(out)
        cls3_pool = cls3_pool.view(cls3_pool.size(0), -1)
        cls3_fc1 = self.fc(cls3_pool)
        cls3_fc1 = self.relu(cls3_fc1)
        cls3_fc1 = self.dropout5(cls3_fc1)
        cls3_fc_pose_xyz = self.cls_fc_pose_xyz(cls3_fc1)
        cls3_fc_pose_xyz = self.cls_fc_pose_xyz_1024(cls3_fc_pose_xyz)
        cls3_fc_pose_xyz = self.lastlayers(cls3_fc_pose_xyz)

#         cls3_pose_wpqr = self.cls_fc_pose_wpqr(cls3_fc1)

        return cls1_fc_pose_xyz, \
               cls2_fc_pose_xyz, \
               cls3_fc_pose_xyz
# , \              cls3_pose_wpqr


class PoseLoss(nn.Module):

    def __init__(self, w1_x, w2_x, w3_x):
        super(PoseLoss, self).__init__()
        self.w1_x = w1_x
        self.w2_x = w2_x
        self.w3_x = w3_x
#         self.w1_q = w1_q
#         self.w2_q = w2_q
#         self.w3_q = w3_q
        return

    def forward(self, p1_x, p2_x, p3_x, poseGT):
        pose_x = poseGT
#         pose_q = poseGT[:, 3:]

        l1_x = torch.sqrt(torch.sum(Variable(torch.Tensor(np.square(F.pairwise_distance(pose_x, p1_x).detach().cpu().numpy())), requires_grad=True))) * self.w1_x
#         l1_q = torch.sqrt(torch.sum(Variable(torch.Tensor(np.square(F.pairwise_distance(pose_q, p1_q).detach().cpu().numpy())), requires_grad=True))) * self.w1_q
        l2_x = torch.sqrt(torch.sum(Variable(torch.Tensor(np.square(F.pairwise_distance(pose_x, p2_x).detach().cpu().numpy())), requires_grad=True))) * self.w2_x
#         l2_q = torch.sqrt(torch.sum(Variable(torch.Tensor(np.square(F.pairwise_distance(pose_q, p2_q).detach().cpu().numpy())), requires_grad=True))) * self.w2_q
        l3_x = torch.sqrt(torch.sum(Variable(torch.Tensor(np.square(F.pairwise_distance(pose_x, p3_x).detach().cpu().numpy())), requires_grad=True))) * self.w3_x
#         l3_q = torch.sqrt(torch.sum(Variable(torch.Tensor(np.square(F.pairwise_distance(pose_q, p3_q).detach().cpu().numpy())), requires_grad=True))) * self.w3_q

        loss = l1_x  + l2_x + l3_x 
        return loss


# def posenet_v1():
#     model = PoseNet()
#     return model

model = PoseNet()

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

PoseNet(
  (pre_layers): Sequential(
    (0): Conv2d(1, 8, kernel_size=(2, 2), stride=(1, 1), padding=(1, 1))
    (1): Conv2d(8, 16, kernel_size=(2, 2), stride=(1, 1), padding=(1, 1))
    (2): ReLU(inplace=True)
    (3): Conv2d(16, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
    (4): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=1.0)
    (6): Conv2d(64, 128, kernel_size=(2, 2), stride=(1, 1))
    (7): ReLU(inplace=True)
    (8): Conv2d(128, 192, kernel_size=(2, 2), stride=(1, 1), padding=(1, 1))
    (9): ReLU(inplace=True)
    (10): LocalResponseNorm(5, alpha=0.0001, beta=0.75, k=1.0)
    (11): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (lastlayers): Sequential(
    (0): Dropout(p=0.5, inplace=False)
    (1): Linear(in_features=512, out_features=256, bias=True)
    (2): ReLU(inplace=True)
    (3): Dropout(p=0.5, inplace=False)
    (4): Linear(in_feature

In [9]:
from torchsummary import summary
summary(model, [(1, 224, 224),(1, 224, 224)])
# summary(model, (1, 224, 224))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [-1, 8, 225, 225]              40
            Conv2d-2         [-1, 16, 226, 226]             528
              ReLU-3         [-1, 16, 226, 226]               0
            Conv2d-4         [-1, 64, 113, 113]          50,240
         MaxPool2d-5           [-1, 64, 56, 56]               0
 LocalResponseNorm-6           [-1, 64, 56, 56]               0
            Conv2d-7          [-1, 128, 55, 55]          32,896
              ReLU-8          [-1, 128, 55, 55]               0
            Conv2d-9          [-1, 192, 56, 56]          98,496
             ReLU-10          [-1, 192, 56, 56]               0
LocalResponseNorm-11          [-1, 192, 56, 56]               0
        MaxPool2d-12          [-1, 192, 27, 27]               0
           Conv2d-13           [-1, 64, 27, 27]          12,352
             ReLU-14           [-1, 64,

In [10]:
def save_results(epoch, train_loss, val_loss, train_mae, val_mae, train_accuracy, val_accuracy, output_dir):
    with open(os.path.join(output_dir, 'training_results.txt'), 'a') as f:
        f.write(f'Epoch [{epoch+1}], Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}, '
                f'Train MAE: {train_mae:.4f}, Validation MAE: {val_mae:.4f}, '
                f'Train Accuracy: {train_accuracy:.4f}, Validation Accuracy: {val_accuracy:.4f}\n')


# Get the current date and time
current_time = datetime.now().strftime("%Y_%m_%d-%H:%M:%S")

# Create a directory to save results using the current timestamp
output_dir = f'posenet_results_{current_time}'
os.makedirs(output_dir, exist_ok=True)

In [11]:
# criterion = nn.MSELoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
learning_rate = 0.001
batch_size = 16
EPOCHS = 80000

criterion = PoseLoss(0.3, 0.3, 1)


In [12]:
optimizer = torch.optim.SGD(nn.ParameterList(model.parameters()), lr=learning_rate)


train_losses = []
val_losses = []
# train_maes = []
# val_maes = []
# train_accuracies = []
# val_accuracies = []

for epoch in range(EPOCHS):
    model.train()
    running_loss = 0.0
#    running_mae = 0.0
#    running_corrects = 0
    for i, (inputs, poses) in enumerate(train_loader):
            (target_images, X2_images), poses = inputs, poses
            target_images, X2_images, poses = target_images.to(device), X2_images.to(device), poses.to(device)

            optimizer.zero_grad()

            p1_x, p2_x, p3_x = model(target_images, X2_images)
            loss = criterion(p1_x, p2_x, p3_x, poses)

#             optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
            running_loss += loss.item()
#         running_mae += mae.item()
#         running_corrects += accuracy(outputs, labels)

#             if i % 20 == 0:
#                 print("iteration: " + str(epoch) + "\n    " + "Loss is: " + str(loss)
    train_losses.append(running_loss / len(train_loader))
#     train_maes.append(running_mae / len(train_loader))
#     train_accuracies.append(running_corrects / len(train_loader))
        
    # Validation step
    model.eval()
    val_loss = 0.0
#     val_mae = 0.0
#     val_corrects = 0
    with torch.no_grad():
        for data in val_loader:
            (target_images, X2_images), poses = data
            target_images, X2_images, poses = target_images.to(device), X2_images.to(device), poses.to(device)
            optimizer.zero_grad()
            p1_x, p2_x, p3_x = model(target_images, X2_images)
            vloss = criterion(p1_x, p2_x, p3_x, poses)

#             vloss = vloss1+vloss2+vloss3
#             mae = mae_criterion(outputs, labels)
            val_loss += vloss.item()
#             val_mae += mae.item()
#             val_corrects += accuracy(outputs, labels)
    
    val_losses.append(val_loss / len(val_loader))
#     val_maes.append(val_mae / len(val_loader))
#     val_accuracies.append(val_corrects / len(val_loader))
    
#     save_results(epoch, train_losses[-1], val_losses[-1], train_maes[-1], val_maes[-1], train_accuracies[-1], val_accuracies[-1], output_dir)

    print(f'Epoch [{epoch+1}/{EPOCHS}], Train Loss: {train_losses[-1]:.4f}, Validation Loss: {val_losses[-1]:.4f}')

print('Training Done')
                      

Error loading image /workspace/processed/D6_1/target.jpg: name 'kernel' is not defined
Error loading image /workspace/processed/D6_1/img04660.jpg: name 'kernel' is not defined
Failed to load images: /workspace/processed/D6_1/target.jpg, /workspace/processed/D6_1/img04660.jpg
Error loading image /workspace/processed/D8_1_2_3/target.jpg: name 'kernel' is not defined
Error loading image /workspace/processed/D8_1_2_3/img0010.jpg: name 'kernel' is not defined
Failed to load images: /workspace/processed/D8_1_2_3/target.jpg, /workspace/processed/D8_1_2_3/img0010.jpg
Error loading image /workspace/processed/D9_1_2_3/target.jpg: name 'kernel' is not defined
Error loading image /workspace/processed/D9_1_2_3/img01649.jpg: name 'kernel' is not defined
Failed to load images: /workspace/processed/D9_1_2_3/target.jpg, /workspace/processed/D9_1_2_3/img01649.jpg
Error loading image /workspace/processed/D6/target.jpg: name 'kernel' is not defined
Error loading image /workspace/processed/D6/img040.jpg: n

TypeError: default_collate: batch must contain tensors, numpy arrays, numbers, dicts or lists; found <class 'NoneType'>

In [None]:
torch.save(model.state_dict(), f'results_{current_time}posenet.pth')


In [None]:
# optimizer = torch.optim.SGD(nn.ParameterList(model.parameters()), lr=learning_rate)
# for epoch in range(EPOCHS):
#     model.train()
#     for i, (inputs, poses) in enumerate(train_loader):
#             (target_images, X2_images), poses = inputs, poses
#             target_images, X2_images, poses = target_images.to(device), X2_images.to(device), poses.to(device)

#             optimizer.zero_grad()


#             p1_x, p2_x, p3_x = model(target_images, X2_images)
#             loss1 = criterion(p1_x, poses)
#             loss2= criterion(p1_x, poses)
#             loss3 = criterion(p1_x, poses)
#             loss = loss1+loss2+loss3
# #             optimizer.zero_grad()
#             loss.backward()
#             optimizer.step()

#             if i % 20 == 0:
#                 print("iteration: " + str(epoch) + "\n    " + "Loss is: " + str(loss))

In [None]:

# total_loss = 0

# with torch.no_grad():
#     for data in val_loader:
#         (target_images, X2_images), labels = data
#         target_images, X2_images, labels = target_images.to(device), X2_images.to(device), labels.to(device)
# #         print(target_images.size())
        
#         outputs = model(target_images, X2_images)
# #         print(outputs.size())
# #         print("____\n")
        
#         loss = criterion(outputs, labels)
#         total_loss += loss.item()
# #         correct += (predicted == labels).sum().item()

# avg_loss = total_loss / len(val_loader)
# print('Validation Loss: ', avg_loss)


In [None]:
# import numpy as np
# import torch
# from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
# import matplotlib.pyplot as plt
# import seaborn as sns

# # Define a function to convert continuous values to discrete classes
# def continuous_to_discrete(y):
#     y_discrete = np.zeros_like(y)
#     y_discrete[y > 0.5] = 1
#     y_discrete[y < -0.5] = -1
#     return y_discrete

# # Define a function to extract true and predicted values
# def extract_true_and_predicted_values(loader, model, device):
#     model.eval()
#     true_y1 = []
#     true_y2 = []
#     pred_y1 = []
#     pred_y2 = []

#     with torch.no_grad():
#         for data in loader:
#             (target_images, X2_images), labels = data
#             target_images, X2_images, labels = target_images.to(device), X2_images.to(device), labels.to(device)
#             outputs = model(target_images, X2_images)

#             true_y1.extend(labels[:, 0].cpu().numpy())
#             true_y2.extend(labels[:, 1].cpu().numpy())
#             pred_y1.extend(outputs[:, 0].cpu().numpy())
#             pred_y2.extend(outputs[:, 1].cpu().numpy())

#     return np.array(true_y1), np.array(pred_y1), np.array(true_y2), np.array(pred_y2)

# # Extract true and predicted values
# true_y1, pred_y1, true_y2, pred_y2 = extract_true_and_predicted_values(val_loader, model, device)

# # Convert continuous predictions and true values to discrete classes
# true_y1_discrete = continuous_to_discrete(true_y1)
# pred_y1_discrete = continuous_to_discrete(pred_y1)
# true_y2_discrete = continuous_to_discrete(true_y2)
# pred_y2_discrete = continuous_to_discrete(pred_y2)

# # print(true_y1_discrete-pred_y1_discrete)
# # print()
# # Generate confusion matrices
# cm_y1 = confusion_matrix(true_y1_discrete, pred_y1_discrete , labels=[-1, 0, 1])
# cm_y2 = confusion_matrix(true_y2_discrete, pred_y2_discrete , labels=[-1, 0, 1])

# # Plot confusion matrices
# fig, ax = plt.subplots(1, 2, figsize=(12, 6))

# # ConfusionMatrixDisplay(cm_y1).plot(ax=ax[0])
# sns.heatmap(cm_y1, annot=True, fmt='d', cmap='Blues', ax=ax[0], cbar=False)

# ax[0].set_title('Confusion Matrix for y1')
# ax[0].set_xticklabels(['-1', '0', '1'])
# ax[0].set_yticklabels(['-1', '0', '1'])

# # ConfusionMatrixDisplay(cm_y2).plot(ax=ax[1])
# sns.heatmap(cm_y2, annot=True, fmt='d', cmap='Blues', ax=ax[1], cbar=False)

# ax[1].set_title('Confusion Matrix for y2')
# ax[1].set_xticklabels(['-1', '0', '1'])
# ax[1].set_yticklabels(['-1', '0', '1'])
# plt.show()


In [None]:
# test_image_path = './processed/D9/img026.jpg'
# target_image_path= './processed/D9/target.jpg'

# # Load the images
# target_image = load_image(target_image_path).unsqueeze(0).to(device)  # Adding batch dimension
# test_image = load_image(test_image_path).unsqueeze(0).to(device)     # Adding batch dimension

# print("target state image is of shape",target_image.shape)  # Expected: torch.Size([1, 1, 200, 200])
# print("current state frame is of shape",test_image.shape)    # Expected: torch.Size([1, 1, 200, 200])

# # Evaluate the model
# model.eval()
# with torch.no_grad():
#     outputs = model(target_image, test_image)
#     print("model predictions",outputs)
#     nparray = outputs.cpu().numpy()  # Move tensor to CPU before conversion
# #     if nparray[0]<0.5 and nparray[0]>-0.5:
# #         nparray[0]=0
# #     if nparray[0]>0.5:
# #         nparray[0]=1
# #     if nparray[0]<-0.5:
# #         nparray[0]=-1
    
# #     print("one hot encoding",nparray.size)
    

    
    




In [None]:
# net = ResNet50(10).to('cuda')
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.SGD(net.parameters(), lr=0.1, momentum=0.9, weight_decay=0.0001)
# scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor = 0.1, patience=5)

In [None]:
# test_image_path = './processed/D5/img021.jpg'
# target_image_path= './processed/D5/target.jpg'

# # Load and preprocess the images
# target_image = load_image(target_image_path).to(device)
# test_image = load_image(test_image_path).to(device)

# # Set the model to evaluation mode
# # model.eval()

# # Disable gradient calculation
# with torch.no_grad():
#     # Get the model's output
#     output = model(target_image, test_image)

# # Print the output
# print('Model Output:', output)

In [None]:
# EPOCHS = 30
# for epoch in range(EPOCHS):
#     model.train()
#     running_loss = 0.0
#     for i, (inputs, labels) in enumerate(train_loader):
#         (target_images, X2_images), labels = inputs, labels
#         target_images, X2_images, labels = target_images.to(device), X2_images.to(device), labels.to(device)

#         optimizer.zero_grad()
#         outputs = model(target_images, X2_images)
#         loss = criterion(outputs, labels)
#         loss.backward()
#         optimizer.step()
        
#         running_loss += loss.item()
        
#         if i % 10 == 0 and i > 0:
#             print(f'Loss [{epoch+1}, {i}](epoch, minibatch): ', running_loss / 100)
#             running_loss = 0.0

# print('Training Done')

# # Validation loop
# model.eval()
# correct = 0
# total = 0

# with torch.no_grad():
#     for data in val_loader:
#         (target_images, X2_images), labels = data
#         target_images, X2_images, labels = target_images.to(device), X2_images.to(device), labels.to(device)
#         outputs = model(target_images, X2_images)

#         _, predicted = torch.max(outputs.data, 1)
#         total += labels.size(0)
#         correct += (predicted == labels).sum().item()

# print('Accuracy on validation dataset: ', 100 * (correct / total), '%')

In [None]:
# target_image_path= './processed/D4/img001.jpg'


# def preprocess_input_img(test_image_path):
#     '''
#     This function takes the path to the input test image
#     and returns a preprocessed image (which can be used as a input to the model)
#     (Input): Single test image path
#     (Output): Preprocessed image
#     '''

#     # Read the original test image
#     orig_sample_test_img = cv2.cvtColor(cv2.imread(test_image_path), cv2.COLOR_BGR2RGB)
    
#     x = 50
#     y = 0
#     width = 200
#     height = 200

# # Crop the image
#     orig_sample_test_img = orig_sample_test_img[y:y+height, x:x+width]


#     # Convert image to gray scale
#     gray_sample_test_img = cv2.cvtColor(orig_sample_test_img, cv2.COLOR_RGB2GRAY)

#     # Resizing image to desired input size
#     gray_resized_test_img = cv2.resize(gray_sample_test_img, (285, 285),
#                         interpolation = cv2.INTER_AREA)   # To shrink an image

#     # Remove blemishes from image (if any)
#     (thresh, black_n_white_sample_img) = cv2.threshold(gray_resized_test_img, 70,255, cv2.THRESH_BINARY_INV)
    
#     # Display Images	: Plot Sample Input and Preprocessed Test Image

#     f = plt.figure(figsize=(10,5))
#     ax1 = f.add_subplot(121)
#     ax2 = f.add_subplot(122)

#     ax1.imshow(np.squeeze(orig_sample_test_img), cmap='gray')
#     ax1.set_title("Original Test Input Image", pad=15, fontsize=13, fontweight='bold')
#     ax2.imshow(np.squeeze(black_n_white_sample_img), cmap='gray')
#     ax2.set_title("Preprocessed Test Input Image", pad=15, fontsize=13, fontweight='bold')
#     plt.tight_layout()
#     plt.show()

#     return orig_sample_test_img, black_n_white_sample_img


# x,y = preprocess_input_img(target_image_path)