In [1]:
print("Hello World")

Hello World


In [22]:
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image
import numpy as np
from scipy import spatial
import numpy as np
import cv2
import os
from torch.utils.data import Dataset, DataLoader
import random
import shutil
from torch import nn
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torchvision.models import ResNet50_Weights

In [4]:
resnet_model = models.resnet50(weights=ResNet50_Weights.DEFAULT)
resnet_model.eval()
resnet_model = torch.nn.Sequential(*(list(resnet_model.children())[:-1]))
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                        std=[0.229, 0.224, 0.225]),
])

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to C:\Users\97433/.cache\torch\hub\checkpoints\resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:35<00:00, 2.90MB/s]


In [4]:
def softmax ( x : float ) -> float: 
    """
        Returns output after applying softmax function on input x. 
    """
    return np.exp ( x ) / np.sum ( np.exp ( x ) )

In [5]:
def get_normalized_feat_vec ( image_path : str, 
                  transform = transform,
                  model = resnet_model):
    """
        Returns a feature vector for the image stored 
        at image path using pretrained model.
    """
    image = Image.open ( image_path )
    input_image = transform(image).unsqueeze(0)  
    with torch.no_grad():
        output = model(input_image)
    feat_vec_as_np_array = output.squeeze().numpy()
    normalized_feat_vec_as_np_array = softmax ( feat_vec_as_np_array )
    return normalized_feat_vec_as_np_array

In [6]:
def get_cosine_similarity_between_vectors ( first_feat_vec : list,
                                        second_feat_vec : list ) -> float:
    """
        Returns cosine similarity score between two feature vectors.
    """
    return 1 - spatial.distance.cosine( first_feat_vec, second_feat_vec)

In [7]:
def get_euclidean_distance ( first_feat_vec : list, 
                            second_feat_vec : list ) -> float:
    """
        Returns euclidean distance between two feature vectors.
    """
    return spatial.distance.euclidean ( first_feat_vec, second_feat_vec )

In [8]:
def get_cosine_similarity_score ( first_image_path : str , second_image_path : str,
                                transform = transform , model = resnet_model ) -> float:
    """
        Returns cosine similarity score between two images based 
        on their feature vectors obatined from pretrained models.
    """    
    first_image_feat_vec = get_normalized_feat_vec ( first_image_path , transform , model )
    second_image_feat_vec = get_normalized_feat_vec ( second_image_path , transform , model )

    return get_cosine_similarity_between_vectors ( first_image_feat_vec, 
                                                  second_image_feat_vec)

In [9]:
def get_euclidean_dissimilarity_score ( first_image_path : str , second_image_path : str ,
                            transform = transform , model = resnet_model ) -> float:
    """
        Returns dissimilarity score between two images based on the euclidean distance 
        between their feature vectors obatined from pretrained models.
    """    
    first_image_feat_vec = get_normalized_feat_vec ( first_image_path , transform , model )
    second_image_feat_vec = get_normalized_feat_vec ( second_image_path ,transform , model )

    return get_euclidean_distance ( first_image_feat_vec , second_image_feat_vec )

In [12]:
image1 = r"C:\Users\97433\Mafkin_Robotics_FrontEnd\VisualOdometry\feature_extraction\dataset\blobs\image_pair1\blob0\image2\blob_at_2448_1924_image_2.jpg"
image2 = r"C:\Users\97433\Mafkin_Robotics_FrontEnd\VisualOdometry\feature_extraction\dataset\blobs\image_pair1\blob0\image1\blob_at_1573_2187_image_1.jpg"

In [10]:
def pad_image ( image : list, border_len = 1 ):
    """
        Pads image with 0s such that length and height
        of image increase by border_len.
    """
    return np.pad ( image , border_len, mode = "constant" )

In [11]:
def get_normalized_vec( data : list, min_val : int, max_val : int) -> list:
    """
    Perform Min-Max normalization on the input data.

    Parameters:
    - data: A numpy array or list containing the data to be normalized.

    Returns:
    - normalized_data: The normalized data.
    """

    normalized_data = (data - min_val) / (max_val - min_val)
    return normalized_data

