In [None]:
import os
import matplotlib.pyplot as plt
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import tensorflow as tf
import numpy as np
import pandas as pd
import random
import cv2
import uuid
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
import shutil
from PIL import Image
from tqdm import tqdm

In [None]:
from keras.layers import Dense, Conv2D, GlobalAveragePooling2D, MaxPool2D, Dropout, BatchNormalization, Input
from keras.optimizers import Adam
from keras.losses import SparseCategoricalCrossentropy, CategoricalCrossentropy
from keras.models import Sequential
from keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard
from keras.layers import Flatten
from keras.models import Model  #the functional API

In [None]:
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

# **Making required directories**

In [None]:
#setting up the directory structure for our project. Namely, three folders Anchor, Negative, Positive
pos_path = os.path.join('data', 'positive')  
#positive images (verification images)
neg_path = os.path.join('data', 'negative')       
#negative images (different from the anchor images)
anc_path = os.path.join('data', 'anchor')        
#input image (object to be recognized)

In [None]:
os.makedirs(pos_path,exist_ok=True)
os.makedirs(neg_path,exist_ok=True)
os.makedirs(anc_path,exist_ok=True)
#first, we collect the negative examples through Labelled Faces in the wild repository
#first download the required files from the website into the working directory. Then run the cell




In [None]:
!tar -xf lfw.tgz

# if the code has ran earlier then already pre-installed directories. no need to run them again....


# will take hell__lot_time. (stop the cell after  3min or even less) will have enough images downloaded to proceed

In [None]:
def download_image(url, folder):
    response = requests.get(url)
    if response.status_code == 200:
        # Extract the filename from the URL
        filename = os.path.join(folder, os.path.basename(url))
        with open(filename, 'wb') as file:
            file.write(response.content)
        print(f"Image downloaded: {filename}")
    else:
        print(f"Failed to download image from {url}")

# Function to download images from a webpage
def download_images_from_webpage(webpage_url, folder):
    # Fetch HTML content
    response = requests.get(webpage_url)
    if response.status_code == 200:
        soup = BeautifulSoup(response.text, 'html.parser')

        # Find all image tags
        img_tags = soup.find_all('img')

        # Download each image
        for img_tag in img_tags:
            img_url = img_tag.get('src')
            if img_url:
                img_url = urljoin(webpage_url, img_url)
                download_image(img_url, folder)
    else:
        print(f"Failed to fetch webpage content from {webpage_url}")

# Specify the webpage URL and the folder to save images
webpage_url = 'https://vis-www.cs.umass.edu/lfw/sets_1.html'
download_folder = 'downloaded_images'

# Create the folder if it doesn't exist
os.makedirs(download_folder, exist_ok=True)

# Download images from the webpage
download_images_from_webpage(webpage_url, download_folder)


# creating positive pair of images from the dataset link provided....logical way to pair images....if the first 10 characters of images are matching they are formed as a positive pair(with label 1)

In [None]:
def categorize_images(source_dir, dest_dir1, dest_dir2):
    # Create destination directories if they don't exist
    os.makedirs(dest_dir1, exist_ok=True)
    os.makedirs(dest_dir2, exist_ok=True)

    # Dictionary to store pairs of images based on the first 10 characters
    image_pairs = {}

    # Iterate through each file in the source directory
    for filename in os.listdir(source_dir):
        if filename.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp')):
            # Extract the first 10 characters from the filename
            first_10_chars = filename[:10]

            # Check if the first 10 characters exist in the dictionary
            if first_10_chars in image_pairs:
                # Move the current file to dest_dir2
                destination_path = os.path.join(dest_dir2, filename)
                shutil.move(os.path.join(source_dir, filename), destination_path)

                # Move the paired file to dest_dir1
                paired_filename = image_pairs[first_10_chars]
                paired_destination_path = os.path.join(dest_dir1, paired_filename)
                shutil.move(os.path.join(source_dir, paired_filename), paired_destination_path)

                # Remove the pair from the dictionary
                del image_pairs[first_10_chars]
            else:
                # Add the first 10 characters and filename to the dictionary
                image_pairs[first_10_chars] = filename

# Example usage:
source_directory = 'downloaded_images'
destination_directory1 = 'p1'
destination_directory2 = 'p2'

