In [None]:
from torch_helpers import *

In [None]:
# Annotation CSV; handed to ChangeDetectionDataset as `path`
CSV_PATH = '../data/annotations_ukraine.csv'
# Root folders of the "before" / "after" imagery; handed to the dataset as
# `before_path` / `after_path`. Later cells address one sub-folder per image id
# inside each of them.
BEFORE_PATH = '../data/images_ukraine_extracted_before/'
AFTER_PATH = '../data/images_ukraine_extracted_after/'

In [None]:
def test_dataset():
    # Instantiate the dataset
    dataset = ChangeDetectionDataset(path=CSV_PATH, before_path=BEFORE_PATH, after_path=AFTER_PATH)

    # Check the length of the dataset
    print(f"Number of samples in the dataset: {len(dataset)}")

    # Iterate over a few samples
    for i in range(3):
        sample = dataset[i]
        I1, I2, label = sample['I1'], sample['I2'], sample['label']
        print(f"Sample {i}:")
        print(f"  I1 shape: {I1.shape}, I2 shape: {I2.shape}, Label: {label}")

# Run the dataset smoke test: prints the sample count and the shapes/labels of the first samples
test_dataset()


In [None]:
from torch.utils.data import DataLoader

def test_dataloader():
    # Instantiate the dataset
    dataset = ChangeDetectionDataset(path=CSV_PATH, before_path=BEFORE_PATH, after_path=AFTER_PATH)

    # Create a DataLoader
    dataloader = DataLoader(dataset, batch_size=2, shuffle=True)

    # Iterate over a few batches
    for i, batch in enumerate(dataloader):
        I1_batch, I2_batch, labels = batch['I1'], batch['I2'], batch['label']
        print(f"Batch {i}:")
        print(f"  I1 batch shape: {I1_batch.shape}, I2 batch shape: {I2_batch.shape}, Labels: {labels}")
        if i == 2:  # Limit to a few batches
            break

# Run the DataLoader smoke test: prints the shapes/labels of the first few shuffled batches
test_dataloader()

In [None]:
from torch_helpers import ChangeDetectionDataset
import numpy as np
# Same values as the path constants defined near the top of the notebook (re-declared in this cell)
CSV_PATH = '../data/annotations_ukraine.csv'
BEFORE_PATH = '../data/images_ukraine_extracted_before/'
AFTER_PATH = '../data/images_ukraine_extracted_after/'


def visualize_samples(dataset, num_samples=1, seed=7):
    """Plot the before/after image pair of randomly selected samples.

    One figure is shown per selected sample. Its title is built from the
    ``location``, ``admin1``, ``event_date`` and ``timeline_id`` columns of
    ``dataset.df`` plus the sample's label.

    Args:
        dataset: Object exposing a ``df`` DataFrame with the columns listed
            above and integer indexing that returns a mapping with ``'I1'``,
            ``'I2'`` (objects with a ``.numpy()`` method) and ``'label'``.
            Row ``i`` of ``dataset.df`` is used as the metadata of
            ``dataset[i]``.
        num_samples: How many distinct samples to draw (default 1). Clamped to
            the number of rows in ``dataset.df``.
        seed: Seed of the private random generator (default 7), so the same
            samples are selected on every run.

    Returns:
        None. Figures are displayed with ``plt.show()``.
    """
    import matplotlib.pyplot as plt
    import numpy as np

    # get annotations
    df = dataset.df

    # Never ask for more distinct rows than exist (choice(..., replace=False)
    # raises in that case, and also for an empty frame).
    count = min(num_samples, len(df))
    if count <= 0:
        print("No samples to visualize")
        return

    # A private RandomState yields the same draw as np.random.seed(seed)
    # followed by np.random.choice, but leaves the global NumPy RNG untouched.
    rng = np.random.RandomState(seed)
    indices = rng.choice(len(df), size=count, replace=False)

    for i in map(int, indices):
        sample = dataset[i]
        I1 = np.squeeze(sample['I1'].numpy())
        I2 = np.squeeze(sample['I2'].numpy())
        label = sample['label']

        # NOTE(review): pixel values are plotted exactly as the dataset returns
        # them. imshow clips RGB data to [0, 1] (floats) or [0, 255] (ints), so
        # un-normalised imagery may render saturated; an earlier draft divided
        # both images by 10000 here -- confirm the intended scaling.

        # Extract metadata for the title (one row lookup instead of four)
        row = df.iloc[i]
        location = row['location']
        admin1 = row['admin1']
        event_date = row['event_date']
        attack = "Yes" if label == 1 else "No"
        timeline_id = row['timeline_id']

        # Plot the images
        fig, axes = plt.subplots(1, 2, figsize=(20, 10))
        fig.suptitle(f"{location}, {admin1}, {event_date}, Attack: {attack}, id: {timeline_id}", fontsize=14, y=1.05)

        # The transpose moves axis 0 last, i.e. (C, H, W) -> (H, W, C) for imshow
        axes[0].imshow(np.transpose(I1, (1, 2, 0)))
        axes[0].set_title("Before", fontsize=12)
        axes[1].imshow(np.transpose(I2, (1, 2, 0)))
        axes[1].set_title("After", fontsize=12)

        plt.tight_layout()
        plt.show()
        # Release the figure so plotting many samples does not accumulate open figures
        plt.close(fig)

