# sift cnn descriptors 3, load and save image preprocessing and descriptors code

In [4]:
import os
import numpy as np
import scipy.io
import cv2
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, GlobalAveragePooling2D, Input, concatenate, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import requests
from io import BytesIO
from PIL import Image
import time

# Function to print timing information
def print_timing_info(phase, start_time, end_time):
    """Print the start, end and elapsed time of a named phase.

    Args:
        phase: Label placed at the front of the printed line.
        start_time: Start timestamp (a number; formatted with two decimals).
        end_time: End timestamp (a number; formatted with two decimals).

    The elapsed time is ``end_time - start_time`` and is reported in seconds.
    """
    elapsed = end_time - start_time
    report = (
        f"{phase} - Start Time: {start_time:.2f}, "
        f"End Time: {end_time:.2f}, "
        f"Duration: {elapsed:.2f} seconds"
    )
    print(report)

# Load File Names and Labels
print("Loading File Names and Labels")
start_time = time.time()

# Load file names from GitHub
github_url = "https://raw.githubusercontent.com/RyanS974/RyanS974/main/datasets/hep2cell/cells2.txt"
# The timeout keeps the run from hanging on a stalled connection, and
# raise_for_status() stops an HTTP error page from being parsed as file names.
response = requests.get(github_url, timeout=30)
response.raise_for_status()
file_names = response.text.splitlines()

# Load labels from labels.mat using SciPy
labels_url = "https://raw.githubusercontent.com/RyanS974/RyanS974/main/datasets/hep2cell/labels.mat"
labels_path = tf.keras.utils.get_file("labels.mat", labels_url)
labels = scipy.io.loadmat(labels_path)['labels'].flatten()

# Display the number of file names and labels loaded
print(f"Number of file names loaded: {len(file_names)}")
print(f"Number of labels loaded: {len(labels)}")

# Ensure that the number of file names matches the number of labels
assert len(file_names) == len(labels), "Mismatch between number of file names and labels"

# Keep only the files whose numeric base name is at most 2,000.
# The same mask is applied to the labels so that every selected file keeps the
# label stored at its own position; slicing labels[:2000] independently would
# pair files with the wrong labels whenever a file outside the range appears
# among the first entries of the list.
keep_mask = np.array(
    [int(os.path.basename(file).split('.')[0]) <= 2000 for file in file_names],
    dtype=bool,
)
filtered_file_names = [file for file, keep in zip(file_names, keep_mask) if keep]

# Select the first 2,000 images (and their labels) from the filtered dataset
selected_file_names = filtered_file_names[:2000]
selected_labels = labels[keep_mask][:2000]

# Display the number of selected file names and labels
print(f"Number of selected file names: {len(selected_file_names)}")
print(f"Number of selected labels: {len(selected_labels)}")

# Ensure that the number of selected file names matches the number of selected labels
assert len(selected_file_names) == len(selected_labels), "Mismatch between number of selected file names and labels"

end_time = time.time()
print_timing_info("Loading Data", start_time, end_time)

# Split Data into Training, Validation, and Testing Sets
print("\nSplitting Data into Training, Validation, and Testing Sets")
start_time = time.time()

# Stage 1: hold out 60% of the selected samples; the remaining 40% is the
# training set.
first_split = train_test_split(
    selected_file_names,
    selected_labels,
    test_size=0.6,
    random_state=42,
)
train_files, temp_files, train_labels, temp_labels = first_split

# Stage 2: cut the held-out portion in half to obtain the validation and
# testing sets (30% of the selected samples each).
second_split = train_test_split(
    temp_files,
    temp_labels,
    test_size=0.5,
    random_state=42,
)
val_files, test_files, val_labels, test_labels = second_split

# Display the number of files in each set
print(f"Number of training files: {len(train_files)}")
print(f"Number of validation files: {len(val_files)}")
print(f"Number of testing files: {len(test_files)}")

# Every selected file must land in exactly one of the three sets
assert len(train_files) + len(val_files) + len(test_files) == len(selected_file_names), "Mismatch in total number of files after splitting"

end_time = time.time()
print_timing_info("Splitting Data", start_time, end_time)

# Define the target image size (passed to PIL's resize by the preprocessing step)
target_size = (96, 96)

