# Diffusion Model Learning Notebook
Overview:
- Basic Diffusion Model
- Train Basic Difusion Model
- Improved Diffusion Model
- Train Improved Diffusion Model

## Basic Diffusion Model

In [None]:
# basic of Diffusion Model is consisted of two process
#        forward process (applying noise)
#    and reverse process (remove noise)
!pip install utils
!pip install modules
!pip install tqdm
!pip install torch
!pip install torchvision
!pip install requests



In [1]:
import os
import torch
import torch.nn as nn
from matplotlib import pyplot as plt
from tqdm import tqdm
from torch import optim
from utils import *
#from modules import UNet
import logging
from torch.utils.tensorboard import SummaryWriter

logging.basicConfig(format="%(asctime)s - %(levelname)s: %(message)s", level=logging.INFO, datefmt="%I:%M:%S")

if torch.cuda.is_available():
    print("GPU is available")
else:
    print("GPU is not available")

ModuleNotFoundError: No module named 'utils'

### definitions for neural networks inside our models

In [None]:
#some nn definition before our deffusion model
class EMA:
    def __init__(self, beta):
        super().__init__()
        self.beta = beta
        self.step = 0

    def update_model_average(self, ma_model, current_model):
        for current_params, ma_params in zip(current_model.parameters(), ma_model.parameters()):
            old_weight, up_weight = ma_params.data, current_params.data
            ma_params.data = self.update_average(old_weight, up_weight)

    def update_average(self, old, new):
        if old is None:
            return new
        return old * self.beta + (1 - self.beta) * new

    def step_ema(self, ema_model, model, step_start_ema=2000):
        if self.step < step_start_ema:
            self.reset_parameters(ema_model, model)
            self.step += 1
            return
        self.update_model_average(ema_model, model)
        self.step += 1

    def reset_parameters(self, ema_model, model):
        ema_model.load_state_dict(model.state_dict())


class SelfAttention(nn.Module):
    def __init__(self, channels, size):
        super(SelfAttention, self).__init__()
        self.channels = channels
        self.size = size
        self.mha = nn.MultiheadAttention(channels, 4, batch_first=True)
        self.ln = nn.LayerNorm([channels])
        self.ff_self = nn.Sequential(
            nn.LayerNorm([channels]),
            nn.Linear(channels, channels),
            nn.GELU(),
            nn.Linear(channels, channels),
        )

    def forward(self, x):
        x = x.view(-1, self.channels, self.size * self.size).swapaxes(1, 2)
        x_ln = self.ln(x)
        attention_value, _ = self.mha(x_ln, x_ln, x_ln)
        attention_value = attention_value + x
        attention_value = self.ff_self(attention_value) + attention_value
        return attention_value.swapaxes(2, 1).view(-1, self.channels, self.size, self.size)


class DoubleConv(nn.Module):
    def __init__(self, in_channels, out_channels, mid_channels=None, residual=False):
        super().__init__()
        self.residual = residual
        if not mid_channels:
            mid_channels = out_channels
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
            nn.GroupNorm(1, mid_channels),
            nn.GELU(),
            nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.GroupNorm(1, out_channels),
        )

    def forward(self, x):
        if self.residual:
            return F.gelu(x + self.double_conv(x))
        else:
            return self.double_conv(x)


class Down(nn.Module):
    def __init__(self, in_channels, out_channels, emb_dim=256):
        super().__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            DoubleConv(in_channels, in_channels, residual=True),
            DoubleConv(in_channels, out_channels),
        )

        self.emb_layer = nn.Sequential(
            nn.SiLU(),
            nn.Linear(
                emb_dim,
                out_channels
            ),
        )

    def forward(self, x, t):
        x = self.maxpool_conv(x)
        emb = self.emb_layer(t)[:, :, None, None].repeat(1, 1, x.shape[-2], x.shape[-1])
        return x + emb


