In [None]:
import os
import numpy as np
import cv2
from PIL import Image
from tqdm import tqdm
import shutil
import random
from pathlib import Path

In [None]:
# One shared seed for both global generators (stdlib `random` and NumPy) so
# every random choice made later in the notebook is repeatable.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

In [None]:
# Where the raw dataset lives and where the preprocessed copy is written.
# Either location can be overridden without editing this cell by setting the
# FLOODNET_DATA_ROOT / FLOODNET_OUTPUT_DIR environment variables before the
# kernel starts; when they are unset the original values below are used.
DATA_ROOT = os.environ.get(
    "FLOODNET_DATA_ROOT",
    "path/to/FloodNet_Challenge_Track1",  # Update this path
)
OUTPUT_DIR = os.environ.get(
    "FLOODNET_OUTPUT_DIR",
    r"C:\project\floodnet-semisupervised-vqa\preprocessed_classification",
)

In [None]:
# Input image folders, each resolved against DATA_ROOT.
# The relative parts are kept as single forward-slash strings so the joined
# paths are exactly the ones the rest of the notebook has always used.
(
    TRAIN_LABELED_FLOODED_PATH,
    TRAIN_LABELED_NONFLOODED_PATH,
    TRAIN_UNLABELED_PATH,
    VALIDATION_PATH,
    TEST_PATH,
) = (
    os.path.join(DATA_ROOT, relative_dir)
    for relative_dir in (
        "Train/Labeled/Flooded/image",
        "Train/Labeled/Non-Flooded/image",
        "Train/Unlabeled/image",
        "Validation/image",
        "Test/image",
    )
)

In [None]:
# Build the folder tree that receives the preprocessed images
def create_output_structure():
    """Ensure OUTPUT_DIR and every per-split folder beneath it exist.

    Folders that are already present are left untouched, so the function can
    be called repeatedly.
    """
    # Each entry is the path, relative to OUTPUT_DIR, of one folder to create.
    # The empty tuple stands for OUTPUT_DIR itself and is created first.
    folder_parts = (
        (),
        ("train", "Flooded"),
        ("train", "Non-Flooded"),
        ("train_unlabeled",),
        ("validation",),
        ("test",),
    )
    for parts in folder_parts:
        os.makedirs(os.path.join(OUTPUT_DIR, *parts), exist_ok=True)

    print(f"Created output directory structure at {OUTPUT_DIR}")

In [None]:
# Function to preprocess and save an image
def preprocess_and_save(src_path, dest_path, target_size=(224, 224)):
    """Resize one image and write the result to ``dest_path``.

    Args:
        src_path: Path of the image to read.
        dest_path: Where to write the resized image; Pillow chooses the output
            format from this path's extension.
        target_size: ``(width, height)`` of the saved image. The picture is
            resized to exactly this size, so its aspect ratio is not preserved.

    Returns:
        True if the image was written, False if anything went wrong. Errors
        are printed rather than raised so that one bad file does not abort a
        whole batch.
    """
    # Image modes Pillow's JPEG encoder is known to accept. Anything else
    # (for example RGBA or palette images) makes ``save`` fail, so those are
    # converted to RGB when the destination is a JPEG file.
    jpeg_writable_modes = ("1", "L", "RGB", "RGBX", "CMYK", "YCbCr")
    try:
        dest_ext = os.path.splitext(str(dest_path))[1].lower()
        dest_is_jpeg = dest_ext in (".jpg", ".jpeg")

        # The context manager closes the source file handle as soon as the
        # pixels have been read, instead of leaving it to garbage collection.
        with Image.open(src_path) as source:
            working = source
            if dest_is_jpeg and source.mode not in jpeg_writable_modes:
                working = source.convert("RGB")

            # resize() returns a new, fully loaded image that stays valid
            # after the source file has been closed.
            resized = working.resize(target_size, Image.Resampling.LANCZOS)

        # Save preprocessed image
        resized.save(dest_path, quality=95)
        return True
    except Exception as e:
        print(f"Error processing {src_path}: {e}")
        return False

In [None]:
# Function to get all image files from a directory
def get_image_files(directory):
    """Return the paths of the image files directly inside ``directory``.

    A file counts as an image when its name ends in .jpg, .jpeg or .png
    (case-insensitive). Sub-directories are not searched, and a directory
    entry that merely has an image-like name is skipped.

    Args:
        directory: Folder to list.

    Returns:
        A list of ``os.path.join(directory, name)`` paths, sorted by file
        name so the order is the same on every machine and every run.

    Raises:
        OSError: If ``directory`` cannot be listed (for example, it does not
            exist).
    """
    image_extensions = ('.jpg', '.jpeg', '.png')
    # os.listdir returns entries in arbitrary order; sorting makes downstream
    # processing and any seeded sampling reproducible.
    candidates = (
        os.path.join(directory, filename)
        for filename in sorted(os.listdir(directory))
        if filename.lower().endswith(image_extensions)
    )
    return [path for path in candidates if os.path.isfile(path)]

In [None]:
def _process_split(label, source_dir, dest_dir):
    """Resize every image found in ``source_dir`` into ``dest_dir``.

    Args:
        label: Human-readable split name used in the progress message.
        source_dir: Folder holding the original images.
        dest_dir: Existing folder that receives the resized copies; each copy
            keeps the file name of its source.

    Returns:
        The number of images that ``preprocess_and_save`` reported as failed.
    """
    images = get_image_files(source_dir)
    print(f"Processing {len(images)} {label} images...")
    failed = 0
    for src_path in tqdm(images):
        dest_path = os.path.join(dest_dir, os.path.basename(src_path))
        if not preprocess_and_save(src_path, dest_path):
            failed += 1
    return failed


