In [4]:
# Test Predictions for "To bee or not to bee"
# Generate predictions for images 251-347

import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
import warnings
warnings.filterwarnings('ignore')

from PIL import Image
import cv2
from tqdm import tqdm
from skimage import measure, feature, color, morphology

from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import (
    classification_report, confusion_matrix, accuracy_score, silhouette_score
)
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE, Isomap
from sklearn.cluster import KMeans, DBSCAN
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC

from scipy.optimize import minimize
from skimage.transform import rotate

# Path configuration: the test images and their binary masks are read from
# the "images" and "masks" sub-folders of DATA_DIR.
DATA_DIR = "../../test" 
IMAGES_DIR = os.path.join(DATA_DIR, "images")
MASKS_DIR = os.path.join(DATA_DIR, "masks")

In [5]:
def rotate_image_filled(image, angle, center):
    """
    Rotate `image` by `angle` in memory and return it in the input dtype.

    The two entries of `center` are swapped before being passed to `rotate`.
    Areas rotated in from outside the frame are filled with 0 (`cval=0`) and
    the value range is kept (`preserve_range=True`).

    Best-effort: if anything raises, the error is printed and the unrotated
    `image` is returned instead.

    NOTE(review): skimage documents the `center` argument of
    `transform.rotate` as (cols, rows). If callers already pass an (x, y)
    centre, the swap below hands it over as (y, x) -- confirm. Do not change
    it without re-extracting the training features with the same code.
    """
    try:
        swapped_center = (center[1], center[0])
        result = rotate(image, angle, center=swapped_center, preserve_range=True, cval=0)
        # Cast back so downstream arithmetic sees the same dtype as the input.
        result = result.astype(image.dtype)
    except Exception as err:
        print(f"Rotation error: {err}")
        result = image
    return result

def create_symmetric_image(image, center_x):
    """
    Return the left-right mirror of `image` (used for the symmetry loss).

    NOTE(review): `center_x` is accepted but never used -- the flip is about
    the image's own vertical midline, not about `center_x`. Confirm whether
    that is intended before relying on it.
    """
    return np.fliplr(image)

def load_image_and_mask(image_id, visualize=False, output_dir="../cleaned_samples"):
    """
    I. Quality control: load an image and its mask, then clean and crop both.

    Steps:
    - load `<IMAGES_DIR>/<image_id>.jpg` as RGB and
      `<MASKS_DIR>/binary_<image_id>.tif` as 8-bit grayscale
    - binarise the mask (1 where the mask is > 0, 0 elsewhere)
    - label the connected components of the binary mask
    - keep only the connected component with the largest area
    - crop both mask and image to that component's bounding box

    Args:
        image_id: identifier used to build the image and mask file names.
        visualize: when True, save a 2x2 before/after figure to `output_dir`.
        output_dir: folder for the figure; created if it does not exist.

    Returns:
        (cropped_image, cleaned_mask_cropped): the RGB crop and the uint8
        0/1 mask crop, both restricted to the same bounding box.

    Raises:
        FileNotFoundError: if the image or the mask file does not exist.
        ValueError: if the mask contains no connected component.
    """
    image_path = os.path.join(IMAGES_DIR, f"{image_id}.jpg")
    mask_path = os.path.join(MASKS_DIR, f"binary_{image_id}.tif")

    # Check file existence
    if not os.path.exists(image_path):
        raise FileNotFoundError(f"Image not found: {image_path}")
    if not os.path.exists(mask_path):
        raise FileNotFoundError(f"Mask not found: {mask_path}")

    # Load inside context managers so the file handles are released
    # immediately instead of whenever the garbage collector runs.
    with Image.open(image_path) as image_file:
        image = np.array(image_file.convert("RGB"))
    with Image.open(mask_path) as mask_file:
        mask = np.array(mask_file.convert("L"))

    # Transforming the mask into a binary image
    mask_binary = (mask > 0).astype(np.uint8)

    # Computing the connected components of the binary mask
    labeled_mask = measure.label(mask_binary)
    props = measure.regionprops(labeled_mask)

    if not props:
        raise ValueError(f"No connected components found in mask for image {image_id}")

    # Restricting to largest connected component
    largest_region = max(props, key=lambda x: x.area)
    cleaned_mask = (labeled_mask == largest_region.label).astype(np.uint8)

    # Cropping to bounding box
    minr, minc, maxr, maxc = largest_region.bbox
    cleaned_mask_cropped = cleaned_mask[minr:maxr, minc:maxc]
    cropped_image = image[minr:maxr, minc:maxc]

    # Visualization
    if visualize:
        os.makedirs(output_dir, exist_ok=True)

        fig, axes = plt.subplots(2, 2, figsize=(12, 10))
        try:
            panels = [
                (axes[0, 0], image, f"Original Image {image_id}", None),
                (axes[0, 1], mask, "Original Mask", 'gray'),
                (axes[1, 0], cropped_image, "Cleaned & Cropped Image", None),
                (axes[1, 1], cleaned_mask_cropped, "Cleaned & Cropped Mask", 'gray'),
            ]
            for ax, data, title, cmap in panels:
                ax.imshow(data, cmap=cmap)
                ax.set_title(title)
                ax.axis('off')

            fig.tight_layout()
            fig.savefig(
                os.path.join(output_dir, f"quality_control_{image_id}.png"),
                dpi=150,
                bbox_inches='tight',
            )
        finally:
            # Close this specific figure even if saving fails, so figures do
            # not accumulate when the function is called in a loop.
            plt.close(fig)

    return cropped_image, cleaned_mask_cropped


