## **Mounting and Dataset Directory**

In [None]:
#Drive Mouting
from google.colab import drive
drive.mount('/content/drive')

#Folder Listing
!ls "/content/drive/MyDrive/Patent images"

#Dataset Directory Variable
dataset_dir = "/content/drive/MyDrive/Patent images"

## **Import Libraries**

In [None]:
!pip install tensorflow opencv-python-headless

In [None]:
# Core Libraries
import os #used for navigating directories and listing files
import numpy as np
import pandas as pd
import random
import shutil
import io

# Deep Learning Framework
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.models import load_model

# Evaluation Metrics & Visualization
from sklearn.metrics import classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# Dataset splitting and training
from sklearn.model_selection import train_test_split

# Grad-CAM & Image Utilities
import cv2
from tensorflow.keras.preprocessing import image
import imghdr
from tqdm import tqdm  # progress bar for checks
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
from google.colab.output import eval_js
from base64 import b64decode, b64encode
from IPython.display import display, Javascript, Image
from google.colab import output

# Keep cell outputs clean by silencing warnings.
# Note: the 'ignore' filter applies to every Python warning, not only TensorFlow's.
import warnings
warnings.filterwarnings('ignore')

# Reproducibility: seed NumPy, TensorFlow and Python's `random` with one shared value
SEED = 42
for seed_fn in (np.random.seed, tf.random.set_seed, random.seed):
    seed_fn(SEED)

print("All libraries are imported successfully!")
print("TensorFlow version:", tf.__version__)

## **Define Class Mapping and Create Binary Labels**

In [None]:
# Species folder names, grouped by the pest size class they are assigned to
large_species = ['Rat', 'Grasshopper', 'Beetle', 'Wasp', 'Dragonfly']
small_species = ['Ant', 'Mosquito', 'Fly', 'Ladybug', 'Bee', 'Butterfly', 'Spider']

# Mapping: folder name → pest size class (all 'Large' entries first, then 'Small')
mapping = {
    **dict.fromkeys(large_species, 'Large'),
    **dict.fromkeys(small_species, 'Small'),
}

In [None]:
# Create a DataFrame linking every image path to its binary size label
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

data = []  # will hold dictionaries with {'filepath': ..., 'label': ...}

for species, size_class in mapping.items():
    folder_path = os.path.join(dataset_dir, species)

    # os.listdir() returns entries in arbitrary order. Sorting fixes the row order,
    # which the seeded shuffle below (and the later train/val/test split) needs in
    # order to be reproducible between runs and machines.
    for img_file in sorted(os.listdir(folder_path)):
        # keep only files with a recognised image extension (case-insensitive)
        if img_file.lower().endswith(IMAGE_EXTENSIONS):
            data.append({
                'filepath': os.path.join(folder_path, img_file),
                'label': size_class,
            })

# Convert list → pandas DataFrame; explicit columns keep the frame well-formed
# (and the df['label'] lookup below valid) even when no image was found.
df = pd.DataFrame(data, columns=['filepath', 'label'])

# Shuffle the DataFrame so data order is random before splitting
df = df.sample(frac=1, random_state=42).reset_index(drop=True)

# Display first few entries to verify structure
print("Sample entries:")
print(df.head())

# Print label distribution to see class balance
print("\nLabel distribution:")
print(df['label'].value_counts())

## **Data preprocessing**

In [None]:

# Validate every listed file before it reaches the data generators.
#   non_image_files: Pillow cannot identify the file as any image format
#   corrupt_images:  the file cannot be opened, or OpenCV decodes no pixels from it
# Pillow is used for the format check instead of `imghdr`, which is deprecated since
# Python 3.11 and removed in Python 3.13.
from PIL import Image as PILImage, UnidentifiedImageError

corrupt_images = []
non_image_files = []

for path in tqdm(df['filepath'], desc="Checking images"):
    try:
        # Opening is enough to identify the format; the context manager closes the file
        with PILImage.open(path):
            pass

        # Try decoding the image; cv2.imread signals failure by returning None
        img = cv2.imread(path)
        if img is None or img.size == 0:
            corrupt_images.append(path)

    except UnidentifiedImageError:
        # Not recognisable as an image at all
        non_image_files.append(path)

    except Exception:
        # Any other failure (missing file, I/O error, ...) counts as corrupt
        corrupt_images.append(path)

print(f"Total corrupt images: {len(corrupt_images)}")
print(f"Non-image files: {len(non_image_files)}")


