In [18]:
import cv2
import numpy as np
import os
from tensorflow.keras.preprocessing.image import ImageDataGenerator

import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Lambda, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
import torch
from tensorflow.keras.applications import MobileNetV2

import os
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model
from sklearn.model_selection import train_test_split
from tqdm import tqdm




# One Shot Learning (Siamese Network)

## Augmentasi Data

In [None]:
# Path ke video dan gambar target
# video_path = os.path.join('repository_lab_cv', 'proyek_kelompok', 'assets', 'test_video', 'OTV3.mp4')
template_path = os.path.join('assets', 'dataset', 'Mario-Target.png')
template_image = cv2.imread(template_path)
template_image = cv2.resize(template_image, (64, 64))  # Resize ke ukuran tetap

output_path = 'mario_detection_orb.mp4'  # Path untuk menyimpan video hasil

# Augmentasi data
datagen = ImageDataGenerator(
    rotation_range=45,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.2,
    zoom_range=0.2,
    horizontal_flip=True,
    vertical_flip=False,
    fill_mode='nearest'
)

# Generate augmented images
template_image = np.expand_dims(template_image, axis=0)
augmented_images = datagen.flow(template_image, batch_size=1)

for i in range(10):  # Simpan 10 augmented images
    aug_image = next(augmented_images)[0].astype('uint8')
    cv2.imwrite(f'augmented_data/augmented_mario_{i}.png', aug_image)

## Define Model

In [None]:
def build_siamese_network(input_shape):
    # Load pretrained MobileNetV2 model
    base_model = MobileNetV2(weights='imagenet', include_top=False, input_shape=input_shape)
    
    # Add global average pooling and dense layer for feature extraction
    x = base_model.output
    x = GlobalAveragePooling2D()(x)
    x = Dense(256, activation='sigmoid')(x)  # Feature embedding layer

    model = Model(base_model.input, x, name='MobileNetV2_Siamese')
    return model

# Input size
input_shape = (64, 64, 3)  # Ensure your dataset matches this size

# Create Siamese branches
siamese_base = build_siamese_network(input_shape)

# Define inputs
input_a = Input(shape=input_shape)
input_b = Input(shape=input_shape)

# Extract features using MobileNetV2
feature_a = siamese_base(input_a)
feature_b = siamese_base(input_b)

# Compute L1 distance between feature vectors
l1_distance = Lambda(lambda tensors: K.abs(tensors[0] - tensors[1]))([feature_a, feature_b])

# Output layer (similarity score)
output = Dense(1, activation='sigmoid')(l1_distance)

# Build and compile Siamese model
siamese_model = Model([input_a, input_b], output)
siamese_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Print the model summary
siamese_model.summary()

## Prepare Dataset

In [None]:
import os
import cv2
import numpy as np
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

def load_images_from_directory(directory, label, target_size=(64, 64)):
    """Load images from a given directory and assign a label."""
    images = []
    labels = []
    for file in os.listdir(directory):
        if file.endswith(".png"):
            img_path = os.path.join(directory, file)
            img = cv2.imread(img_path)
            if img is not None:
                img = cv2.resize(img, target_size)  # Resize to target size
                images.append(img)
                labels.append(label)
    return np.array(images), np.array(labels)

# Load Mario images
mario_directory = "./augmented_data"
mario_images, mario_labels = load_images_from_directory(mario_directory, label=1)  # Label 1 for Mario

# Load CIFAR-10 data using DataLoader
transform = transforms.Compose([
    transforms.Resize((64, 64)),  # Resize CIFAR images to match Mario image size
    transforms.ToTensor(),       # Convert to Tensor
])

cifar10_data = datasets.CIFAR10(root='./cifar', train=True, transform=transform, download=True)
dataloader = DataLoader(cifar10_data, batch_size=1, shuffle=True)

# Limit CIFAR images to match the number of Mario images
cifar_images = []
cifar_labels = []
num_mario_images = len(mario_images)  # Limit CIFAR images to this number