# Function to download and preprocess images: resize and convert to 8-bit grayscale
def preprocess_image_for_sift(image_url, target_size, retries=3, timeout=30):
    """Download an image and return it as a resized 8-bit grayscale array.

    Args:
        image_url: URL of the image to download.
        target_size: Size tuple handed to PIL's ``Image.resize``.
        retries: Maximum number of download attempts.
        timeout: Seconds to wait for the server on each attempt, so a stalled
            connection cannot block the whole run.

    Returns:
        A ``numpy.uint8`` array holding the grayscale image, or ``None`` when
        ``retries`` is zero or negative (no attempt is made).

    Raises:
        requests.exceptions.RequestException: If the last attempt fails.
    """
    for attempt in range(retries):
        try:
            # Only the network call is retried; decoding errors propagate as-is.
            response = requests.get(image_url, timeout=timeout)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == retries - 1:
                raise
            continue

        image = Image.open(BytesIO(response.content))
        # Convert the image to RGB
        image = image.convert('RGB')
        # Resize the image to the target size
        image_resized = image.resize(target_size)
        # Convert the image to grayscale
        image_gray = image_resized.convert('L')
        # Return the 8-bit pixels directly. Dividing by 255, multiplying back
        # and truncating to uint8 is a float round trip that adds nothing and
        # risks shifting a pixel value down by one.
        return np.array(image_gray, dtype=np.uint8)
    return None

# Preprocess images with progress reporting
def preprocess_images(image_paths, target_size):
    """Preprocess every image in ``image_paths`` and stack the results.

    Each entry is passed to ``preprocess_image_for_sift`` together with
    ``target_size``. Entries for which that call returns ``None`` are left
    out of the result. A progress line is printed after every 500 entries.

    Returns:
        ``np.array`` built from the list of preprocessed images.
    """
    total_images = len(image_paths)
    processed = []
    for count, location in enumerate(image_paths, start=1):
        result = preprocess_image_for_sift(location, target_size)
        if result is not None:
            processed.append(result)
        if count % 500 == 0:
            percent_done = (count / total_images) * 100
            print(f"Processed {count} of {total_images} images ({percent_done:.2f}%)")
    return np.array(processed)

# Convert one image to the 8-bit form handed to SIFT
def _to_uint8_image(img):
    """Return ``img`` as a ``uint8`` array without corrupting 8-bit input.

    * ``uint8`` input is returned unchanged. Multiplying it by 255 would wrap
      around modulo 256 and scramble the intensities.
    * Floating-point input is scaled by 255, as before; it is assumed to lie
      in [0, 1] and is clipped so out-of-range values cannot wrap.
    * Any other dtype is clipped to [0, 255] and cast.
    """
    img = np.asarray(img)
    if img.dtype == np.uint8:
        return img
    if np.issubdtype(img.dtype, np.floating):
        return np.clip(img * 255, 0, 255).astype(np.uint8)
    return np.clip(img, 0, 255).astype(np.uint8)


# Extract SIFT descriptors with progress reporting
def extract_sift_descriptors(images):
    """Compute SIFT descriptors for each image in ``images``.

    Args:
        images: Iterable of images (with a ``len``); each one is converted to
            ``uint8`` by ``_to_uint8_image`` before detection.

    Returns:
        A list with one descriptor array per image, in input order. When SIFT
        reports no descriptors for an image, a single all-zero row of width
        128 is stored instead, so the list always lines up with ``images``.

    A progress line is printed after every 500 images.
    """
    sift = cv2.SIFT_create()
    all_descriptors = []
    total_images = len(images)
    for i, img in enumerate(images):
        img_uint8 = _to_uint8_image(img)
        # Only the descriptors are kept; the keypoints are discarded.
        _, descriptors = sift.detectAndCompute(img_uint8, None)
        if descriptors is None:
            # Placeholder keeps a one-to-one mapping between images and entries
            descriptors = np.zeros((1, 128))
        all_descriptors.append(descriptors)
        if (i + 1) % 500 == 0:
            percent_done = ((i + 1) / total_images) * 100
            print(f"Extracted SIFT descriptors for {i + 1} of {total_images} images ({percent_done:.2f}%)")
    return all_descriptors

# Check if preprocessed images and descriptors exist
# The cache is used only when all six files are present in the working directory.
cache_file_names = (
    'train_images.npy',
    'val_images.npy',
    'test_images.npy',
    'train_descriptors.npy',
    'val_descriptors.npy',
    'test_descriptors.npy',
)
preprocessed_data_exists = all(os.path.exists(cache_name) for cache_name in cache_file_names)

