# Setup

In [13]:
!pip install diffusers transformers accelerate

Collecting accelerate
  Downloading accelerate-0.30.1-py3-none-any.whl (302 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/302.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━[0m [32m204.8/302.6 kB[0m [31m6.0 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m302.6/302.6 kB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch>=1.10.0->accelerate)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch>=1.10.0->accelerate)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch>=1.10.0->accelerate)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from 

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [59]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from diffusers import StableDiffusionPipeline, UNet2DConditionModel, ControlNetModel, AutoencoderKL, PNDMScheduler
from PIL import Image
from transformers import CLIPTextModel, CLIPTokenizer
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
from torchvision import transforms
import os

# Dataset

In [93]:
class ImageShadowDepthLightingDataset(Dataset):
    def __init__(self, image_dir, shadow_dir, depth_dir, lighting_directions=None, transform=None):
        self.image_dir = image_dir
        self.shadow_dir = shadow_dir
        self.depth_dir = depth_dir
        self.transform = transform
        self.image_files = sorted(os.listdir(image_dir))
        self.shadow_files = sorted(os.listdir(shadow_dir))
        self.depth_files = sorted(os.listdir(depth_dir))
        if lighting_directions != None:
          self.lighting_coords = self.load_lighting_coords(lighting_directions)

    def load_lighting_coords(self, lighting_file):
        with open(lighting_file, 'r') as f:
            coords = [list(map(float, line.strip().split())) for line in f]
        return coords

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        image = Image.open(os.path.join(self.image_dir, self.image_files[idx])).convert('RGB')
        shadow = Image.open(os.path.join(self.shadow_dir, self.shadow_files[idx])).convert('RGB')
        depth = Image.open(os.path.join(self.depth_dir, self.depth_files[idx]))
        # lighting = torch.tensor(self.lighting_coords[idx])

        if self.transform:
            image = self.transform(image)
            shadow = self.transform(shadow)
            depth = self.transform(depth)

        return image, shadow, depth

In [94]:
# Update to break image into pieces of 256x256, instead of resizing for more data
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor()
])

train_dataset = ImageShadowDepthLightingDataset(
    image_dir="/content/drive/MyDrive/Capstone/SOBA-high-res/images/",
    shadow_dir="/content/drive/MyDrive/Capstone/SOBA-high-res/shadow_maps/",
    depth_dir="/content/drive/MyDrive/Capstone/SOBA-high-res/high_depth_maps/",
    transform=train_transform)

image, shadow, depth = train_dataset[-1]
print(image.shape)
print(shadow.shape)
print(depth.shape)

torch.Size([3, 256, 256])
torch.Size([3, 256, 256])
torch.Size([1, 256, 256])


In [87]:
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)

# Model Experiments

## Model 1 - Attempt to use Huggingface Pipeline

In [50]:
class CustomStableDiffusionPipeline(StableDiffusionPipeline):
    def __init__(self, unet, vae, text_encoder, tokenizer, scheduler, controlnet, safety_checker=None, feature_extractor=None):
        super().__init__(unet=unet, vae=vae, text_encoder=text_encoder, tokenizer=tokenizer, scheduler=scheduler, safety_checker=safety_checker, feature_extractor=feature_extractor)
        self.controlnet = controlnet
        self.image_head = nn.Conv2d(unet.config.in_channels, 3, kernel_size=1)
        self.shadow_head = nn.Conv2d(unet.config.in_channels, 1, kernel_size=1)

    def __call__(self, input_image, depth_map, prompt, height=None, width=None, **kwargs):
        # Avoid error of height being ambiguous
        if height is None:
            height = self.unet.config.sample_size * self.vae_scale_factor
        if width is None:
            width = self.unet.config.sample_size * self.vae_scale_factor

        # Resize depth map to fit the requirements of the pipeline
        depth_map_resized = nn.functional.interpolate(depth_map, size=(input_image.shape[2], input_image.shape[3]), mode='bilinear', align_corners=False)

        text_inputs = self.tokenizer(prompt, return_tensors="pt").to(input_image.device)
        text_encoder_output = self.text_encoder(**text_inputs)
        encoder_hidden_states = text_encoder_output.last_hidden_state

        # Timestep for scheduler
        timestep = torch.tensor([1.0], dtype=torch.float32).to(input_image.device)

        # Get conditioning input using ControlNet
        controlnet_output = self.controlnet(
            sample=depth_map_resized,
            controlnet_cond=depth_map_resized,
            encoder_hidden_states=encoder_hidden_states,
            timestep=timestep
        )

        # Get diffusion model output with conditioning
        unet_output = self.unet(
            sample=input_image,
            timestep=timestep,
            encoder_hidden_states=encoder_hidden_states,
            conditioning=controlnet_output.sample
        ).sample

        image_output = self.image_head(unet_output)
        shadow_output = self.shadow_head(unet_output)

        return image_output, shadow_output

