In [None]:
# Script to remove images smaller than 100x100 pixels
import os
from PIL import Image
import shutil
import time

def remove_small_images(folder_path, min_width=100, min_height=100):
    """Delete every image under ``folder_path`` that is below a minimum size.

    The folder is walked recursively. A file is treated as an image when its
    name ends (case-insensitively) with one of the extensions listed in
    ``image_extensions``. An image is deleted when its width is below
    ``min_width`` OR its height is below ``min_height``.

    Args:
        folder_path: Root directory to scan.
        min_width: Minimum acceptable width in pixels.
        min_height: Minimum acceptable height in pixels.

    Returns:
        dict with the keys ``'total_images'`` (image files seen),
        ``'removed_count'`` (files deleted), ``'error_count'`` (files that
        could not be read or deleted) and ``'removed_files'`` (list of
        ``"<name> (<width>x<height>)"`` strings, one per deleted file).
    """
    image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff')
    max_attempts = 3
    retry_delay_seconds = 0.5

    removed_count = 0
    error_count = 0
    total_count = 0
    removed_files = []

    for root, _dirs, files in os.walk(folder_path):
        for file in files:
            if not file.lower().endswith(image_extensions):
                continue

            file_path = os.path.join(root, file)
            total_count += 1

            try:
                # Only the dimensions are needed. Leaving the `with` block
                # closes the file *before* deletion is attempted, so no manual
                # close()/sleep workaround is required to release the handle.
                with Image.open(file_path) as img:
                    width, height = img.size

                if width >= min_width and height >= min_height:
                    continue

                # Deletion can still fail transiently with PermissionError
                # (e.g. another process holding the file), so retry a few times.
                for attempt in range(1, max_attempts + 1):
                    try:
                        os.remove(file_path)
                    except PermissionError:
                        if attempt < max_attempts:
                            time.sleep(retry_delay_seconds)
                            continue
                        print(f"Permission denied: {file_path} - Could not remove after {max_attempts} attempts")
                        error_count += 1
                    else:
                        removed_count += 1
                        removed_files.append(f"{file} ({width}x{height})")
                        print(f"Removed: {file_path} - Size: {width}x{height}")
                        break

            except Exception as e:
                # Best effort: an unreadable or otherwise failing file is
                # counted and reported, and the scan moves on.
                error_count += 1
                print(f"Error processing {file_path}: {e}")

    return {
        'total_images': total_count,
        'removed_count': removed_count,
        'error_count': error_count,
        'removed_files': removed_files
    }

# Run the cleanup on the dataset folder and report what happened.
snake_folder = "Snake Images for Model"
print(f"Scanning {snake_folder} for images smaller than 100x100 pixels...\n")

stats = remove_small_images(snake_folder, min_width=100, min_height=100)

print(
    "\n=== CLEANUP SUMMARY ===",
    f"Total images processed: {stats['total_images']}",
    f"Images removed: {stats['removed_count']}",
    f"Errors encountered: {stats['error_count']}",
    f"Remaining images: {stats['total_images'] - stats['removed_count']}",
    sep="\n",
)

# List at most the first ten deleted files, then summarise the remainder.
removed_files = stats['removed_files']
if not removed_files:
    print("\nNo images were removed - all images meet the minimum size requirement!")
else:
    print("\nRemoved files (first 10):")
    for file_info in removed_files[:10]:
        print(f"  - {file_info}")
    extra_count = len(removed_files) - 10
    if extra_count > 0:
        print(f"  ... and {extra_count} more files")

In [None]:
# Install the third-party packages used by this notebook into the kernel's environment.
# NOTE(review): versions are unpinned — consider pinning them so reruns are reproducible.
%pip install tensorflow keras keras-preprocessing keras-applications split-folders opencv-python matplotlib

In [None]:
# Import necessary libraries
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import matplotlib.pyplot as plt
import numpy as np
import splitfolders
import cv2
from tensorflow.keras.applications.resnet50 import preprocess_input

