In [None]:
import os
import numpy as np
import tensorflow as tf
from sklearn.model_selection import KFold
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Concatenate, Multiply, Dropout, GlobalAveragePooling2D
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping
from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
# Dataset directory
dataset_dir = r"dir"  # Replace with your dataset path

# Function to load image paths and corresponding labels
def load_image_paths_and_labels(dataset_dir):
    image_paths = []  # To store all image file paths
    labels = []       # To store corresponding labels as integers
    class_names = sorted(os.listdir(dataset_dir))  # Sort class names alphabetically for consistent labeling

    for label, class_name in enumerate(class_names):
        class_dir = os.path.join(dataset_dir, class_name)
        if os.path.isdir(class_dir):  # Ensure it's a directory
            for file_name in os.listdir(class_dir):
                if file_name.lower().endswith(('.png', '.jpg', '.jpeg')):  # Check for valid image extensions
                    file_path = os.path.join(class_dir, file_name)
                    image_paths.append(file_path)
                    labels.append(label)  # Assign label based on the class index

    return image_paths, labels, class_names

# Load data
image_paths, labels, class_names = load_image_paths_and_labels(dataset_dir)

# Convert labels to numpy array
labels = np.array(labels)

print(f"Loaded {len(image_paths)} images from {len(class_names)} classes:")
for i, class_name in enumerate(class_names):
    print(f"Class {i}: {class_name}")

In [None]:
# Data augmentation settings
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    fill_mode='nearest'
)

In [None]:
# Custom generator to yield dual inputs
def dual_input_generator(image_paths, labels, batch_size, input_shape):
    while True:
        indices = np.random.choice(len(image_paths), batch_size)
        batch_images = []
        batch_labels = []

        for i in indices:
            img = tf.keras.utils.load_img(image_paths[i], target_size=input_shape[:2])
            img = tf.keras.utils.img_to_array(img) / 255.0  # Normalize to [0, 1]
            batch_images.append(img)
            batch_labels.append(labels[i])

        batch_images = np.array(batch_images)
        batch_labels = tf.keras.utils.to_categorical(batch_labels, num_classes=4)  # Convert labels to one-hot

        # Yield two identical inputs for dual-stream model
        yield (batch_images, batch_images), batch_labels

In [None]:
# Define a CNN stream by creating a custom layer with ResNet50 features
class CNNStream(tf.keras.layers.Layer):
    def __init__(self, input_shape, name_suffix):
        super(CNNStream, self).__init__(name=f"CNNStream_{name_suffix}")
        self.base_model = ResNet50(weights='imagenet', include_top=False, input_shape=input_shape, name=f'resnet_{name_suffix}')
        self.pooling = GlobalAveragePooling2D()
        self.fc1 = Dense(128, activation='relu')
        self.dropout = Dropout(0.4)

    def call(self, inputs):
        x = self.base_model(inputs)
        x = self.pooling(x)
        x = self.fc1(x)
        x = self.dropout(x)
        return x

In [None]:
# Define the soft attention mechanism
def soft_attention(fusion_features):
    attention = Dense(128, activation='sigmoid')(fusion_features)
    attention = Dense(1, activation='sigmoid')(attention)
    return attention

In [None]:
# Build the Dual-Stream Model with Soft Attention
def build_dual_stream_model(input_shape, num_classes):
    # Stream 1 with unique name suffix
    input1 = Input(shape=input_shape)
    stream1_features = CNNStream(input_shape, 'stream1')(input1)
    
    # Stream 2 with a different name suffix
    input2 = Input(shape=input_shape)
    stream2_features = CNNStream(input_shape, 'stream2')(input2)

    # Concatenate the outputs of both streams
    fusion_features = Concatenate()([stream1_features, stream2_features])

    # Apply the soft attention mechanism
    attention = soft_attention(fusion_features)
    
    # Multiply attention weights with the fusion features
    attended_features = Multiply()([fusion_features, attention])

    # Fully connected layer for classification
    x = Dense(256, activation='relu')(attended_features)
    x = Dropout(0.5)(x)
    output = Dense(num_classes, activation='softmax')(x)

    # Define the model
    model = Model(inputs=[input1, input2], outputs=output)
    return model

In [None]:
# Initialize KFold
n_splits = 5
kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)

# Parameters
input_shape = (174, 208, 3)  # Adjust based on your image size
num_classes = 4  # 4 classes
batch_size = 32
epochs = 40

# Store results
fold_accuracies = []
fold_losses = []

# for fold, (train_idx, val_idx) in enumerate(kf.split(image_paths)):
#     print(f"Starting Fold {fold+1}/{n_splits}")
    
#     # Split into training and validation
#     train_images = [image_paths[i] for i in train_idx]
#     train_labels = labels[train_idx]
#     val_images = [image_paths[i] for i in val_idx]
#     val_labels = labels[val_idx]
# for fold, (train_idx, val_idx) in enumerate(kf.split(image_paths)):
#     print(f"\nStarting Fold {fold + 1}/5")
    
#     # Train and validation splits
#     train_images = image_paths[train_idx]
#     train_labels = labels[train_idx]
#     val_images = image_paths[val_idx]
#     val_labels = labels[val_idx]

# Convert image_paths and labels to NumPy arrays
image_paths = np.array(image_paths)  # Convert the list of file paths to a NumPy array
labels = np.array(labels)            # Convert the list of labels to a NumPy array
from sklearn.model_selection import KFold

# Initialize KFold
kf = KFold(n_splits=5, shuffle=True, random_state=42)
for fold, (train_idx, val_idx) in enumerate(kf.split(image_paths)):
    print(f"\nStarting Fold {fold + 1}/5")
    
    # Convert indices to NumPy arrays (just in case)
    train_idx = np.array(train_idx)
    val_idx = np.array(val_idx)

    # Split data using the indices
    train_images = image_paths[train_idx]  # Train image paths
    train_labels = labels[train_idx]       # Train labels
    val_images = image_paths[val_idx]      # Validation image paths
    val_labels = labels[val_idx]           # Validation labels

    print(f"Fold {fold + 1}: Training on {len(train_images)} samples, Validating on {len(val_images)} samples")
print(f"Type of train_idx: {type(train_idx)}, Shape: {train_idx.shape}")
print(f"Type of train_images: {type(train_images)}, Shape: {len(train_images)}")

In [None]:
# Create data generators
train_generator = dual_input_generator(train_images, train_labels, batch_size, input_shape)
val_generator = dual_input_generator(val_images, val_labels, batch_size, input_shape)

In [None]:
# Build and compile the model
model = build_dual_stream_model(input_shape, num_classes)
model.compile(optimizer=Adam(learning_rate=0.0001), loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# Callbacks
lr_scheduler = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1)
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1)

    # Train the model
history = model.fit(
    train_generator,
    steps_per_epoch=len(train_images) // batch_size,
    validation_data=val_generator,
    validation_steps=len(val_images) // batch_size,
    epochs=epochs,
    callbacks=[lr_scheduler, early_stopping]
)

In [None]:
# Evaluate the model
val_loss, val_accuracy = model.evaluate(val_generator, steps=len(val_images) // batch_size)
print(f"Fold {fold+1} - Validation Accuracy: {val_accuracy:.4f}, Validation Loss: {val_loss:.4f}")

# Store results
fold_accuracies.append(val_accuracy)
fold_losses.append(val_loss)

# Final Cross-Validation Results
print("\nFinal Cross-Validation Results:")
print(f"Average Accuracy: {np.mean(fold_accuracies):.4f}, Average Loss: {np.mean(fold_losses):.4f}")