In [51]:
# Sample test of model
unet = UNet2DConditionModel.from_pretrained("runwayml/stable-diffusion-v1-5", subfolder="unet")
vae = AutoencoderKL.from_pretrained("runwayml/stable-diffusion-v1-5", subfolder="vae")
text_encoder = CLIPTextModel.from_pretrained("runwayml/stable-diffusion-v1-5", subfolder="text_encoder")
tokenizer = CLIPTokenizer.from_pretrained("runwayml/stable-diffusion-v1-5", subfolder="tokenizer")
controlnet = ControlNetModel.from_pretrained("lllyasviel/control_v11p_sd15_openpose")
scheduler = PNDMScheduler.from_pretrained("CompVis/stable-diffusion-v1-4", subfolder="scheduler")

pipeline = CustomStableDiffusionPipeline(
    unet=unet,
    vae=vae,
    text_encoder=text_encoder,
    tokenizer=tokenizer,
    scheduler=scheduler,
    controlnet=controlnet
)

dummy_image = torch.randn(1, 3, 256, 256)
dummy_depth = torch.randn(1, 1, 256, 256)
dummy_prompt = "a photo of a face"

image_output, shadow_output = pipeline(dummy_image, dummy_depth, dummy_prompt, height=256, width=256)
print(image_output.shape, shadow_output.shape)

Cannot initialize model with low cpu memory usage because `accelerate` was not found in the environment. Defaulting to `low_cpu_mem_usage=False`. It is strongly recommended to install `accelerate` for faster and less memory-intense model loading. You can do so with: 
```
pip install accelerate
```
.
Cannot initialize model with low cpu memory usage because `accelerate` was not found in the environment. Defaulting to `low_cpu_mem_usage=False`. It is strongly recommended to install `accelerate` for faster and less memory-intense model loading. You can do so with: 
```
pip install accelerate
```
.
Cannot initialize model with low cpu memory usage because `accelerate` was not found in the environment. Defaulting to `low_cpu_mem_usage=False`. It is strongly recommended to install `accelerate` for faster and less memory-intense model loading. You can do so with: 
```
pip install accelerate
```
.
You have disabled the safety checker for <class '__main__.CustomStableDiffusionPipeline'> by pass

RuntimeError: Given groups=1, weight of size [320, 4, 3, 3], expected input[1, 1, 256, 256] to have 4 channels, but got 1 channels instead

In [None]:
# Running into too many errors around input sizes - will continue to work on this using Huggingface documentation.

## Model 2 - Simple Diffusion Model from Scratch

