In [None]:
import os
import math
import torch
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from torch import nn
from PIL import Image
from tqdm import tqdm
from diffusers import AutoencoderKL
from torch.nn import functional as F
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from transformers import CLIPTokenizer, CLIPTextModel
import torch.optim.lr_scheduler as lr_scheduler
from torch.amp import GradScaler, autocast

def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    os.environ["PYTHONHASHSEED"] = str(seed)
    print(f"Seed set to {seed}")


set_seed()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Xây dựng các class Attention

In [None]:
class SelfAttention(nn.Module):
    def __init__(self, num_attn_heads, hidden_dim, in_proj_bias=True, out_proj_bias=True):
        super().__init__()
        # Số lượng head trong multi-head attention
        self.num_heads = num_attn_heads
        # Kích thước của mỗi head (giả sử hidden_dim chia đều cho số lượng head)
        self.head_size = hidden_dim // num_attn_heads

        # Khởi tạo một layer tuyến tính để chuyển đầu vào thành 3 vector: Q (Query), K (Key) và V (Value)
        # Output có kích thước 3 * hidden_dim để bao gồm 3 vector này liên tiếp
        self.qkv_proj = nn.Linear(
            hidden_dim, 3 * hidden_dim, bias=in_proj_bias)

        # Layer tuyến tính để chuyển đổi đầu ra của attention về lại kích thước hidden_dim ban đầu
        self.output_proj = nn.Linear(
            hidden_dim, hidden_dim, bias=out_proj_bias)

    def forward(self, features, use_causal_mask=False):
        # Lấy kích thước đầu vào: b=batch size, s=sequence length, d=feature dimension
        b, s, d = features.shape

        # Áp dụng layer qkv_proj để biến đổi đầu vào thành kết hợp của Q, K, V
        qkv_combined = self.qkv_proj(features)
        # Tách tensor đã kết hợp thành 3 tensor riêng biệt: Q, K, V theo chiều cuối cùng
        q_mat, k_mat, v_mat = torch.chunk(qkv_combined, 3, dim=-1)

        # Chuyển đổi kích thước của mỗi tensor Q, K, V để phù hợp với multi-head attention
        # View lại kích thước: (batch size, sequence length, số head, kích thước head)
        # Sau đó hoán đổi các chiều để có thứ tự: (batch size, số head, sequence length, kích thước head)
        q_mat = q_mat.view(b, s, self.num_heads,
                           self.head_size).permute(0, 2, 1, 3)
        k_mat = k_mat.view(b, s, self.num_heads,
                           self.head_size).permute(0, 2, 1, 3)
        v_mat = v_mat.view(b, s, self.num_heads,
                           self.head_size).permute(0, 2, 1, 3)

        # Tính phép nhân ma trận giữa Q và K^T để nhận ma trận điểm (attention scores)
        qk = torch.matmul(q_mat, k_mat.transpose(-2, -1))
        # Chia ma trận điểm cho căn bậc hai của kích thước head để tránh giá trị quá lớn (scale factor)
        sqrt_qk = qk / math.sqrt(self.head_size)

        # Nếu bật tùy chọn sử dụng causal mask (giúp tránh rò rỉ thông tin từ tương lai),
        # tạo một ma trận mask dạng tam giác trên và áp dụng cho scores
        if use_causal_mask:
            causal_mask = torch.triu(torch.ones_like(
                sqrt_qk, dtype=torch.bool), diagonal=1)
            sqrt_qk = sqrt_qk.masked_fill(causal_mask, -torch.inf)

        # Áp dụng hàm softmax trên chiều cuối để chuẩn hóa các điểm thành xác suất
        attn_weights = torch.softmax(sqrt_qk, dim=-1)
        # Tính giá trị attention bằng cách nhân trọng số với V (giá trị value)
        attn_values = torch.matmul(attn_weights, v_mat)

        # Đảo lại thứ tự các chiều về như ban đầu sau khi kết hợp các head:
        # Đổi chiều từ (batch size, số head, sequence length, kích thước head)
        # về (batch size, sequence length, hidden_dim)
        attn_values = attn_values.permute(
            0, 2, 1, 3).contiguous().view(b, s, d)

        # Áp dụng layer output_proj để chuyển đổi kết quả attention về lại định dạng và kích thước ban đầu của model
        final_output = self.output_proj(attn_values)
        return final_output

In [None]:
class CrossAttention(nn.Module):
    def __init__(self, num_attn_heads, query_dim, context_dim, in_proj_bias=True, out_proj_bias=True):
        super().__init__()
        # Số lượng head trong cơ chế attention (multi-head attention)
        self.num_heads = num_attn_heads
        # Kích thước mỗi head, giả sử query_dim chia đều cho số lượng head
        self.head_size = query_dim // num_attn_heads

        # Ánh xạ tuyến tính để chuyển đổi đầu vào query về không gian cần thiết
        self.query_map = nn.Linear(query_dim, query_dim, bias=in_proj_bias)
        # Ánh xạ tuyến tính để chuyển đổi context input thành key, để phù hợp với query
        self.key_map = nn.Linear(context_dim, query_dim, bias=in_proj_bias)
        # Ánh xạ tuyến tính để chuyển đổi context input thành value, để phù hợp với query
        self.value_map = nn.Linear(context_dim, query_dim, bias=in_proj_bias)

        # Ánh xạ tuyến tính cuối cùng, đưa kết quả attention trở lại không gian của query ban đầu
        self.output_map = nn.Linear(query_dim, query_dim, bias=out_proj_bias)

    def forward(self, query_input, context_input):
        # Lấy kích thước của input query: b_q=batch size, s_q=sequence length của query, d_q=dimension của query
        b_q, s_q, d_q = query_input.shape
        # Lấy sequence length của context input (s_kv) - các tensor key và value có cùng kích thước
        _, s_kv, _ = context_input.shape

        # Áp dụng các ánh xạ tuyến tính riêng cho query, key và value
        q_mat = self.query_map(query_input)
        k_mat = self.key_map(context_input)
        v_mat = self.value_map(context_input)

        # Định hình lại tensor và chuyển đổi chiều để phù hợp với cấu trúc multi-head attention
        # Chuyển q_mat từ (batch, s_q, query_dim) về (batch, s_q, num_heads, head_size) rồi hoán đổi thành (batch, num_heads, s_q, head_size)
        q_mat = q_mat.view(b_q, s_q, self.num_heads,
                           self.head_size).permute(0, 2, 1, 3)
        # Đối với key và value, sử dụng kích thước sequence từ context input (s_kv)
        k_mat = k_mat.view(b_q, s_kv, self.num_heads,
                           self.head_size).permute(0, 2, 1, 3)
        v_mat = v_mat.view(b_q, s_kv, self.num_heads,
                           self.head_size).permute(0, 2, 1, 3)

        # Tính toán điểm attention: nhân Q với transpose của K
        qk = torch.matmul(q_mat, k_mat.transpose(-2, -1))
        # Áp dụng scale factor bằng cách chia cho căn bậc hai của head_size để ổn định giá trị
        sqrt_qk = qk / math.sqrt(self.head_size)
        # Áp dụng softmax trên chiều cuối của tensor để chuyển điểm thành xác suất
        attn_weights = torch.softmax(sqrt_qk, dim=-1)

        # Tính các giá trị attention bằng cách nhân trọng số (attn_weights) với tensor value (v_mat)
        attn_values = torch.matmul(attn_weights, v_mat)
        # Kết hợp lại các head: đổi chiều từ (batch, num_heads, s_q, head_size) về (batch, s_q, num_heads, head_size)
        # rồi reshape về (batch, s_q, query_dim)
        attn_values = attn_values.permute(
            0, 2, 1, 3).contiguous().view(b_q, s_q, d_q)

        # Áp dụng ánh xạ tuyến tính cuối cùng để đưa đầu ra của cross attention về không gian của query ban đầu
        final_output = self.output_map(attn_values)
        return final_output