# Build the dataset with normalise=False and plot the randomly selected sample
dataset = ChangeDetectionDataset(
    path=CSV_PATH,
    before_path=BEFORE_PATH,
    after_path=AFTER_PATH,
    normalise=False,
)

visualize_samples(dataset)

The following script extracts the dimensions of all images and writes them to a CSV. That way I can identify problematic images and set the minimum dimensions for centre cropping. I then manually inspected the CSVs, noted down the ids that belong to faulty images, and deleted them programmatically.

In [None]:
import os
import pandas as pd
import glob
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import tifffile
from tqdm import tqdm

# Base directory containing the image folders (the same folder as AFTER_PATH
# defined earlier, written without the trailing slash)
base_dir = "../data/images_ukraine_extracted_after"

def get_image_info(tiff_path):
    """Look up the id and pixel dimensions of a single TIFF file.

    The id is the third-from-last component of ``tiff_path``; width and height
    are read from the first page of the file as opened by ``tifffile``.

    Returns:
        ``{"id": ..., "width": ..., "height": ...}`` on success. If anything
        goes wrong the error is printed and ``None`` is returned instead.
    """
    try:
        folder_id = Path(tiff_path).parts[-3]

        # Only attributes of the first page are consulted
        with tifffile.TiffFile(tiff_path) as handle:
            first_page = handle.pages[0]
            width, height = first_page.imagewidth, first_page.imagelength
    except Exception as err:
        print(f"Error processing {tiff_path}: {err}")
        return None

    return dict(id=folder_id, width=width, height=height)

def main(image_dir=None, output_path=None):
    """Collect the dimensions of every composite TIFF under a folder into a CSV.

    Every file matching ``**/files/composite.tif`` below ``image_dir`` is
    passed to ``get_image_info`` on a thread pool; entries for which it
    returns ``None`` are dropped.

    Args:
        image_dir: Folder to scan. Defaults to the notebook-level ``base_dir``.
        output_path: Destination CSV. Defaults to
            ``<image_dir>/after_image_dimensions.csv``; pass it explicitly when
            scanning a different folder so the file name stays meaningful.

    Returns:
        The DataFrame that was written, with the columns ``id``, ``width`` and
        ``height`` (present even when no image was processed).
    """
    if image_dir is None:
        image_dir = base_dir
    if output_path is None:
        output_path = os.path.join(image_dir, "after_image_dimensions.csv")

    # Find all TIFF files matching the pattern; sorted so the CSV row order
    # does not depend on the file system's listing order.
    print("Finding TIFF files...")
    tiff_files = sorted(glob.glob(os.path.join(image_dir, "**/files/composite.tif"), recursive=True))
    print(f"Found {len(tiff_files)} TIFF files")

    # Process files in parallel for speed; executor.map keeps the input order
    with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
        # Use tqdm for a progress bar; filter(None, ...) drops failed files
        results = list(filter(None, tqdm(
            executor.map(get_image_info, tiff_files),
            total=len(tiff_files),
            desc="Processing images"
        )))

    # Fixed columns keep the CSV header in place even when nothing was found,
    # so the file can still be read back with pd.read_csv.
    df = pd.DataFrame(results, columns=["id", "width", "height"])

    # Print summary
    print(f"\nProcessed {len(df)} images successfully")
    print("\nSample data:")
    print(df.head())

    # Save to CSV
    df.to_csv(output_path, index=False)
    print(f"\nResults saved to {output_path}")

    return df

# Inside a notebook kernel __name__ is "__main__", so the scan runs whenever this cell is executed
if __name__ == "__main__":
    main()

In [None]:
# Load the per-image dimension tables (id, width, height) for manual inspection.
# NOTE(review): main() in the previous cell writes its CSV inside base_dir, i.e.
# '../data/images_ukraine_extracted_after/after_image_dimensions.csv', whereas
# these reads look directly under '../data/'. No visible cell produces the
# "before" CSV either -- presumably the script was re-run with edited paths and
# the files were moved by hand; confirm before relying on Restart & Run All.
before_dimensions = pd.read_csv('../data/before_image_dimensions.csv')
after_dimensions = pd.read_csv('../data/after_image_dimensions.csv')

In [None]:
import shutil
import os

# Image ids whose folders are removed from both image trees below
ids_to_exclude = [
    1130985, 688190, 1805361, 8460613, 2301444, 2095111,
    1069195, 5010964, 634941, 3379814, 2244223, 5187329,
    1072836, 5165655,
]

# For each excluded id, delete its folder from the "before" tree and then from
# the "after" tree. Folders that are already gone are skipped silently.
for image_id in ids_to_exclude:
    for root in (BEFORE_PATH, AFTER_PATH):
        target_dir = os.path.join(root, str(image_id))
        if not os.path.exists(target_dir):
            continue
        shutil.rmtree(target_dir)
        print(f"Removed directory: {target_dir}")