In [12]:
def normalize ( x : int, min_val : int, max_val : int ) -> int:
    return ( x - min_val ) / ( max_val - min_val )

In [21]:
def get_blob(image : list, key_point : tuple , blob_size : tuple):
    """
    Extracts a patch (blob) from the image centered at the key_point
    with the specified blob_size (height, width).

    Args:
    - image: The input image.
    - key_point: The key point (x, y) around which the blob is extracted.
    - blob_size: The size of the blob in pixels (height, width).

    Returns:
    - blob_patch: The extracted patch (blob) from the image.
    """

    kp_x, kp_y = int(key_point[0]), int(key_point[1])

    blob_height, blob_width = blob_size

    top_left_x = max(0, kp_x - blob_width // 2)
    top_left_y = max(0, kp_y - blob_height // 2)

    bottom_right_x = min(image.shape[1], kp_x + blob_width // 2)
    bottom_right_y = min(image.shape[0], kp_y + blob_height // 2)

    blob_patch = image[top_left_y:bottom_right_y, top_left_x:bottom_right_x]

    return blob_patch

In [22]:
def get_blob_sequence_at_point ( image : list, coord : tuple, 
                               blob_sizes = [ 10, 25, 50, 100 ] ) -> list:
    """
    Parameters:
    - image (list): Image from which squence of blobs is to be generated.
    - coord (tuple): A tuple containing the (x, y) coordinate of the center of each blob.
    - blob_sizes (list): Sizes of blobs of the sequence.
    Returns:
    - blob_sequence (list): A list containing the blobs.
                            Each blob is represented as a square patch
                            of the specified size with its center
                            as given by the center_coordinates.
    """
    return [ get_blob ( image, coord, ( blob_size, blob_size)) \
            for blob_size in blob_sizes ]

In [24]:
def get_matching_keypoints( first_image_path : list, second_image_path : list) -> list:
    """ 
        Returns list of corresponding keypoints in both images sing ORB_FLANN feature matching.
    """ 
    first_image = cv2.imread ( first_image_path , 0 )
    second_image = cv2.imread ( second_image_path , 0 )
    
    orb = cv2.ORB_create()

    first_image_keypoints, first_descriptors = orb.detectAndCompute( first_image , None)
    second_image_keypoints, second_descriptors = orb.detectAndCompute( second_image , None)

    FLANN_INDEX_LSH = 6
    index_params = dict(algorithm=FLANN_INDEX_LSH, table_number=6, key_size=12, multi_probe_level=1)
    search_params = dict(checks=50)  

    flann = cv2.FlannBasedMatcher(index_params, search_params)

    matches = flann.knnMatch( first_descriptors, second_descriptors, k=2)

    good_matches = []
    for match_pair in matches:
        if len(match_pair) < 2:
            continue
        m , n = match_pair
        if m.distance < 0.7 * n.distance:
            good_matches.append(m)

    first_image_matched_keypoints = [ (int(first_image_keypoints[match.queryIdx].pt[0]) ,
                                      int(first_image_keypoints[match.queryIdx].pt[1]))  \
                                    for match in good_matches]
    second_image_matched_keypoints = [ (int(second_image_keypoints[match.trainIdx].pt[0]),
                                        int(second_image_keypoints[match.trainIdx].pt[1])) \
                                      for match in good_matches]

    return [ [keypoint, second_image_matched_keypoints[index] ] \
            for index,keypoint in enumerate ( first_image_matched_keypoints )]


In [26]:
def get_non_matching_keypoints ( first_image_path : str, second_image_path : str ) -> list:
    """
        Returns corresponding keypoints between two images.
    """
    orb_keypoints = get_matching_keypoints ( first_image_path, second_image_path )
    return [ [ first_image_keypoint[ 0 ], orb_keypoints [::-1] [ index ][ 1 ]] \
            for index, first_image_keypoint in enumerate ( orb_keypoints ) 
            if first_image_keypoint[ 1 ] != orb_keypoints [ ::-1 ][ index ][ 1 ] ]

In [27]:
def save_matching_keypoints_blob_sequences ( folder_to_save : str, 
                                            image_pair_folder : str,
                                            generate_keypoints_func : str,
                                            generate_blob_sequence_func = get_blob_sequence_at_point,
                                            extended_name_of_blob_sequence_folder = None) -> None:
    """
        Functionality : 
        - Saves blob sequence consisting of blobs of various sizes having keypoints as center
        for both images in the specified image pair folder. 
        
        Parameters : 
        - folder_to_save : folder path to save blob_sequence.
        - image_pair_folder : folder path for image pair.
        - generate_keypoints_function : function used to generate keypoints.
        - generate_blob_sequence_func : function used to generate blob sequence.
    """

    if not os.path.join ( folder_to_save ): os.makedirs ( folder_to_save )

    first_image_path, second_image_path =\
        [ os.path.join ( image_pair_folder, image )\
        for image in os.listdir( image_pair_folder ) ]
    
    list_of_keypoints = generate_keypoints_func ( first_image_path, second_image_path )

    first_image = cv2.imread ( first_image_path )
    second_image = cv2.imread ( second_image_path )

    blob_sequences_for_first_image = [ generate_blob_sequence_func ( first_image, keypoint[0] )\
                                      for keypoint in list_of_keypoints ]
    
    blob_sequences_for_second_image = [ generate_blob_sequence_func ( second_image, keypoint[1] )\
                                      for keypoint in list_of_keypoints ]

    for index, blob_sequence in enumerate ( blob_sequences_for_first_image ):
        blob_sequence_folder_name = "blob_sequence_" + str ( index ) + extended_name_of_blob_sequence_folder
        blob_sequence_folder_path = os.path.join ( folder_to_save, blob_sequence_folder_name )
        
        if not os.path.exists ( blob_sequence_folder_path ): os.makedirs ( blob_sequence_folder_path )

        first_image_blob_sequence_folder_path = os.path.join ( blob_sequence_folder_path, "image1" )
        second_image_blob_sequence_folder_path = os.path.join ( blob_sequence_folder_path, "image2" )

        if not os.path.exists ( first_image_blob_sequence_folder_path ): 
            os.makedirs ( first_image_blob_sequence_folder_path )

        if not os.path.exists ( second_image_blob_sequence_folder_path ):
            os.makedirs ( second_image_blob_sequence_folder_path )

        for blob_index, blob_image in enumerate ( blob_sequence ):

            blob_image_name = "blob" + str(blob_index) + ".jpg"
            
            blob_image_path = os.path.join ( first_image_blob_sequence_folder_path, 
                                                        blob_image_name )
            cv2.imwrite ( blob_image_path, blob_image )

    
        for blob_index, blob_image in enumerate ( blob_sequences_for_second_image [ index ] ):

            blob_image_name = "blob" + str(blob_index) + ".jpg"
            
            blob_image_path = os.path.join ( second_image_blob_sequence_folder_path, 
                                                        blob_image_name )
            cv2.imwrite ( blob_image_path, blob_image )
    print ( "Done :)" )


In [28]:
def folder_save_matching_keypoints_blob_sequences ( folder_to_save : str, 
                                            folder_containing_image_pairs : str,
                                            generate_keypoints_func : str,
                                            generate_blob_sequence_func = get_blob_sequence_at_point ) -> None:
    """
        Functionality : 
        - Saves blob sequence consisting of blobs of various sizes having specified keypoints as center
        for images of image pairs in the specified folder. 
        
        Parameters : 
        - folder_to_save : folder path to save blob_sequence.
        - folder_containing_image_pairs : folder path containing image pairs.
        - generate_keypoints_func : function used to generate keypoints.
        - generate_blob_sequence_func : function used to generate blob sequence.
    """
    if not os.path.exists ( folder_to_save ): os.makedirs ( folder_to_save )

    for index, image_pair_folder in enumerate ( os.listdir ( folder_containing_image_pairs ) ):
        
        image_pair_folder_path = os.path.join ( folder_containing_image_pairs, image_pair_folder )
        
        # image_pair_folder_to_save_blob_sequence = "image_pair" + str(index)
        
        # image_pair_folder_to_save_blob_sequence_path = os.path.join ( folder_to_save,
        #                                                             image_pair_folder_to_save_blob_sequence )
        
        # if not os.path.exists ( image_pair_folder_to_save_blob_sequence_path ):
        #     os.makedirs ( image_pair_folder_to_save_blob_sequence_path )

        extended_name_of_blob_sequence_folder = "_image_pair" + str(index)

        save_matching_keypoints_blob_sequences ( folder_to_save = folder_to_save,
                                                image_pair_folder = image_pair_folder_path,
                                                generate_blob_sequence_func = generate_blob_sequence_func,
                                                generate_keypoints_func = generate_keypoints_func,
                                                extended_name_of_blob_sequence_folder = \
                                                    extended_name_of_blob_sequence_folder)

    print ( "Done :)" )


In [29]:
def balance_dataset ( root_dir : str ) -> None:
    """ 
        Balances dataset consisting of two classes by : 
        - removing additional files from class containing maximum number of files if difference 
        between number of files in both classes is less than 10. 
    """
    class_folder_paths = [ os.path.join ( root_dir, class_folder_name ) \
                          for class_folder_name in os.listdir ( root_dir ) ]
    
    class_folder_and_number_of_files = { class_folder_path : len(os.listdir( class_folder_path ))\
                                        for class_folder_path in class_folder_paths }
    
    sorted_class_folder_list = sorted ( class_folder_and_number_of_files, 
                                       key = class_folder_and_number_of_files.get )
    
    diff_in_number_of_files_in_classes = len ( os.listdir ( sorted_class_folder_list[1] ) ) - len ( os.listdir ( sorted_class_folder_list[0] ) )

    if diff_in_number_of_files_in_classes < 20:
        
        majority_class_folder_path = sorted_class_folder_list [ 1 ]
        
        folders_in_majority_class =\
            [ os.path.join ( majority_class_folder_path, sub_folder ) \
            for sub_folder in os.listdir ( majority_class_folder_path )]
    
        folders_to_delete = random.sample(folders_in_majority_class, 
                                        min( diff_in_number_of_files_in_classes, 
                                            len(folders_in_majority_class)))

        print ( len ( folders_to_delete ) )

        for folder in folders_to_delete:
            folder_path = os.path.join( majority_class_folder_path , folder)
            shutil.rmtree ( folder_path )
                

In [91]:
class custom_blob_sequence_dataset ( Dataset ):
    
    def __init__(self, root_dir : str, blob_index: int) -> None:
        self.root_dir = root_dir
        self.blob_index = blob_index
        self.transform = transforms.Compose([ transforms.Resize((224, 224)),  transforms.ToTensor() ])

    def __len__ ( self ) -> int:
        
        class_folders = [ os.path.join ( self.root_dir, class_folder ) \
                        for class_folder in os.listdir ( self.root_dir ) ]
        
        return len ( os.listdir ( class_folders [ 0 ] ) ) + len ( os.listdir ( class_folders [ 1 ] ) )
    
    def __getitem__(self, index):
        class_folders = [ os.path.join ( self.root_dir, class_folder ) \
                        for class_folder in os.listdir ( self.root_dir ) ]
        
        blob_sequences = sorted([ os.path.join ( class_folder, blob_sequence_folder ) \
                            for class_folder in class_folders \
                            for blob_sequence_folder in os.listdir ( class_folder ) ])
        
        labels = [ 1 if "dissimilar" == blob_sequence.split("\\")[-2] else 0 \
                for blob_sequence in blob_sequences  ]
        
        images_folder = [ os.path.join ( blob_sequences[ index ], image ) \
                        for image in os.listdir(blob_sequences[index]) ]
        
        blob_images = [os.path.join ( images_folder[0], os.listdir ( images_folder[0] )[self.blob_index] ),
                       os.path.join ( images_folder[1], os.listdir(images_folder[1])[self.blob_index] )]
        
        if self.transform:
            blob_images = [ self.transform ( Image.open(blob_image) )\
                            for blob_image in blob_images ]

        return blob_images[ 0 ], blob_images[1] , labels [ index ]


In [81]:
class CustomResNet ( nn.Module ) :
    def __init__( self , num_classes , pretrained = True ):
        super ( CustomResNet, self ).__init__()
        
        # Load pretrained ResNet50 model
        self.resnet = models.resnet50( weights = ResNet50_Weights.DEFAULT )
        
        # Freeze convolutional layers if using pretrained weights
        if pretrained:
            for param in self.resnet.parameters():
                param.requires_grad = False
        
        # Replace the last fully connected layer
        num_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Sequential ( nn.Linear ( num_features, 512 ), nn.ReLU(), 
                                        nn.Dropout ( 0.5 ), nn.Linear ( 512, num_classes ))
        
    def forward(self, x):
        x = self.resnet(x)
        return x

In [82]:
class SiameseNetwork(nn.Module):
    def __init__(self, num_classes = 10):
        super(SiameseNetwork, self).__init__()
        
        # Define the architecture for each subnetwork (or branch)
        self.subnetwork1 = CustomResNet(num_classes)
        self.subnetwork2 = CustomResNet(num_classes)
        
    def forward(self, input1, input2):
        # Forward pass through both branches
        output1 = self.subnetwork1(input1)
        output2 = self.subnetwork2(input2)
        return output1, output2

In [25]:
class ContrastiveLoss(torch.nn.Module):
    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
      # Calculate the euclidian distance and calculate the contrastive loss
      euclidean_distance = F.pairwise_distance(output1, output2, keepdim = True)

      loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) +
                                    (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))


      return loss_contrastive

In [93]:
def train( root_dir, index):
    model = SiameseNetwork()

    counter = []
    loss_history = []
    iteration_number= 0

    criterion = ContrastiveLoss()
    optimizer = optim.Adam(model.parameters(), lr = 0.001 )

    device = "cuda" if torch.cuda.is_available() else "cpu"

    trainDS = custom_blob_sequence_dataset( root_dir, index)
    train_dataloader = DataLoader ( trainDS, batch_size = 1, shuffle = False, pin_memory = True)

    for epoch in range(100):
        epoch_loss = 0.0
        # Iterate over batches
        for i, (img0, img1, label) in enumerate(train_dataloader):

            # Send the images and labels to CUDA
            #print ( img0 )
            img0, img1, label = img0.to(device), img1.to(device), label.to(device)

            # Zero the gradients
            optimizer.zero_grad()

            # Pass in the two images into the network and obtain two outputs
            output1, output2 = model(img0, img1)
            # Pass the outputs of the networks and label into the loss function
            loss_contrastive = criterion(output1, output2, label)

            epoch_loss += loss_contrastive.item()

            # Calculate the backpropagation
            loss_contrastive.backward()

            # Optimize
            optimizer.step()

            # Every 10 batches print out the loss
            if i % 10 == 0 :
                print(f"Epoch number {epoch} Bacth number{i} Current loss {loss_contrastive.item()}\n")
                iteration_number += 10

                counter.append(iteration_number)
                loss_history.append(loss_contrastive.item())
    
        avg_epoch_loss = epoch_loss / len ( train_dataloader )
        
        print(f"Epoch [{epoch+1}/100], Average Loss: {avg_epoch_loss}")

    print ( "Training Completed." )
    model_name = "model" + str(index)
    print ( "Saving Model." )
    torch.save ( model, model_name )
    print ( "Model saved." )

In [94]:
train( r"C:\Users\97433\Mafkin_Robotics_FrontEnd\VisualOdometry\feature_extraction\dataset_for_ANN\train",0 )

Epoch number 0 Bacth number0 Current loss 3.342322587966919

Epoch number 0 Bacth number10 Current loss 0.0

Epoch number 0 Bacth number20 Current loss 0.0

Epoch number 0 Bacth number30 Current loss 0.0

Epoch number 0 Bacth number40 Current loss 0.0

Epoch number 0 Bacth number50 Current loss 0.0

Epoch number 0 Bacth number60 Current loss 0.0

Epoch number 0 Bacth number70 Current loss 0.0

Epoch number 0 Bacth number80 Current loss 0.0

Epoch number 0 Bacth number90 Current loss 0.0

Epoch number 0 Bacth number100 Current loss 0.0

Epoch number 0 Bacth number110 Current loss 0.0

Epoch number 0 Bacth number120 Current loss 0.0

Epoch number 0 Bacth number130 Current loss 0.0

Epoch number 0 Bacth number140 Current loss 0.0

Epoch number 0 Bacth number150 Current loss 0.0

Epoch number 0 Bacth number160 Current loss 0.0

Epoch number 0 Bacth number170 Current loss 0.0

Epoch number 0 Bacth number180 Current loss 0.0

Epoch number 0 Bacth number190 Current loss 0.0

Epoch number 0 Ba

KeyboardInterrupt: 

In [32]:
def split_into_training_and_testing_folders ( root_dir : str, train_test_split = 0.15 ) -> None:
    """ 
        Splits the root directory into train and test folders.
    """
    
    train_folder = os.path.join ( root_dir , "train" )
    test_folder = os.path.join ( root_dir , "test" )

    for class_folder in os.listdir ( root_dir ):

        if class_folder == "train" or class_folder == "test":
            continue

        if not os.path.exists ( train_folder ): os.makedirs ( train_folder )
        if not os.path.exists ( test_folder ): os.makedirs ( test_folder )

        train_class_folder = os.path.join ( train_folder , class_folder )
        test_class_folder = os.path.join ( test_folder , class_folder )

        if not os.path.exists ( train_class_folder ):
            os.makedirs ( train_class_folder )

        if not os.path.exists ( test_class_folder ):
            os.makedirs ( test_class_folder )

        class_folder_path = os.path.join ( root_dir, class_folder )

        blob_sequence_folder_paths = [ os.path.join ( class_folder_path, blob_sequence_folder ) \
                                    for blob_sequence_folder in os.listdir ( class_folder_path ) ]
        
        train_test_split_index = int ( len ( blob_sequence_folder_paths ) * train_test_split )
        
        test_data = blob_sequence_folder_paths [ : train_test_split_index ]
        train_data = blob_sequence_folder_paths [ train_test_split_index : ]

        for train_data_path in train_data:
            shutil.move ( train_data_path , train_class_folder )
        
        for test_data_path in test_data:
            shutil.move ( test_data_path , test_class_folder  )

In [33]:
train_directory = r"C:\Users\97433\Mafkin_Robotics_FrontEnd\VisualOdometry\feature_extraction\dataset_for_ANN\train"
test_directory = r"C:\Users\97433\Mafkin_Robotics_FrontEnd\VisualOdometry\feature_extraction\dataset_for_ANN\test"

train_DS = custom_blob_sequence_dataset ( root_dir = train_directory )
test_DS = custom_blob_sequence_dataset ( root_dir = test_directory )

train_loader = DataLoader ( train_DS , batch_size = 4 , shuffle = True )
test_loader = DataLoader ( test_DS , batch_size = 4 , shuffle = True  )


In [40]:
class SequenceClassifier ( nn.Module ) :
    def __init__ ( self ) :
        super ( SequenceClassifier , self ).__init__()
        self.fc1 = nn.Linear ( 4 , 16 ).float ()
        self.fc2 = nn.Linear ( 16 , 8 ).float ()
        self.fc3 = nn.Linear ( 8 , 1 ).float ()

    def forward(self, x):
        x = torch.relu ( self.fc1( x ) )
        x = torch.relu ( self.fc2( x ) )
        x = torch.sigmoid( self.fc3( x ) )  
        return x