## Denoising Diffusion Probabilistic Models

In [None]:
class DDPMScheduler:
    def __init__(self, random_generator, train_timesteps=1000, diffusion_beta_start=0.00085, diffusion_beta_end=0.012):
        """
        Khởi tạo scheduler cho quá trình diffusion với các thông số:
         - random_generator: bộ tạo số ngẫu nhiên (PRNG) dùng cho quá trình thêm nhiễu.
         - train_timesteps: tổng số bước huấn luyện.
         - diffusion_beta_start, diffusion_beta_end: giá trị beta khởi đầu và kết thúc cho quá trình diffusion.
         
        Các bước tính toán chính:
         1. Tạo vector betas dạng tuyến tính sau khi lấy căn bậc hai (để ổn định phân bố) rồi bình phương lại.
         2. Tính các giá trị alphas = 1 - beta.
         3. Tính tích lũy (cumulative product) của alphas, dùng để ước lượng mức độ tín hiệu qua các bước.
         4. Lưu lại các giá trị và thiết lập lịch trình cho các bước diffusion.
        """
        # Tạo vector betas qua linspace trên khoảng [sqrt(diffusion_beta_start), sqrt(diffusion_beta_end)]
        # sau đó bình phương lại để đảm bảo beta có phân bố mong muốn
        self.betas = torch.linspace(
            diffusion_beta_start ** 0.5, diffusion_beta_end ** 0.5, train_timesteps, dtype=torch.float32) ** 2

        # Tính alphas = 1 - betas
        self.alphas = 1.0 - self.betas
        # Tính tích lũy của alphas theo chiều 0 (mỗi bước nhân với bước trước đó)
        self.alphas_cumulative_product = torch.cumprod(self.alphas, dim=0)
        # Một tensor chứa giá trị 1 để dùng trong các trường hợp biên (ví dụ: bước đầu tiên)
        self.one_val = torch.tensor(1.0)
        # Lưu lại bộ sinh số ngẫu nhiên cho việc thêm nhiễu
        self.prng_generator = random_generator
        # Lưu tổng số bước huấn luyện
        self.total_train_timesteps = train_timesteps
        # Tạo lịch trình thời gian ban đầu (trả về tensor với các timestep giảm dần từ train_timesteps-1 đến 0)
        self.schedule_timesteps = torch.from_numpy(
            np.arange(0, train_timesteps)[::-1].copy())

    def set_steps(self, num_sampling_steps=50):
        """
        Điều chỉnh lịch trình các bước mẫu (sampling steps) dựa trên số bước mong muốn.
         - num_sampling_steps: số bước mẫu trong quá trình sinh mẫu.
         
        Quá trình:
         1. Tính hệ số chia bước (step_scaling_factor) dựa trên tổng số bước huấn luyện chia cho số bước mẫu.
         2. Tạo mảng các timestep cho quá trình sinh mẫu theo tỉ lệ, sau đó đảo ngược thứ tự (giảm dần).
        """
        self.num_sampling_steps = num_sampling_steps
        step_scaling_factor = self.total_train_timesteps // self.num_sampling_steps
        timesteps_for_sampling = (np.arange(
            0, num_sampling_steps) * step_scaling_factor).round()[::-1].copy().astype(np.int64)
        self.schedule_timesteps = torch.from_numpy(timesteps_for_sampling)

    def _get_prior_timestep(self, current_timestep):
        """
        Tính toán timestep trước đó trong quá trình sampling.
         - current_timestep: timestep hiện tại.
         
        Công thức:
         previous_t = current_timestep - (tổng số bước / số bước mẫu)
        """
        previous_t = current_timestep - self.total_train_timesteps // self.num_sampling_steps
        return previous_t

    def _calculate_variance(self, timestep):
        """
        Tính phương sai (variance) cho bước timestep hiện tại.
         - Sử dụng tích lũy của alphas tại bước hiện tại và bước trước đó để tính toán beta hiện tại.
         - Công thức:
              beta_t_current = 1 - (alpha_cumprod_t / alpha_cumprod_t_prev)
              variance = ((1 - alpha_cumprod_t_prev) / (1 - alpha_cumprod_t)) * beta_t_current
         - Dùng torch.clamp để đảm bảo giá trị không quá nhỏ.
        """
        prev_t = self._get_prior_timestep(timestep)
        alpha_cumprod_t = self.alphas_cumulative_product[timestep]
        # Nếu prev_t < 0 (bước đầu tiên) thì gán alpha_cumprod_t_prev bằng 1
        alpha_cumprod_t_prev = self.alphas_cumulative_product[prev_t] if prev_t >= 0 else self.one_val
        beta_t_current = 1 - alpha_cumprod_t / alpha_cumprod_t_prev
        variance_value = (1 - alpha_cumprod_t_prev) / \
            (1 - alpha_cumprod_t) * beta_t_current
        variance_value = torch.clamp(variance_value, min=1e-20)
        return variance_value

    def adjust_strength(self, strength_level=1):
        """
        Điều chỉnh "mức độ mạnh" (strength) của quá trình mẫu. Điều này giúp kiểm soát mức độ thêm nhiễu ban đầu.
         - strength_level: hệ số điều chỉnh (giá trị từ 0 đến 1), xác định bước bắt đầu của quá trình sampling.
         
        Quá trình:
         1. Tính chỉ số bước bắt đầu dựa trên strength_level.
         2. Cập nhật schedule_timesteps để chỉ lấy các bước từ chỉ số này trở đi.
        """
        initial_step_index = self.num_sampling_steps - \
            int(self.num_sampling_steps * strength_level)
        self.schedule_timesteps = self.schedule_timesteps[initial_step_index:]
        # Lưu lại bước bắt đầu của quá trình sampling
        self.start_sampling_step = initial_step_index

    def step(self, current_t, current_latents, model_prediction):
        """
        Thực hiện một bước ngược (denoising step) trong quá trình diffusion, sử dụng dự đoán của mô hình.
         - current_t: timestep hiện tại.
         - current_latents: tensor hiện tại của latent, chứa thông tin nhiễu.
         - model_prediction: dự đoán của mô hình về nhiễu cần loại bỏ.
         
        Các bước chính:
         1. Tính các giá trị tích lũy alphas (cả ở thời điểm hiện tại và bước trước đó).
         2. Tính các hệ số alpha, beta hiện tại.
         3. Ước tính giá trị gốc (predicted original) từ tensor latent hiện tại và dự đoán mô hình.
         4. Tính trung bình dự đoán của bước trước đó (predicted prior mean) bằng cách kết hợp giá trị ước tính gốc và latent hiện tại với các hệ số đã tính.
         5. Nếu t > 0, thêm một thành phần nhiễu (variance term) dựa trên phương sai tính toán của bước đó.
         6. Trả về mẫu latent dự đoán cho bước trước đó.
        """
        t = current_t
        prev_t = self._get_prior_timestep(t)

        # Lấy giá trị cumulative product của alphas tại thời điểm hiện tại và trước đó
        alpha_cumprod_t = self.alphas_cumulative_product[t]
        alpha_cumprod_t_prev = self.alphas_cumulative_product[prev_t] if prev_t >= 0 else self.one_val

        # Tính beta cumulative cho thời điểm hiện tại và trước đó
        beta_cumprod_t = 1 - alpha_cumprod_t
        beta_cumprod_t_prev = 1 - alpha_cumprod_t_prev

        # Tính hệ số chuyển đổi giữa hai bước
        alpha_t_current = alpha_cumprod_t / alpha_cumprod_t_prev
        beta_t_current = 1 - alpha_t_current

        # Dự đoán mẫu gốc ban đầu từ latent hiện tại và dự đoán của mô hình
        predicted_original = (current_latents - beta_cumprod_t **
                              0.5 * model_prediction) / alpha_cumprod_t ** 0.5

        # Tính các hệ số trọng số để kết hợp dự đoán gốc và latent hiện tại
        original_coeff = (alpha_cumprod_t_prev ** 0.5 *
                          beta_t_current) / beta_cumprod_t
        current_coeff = alpha_t_current ** 0.5 * beta_cumprod_t_prev / beta_cumprod_t

        # Tính trung bình của bước trước đó (predicted prior mean)
        predicted_prior_mean = original_coeff * \
            predicted_original + current_coeff * current_latents

        # Khởi tạo thành phần nhiễu cho bước hiện tại
        variance_term = 0
        if t > 0:
            target_device = model_prediction.device
            # Sinh nhiễu chuẩn theo hình dạng của dự đoán của mô hình
            noise_component = torch.randn(
                model_prediction.shape, generator=self.prng_generator, device=target_device, dtype=model_prediction.dtype)
            variance_term = (self._calculate_variance(t)
                             ** 0.5) * noise_component

        # Mẫu dự đoán cho bước trước đó = trung bình dự đoán + nhiễu (nếu có)
        predicted_prior_sample = predicted_prior_mean + variance_term
        return predicted_prior_sample

    def add_noise(self, initial_samples, noise_timesteps):
        """
        Thêm nhiễu vào samples ban đầu dựa trên các noise timestep đã cho.
         - initial_samples: tensor mẫu ban đầu (latent representation).
         - noise_timesteps: các timestep mà tại đó sẽ thêm nhiễu.
         
        Quá trình:
         1. Chuyển alphas_cumulative_product về cùng thiết bị và kiểu dữ liệu với initial_samples.
         2. Tính căn bậc hai của alphas_cumprod và (1 - alphas_cumprod) cho các timestep được chọn.
         3. Sinh nhiễu chuẩn có cùng hình dạng với initial_samples.
         4. Tính mẫu nhiễu (noisy_result) dựa trên công thức kết hợp giữa initial_samples và nhiễu.
         5. Trả về cả noisy_result và random_noise (nhiễu đã thêm vào).
        """
        # Đảm bảo alphas_cumulative_product có cùng device và dtype với initial_samples
        alphas_cumprod = self.alphas_cumulative_product.to(
            device=initial_samples.device, dtype=initial_samples.dtype)
        noise_timesteps = noise_timesteps.to(initial_samples.device)

        # Tính căn bậc hai của alphas_cumprod tại các noise timestep
        sqrt_alpha_cumprod = alphas_cumprod[noise_timesteps] ** 0.5
        sqrt_alpha_cumprod = sqrt_alpha_cumprod.view(
            sqrt_alpha_cumprod.shape[0], *([1] * (initial_samples.ndim - 1)))

        # Tính căn bậc hai của (1 - alphas_cumprod) tại các noise timestep
        sqrt_one_minus_alpha_cumprod = (
            1 - alphas_cumprod[noise_timesteps]) ** 0.5
        sqrt_one_minus_alpha_cumprod = sqrt_one_minus_alpha_cumprod.view(
            sqrt_one_minus_alpha_cumprod.shape[0], *([1] * (initial_samples.ndim - 1)))

        # Sinh nhiễu chuẩn với hình dạng của initial_samples
        random_noise = torch.randn(initial_samples.shape, generator=self.prng_generator,
                                   device=initial_samples.device, dtype=initial_samples.dtype)
        # Tạo mẫu nhiễu bằng cách kết hợp initial_samples và nhiễu, sử dụng các hệ số tỷ lệ vừa tính
        noisy_result = sqrt_alpha_cumprod * initial_samples + \
            sqrt_one_minus_alpha_cumprod * random_noise
        return noisy_result, random_noise