In [89]:
# Custom UNet model
class UNet(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(UNet, self).__init__()

        def down_block(in_ch, out_ch):
            return nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True)
            )

        def up_block(in_ch, out_ch):
            return nn.Sequential(
                nn.ConvTranspose2d(in_ch, out_ch, kernel_size=2, stride=2),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True)
            )

        self.down1 = down_block(in_channels, 64)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.down2 = down_block(64, 128)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.down3 = down_block(128, 256)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.down4 = down_block(256, 512)
        self.pool4 = nn.MaxPool2d(kernel_size=2, stride=2)

        self.middle = nn.Sequential(
            nn.Conv2d(512, 1024, kernel_size=3, padding=1),
            nn.BatchNorm2d(1024),
            nn.ReLU(inplace=True),
            nn.Conv2d(1024, 1024, kernel_size=3, padding=1),
            nn.BatchNorm2d(1024),
            nn.ReLU(inplace=True)
        )

        self.up4 = up_block(1024, 512)
        self.up3 = up_block(1024, 256)
        self.up2 = up_block(512, 128)
        self.up1 = up_block(256, 64)

        self.output_image = nn.Conv2d(128, out_channels, kernel_size=1)
        self.output_shadow = nn.Conv2d(128, out_channels, kernel_size=1)

    def forward(self, x, depth):
        # Concatenate depth map to input
        x = torch.cat([x, depth], dim=1)

        d1 = self.down1(x)
        p1 = self.pool1(d1)
        d2 = self.down2(p1)
        p2 = self.pool2(d2)
        d3 = self.down3(p2)
        p3 = self.pool3(d3)
        d4 = self.down4(p3)
        p4 = self.pool4(d4)

        middle = self.middle(p4)

        u4 = self.up4(middle)
        u4 = torch.cat([u4, d4], dim=1)
        u3 = self.up3(u4)
        u3 = torch.cat([u3, d3], dim=1)
        u2 = self.up2(u3)
        u2 = torch.cat([u2, d2], dim=1)
        u1 = self.up1(u2)
        u1 = torch.cat([u1, d1], dim=1)

        image_output = self.output_image(u1)
        shadow_output = self.output_shadow(u1)

        return image_output, shadow_output

In [90]:
class SimpleDiffusionModel(nn.Module):
    def __init__(self, unet):
        super(SimpleDiffusionModel, self).__init__()
        self.unet = unet

    def forward(self, x, depth, t):
        return self.unet(x, depth)

    def loss_fn(self, pred, target):
        image_pred, shadow_pred = pred
        image_target, shadow_target = target

        image_loss = F.mse_loss(image_pred, image_target)
        shadow_loss = F.mse_loss(shadow_pred, shadow_target)

        return image_loss + shadow_loss

In [91]:
# Testing the model architecture with dummy data
class DummyDataset(Dataset):
    def __init__(self, transform=None):
        self.transform = transform

    def __len__(self):
        return 100

    def __getitem__(self, idx):
        image = torch.randn(3, 256, 256)
        shadow = torch.randn(3, 256, 256)
        depth = torch.randn(1, 256, 256)

        if self.transform:
            image = self.transform(image)
            shadow = self.transform(shadow)
            depth = self.transform(depth)

        return image, shadow, depth

dummy_transform = transforms.Compose([
    transforms.Resize((256, 256)),
])

dummy_dataset = DummyDataset(transform=dummy_transform)
dataloader = DataLoader(dummy_dataset, batch_size=8, shuffle=True)

In [92]:
# Initialize model, loss function, and optimizer
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
unet = UNet(in_channels=4, out_channels=3).to(device)
model = SimpleDiffusionModel(unet=unet).to(device)

optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Training loop for testing
num_epochs = 2
for epoch in range(num_epochs):
    model.train()
    for images, shadows, depths in dataloader:
        images, shadows, depths = images.to(device), shadows.to(device), depths.to(device)

        optimizer.zero_grad()
        outputs = model(images, depths, t=0)
        loss = model.loss_fn(outputs, (images, shadows))

        loss.backward()
        optimizer.step()

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