In [None]:
# Split the raw image folder into train / val / test sub-folders (70% / 20% / 10%, fixed seed)
input_folder, output_folder = "Snake Images for Model", "split_data"

splitfolders.ratio(
    input_folder,
    output=output_folder,
    seed=42,
    ratio=(0.7, 0.2, 0.1),
)

# Image resolution and batch size used by the generators built from the split folders
IMG_HEIGHT = IMG_WIDTH = 160
BATCH_SIZE = 32

def simple_preprocessing(img):
    """Return ``img`` divided by 255.0 (the input object is not modified)."""
    return img / 255.0

# Training images get random geometric augmentation; validation and test
# images only go through simple_preprocessing.
_jitter = 0.12  # shared magnitude for shift / shear / zoom
train_datagen = ImageDataGenerator(
    preprocessing_function=simple_preprocessing,
    rotation_range=15,
    width_shift_range=_jitter,
    height_shift_range=_jitter,
    shear_range=_jitter,
    zoom_range=_jitter,
    horizontal_flip=True,
    vertical_flip=False,
    fill_mode='nearest'
)
val_datagen = ImageDataGenerator(preprocessing_function=simple_preprocessing)
test_datagen = ImageDataGenerator(preprocessing_function=simple_preprocessing)

# Options common to the three directory iterators
_flow_options = dict(
    target_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=BATCH_SIZE,
    class_mode='categorical',
)

train_generator = train_datagen.flow_from_directory(f'{output_folder}/train', **_flow_options)
validation_generator = val_datagen.flow_from_directory(f'{output_folder}/val', **_flow_options)
test_generator = test_datagen.flow_from_directory(f'{output_folder}/test', **_flow_options)

# The class-name -> index mapping of the training iterator defines the label set
num_classes = len(train_generator.class_indices)
print(f"Number of snake classes: {num_classes}")
print("\nClass mapping:")
for label, label_index in train_generator.class_indices.items():
    print(f"{label_index}: {label}")

train_generator.reset()

In [None]:
# Progressive training at multiple resolutions
from tensorflow.keras.layers import Input, GlobalAveragePooling2D
from PIL import Image

# Resolution schedule for progressive training: one entry per stage, in the
# order the stages are run. "learning_rate" is optional per stage.
TRAINING_STAGES = [
    dict(img_size=160, epochs=25, stage_name="Stage 1: 160x160"),
    dict(img_size=192, epochs=15, stage_name="Stage 2: 192x192"),
    dict(img_size=224, epochs=15, stage_name="Stage 3: 224x224"),
    dict(img_size=400, epochs=40, stage_name="Stage 4: 400x400", learning_rate=5e-5),
]

def create_model_for_size(img_size, num_classes):
    """Build the (uncompiled) CNN classifier for square RGB inputs.

    The network is five Conv2D -> BatchNormalization -> ReLU -> MaxPooling
    blocks (32, 64, 128, 128 and 256 filters), followed by global average
    pooling and a Dense(512) / Dropout(0.5) / Dense(256) / Dropout(0.3) head
    ending in a softmax over ``num_classes``.

    Args:
        img_size: Height and width of the input images in pixels.
        num_classes: Number of output classes.

    Returns:
        A ``Sequential`` model; the caller is responsible for compiling it.
    """
    conv_filters = (32, 64, 128, 128, 256)

    stack = [Input(shape=(img_size, img_size, 3))]

    # Feature extractor: layers are created in the same order as they are stacked
    for filters in conv_filters:
        stack.append(Conv2D(filters, (3, 3), padding='same'))
        stack.append(tf.keras.layers.BatchNormalization())
        stack.append(tf.keras.layers.Activation('relu'))
        stack.append(MaxPooling2D(2, 2))

    # Classification head
    stack.append(GlobalAveragePooling2D())
    stack.append(Dense(512, activation='relu'))
    stack.append(Dropout(0.5))
    stack.append(Dense(256, activation='relu'))
    stack.append(Dropout(0.3))
    stack.append(Dense(num_classes, activation='softmax'))

    return Sequential(stack)