## kiến trúc U-Net

In [None]:
class UNET_ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, time_dim=1280):
        """
        Khởi tạo residual block cho UNet, tích hợp thông tin từ các đặc trưng ảnh và embedding của thời gian.
        
        Arguments:
         - in_channels: số kênh đầu vào.
         - out_channels: số kênh đầu ra.
         - time_dim: kích thước của vector embedding thời gian (mặc định là 1280).
        """
        super().__init__()

        # Áp dụng Group Normalization lên đầu vào (chia nhỏ theo từng nhóm gồm 32 channel)
        self.gn_feature = nn.GroupNorm(32, in_channels)
        # Lớp tích chập với kernel 3x3 (padding=1 để giữ nguyên kích thước) xử lý đặc trưng ảnh
        self.conv_feature = nn.Conv2d(
            in_channels, out_channels, kernel_size=3, padding=1)

        # Lớp Linear chuyển đổi embedding thời gian về số chiều tương ứng với out_channels,
        # giúp tích hợp thông tin thời gian vào các đặc trưng ảnh
        self.time_embedding_proj = nn.Linear(time_dim, out_channels)

        # Sau khi tích hợp thông tin, dùng Group Normalization để ổn định giá trị kích hoạt
        self.gn_merged = nn.GroupNorm(32, out_channels)
        # Lớp tích chập thứ hai với kernel 3x3, giúp trộn lẫn đặc trưng ảnh đã kết hợp với embedding thời gian
        self.conv_merged = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, padding=1)

        # Residual connection: nếu số kênh đầu vào bằng số kênh đầu ra thì dùng Identity,
        # ngược lại thì sử dụng một lớp conv 1x1 để chuyển đổi số kênh phù hợp với đầu ra
        if in_channels == out_channels:
            self.residual_connection = nn.Identity()
        else:
            self.residual_connection = nn.Conv2d(
                in_channels, out_channels, kernel_size=1, padding=0)

    def forward(self, input_feature, time_emb):
        """
        Quy trình xử lý một forward pass của block.
        
        Arguments:
         - input_feature: tensor đặc trưng đầu vào có shape (batch, in_channels, height, width)
         - time_emb: vector embedding thời gian, có shape (batch, time_dim)
         
        Quá trình:
         1. Áp dụng Group Norm và kích hoạt Silu cho input_feature.
         2. Áp dụng convolution đầu tiên để tạo ra tensor đặc trưng tạm thời.
         3. Xử lý time_emb qua kích hoạt Silu và lớp Linear để đưa về kích thước tương ứng với out_channels.
         4. Mở rộng chiều của time_emb để có thể cộng với tensor từ convolution.
         5. Cộng embedding thời gian vào đặc trưng tạm thời và tiếp tục xử lý qua Group Norm, kích hoạt Silu và convolution thứ hai.
         6. Cuối cùng, cộng kết quả với residual connection để cho kết quả cuối cùng.
        """
        # Lưu lại tensor gốc để thực hiện skip-connection
        residual = input_feature

        # Bước 1: Chuẩn hóa các đặc trưng đầu vào và áp dụng kích hoạt Silu
        h = self.gn_feature(input_feature)
        h = F.silu(h)
        # Bước 2: Áp dụng convolution đầu tiên để lấy các đặc trưng mới
        h = self.conv_feature(h)

        # Bước 3: Xử lý embedding thời gian, áp dụng kích hoạt Silu trước khi chuyển qua lớp Linear
        time_emb_processed = F.silu(time_emb)
        time_emb_projected = self.time_embedding_proj(time_emb_processed)
        # Bước 4: Mở rộng chiều của time_emb_projected để có thể cộng với tensor h (với shape tương ứng)
        time_emb_projected = time_emb_projected.unsqueeze(-1).unsqueeze(-1)

        # Bước 5: Cộng thông tin embedding thời gian vào tensor đặc trưng, rồi chuẩn hóa và xử lý qua convolution thứ hai
        merged_feature = h + time_emb_projected
        merged_feature = self.gn_merged(merged_feature)
        merged_feature = F.silu(merged_feature)
        merged_feature = self.conv_merged(merged_feature)

        # Bước 6: Thực hiện skip-connection: cộng kết quả sau xử lý với tensor ban đầu (hoặc được chuyển đổi qua conv 1x1 nếu cần)
        output = merged_feature + self.residual_connection(residual)
        return output