if not preprocessed_data_exists:
    # Preprocess training, validation and testing images
    print("\nPreprocessing Images")
    start_time = time.time()

    def _dataset_urls(files):
        """Build the download URL of every file name in ``files``."""
        return [f"https://raw.githubusercontent.com/RyanS974/RyanS974/main/datasets/hep2cell/{file}" for file in files]

    train_images = preprocess_images(_dataset_urls(train_files), target_size)
    val_images = preprocess_images(_dataset_urls(val_files), target_size)
    test_images = preprocess_images(_dataset_urls(test_files), target_size)

    end_time = time.time()
    print_timing_info("Preprocessing Images", start_time, end_time)

    # Save preprocessed images so later runs can skip the downloads
    np.save('train_images.npy', train_images)
    np.save('val_images.npy', val_images)
    np.save('test_images.npy', test_images)
else:
    # Load preprocessed images and descriptors
    print("\nLoading Preprocessed Data")
    start_time = time.time()

    train_images = np.load('train_images.npy')
    val_images = np.load('val_images.npy')
    test_images = np.load('test_images.npy')
    # allow_pickle=True is required to read object arrays; only use it on
    # cache files produced by this script.
    train_descriptors = np.load('train_descriptors.npy', allow_pickle=True)
    val_descriptors = np.load('val_descriptors.npy', allow_pickle=True)
    test_descriptors = np.load('test_descriptors.npy', allow_pickle=True)

    end_time = time.time()
    print_timing_info("Loading Preprocessed Data", start_time, end_time)
# Make sure the image sets are numpy arrays (no copy when they already are)
train_images = np.asarray(train_images)
val_images = np.asarray(val_images)
test_images = np.asarray(test_images)

# Display the shapes of the preprocessed image arrays
print(f"Shape of training images: {train_images.shape}")
print(f"Shape of validation images: {val_images.shape}")
print(f"Shape of testing images: {test_images.shape}")

# Ensure that the number of images matches the number of labels in each set
assert train_images.shape[0] == len(train_labels), "Mismatch between number of training images and labels"
assert val_images.shape[0] == len(val_labels), "Mismatch between number of validation images and labels"
assert test_images.shape[0] == len(test_labels), "Mismatch between number of testing images and labels"

# Visualize up to 5 images after loading.
# Slicing (rather than indexing 0..4) avoids an IndexError when the training
# set holds fewer than five images.
for i, preview in enumerate(train_images[:5]):
    plt.imshow(preview, cmap='gray')
    plt.title(f"Training Image {i+1}")
    plt.axis('off')
    plt.show()

# Extract SIFT Descriptors
if not preprocessed_data_exists:
    print("\nExtracting SIFT Descriptors")
    start_time = time.time()

    train_descriptors = extract_sift_descriptors(train_images)
    val_descriptors = extract_sift_descriptors(val_images)
    test_descriptors = extract_sift_descriptors(test_images)

    end_time = time.time()
    print_timing_info("Extracting SIFT Descriptors", start_time, end_time)

    def _as_object_array(descriptor_list):
        """Pack per-image descriptor arrays into a 1-D object array.

        The per-image arrays generally have different numbers of rows. Letting
        np.save convert such a ragged list implicitly raises ValueError on
        recent NumPy releases, so the object array is built explicitly, one
        element at a time.
        """
        packed = np.empty(len(descriptor_list), dtype=object)
        for position, descriptor_set in enumerate(descriptor_list):
            packed[position] = descriptor_set
        return packed

    # Save extracted descriptors (read back with np.load(..., allow_pickle=True))
    np.save('train_descriptors.npy', _as_object_array(train_descriptors))
    np.save('val_descriptors.npy', _as_object_array(val_descriptors))
    np.save('test_descriptors.npy', _as_object_array(test_descriptors))

# Ensure that descriptor sets are present before computing any statistics.
# These checks come first: with an empty set, the percentage and mean below
# would fail (division by zero / mean of nothing) before the assertion could
# report the real problem.
assert len(train_descriptors) > 0, "Failed to extract descriptors for all training images"
assert len(val_descriptors) > 0, "Failed to extract descriptors for all validation images"
assert len(test_descriptors) > 0, "Failed to extract descriptors for all testing images"

# Filter images and labels to match the descriptors
train_images = train_images[:len(train_descriptors)]
train_labels = train_labels[:len(train_descriptors)]
val_images = val_images[:len(val_descriptors)]
val_labels = val_labels[:len(val_descriptors)]
test_images = test_images[:len(test_descriptors)]
test_labels = test_labels[:len(test_descriptors)]


def _count_images_with_descriptors(descriptor_sets):
    """Count the entries that hold at least one non-zero descriptor value.

    extract_sift_descriptors stores an all-zero row when SIFT finds nothing,
    so the length of the list says nothing about how many images produced
    descriptors. This assumes a genuine descriptor set is never entirely zero.
    """
    return sum(1 for desc in descriptor_sets if desc is not None and np.any(desc))


