In [None]:
import gc
import os
from typing import Optional

import cv2
import numpy as np
import torch
from PIL import Image
from segment_anything import sam_model_registry, SamPredictor
from torchvision import models, transforms
from tqdm import tqdm

In [None]:
# Location of the SAM checkpoint file and the matching key used to look up
# the model builder in `sam_model_registry`.
sam_checkpoint = "/kaggle/input/segment-anything/pytorch/vit-b/1/model.pth"
model_type = "vit_b"
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Initialize SAM: build the model from the checkpoint, move it to `device`,
# and wrap it in a SamPredictor (used later through set_image()/predict()).
print("Loading models...")
sam = sam_model_registry[model_type](checkpoint=sam_checkpoint)
sam.to(device=device)
predictor = SamPredictor(sam)

In [None]:
# --------- RESNET SETUP ----------
# ResNet-50 is used purely as a feature extractor: its final `fc` layer is
# replaced by an identity, so a forward pass returns the vector that would
# otherwise have been fed to the classifier.
# NOTE(review): `pretrained=True` is the legacy torchvision argument; newer
# releases prefer `weights=...` — confirm against the installed version.
print("Loading ResNet model...")
resnet_model = models.resnet50(pretrained=True)
resnet_model.fc = torch.nn.Identity()  # Remove classification layer
resnet_model.eval()  # switch the module to evaluation mode
resnet_model.to(device)  # `device` is chosen in the SAM setup cell

# Preprocessing applied to every masked image before it reaches ResNet:
# array -> PIL image -> 224x224 resize -> tensor -> per-channel normalization.
# The mean/std values are presumably the standard ImageNet statistics — verify.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

In [None]:
# Number of segments kept per image; process_segments() pads its output to
# this many rows.
TOP_K_SEGMENTS = 5
# Number of image paths loaded together by batch_process_images().
BATCH_SIZE = 4 
# Side length, in pixels, of the padded square produced by preprocess_image().
TARGET_SIZE = 1024

In [None]:
def preprocess_image(image_path: str) -> Optional[np.ndarray]:
    """Load an image and letterbox it onto a TARGET_SIZE x TARGET_SIZE canvas.

    The file is read with OpenCV, converted from BGR to RGB, scaled to fit
    inside a TARGET_SIZE square while keeping its aspect ratio, and then
    centered on that square with black (zero) padding.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The padded RGB image of shape (TARGET_SIZE, TARGET_SIZE, 3), or
        None when OpenCV cannot read the file.
    """
    image = cv2.imread(image_path)
    if image is None:
        return None
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    h, w = image.shape[:2]

    # Scale so that the longer side fits TARGET_SIZE.
    scale = TARGET_SIZE / max(h, w)
    # int() truncation can produce 0 for extreme aspect ratios (for example
    # a 1 x 3000 strip), which cv2.resize rejects; keep at least one pixel.
    new_h = max(1, int(h * scale))
    new_w = max(1, int(w * scale))
    image = cv2.resize(image, (new_w, new_h))

    # Split the leftover space evenly; the odd pixel, if any, goes to the
    # bottom/right border.
    top = (TARGET_SIZE - new_h) // 2
    bottom = TARGET_SIZE - new_h - top
    left = (TARGET_SIZE - new_w) // 2
    right = TARGET_SIZE - new_w - left
    return cv2.copyMakeBorder(image, top, bottom, left, right, cv2.BORDER_CONSTANT, value=0)

In [None]:
def process_segments(image, masks, top_k=None):
    """Extract ResNet features for the highest-scoring SAM segments.

    Args:
        image: Image array the masks refer to; each mask is applied to it
            with cv2.bitwise_and before feature extraction.
        masks: Iterable of dicts with the keys 'segmentation' (a mask with
            the image's height and width) and 'predicted_iou' (the score
            used for ranking).
        top_k: Number of segments to keep, which is also the number of rows
            returned. Defaults to TOP_K_SEGMENTS when None.

    Returns:
        A float32 array of shape (top_k, 2048). Segments with fewer than 50
        mask pixels, or whose feature extraction fails, contribute no row;
        the array is zero-padded at the end up to top_k rows.
    """
    if top_k is None:
        top_k = TOP_K_SEGMENTS
    feature_dim = 2048  # width of the zero rows used for padding

    # Keep the top_k masks with the highest predicted IoU.
    ranked = sorted(masks, key=lambda m: m['predicted_iou'], reverse=True)[:top_k]

    feature_list = []

    for mask in ranked:
        binary_mask = mask['segmentation'].astype(np.uint8)

        # Reject tiny segments before doing any image work on them.
        if np.count_nonzero(binary_mask) < 50:
            continue

        masked_img = cv2.bitwise_and(image, image, mask=binary_mask)

        try:
            input_tensor = transform(masked_img).unsqueeze(0).to(device)
            with torch.no_grad():
                features = resnet_model(input_tensor)
            # Cast explicitly so real rows and padding rows share one dtype.
            feature_list.append(
                features.squeeze().cpu().numpy().astype(np.float32, copy=False)
            )
        except Exception as e:
            # Best effort: one bad segment must not discard the whole image.
            print(f"Skipping segment due to error: {e}")
            continue

    # Pad with zero rows so the output always has exactly top_k rows. The
    # padding is float32 on purpose: float64 zeros would silently upcast the
    # whole result whenever padding happens.
    while len(feature_list) < top_k:
        feature_list.append(np.zeros(feature_dim, dtype=np.float32))

    return np.array(feature_list, dtype=np.float32)  # shape: (top_k, 2048)