In [None]:
class UNET_AttentionBlock(nn.Module):
    def __init__(self, num_heads, head_dim, context_dim=512):
        """
        Khởi tạo Attention Block trong UNet, tích hợp cả Self-Attention và Cross-Attention,
        và một Feed-Forward Network (FFN) sử dụng cơ chế GEGLU.

        Parameters:
         - num_heads: số lượng head trong attention.
         - head_dim: kích thước của mỗi head.
         - context_dim: chiều của context (dữ liệu bối cảnh) dùng cho Cross-Attention (mặc định 512).
        """
        super().__init__()
        # Tính tổng chiều embedding: số head nhân với kích thước mỗi head.
        embed_dim = num_heads * head_dim

        # Chuẩn hóa đầu vào với GroupNorm giúp ổn định giá trị kích hoạt.
        self.gn_in = nn.GroupNorm(32, embed_dim, eps=1e-6)
        # Dự án đầu vào qua Conv2d với kernel 1x1, giữ nguyên kích thước spatial.
        self.proj_in = nn.Conv2d(embed_dim, embed_dim,
                                 kernel_size=1, padding=0)

        # -------- Self-Attention -----------
        # LayerNorm trước attention giúp chuẩn hóa tensor theo chiều cuối.
        self.ln_1 = nn.LayerNorm(embed_dim)
        # Self-Attention: cho phép mỗi vị trí trong chuỗi "chú ý" tới các vị trí khác cùng chuỗi.
        self.attn_1 = SelfAttention(num_heads, embed_dim, in_proj_bias=False)

        # -------- Cross-Attention -----------
        # LayerNorm cho phần cross-attention.
        self.ln_2 = nn.LayerNorm(embed_dim)
        # Cross-Attention: cho phép kết hợp thông tin từ input chính và context bên ngoài (ví dụ như embedding hướng dẫn).
        self.attn_2 = CrossAttention(
            num_heads, embed_dim, context_dim, in_proj_bias=False)

        # LayerNorm trước phần FFN.
        self.ln_3 = nn.LayerNorm(embed_dim)

        # -------- Feed-Forward Network (FFN) với GEGLU -----------
        # FFN sử dụng cơ chế GEGLU: đầu ra của layer Linear được chia làm 2 phần (intermediate và gate)
        # Kích thước output của lớp này là 4 * embed_dim * 2 vì sẽ chia làm 2 phần sau.
        self.ffn_geglu = nn.Linear(embed_dim, 4 * embed_dim * 2)
        # Lớp Linear sau GEGLU giúp giảm chiều về embed_dim.
        self.ffn_out = nn.Linear(4 * embed_dim, embed_dim)
        # Dự án đầu ra qua Conv2d với kernel 1x1 để chuyển đổi không gian đặc trưng về dạng ban đầu.
        self.proj_out = nn.Conv2d(
            embed_dim, embed_dim, kernel_size=1, padding=0)

    def forward(self, input_tensor, context_tensor):
        """
        Forward pass của Attention Block.

        Parameters:
         - input_tensor: tensor đầu vào với shape (B, C, H, W) (B=batch, C=channels, H/W=chiều cao/rộng).
         - context_tensor: tensor bối cảnh dùng cho Cross-Attention.
        
        Quy trình xử lý:
         1. Dự án và reshape input để chuyển về dạng chuỗi (flatten spatial dimensions).
         2. Áp dụng Self-Attention kèm skip connection.
         3. Áp dụng Cross-Attention với thông tin context, kèm skip connection.
         4. Áp dụng FFN với GEGLU và kết nối skip.
         5. Chuyển tensor về lại shape ban đầu và dự án đầu ra.
        """
        # Lưu lại input gốc để dùng cho skip connection cuối cùng.
        skip_connection = input_tensor

        # Lấy kích thước của input: B=batch, C=channels, H=height, W=width.
        B, C, H, W = input_tensor.shape
        HW = H * W  # Tính tổng số điểm ảnh (flatten không gian)

        # --- Pre-processing ---
        # Áp dụng GroupNorm và dự án đầu vào (Conv2d) để chuẩn bị dữ liệu.
        h = self.gn_in(input_tensor)
        h = self.proj_in(h)
        # Reshape từ (B, C, H, W) về (B, C, HW) và transpose để có dạng (B, HW, C),
        # thuận tiện cho việc áp dụng LayerNorm và attention.
        h = h.view(B, C, HW).transpose(-1, -2)

        # --- Self-Attention ---
        # Lưu kết quả trước khi attention để dùng cho skip connection.
        attn1_skip = h.clone()
        h = self.ln_1(h)
        h = self.attn_1(h)
        # Cộng skip connection sau Self-Attention.
        h = h + attn1_skip

        # --- Cross-Attention ---
        attn2_skip = h.clone()
        h = self.ln_2(h)
        h = self.attn_2(h, context_tensor)
        h = h + attn2_skip

        # --- Feed-Forward Network (FFN) với GEGLU ---
        ffn_skip = h.clone()
        h = self.ln_3(h)
        # Áp dụng lớp Linear và chia thành 2 phần: intermediate và gate.
        intermediate, gate = self.ffn_geglu(h).chunk(2, dim=-1)
        # Sử dụng GEGLU: nhân phần intermediate với gelu của gate.
        h = intermediate * F.gelu(gate)
        h = self.ffn_out(h)
        h = h + ffn_skip  # Skip connection cho FFN

        # --- Post-processing ---
        # Chuyển tensor về lại shape ban đầu: (B, HW, C) -> (B, C, H, W)
        h = h.transpose(-1, -2).view(B, C, H, W)
        # Dự án đầu ra qua Conv2d và cộng với skip connection ban đầu.
        output = self.proj_out(h) + skip_connection
        return output