class Up(nn.Module):
    def __init__(self, in_channels, out_channels, emb_dim=256):
        super().__init__()

        self.up = nn.Upsample(scale_factor=2, mode="bilinear", align_corners=True)
        self.conv = nn.Sequential(
            DoubleConv(in_channels, in_channels, residual=True),
            DoubleConv(in_channels, out_channels, in_channels // 2),
        )

        self.emb_layer = nn.Sequential(
            nn.SiLU(),
            nn.Linear(
                emb_dim,
                out_channels
            ),
        )

    def forward(self, x, skip_x, t):
        x = self.up(x)
        x = torch.cat([skip_x, x], dim=1)
        x = self.conv(x)
        emb = self.emb_layer(t)[:, :, None, None].repeat(1, 1, x.shape[-2], x.shape[-1])
        return x + emb


### Definition of Deffusion Model

In [None]:
# Diffusion Model definition
class Diffusion:
  def __init(self, noise_steps = 1000, beta_start = 1e-4, beta_end = 0.02, img_size = 256, device = "cuda"):
    self.noise_steps = noise_steps
    self.beta_start = beta_start
    self.beta_end = beta_end
    self.img_size = img_size
    self.device = device

    self.beta = self.prepare_noise_schedule().to(device)
    self.alpha = 1. - self.beta
    self.alpha_hat = torch.cumprod(self.alpha, dim = 0)

  def prepare_noise_schedule(self):
    return torch.linspace(self.beta_start, self.beta_end, self.noise_steps)

  def noise_images(self, x,t):
    sqrt_alpha_hat = torch.sqrt(self.alpha_hat[t])[:,None,None,None]
    sqrt_one_minus_alpha_hat = torch.sqrt(1. - self.alpha_hat[t])[:,None,None,None]
    noise = torch.randn_like(x)
    return sqrt_alpha_hat * x + sqrt_one_minus_alpha_hat * noise, noise

  def sample_timesteps(self, n):
    return torch.randint(low = 1, high = self.noise_steps, size = (n,))

  def sample(self, model, n):
    logging.info(f"Sampling {n} new images ...")
    model.eval()
    with torch.no_grad():
      x = torch.randn((n, 3, self.img_size, self.img_size)).to(self.device)
      for i in tqdm(reversed(range(1, self.noise_steps)), position = 0):
        t = torch.ones((n)* i).long().to(self.device)
        predicted_noise = model(x, t)
        alpha = self.alpha[t][:,None,None,None]
        alpha_hat = self.alpha_hat[t][:,None,None,None]
        beta = self.beta[t][:,None,None,None]
        if i> 1:
          noise = torch.randn_like(x)
        else:
          noise = torch.zeros_like(x)
        x = 1/torch.sqrt(alpha) * (x - ((1 - alpha)/(torch.sqrt(1 - alpha_hat))) * predicted_noise) + torch.sqrt(beta) * noise
    model.train()
    x = (x.clamp(-1,1) + 1)/2
    x = (x*255).type(torch.uint8)
    return x

### Definition of UNet

In [None]:
# UNet Definition
class UNet(nn.Module):
    def __init__(self, c_in=3, c_out=3, time_dim=256, device="cuda"):
        super().__init__()
        self.device = device
        self.time_dim = time_dim
        self.inc = DoubleConv(c_in, 64) #two ccovolutional nn
        self.down1 = Down(64, 128)  # reduce size by 2
        self.sa1 = SelfAttention(128, 32)
        self.down2 = Down(128, 256)
        self.sa2 = SelfAttention(256, 16)
        self.down3 = Down(256, 256)
        self.sa3 = SelfAttention(256, 8)

        self.bot1 = DoubleConv(256, 512)
        self.bot2 = DoubleConv(512, 512)
        self.bot3 = DoubleConv(512, 256)

        self.up1 = Up(512, 128)
        self.sa4 = SelfAttention(128, 16)
        self.up2 = Up(256, 64)
        self.sa5 = SelfAttention(64, 32)
        self.up3 = Up(128, 64)
        self.sa6 = SelfAttention(64, 64)
        self.outc = nn.Conv2d(64, c_out, kernel_size=1)

    def pos_encoding(self, t, channels):
        inv_freq = 1.0 / (
            10000
            ** (torch.arange(0, channels, 2, device=self.device).float() / channels)
        )
        pos_enc_a = torch.sin(t.repeat(1, channels // 2) * inv_freq)
        pos_enc_b = torch.cos(t.repeat(1, channels // 2) * inv_freq)
        pos_enc = torch.cat([pos_enc_a, pos_enc_b], dim=-1)
        return pos_enc

    def forward(self, x, t):
        t = t.unsqueeze(-1).type(torch.float)
        t = self.pos_encoding(t, self.time_dim)

        x1 = self.inc(x)
        x2 = self.down1(x1, t)
        x2 = self.sa1(x2)
        x3 = self.down2(x2, t)
        x3 = self.sa2(x3)
        x4 = self.down3(x3, t)
        x4 = self.sa3(x4)

        x4 = self.bot1(x4)
        x4 = self.bot2(x4)
        x4 = self.bot3(x4)

        x = self.up1(x4, x3, t)
        x = self.sa4(x)
        x = self.up2(x, x2, t)
        x = self.sa5(x)
        x = self.up3(x, x1, t)
        x = self.sa6(x)
        output = self.outc(x)
        return output


### Plotting function

In [None]:
import os
import torch
import torchvision
from PIL import Image
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader


def plot_images(images):
    plt.figure(figsize=(32, 32))
    plt.imshow(torch.cat([
        torch.cat([i for i in images.cpu()], dim=-1),
    ], dim=-2).permute(1, 2, 0).cpu())
    plt.show()


def save_images(images, path, **kwargs):
    grid = torchvision.utils.make_grid(images, **kwargs)
    ndarr = grid.permute(1, 2, 0).to('cpu').numpy()
    im = Image.fromarray(ndarr)
    im.save(path)


def get_data(args):
    transforms = torchvision.transforms.Compose([
        torchvision.transforms.Resize(80),  # args.image_size + 1/4 *args.image_size
        torchvision.transforms.RandomResizedCrop(args.image_size, scale=(0.8, 1.0)),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
    dataset = torchvision.datasets.ImageFolder(args.dataset_path, transform=transforms)
    dataloader = DataLoader(dataset, batch_size=args.batch_size, shuffle=True)
    return dataloader


def setup_logging(run_name):
    os.makedirs("models", exist_ok=True)
    os.makedirs("results", exist_ok=True)
    os.makedirs(os.path.join("models", run_name), exist_ok=True)
    os.makedirs(os.path.join("results", run_name), exist_ok=True)

### Training Loops

In [None]:
def train(args):
    setup_logging(args.run_name)
    device = args.device
    dataloader = get_data(args)
    model = UNet().to(device)
    optimizer = optim.AdamW(model.parameters(), lr=args.lr)
    mse = nn.MSELoss()
    diffusion = Diffusion(img_size=args.image_size, device=device)
    logger = SummaryWriter(os.path.join("runs", args.run_name))
    l = len(dataloader)

    for epoch in range(args.epochs):
        logging.info(f"Starting epoch {epoch}:")
        pbar = tqdm(dataloader)
        for i, (images, _) in enumerate(pbar):
            images = images.to(device)
            t = diffusion.sample_timesteps(images.shape[0]).to(device)
            x_t, noise = diffusion.noise_images(images, t)
            predicted_noise = model(x_t, t)
            loss = mse(noise, predicted_noise)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            pbar.set_postfix(MSE=loss.item())
            logger.add_scalar("MSE", loss.item(), global_step=epoch * l + i)

        sampled_images = diffusion.sample(model, n=images.shape[0])
        save_images(sampled_images, os.path.join("results", args.run_name, f"{epoch}.jpg"))
        torch.save(model.state_dict(), os.path.join("models", args.run_name, f"ckpt.pt"))


def launch():
    import argparse
    parser = argparse.ArgumentParser()
    args = parser.parse_args()
    args.run_name = "DDPM_Uncondtional"
    args.epochs = 500
    args.batch_size = 12
    args.image_size = 64
    args.dataset_path = r".\input"
    #args.dataset_path = r"C:\Users\dome\datasets\landscape_img_folder"
    #args.dataset_path = KaggleDataset('landscape-pictures-metadata.json')
    args.device = "cuda"
    args.lr = 3e-4
    train(args)

In [None]:
import requests
from PIL import Image
from io import BytesIO
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

# Define a custom dataset class
class KaggleDataset(Dataset):
    def __init__(self, json_file, transform=None):
        self.data = json.load(open(json_file))
        self.transform = transform

    def __len__(self):
        return len(self.data['distribution'])

    def __getitem__(self, idx):
        img_url = self.data['distribution'][idx]['contentUrl']
        response = requests.get(img_url)
        img = Image.open(BytesIO(response.content))

        if self.transform:
            img = self.transform(img)

        return img



### Let us rock and roll here!


In [None]:
launch()



usage: colab_kernel_launcher.py [-h]
colab_kernel_launcher.py: error: unrecognized arguments: -f /root/.local/share/jupyter/runtime/kernel-ffb89734-ee39-4412-8ce7-b56d90d4e527.json


SystemExit: 2

In [None]:
import random
from pathlib import Path
from PIL import Image

# Get a list of all image files in the folder
folder_path = Path("./Input")
image_files = [file for file in folder_path.glob("*.jpg") if file.is_file()]

# Select a random image file
random_image_file = random.choice(image_files)

# Open the image and display it
image = Image.open(random_image_file)
image.show()

# AI generated Code for Reference

In [None]:
import cv2
import numpy as np
from PIL import Image

# Load input image and material pattern
img = cv2.imread('indoor_iamge.jpg')
material_pattern = cv2.imread('texture_image.jpg')

# Segment regions of interest (countertop and kitchen wall)
# Use techniques like edge detection, thresholding, or semantic segmentation
# For simplicity, assume we have the segmented regions as masks
countertop_mask = ...
kitchen_wall_mask = ...

# Extract texture from material pattern
material_texture = ...

# Replace texture in segmented regions
img[countertop_mask] = material_texture
img[kitchen_wall_mask] = material_texture

# Generate output image
output_img = Image.fromarray(img)
output_img.save('output_image_llama.jpg')

TypeError: 'NoneType' object does not support item assignment

In [None]:
import cv2
import numpy as np

def apply_texture_to_countertop(input_image_path, texture_path, output_image_path):
    # Load the input image and the texture
    image = cv2.imread(input_image_path)
    texture = cv2.imread(texture_path)

    # Resize texture to potentially needed size as a simplistic approach
    texture = cv2.resize(texture, (image.shape[1], image.shape[0]), interpolation=cv2.INTER_AREA)

    # Convert image to grayscale and then to binary image using thresholding
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    _, threshold = cv2.threshold(gray, 120, 255, cv2.THRESH_BINARY_INV)

    # Detect contours which could help in identifying the countertop area
    contours, _ = cv2.findContours(threshold, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    # Assume largest contour is the countertop or adjust this logic as needed
    largest_contour = max(contours, key=cv2.contourArea)
    mask = np.zeros_like(gray)
    cv2.drawContours(mask, [largest_contour], -1, (255), thickness=cv2.FILLED)

    # Create masked area on original image
    masked_image = cv2.bitwise_and(image, image, mask=mask)

    # Apply the texture only within the masked area
    texture_masked = cv2.bitwise_and(texture, texture, mask=mask)
    result = cv2.addWeighted(masked_image, 0.5, texture_masked, 0.5, 0)

    # Save the result
    cv2.imwrite(output_image_path, result)
    return output_image_path

# Usage
output_path = apply_texture_to_countertop('indoor_image.png', 'output_image.png', 'output_image.jpg')
print("Output saved to:", output_path)


Output saved to: output_image.jpg


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image

class DiffusionModel(nn.Module):
    def __init__(self, num_steps, num_inputs):
        super(DiffusionModel, self).__init__()
        self.num_steps = num_steps
        self.num_inputs = num_inputs
        self.diffusion_steps = nn.ModuleList([self._build_diffusion_step() for _ in range(num_steps)])

    def _build_diffusion_step(self):
        return nn.Sequential(
            nn.Linear(self.num_inputs, self.num_inputs),
            nn.ReLU(),
            nn.Linear(self.num_inputs, self.num_inputs)
        )

    def forward(self, x):
        for diffusion_step in self.diffusion_steps:
            x = diffusion_step(x)
        return x

# Initialize the model
model = DiffusionModel(num_steps=10, num_inputs=2)

# Initialize the input
x = torch.randn(1, 2)

# Run the diffusion model
output = model(x)

# Print the output
print(output)
output_image = Image.fromarray(output.detach().numpy())

# Display the image
output_image.show()

tensor([[-0.2033, -0.1927]], grad_fn=<AddmmBackward0>)