def compute_best_inscribed_circle(mask):
    """
    II. Best inscribed circle of a mask.

    Steps:
    - binarise the mask (`mask > 0`) and compute its Euclidean distance map
    - initialise the search at the centroid of the foreground region
    - minimise the negated distance-map value with scipy's `minimize`
      (L-BFGS-B), bounded to the mask extent

    Returns:
        (radius, (x, y)). `(0.0, (0.0, 0.0))` is returned when the mask has
        no foreground pixel or no region is found.

    NOTE(review): the loss reads the distance map at `int(y), int(x)`, so it
    is piecewise constant in the optimised coordinates; a finite-difference
    gradient is then likely zero and the optimiser may never leave the
    centroid -- verify. Any change must be mirrored in the training feature
    extraction.
    NOTE(review): distances are measured to background pixels inside the
    array only; presumably a tightly cropped mask touching the border gets
    over-estimated radii there -- confirm.
    """
    from scipy.ndimage import distance_transform_edt

    foreground = (mask > 0).astype(int)
    if not np.any(foreground):
        return 0.0, (0.0, 0.0)

    # Distance of every foreground pixel to the nearest background pixel.
    edt = distance_transform_edt(foreground)

    regions = measure.regionprops(foreground)
    if not regions:
        return 0.0, (0.0, 0.0)

    start = regions[0].centroid  # (row, col)
    n_rows, n_cols = edt.shape[0], edt.shape[1]

    def negative_radius(point):
        # Maximising the radius == minimising its negative.
        row, col = point
        inside = (0 <= row < n_rows) and (0 <= col < n_cols)
        if not inside:
            return 1000  # Penalty for out of bounds
        return -edt[int(row), int(col)]

    fit = minimize(
        negative_radius,
        x0=[start[0], start[1]],
        bounds=[(0, foreground.shape[0] - 1), (0, foreground.shape[1] - 1)],
        method='L-BFGS-B',
    )

    row_opt, col_opt = fit.x[0], fit.x[1]
    # The centre is handed back as (x, y), i.e. (col, row).
    return float(-fit.fun), (col_opt, row_opt)