In [None]:
class SwitchSequential(nn.Sequential):
    def forward(self, x, guidance_context, time_embedding):
        """
        Thực hiện forward pass qua các module nằm trong SwitchSequential.
        
        Quy trình xử lý:
         - Duyệt qua từng module con trong danh sách.
         - Nếu module là UNET_AttentionBlock, truyền vào 2 tham số: tensor input và guidance_context.
         - Nếu module là UNET_ResidualBlock, truyền vào 2 tham số: tensor input và time_embedding.
         - Với các module khác, chỉ truyền input tensor x.
         
        Tham số:
         - x: tensor đầu vào.
         - guidance_context: tensor chứa thông tin bối cảnh dùng cho cross attention.
         - time_embedding: vector embedding thời gian dùng cho residual block.
        """
        for module_instance in self:
            # Nếu module là UNET_AttentionBlock, thì gọi forward với guidance_context
            if isinstance(module_instance, UNET_AttentionBlock):
                x = module_instance(x, guidance_context)
            # Nếu module là UNET_ResidualBlock, thì gọi forward với time_embedding
            elif isinstance(module_instance, UNET_ResidualBlock):
                x = module_instance(x, time_embedding)
            # Với các module khác, chỉ truyền x
            else:
                x = module_instance(x)
        return x


class TimeEmbedding(nn.Module):
    def __init__(self, n_embd):
        """
        Khởi tạo lớp TimeEmbedding dùng để chuyển đổi embedding thời gian.
        
        Các tham số:
         - n_embd: số chiều của embedding đầu vào.
         
        Cấu trúc:
         - proj1: lớp Linear chuyển đổi từ n_embd về 4 * n_embd.
         - proj2: lớp Linear chuyển đổi từ 4 * n_embd về 4 * n_embd.
        """
        super().__init__()
        self.proj1 = nn.Linear(n_embd, 4 * n_embd)
        self.proj2 = nn.Linear(4 * n_embd, 4 * n_embd)

    def forward(self, x):
        """
        Forward pass của TimeEmbedding:
         1. Áp dụng lớp Linear đầu tiên (proj1).
         2. Áp dụng hàm kích hoạt Silu (Sigmoid Linear Unit) để tăng tính phi tuyến.
         3. Áp dụng lớp Linear thứ hai (proj2).
         4. Trả về embedding đã được xử lý.
         
        Tham số:
         - x: vector embedding thời gian đầu vào (shape: [batch_size, n_embd]).
        """
        x = self.proj1(x)
        x = F.silu(x)
        x = self.proj2(x)
        return x

In [None]:
# ====================================================
# DownBlock: Giảm kích thước không gian của tensor (encoder)
# ====================================================
class DownBlock(nn.Module):
    def __init__(self, in_channels, out_channels, n_head, time_dim=320, context_dim=512):
        """
        Khối downsample gồm một SwitchSequential block (gồm residual block & attention block)
        và một lớp downsampling dùng Conv2d với stride = 2.
        
        Tham số:
          - in_channels: số kênh đầu vào.
          - out_channels: số kênh đầu ra của block.
          - n_head: số head trong attention block.
          - time_dim: chiều của vector time embedding.
          - context_dim: chiều của guidance context.
        """
        super().__init__()
        # SwitchSequential gồm:
        #   + UNET_ResidualBlock: tích hợp time embedding
        #   + UNET_AttentionBlock: tích hợp guidance context
        self.block = SwitchSequential(
            UNET_ResidualBlock(in_channels, out_channels, time_dim=time_dim),
            UNET_AttentionBlock(n_head, head_dim=out_channels //
                                n_head, context_dim=context_dim)
        )
        # Lớp downsample giảm kích thước spatial (stride=2)
        self.downsample = nn.Conv2d(
            out_channels, out_channels, kernel_size=3, stride=2, padding=1)

    def forward(self, x, guidance_context, time_embedding):
        # Áp dụng block xử lý (residual + attention)
        x = self.block(x, guidance_context, time_embedding)
        # Lưu lại tensor trước khi downsample làm skip connection
        skip = x
        # Giảm kích thước không gian của đặc trưng
        x = self.downsample(x)
        return x, skip


