In [1]:
# Remove the preinstalled torch stack (torch 2.5.1+cu121 in the recorded run)
# and fastai so the next cell can reinstall them alongside xformers.
!pip uninstall torch torchvision torchaudio fastai -y

Found existing installation: torch 2.5.1+cu121
Uninstalling torch-2.5.1+cu121:
  Successfully uninstalled torch-2.5.1+cu121
Found existing installation: torchvision 0.20.1+cu121
Uninstalling torchvision-0.20.1+cu121:
  Successfully uninstalled torchvision-0.20.1+cu121
Found existing installation: torchaudio 2.5.1+cu121
Uninstalling torchaudio-2.5.1+cu121:
  Successfully uninstalled torchaudio-2.5.1+cu121
Found existing installation: fastai 2.7.18
Uninstalling fastai-2.7.18:
  Successfully uninstalled fastai-2.7.18


In [2]:
# Reinstall the stack with a pinned diffusers. xformers is unpinned and pulls in
# its own torch (torch==2.6.0 in the recorded run), which torchvision/torchaudio/
# fastai then resolve against.
# NOTE(review): consider pinning every package for a reproducible environment.
!pip install diffusers==0.32.2 xformers torchvision torchaudio fastai --no-cache-dir

Collecting diffusers==0.32.2
  Downloading diffusers-0.32.2-py3-none-any.whl.metadata (18 kB)
Collecting xformers
  Downloading xformers-0.0.29.post3-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (1.0 kB)
Collecting torchvision
  Downloading torchvision-0.21.0-cp310-cp310-manylinux1_x86_64.whl.metadata (6.1 kB)
Collecting torchaudio
  Downloading torchaudio-2.6.0-cp310-cp310-manylinux1_x86_64.whl.metadata (6.6 kB)
Collecting fastai
  Downloading fastai-2.7.18-py3-none-any.whl.metadata (9.1 kB)
Collecting torch==2.6.0 (from xformers)
  Downloading torch-2.6.0-cp310-cp310-manylinux1_x86_64.whl.metadata (28 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch==2.6.0->xformers)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch==2.6.0->xformers)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==

In [3]:
!pip install accelerate>=0.16.0 torchvision transformers>=4.25.1 ftfy tensorboard Jinja2 peft==0.7.0

In [4]:
# Image upscaling with true parallel multi-GPU support
# model: stabilityai/stable-diffusion-x4-upscaler

from datasets import load_dataset
from diffusers import StableDiffusionUpscalePipeline
import torch
import gc
from tqdm import tqdm
import concurrent.futures
import threading
import time
import os
from functools import partial

# Land-cover class name -> three-letter class code.
# NOTE(review): not referenced by any code in the visible notebook.
classes = dict(
    broadleaved_indigenous_hardwood='BIH',
    deciduous_hardwood='DHW',
    grose_broom='GBM',
    harvested_forest='HFT',
    herbaceous_freshwater_vege='HFV',
    high_producing_grassland='HPG',
    indigenous_forest='IFT',
    lake_pond='LPD',
    low_producing_grassland='LPG',
    manuka_kanuka='MKA',
    shortrotation_cropland='SRC',
    urban_build_up='UBU',
    urban_parkland='UPL',
)

# Per-GPU state populated by setup_multi_pipeline(): entry i of each list
# belongs to cuda:i (one pipeline and one lock per device).
gpu_locks = []
pipelines = []

def setup_multi_pipeline():
    """
    Load one fp16 x4-upscaler pipeline per visible CUDA device.

    Rebinds the module-level ``pipelines`` list (entry ``i`` is placed on
    ``cuda:i``) and a parallel ``gpu_locks`` list holding one
    ``threading.Lock`` per device.

    Returns:
        int: number of CUDA devices found (== number of pipelines created).

    Raises:
        RuntimeError: if no CUDA device is available.
    """
    global gpu_locks, pipelines

    model_id = "stabilityai/stable-diffusion-x4-upscaler"
    device_count = torch.cuda.device_count()

    # Guard clause: nothing to set up without a GPU.
    if not device_count > 0:
        raise RuntimeError("No CUDA devices available")

    print(f"Found {device_count} GPU(s)")

    gpu_locks = [threading.Lock() for _ in range(device_count)]
    pipelines = []

    for device_idx in range(device_count):
        print(f"Setting up pipeline on GPU {device_idx}")
        # Each device gets its own full copy of the model weights.
        loaded = StableDiffusionUpscalePipeline.from_pretrained(model_id, torch_dtype=torch.float16)
        pipelines.append(loaded.to(f"cuda:{device_idx}"))

    print(f"Created {len(pipelines)} pipelines across {device_count} GPUs")
    return device_count