def compute_best_symmetry_plane(image, mask, circle_center, circle_radius):
    """
    III. Computation of the best symmetry plane.

    For a candidate rotation angle the image is rotated with
    `rotate_image_filled`, mirrored with `create_symmetric_image`, and the
    loss is the sum of squared differences between the two, restricted to a
    disc of radius `circle_radius` around `circle_center` and divided by the
    number of pixels in that disc. The loss is minimised with scipy's
    `minimize` (L-BFGS-B, angle bounded to [-45, 45]) from five initial
    angles; the best run wins.

    Args:
        image: image array to rotate and compare.
        mask: only its first two dimensions are used, to build the disc.
        circle_center: (x, y) centre of the disc.
        circle_radius: radius of the disc.

    Returns:
        (min_loss, best_angle) as floats; `(1000.0, 0.0)` if every
        optimisation run raised.

    NOTE(review): `create_symmetric_image` mirrors about the image midline
    and `rotate_image_filled` swaps the centre coordinates; whether that
    matches the intended symmetry axis should be confirmed. Numerics are
    deliberately left untouched here so features stay comparable with the
    ones the model was trained on.
    """
    # Create circular mask for region of interest
    Y, X = np.ogrid[:mask.shape[0], :mask.shape[1]]
    cx, cy = circle_center
    circle_mask = (X - cx)**2 + (Y - cy)**2 <= circle_radius**2
    # Loop-invariant: the disc does not depend on the angle being tried.
    n_circle_pixels = np.sum(circle_mask)

    # Loss function based on difference between image and its symmetric
    def symmetry_loss_function(angle):
        try:
            rotated_image = rotate_image_filled(image, angle[0], circle_center)
            symmetric_image = create_symmetric_image(rotated_image, cx)

            if n_circle_pixels == 0:
                return 1000

            diff = np.sum((rotated_image[circle_mask].astype(float) -
                           symmetric_image[circle_mask].astype(float))**2)
            return diff / n_circle_pixels
        except Exception:
            # Best-effort: a failing evaluation is scored with a large penalty.
            return 1000

    # Try multiple initial angles
    initial_angles = [-30, -15, 0, 15, 30]
    best_result = None
    best_loss = float('inf')

    for init_angle in initial_angles:
        try:
            result = minimize(
                symmetry_loss_function,
                x0=[init_angle],
                bounds=[(-45, 45)],
                method='L-BFGS-B'
            )

            if result.fun < best_loss:
                best_loss = result.fun
                best_result = result
        except Exception:
            # Catch Exception only: a bare `except` would also swallow
            # KeyboardInterrupt and make this long loop impossible to stop.
            continue

    if best_result is not None:
        best_angle = best_result.x[0]
        min_loss = best_result.fun
    else:
        best_angle = 0.0
        min_loss = 1000.0

    return float(min_loss), float(best_angle)

def extract_shape_features(mask):
    """
    Shape descriptors of a mask.

    The mask is binarised (`mask > 0`) and passed to `measure.regionprops`;
    the first region supplies area, perimeter, eccentricity, solidity and
    extent, plus the major/minor axis ratio (0 when the minor axis is 0).

    `horizontal_symmetry` is the intersection-over-union between the left
    `width // 2` columns and the mirrored right `width // 2` columns (the
    middle column is ignored for odd widths); 0 when the mask is narrower
    than two columns or the union is empty.

    Returns a dict with the seven features; all values are 0 when no region
    is found.
    """
    feature_names = (
        'area', 'perimeter', 'eccentricity', 'solidity',
        'extent', 'aspect_ratio', 'horizontal_symmetry',
    )

    binary = (mask > 0).astype(int)
    regions = measure.regionprops(binary)
    if not regions:
        return dict.fromkeys(feature_names, 0)

    region = regions[0]

    # Elongation: major axis over minor axis, guarded against division by 0.
    major = region.major_axis_length
    minor = region.minor_axis_length
    aspect_ratio = major / minor if minor > 0 else 0

    # Left/right overlap after mirroring the right half onto the left one.
    _, n_cols = binary.shape
    horizontal_symmetry = 0
    if n_cols >= 2:
        half = n_cols // 2
        left = binary[:, :half]
        mirrored_right = np.fliplr(binary[:, n_cols - half:])
        common = min(left.shape[1], mirrored_right.shape[1])
        if common > 0:
            left_part = left[:, :common]
            right_part = mirrored_right[:, :common]
            overlap = np.sum(left_part & right_part)
            union = np.sum(left_part | right_part)
            horizontal_symmetry = overlap / union if union > 0 else 0

    values = (
        region.area, region.perimeter, region.eccentricity, region.solidity,
        region.extent, aspect_ratio, horizontal_symmetry,
    )
    return {name: float(value) for name, value in zip(feature_names, values)}

def calculate_bug_ratio(mask):
    """
    Fraction of `mask` entries that are strictly positive.

    Returns a float; 0.0 for an empty (zero-size) mask.
    """
    n_total = mask.size
    if n_total > 0:
        return float(np.sum(mask > 0) / n_total)
    return float(0)