for idx, (img, label) in enumerate(dataloader):
    if idx >= num_mario_images:
        break
    img_np = img.squeeze(0).permute(1, 2, 0).numpy()  # Convert Tensor to numpy array
    cifar_images.append((img_np * 255).astype(np.uint8))  # Convert to uint8 format
    cifar_labels.append(0)  # Assign label 0 for non-Mario

# Convert CIFAR images and labels to numpy arrays
cifar_images = np.array(cifar_images)
cifar_labels = np.array(cifar_labels)

# Combine Mario and CIFAR datasets
data = np.concatenate((mario_images, cifar_images), axis=0)
labels = np.concatenate((mario_labels, cifar_labels), axis=0)

# Shuffle the dataset
indices = np.arange(len(data))
np.random.shuffle(indices)
data = data[indices]
labels = labels[indices]

# Create pairs for Siamese training
def create_pairs(data, labels):
    """Create positive and negative pairs for training."""
    pairs = []
    pair_labels = []

    # Create positive pairs
    mario_indices = np.where(labels == 1)[0]
    for i in range(len(mario_indices)):
        for j in range(i + 1, len(mario_indices)):
            pairs.append([data[mario_indices[i]], data[mario_indices[j]]])
            pair_labels.append(1)

    # Create negative pairs
    non_mario_indices = np.where(labels == 0)[0]
    for i in mario_indices:
        for j in non_mario_indices:
            pairs.append([data[i], data[j]])
            pair_labels.append(0)

    return np.array(pairs), np.array(pair_labels)

# Generate pairs
pairs, pair_labels = create_pairs(data, labels)

# Normalize data
pairs = pairs / 255.0  # Normalize images to [0, 1] range

## Train Model

In [None]:
# Buat folder untuk menyimpan model jika belum ada
model_folder = "./model"
if not os.path.exists(model_folder):
    os.makedirs(model_folder)

# Split data menjadi training dan validation
pairs_train, pairs_val, labels_train, labels_val = train_test_split(
    pairs, pair_labels, test_size=0.2, random_state=42
)

# Extract inputs (pair components) from the training and validation pairs
train_input_a = pairs_train[:, 0]
train_input_b = pairs_train[:, 1]
val_input_a = pairs_val[:, 0]
val_input_b = pairs_val[:, 1]

# Simpan model terbaik berdasarkan validation accuracy
checkpoint_path = os.path.join(model_folder, "siamese_best_model.h5")
checkpoint = ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='val_accuracy',
    save_best_only=True,
    save_weights_only=False,
    verbose=1
)

# Train the model
history = siamese_model.fit(
    [train_input_a, train_input_b], labels_train,
    validation_data=([val_input_a, val_input_b], labels_val),
    batch_size=32,
    epochs=10,
    callbacks=[checkpoint]
)

# Save final model and weights separately
final_model_path = os.path.join(model_folder, "siamese_final_model.h5")
weights_path = os.path.join(model_folder, "siamese_final_weights.h5")

# Save the full model (architecture + weights)
siamese_model.save(final_model_path)

# Save only the weights
siamese_model.save_weights(weights_path)
print(f"Model and weights saved to {model_folder}")

## Load Model

In [16]:
# Load the best saved model
final_model = load_model("./model/siamese_final_model.h5")

# Load weights
final_model.load_weights("./model/siamese_final_weights.h5")

## Inference

In [27]:
from tqdm import tqdm
import cv2
import numpy as np
from tensorflow.keras.models import load_model

# Load the saved Siamese model
model_path = "./model/siamese_best_model.h5"
siamese_model = load_model(model_path)

# Preprocess image
def preprocess_image(image, target_size):
    """Resize and normalize image for inference."""
    img = cv2.resize(image, target_size)
    img = img / 255.0  # Normalize to [0, 1]
    return np.expand_dims(img, axis=0)  # Add batch dimension