def process_batch_on_gpu(batch_data, gpu_idx, int2str_fn):
    """
    Upscale every example of one batch on the pipeline bound to ``gpu_idx``.

    Args:
        batch_data: ``(indices, examples)`` pair; each example is a mapping
            with at least ``"image"`` (object with ``.resize``) and ``"label"``.
        gpu_idx: index into the module-level ``pipelines`` and ``gpu_locks``.
        int2str_fn: converts an integer label to its class-name string.

    Returns:
        list of ``(index, example)`` tuples in batch order. On success the
        example is a shallow copy whose ``"image"`` is the upscaled image; if
        the pipeline call raises, the error is printed and the original
        example is returned unchanged.
    """
    indices, examples = batch_data
    out = []

    # Hold this GPU's lock for the whole batch so no other thread drives the
    # same pipeline concurrently.
    with gpu_locks[gpu_idx]:
        upscaler = pipelines[gpu_idx]

        for position, sample in zip(indices, examples):
            picture = sample["image"]
            class_name = int2str_fn(sample["label"])
            small = picture.resize((128, 128))

            # e.g. "lake_pond" -> "lake pond"
            readable = " ".join(class_name.split("_"))
            prompt = f"A real aerial view of {readable} area in Waikato, New Zealand"

            try:
                upscaled = upscaler(prompt=prompt, image=small).images[0]
                out.append((position, dict(sample, image=upscaled)))
            except Exception as e:
                print(f"Error processing image {position} on GPU {gpu_idx}: {str(e)}")
                # Best effort: keep the original example so the row is not lost.
                out.append((position, sample))

    return out

def _make_batches(split_ds, batch_size):
    """Cut ``split_ds`` into ordered ``(indices, examples)`` pairs of at most ``batch_size`` rows."""
    total = len(split_ds)
    batches = []
    for start in range(0, total, batch_size):
        batch_indices = list(range(start, min(start + batch_size, total)))
        # NOTE(review): every row is decoded up front, so the whole split is
        # held in memory before processing starts.
        batches.append((batch_indices, [split_ds[j] for j in batch_indices]))
    return batches


def _upscale_split(split_ds, split, num_gpus, batch_size=4):
    """Upscale one split on all GPUs and return a new Dataset in the original row order."""
    import queue

    print(f"Processing {split} split with {len(split_ds)} samples")
    int2str_fn = split_ds.features["label"].int2str

    # Smaller batches to avoid memory issues, but not too small to maintain parallelism
    batches = _make_batches(split_ds, batch_size)
    print(f"Created {len(batches)} batches of size {batch_size}")

    # Dispatch each batch to whichever GPU is idle. A fixed round-robin mapping
    # (batch i -> GPU i % num_gpus) lets a worker block on a busy GPU's lock
    # while another GPU sits idle. With one token per GPU and one worker per
    # GPU, free_gpus.get() always finds a token immediately.
    free_gpus = queue.Queue()
    for gpu_idx in range(num_gpus):
        free_gpus.put(gpu_idx)

    def run_on_free_gpu(batch):
        gpu_idx = free_gpus.get()
        try:
            return process_batch_on_gpu(batch, gpu_idx, int2str_fn)
        finally:
            free_gpus.put(gpu_idx)

    processed_results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_gpus) as executor:
        future_to_batch = {executor.submit(run_on_free_gpu, batch): batch for batch in batches}
        completed = concurrent.futures.as_completed(future_to_batch)
        for i, future in enumerate(tqdm(completed, total=len(future_to_batch), desc="Processing batches")):
            batch_indices, batch_examples = future_to_batch[future]
            try:
                processed_results.extend(future.result())
            except Exception as e:
                print(f"Batch processing error: {str(e)}")
                # Keep the original examples so the split never silently loses rows.
                processed_results.extend(zip(batch_indices, batch_examples))

            # Periodically clear CUDA cache
            if i % 10 == 0:
                torch.cuda.empty_cache()

    # Restore the original row order (futures complete out of order).
    processed_results.sort(key=lambda item: item[0])
    sorted_results = [example for _, example in processed_results]

    # Pass the split's features explicitly: without them from_list re-infers
    # the schema and the label column loses its class names / int2str.
    return split_ds.from_list(sorted_results, features=split_ds.features)