# ====================================================
# UpBlock: Tăng kích thước không gian của tensor (decoder)
# ====================================================
class UpBlock(nn.Module):
    def __init__(self, in_channels, out_channels, n_head, time_dim=320, context_dim=512):
        """
        Khối upsample gồm một lớp upsampling dùng ConvTranspose2d
        và một SwitchSequential block xử lý sau khi concat skip connection.
        
        Tham số:
          - in_channels: số kênh đầu vào (từ bottleneck).
          - out_channels: số kênh đầu ra của block.
          - n_head: số head trong attention block.
          - time_dim: chiều của vector time embedding.
          - context_dim: chiều của guidance context.
        """
        super().__init__()
        # Lớp upsample tăng kích thước không gian (stride=2)
        self.upsample = nn.ConvTranspose2d(
            in_channels, out_channels, kernel_size=4, stride=2, padding=1)
        # Sau khi upsample, ta concat skip connection -> số kênh sẽ gấp đôi (out_channels * 2)
        # SwitchSequential block xử lý sự kết hợp đó (residual + attention)
        self.block = SwitchSequential(
            UNET_ResidualBlock(
                out_channels * 2, out_channels, time_dim=time_dim),
            UNET_AttentionBlock(n_head, head_dim=out_channels //
                                n_head, context_dim=context_dim)
        )

    def forward(self, x, skip, guidance_context, time_embedding):
        # Tăng kích thước không gian của đặc trưng
        x = self.upsample(x)
        # Nối (concatenate) skip connection từ encoder theo chiều kênh
        x = torch.cat([x, skip], dim=1)
        # Áp dụng block xử lý sau khi nối
        x = self.block(x, guidance_context, time_embedding)
        return x