# Count the images that produced real (non-placeholder) descriptors
train_with_descriptors = _count_images_with_descriptors(train_descriptors)
val_with_descriptors = _count_images_with_descriptors(val_descriptors)
test_with_descriptors = _count_images_with_descriptors(test_descriptors)

# Calculate percentages of images with descriptors
train_percentage = (train_with_descriptors / len(train_images)) * 100
val_percentage = (val_with_descriptors / len(val_images)) * 100
test_percentage = (test_with_descriptors / len(test_images)) * 100

# Calculate average number of descriptor rows per image
# (a zero placeholder still counts as one row here)
train_avg_descriptors = np.mean([len(desc) for desc in train_descriptors])
val_avg_descriptors = np.mean([len(desc) for desc in val_descriptors])
test_avg_descriptors = np.mean([len(desc) for desc in test_descriptors])

# Display the statistics
print(f"Number of descriptors for the first training image: {len(train_descriptors[0]) if train_descriptors[0] is not None else 0}")
print(f"Number of training images with descriptors: {train_with_descriptors} ({train_percentage:.2f}%)")
print(f"Number of validation images with descriptors: {val_with_descriptors} ({val_percentage:.2f}%)")
print(f"Number of testing images with descriptors: {test_with_descriptors} ({test_percentage:.2f}%)")
print(f"Average number of descriptors per training image: {train_avg_descriptors:.2f}")
print(f"Average number of descriptors per validation image: {val_avg_descriptors:.2f}")
print(f"Average number of descriptors per testing image: {test_avg_descriptors:.2f}")

# Function to pad descriptors to a fixed size
def pad_descriptors(descriptors, max_descriptors):
    """Return a ``(max_descriptors, 128)`` array holding ``descriptors``.

    Rows beyond the available descriptors are left at zero; descriptors
    beyond ``max_descriptors`` are dropped. ``None`` or an empty input yields
    an all-zero array.

    Args:
        descriptors: 2-D array with 128 columns, or ``None``.
        max_descriptors: Number of rows in the returned array.
    """
    padded = np.zeros((max_descriptors, 128))
    if descriptors is None or len(descriptors) == 0:
        return padded
    rows_to_copy = min(len(descriptors), max_descriptors)
    padded[:rows_to_copy, :] = descriptors[:rows_to_copy, :]
    return padded

# Define the maximum number of descriptors
# NOTE: every padded entry is a (max_descriptors, 128) array of zeros-initialised
# floats, so memory use grows linearly with this value.
max_descriptors = 4000  # Adjust based on your dataset

# Pad descriptors for training, validation, and testing sets.
# pad_descriptors already returns an all-zero array for a None entry, so no
# separate None branch is needed here.
train_padded, val_padded, test_padded = (
    np.array([pad_descriptors(desc, max_descriptors) for desc in descriptor_sets])
    for descriptor_sets in (train_descriptors, val_descriptors, test_descriptors)
)

# Ensure that the number of padded descriptors matches the number of images
assert train_padded.shape[0] == train_images.shape[0], "Mismatch between number of training images and padded descriptors"
assert val_padded.shape[0] == val_images.shape[0], "Mismatch between number of validation images and padded descriptors"
assert test_padded.shape[0] == test_images.shape[0], "Mismatch between number of testing images and padded descriptors"

# Reshape images to match the input shape of the model (adds a channel axis)
train_images, val_images, test_images = (
    image_set.reshape((image_set.shape[0], 96, 96, 1))
    for image_set in (train_images, val_images, test_images)
)

# Reshape padded descriptors to match the input shape of the model
train_padded, val_padded, test_padded = (
    padded_set.reshape((padded_set.shape[0], max_descriptors, 128, 1))
    for padded_set in (train_padded, val_padded, test_padded)
)

# Define the CNN model for image data
image_input = Input(shape=(96, 96, 1), name='image_input')
x = Conv2D(32, (3, 3), activation='relu')(image_input)
x = MaxPooling2D((2, 2))(x)
x = Conv2D(64, (3, 3), activation='relu')(x)
x = MaxPooling2D((2, 2))(x)
x = Flatten()(x)
x = Dense(128, activation='relu')(x)
image_output = Dense(128, activation='relu')(x)

# Define the model for SIFT descriptors
desc_input = Input(shape=(max_descriptors, 128, 1), name='desc_input')
y = Conv2D(32, (3, 3), activation='relu')(desc_input)
y = MaxPooling2D((2, 2))(y)
y = Conv2D(64, (3, 3), activation='relu')(y)
y = MaxPooling2D((2, 2))(y)
y = Flatten()(y)
y = Dense(128, activation='relu')(y)
desc_output = Dense(128, activation='relu')(y)