def upscale(dataset_name: str, version: str, huggingface_token: str) -> None:
    """
    Upscale every split of ``{dataset_name}{version}`` on all GPUs and push it to the Hub.

    Each split is cut into small batches that are dispatched to whichever GPU
    is free. The upscaled examples are reassembled in the original order with
    the split's original features. The resulting DatasetDict is pushed to
    ``{dataset_name}{version}_upscaled``.

    Args:
        dataset_name: Hub dataset repo id without the version suffix.
        version: suffix appended to ``dataset_name`` (e.g. ``"_v1"``).
        huggingface_token: token used for both loading and pushing.

    Raises:
        RuntimeError: from setup_multi_pipeline() when no CUDA device exists.
    """
    global pipelines

    full_dataset_name = f"{dataset_name}{version}"
    print(f"Loading dataset: {full_dataset_name}")
    synthetic_ds = load_dataset(full_dataset_name, token=huggingface_token)

    try:
        num_gpus = setup_multi_pipeline()
        for split in synthetic_ds:
            synthetic_ds[split] = _upscale_split(synthetic_ds[split], split, num_gpus)
    finally:
        # Release GPU memory even if processing fails or is interrupted.
        # Collect first so the dropped pipelines' tensors are actually freed
        # before the CUDA cache is emptied.
        pipelines = []
        gc.collect()
        torch.cuda.empty_cache()

    output_name = f"{full_dataset_name}_upscaled"
    print(f"Pushing to hub: {output_name}")
    synthetic_ds.push_to_hub(output_name, token=huggingface_token)
    print("Processing complete!")


The cache for model files in Transformers v4.22.0 has been updated. Migrating your old cache. This is a one-time only operation. You can interrupt this and resume the migration later on by calling `transformers.utils.move_cache()`.


0it [00:00, ?it/s]

In [5]:
# Read the Hugging Face access token from Kaggle's secret store instead of
# hard-coding it in the notebook.
# NOTE(review): `HFT` is also the class code for 'harvested_forest'; a name like
# HF_TOKEN would be less ambiguous.
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()

HFT = user_secrets.get_secret("HFT")

In [None]:
# Multi-GPU run: upscales dushj98/waikato_aerial_2017_synthetic_v1 and pushes the
# result as dushj98/waikato_aerial_2017_synthetic_v1_upscaled.
# (This cell was not executed in the saved notebook.)
upscale(
    dataset_name="dushj98/waikato_aerial_2017_synthetic", 
    version="_v1", 
    huggingface_token=HFT,
)

___

# Using xFormers with a single pipeline (Accelerate is installed above but not used in this section)

In [10]:
from datasets import load_dataset
from diffusers import StableDiffusionUpscalePipeline
import torch
from torch.nn import DataParallel
from torchvision.transforms import ToTensor, ToPILImage

# Define classes
classes = dict(zip(
    (
        'broadleaved_indigenous_hardwood',
        'deciduous_hardwood',
        'grose_broom',
        'harvested_forest',
        'herbaceous_freshwater_vege',
        'high_producing_grassland',
        'indigenous_forest',
        'lake_pond',
        'low_producing_grassland',
        'manuka_kanuka',
        'shortrotation_cropland',
        'urban_build_up',
        'urban_parkland',
    ),
    ('BIH', 'DHW', 'GBM', 'HFT', 'HFV', 'HPG', 'IFT', 'LPD', 'LPG', 'MKA', 'SRC', 'UBU', 'UPL'),
))

# Load the x4 upscaler once, in half precision.
model_id = "stabilityai/stable-diffusion-x4-upscaler"
pipeline = StableDiffusionUpscalePipeline.from_pretrained(
    model_id, 
    torch_dtype=torch.float16
)

# Enable xFormers memory-efficient attention when it is usable. This is an
# optional optimisation, so any failure falls back to the default attention
# instead of aborting the cell.
try:
    pipeline.enable_xformers_memory_efficient_attention()
except ImportError:
    print("xFormers not installed. Skipping memory-efficient attention.")
except Exception as e:
    # e.g. xFormers is installed but unusable with this torch/CUDA build.
    print(f"xFormers could not be enabled ({e}). Skipping memory-efficient attention.")