In [None]:
# Remove every file flagged by the validation pass from the DataFrame: the non-image
# entries AND the images that failed to decode. Leaving the latter in would hand
# undecodable files to the data generators during training.
invalid_paths = set(non_image_files) | set(corrupt_images)
df = df[~df['filepath'].isin(invalid_paths)].reset_index(drop=True)
print(f"Cleaned dataset: {len(df)} valid images remaining.")

In [None]:
# Disabled cell: the whole block below is a bare string literal, so none of it runs.
# It sketches a second validation pass that fully loads each image and drops the
# ones that fail.
# NOTE(review): in this notebook `Image` is imported from IPython.display, not from PIL,
# so `Image.open(path)` would presumably fail for every file; the broad `except Exception`
# would then flag the entire dataset as corrupt. If this cell is re-enabled, use PIL's
# Image class instead — confirm before enabling.
'''
extra_corrupt = []

for path in tqdm(df['filepath'], desc="Double-checking images safely"):
    try:
        with Image.open(path) as img:
            img.load()  # fully load to confirm it's readable
    except Exception:
        extra_corrupt.append(path)

print(f"Additional corrupt images found: {len(extra_corrupt)}")

# Remove only truly corrupt files
if extra_corrupt:
    df = df[~df['filepath'].isin(extra_corrupt)]
    df.reset_index(drop=True, inplace=True)
    print(f"Cleaned dataset: {len(df)} valid images remain.")
'''

## **Dataset Split and Image Augmentation Setup**

In [None]:
# Split the data: 70% training, 20% validation, 10% testing.
# Step 1 holds out 30% of all rows. Step 2 gives one third of that hold-out (10% overall)
# to the test set and the remaining two thirds (20% overall) to validation.
# Both steps are stratified so each subset keeps the Large/Small ratio.
train_df, temp_df = train_test_split(df, test_size=0.3, stratify=df['label'], random_state=42)
val_df, test_df = train_test_split(temp_df, test_size=1/3, stratify=temp_df['label'], random_state=42)

# Every row must land in exactly one subset
assert len(train_df) + len(val_df) + len(test_df) == len(df)

print(f"Training samples: {len(train_df)}")
print(f"Validation samples: {len(val_df)}")
print(f"Testing samples: {len(test_df)}")

# ==========================
# ImageDataGenerator Setup
# ==========================

# Random perturbations applied to training images only, to increase robustness
augmentation = dict(
    rotation_range=25,        # random rotation
    width_shift_range=0.1,    # horizontal shift
    height_shift_range=0.1,   # vertical shift
    zoom_range=0.2,           # zoom in/out
    horizontal_flip=True,     # random horizontal flips
    fill_mode='nearest',      # fill pixels exposed by a transform
)

# Training generator: pixel normalization plus the augmentation above
train_datagen = ImageDataGenerator(rescale=1./255, **augmentation)

# Validation and test generator: normalization only, no augmentation
val_test_datagen = ImageDataGenerator(rescale=1./255)

# ==========================
# Flow generators
# ==========================

# Options shared by all three generators
flow_options = dict(
    x_col='filepath',
    y_col='label',
    target_size=(224, 224),   # MobileNetV2 input size
    batch_size=32,
    class_mode='binary',      # labels are encoded as 0/1; see `.class_indices` for the mapping
)

# Training batches are reshuffled; validation and test keep the DataFrame order so that
# predictions line up with `.classes` / `.filepaths`.
train_generator = train_datagen.flow_from_dataframe(dataframe=train_df, shuffle=True, **flow_options)

val_generator = val_test_datagen.flow_from_dataframe(dataframe=val_df, shuffle=False, **flow_options)

test_generator = val_test_datagen.flow_from_dataframe(dataframe=test_df, shuffle=False, **flow_options)

## **Transfer Learning Model (MobileNetV2-based CNN)**

Transfer Learning is a technique that reuses knowledge from a model trained on a large generic dataset (e.g., ImageNet) and adapts it to a new, specific task.
In this work, we employed MobileNetV2, a convolutional neural network architecture optimized for edge devices, as the feature extractor.
The lower convolutional layers of MobileNetV2 were frozen to retain pre-learned spatial and texture representations, while the upper layers were customized to learn discriminative features specific to pest size categories (“Large” vs “Small”).

This approach significantly reduces training time, prevents overfitting, and enhances computational efficiency — which is essential for deployment on low-power hardware such as Raspberry Pi or NVIDIA Jetson Nano.