# Concatenate the outputs of both branches
combined = concatenate([image_output, desc_output])

# Add final classification layers
z = Dense(128, activation='relu')(combined)
z = Dropout(0.5)(z)  # Add dropout for regularization

# Choose the output head from the label values actually present.
# A single sigmoid unit with binary cross-entropy is only meaningful when the
# labels are 0/1. The contents of labels.mat are not visible from this script,
# so any other label set gets a softmax head with one unit per distinct label,
# trained on 0-based class indices.
class_values = np.unique(np.concatenate([
    np.ravel(train_labels), np.ravel(val_labels), np.ravel(test_labels)
]))
labels_are_binary = bool(np.isin(class_values, (0, 1)).all())

if labels_are_binary:
    z = Dense(1, activation='sigmoid')(z)
    loss_name = 'binary_crossentropy'
    train_targets = train_labels
    val_targets = val_labels
else:
    z = Dense(len(class_values), activation='softmax')(z)
    loss_name = 'sparse_categorical_crossentropy'
    # class_values is sorted, so searchsorted maps each label to its class index
    train_targets = np.searchsorted(class_values, train_labels)
    val_targets = np.searchsorted(class_values, val_labels)

# Define the final model
model = Model(inputs=[image_input, desc_input], outputs=z)

# Compile the model
model.compile(optimizer='adam', loss=loss_name, metrics=['accuracy'])

# Display the model summary
model.summary()

# Train the Model
print("\nTraining the Model")
start_time = time.time()

# Train the CNN model using the training data
history = model.fit(
    [train_images, train_padded], train_targets,
    epochs=10,
    batch_size=32,
    validation_data=([val_images, val_padded], val_targets)
)

end_time = time.time()
print_timing_info("Training Model", start_time, end_time)

# Display the training history
plt.figure(figsize=(12, 4))

# One panel per metric: (training key, validation key, title, y-axis label).
# Accuracy goes in the left panel, loss in the right one.
history_panels = (
    ('accuracy', 'val_accuracy', 'Model accuracy', 'Accuracy'),
    ('loss', 'val_loss', 'Model loss', 'Loss'),
)
for panel_index, (train_key, val_key, panel_title, y_label) in enumerate(history_panels, start=1):
    plt.subplot(1, 2, panel_index)
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])
    plt.title(panel_title)
    plt.ylabel(y_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')

plt.show()

# Visualize SIFT Descriptors

# Function to visualize SIFT descriptors on images
def visualize_sift(image, keypoints, max_descriptors=5):
    """Show ``image`` with the first ``max_descriptors`` keypoints drawn on it.

    Args:
        image: Image passed to ``cv2.drawKeypoints``.
        keypoints: Sequence of keypoints; only the leading
            ``max_descriptors`` entries are drawn.
        max_descriptors: Upper bound on the number of keypoints drawn.

    The overlay is drawn with OpenCV's ``DRAW_RICH_KEYPOINTS`` flag and
    displayed with matplotlib, axes hidden.
    """
    shown_keypoints = keypoints[:max_descriptors]
    draw_flags = cv2.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS
    overlay = cv2.drawKeypoints(image, shown_keypoints, None, flags=draw_flags)
    plt.imshow(overlay, cmap='gray')
    plt.axis('off')
    plt.show()

# Visualize SIFT descriptors for up to 3 images.
# extract_sift_descriptors works on a batch and returns only descriptor arrays,
# so it cannot supply keypoints for a single image. The keypoints are detected
# here directly instead.
keypoint_detector = cv2.SIFT_create()
for sample in train_images[:3]:
    # Drop any singleton channel axis so the detector sees a 2-D image
    gray_sample = np.squeeze(sample)
    if gray_sample.dtype != np.uint8:
        # assumes pixel values are already on the 0-255 scale
        gray_sample = gray_sample.astype(np.uint8)
    keypoints = keypoint_detector.detect(gray_sample, None)
    visualize_sift(gray_sample, keypoints)

Loading File Names and Labels
Number of file names loaded: 63445
Number of labels loaded: 63445
Number of selected file names: 2000
Number of selected labels: 2000
Loading Data - Start Time: 1732419429.90, End Time: 1732419430.44, Duration: 0.54 seconds

Splitting Data into Training, Validation, and Testing Sets
Number of training files: 800
Number of validation files: 600
Number of testing files: 600
Splitting Data - Start Time: 1732419430.44, End Time: 1732419430.44, Duration: 0.00 seconds

Preprocessing Images


KeyboardInterrupt: 