def transfer_weights(source_model, target_model, num_classes):
    """Copy layer weights from ``source_model`` into ``target_model``, layer by layer.

    Layers are paired positionally (``zip`` over both ``.layers`` lists). For
    each pair the weights are copied only when the source layer has weights
    and every weight array has the same shape as its counterpart in the
    target layer. Pairs whose shapes differ are left untouched and reported,
    instead of being hidden behind a bare ``except``.

    Args:
        source_model: Model to read weights from.
        target_model: Model whose layers are updated in place.
        num_classes: Unused; kept so existing callers keep working.

    Returns:
        None. ``target_model`` is modified in place.
    """
    skipped_layers = []

    for source_layer, target_layer in zip(source_model.layers, target_model.layers):
        if isinstance(source_layer, tf.keras.layers.InputLayer):
            continue

        source_weights = source_layer.get_weights()
        if not source_weights:
            # Layers without weights have nothing to copy.
            continue

        source_shapes = [w.shape for w in source_weights]
        target_shapes = [w.shape for w in target_layer.get_weights()]

        if source_shapes != target_shapes:
            # Incompatible layers keep their freshly initialised weights.
            skipped_layers.append(f"{source_layer.name} -> {target_layer.name}")
            continue

        target_layer.set_weights(source_weights)

    if skipped_layers:
        print(f"Weight transfer skipped {len(skipped_layers)} layer(s) with mismatched shapes: "
              + ", ".join(skipped_layers))

def filter_images_by_size(directory, min_width, min_height):
    """Return the paths of images under ``directory`` that meet a minimum size.

    The directory is walked recursively. A file is considered when its name
    ends (case-insensitively) with one of the listed image extensions, and is
    kept when its width is at least ``min_width`` AND its height is at least
    ``min_height``. Files that cannot be opened are reported and skipped.

    Args:
        directory: Root directory to scan.
        min_width: Minimum width in pixels.
        min_height: Minimum height in pixels.

    Returns:
        List of file paths (as built by ``os.path.join``) of qualifying images.
    """
    image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.tiff')
    valid_images = []

    for root, _dirs, files in os.walk(directory):
        for file in files:
            if not file.lower().endswith(image_extensions):
                continue

            file_path = os.path.join(root, file)
            try:
                with Image.open(file_path) as img:
                    width, height = img.size
            except Exception as exc:
                # Still best effort, but no longer swallows KeyboardInterrupt /
                # SystemExit, and the unreadable file is made visible.
                print(f"Skipping unreadable image {file_path}: {exc}")
                continue

            if width >= min_width and height >= min_height:
                valid_images.append(file_path)

    return valid_images