In [None]:
def batch_process_images(image_paths, save_dir):
    """Run SAM and feature extraction over image_paths, one .npy file per image.

    Paths are consumed BATCH_SIZE at a time: each chunk is loaded with
    preprocess_image(), then every loaded image is segmented and featurized
    on its own. Images that cannot be loaded are skipped silently; an image
    whose processing raises is reported and skipped.

    NOTE(review): predictor.predict() is called without any prompts —
    confirm that this is the intended way to obtain the masks.

    Args:
        image_paths: Sequence of image file paths.
        save_dir: Directory for the output files (created if missing). Each
            file is named after the image, with its extension replaced by
            '.npy'.
    """
    os.makedirs(save_dir, exist_ok=True)

    for start in tqdm(range(0, len(image_paths), BATCH_SIZE), desc="Batch Processing"):
        # Load this chunk, keeping every readable image next to its path.
        loaded = []
        for candidate in image_paths[start:start + BATCH_SIZE]:
            picture = preprocess_image(candidate)
            if picture is None:
                continue
            if start % 500 == 0:
                print("image being loaded")
            loaded.append((candidate, picture))

        if not loaded:
            continue

        # Segment and featurize the loaded images one at a time.
        for path, picture in loaded:
            try:
                predictor.set_image(picture)
                masks, scores, _ = predictor.predict()

                # Repackage the predictor output as the list of dicts that
                # process_segments() expects.
                mask_dicts = [
                    {'segmentation': segmentation, 'predicted_iou': scores[idx]}
                    for idx, segmentation in enumerate(masks)
                ]

                features = process_segments(picture, mask_dicts)

                # <image name without extension>.npy inside save_dir
                stem = os.path.basename(path).rsplit('.', 1)[0]
                np.save(os.path.join(save_dir, stem + '.npy'), features)
            except Exception as e:
                print(f"Failed on {path}: {str(e)}")

        # Every few chunks, release Python garbage and cached CUDA memory.
        if start % 20 == 0:
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

In [None]:
def process_dataset(image_folder: str, save_dir: str = '/kaggle/working/sam_features_f') -> str:
    """Main processing pipeline: featurize every image found in a folder.

    Args:
        image_folder: Directory whose .png/.jpg/.jpeg files (matched
            case-insensitively, top level only) are processed.
        save_dir: Directory that receives one .npy feature file per image.
            Defaults to the Kaggle working-directory location.

    Returns:
        The output directory path (save_dir).
    """
    image_exts = ('.png', '.jpg', '.jpeg')
    # os.listdir() returns entries in arbitrary order; sorting makes the
    # processing order reproducible from run to run.
    image_paths = [
        os.path.join(image_folder, name)
        for name in sorted(os.listdir(image_folder))
        if name.lower().endswith(image_exts)
    ]

    batch_process_images(image_paths, save_dir)

    print(f"Saved features to {save_dir}")
    return save_dir

In [None]:
# Run the full pipeline on the Flickr8k image folder; `features_dir` holds
# the output directory returned by process_dataset().
flickr8k_image_folder = "/kaggle/input/flickr8k/Images" 
features_dir = process_dataset(flickr8k_image_folder)

In [None]:
import os
import zipfile
from tqdm import tqdm

def zip_features_directory(features_dir='/kaggle/working/sam_features_f', output_zip='/kaggle/working/sam_features_f.zip'):
    """Bundle the .npy files found directly in features_dir into one zip.

    Args:
        features_dir: Directory holding the saved feature files.
        output_zip: Destination path of the archive; it is opened in write
            mode, so an existing file is replaced.

    Returns:
        The path of the archive that was written (output_zip).

    Raises:
        FileNotFoundError: If features_dir does not exist.
    """
    # Guard clause: there is nothing to archive without the directory.
    dir_missing = not os.path.exists(features_dir)
    if dir_missing:
        raise FileNotFoundError(f"Features directory not found: {features_dir}")

    print(f"Zipping contents of {features_dir} to {output_zip}...")

    with zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED) as archive:
        # Only the .npy entries are archived; everything else is ignored.
        npy_names = []
        for entry in os.listdir(features_dir):
            if entry.endswith('.npy'):
                npy_names.append(entry)

        # Store each file under its bare name so the archive has no folders.
        for npy_name in tqdm(npy_names, desc="Compressing files"):
            archive.write(os.path.join(features_dir, npy_name), arcname=npy_name)

    size_mb = os.path.getsize(output_zip) / 1024 / 1024
    print(f"Successfully created zip archive at {output_zip}")
    print(f"Total files compressed: {len(npy_names)}")
    print(f"Zip file size: {size_mb:.2f} MB")

    return output_zip

# Example usage:
# Run after process_dataset() has written the .npy files. Called without
# arguments, the default features directory and zip path are used.
zip_path = zip_features_directory()