# ====================================================
# UNET: Kiến trúc UNET tích hợp các module trên
# ====================================================
class UNET(nn.Module):
    def __init__(self, h_dim=128, n_head=4, time_dim=320, context_dim=512):
        """
        Kiến trúc UNET tổng thể, bao gồm phần encoder (downsampling),
        bottleneck (mid block) và decoder (upsampling).
        
        Tham số:
          - h_dim: số kênh cơ sở ban đầu.
          - n_head: số head cho các attention blocks.
          - time_dim: chiều của time embedding (đầu ra từ TimeEmbedding).
          - context_dim: chiều của guidance context (đầu ra từ CLIPTextEncoder).
        """
        super().__init__()
        # Lớp chuyển đổi đầu vào: từ latent (thường có 4 kênh) về h_dim channels.
        self.in_conv = nn.Conv2d(4, h_dim, kernel_size=3, padding=1)

        # =============================
        # Encoder: Downsampling
        # =============================
        # Down block 1: chuyển từ h_dim -> h_dim * 2
        self.down_block1 = DownBlock(
            h_dim, h_dim * 2, n_head, time_dim=time_dim, context_dim=context_dim)
        # Down block 2: chuyển từ h_dim * 2 -> h_dim * 4
        self.down_block2 = DownBlock(
            h_dim * 2, h_dim * 4, n_head, time_dim=time_dim, context_dim=context_dim)

        # =============================
        # Bottleneck (Mid Block)
        # =============================
        # Sử dụng SwitchSequential để xâu chuỗi các khối:
        #   + UNET_ResidualBlock: tích hợp time embedding
        #   + UNET_AttentionBlock: tích hợp guidance context
        #   + UNET_ResidualBlock: tiếp tục xử lý
        self.mid_block = SwitchSequential(
            UNET_ResidualBlock(h_dim * 4, h_dim * 4, time_dim=time_dim),
            UNET_AttentionBlock(n_head, head_dim=(
                h_dim * 4) // n_head, context_dim=context_dim),
            UNET_ResidualBlock(h_dim * 4, h_dim * 4, time_dim=time_dim)
        )

        # =============================
        # Decoder: Upsampling
        # =============================
        # Up block 1: từ h_dim * 4 về h_dim * 2 (kết hợp skip connection từ down_block2)
        self.up_block1 = UpBlock(
            h_dim * 4, h_dim * 2, n_head, time_dim=time_dim, context_dim=context_dim)
        # Up block 2: từ h_dim * 2 về h_dim (kết hợp skip connection từ down_block1)
        self.up_block2 = UpBlock(
            h_dim * 2, h_dim, n_head, time_dim=time_dim, context_dim=context_dim)

        # Lớp cuối cùng: chuyển từ h_dim về số kênh cần thiết (ở diffusion model thường là 4)
        self.out_conv = nn.Conv2d(h_dim, 4, kernel_size=3, padding=1)

    def forward(self, x, guidance_context, time_embedding):
        """
        Forward pass của UNET.
        
        Tham số:
          - x: tensor không gian tiềm ẩn (latent image), thường có 4 kênh.
          - guidance_context: tensor chứa embedding của prompt (từ CLIPTextEncoder).
          - time_embedding: embedding thời gian (từ TimeEmbedding).
        """
        # 1. Chuyển đầu vào qua lớp conv ban đầu
        x = self.in_conv(x)

        # 2. Encoder: Downsample và lưu lại skip connection
        # sau block 1, skip1 có kích thước tương ứng với h_dim*2
        x, skip1 = self.down_block1(x, guidance_context, time_embedding)
        # sau block 2, skip2 có kích thước tương ứng với h_dim*4
        x, skip2 = self.down_block2(x, guidance_context, time_embedding)

        # 3. Bottleneck: xử lý ở mức thấp nhất của không gian đặc trưng
        x = self.mid_block(x, guidance_context, time_embedding)

        # 4. Decoder: Upsample và kết hợp skip connections
        x = self.up_block1(x, skip2, guidance_context, time_embedding)
        x = self.up_block2(x, skip1, guidance_context, time_embedding)

        # 5. Lớp chuyển đổi cuối cùng để tạo kết quả đầu ra
        x = self.out_conv(x)
        return x

## Xây dựng hàm mã hóa thông tin thời gian

In [None]:
def embed_a_timestep(timestep, embedding_dim=320):
    """
    Nhúng một giá trị timestep thành vector embedding sử dụng các hàm cosine và sine.
    
    Các bước thực hiện:
     1. Tính nửa số chiều embedding (half_dim) từ embedding_dim.
     2. Tạo vector các tần số (freqs) theo cấp số nhân giảm dần, sử dụng log của 10000 để điều chỉnh tỉ lệ.
     3. Nhân giá trị timestep với vector tần số để chuẩn bị đầu vào cho các hàm cosine và sine.
     4. Tính toán vector cosine và sine của kết quả, sau đó nối hai vector lại theo chiều cuối.
    
    Tham số:
     - timestep: giá trị timestep cần nhúng (dạng số hoặc tensor chứa một số).
     - embedding_dim: số chiều của vector embedding (mặc định 320).
     
    Trả về:
     - Một tensor chứa vector nhúng có kích thước embedding_dim.
    """
    # Tính số chiều mỗi nửa của vector embedding
    half_dim = embedding_dim // 2

    # Tạo vector tần số: sử dụng hàm exp để tạo các tần số giảm dần từ 0 đến half_dim,
    # với tỉ lệ giảm là -log(10000)/half_dim
    freqs = torch.exp(
        -math.log(10000) * torch.arange(start=0, end=half_dim,
                                        dtype=torch.float32) / half_dim
    )

    # Nhân giá trị timestep (đưa về tensor) với vector freqs
    x = torch.tensor([timestep], dtype=torch.float32)[:, None] * freqs[None]

    # Tính cosine và sine cho x, sau đó nối kết quả lại theo chiều cuối (embedding_dim)
    return torch.cat([torch.cos(x), torch.sin(x)], dim=-1)


def embed_timesteps(timesteps, embedding_dim=320):
    """
    Nhúng nhiều giá trị timesteps thành các vector embedding sử dụng hàm cosine và sine.
    
    Các bước thực hiện:
     1. Xác định half_dim từ embedding_dim.
     2. Tạo vector các tần số (freqs) giảm dần dựa trên log(10000).
     3. Nhân mỗi giá trị timestep với vector tần số để tạo tensor nhân đôi (args).
     4. Áp dụng hàm cosine và sine lên tensor args.
     5. Nối các tensor cosine và sine theo chiều cuối cùng để thu được vector embedding cuối cùng.
    
    Tham số:
     - timesteps: tensor chứa các giá trị timestep, có kích thước (batch_size, ) hoặc (batch_size, 1).
     - embedding_dim: số chiều của vector embedding (mặc định 320).
     
    Trả về:
     - Một tensor chứa các vector nhúng cho mỗi timestep với kích thước (batch_size, embedding_dim).
    """
    # Tính số chiều của một nửa vector embedding
    half_dim = embedding_dim // 2

    # Tạo vector tần số, đảm bảo nó có cùng device với tensor timesteps
    freqs = torch.exp(
        -math.log(10000) * torch.arange(half_dim,
                                        dtype=torch.float32) / half_dim
    ).to(timesteps.device)

    # Nhân mỗi giá trị timestep với vector tần số
    args = timesteps[:, None].float() * freqs[None, :]

    # Tính cosine và sine cho args, sau đó nối chúng lại theo chiều cuối
    return torch.cat([torch.cos(args), torch.sin(args)], dim=-1)

class UNETOutputLayer(nn.Module):
    def __init__(self, in_channels, out_channels):
        """
        Khởi tạo lớp UNETOutputLayer dùng để chuyển đổi các đặc trưng đầu ra từ UNET 
        về định dạng cuối cùng (số kênh cần thiết, thường là số kênh của latent từ VAE).
        
        Tham số:
         - in_channels: số kênh của tensor đầu vào (đầu ra của decoder UNET).
         - out_channels: số kênh của tensor đầu ra mong muốn.
        """
        super().__init__()
        # Sử dụng một lớp convolution 3x3 với padding=1 để giữ nguyên kích thước không gian,
        # giúp chuyển đổi số kênh từ in_channels sang out_channels.
        self.conv = nn.Conv2d(in_channels, out_channels,
                              kernel_size=3, padding=1)

    def forward(self, x):
        """
        Forward pass của UNETOutputLayer.
        
        Tham số:
         - x: tensor đầu vào từ UNET, có kích thước (B, in_channels, H, W).
         
        Trả về:
         - Tensor đầu ra với kích thước (B, out_channels, H, W).
        """
        return self.conv(x)

## Khai báo model

In [None]:
# ------------------------- Diffusion Model -------------------------
class Diffusion(nn.Module):
    def __init__(self, h_dim=128, n_head=4):
        """
        Khởi tạo mô hình Diffusion.
        
        Tham số:
         - h_dim: chiều ẩn của UNET.
         - n_head: số lượng head trong attention của UNET.
        """
        super().__init__()
        # Tạo embedding thời gian với kích thước 320
        self.time_embedding = TimeEmbedding(320)
        # UNET: mô hình chính xử lý không gian tiềm ẩn (latent), context và time embedding
        self.unet = UNET(h_dim, n_head)
        # Lớp chuyển đổi đầu ra của UNET về định dạng mong muốn
        self.unet_output = UNETOutputLayer(h_dim, 4)

    @torch.autocast(device_type='cuda', dtype=torch.float16, enabled=True, cache_enabled=True)
    def forward(self, latent, context, time):
        """
        Forward pass của mô hình Diffusion.
        
        Tham số:
         - latent: tensor không gian tiềm ẩn (latent representation) của ảnh.
         - context: thông tin context (ví dụ mô tả ảnh mã hóa từ CLIP).
         - time: timestep hoặc vector thời gian.
        """
        # Chuyển đổi time thông qua lớp time_embedding
        time = self.time_embedding(time)
        # Áp dụng UNET với latent, context và time embedding
        output = self.unet(latent, context, time)
        # Chuyển đổi đầu ra của UNET qua lớp unet_output để ra kết quả cuối cùng
        output = self.unet_output(output)
        return output

# ------------------------- CLIP Text Encoder -------------------------
class CLIPTextEncoder(nn.Module):
    def __init__(self):
        """
        Khởi tạo mô hình CLIPTextEncoder để mã hóa các mô tả (prompts) của ảnh thành vector embedding.
        """
        super().__init__()
        # Định danh của mô hình CLIP được sử dụng
        CLIP_id = "openai/clip-vit-base-patch32"
        # Khởi tạo tokenizer và text encoder từ mô hình đã định nghĩa
        self.tokenizer = CLIPTokenizer.from_pretrained(CLIP_id)
        self.text_encoder = CLIPTextModel.from_pretrained(CLIP_id)
        # Chọn thiết bị: sử dụng CUDA nếu có, ngược lại sử dụng CPU
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # Đóng băng các tham số của text_encoder để không cập nhật trong quá trình huấn luyện
        for param in self.text_encoder.parameters():
            param.requires_grad = False
        # Đặt chế độ eval và chuyển text_encoder sang thiết bị đã chọn
        self.text_encoder.eval()
        self.text_encoder.to(self.device)

    def forward(self, prompts):
        """
        Forward pass của CLIPTextEncoder.
        
        Tham số:
         - prompts: danh sách các chuỗi mô tả ảnh.
         
        Trả về:
         - last_hidden_states: tensor các vector embedding của các prompt.
        """
        inputs = self.tokenizer(
            prompts,
            padding="max_length",
            truncation=True,
            max_length=self.text_encoder.config.max_position_embeddings,
            return_tensors="pt"
        )
        # Chuyển các tensor về cùng device với text_encoder
        input_ids = inputs.input_ids.to(self.device)
        attention_mask = inputs.attention_mask.to(self.device)
        # Tính toán embedding của văn bản mà không cập nhật gradient
        with torch.no_grad():
            text_encoder_output = self.text_encoder(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
        # Lấy vector embedding cuối cùng từ text encoder
        last_hidden_states = text_encoder_output.last_hidden_state
        return last_hidden_states


# ------------------------- Pre-trained VAE -------------------------
# Sử dụng mô hình Variational Autoencoder (VAE) đã được huấn luyện sẵn từ HuggingFace
VAE_id = "stabilityai/sd-vae-ft-mse"
vae = AutoencoderKL.from_pretrained(VAE_id)
# Đóng băng các tham số của VAE để không cập nhật trong quá trình huấn luyện
vae.requires_grad_(False)
vae.eval()

## Khai báo hàm trực quan hóa ảnh và scale giá trị ảnh

In [None]:
def show_images(images, title="", titles=[]):
    """
    Hiển thị tập hợp các ảnh trong một lưới (grid) sử dụng matplotlib.
    
    Tham số:
      - images: danh sách các tensor ảnh với định dạng (C, H, W).
      - title: tiêu đề chung cho toàn bộ hình ảnh.
      - titles: danh sách tiêu đề cho từng ảnh (nếu có).
    """
    # Tạo figure với kích thước 8x8 inch
    plt.figure(figsize=(8, 8))

    # Lặp qua các ảnh, giới hạn hiển thị tối đa là 25 ảnh
    for i in range(min(25, len(images))):
        # Tạo subplot trong lưới 5x5, vị trí subplot là i+1 (do chỉ số bắt đầu từ 1)
        plt.subplot(5, 5, i + 1)
        # Đổi chiều của tensor từ (C, H, W) sang (H, W, C), chuyển về CPU và chuyển đổi sang numpy để hiển thị
        img = images[i].permute(1, 2, 0).cpu().numpy()
        # Hiển thị ảnh
        plt.imshow(img)
        # Nếu có tiêu đề riêng cho từng ảnh, hiển thị tiêu đề
        if titles:
            plt.title(titles[i])
        # Tắt hiển thị trục để hình ảnh trông gọn gàng hơn
        plt.axis("off")

    # Hiển thị tiêu đề chung cho toàn bộ figure
    plt.suptitle(title)
    # Điều chỉnh bố cục của các subplot sao cho không bị chồng lấn
    plt.tight_layout()
    # Hiển thị figure
    plt.show()


def rescale(value, in_range, out_range, clamp=False):
    """
    Chuyển đổi giá trị từ khoảng in_range sang khoảng out_range.
    
    Tham số:
      - value: giá trị (hoặc tensor) cần chuyển đổi.
      - in_range: tuple (in_min, in_max) xác định khoảng giá trị đầu vào.
      - out_range: tuple (out_min, out_max) xác định khoảng giá trị đầu ra.
      - clamp: nếu True, giới hạn giá trị kết quả trong khoảng out_range.
      
    Trả về:
      - rescaled_value: giá trị sau khi chuyển đổi về khoảng out_range.
    """
    # Lấy giá trị nhỏ nhất và lớn nhất trong khoảng đầu vào
    in_min, in_max = in_range
    # Lấy giá trị nhỏ nhất và lớn nhất trong khoảng đầu ra
    out_min, out_max = out_range

    # Tính khoảng chênh lệch (span) của in_range và out_range
    in_span = in_max - in_min
    out_span = out_max - out_min

    # Chuyển đổi value về tỷ lệ [0, 1]; thêm 1e-8 để tránh chia cho 0
    scaled_value = (value - in_min) / (in_span + 1e-8)
    # Chuyển đổi giá trị từ [0, 1] về khoảng out_range
    rescaled_value = out_min + (scaled_value * out_span)

    # Nếu yêu cầu, giới hạn giá trị kết quả trong khoảng out_range
    if clamp:
        rescaled_value = torch.clamp(rescaled_value, out_min, out_max)

    return rescaled_value

## Khai báo class Pytorch dataset

In [None]:
# Đặt kích thước ảnh mong muốn (chiều rộng và chiều cao)
WIDTH, HEIGHT = 32, 32

# Đặt batch size cho quá trình huấn luyện
batch_size = 32

class EmojiDataset(Dataset):
    def __init__(self, csv_files, image_folder, transform=None):
        """
        Khởi tạo dataset cho Emoji.
        
        Tham số:
         - csv_files: danh sách đường dẫn các file CSV chứa thông tin metadata.
         - image_folder: đường dẫn thư mục chứa ảnh.
         - transform: các biến đổi (augmentation) áp dụng lên ảnh nếu có.
        """
        # Đọc và nối tất cả các file CSV thành 1 DataFrame duy nhất
        self.dataframe = pd.concat([pd.read_csv(csv_file)
                                   for csv_file in csv_files])
        self.images_folder = image_folder

        # Tạo cột "image_path" bằng cách thay thế dấu "\" thành "/" trong cột "file_name"
        self.dataframe["image_path"] = self.dataframe["file_name"].str.replace(
            "\\", "/")

        # Lấy danh sách đường dẫn ảnh và danh sách tiêu đề (prompt) từ DataFrame
        self.image_paths = self.dataframe["image_path"].tolist()
        self.titles = self.dataframe["prompt"].tolist()
        self.transform = transform

    def __len__(self):
        # Trả về tổng số mẫu trong dataset
        return len(self.dataframe)

    def __getitem__(self, idx):
        # Xây dựng đường dẫn đầy đủ đến ảnh bằng cách nối thư mục chứa ảnh với đường dẫn ảnh
        image_path = self.images_folder + "/" + self.image_paths[idx]
        # Lấy tiêu đề tương ứng với ảnh
        title = self.titles[idx]
        # Loại bỏ các ký tự không mong muốn (dấu ngoặc kép và dấu ngoặc đơn)
        title = title.replace('"', "").replace("'", "")
        # Mở ảnh với PIL và chuyển sang định dạng RGB
        image = Image.open(image_path).convert("RGB")

        # Nếu có áp dụng transform, thì biến đổi ảnh theo các bước đã định nghĩa
        if self.transform:
            image = self.transform(image)

        # Trả về ảnh đã xử lý và tiêu đề của ảnh đó
        return image, title


# ------------------ Khởi tạo DataLoader ------------------
# Định nghĩa các phép biến đổi (transforms) cho ảnh
transform = transforms.Compose([
    # Resize ảnh về kích thước (WIDTH, HEIGHT) sử dụng nội suy BICUBIC
    transforms.Resize(
        (WIDTH, HEIGHT), interpolation=transforms.InterpolationMode.BICUBIC),
    transforms.ToTensor(),  # Chuyển ảnh thành tensor
    # Chuẩn hóa tensor ảnh với mean và std để giá trị nằm trong khoảng [-1, 1]
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

# Đường dẫn các file CSV chứa metadata (cần thay đổi đường dẫn phù hợp với máy của bạn)
csv_files = ["/content/blobs_crawled_data/metadata.csv"]
# Đường dẫn tới thư mục chứa ảnh
image_folder = "/content/blobs_crawled_data/images"

# Khởi tạo dataset từ class EmojiDataset
train_dataset = EmojiDataset(
    csv_files=csv_files,
    image_folder=image_folder,
    transform=transform
)

# Tạo DataLoader cho quá trình huấn luyện
train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,           # Trộn dữ liệu mỗi epoch
    num_workers=2,          # Sử dụng 2 worker để tải dữ liệu song song
    pin_memory=True,        # Tăng tốc độ chuyển dữ liệu sang GPU nếu có
    persistent_workers=True  # Duy trì worker giữa các epoch để giảm thời gian khởi động lại
)