class AdaptiveLearningRateIncrease(tf.keras.callbacks.Callback):
    """Keras callback that raises the learning rate when the monitored loss plateaus.

    After every epoch the monitored value is compared with the best value seen
    so far. An epoch counts as an improvement only if the value drops by more
    than ``improvement_threshold``. After ``patience`` consecutive epochs
    without improvement the learning rate is multiplied by
    ``increase_factor`` (capped at ``max_lr``) and the wait counter is reset.

    Args:
        patience: Number of non-improving epochs before the rate is increased.
        increase_factor: Multiplier applied to the current learning rate.
        min_lr: The new rate is applied only if it is greater than this value.
        max_lr: Upper bound for the learning rate.
        monitor: Key in the epoch ``logs`` to watch (lower is better).
    """

    def __init__(self, patience=4, increase_factor=2.0, min_lr=1e-6, max_lr=1e-3, monitor='val_loss'):
        super().__init__()
        self.patience = patience
        self.increase_factor = increase_factor
        self.min_lr = min_lr
        self.max_lr = max_lr
        self.monitor = monitor
        self.wait = 0
        self.best_loss = float('inf')
        self.improvement_threshold = 0.001  # Minimum improvement to reset patience

    def on_epoch_end(self, epoch, logs=None):
        """Update the plateau counter and, if warranted, increase the learning rate."""
        # Keras may call this with logs=None, and the monitored key is absent
        # when no validation data was supplied; neither case should crash training.
        logs = logs or {}
        current_loss = logs.get(self.monitor)
        if current_loss is None:
            print(f"\nEpoch {epoch + 1}: '{self.monitor}' not available in logs; "
                  f"skipping adaptive learning rate check")
            return

        optimizer = self.model.optimizer
        current_lr = float(tf.keras.backend.get_value(optimizer.learning_rate))

        # Check if there's meaningful improvement
        if current_loss < self.best_loss - self.improvement_threshold:
            self.best_loss = current_loss
            self.wait = 0
        else:
            self.wait += 1

        # If no improvement for patience epochs and LR can be increased
        if self.wait >= self.patience and current_lr < self.max_lr:
            new_lr = min(current_lr * self.increase_factor, self.max_lr)
            if new_lr > self.min_lr:
                # Assign through the optimizer's learning_rate property rather
                # than the legacy backend.set_value helper.
                # NOTE(review): backend.set_value is believed to reject Keras 3
                # variables — confirm against the installed TensorFlow version.
                optimizer.learning_rate = new_lr
                print(f"\nEpoch {epoch + 1}: Loss plateau detected. Increasing learning rate from {current_lr:.2e} to {new_lr:.2e}")
                self.wait = 0  # Reset patience after LR increase

# ---------------------------------------------------------------------------
# Progressive training driver.
# For every entry in TRAINING_STAGES a fresh model is built at that stage's
# resolution, weights from the previous stage's model are transferred into it,
# and it is trained with stage-specific callbacks. The model of the last stage
# ends up in `model`; per-stage Keras History objects go in `stage_histories`.
# ---------------------------------------------------------------------------
print("=" * 60)
print("PROGRESSIVE TRAINING: Multi-Resolution CNN")
print("=" * 60)

current_model = None   # model produced by the previous stage (None before stage 1)
stage_histories = {}   # stage_name -> History returned by fit()