In [None]:
# Pretrained ImageNet backbone, without its fully connected classifier head
base_model = MobileNetV2(
    input_shape=(224, 224, 3),
    include_top=False,
    weights='imagenet'
)

# Freeze every backbone layer so the pretrained weights are not modified
for layer in base_model.layers:
    layer.trainable = False

# Custom classification head stacked on the backbone output
pooled = GlobalAveragePooling2D()(base_model.output)   # one feature vector per image
hidden = Dense(128, activation='relu')(pooled)         # task-specific features
regularized = Dropout(0.3)(hidden)                     # regularization against overfitting
# Single sigmoid unit: probability of the class the generators encode as index 1.
# NOTE(review): with Keras' default alphanumeric class ordering, index 1 would be
# 'Small' — confirm with train_generator.class_indices.
output = Dense(1, activation='sigmoid')(regularized)

# Combine backbone + head into one model
model = Model(inputs=base_model.input, outputs=output)

# Compile the model
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(learning_rate=1e-4),
    metrics=['accuracy']
)

# Model summary
model.summary()

In [None]:
# Make the last 30 backbone layers trainable for fine-tuning.
# NOTE(review): no training run is visible between the frozen-base compile and this
# unfreeze, so the frozen configuration is never trained — confirm this is intended.
for layer in base_model.layers[-30:]:
    layer.trainable = True

# Recompile so the trainable change takes effect, using a smaller learning rate
model.compile(
    loss='binary_crossentropy',
    optimizer=Adam(learning_rate=1e-5),
    metrics=['accuracy']
)

# Callbacks
# Save the model to disk whenever validation accuracy improves
checkpoint = ModelCheckpoint(
    filepath="mobilenetv2_pest_classifier.h5",
    monitor="val_accuracy",
    verbose=1,
    save_best_only=True
)

# Stop after 5 epochs without a validation-loss improvement and restore the best weights
early_stop = EarlyStopping(
    monitor="val_loss",
    patience=5,
    verbose=1,
    restore_best_weights=True
)

In [None]:
# Fit on the augmented training batches, validating after each epoch.
# The callbacks save the best checkpoint and stop early when val_loss stalls.
history = model.fit(
    x=train_generator,
    epochs=15,
    validation_data=val_generator,
    callbacks=[checkpoint, early_stop],
    verbose=1
)

In [None]:
# Training curves: accuracy (left panel) and loss (right panel), training vs validation
fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 5))

# (axis, key in history.history, display name)
panels = [
    (acc_ax, 'accuracy', 'Accuracy'),
    (loss_ax, 'loss', 'Loss'),
]

for ax, key, name in panels:
    ax.plot(history.history[key], label=f'Training {name}')
    ax.plot(history.history[f'val_{key}'], label=f'Validation {name}')
    ax.set_title(f'Training vs Validation {name}')
    ax.set_xlabel('Epochs')
    ax.set_ylabel(name)
    ax.legend()
    ax.grid(True)

plt.show()


## Testing

In [None]:
# Reload the best checkpoint written during training (change filename if you used .keras)
model = load_model("mobilenetv2_pest_classifier.h5", compile=False)
# Loaded uncompiled, so attach a loss and metric before evaluating
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# ---- A.2 Evaluate on test set ----
test_loss, test_acc = model.evaluate(test_generator, verbose=1)
print(f"Test accuracy: {test_acc:.4f}, Test loss: {test_loss:.4f}")

# ---- A.3 Predictions and metrics ----
# Rewind the generator so predictions start at the first test sample
test_generator.reset()
# Sigmoid outputs, one per test image
preds = model.predict(test_generator, verbose=1)
# Flatten (N, 1) → (N,)
pred_probs = preds.reshape(-1)
# Threshold at 0.5: the value is the predicted class INDEX (see class_indices below)
pred_labels = (pred_probs >= 0.5).astype(int)

# Ground-truth class indices as encoded by the generator (0/1)
true_labels = test_generator.classes
# The generator decides which label is 0 and which is 1 — print it rather than assume
class_indices = test_generator.class_indices
print("Class indices mapping:", class_indices)
# Readable class names, ordered by class index
inv_map = {index: name for name, index in class_indices.items()}
class_names = [inv_map[index] for index in sorted(inv_map)]

print("\nClassification Report:")
print(classification_report(true_labels, pred_labels, target_names=class_names))

# Confusion matrix (rows: actual class, columns: predicted class)
cm = confusion_matrix(true_labels, pred_labels)
plt.figure(figsize=(6,5))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix on Test Set')
plt.show()