# Sliding window function with scaling
def sliding_window_multi_scale(image, scales, stride):
    """Slide windows across the image at multiple scales."""
    windows = []
    coords = []
    for scale in scales:
        scaled_window_size = (int(64 * scale), int(64 * scale))
        for y in range(0, image.shape[0] - scaled_window_size[1] + 1, stride):
            for x in range(0, image.shape[1] - scaled_window_size[0] + 1, stride):
                window = image[y:y + scaled_window_size[1], x:x + scaled_window_size[0]]
                if window.shape[:2] == scaled_window_size:  # Ensure window size matches
                    windows.append((window, (x, y, scaled_window_size[0], scaled_window_size[1])))
    return windows

# Load the reference template (Mario template)
reference_template_path = "./augmented_data/augmented_mario_0.png"  # Example template
reference_image = cv2.imread(reference_template_path)

# Preprocess reference image
reference_image_preprocessed = preprocess_image(reference_image, (64, 64))

# Load target image (image to search Mario in)
target_image_path = "./assets/dataset/video.PNG"  # Replace with your test image
target_image = cv2.imread(target_image_path)

# Sliding window parameters
scales = [0.5, 1.0, 1.5]  # Multi-scale factors (smaller, original, larger)
stride = 32  # Step size for sliding window
threshold = 0.4  # Similarity threshold to classify as Mario

# Perform sliding window across multiple scales
windows = sliding_window_multi_scale(target_image, scales, stride)

# Initialize progress bar and detected boxes
detected_boxes = []
with tqdm(total=len(windows), desc="Processing Windows", unit="window") as pbar:
    for window, (x, y, w, h) in windows:
        # Preprocess window
        window_preprocessed = preprocess_image(window, (64, 64))
        
        # Predict similarity using Siamese Network
        similarity = siamese_model.predict([reference_image_preprocessed, window_preprocessed], verbose=0)[0][0]
        
        # If similarity exceeds threshold, save the bounding box
        if similarity > threshold:
            detected_boxes.append((x, y, x + w, y + h))
        
        # Update progress bar
        pbar.update(1)

# Apply Non-Maximum Suppression (NMS) to combine overlapping boxes
def non_maximum_suppression(boxes, overlap_thresh=0.5):
    """Apply NMS to reduce overlapping bounding boxes."""
    if len(boxes) == 0:
        return []

    # Convert boxes to array format
    boxes = np.array(boxes)
    x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]

    # Compute area of each box
    area = (x2 - x1 + 1) * (y2 - y1 + 1)
    indices = np.argsort(y2)

    selected_boxes = []
    while len(indices) > 0:
        last = len(indices) - 1
        i = indices[last]
        selected_boxes.append(i)

        # Compute overlap
        xx1 = np.maximum(x1[i], x1[indices[:last]])
        yy1 = np.maximum(y1[i], y1[indices[:last]])
        xx2 = np.minimum(x2[i], x2[indices[:last]])
        yy2 = np.minimum(y2[i], y2[indices[:last]])

        w = np.maximum(0, xx2 - xx1 + 1)
        h = np.maximum(0, yy2 - yy1 + 1)
        overlap = (w * h) / area[indices[:last]]

        # Keep boxes with overlap less than the threshold
        indices = np.delete(indices, np.concatenate(([last], np.where(overlap > overlap_thresh)[0])))

    return boxes[selected_boxes].astype(int)

# Apply NMS
final_boxes = non_maximum_suppression(detected_boxes)

# Draw bounding boxes on the image
for (x1, y1, x2, y2) in final_boxes:
    cv2.rectangle(target_image, (x1, y1), (x2, y2), (0, 255, 0), 2)

# Show the result
cv2.imshow("Mario Detection", target_image)
cv2.waitKey(0)
cv2.destroyAllWindows()


Processing Windows: 100%|██████████| 2162/2162 [02:11<00:00, 16.45window/s]