for stage_idx, stage_config in enumerate(TRAINING_STAGES):
    img_size = stage_config["img_size"]
    epochs = stage_config["epochs"]
    stage_name = stage_config["stage_name"]
    # Stages that do not specify a learning rate fall back to 1e-4
    learning_rate = stage_config.get("learning_rate", 1e-4)
    
    print(f"\n{stage_name}")
    print(f"Image size: {img_size}x{img_size}, Epochs: {epochs}, Learning rate: {learning_rate}")
    print("-" * 60)
    
    # Augmentation settings do not depend on the stage; only target_size below does.
    stage_train_datagen = ImageDataGenerator(
        preprocessing_function=simple_preprocessing,
        rotation_range=15,
        width_shift_range=0.11,
        height_shift_range=0.11,
        shear_range=0.11,
        zoom_range=0.11,
        horizontal_flip=True,
        vertical_flip=False,
        fill_mode='nearest'
    )
    
    stage_val_datagen = ImageDataGenerator(preprocessing_function=simple_preprocessing)
    
    # Images are resized to this stage's resolution when loaded from disk
    stage_train_gen = stage_train_datagen.flow_from_directory(
        f'{output_folder}/train',
        target_size=(img_size, img_size),
        batch_size=16,
        class_mode='categorical',
        shuffle=True
    )
    
    stage_val_gen = stage_val_datagen.flow_from_directory(
        f'{output_folder}/val',
        target_size=(img_size, img_size),
        batch_size=16,
        class_mode='categorical',
        shuffle=False
    )
    
    stage_model = create_model_for_size(img_size, num_classes)
    
    # From the second stage on, start from the previous stage's weights
    if current_model is not None:
        print(f"Transferring weights from {TRAINING_STAGES[stage_idx - 1]['stage_name']}...")
        transfer_weights(current_model, stage_model, num_classes)
    
    stage_model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    
    # First stage only: a very short fit to check that the pipeline runs end to end.
    # NOTE(review): this fit runs on stage_model itself, so the 5 training steps
    # are applied to the model before the main fit below — confirm that is intended.
    if stage_idx == 0:
        print(f"\nQuick sanity check...")
        sanity_check = stage_model.fit(
            stage_train_gen,
            epochs=1,
            validation_data=stage_val_gen,
            verbose=1,
            steps_per_epoch=5,
            validation_steps=2
        )
        stage_train_gen.reset()
        stage_val_gen.reset()
    
    print(f"\nTraining {stage_name}...")
    
    # Define base callbacks
    # Note: early stopping watches val_loss while the checkpoint watches
    # val_accuracy, so the saved file and the in-memory weights can come from
    # different epochs.
    callbacks_list = [
        tf.keras.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=8,  # Applies to every stage, not only the final one
            restore_best_weights=True,
            verbose=1
        ),
        tf.keras.callbacks.ModelCheckpoint(
            f'model_stage_{img_size}.keras',
            monitor='val_accuracy',
            save_best_only=True,
            mode='max',
            verbose=1
        )
    ]
    
    # Add different callbacks for final stage vs other stages
    if stage_idx == len(TRAINING_STAGES) - 1:  # Final stage (last entry of TRAINING_STAGES)
        print("Final stage: Using adaptive learning rate increase callback")
        callbacks_list.append(AdaptiveLearningRateIncrease(
            patience=4,
            increase_factor=2.0,
            min_lr=1e-6,
            max_lr=1e-3,
            monitor='val_loss'
        ))
    else:
        # Standard ReduceLROnPlateau for earlier stages
        callbacks_list.append(tf.keras.callbacks.ReduceLROnPlateau(
            monitor='val_loss',
            factor=0.5,
            patience=3,
            min_lr=1e-6,
            verbose=1
        ))
    
    history = stage_model.fit(
        stage_train_gen,
        epochs=epochs,
        validation_data=stage_val_gen,
        verbose=1,
        callbacks=callbacks_list
    )
    
    stage_histories[stage_name] = history
    current_model = stage_model
    
    # "Final" here means the last recorded epoch of this stage's history
    print(f"\n{stage_name} completed!")
    print(f"Final training accuracy: {history.history['accuracy'][-1]:.4f}")
    print(f"Final validation accuracy: {history.history['val_accuracy'][-1]:.4f}")

# The in-memory model of the last stage is what later cells evaluate and predict with.
# NOTE(review): this is not reloaded from the model_stage_*.keras checkpoint — confirm intended.
model = current_model

print("\n" + "=" * 60)
print("PROGRESSIVE TRAINING COMPLETE!")
print("=" * 60)
print(f"Final model input size: {TRAINING_STAGES[-1]['img_size']}x{TRAINING_STAGES[-1]['img_size']}")
print(f"Model saved as: model_stage_{TRAINING_STAGES[-1]['img_size']}.keras")

In [None]:
# Plot per-stage training curves: accuracy in the top row, loss in the bottom row,
# one column per training stage.
stage_names = list(stage_histories.keys())
num_stages = len(stage_histories)

# squeeze=False keeps `axes` two-dimensional even when only one stage exists
fig, axes = plt.subplots(2, num_stages, figsize=(6*num_stages, 10), squeeze=False)
fig.suptitle('Progressive Training: Multi-Resolution CNN', fontsize=16, fontweight='bold')

for idx, stage_name in enumerate(stage_names):
    history = stage_histories[stage_name]

    for row, (metric, title) in enumerate((('accuracy', 'Accuracy'), ('loss', 'Loss'))):
        ax = axes[row, idx]
        ax.plot(history.history[metric], label=f'Training {title}', linewidth=2)
        ax.plot(history.history[f'val_{metric}'], label=f'Validation {title}', linewidth=2)
        ax.set_title(f'{stage_name}\n{title}')
        ax.set_xlabel('Epoch')
        ax.set_ylabel(title)
        ax.legend()
        ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Evaluate the final model on the held-out test split at the last stage's resolution