def extract_color_features(image, mask):
    """
    Per-channel colour statistics of the pixels where `mask > 0`.

    Channels 0, 1 and 2 of `image` are reported as 'r', 'g' and 'b'; for each
    one the min, max, mean, median and standard deviation are computed over
    the masked pixels. Keys look like 'r_mean', 'b_std', ...

    Returns a dict of 15 floats; all 0.0 when the mask selects no pixel.
    """
    channel_names = ['r', 'g', 'b']
    stat_names = ['min', 'max', 'mean', 'median', 'std']

    selected = mask > 0
    if not np.any(selected):
        return {f'{channel}_{stat}': 0.0
                for channel in channel_names for stat in stat_names}

    reducers = {
        'min': np.min,
        'max': np.max,
        'mean': np.mean,
        'median': np.median,
        'std': np.std,
    }

    # One {stat: value} table per channel, computed on the masked pixels only.
    per_channel = {}
    for index, channel in enumerate(channel_names):
        values = image[:, :, index][selected]
        per_channel[channel] = {stat: float(reduce(values))
                                for stat, reduce in reducers.items()}

    # Emit grouped by statistic: r_min, g_min, b_min, r_max, ...
    return {f'{channel}_{stat}': per_channel[channel][stat]
            for stat in stat_names for channel in channel_names}