# NOTE(review): DataParallel only splits work inside nn.Module.forward; a
# diffusers pipeline is not an nn.Module, and the upscaling code calls the
# wrapped pipeline through `.module`. All work therefore still runs on a single
# GPU ("cuda"). The wrapper is kept because the upscaling code expects `.module`
# when more than one GPU is visible. For real multi-GPU throughput use one
# pipeline per device, as upscale() does.
if torch.cuda.device_count() > 1:
    print(f"Using {torch.cuda.device_count()} GPUs!")
    pipeline = DataParallel(pipeline.to("cuda"))
else:
    pipeline = pipeline.to("cuda")

# torchvision converters between PIL images and float tensors (and back).
to_tensor, to_pil = ToTensor(), ToPILImage()

def _upscale_batch(batch, int2str, images_per_call: int = 1):
    """
    Upscale one batch produced by ``Dataset.map(batched=True)``.

    Args:
        batch: column-oriented batch; ``batch["image"]`` is a list of PIL
            images and ``batch["label"]`` the matching integer labels.
        int2str: converts an integer label to its class-name string.
        images_per_call: how many images are sent to the pipeline per call.
            The default of 1 matches single-image upscaling (same peak GPU
            memory); raise it to batch the diffusion loop when memory allows.

    Returns:
        ``batch`` with ``"image"`` replaced by the upscaled PIL images, in the
        same order as the inputs.

    Raises:
        ValueError: if ``images_per_call`` is less than 1.
    """
    if images_per_call < 1:
        raise ValueError("images_per_call must be >= 1")

    # Shrink to 128x128 before upscaling. The images stay PIL: a
    # ToTensor -> ToPILImage round trip adds nothing and can alter pixels
    # (float values are truncated back to uint8) or change the image mode.
    low_res_images = [image.resize((128, 128)) for image in batch["image"]]
    prompts = [
        f"A real aerial view of {' '.join(int2str(label).split('_'))} area in Waikato, New Zealand"
        for label in batch["label"]
    ]

    # If the pipeline is wrapped in DataParallel, call the wrapped pipeline:
    # DataParallel cannot scatter a diffusers pipeline call, so the wrapper
    # contributes nothing here.
    runner = getattr(pipeline, "module", pipeline)

    upscaled_images = []
    with torch.no_grad():
        for start in range(0, len(prompts), images_per_call):
            stop = start + images_per_call
            result = runner(prompt=prompts[start:stop], image=low_res_images[start:stop])
            upscaled_images.extend(result.images)

    batch["image"] = upscaled_images
    return batch


def _upscale(dataset_name: str, version: str, huggingface_token: str) -> None:
    """
    Upscale every split of ``{dataset_name}{version}`` with the single global
    pipeline and push the result to ``{dataset_name}{version}_upscaled``.

    Args:
        dataset_name: Hub dataset repo id without the version suffix.
        version: suffix appended to ``dataset_name`` (e.g. ``"_v1"``).
        huggingface_token: token used for both loading and pushing.
    """
    full_dataset_name = f"{dataset_name}{version}"
    synthetic_ds = load_dataset(full_dataset_name, token=huggingface_token)

    # Map each split on its own so it uses its own label feature; this also
    # works for datasets that have no "train" split.
    for split in synthetic_ds:
        split_ds = synthetic_ds[split]
        synthetic_ds[split] = split_ds.map(
            _upscale_batch,
            fn_kwargs={"int2str": split_ds.features["label"].int2str},
            batched=True,  # Process images in batches
            batch_size=4,  # Adjust batch size based on GPU memory
        )

    # Append the version suffix exactly once, matching upscale()'s output name.
    synthetic_ds.push_to_hub(f"{full_dataset_name}_upscaled", token=huggingface_token)


Loading pipeline components...:   0%|          | 0/6 [00:00<?, ?it/s]

Using 2 GPUs!


In [12]:
# Single-pipeline run. KeyboardInterrupt (the kernel's interrupt button) is
# caught so stopping a long run prints a message instead of a traceback; if the
# interrupt lands during mapping, nothing is pushed to the Hub.
try:
    _upscale(
        dataset_name="dushj98/waikato_aerial_2017_synthetic", 
        version="_v1", 
        huggingface_token=HFT
    )
except KeyboardInterrupt:
    print("Stopped via a Keyboard Interrupt")

Map:   0%|          | 0/13000 [00:00<?, ? examples/s]

  0%|          | 0/75 [00:00<?, ?it/s]

  0%|          | 0/75 [00:00<?, ?it/s]

  0%|          | 0/75 [00:00<?, ?it/s]

  0%|          | 0/75 [00:00<?, ?it/s]

  0%|          | 0/75 [00:00<?, ?it/s]

Stopped via a Keyboard Interrupt


___