def process_dataset():
    """Preprocess every dataset split into the output folder tree.

    Creates the output folders, then handles the labeled flooded, labeled
    non-flooded, unlabeled, validation and test images in that order. Each
    image is resized by ``preprocess_and_save`` and stored under its original
    file name. Images that fail are skipped; their total is reported at the
    end so failures are not lost among the per-file error messages.
    """
    # Create output structure
    create_output_structure()

    # (label shown in the log, source folder, destination folder)
    splits = [
        ("flooded", TRAIN_LABELED_FLOODED_PATH,
         os.path.join(OUTPUT_DIR, "train", "Flooded")),
        ("non-flooded", TRAIN_LABELED_NONFLOODED_PATH,
         os.path.join(OUTPUT_DIR, "train", "Non-Flooded")),
        ("unlabeled", TRAIN_UNLABELED_PATH,
         os.path.join(OUTPUT_DIR, "train_unlabeled")),
        ("validation", VALIDATION_PATH,
         os.path.join(OUTPUT_DIR, "validation")),
        ("test", TEST_PATH,
         os.path.join(OUTPUT_DIR, "test")),
    ]

    total_failed = 0
    for label, source_dir, dest_dir in splits:
        total_failed += _process_split(label, source_dir, dest_dir)

    if total_failed:
        print(f"Warning: {total_failed} image(s) could not be processed (see errors above).")
    print("Preprocessing complete!")


In [None]:
# Create metadata file with dataset statistics
def create_metadata():
    """Write per-folder entry counts and preprocessing notes to metadata.txt.

    Every entry of each output folder is counted, whatever its type. The
    file is written to OUTPUT_DIR as one ``key: value`` line per item.

    Returns:
        The metadata dictionary that was written.
    """
    # Metadata key -> folder (relative to OUTPUT_DIR) whose entries are counted.
    counted_folders = {
        "flooded_count": ("train", "Flooded"),
        "nonflooded_count": ("train", "Non-Flooded"),
        "unlabeled_count": ("train_unlabeled",),
        "validation_count": ("validation",),
        "test_count": ("test",),
    }
    metadata = {
        key: len(os.listdir(os.path.join(OUTPUT_DIR, *parts)))
        for key, parts in counted_folders.items()
    }
    metadata["image_size"] = (224, 224)
    metadata["preprocessing"] = "Resized to 224x224 pixels with Lanczos resampling"

    # Save metadata
    metadata_path = os.path.join(OUTPUT_DIR, "metadata.txt")
    with open(metadata_path, "w") as f:
        f.writelines(f"{key}: {value}\n" for key, value in metadata.items())

    print(f"Metadata saved to {metadata_path}")
    return metadata

In [None]:
# Copy a few preprocessed images into one folder for a quick visual check
def create_sample_images(num_samples=3):
    """Copy a few random preprocessed images per category into ``samples``.

    Samples are drawn from the flooded, non-flooded and unlabeled training
    folders under OUTPUT_DIR and copied to ``OUTPUT_DIR/samples`` with the
    category name prefixed to the file name.

    Args:
        num_samples: Maximum number of files to copy per category. A category
            holding fewer files contributes all of them.
    """
    sample_dir = os.path.join(OUTPUT_DIR, "samples")
    os.makedirs(sample_dir, exist_ok=True)

    # Copy a few samples from each category
    categories = [
        ("Flooded", os.path.join(OUTPUT_DIR, "train", "Flooded")),
        ("Non-Flooded", os.path.join(OUTPUT_DIR, "train", "Non-Flooded")),
        ("Unlabeled", os.path.join(OUTPUT_DIR, "train_unlabeled"))
    ]

    for category_name, category_path in categories:
        # os.listdir order depends on the filesystem; sorting first makes the
        # seeded random pick select the same files on every machine.
        files = sorted(os.listdir(category_path))
        if not files:
            continue

        sample_count = max(0, min(num_samples, len(files)))
        for sample_file in random.sample(files, sample_count):
            src = os.path.join(category_path, sample_file)
            dst = os.path.join(sample_dir, f"{category_name}_{sample_file}")
            shutil.copy(src, dst)

    print(f"Sample images copied to {sample_dir}")

# Driver: validate the configuration, run the preprocessing steps in order,
# then print the resulting statistics.
if __name__ == "__main__":
    # Print startup message
    print("FloodNet Image Classification Preprocessing")
    print("===========================================")
    print(f"Source directory: {DATA_ROOT}")
    print(f"Destination directory: {OUTPUT_DIR}")

    # Check if directories exist
    if not os.path.exists(DATA_ROOT):
        print(f"Error: Source directory {DATA_ROOT} does not exist!")
        print("Please update the DATA_ROOT variable in the script.")
        # `exit` is a helper meant for the interactive interpreter (it is
        # injected by the `site` module and replaced by IPython in notebooks),
        # so it is not a dependable way to stop a program. Raising SystemExit
        # halts execution here and keeps exit status 1 when run as a script.
        raise SystemExit(1)

    # Process the dataset
    process_dataset()

    # Create metadata
    metadata = create_metadata()

    # Create sample images
    create_sample_images()

    # Print summary
    print("\nPreprocessing Summary:")
    print("====================")
    for key, value in metadata.items():
        print(f"{key}: {value}")
    print("\nPreprocessed images are ready for training!")