def extract_additional_features(image, mask):
    """
    Extract additional features: LBP texture statistics and Hu moments.

    Texture: the image is converted to grayscale with OpenCV, a 'uniform'
    local binary pattern is computed (radius `min(3, shortest_side // 4)`,
    `8 * radius` points), and the normalised histogram of the LBP codes
    under the mask yields `texture_entropy` (-sum p*log2 p) and
    `texture_energy` (sum p^2). A small constant is added to the histogram
    before both are computed.

    Hu moments: the first four Hu moments of the mask are log-scaled as
    `-sign(m) * log10(|m|)`; moments with |m| < 1e-10 are reported as 0.0.

    Both parts are best-effort: if one raises, the error is printed and
    that part's features fall back to 0.0.

    Returns a dict with 'texture_entropy', 'texture_energy' and
    'hu_moment1'..'hu_moment4' as floats; all 0.0 when the mask selects no
    pixel.
    """
    mask_bool = mask > 0

    if not np.any(mask_bool):
        return {
            'texture_entropy': 0.0, 'texture_energy': 0.0,
            'hu_moment1': 0.0, 'hu_moment2': 0.0, 'hu_moment3': 0.0, 'hu_moment4': 0.0
        }

    # Texture features using Local Binary Pattern
    texture_entropy = 0.0
    texture_energy = 0.0
    try:
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        # Shrink the LBP radius for very small crops; 0 disables the texture part.
        radius = min(3, min(image.shape[:2]) // 4)
        n_points = 8 * radius

        if radius > 0:
            lbp = feature.local_binary_pattern(gray, n_points, radius, method='uniform')
            lbp_bug = lbp[mask_bool]

            if len(lbp_bug) > 0:
                n_bins = int(n_points + 2)
                hist, _ = np.histogram(lbp_bug, bins=n_bins, range=(0, n_bins), density=True)
                hist = hist + 1e-10  # avoid log2(0) for empty bins
                texture_entropy = -np.sum(hist * np.log2(hist))
                texture_energy = np.sum(hist ** 2)
    except Exception as exc:
        # Catch Exception only (a bare `except` would also trap
        # KeyboardInterrupt) and report instead of failing silently.
        print(f"Texture feature error: {exc}")
        texture_entropy = 0.0
        texture_energy = 0.0

    # Hu moments
    try:
        moments = cv2.moments(mask.astype(np.uint8))
        hu_moments = cv2.HuMoments(moments).flatten()

        hu_moments_safe = [
            0.0 if abs(moment) < 1e-10 else -np.sign(moment) * np.log10(abs(moment))
            for moment in hu_moments[:4]
        ]
        # Pad defensively so the four keys below always exist.
        hu_moments_safe += [0.0] * (4 - len(hu_moments_safe))
    except Exception as exc:
        print(f"Hu moment error: {exc}")
        hu_moments_safe = [0.0, 0.0, 0.0, 0.0]

    return {
        'texture_entropy': float(texture_entropy),
        'texture_energy': float(texture_energy),
        'hu_moment1': float(hu_moments_safe[0]),
        'hu_moment2': float(hu_moments_safe[1]),
        'hu_moment3': float(hu_moments_safe[2]),
        'hu_moment4': float(hu_moments_safe[3])
    }

def create_default_features(image_id):
    """
    Build the fallback feature record used when extraction fails.

    The record holds 'image_id' (cast to int) followed by every feature
    name set to 0.0: the circle/symmetry features, the shape features, the
    15 colour statistics and the texture / Hu-moment features.
    """
    zeroed_names = [
        'bug_ratio', 'max_inscribed_radius', 'symmetry_score', 'symmetry_angle',
        'area', 'perimeter', 'eccentricity', 'solidity',
        'extent', 'aspect_ratio', 'horizontal_symmetry',
    ]
    zeroed_names += [
        f'{channel}_{stat}'
        for channel in ('r', 'g', 'b')
        for stat in ('min', 'max', 'mean', 'median', 'std')
    ]
    zeroed_names += [
        'texture_entropy', 'texture_energy',
        'hu_moment1', 'hu_moment2', 'hu_moment3', 'hu_moment4',
    ]

    record = {'image_id': int(image_id)}
    record.update(dict.fromkeys(zeroed_names, 0.0))
    return record

In [6]:
# Feature extraction for a single test image.
def extract_all_features_test(image_id, test_dir="../../test"):
    """
    Extract the full feature record for one test image.

    The image (`<test_dir>/images/<image_id>.jpg`) and its mask
    (`<test_dir>/masks/binary_<image_id>.tif`) are loaded, the mask is
    binarised and reduced to its largest connected component, and both are
    cropped to that component's bounding box. Shape, bug-ratio, colour,
    texture/Hu and inscribed-circle features are then computed; the symmetry
    features are only computed when the inscribed radius exceeds 5,
    otherwise they are 0.0.

    This must stay identical to the feature extraction used for training,
    since the fitted scaler and model expect the same feature definitions.

    Args:
        image_id: identifier used to build the file names.
        test_dir: root folder holding the "images" and "masks" sub-folders.

    Returns:
        A dict with 'image_id' and all features. Best-effort: when a file is
        missing, the mask has no connected component, or any step raises, a
        message is printed (for missing files and errors) and the all-zero
        record from `create_default_features` is returned instead.
    """
    try:
        image_path = os.path.join(test_dir, "images", f"{image_id}.jpg")
        mask_path = os.path.join(test_dir, "masks", f"binary_{image_id}.tif")

        # Check if files exist
        if not os.path.exists(image_path) or not os.path.exists(mask_path):
            print(f"Missing files for image {image_id}")
            return create_default_features(image_id)

        # Context managers release the file handles right after decoding.
        with Image.open(image_path) as image_file:
            image = np.array(image_file.convert("RGB"))
        with Image.open(mask_path) as mask_file:
            mask = np.array(mask_file.convert("L"))

        # Apply same preprocessing as training
        mask_binary = (mask > 0).astype(np.uint8)
        labeled_mask = measure.label(mask_binary)
        props = measure.regionprops(labeled_mask)

        if not props:
            return create_default_features(image_id)

        # Keep the largest component only and crop image + mask to its bbox.
        largest_region = max(props, key=lambda x: x.area)
        cleaned_mask = (labeled_mask == largest_region.label).astype(np.uint8)
        minr, minc, maxr, maxc = largest_region.bbox
        mask = cleaned_mask[minr:maxr, minc:maxc]
        image = image[minr:maxr, minc:maxc]

        # Extract all features
        shape_features = extract_shape_features(mask)
        bug_ratio = calculate_bug_ratio(mask)
        color_features = extract_color_features(image, mask)
        additional_features = extract_additional_features(image, mask)

        # Compute inscribed circle and symmetry
        max_radius, circle_center = compute_best_inscribed_circle(mask)

        if max_radius > 5:
            symmetry_score, symmetry_angle = compute_best_symmetry_plane(
                image, mask, circle_center, max_radius
            )
        else:
            # Circle too small for the symmetry search; use neutral values.
            symmetry_score = 0.0
            symmetry_angle = 0.0

        # Combine all features
        return {
            'image_id': int(image_id),
            'bug_ratio': bug_ratio,
            'max_inscribed_radius': max_radius,
            'symmetry_score': symmetry_score,
            'symmetry_angle': symmetry_angle,
            **shape_features,
            **color_features,
            **additional_features
        }

    except Exception as e:
        print(f"Error processing image {image_id}: {e}")
        return create_default_features(image_id)

# ==================== MAIN PREDICTION PIPELINE ====================

def _is_placeholder_record(features):
    """Return True when every value except 'image_id' is zero (the all-zero fallback record)."""
    return all(value == 0 for name, value in features.items() if name != 'image_id')


def main():
    """
    Run the full test-set prediction pipeline for images 251-347.

    Steps: load the saved model, scaler and label encoder; read the feature
    column names from the training feature CSV; extract features for every
    test image; scale them; predict; and write the submission CSV
    ('ID', 'bug type') to '../predictions_test.csv'.

    `extract_all_features_test` returns an all-zero fallback record instead
    of failing, so failures are detected here by spotting such records. A
    successfully processed image has a non-empty mask and therefore a
    non-zero `bug_ratio`. Failed images are reported but still receive a
    prediction, so the submission keeps one row per image.

    Returns None. Progress and a summary are printed.
    """
    print("=== TEST SET PREDICTIONS (Images 251-347) ===\n")

    # 1. Load saved models and preprocessors
    print("1. Loading models and preprocessors...")
    try:
        # joblib files are pickles: only load artefacts produced by this project.
        best_model = joblib.load('models/best_model.pkl')
        scaler = joblib.load('../scaler.pkl')  # From feature extraction
        label_encoder = joblib.load('models/label_encoder.pkl')
        print("✓ Models loaded successfully")
    except Exception as e:
        print(f"❌ Error loading models: {e}")
        print("Make sure you have run the ML notebook and feature extraction first!")
        return

    # 2. Get feature columns from training data (header only; rows are not needed)
    train_columns = pd.read_csv('../features_normalized.csv', nrows=0).columns
    feature_cols = [col for col in train_columns
                    if col not in ['image_id', 'bug_type', 'species']]
    print(f"✓ Using {len(feature_cols)} features")

    # 3. Extract features for test images
    print("\n2. Extracting features for test images...")
    image_ids = range(251, 348)
    test_features = []
    failed_images = []

    for image_id in tqdm(image_ids, desc="Processing images"):
        features = extract_all_features_test(image_id)
        if features is None:
            failed_images.append(image_id)
            continue
        if _is_placeholder_record(features):
            # Extraction fell back to defaults: keep the row so the ID is
            # still predicted, but report it as a failure.
            failed_images.append(image_id)
        test_features.append(features)

    print(f"✓ Successfully processed {len(image_ids) - len(failed_images)} images")
    if failed_images:
        print(f"⚠️  Failed to process {len(failed_images)} images: {failed_images}")
        print("   Predictions for these IDs (if any) come from zero-filled "
              "placeholder features and are unreliable")

    if not test_features:
        print("❌ No features could be extracted - nothing to predict")
        return

    # 4. Create DataFrame and ensure column consistency
    test_df = pd.DataFrame(test_features)

    # Ensure all required columns are present
    missing_cols = set(feature_cols) - set(test_df.columns)
    if missing_cols:
        print(f"\n⚠️  Adding missing columns with default values: {missing_cols}")
        for col in missing_cols:
            test_df[col] = 0.0

    # Select features in the same order as training
    X_test = test_df[feature_cols].values

    # 5. Normalize features
    print("\n3. Normalizing features...")
    X_test_normalized = scaler.transform(X_test)

    # 6. Generate predictions
    print("\n4. Generating predictions...")
    predictions_encoded = best_model.predict(X_test_normalized)
    predictions = label_encoder.inverse_transform(predictions_encoded)

    # 7. Create submission DataFrame
    submission_df = pd.DataFrame({
        'ID': test_df['image_id'].astype(int),
        'bug type': predictions
    })

    # Sort by ID to ensure correct order
    submission_df = submission_df.sort_values('ID')

    # 8. Display prediction summary
    print("\n5. Prediction Summary:")
    print("-" * 40)
    print(submission_df['bug type'].value_counts())
    print("-" * 40)

    # Check for any suspicious predictions
    if 'Bee & Bumblebee' in submission_df['bug type'].values:
        count = (submission_df['bug type'] == 'Bee & Bumblebee').sum()
        print(f"\n⚠️  Warning: {count} images predicted as 'Bee & Bumblebee'")
        print("This rare class had only 1 training sample - predictions may be unreliable")

    # 9. Save submission file
    output_path = '../predictions_test.csv'
    submission_df.to_csv(output_path, index=False)
    print(f"\n✓ Predictions saved to: {output_path}")
    print(f"✓ File contains {len(submission_df)} predictions")

    # 10. Verify submission format
    print("\n6. Submission file verification:")
    print(f"  - Columns: {list(submission_df.columns)}")
    print(f"  - ID range: {submission_df['ID'].min()} to {submission_df['ID'].max()}")
    print(f"  - Unique bug types: {sorted(submission_df['bug type'].unique())}")
    print(f"  - Shape: {submission_df.shape}")

    # Display first few predictions
    print("\nFirst 10 predictions:")
    print(submission_df.head(10))

    print("\n✅ TEST PREDICTIONS COMPLETE!")
    print(f"📁 Submit the file: {output_path}")

# ==================== ALTERNATIVE: BATCH PREDICTION ====================

def predict_from_existing_features(test_features_path):
    """
    Predict from a CSV of already-extracted test features.

    Loads the saved model, scaler and label encoder, reads the feature
    column names from the training feature CSV, scales the matching columns
    of `test_features_path`, predicts, and writes the submission
    ('ID', 'bug type'), sorted by ID, to '../predictions_test.csv'.

    Args:
        test_features_path: CSV holding 'image_id' plus every training
            feature column (unscaled, since the scaler is applied here).

    Returns:
        The submission DataFrame.

    Raises:
        KeyError: if 'image_id' or any training feature column is missing
            from the test feature file.
    """
    print("=== PREDICTIONS FROM EXISTING FEATURES ===\n")

    # Load models (joblib files are pickles: only load this project's artefacts)
    best_model = joblib.load('models/best_model.pkl')
    scaler = joblib.load('../scaler.pkl')
    label_encoder = joblib.load('models/label_encoder.pkl')

    # Load test features
    test_df = pd.read_csv(test_features_path)

    # Get feature columns (header only; the training rows are not needed)
    train_columns = pd.read_csv('../features_normalized.csv', nrows=0).columns
    feature_cols = [col for col in train_columns
                    if col not in ['image_id', 'bug_type', 'species']]

    # Fail early with an explicit message instead of a bare index error
    # after the models have already been loaded and run.
    required_cols = ['image_id'] + feature_cols
    missing_cols = [col for col in required_cols if col not in test_df.columns]
    if missing_cols:
        raise KeyError(
            f"Columns missing from {test_features_path}: {missing_cols}"
        )

    # Prepare data
    X_test = test_df[feature_cols].values
    X_test_normalized = scaler.transform(X_test)

    # Predict
    predictions_encoded = best_model.predict(X_test_normalized)
    predictions = label_encoder.inverse_transform(predictions_encoded)

    # Create submission, ordered by ID like the one produced by main()
    submission_df = pd.DataFrame({
        'ID': test_df['image_id'].astype(int),
        'bug type': predictions
    }).sort_values('ID')

    output_path = '../predictions_test.csv'
    submission_df.to_csv(output_path, index=False)
    print(f"✓ Predictions saved to: {output_path}")

    return submission_df

# ==================== RUN THE PREDICTION ====================

if __name__ == "__main__":
    # Option 1: extract features from the test images, then predict.
    main()
    
    # Option 2 (left disabled): predict from a previously saved feature CSV
    # instead of re-extracting the features. Uncomment to use it.
    # test_features_path = '../test_features.csv'
    # if os.path.exists(test_features_path):
    #     predict_from_existing_features(test_features_path)

=== TEST SET PREDICTIONS (Images 251-347) ===

1. Loading models and preprocessors...
✓ Models loaded successfully
✓ Using 32 features

2. Extracting features for test images...


Processing images: 100%|██████████| 97/97 [21:30<00:00, 13.30s/it]


✓ Successfully processed 97 images

3. Normalizing features...

4. Generating predictions...

5. Prediction Summary:
----------------------------------------
bug type
Bee          49
Bumblebee    30
Butterfly     7
Wasp          6
Hover fly     5
Name: count, dtype: int64
----------------------------------------

✓ Predictions saved to: ../predictions_test.csv
✓ File contains 97 predictions

6. Submission file verification:
  - Columns: ['ID', 'bug type']
  - ID range: 251 to 347
  - Unique bug types: ['Bee', 'Bumblebee', 'Butterfly', 'Hover fly', 'Wasp']
  - Shape: (97, 2)

First 10 predictions:
    ID   bug type
0  251        Bee
1  252        Bee
2  253  Hover fly
3  254  Butterfly
4  255        Bee
5  256        Bee
6  257        Bee
7  258        Bee
8  259        Bee
9  260        Bee

✅ TEST PREDICTIONS COMPLETE!
📁 Submit the file: ../predictions_test.csv