# ---- A.4 Show sample test images with predictions ----
import random
from tensorflow.keras.preprocessing import image

# Sample at most 9 images (3x3 grid) so a small test set does not make random.sample() raise
n_samples = min(9, len(test_generator.filenames))
indices = random.sample(range(len(test_generator.filenames)), n_samples)

# Load the sampled images once, at the model's input size
sample_imgs = [
    image.load_img(test_generator.filepaths[idx], target_size=(224,224))  # full path
    for idx in indices
]

# Predict all samples in one batch instead of one model.predict() call per image
if sample_imgs:
    batch = np.stack([image.img_to_array(img)/255.0 for img in sample_imgs])
    sample_probs = model.predict(batch, verbose=0).ravel()
else:
    sample_probs = []

plt.figure(figsize=(10,10))
for i, (idx, img, prob) in enumerate(zip(indices, sample_imgs, sample_probs)):
    # Translate the thresholded class index through the generator's own mapping.
    # Hard-coding "Large" for prob >= 0.5 breaks whenever the generator assigns
    # index 1 to 'Small'.
    pred = inv_map[int(prob >= 0.5)]
    true = inv_map[test_generator.classes[idx]]
    ax = plt.subplot(3,3,i+1)
    plt.imshow(img)
    plt.title(f"Pred: {pred} ({prob:.2f})\nTrue: {true}")
    plt.axis('off')
plt.tight_layout()
plt.show()

In [None]:
def take_photo(filename='photo.jpg', quality=0.8):
    """Capture one webcam frame in the browser and save it as a JPEG file.

    Injects a JavaScript helper that shows a live video preview with a
    'Capture' button, waits for a click, draws the current frame onto a
    canvas, stops the first video track and returns the frame as a JPEG
    data URL. The base64 payload after the comma is decoded and written
    to ``filename``.

    Args:
        filename: Path of the file the captured photo is written to.
        quality: Value passed to ``canvas.toDataURL('image/jpeg', quality)``.

    Returns:
        The ``filename`` that was written.
    """
    capture_script = Javascript('''
      async function takePhoto(quality) {
        const div = document.createElement('div');
        const capture = document.createElement('button');
        capture.textContent = 'Capture';
        div.appendChild(capture);

        const video = document.createElement('video');
        video.style.display = 'block';
        const stream = await navigator.mediaDevices.getUserMedia({video: true});

        document.body.appendChild(div);
        div.appendChild(video);
        video.srcObject = stream;
        await video.play();

        // Wait for capture
        await new Promise((resolve) => capture.onclick = resolve);

        const canvas = document.createElement('canvas');
        canvas.width = video.videoWidth;
        canvas.height = video.videoHeight;
        canvas.getContext('2d').drawImage(video, 0, 0);
        stream.getVideoTracks()[0].stop();
        div.remove();

        const data = canvas.toDataURL('image/jpeg', quality);
        return data;
      }
    ''')
    display(capture_script)

    # Runs the helper in the browser and blocks until it returns the data URL
    data_url = eval_js(f'takePhoto({quality})')

    # The base64-encoded image is the part after the comma
    encoded = data_url.split(',')[1]
    with open(filename, 'wb') as out_file:
        out_file.write(b64decode(encoded))
    return filename


In [None]:
from IPython.display import display, Javascript, Image as ColabImage
from PIL import Image as PILImage

In [None]:
# Only the capture step is guarded, so "Camera capture failed" is printed for camera
# problems alone. Errors in preprocessing or prediction surface with their own traceback
# instead of being mislabelled as camera failures.
try:
    filename = take_photo()  # capture image
except Exception as e:
    print("Camera capture failed:", e)
else:
    print("Photo captured successfully!")

    # Open inside a context manager so the file handle is closed, and force 3-channel
    # RGB so the array matches the model's (224, 224, 3) input.
    with PILImage.open(filename) as photo:
        img = np.array(photo.convert("RGB").resize((224, 224))) / 255.0
    img = np.expand_dims(img, axis=0)  # add the batch dimension

    pred = model.predict(img)[0][0]
    # Same >= 0.5 threshold as the test-set evaluation; class_names is ordered by class index
    label = class_names[int(pred >= 0.5)]
    confidence = pred if pred >= 0.5 else 1 - pred

    print(f"Prediction: {label} ({confidence:.2f} confidence)")

    # Display image in Colab
    display(ColabImage(filename=filename))