Epoch [1/2], Loss: 2.4004
Epoch [1/2], Loss: 2.3588
Epoch [1/2], Loss: 2.3197
Epoch [1/2], Loss: 2.2878
Epoch [1/2], Loss: 2.2486
Epoch [1/2], Loss: 2.2187
Epoch [1/2], Loss: 2.1813
Epoch [1/2], Loss: 2.1538
Epoch [1/2], Loss: 2.1225
Epoch [1/2], Loss: 2.0910
Epoch [1/2], Loss: 2.0599
Epoch [1/2], Loss: 2.0276
Epoch [1/2], Loss: 2.0021
Epoch [2/2], Loss: 1.9704
Epoch [2/2], Loss: 1.9466
Epoch [2/2], Loss: 1.9185
Epoch [2/2], Loss: 1.8931
Epoch [2/2], Loss: 1.8642
Epoch [2/2], Loss: 1.8460
Epoch [2/2], Loss: 1.8195
Epoch [2/2], Loss: 1.7974
Epoch [2/2], Loss: 1.7793
Epoch [2/2], Loss: 1.7546
Epoch [2/2], Loss: 1.7367
Epoch [2/2], Loss: 1.7176
Epoch [2/2], Loss: 1.6965


# Training with Real Data

In [96]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
unet = UNet(in_channels=4, out_channels=3).to(device)
model = SimpleDiffusionModel(unet=unet).to(device)

optimizer = optim.Adam(model.parameters(), lr=1e-4)

num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    for images, shadows, depths in train_dataloader:
        images, shadows, depths = images.to(device), shadows.to(device), depths.to(device)

        optimizer.zero_grad()
        outputs = model(images, depths, t=0)  # TODO: Experiment with different timesteps
        loss = model.loss_fn(outputs, (images, shadows))

        loss.backward()
        optimizer.step()

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

    torch.save(model.state_dict(), f"simple_diffusion_model_epoch_{epoch+1}.pth")

Epoch [1/100], Loss: 0.7807
Epoch [1/100], Loss: 0.5282
Epoch [1/100], Loss: 0.4803
Epoch [1/100], Loss: 0.4580
Epoch [1/100], Loss: 0.4239
Epoch [1/100], Loss: 0.3306


  return Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass


Epoch [1/100], Loss: 0.2661
Epoch [2/100], Loss: 0.3086
Epoch [2/100], Loss: 0.3033
Epoch [2/100], Loss: 0.3150
Epoch [2/100], Loss: 0.2828
Epoch [2/100], Loss: 0.2498
Epoch [2/100], Loss: 0.2744
Epoch [2/100], Loss: 0.2797
Epoch [3/100], Loss: 0.2421
Epoch [3/100], Loss: 0.2116
Epoch [3/100], Loss: 0.1749
Epoch [3/100], Loss: 0.2343
Epoch [3/100], Loss: 0.1938
Epoch [3/100], Loss: 0.2099
Epoch [3/100], Loss: 0.1437
Epoch [4/100], Loss: 0.1695
Epoch [4/100], Loss: 0.1734
Epoch [4/100], Loss: 0.1798
Epoch [4/100], Loss: 0.2001
Epoch [4/100], Loss: 0.1449
Epoch [4/100], Loss: 0.1313
Epoch [4/100], Loss: 0.1687
Epoch [5/100], Loss: 0.1410
Epoch [5/100], Loss: 0.1633
Epoch [5/100], Loss: 0.1371
Epoch [5/100], Loss: 0.1075
Epoch [5/100], Loss: 0.1441
Epoch [5/100], Loss: 0.1609
Epoch [5/100], Loss: 0.1378
Epoch [6/100], Loss: 0.1182
Epoch [6/100], Loss: 0.1308
Epoch [6/100], Loss: 0.1281
Epoch [6/100], Loss: 0.1101
Epoch [6/100], Loss: 0.1327
Epoch [6/100], Loss: 0.1306
Epoch [6/100], Loss:

In [None]:
# TODO: Continue work with pre-trained models for better generation