final_size = TRAINING_STAGES[-1]['img_size']
print(f"\nEvaluating final model ({final_size}x{final_size}) on test set...")

test_datagen_final = ImageDataGenerator(preprocessing_function=simple_preprocessing)
_eval_flow_options = dict(
    target_size=(final_size, final_size),
    batch_size=16,
    class_mode='categorical',
    shuffle=False,
)
test_generator_eval = test_datagen_final.flow_from_directory(
    f'{output_folder}/test', **_eval_flow_options
)

# evaluate() yields the loss followed by the compiled metric (accuracy)
test_loss, test_accuracy = model.evaluate(test_generator_eval, verbose=1)
print(
    f"\nFinal Model Test Results ({final_size}x{final_size}):",
    f"Test accuracy: {test_accuracy:.4f}",
    f"Test loss: {test_loss:.4f}",
    sep="\n",
)

def predict_snake(image_path, model_to_use=None):
    """Classify one image, show it with the prediction, and print the top 3 classes.

    Args:
        image_path: Path of the image file to classify.
        model_to_use: Model used for prediction. When omitted, the notebook's
            current global ``model`` is looked up at call time, so a model
            retrained after this function was defined is picked up.

    Returns:
        None. The image is displayed with matplotlib and the top predictions
        are printed.
    """
    if model_to_use is None:
        # A default of `model_to_use=model` would be bound once, when the
        # `def` runs, and would keep pointing at a stale model.
        model_to_use = model

    # Resize to the input size the model expects, so that models from earlier
    # stages can be passed in. Fall back to the last stage's size when the
    # model does not expose a fixed 4-D input shape.
    fallback_size = TRAINING_STAGES[-1]['img_size']
    target_size = (fallback_size, fallback_size)
    input_shape = getattr(model_to_use, 'input_shape', None)
    if isinstance(input_shape, tuple) and len(input_shape) == 4 and all(input_shape[1:3]):
        target_size = (input_shape[1], input_shape[2])

    img = tf.keras.preprocessing.image.load_img(image_path, target_size=target_size)
    img_array = tf.keras.preprocessing.image.img_to_array(img)
    img_array = np.expand_dims(img_array, 0)  # add the batch dimension
    img_array = simple_preprocessing(img_array)

    predictions = model_to_use.predict(img_array)
    predicted_class = np.argmax(predictions[0])
    confidence = predictions[0][predicted_class]

    # Order class names by their index explicitly instead of relying on the
    # insertion order of the class_indices dict.
    class_indices = train_generator.class_indices
    class_names = sorted(class_indices, key=class_indices.get)
    predicted_snake = class_names[predicted_class]

    plt.figure(figsize=(8, 6))
    plt.imshow(img)
    plt.axis('off')
    plt.title(f'Predicted: {predicted_snake}\nConfidence: {confidence:.2%}')
    plt.show()

    # Highest-probability classes first (fewer than 3 if there are fewer classes)
    top_3_idx = np.argsort(predictions[0])[-3:][::-1]
    print("\nTop 3 predictions:")
    for idx in top_3_idx:
        print(f"{class_names[idx]}: {predictions[0][idx]:.2%}")

In [None]:

# Run predict_snake on every image found directly inside the "Testing Snake" folder
test_folder = "Testing Snake"
image_extensions = ('.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif')

if not os.path.exists(test_folder):
    print(f"Folder {test_folder} not found")
else:
    image_files = [
        entry for entry in os.listdir(test_folder)
        if entry.lower().endswith(image_extensions)
    ]

    if not image_files:
        print(f"No image files found in {test_folder}")
    else:
        print(f"Found {len(image_files)} images in {test_folder}")
        print("=" * 60)

        for image_file in image_files:
            image_path = os.path.join(test_folder, image_file)
            print(f"\nTesting: {image_file}")
            print("-" * 60)
            predict_snake(image_path)