categorize_images(source_directory, destination_directory1, destination_directory2)


# friends
# like if you want to add more negative pictures taken from them camera....
# if you have friends around___take their pictures........if you are alone___then just move out of the frame and take lesser pics

# hold 'a' to capture multiple shots
# press 'q' to quit


In [None]:
save_directory = "captured_images_friends"
os.makedirs(save_directory, exist_ok=True)

# Find the latest image index in the directory
latest_index = max([0] + [int(filename.split("_")[2].split(".")[0]) for filename in os.listdir(save_directory) if filename.startswith("captured_image")])

# Open the webcam
cap = cv2.VideoCapture(0)

while True:
    # Read a frame from the webcam
    ret, frame = cap.read()

    # Check if the frame is successfully captured
    if not ret:
        print("Failed to capture frame")
        break

    # Crop the frame to the region (0:500, 0:500)
    height, width, _ = frame.shape

    # Crop the same pixels but orient it towards the center-top
    cropped_frame = frame[0:500, (width-500)//2:(width+500)//2]


    # Display the cropped frame
    cv2.imshow("Cropped Frame", cropped_frame)
    #collecting images
    if(cv2.waitKey(1) & 0XFF == ord('a')):
        imgname = os.path.join(save_directory, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, cropped_frame)
    # Check for 'q' key press to exit
    if cv2.waitKey(1) & 0xFF == ord('q'):
        # Save the cropped frame with an incremented index to the directory
        image_index = latest_index + 1
        image_path = os.path.join(save_directory, f"captured_image_{image_index}.png")
        cv2.imwrite(image_path, cropped_frame)
        
        break

# Release the webcam and close all OpenCV windows
cap.release()
cv2.destroyAllWindows()


# !! this is for additional images.....like taking the pictures in different orientations and different lightings 


# take atleast 22 images....hold 'a' to store a lot of screen captures at once

In [None]:
save_directory = "captured_images"
os.makedirs(save_directory, exist_ok=True)
latest_index = max([0] + [int(filename.split("_")[2].split(".")[0]) for filename in os.listdir(save_directory) if filename.startswith("captured_image")])
cap = cv2.VideoCapture(0)
while True:
    ret, frame = cap.read()
    if not ret:
        print("Failed to capture frame")
        break
    height, width, _ = frame.shape
    cropped_frame = frame[0:500, (width-500)//2:(width+500)//2]
    cv2.imshow("Cropped Frame", cropped_frame)
    # Check for 'q' key press to exit
    if(cv2.waitKey(1) & 0XFF == ord('a')):
        imgname = os.path.join(save_directory, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, cropped_frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        # Save the cropped frame with an incremented index to the directory
        image_index = latest_index + 1
        image_path = os.path.join(save_directory, f"captured_image_{image_index}.png")
        cv2.imwrite(image_path, cropped_frame)
        
        break

# Release the webcam and close all OpenCV windows
cap.release()
cv2.destroyAllWindows()


# step 1 of making positive pairs with all types of combination of the above images.
# making a directory containing the same positive pair but in different order

In [None]:
input_directory = "captured_images"
output_directory = "captured_images_2"

# Create the output directory if it doesn't exist
os.makedirs(output_directory, exist_ok=True)

# List all files in the input directory
file_list = [filename for filename in os.listdir(input_directory) if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp'))]

# Iterate through each file in the input directory
for filename in tqdm(file_list, desc="Processing images", unit="image"):
    # Construct the full path of the input image
    input_image_path = os.path.join(input_directory, filename)

    try:
        # Open the image using PIL
        with Image.open(input_image_path) as img:
            # Generate 22 copies of the image
            for i in range(22):
                # Construct the filename for the output image
                output_filename = f"{os.path.splitext(filename)[0]}_{i + 1}.jpg"
                output_image_path = os.path.join(output_directory, output_filename)

                # Save the copied image to the output directory
                img.save(output_image_path)
    except (OSError, UnidentifiedImageError):
        print(f"Skipped non-image file: {filename}")

print(f"Processed {len(file_list)} images. Created {len(file_list) * 22} images in {output_directory}.")


In [None]:
target_size = (100, 100)
# Create the target directory if it doesn't exist
os.makedirs(neg_path, exist_ok=True)
for directory in os.listdir('lfw'):
    for file in os.listdir(os.path.join('lfw', directory)):
        EX_PATH = os.path.join('lfw', directory, file)
        NEW_PATH = os.path.join(neg_path, file)
        # Open the image
        with Image.open(EX_PATH) as img:
            # Resize the image
            img_resized = img.resize(target_size)
            # Save the resized image to the target directory
            img_resized.save(NEW_PATH)
        # Remove the original image (optional)
        os.remove(EX_PATH)
"""
first we loop over every sub directory in the lfw folder. Then for every file in each sub directory, we get the path of the file, create a new path for the
file and the replace the two paths (in effect cutting the files from the initial location to the final location)"""

# **Writing a function to capture User's image through the webcam**

In [None]:
pip install --upgrade opencv-python


# this is the real capture 

In [None]:
#However, the resolution is large and needs to be reduced for processing
#for this we make a change to the original image capturing function
#capturing the anchor and the positive images using our webcam and opencv
#step1. establish a connection to the webcam using VideoCapture()
cap = cv2.VideoCapture(0)
while cap.isOpened():
    ret, frame = cap.read()
    frame = frame[50:650, 600:1000, :]  #arbitrary values
    #collecting anchor images
    if(cv2.waitKey(1) & 0XFF == ord('a')):
        imgname = os.path.join(anc_path, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)

    #collecting positive images
    if(cv2.waitKey(1) & 0XFF == ord('p')):
        imgname = os.path.join(pos_path, '{}.jpg'.format(uuid.uuid1()))
        cv2.imwrite(imgname, frame)

    cv2.imshow("Image Collection", frame)
    if(cv2.waitKey(1) & 0XFF == ord('q')):
        break
#Release the webcam and destroy the image show frame
cap.release()
cv2.destroyAllWindows()

In [None]:
plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
plt.axis(False)
plt.show()
#this is actually the last frame captured by the camera before i pressed q (as soon as it is pressed, the image capture stops)

In [None]:
# Set the desired size for resizing
resize_width = 200
resize_height = 150

# Assuming 'frame' is the variable that contains the image
# Resize the frame to the desired dimensions
frame_resized = cv2.resize(frame, (resize_width, resize_height))

# Save the resized frame as an anchor image
imgname = os.path.join(anc_path, '{}.jpg'.format(uuid.uuid1()))
cv2.imwrite(imgname, frame_resized)
print("Anchor image saved:", imgname)

# Save the resized frame as a positive image
imgname = os.path.join(pos_path, '{}.jpg'.format(uuid.uuid1()))
cv2.imwrite(imgname, frame_resized)
print("Positive image saved:", imgname)


In [None]:
plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
plt.axis(False)
plt.show()
#last image captured by the camera. Also, in this case, the resolution is smaller than the pervious case

In [None]:
def data_aug(img, num_augmentations=5):
    augmented_data = []

    for i in range(num_augmentations):
        img_aug = img

        # Apply random transformations
        img_aug = tf.image.stateless_random_brightness(img_aug, max_delta=0.02, seed=(1, 2))
        img_aug = tf.image.stateless_random_contrast(img_aug, lower=0.6, upper=1, seed=(1, 3))
        img_aug = tf.image.stateless_random_flip_left_right(img_aug, seed=(np.random.randint(100), np.random.randint(100)))
        img_aug = tf.image.stateless_random_jpeg_quality(img_aug, min_jpeg_quality=90, max_jpeg_quality=100, seed=(np.random.randint(100), np.random.randint(100)))
        img_aug = tf.image.stateless_random_saturation(img_aug, lower=0.9, upper=1, seed=(np.random.randint(100), np.random.randint(100)))

        augmented_data.append(img_aug)

    return augmented_data


In [None]:
for file_name in os.listdir(pos_path):
    img_path = os.path.join(pos_path, file_name)
    img = cv2.imread(img_path)
    
    if img is not None:
        augmented_images = data_aug(img)

        # Use the original filename with an index to avoid overwriting
        base_file_name, file_extension = os.path.splitext(file_name)
        
        for idx, augmented_image in enumerate(augmented_images):
            new_file_name = '{}_aug_{}{}'.format(base_file_name, idx, file_extension)
            new_img_path = os.path.join(pos_path, new_file_name)
            
            # Save the augmented image
            cv2.imwrite(new_img_path, augmented_image.numpy().astype(np.uint8))


            # Check if the file exists
            if os.path.exists(new_img_path):
                print(f"Successfully saved: {new_img_path}")
            else:
                print(f"Error: Failed to save {new_img_path}")

In [None]:
for file_name in os.listdir(anc_path):
    img_path = os.path.join(anc_path, file_name)
    img = cv2.imread(img_path)
    
    if img is not None:
        augmented_images = data_aug(img)

        # Use the original filename with an index to avoid overwriting
        base_file_name, file_extension = os.path.splitext(file_name)
        
        for idx, augmented_image in enumerate(augmented_images):
            new_file_name = '{}_aug_{}{}'.format(base_file_name, idx, file_extension)
            new_img_path = os.path.join(anc_path, new_file_name)
            
            # Save the augmented image
            cv2.imwrite(new_img_path, augmented_image.numpy().astype(np.uint8))


            # Check if the file exists
            if os.path.exists(new_img_path):
                print(f"Successfully saved: {new_img_path}")
            else:
                print(f"Error: Failed to save {new_img_path}")

# **Image Preprocessing for the Neural Network**

### 1. Getting the image directories

In [None]:
anchor = tf.data.Dataset.list_files(anc_path +"/*.jpg").take(500)
positive = tf.data.Dataset.list_files(pos_path +"/*.jpg").take(500)
negative = tf.data.Dataset.list_files(neg_path +"/*.jpg").take(400)

In [None]:
positive_1_path = os.path.join('p1')
positive_2_path=os.path.join('p2')

# adding some more data.....like labelling the similar datasets to be 1 and different datasets to be 0

In [None]:
file_paths = sorted(tf.io.gfile.glob(os.path.join(positive_1_path, '*.jpg')))
# Use from_tensor_slices to maintain order
positive_1 = tf.data.Dataset.from_tensor_slices(file_paths).take(1000)
file_paths2 = sorted(tf.io.gfile.glob(os.path.join(positive_2_path, '*.jpg')))
# Use from_tensor_slices to maintain order
positive_2 = tf.data.Dataset.from_tensor_slices(file_paths2).take(1000)
# positive_1=tf.data.Dataset.list_files(file_paths+"/*.jpg").take(500)
file_paths_mine = sorted(tf.io.gfile.glob(os.path.join("captured_images", '*.jpg')))
positive_mine=tf.data.Dataset.from_tensor_slices(file_paths_mine).take(500)
# positive_2=tf.data.Dataset.list_files(file_paths2+"/*.jpg").take(500)
file_paths_mine2 = sorted(tf.io.gfile.glob(os.path.join("captured_images_2", '*.jpg')))
positive_mine2 = tf.data.Dataset.from_tensor_slices(file_paths_mine2).take(500)
# file_paths_friends=sorted(tf.io.gfile.glob(os.path.join("captured_images_friends", '*.png')))
# friends=tf.data.Dataset.from_tensor_slices(file_paths_friends).take(500)
friends_path = 'captured_images_friends'
friends = tf.data.Dataset.list_files(os.path.join(friends_path, '*.jpg')).take(500)

In [None]:
friends

In [None]:
tf.data.experimental.cardinality(friends).numpy()


In [None]:
#to create a performant data pipeline, we first create a function which is mapped to the entire dataset
def preprocess(file_path):
    #reading in image from filepath
    byte_img = tf.io.read_file(file_path)
    img = tf.io.decode_jpeg(byte_img)
    img = tf.image.resize(img, (100,100))
    img = img/255.0   #rescaling for better learning
    return img

### Creating a labelled dataset

In [None]:
# (anchor, positive) = 1,1,1,1,1
# (anchor, negative) = 0,0,0,0,0
positives = tf.data.Dataset.zip((anchor, positive, tf.data.Dataset.from_tensor_slices(tf.ones(len(anchor)))))
negatives = tf.data.Dataset.zip((anchor.repeat(), negative, tf.data.Dataset.from_tensor_slices(tf.zeros(len(negative)))))
data = positives.concatenate(negatives)
# tf.data.experimental.cardinality(negatives).numpy()
additional_positives=tf.data.Dataset.zip((positive_1, positive_2, tf.data.Dataset.from_tensor_slices(tf.ones(len(positive_1)))))
data=data.concatenate(additional_positives)
p_mine = tf.data.Dataset.zip((positive_mine.repeat(), positive_mine2, tf.data.Dataset.from_tensor_slices(tf.ones(484))))
p_mine= p_mine.shuffle(100000)
p_mine=p_mine.take(100)
n_friends=tf.data.Dataset.zip((positive_mine2,friends.repeat(),tf.data.Dataset.from_tensor_slices(tf.zeros(100))))
n_friends= n_friends.shuffle(1000)
n_friends=n_friends.take(100)
m_friends=tf.data.Dataset.zip((positive_mine.repeat(),friends.repeat(),tf.data.Dataset.from_tensor_slices(tf.zeros(100))))
m_friends= m_friends.shuffle(1000)
m_friends=m_friends.take(100)
# # Concatenate the converted p_mine with data
data = data.concatenate(p_mine)
data=data.concatenate(n_friends)
data=data.concatenate(m_friends)




# the below two codes are just for viewing....if taking lot of time to load the images.....then stop the below two cells

In [None]:
# Assuming anchor and positive are datasets with file paths to images
# data is your concatenated dataset
# Iterate through the positive pairs in the dataset
for anchor_path, positive_path, label in data:
    # Check if the pair is a positive pair (label=1)
    if label.numpy() == 1:
        # Load images from file paths
        anchor_img = Image.open(anchor_path.numpy().decode())
        positive_img = Image.open(positive_path.numpy().decode())

        # Display the images
        plt.figure()
        plt.subplot(1, 2, 1)
        plt.imshow(anchor_img)
        plt.title('Anchor Image')

        plt.subplot(1, 2, 2)
        plt.imshow(positive_img)
        plt.title('Positive Image')
        plt.show()


In [None]:
# Assuming anchor and positive are datasets with file paths to images
# data is your concatenated dataset
# Iterate through the positive pairs in the dataset
for anchor_path, negative_path, label in data:
    # Check if the pair is a positive pair (label=1)
    if label.numpy() == 0:
        # Load images from file paths
        anchor_img = Image.open(anchor_path.numpy().decode())
        negative_img = Image.open(negative_path.numpy().decode())

        # Display the images
        plt.figure()
        plt.subplot(1, 2, 1)
        plt.imshow(anchor_img)
        plt.title('Anchor Image')

        plt.subplot(1, 2, 2)
        plt.imshow(negative_img)
        plt.title('negative Image')

        plt.show()


In [None]:
len(list(data))

In [None]:
#visualizing what is happening
sample = data.as_numpy_iterator()
sample.next()


In [None]:
def preprocess_twin(input_img, validation_img, label):
    return (preprocess(input_img), preprocess(validation_img), label)

res = preprocess_twin(*sample.next())
plt.subplot(1,2,1)
plt.imshow(res[0])
plt.axis(False)
plt.subplot(1,2,2)
plt.imshow(res[1])
plt.axis(False)

#this is an example of anchor and (positive or negative) image

In [None]:
data

In [None]:
#building the data loader pipeline
data = data.map(preprocess_twin)
data = data.cache()
data = data.shuffle(buffer_size = 100000)

In [None]:
samples = data.as_numpy_iterator()
res = samples.next()
plt.subplot(1,2,1)
plt.imshow(res[0])
plt.axis(False)
plt.subplot(1,2,2)
plt.imshow(res[1])
plt.axis(False)

#this is an example of anchor and (negative or positive) image

In [None]:
#splitting the entire data into training and testing data
train_data = data.take(round(len(data)*0.7))
# train_data = train_data.batch(16).prefetch(tf.data.AUTOTUNE)
test_data = data.skip(round(len(data)*0.7))
test_data = test_data.take(round(len(data)*0.3))
# test_data = test_data.batch(16).prefetch(tf.data.AUTOTUNE)
print(tf.data.experimental.cardinality(test_data).numpy())

# was facing difficulty in zip dataframe...so converted it to pandas dataframe

In [None]:
import pandas as pd
import numpy as np

# Assuming data is your dataset
data_list = list(data.as_numpy_iterator())  # Convert the dataset to a list

# Extracting anchor images, positive/negative images, and labels
anchor_images = [item[0].flatten() for item in data_list]
positive_or_negative_images = [item[1].flatten() for item in data_list]
labels = [item[2] for item in data_list]

# Create a pandas DataFrame
df = pd.DataFrame({'anchor_images': anchor_images, 
                   'positive_or_negative_images': positive_or_negative_images, 
                   'labels': labels})

# Display the DataFrame
print(df)


In [None]:
from sklearn.model_selection import train_test_split
data_list = list(data.as_numpy_iterator())  
anchor_images = [item[0] for item in data_list]
positive_or_negative_images = [item[1] for item in data_list]
labels = [item[2] for item in data_list]
pairs = []
for i in range(len(anchor_images)):
    pairs.append((anchor_images[i], positive_or_negative_images[i], labels[i]))
train_pairs, test_pairs = train_test_split(pairs, test_size=0.3, random_state=42)
train_anchor_images, train_positive_or_negative_images, train_labels = zip(*train_pairs)
test_anchor_images, test_positive_or_negative_images, test_labels = zip(*test_pairs)
train_df = pd.DataFrame({'anchor_images': train_anchor_images, 
                         'positive_or_negative_images': train_positive_or_negative_images, 
                         'labels': train_labels})
test_df = pd.DataFrame({'anchor_images': test_anchor_images, 
                        'positive_or_negative_images': test_positive_or_negative_images, 
                        'labels': test_labels})
print("Training Data:")
print(train_df.head())

print("\nTesting Data:")
print(test_df.head())


In [None]:
data

# **Building the Model**

In [None]:
def build_base_model(input_shape):
    model = Sequential([
        Conv2D(64, (9, 9), activation='relu', input_shape=input_shape),
        BatchNormalization(),
        MaxPool2D((2, 2)),
        Conv2D(128, (7, 7), activation='relu'),
        MaxPool2D((2, 2)),
        Conv2D(128, (3, 3), activation='relu'),
        MaxPool2D((2, 2)),
        Conv2D(256, (4, 4), activation='relu'),
        Flatten(),
        Dropout(0.5),
#         layers.Dense(4096, activation='relu'),
        Dense(4096, activation='relu'),
#         layers.Dense(units=1, activation='sigmoid')
    ])
    return model

# Specify input shape based on your image size and channels
input_shape = (100, 100, 3)

# Build the base model
base_model = build_base_model(input_shape)

# Display the model summary
base_model.summary()


### **Building the distance layer**

In [None]:
def build_distance_layer(output_anchor, output_positive):
    distance = tf.reduce_sum(tf.abs(output_anchor - output_positive), axis=1, keepdims=True)
    return distance

# Define placeholder tensors for anchor and positive outputs
output_anchor = Input(shape=(4096,))
output_positive = Input(shape=(4096,))

# Build the distance layer
distance = build_distance_layer(output_anchor, output_positive)

# Display the distance layer summary
distance_model = Model(inputs=[output_anchor, output_positive], outputs=distance)
distance_model.summary()


### **Making the Siamese Model**

In [None]:
def build_siamese_model(input_shape):
    # Assuming build_base_model and build_distance_layer functions are defined
    base_model = build_base_model(input_shape)

    input_anchor = Input(shape=input_shape)
    input_positive = Input(shape=input_shape)

    output_anchor = base_model(input_anchor)
    output_positive = base_model(input_positive)

    distance = build_distance_layer(output_anchor, output_positive)

#     Add a dense layer for binary classification
    logistic_layer = Dense(1, activation='sigmoid')(distance)

    siamese_model = Model(inputs=[input_anchor, input_positive], outputs=logistic_layer)
    return siamese_model

# Specify input shape based on your image size and channels
input_shape = (100, 100, 3)

# Build the siamese model
siamese_model = build_siamese_model(input_shape)

# Display the siamese model summary
siamese_model.summary()


# **Setting up Training Parameters**

In [None]:


# # Specify input shape based on your image size and channels
# input_shape = (100, 100, 3)

# # Build the siamese model
# siamese_model = build_siamese_model(input_shape)

# Compile the model with an optimizer and a loss function
siamese_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Convert DataFrame columns to NumPy arrays
train_anchor_images = np.array(train_df['anchor_images'].tolist())
train_positive_or_negative_images = np.array(train_df['positive_or_negative_images'].tolist())
train_labels = np.array(train_df['labels'].tolist())

test_anchor_images = np.array(test_df['anchor_images'].tolist())
test_positive_or_negative_images = np.array(test_df['positive_or_negative_images'].tolist())
test_labels = np.array(test_df['labels'].tolist())

# Now you can train the model
siamese_model.fit(
    [train_anchor_images, train_positive_or_negative_images],
    train_labels,
    epochs=10,
    batch_size=16,
    validation_data=([test_anchor_images, test_positive_or_negative_images], test_labels)
)


# **Real Time test**

In [None]:
r = tf.keras.metrics.Recall()
p = tf.keras.metrics.Precision()
a=0
b=1
c=0
for test_input, test_val, y_true in test_data.as_numpy_iterator():
    # Ensure that the input data has the correct shape
    test_input = np.expand_dims(test_input, axis=0)
    test_val = np.expand_dims(test_val, axis=0)
    
    # Assuming that the images are RGB, add an extra dimension for the color channel
    test_input = np.expand_dims(test_input, axis=-1)
    test_val = np.expand_dims(test_val, axis=-1)

    # Ensure y_true is a numpy array
    y_true = np.array(y_true)

    # Make predictions
    yhat = siamese_model.predict([test_input, test_val])
    print(yhat)
    # Remove the extra dimensions from yhat
    yhat = np.squeeze(yhat, axis=0)
    
    # Handle the case when y_true has an empty shape
    if y_true.shape == ():
        y_true = np.expand_dims(y_true, axis=0)
    print(y_true)
    # Ensure y_true and yhat have the same shape
    y_true = np.reshape(y_true, yhat.shape)
    if(y_true==1):
        a=max(yhat,a)
    if(y_true==0):
        b=min(yhat,b)
    # Update metrics
    r.update_state(y_true, yhat)
    p.update_state(y_true, yhat) 

print("Recall:", r.result().numpy())
print("Precision:", p.result().numpy())
print("a:",a)


In [None]:
verification_images_dir = 'application_data/verification_images'
input_image_dir = 'application_data/input_image'

# Create directories if they don't exist
os.makedirs(verification_images_dir, exist_ok=True)
os.makedirs(input_image_dir, exist_ok=True)

# Verify if directories are created successfully
print(f"Verification images directory: {verification_images_dir}")
print(f"Input image directory: {input_image_dir}")


In [None]:
os.listdir(os.path.join('application_data', 'verification_images'))
os.path.join('application_data', 'input_image', 'input_image.jpg')
for image in os.listdir(os.path.join('application_data', 'verification_images')):
    validation_img = os.path.join('application_data', 'verification_images', image)
    print(validation_img)
def verify(model, detection_threshold, verification_threshold):
    # Build results array
    results = []
    for image in os.listdir(os.path.join('application_data', 'verification_images')):
        input_img = preprocess(os.path.join('application_data', 'input_image', 'input_image.jpg'))
        validation_img = preprocess(os.path.join('application_data', 'verification_images', image))
        
        # Make Predictions 
        result = model.predict(list(np.expand_dims([input_img, validation_img], axis=1)))
        results.append(result)
    
    # Detection Threshold: Metric above which a prediciton is considered positive 
    detection = np.sum(np.array(results) > detection_threshold)
    
    # Verification Threshold: Proportion of positive predictions / total positive samples 
    verification = detection / len(os.listdir(os.path.join('application_data', 'verification_images'))) 
    verified = verification > verification_threshold
    
    return results, verified

## taking an input image

In [None]:
cap = cv2.VideoCapture(0)

# Check if the camera is opened successfully
if not cap.isOpened():
    print("Error: Could not open camera.")
else:
    while True:
        # Read a frame from the camera
        ret, frame = cap.read()
        height, width, _ = frame.shape

        # Crop the same pixels but orient it towards the center-top
        frame = frame[0:500, (width-500)//2:(width+500)//2]

        # Display the captured image
        cv2.imshow("Captured Image", frame)

        # Save the captured frame as an image when 'q' is pressed
        if cv2.waitKey(1) & 0xFF == ord('q'):
            imgname = os.path.join('application_data', 'input_image', 'captured_image.jpg')
            cv2.imwrite(imgname, frame)
            print("Image captured and saved successfully.")
            break

    # Release the webcam and destroy the image show frame
    cap.release()
    cv2.destroyAllWindows()


In [None]:

import cv2
import os

# Path to the input image
input_image_path = os.path.join('application_data', 'input_image', 'captured_image.jpg')

# Read the original image
original_image = cv2.imread(input_image_path)

# Resize the image to 100x100
resized_image = cv2.resize(original_image, (100, 100))

# Overwrite the original image with the resized version
cv2.imwrite(input_image_path, resized_image)

print(f"Resized image overwritten at {input_image_path}")


## copy/pasting an image from the anchor files to verification file

In [None]:
anchor_dir = 'data/anchor'
verification_dir = 'application_data/verification_images'

# List all files in the 'data/anchor' directory
all_images = os.listdir(anchor_dir)

# Randomly choose one image
random_image = random.choice(all_images)

# Construct the full path for the source image
source_path = os.path.join(anchor_dir, random_image)

# Construct the full path for the destination image in 'application_data/verification_images'
destination_path = os.path.join(verification_dir, random_image)

# Copy the image from source to destination
shutil.copyfile(source_path, destination_path)

# Read the image
image = cv2.imread(destination_path)

# Resize the image to 100x100
resized_image1 = cv2.resize(image, (100, 100))

# Save the resized image
cv2.imwrite(destination_path, resized_image1)

print(f"Image '{random_image}' copied and resized to '{verification_dir}'.")


In [None]:
# Load the captured image
captured_image = cv2.imread(os.path.join('application_data', 'input_image', 'captured_image.jpg'))

# Resize the image
resized_image = cv2.resize(captured_image, (100, 100))




In [None]:
# test1=cv2.imread(os.path.join('test1', 'captured_image_4.png'))
# # Resize the image
# resized_imoge1 = cv2.resize(test1, (100, 100))
# test2=cv2.imread(os.path.join('test2', 'captured_image_6.png'))
# # Resize the image
# resized_imoge2 = cv2.resize(test2, (100, 100))
# siamese_model.predict(
#     [np.expand_dims(resized_imoge1, axis=0), np.expand_dims(resized_imoge2, axis=0)])


In [None]:
def verify(model, detection_threshold, verification_threshold):
    # Build results array
    results = []
    input_img_path = os.path.join('application_data', 'input_image', 'captured_image.jpg')

    if os.path.isfile(input_img_path):
        # Read the input image
        input_img = preprocess(input_img_path)

        # List only image files in 'application_data/verification_images'
        verification_images = [
            os.path.join('application_data', 'verification_images', image)
            for image in os.listdir(os.path.join('application_data', 'verification_images'))
            if image.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp'))
        ]

       

        for validation_img_path in verification_images:
            validation_img = preprocess(validation_img_path)

            # Make Predictions
            result = model.predict([np.expand_dims(input_img, axis=0), np.expand_dims(validation_img, axis=0)])
            results.append(result[0][0])  # Assuming model output shape is (batch_size, 1)

        # Debugging: Print results
        print("Verification Results:", results)

        # Detection Threshold: Metric above which a prediction is considered positive
        detection = np.sum(np.array(results) > detection_threshold)
        print(np.sum(np.array(results)))
        # Verification Threshold: Proportion of positive predictions / total number of negative images
        verification = np.sum(np.array(results) > detection_threshold) / 4031
        verified = np.sum(np.array(results)) > verification_threshold

        return results, verified

    else:
        print(f"Error: Input image {input_img_path} not found.")
        return results, False

# Run verification again with adjusted thresholds
verification_results, verified = verify(siamese_model, 0.6, a)#<= you have to update this 0.4 according to the 
# below

if verified:
    print("Verified")
else:
    print("Not Verified")
