In [1]:
import torch
import torch.nn
import torch.nn as nn
import torch.nn.functional as F

## Tensor Manipulation

### Top-k and indices from batch of tensors

* input: `batch_size, num_elements`
* output: largest k along the last dimension + indices 
* Used for **retrieval tasks**

Follow up 

* any dimension 
* the smallest k  
* do not use topk 

Function: `topk` 
* largest_k_value, indices = torch.topk(tensor, k, dim, largest=True)

Function: `sort`
* sorted_values, sorted_index = torch.sort(tensor, dim, descending=True)

In [None]:
def top_k(tensor, k, largest=True, dim=-1):
    """Return the k largest (or smallest) values along `dim` and their indices.

    Args:
        tensor: input tensor, e.g. (batch_size, num_elements).
        k: number of elements to keep along `dim`.
        largest: True for the k largest values, False for the k smallest.
        dim: dimension to search along (defaults to the last one).

    Returns:
        (values, indices), both with size k along `dim`.
    """
    k_value, k_index = torch.topk(tensor, k, dim=dim, largest=largest)
    return k_value, k_index

def top_k_slice(tensor, k, largest=True, dim=-1):
    """Top-k without `torch.topk`: sort along `dim`, then keep the first k entries.

    Args:
        tensor: input tensor, e.g. (batch_size, num_elements).
        k: number of elements to keep; clipped to the size of `dim`.
        largest: True for the k largest values, False for the k smallest.
        dim: dimension to sort along (defaults to the last one).

    Returns:
        (values, indices), both with size min(k, tensor.size(dim)) along `dim`.
    """
    sorted_tensor, sorted_index = torch.sort(tensor, dim=dim, descending=largest)
    # narrow() slices along an arbitrary dim; clip k so an oversized k behaves like slicing
    keep = min(k, sorted_tensor.size(dim))
    return sorted_tensor.narrow(dim, 0, keep), sorted_index.narrow(dim, 0, keep)
    

### Replace all negative values in a tensor with the mean of the positive values

* input: `batch_size, num_elements`
* output: `batch_size, num_elements` 
* Use for **handling special values / denoising data**

Function: `where` 
* tensor = torch.where(condition, value-if-true, value-if-false)
* An **element wise** replacement of value with provided value 
* Use to replace the value based on **condition**
  * Replace invalid value 
  * Select values 

Function `masked_fill` 

* tensor = tensor.masked_fill(mask, val)
* Do a **element wise** replacement where the mask **is True**

Tensor: mask 

* Element wise comparison 
* To generate a mask of the same size: tensor > val 
* A boolean mask => Can be used for other computations 
  * extract all elements where the mask is True: `tensor[mask]`
    * The output is always `1D`: all other dimensions are lost !! 
  * Apply the mask with `tensor * mask` => the shape is not changed 

Function computation, like `sum` 

* tensor = tensor.sum(dim, keepdim)
  * Boolean value can also be applied
* Do the computation along the given dim
* if `keepdim` is True, reducing the last dim gives `B, L, E` => `B,L,1`, else `B,L` 
  * The reduced dim collapses to a single number 

Function: clamp

* tensor = tensor.clamp(min=val1, max=val2)

In [None]:
def replace_neg_with_mean(tensor):
    """Replace every negative entry with the mean of the positive entries of its row.

    Args:
        tensor: tensor whose last dimension holds the elements of one row,
            e.g. (batch_size, num_elements).

    Returns:
        Tensor of the same shape. Negative entries are replaced by the mean of
        the strictly positive entries along the last dimension; a row without
        any positive entry uses 0 as the replacement value.
    """
    negative_mask = tensor < 0
    positive_mask = tensor > 0
    # Only strictly positive entries contribute to the mean (zeros are neither
    # replaced nor counted).
    positive_sum = tensor.masked_fill(~positive_mask, 0).sum(dim=-1, keepdim=True)
    # clamp(min=1) avoids a division by zero for rows without positive entries
    positive_count = positive_mask.sum(dim=-1, keepdim=True).clamp(min=1)
    positive_mean = positive_sum / positive_count
    # positive_mean is (..., 1) and broadcasts over the row
    return torch.where(negative_mask, positive_mean, tensor)

### Rearrange, Reshape and Flatten a tensor 

* Input: a tensor `Batch, C, H, W` 
* Output: a tensor that is reshaped / rearranged / flatten
* Use for: many computation needed 

Difference between `permute`, `view`, `transpose`

* permute: 
  * **reordering of any dimensions**
  * `tensor = tensor.permute(0,4,3,2,1)` => B,L,C,H,W => B,W,H,C,L 
  * Do not copy data, doesn't need the data to be contigeous 
* transpose:
  * swap **only two dimensions**
  * `tensor = tensor.transpose(-2, -1)` => B,L,H,W => B,L,W,H
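* view:
  * Change the **shape** without changing the order in memory => memory must be **contiguous** (or stride-compatible) + the new shape must have the same number of elements as the input 
  * `tensor = tensor.view(L1, L2, L3)`
  * Never copies data: only the shape / stride metadata changes (`permute` and `transpose` also return views, they change the strides instead)
  * Raises an error when the requested shape is incompatible with the current strides => call `.contiguous()` first or use `reshape`
  * The created tensor **shares the same memory** with the original => changing one will influence the other 
* reshape
  * Change the **shape** of tensor; returns a view when possible, otherwise a copy 
  * `tensor = tensor.reshape(l1, l2...)`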
* flatten 
  * convert the multiple dim vector from start dim=> **1d** tensor 
  * `tensor = tensor.flatten(start_dim)`
    * if `start_dim` is not defined: B,H,W => B * H * W 
    * if `start_dim=1`, B,H,W => B, H * W
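* squeeze
  * without arguments it **removes every dim of size 1**; pass `dim` to remove only that dim (safer when the batch size can be 1) 
  * `tensor = tensor.squeeze()` / `tensor = tensor.squeeze(1)` 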
* unsqueeze 
  * add a **new dimension** at new given dim / for **broadcasting**
  * `tensor = tensor.unsqueeze(0)` H,W => 1,H,W 

You are given a batch of grayscale images stored in a 4D tensor of shape `(batch_size, 1, height, width)`.
Each image has `one channel (1)`, but we want to process them as flattened 2D matrices (removing the channel dimension).
Your task is to **reshape** the images into a batch of 2D tensors of shape (batch_size, height, width) and then:
- Flatten each image into a vector of shape (batch_size, height × width)
- Convert back into the original shape using a reshaping function.
- Return both the flattened images and the reconstructed images.

In [None]:
def reshape_process_img(tensor):
    """Flatten single-channel images and reconstruct the original layout.

    Args:
        tensor: images of shape (B, 1, H, W).

    Returns:
        (flatten_tensor, reconstructed_tensor) with shapes (B, H * W) and
        (B, 1, H, W).

    Raises:
        ValueError: if the channel dimension is not 1.
    """
    B, C, H, W = tensor.shape
    if C != 1:
        raise ValueError(f"expected a single channel, got {C}")
    # squeeze(1) removes only the channel dim; a bare squeeze() would also drop
    # the batch dim when B == 1
    squeeze_tensor = tensor.squeeze(1)  # B, H, W
    # reshape works for non-contiguous inputs too; equivalent to flatten(start_dim=1)
    flatten_tensor = squeeze_tensor.reshape(B, H * W)
    reconstructed_tensor = flatten_tensor.reshape(B, 1, H, W)
    return flatten_tensor, reconstructed_tensor

### One-hot encode 

* input: `B` (if a one-dimension result) / `B * H * W` a image 
* output: `B * class` / `B * class * H * W` 
* used for: cross entropy, used as `y^`

Function: `scatter_` 
* write a value into a tensor at the positions given by an index tensor
* tensor = tensor.scatter_(dim, index, 1)
  * if the index is [[1],[2]] and the tensor to be filled is [[0,0,0], [0,0,0]], the result is [[0,1,0], [0,0,1]]

In [None]:
def one_hot_one_dim_encode(tensor, num_class):
    """One-hot encode a 1D tensor of class indices.

    Args:
        tensor: class indices of shape (B,); any integer dtype.
        num_class: number of classes N.

    Returns:
        Float tensor of shape (B, N) on the same device as `tensor`.
    """
    # scatter_ requires an int64 index
    labels = tensor.long()
    expended_tensor = torch.zeros((labels.shape[0], num_class), device=labels.device)
    expended_tensor.scatter_(1, labels.unsqueeze(1), 1)
    return expended_tensor
def one_hot_multi_cls_encode(tensor, num_class):
    """Multi-hot encode several class indices per sample.

    Args:
        tensor: class indices of shape (B, n), n labels per sample.
        num_class: number of classes N.

    Returns:
        Float tensor of shape (B, N) with a 1 at every listed class.
    """
    labels = tensor.long()
    B, _ = labels.shape
    one_hot = torch.zeros((B, num_class), device=labels.device)
    one_hot.scatter_(1, labels, 1)
    return one_hot
def one_hot_image_encode(tensor, num_class):
    """One-hot encode a label map.

    Args:
        tensor: class indices of shape (B, H, W).
        num_class: number of classes N.

    Returns:
        Float tensor of shape (B, N, H, W) with the class axis at dim 1.
    """
    labels = tensor.long()
    B, H, W = labels.shape
    one_hot = torch.zeros((B, num_class, H, W), device=labels.device)
    one_hot.scatter_(1, labels.unsqueeze(1), 1)
    return one_hot
def one_hot_multi_channel_encode(tensor, num_class):
    """One-hot encode every channel of a multi-channel label map.

    Args:
        tensor: class indices of shape (B, C, H, W).
        num_class: number of classes N.

    Returns:
        Float tensor of shape (B, C * N, H, W); the N class planes of channel c
        occupy output channels [c * N, (c + 1) * N).
    """
    labels = tensor.long()
    B, C, H, W = labels.shape
    one_hot = torch.zeros((B, C, num_class, H, W), device=labels.device)
    one_hot.scatter_(2, labels.unsqueeze(2), 1)
    # merge the (C, N) axes into a single channel axis
    one_hot = one_hot.view((B, num_class * C, H, W))
    return one_hot

### Extract values from tensor based on index mapping

* input: a tensor + index mapping
* output: result from tensor and reconstruct following index 
* use for: nlp 

Function `gather`
* Extract value in certain dim with given index 
* `new_tensor = torch.gather(tensor, dim, index)`

Function `chunk` 
* chunk the tensor into several pieces along dimension 
* `new_tensor = torch.chunk(tensor, chunk_num, dim)`

In [None]:
def gather_value(tensor, dim, index):
    """Pick values from `tensor` along `dim` at the positions listed in `index`."""
    gathered = tensor.gather(dim, index)
    return gathered

## Matrix Manipulation

### batch matrix / matrix multiplication

* `torch.triu(input, diagonal)`
  * diagonal = 0: the upper triangle 
  * diagonal = 1: the upper triangle without diagonal
  * diagonal = -1: the upper triangle plus one diagonal below the main diagonal
* `torch.diag(input, diagonal)`
  * diagonal definition is the same

implement a weighted sum of a tensor along an axis.

In [None]:
def weighted_sum(tensor, weight, dim):
    """Weighted sum of `tensor` along `dim`.

    Args:
        tensor: input tensor.
        weight: either a 1D tensor with one weight per entry of `dim`, or a
            tensor that already broadcasts against `tensor`.
        dim: dimension that is reduced.

    Returns:
        Tensor with `dim` reduced to size 1 (keepdim=True).

    Raises:
        ValueError: if a 1D `weight` does not match the size of `dim`.
    """
    if weight.dim() == 1:
        if weight.shape[0] != tensor.shape[dim]:
            raise ValueError(
                f"weight has {weight.shape[0]} entries but dim {dim} has size {tensor.shape[dim]}"
            )
        # place the weights on the reduced axis: (1, ..., L, ..., 1)
        broadcast_shape = [1] * tensor.dim()
        broadcast_shape[dim] = weight.shape[0]
        weight = weight.reshape(broadcast_shape)
    elif weight.dim() < tensor.dim():
        # original behaviour for lower-rank weights: insert an axis at dim 1
        weight = weight.unsqueeze(1)
    weighted_tensor = tensor * weight
    return weighted_tensor.sum(dim=dim, keepdim=True)

## Dataset Load and Pre-process 

* Initialize: a self-defined dataset supports
  * load a batch of image: need to load the correspoinding figures when called 
  * on-the-fly image processing  
  * A dataloader for **pre-processing**

`Dataset`
* An API wraps all operations when a batch of data is loaded 
* Need to initialize
  * How to load the figure 
  * After load the figure, the needed value for training and their formats
    * figure
    * label 
    * others if needed
  * pre-process, like change them to tensor + augmentation 
    * `transforms`: pre-processing provided officially in `torchvision.transforms` 
      * `ToTensor`: must be used to transform to tensor
      * `Resize`, `RandomRotation`, `RandomHorizontalFlip`, `ColorJitter` and similar are applied here to `PIL` images (before `ToTensor`)
        * `Resize`: Resize((shape))
        * `RandomRotation`: RandomRotation(degrees)
        * `RandomHorizontalFlip`: RandomHorizontalFlip(p)
        * `RandomCrop`: RandomCrop(size)
      * `Normalize`: only applies to `tensor` 
  * when loading
    * `__getitem__(self, idx)`: load each item, will be aggregated into a batch later => items must be able to **concatenate**
      * Can use `PIL` to load image
      * Can convert to RGB
    * `__len__(self)`: how many figures
`DataLoader`
* Used to load **batch of data** from dataset, only need to pay attention to 
  * How to connect with the dataset
  * How to load: `for step, batch in enumerate(dataloader)`, batch is the loaded item
  * Number of batches: len(dataloader)

In [None]:
# import
# * Dataset / DataLoader live in torch.utils.data, transforms live in torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

In [None]:
import os
from PIL import Image


class NewDataset(Dataset):
    """Image dataset that loads files lazily and applies an optional transform.

    Args:
        img_path: either a directory containing the images, or a sequence of
            image file paths.
        transform: optional callable applied to the loaded PIL image.
    """

    def __init__(self, img_path, transform=None):
        self.img_path = img_path
        self.transform = transform
        if isinstance(img_path, (str, os.PathLike)):
            # directory: index its regular files in a stable (sorted) order
            self.img_files = sorted(
                os.path.join(img_path, name)
                for name in os.listdir(img_path)
                if os.path.isfile(os.path.join(img_path, name))
            )
        else:
            self.img_files = list(img_path)

    def __len__(self):
        return len(self.img_files)

    def __getitem__(self, idx):
        img = Image.open(self.img_files[idx]).convert('RGB')
        # * apply transform; without one the PIL image is returned unchanged
        if self.transform is not None:
            img = self.transform(img)
        return img

In [None]:
def get_dataloader(img_path, batch_size):
    """Build a shuffled DataLoader over `NewDataset` with on-the-fly augmentation.

    Args:
        img_path: image location forwarded to `NewDataset`.
        batch_size: number of images per batch.

    Returns:
        DataLoader yielding normalized image batches of shape (B, 3, 512, 512).
    """
    transform = transforms.Compose([
        # PIL-level augmentation first ...
        transforms.Resize((512, 512)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=15),
        transforms.ColorJitter(contrast=0.3),
        # ... then conversion to tensor, then tensor-only normalization
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
    ])
    dataset = NewDataset(img_path, transform=transform)

    dataloader = DataLoader(dataset, shuffle=True, batch_size=batch_size)
    return dataloader

## ML and DL Concept

### Regression 

Show the flow of train a model

* define the model, optimizer 
* Manually
  * Compute loss => `loss.backward()` => in `no_grad`, update parameters, `p.grad.zero_()`
* by torch
  * compute loss => clear computed grad `zero_grad()` => `loss.backward` => `optimizer.step()`

#### Linear Regression 

* Input: input $x$, label $y$ parameters (with / without optimizer)
* Loss: MSE Loss 
* Output: progression of training: updated weight 
  * weight update 

#### Logistic Regression 
* Loss: BCE Loss / cross-entropy

In [None]:
def train_regression(x, y_true, lr, epochs, logistic=False):
    """Fit a linear (or logistic) regression with manual gradient-descent updates.

    Args:
        x: inputs of shape (B, D).
        y_true: targets of shape (B,) or (B, N); values in [0, 1] when
            `logistic` is True.
        lr: learning rate.
        epochs: number of full-batch update steps.
        logistic: if True, apply a sigmoid and optimize the BCE loss instead of
            the MSE loss.

    Returns:
        (W, b): detached parameters with shapes (D, *y_true.shape[1:]) and
        y_true.shape[1:].
    """
    # STEP1: initialize W and b
    out_shape = tuple(y_true.shape[1:])
    W = torch.randn((x.shape[1], *out_shape), requires_grad=True)
    b = torch.randn(out_shape, requires_grad=True)

    # STEP2: train
    for epoch in range(epochs):
        # STEP3: compute y
        y = x @ W + b

        # STEP4: compute loss
        if logistic:
            # clamp keeps log() finite when the sigmoid saturates
            prob = torch.sigmoid(y).clamp(1e-7, 1 - 1e-7)
            loss = -(y_true * torch.log(prob) + (1 - y_true) * torch.log(1 - prob)).mean()
        else:
            loss = ((y - y_true) ** 2).mean()

        # STEP5: compute gradients
        loss.backward()

        # STEP6: manually update; in-place so W and b stay leaf tensors and keep .grad
        with torch.no_grad():
            W -= lr * W.grad
            b -= lr * b.grad
            # STEP7: zero out the gradient
            W.grad.zero_()
            b.grad.zero_()
    return W.detach(), b.detach()

In [None]:
import torch.optim as optim 

class LinearRegressionModel(nn.Module):
    """Single affine layer mapping `input_dim` features to `class_num` outputs."""

    def __init__(self, input_dim, class_num):
        super().__init__()
        self.linear = nn.Linear(in_features=input_dim, out_features=class_num)

    def forward(self, x):
        prediction = self.linear(x)
        return prediction

def train_linear_torch(X, Y, lr, epochs, criterion=None):
    """Train `LinearRegressionModel` with SGD.

    Args:
        X: inputs of shape (B, D).
        Y: targets of shape (B, N).
        lr: learning rate.
        epochs: number of full-batch update steps.
        criterion: loss module called as criterion(prediction, Y). Defaults to
            nn.MSELoss(); pass nn.BCEWithLogitsLoss() or nn.CrossEntropyLoss()
            for binary / multi-class classification.

    Returns:
        The trained model.
    """
    _, D = X.shape
    _, N = Y.shape
    linearLayer = LinearRegressionModel(D, N)
    if criterion is None:
        criterion = nn.MSELoss()
    optimizer = optim.SGD(linearLayer.parameters(), lr=lr)

    for epoch in range(epochs):
        y = linearLayer(X)

        loss = criterion(y, Y)
        # clear stale gradients => backward => parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    return linearLayer

    # ! inference (notes only; run on the returned model)
    # logits_pred = model(x)
    # ! binary class
    # y_pred = torch.sigmoid(logits_pred) > 0.5
    # ! multi class (softmax does not change the argmax)
    # y_pred = torch.argmax(logits_pred, dim=1)
        

### MLP Block

* input: X, Y true, num_classes, lr, epochs
* output: trained model 
* Model
  * multi layer: different linear layer 
  * Activation
  * Other connection layer: dropout, Norm ...

In [None]:
class MLPBlock(nn.Module):
    """Multi-layer perceptron: [Linear -> BatchNorm1d -> ReLU -> Dropout] * k -> Linear.

    Args:
        input_dim: number of input features.
        hidden_sizes: iterable with the width of every hidden layer.
        num_class: number of outputs of the final linear layer.
    """

    def __init__(self, input_dim, hidden_sizes, num_class):
        super().__init__()
        # * layers are collected in a plain list and registered through nn.Sequential
        layers = []
        prev_dim = input_dim
        for hidden_size in hidden_sizes:
            layers.append(nn.Linear(prev_dim, hidden_size))
            layers.append(nn.BatchNorm1d(hidden_size))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(0.1))
            prev_dim = hidden_size
        layers.append(nn.Linear(prev_dim, num_class))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Map (B, input_dim) inputs to (B, num_class) outputs."""
        return self.model(x)
    
def train_mlp(self, X, y_pred, lr, epochs):
    """Train an `MLPBlock` classifier with Adam and cross-entropy.

    Args:
        self: unused; kept only so the positional signature stays unchanged
            (pass None).
        X: inputs of shape (B, D).
        y_pred: targets of shape (B, N), e.g. one-hot / class probabilities.
        lr: learning rate.
        epochs: number of full-batch update steps.

    Returns:
        The trained model.
    """
    _, D = X.shape
    _, N = y_pred.shape

    mlp = MLPBlock(D, [16, 128, 32], N)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(mlp.parameters(), lr=lr)

    for epoch in range(epochs):
        y = mlp(X)
        loss = criterion(y, y_pred)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    return mlp
    
        

### BP without autograd(backward)

* Use chain rule to compute the weight manually 
* After define the loss, the grad of each parameter
* compute new W based on **gradient descending updates**

In [None]:

def train_single_layer(X, y_pred, epochs, lr):
    """Train one Linear + ReLU layer with hand-derived gradients (no autograd).

    Args:
        X: inputs of shape (B, D).
        y_pred: targets of shape (B, N).
        epochs: number of full-batch update steps.
        lr: learning rate.

    Returns:
        (W, b) with shapes (D, N) and (1, N).
    """
    B, D = X.shape
    _, N = y_pred.shape

    W = torch.randn((D, N))
    b = torch.randn((1, N))
    for epoch in range(epochs):
        # forward: B, N
        y = X @ W + b
        y_act = F.relu(y)

        # loss = ((y_act - y_pred) ** 2).mean() averages over all B * N entries,
        # so its gradient carries a 1 / (B * N) factor
        dL_dya = 2 * (y_act - y_pred) / (B * N)
        # ReLU derivative: 1 where the pre-activation is positive, else 0
        dya_dy = (y > 0).to(y.dtype)
        dL_dy = dL_dya * dya_dy
        # chain rule: dy/dW = X (B, D) and dy/db = 1
        dW = X.T @ dL_dy
        db = dL_dy.sum(dim=0, keepdim=True)

        # gradient descent update
        W = W - lr * dW
        b = b - lr * db
    return W, b
    

### Add self-defined (e.g:ReLU) in MLP 

#### GeLU

* Gaussian enhanced ReLU: solved the abrupt change of Relu from 0 -> infinite
* Always used in **transformer**
* Dis: computational expensive

* $ GeLU(x) = x \cdot cdf(x) \approx 0.5 x (1 + tanh(\sqrt{\frac{2}{\pi}} (x + 0.044715 x^3)))$

#### SiLU 

* smoother activate
* smoother gradient flow 
* Used in **normalization layer** and **transformer / diffusions**
* $ SiLU = x \times \sigma(x) = x \frac{1}{1 + e^{-x}}$

In [None]:
class ReLU(nn.Module):
    """Rectified linear unit: max(x, 0) applied element-wise."""

    def __init__(self):
        super(ReLU, self).__init__()

    def forward(self, x):
        # zeros_like keeps dtype and device identical to the input
        return torch.where(x > 0, x, torch.zeros_like(x))

class LeakyReLU(nn.Module):
    """Leaky ReLU: x for positive inputs, alpha * x otherwise.

    Args:
        alpha: slope applied to non-positive inputs.
    """

    def __init__(self, alpha):
        super(LeakyReLU, self).__init__()
        self.alpha = alpha

    def forward(self, x):
        return torch.where(x > 0, x, x * self.alpha)

class GeLU(nn.Module):
    """GELU activation using the tanh approximation of the Gaussian CDF."""

    def __init__(self):
        super(GeLU, self).__init__()

    def cdf(self, x):
        """Approximate Phi(x) as 0.5 * (1 + tanh(sqrt(2 / pi) * (x + 0.044715 * x^3)))."""
        # 2 / pi is a plain Python float, so the square root is taken with ** 0.5
        coef = (2 / torch.pi) ** 0.5
        return 0.5 * (1 + torch.tanh(coef * (x + 0.044715 * x ** 3)))

    def forward(self, x):
        return x * self.cdf(x)

class SiLU(nn.Module):
    """SiLU (swish) activation: x * sigmoid(x)."""

    def __init__(self):
        super(SiLU, self).__init__()

    def forward(self, x):
        return x * torch.sigmoid(x)
class MLP(nn.Module):
    """Conv -> activation -> BatchNorm -> Dropout block with a pluggable activation.

    Args:
        act_fn: activation class (not an instance); it is instantiated here
            without arguments.
        input_c: number of input channels.
        output_c: number of output channels.
        kernel_size: odd convolution kernel size; padding keeps H and W
            unchanged. The default 1x1 kernel acts as a per-pixel linear layer.
    """

    def __init__(self, act_fn, input_c=3, output_c=16, kernel_size=1):
        super().__init__()
        self.layer = nn.Sequential(
            nn.Conv2d(input_c, output_c, kernel_size, padding=kernel_size // 2),
            act_fn(),
            nn.BatchNorm2d(output_c),
            nn.Dropout(0.5),
        )

    def forward(self, x):
        """Map (B, input_c, H, W) to (B, output_c, H, W)."""
        return self.layer(x)

### Normalization (manually)

All Normalization follows 

* $ scale \cdot \frac{x - \mu}{\sqrt{\sigma^2 + \epsilon}} + shift $

#### Batch Norm

* `scale` and `shift` are learnable per channel 
* statistics are computed per mini-batch (running averages are used at inference) 
* Stabilizes training 

#### Layer Norm

* `scale` and `shift` are learnable per feature 
* statistics are computed per sample over the feature (last) dimension 
* Always used in NLP 

#### Adaptive Layer Norm

* `scale` and `shift` are computed by control signal 
* How it is controled is determined by the specific AdaLN

In [None]:
class BatchNorm(nn.Module):
    """Batch normalization over the channel axis (dim 1) of (B, C, ...) inputs.

    Args:
        num_embeddings: number of channels C.
        epsilon: value added to the variance for numerical stability.
        momentum: weight of the OLD running statistic in the moving average,
            i.e. running = momentum * running + (1 - momentum) * batch.
    """

    def __init__(self, num_embeddings, epsilon=1e-5, momentum=0.9):
        super(BatchNorm, self).__init__()
        self.scale = nn.Parameter(torch.ones(num_embeddings))
        self.shift = nn.Parameter(torch.zeros(num_embeddings))
        self.epsilon = epsilon
        self.momentum = momentum

        self.register_buffer("running_mean", torch.zeros(num_embeddings))
        self.register_buffer("running_var", torch.ones(num_embeddings))

    def forward(self, x):
        # statistics are taken over every axis except the channel axis
        reduce_dims = [d for d in range(x.dim()) if d != 1]
        # (1, C, 1, ..., 1): aligns per-channel vectors with dim 1 of x
        stat_shape = [1, -1] + [1] * (x.dim() - 2)
        if self.training:
            mean = x.mean(dim=reduce_dims, keepdim=True)
            var = x.var(dim=reduce_dims, keepdim=True, unbiased=False)
            # buffers keep their (C,) shape and stay out of the autograd graph
            with torch.no_grad():
                self.running_mean.mul_(self.momentum).add_(mean.reshape(-1), alpha=1 - self.momentum)
                self.running_var.mul_(self.momentum).add_(var.reshape(-1), alpha=1 - self.momentum)
        else:
            mean = self.running_mean.view(stat_shape)
            var = self.running_var.view(stat_shape)
        x_hat = (x - mean) / torch.sqrt(var + self.epsilon)
        return self.scale.view(stat_shape) * x_hat + self.shift.view(stat_shape)
        
class LayerNorm(nn.Module):
    """Layer normalization over the last dimension with learnable scale and shift.

    Args:
        num_embeddings: size of the normalized (last) dimension.
        epsilon: value added to the variance before the square root.
    """

    def __init__(self, num_embeddings, epsilon):
        super().__init__()
        self.scale = nn.Parameter(torch.ones(num_embeddings))
        self.shift = nn.Parameter(torch.zeros(num_embeddings))
        self.epsilon = epsilon

    def forward(self, x):
        # per-sample statistics over the feature axis (biased variance)
        feature_mean = x.mean(dim=-1, keepdim=True)
        feature_var = x.var(dim=-1, keepdim=True, unbiased=False)
        centered = x - feature_mean
        denom = torch.sqrt(feature_var + self.epsilon)
        return self.scale * centered / denom + self.shift
    
class AdaLN(nn.Module):
    """Adaptive layer norm: scale and shift are predicted from a conditioning signal.

    Args:
        num_embeddings: size of the conditioning embedding.
        output_dim: size of the projected conditioning vector; it is split in
            half into scale and shift, so the normalized input must have
            output_dim // 2 features. Defaults to 2 * num_embeddings.
        time_emb_num: if given, the conditioning signal is an integer index
            (e.g. a timestep) looked up in an embedding table of this size.
        epsilon: epsilon of the inner LayerNorm.
    """

    def __init__(self, num_embeddings, output_dim=None, time_emb_num=None, epsilon=1e-5):
        super().__init__()
        # if the conditioning input is an index rather than an embedding
        if time_emb_num is not None:
            self.emb = nn.Embedding(time_emb_num, num_embeddings)
        else:
            self.emb = None
        # * output space
        output_dim = output_dim if output_dim is not None else num_embeddings * 2
        # * projects the conditioning embedding to (scale, shift)
        self.linear = nn.Linear(num_embeddings, output_dim)
        # * activation applied before the projection
        self.act = nn.SiLU()
        self.norm = nn.LayerNorm(output_dim // 2, epsilon)

    def forward(self, x, emb, temb=None):
        """Normalize `x` and modulate it with the conditioning signal.

        Args:
            x: input of shape (B, ..., output_dim // 2).
            emb: conditioning indices of shape (B,) when an embedding table is
                configured; otherwise the conditioning embedding
                (B, num_embeddings), used when `temb` is not given.
            temb: optional precomputed conditioning embedding (B, num_embeddings).
        """
        if self.emb is not None:
            temb = self.emb(emb)
        elif temb is None:
            temb = emb
        # * project to the modulation space
        temb = self.linear(self.act(temb))
        # * split the feature axis (not the batch axis) into scale and shift
        scale, shift = temb.chunk(2, dim=-1)
        # insert axes after the batch dim so (B, D) broadcasts over (B, L, D)
        while scale.dim() < x.dim():
            scale = scale.unsqueeze(1)
            shift = shift.unsqueeze(1)
        norm = self.norm(x)
        return (1 + scale) * norm + shift
        

## Loss and Score

### Ac, Recall, Prec, Confusion Matrix 

* Input: true labels (B), predicted labels (B), both given as class indices in `[0, N)`
* Output: matrix (N, N), each element is the number of (true, predicted) pairs
* Use for classification problem: TP, FP, FN 

In [None]:
def confusion_mat(num_classes, pridicted, true):
    """Build the confusion matrix of a classification result.

    Args:
        num_classes: number of classes N.
        pridicted: predicted class indices, shape (B,).
        true: ground-truth class indices, shape (B,).

    Returns:
        int64 tensor of shape (N, N); entry [i, j] counts the samples whose true
        class is i and whose predicted class is j.
    """
    pridicted = torch.as_tensor(pridicted).long().reshape(-1)
    true = torch.as_tensor(true).long().reshape(-1)

    # row is true, col is pridict
    # every (true, pridicted) pair maps to a unique flat index:
    # true (1,0,2), pridicted (1,1,2) with 3 classes => (4, 1, 8)
    index = num_classes * true + pridicted
    # count the pairs from (0,0) to (N-1, N-1)
    cf_mat = torch.bincount(index, minlength=num_classes ** 2)
    confuse_mat = cf_mat.view(num_classes, num_classes)

    return confuse_mat

def confusion_stats(cf_mat):
    """Per-class TP / FN / FP counts from a confusion matrix.

    Args:
        cf_mat: (N, N) confusion matrix with true classes on the rows and
            predicted classes on the columns.

    Returns:
        (TP, FN, FP), each of shape (N,).
    """
    TP = cf_mat.diag()
    # row sum = all samples of the class; those not on the diagonal were missed
    FN = cf_mat.sum(dim=1) - TP
    # column sum = all predictions of the class; those not on the diagonal are wrong
    FP = cf_mat.sum(dim=0) - TP
    return TP, FN, FP
    

### Classification Loss

#### MSE

* mean squared error loss 
* $ \frac{1}{n} \sum (Y - Y_t)^2 $

#### MAE 

* mean absolute error loss 
* $ \frac{1}{n} \sum |Y - Y_t| $

#### BCE 

* Binary cross entropy 
* $ - y_t * log(y) - (1 - y_t) * log(1 - y) $

#### Multi-class: cross entropy 

### Cross Entropy

* Input: predicted logits (B, N), true labels (B)
* Output: loss 
* Loss = $-\sum(y_{true} * log(y))$

In [None]:
def cross_entropy(pridicted, true):
    """Mean cross-entropy between logits and integer class labels.

    Args:
        pridicted: unnormalized logits of shape (B, N).
        true: class indices of shape (B,).

    Returns:
        Scalar tensor: the negative log-likelihood averaged over the batch.
    """
    # log_softmax is numerically stable, unlike log(softmax(x))
    log_probs = F.log_softmax(pridicted, dim=-1)

    one_hot = torch.zeros_like(pridicted)
    one_hot.scatter_(1, true.long().unsqueeze(1), 1)

    return -(one_hot * log_probs).sum(dim=1).mean()

### Contrastive Loss

* Use for clip or multi-modality model
* Always used for aligning different modality 
  * Pairs distance 
* Step
  * L2-normalize each embedding along the feature dimension: `F.normalize(tensor, p=2, dim=-1)`
  * similarity: $logits = \frac{cos(E_i, E_t)}{temperature}$ => all pairs are computed at once by `matmul`
  * Get the gt label pair
  * Compute **cross entropy**

In [None]:
def contrastive_loss(image_emb, text_emb, temperature=0.7):
    """CLIP-style symmetric contrastive loss for paired image / text embeddings.

    Args:
        image_emb: image embeddings of shape (B, D).
        text_emb: text embeddings of shape (B, D); row i is paired with image i.
        temperature: divisor applied to the cosine similarities.

    Returns:
        (img_txt, text_img): the image-to-text and the text-to-image
        cross-entropy losses.
    """
    # L2-normalize so the dot product below is a cosine similarity
    image_emb_normalized = F.normalize(image_emb, p=2, dim=-1)
    text_emb_normalized = F.normalize(text_emb, p=2, dim=-1)

    # B, B similarity between every image and every text
    logits = torch.matmul(image_emb_normalized, text_emb_normalized.T) / temperature

    # the matching pair sits on the diagonal; create the targets on the logits' device
    gt = torch.arange(logits.shape[0], device=logits.device)

    img_txt = F.cross_entropy(logits, gt)
    text_img = F.cross_entropy(logits.T, gt)

    return img_txt, text_img
    

### IOU and NMS

In [None]:
def compute_iou(box1, box2):
    """
    Intersection over union of two axis-aligned boxes.

    box format: [x1, y1, x2, y2]
    Returns 0.0 when the union area is zero.
    """
    ax1, ay1, ax2, ay2 = box1[0], box1[1], box1[2], box1[3]
    bx1, by1, bx2, by2 = box2[0], box2[1], box2[2], box2[3]

    # Overlap rectangle; negative extents (disjoint boxes) are clamped to zero
    overlap_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    overlap_h = max(0, min(ay2, by2) - max(ay1, by1))
    overlap_area = overlap_w * overlap_h

    # Union = both areas minus the part counted twice
    first_area = (ax2 - ax1) * (ay2 - ay1)
    second_area = (bx2 - bx1) * (by2 - by1)
    total_area = first_area + second_area - overlap_area

    if total_area == 0:
        return 0.0  # Avoid division by zero
    return overlap_area / total_area


In [None]:
def non_max_suppression(boxes, scores, iou_threshold=0.5):
    """
    Greedy non-maximum suppression.

    boxes: list of [x1, y1, x2, y2]
    scores: list of confidence scores
    iou_threshold: IoU threshold for suppression

    Returns the indices of the kept boxes, best score first.
    """
    # Candidate indices ordered by descending score
    remaining = sorted(range(len(scores)), key=lambda idx: scores[idx], reverse=True)
    kept = []

    while remaining:
        # The best remaining box is always kept ...
        best, candidates = remaining[0], remaining[1:]
        kept.append(best)
        # ... and only the boxes that overlap it less than the threshold survive
        remaining = [
            idx for idx in candidates
            if compute_iou(boxes[best], boxes[idx]) < iou_threshold
        ]

    return kept  # indices of boxes to keep


## Block

### Convolution Operation 

#### Manually Operation 

#### Use unfold to perform 

* Use to reshape the input tensor to **slices**, which is the same as the **kernel size**
* It is more **GPU friendly**: after unfold **the input**, when it multiples with **kernel**, direct **multiplication** is enough, no need to move + weighted sum
* `tensor = F.unfold(tensor, kernel_size=()), stride, padding` (other conv parameters)
  * `N, C, H, W` * `KH, KW` => `N, C * KH * KW, out_H * out_W` 
  * `out_H` = output size = (H + 2 * padding - kH) // stride + 1

#### Causal Conv 

Only the current and past positions (the lower triangle of the dependency matrix) can be used. The other part can not be used => set the **filtered place as mask**

#### Use Conv to implement pooling operation

Average pooling is averaging in a square == **all 1 kernel** conv / kernel_square

In [None]:
def manual_conv(input_tensor, kernel, stride, padding):
    """2D convolution (cross-correlation, no bias) with explicit sliding windows.

    Args:
        input_tensor: input of shape (B, C_in, H, W).
        kernel: weights of shape (C_out, C_in, kH, kW).
        stride: step of the window in both directions.
        padding: zero padding added on every side.

    Returns:
        Tensor of shape (B, C_out, out_H, out_W) with
        out = (size + 2 * padding - k) // stride + 1.
    """
    B, C, H, W = input_tensor.shape
    C_out, _, kH, kW = kernel.shape

    out_H = (H + 2 * padding - kH) // stride + 1
    out_W = (W + 2 * padding - kW) // stride + 1

    # new_zeros keeps the dtype / device of the input
    output_tensor = input_tensor.new_zeros((B, C_out, out_H, out_W))
    padded_input = F.pad(input_tensor, (padding, padding, padding, padding))

    for i in range(out_H):
        for j in range(out_W):
            # * B, C_in, kH, kW
            window = padded_input[:, :, i * stride: i * stride + kH, j * stride: j * stride + kW]
            # * (B, 1, C_in, kH, kW) * (1, C_out, C_in, kH, kW)
            res = window.unsqueeze(1) * kernel.unsqueeze(0)
            # * sum over C_in, kH, kW => B, C_out
            output_tensor[:, :, i, j] = res.sum(dim=(2, 3, 4))
    return output_tensor
def unfold_conv(input_tensor, kernel, stride, padding):
    """2D convolution (cross-correlation, no bias) as unfold + matrix multiplication.

    Args:
        input_tensor: input of shape (B, C_in, H, W).
        kernel: weights of shape (C_out, C_in, kH, kW).
        stride: step of the window in both directions.
        padding: zero padding added on every side.

    Returns:
        Tensor of shape (B, C_out, out_H, out_W).
    """
    B, C, H, W = input_tensor.shape
    out_channels, in_channels, kH, kW = kernel.shape

    output_H = (H + 2 * padding - kH) // stride + 1
    output_W = (W + 2 * padding - kW) // stride + 1
    # * B, C * kH * kW, out_H * out_W : one column per sliding window
    unfolded_input = F.unfold(input_tensor, kernel_size=(kH, kW), stride=stride, padding=padding)
    # * out_channels, C * kH * kW
    kernel_rearange = kernel.reshape(out_channels, -1)
    # * matmul broadcasts over the batch => B, out_channels, out_H * out_W
    output = kernel_rearange @ unfolded_input
    return output.view((B, out_channels, output_H, output_W))

class Causal_3d_Conv(nn.Module):
    """3D convolution that is causal along the first spatial axis (time).

    Kernel taps that would read frames later than the current output position
    are masked to zero, so output frame t depends only on input frames <= t * stride.

    Args:
        input_channel: number of input channels.
        output_channel: number of output channels.
        stride: convolution stride (int or 3-tuple).
        padding: numeric zero padding (int or 3-tuple).
        kernel: kernel size (int or 3-tuple).
    """

    def __init__(self, input_channel, output_channel, stride, padding, kernel):
        super(Causal_3d_Conv, self).__init__()
        # nn.Conv3d expects (in_channels, out_channels, kernel_size, stride, padding)
        self.conv_3d = nn.Conv3d(input_channel, output_channel, kernel, stride, padding)

        k_t = self.conv_3d.kernel_size[0]
        pad_t = self.conv_3d.padding[0]
        # output t reads input frames t * stride - pad_t + k; keeping only taps
        # with k <= pad_t removes every look-ahead
        causal_mask = (torch.arange(k_t) <= pad_t).to(self.conv_3d.weight.dtype)
        self.register_buffer("causal_mask", causal_mask.view(1, 1, k_t, 1, 1))

    def forward(self, x):
        """Apply the masked convolution to x of shape (B, C, T, H, W)."""
        conv = self.conv_3d
        return F.conv3d(
            x, conv.weight * self.causal_mask, conv.bias,
            conv.stride, conv.padding, conv.dilation, conv.groups,
        )
    
class Conv_pooling(nn.Module):
    """Average pooling implemented as a depthwise convolution with a constant kernel.

    Args:
        kernel_size: side of the square pooling window; also used as the stride.
    """

    def __init__(self, kernel_size):
        super(Conv_pooling, self).__init__()
        # every tap weighs 1 / k^2, so the convolution returns the window mean;
        # a non-persistent buffer follows .to(device) without entering the state_dict
        self.register_buffer(
            "conv_kernel",
            torch.ones((kernel_size, kernel_size)) / (kernel_size ** 2),
            persistent=False,
        )
        self.stride = kernel_size

    def forward(self, x):
        """Pool x of shape (B, C, H, W) to (B, C, H // k, W // k)."""
        B, C, H, W = x.shape
        # groups=C needs one (1, k, k) filter per channel => weight of shape (C, 1, k, k)
        weight = self.conv_kernel.to(dtype=x.dtype).expand(C, 1, -1, -1).contiguous()
        return F.conv2d(x, weight, stride=self.stride, groups=C)
        

### Convolution Layer 

* conv 
* Activation
* BatchNorm
  * Stable training
  * Tensor normalization 
* Dropout
  * overfit 

In [None]:
class ConvLayer(nn.Module):
    """Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d -> Dropout block.

    Args:
        out_channels: number of output channels.
        in_channels: number of input channels.
        kernel_size: convolution kernel size.
        padding: convolution padding.
        stride: convolution stride.
        pool_size: kernel size (and stride) of the max pooling.
    """

    def __init__(self, out_channels, in_channels, kernel_size, padding, stride, pool_size):
        super().__init__()
        # nn.Sequential takes the modules as separate arguments, not as a list
        self.layer = nn.Sequential(
            nn.Conv2d(out_channels=out_channels, in_channels=in_channels, kernel_size=kernel_size, padding=padding, stride=stride),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=pool_size),
            nn.Dropout(0.5),
        )

    def forward(self, x):
        return self.layer(x)

### VAE 

Encoder => Reparameterization => Decoder 

Reparameterization: needs to resample from current x distribution

In [None]:
class VAE(nn.Module):
    """Small convolutional VAE: encoder => reparameterization => decoder.

    The linear layers are sized for an 8 x 8 encoder feature map, i.e. for
    32 x 32 inputs (two stride-2 convolutions).

    Args:
        input_channel: number of image channels.
        hidden_dim: channels of the first encoder stage (the second uses 2x).
        latent_dim: size of the latent vector.
    """

    def __init__(self, input_channel=3, hidden_dim=32, latent_dim=16):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.encoder = nn.Sequential(
            nn.Conv2d(out_channels=hidden_dim, in_channels=input_channel, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(hidden_dim),
            nn.ReLU(),
            # the second stage consumes the hidden_dim channels of the first one
            nn.Conv2d(out_channels=hidden_dim * 2, in_channels=hidden_dim, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(hidden_dim * 2),
            nn.ReLU(),
        )

        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(out_channels=hidden_dim, in_channels=hidden_dim * 2, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(hidden_dim),
            nn.ReLU(),
            nn.ConvTranspose2d(out_channels=input_channel, in_channels=hidden_dim, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(input_channel),
            nn.ReLU(),
        )

        self.mu_linear = nn.Linear(hidden_dim * 2 * 8 * 8, latent_dim)
        self.mu_logvar = nn.Linear(hidden_dim * 2 * 8 * 8, latent_dim)
        self.decoder_fc = nn.Linear(latent_dim, hidden_dim * 2 * 8 * 8)

    def reparameterize(self, x, mu_x, logvar_x):
        """Sample z = mu + sigma * eps with eps ~ N(0, I).

        `x` is unused and only kept so the signature stays unchanged.
        """
        eps = torch.randn_like(mu_x)
        # sigma = exp(0.5 * log(sigma^2))
        std = torch.exp(0.5 * logvar_x)
        return mu_x + eps * std

    def forward(self, x, return_stats=False):
        """Reconstruct x; with return_stats=True also return (mu, logvar) for the KL term."""
        encoded_x = self.encoder(x)
        # to latent space
        flat_x = encoded_x.view(encoded_x.shape[0], -1)
        mu_x = self.mu_linear(flat_x)
        logvar_x = self.mu_logvar(flat_x)
        repara_z = self.reparameterize(flat_x, mu_x, logvar_x)
        decode_x = self.decoder_fc(repara_z)
        decode_x = decode_x.view(decode_x.shape[0], self.hidden_dim * 2, 8, 8)
        recon_x = self.decoder(decode_x)
        if return_stats:
            return recon_x, mu_x, logvar_x
        return recon_x
        

### Transformer

See another file about transformer, Multi-head_Attention and sin-cos-position encoding 

## Multi-Modality

## Diffusion

### Definition of LoRA Model and Application of Transformer 


In [None]:
class LoRALinearLayer(nn.Linear):
    """nn.Linear with an additive low-rank (LoRA) update.

    output = Linear(x) + alpha * up(down(x)), where `down` maps input_dim -> rank
    and `up` maps rank -> output_dim.

    Args:
        input_dim: number of input features.
        output_dim: number of output features.
        rank: rank of the low-rank update.
        alpha: scaling factor applied to the low-rank branch.
    """

    def __init__(self, input_dim, output_dim, rank, alpha):
        super().__init__(input_dim, output_dim)
        self.r = rank
        self.alpha = alpha
        # the low-rank branch is a pure weight update, so it carries no bias
        self.Lora_W_down = nn.Linear(input_dim, rank, bias=False)
        self.Lora_W_up = nn.Linear(rank, output_dim, bias=False)

        nn.init.normal_(self.Lora_W_down.weight, std=1 / rank)
        # zero-init the up projection: the adapter starts as a no-op and does
        # not perturb the base layer's output
        nn.init.zeros_(self.Lora_W_up.weight)

    def forward(self, x):
        out = super().forward(x)
        return self.alpha * self.Lora_W_up(self.Lora_W_down(x)) + out

def apply_lora(model, rank, alpha, target_keywords=("attentions.key",)):
    """Replace the selected nn.Linear layers of `model` with `LoRALinearLayer`.

    Args:
        model: module that is modified in place.
        rank: LoRA rank of the injected layers.
        alpha: LoRA scaling factor of the injected layers.
        target_keywords: a linear layer is replaced when its qualified name
            contains any of these substrings.

    Returns:
        The same `model`, for chaining.
    """
    # collect first: replacing modules while iterating named_modules() would
    # also visit the freshly inserted LoRA sub-layers
    targets = [
        (name, module)
        for name, module in model.named_modules()
        if isinstance(module, nn.Linear)
        and not isinstance(module, LoRALinearLayer)
        and any(keyword in name for keyword in target_keywords)
    ]
    for name, module in targets:
        lora_layer = LoRALinearLayer(module.in_features, module.out_features, rank, alpha)
        # keep the pretrained weights of the layer that is replaced
        with torch.no_grad():
            lora_layer.weight.copy_(module.weight)
            if module.bias is not None:
                lora_layer.bias.copy_(module.bias)
            else:
                lora_layer.bias.zero_()
        # setattr must target the parent module with the child's attribute name
        parent_name, _, child_name = name.rpartition(".")
        parent = model
        if parent_name:
            for part in parent_name.split("."):
                parent = getattr(parent, part)
        setattr(parent, child_name, lora_layer)
    return model
        

### The whole process of Diffusion model 

#### Each step 

(Used both in inference and training): The deducuction is in note

* **Note**: All $\alpha_t$ there is **cumulative product of noise scale of $1 - \beta_t$**

In both training and inference, one step will include remove noise to original figure 

* The output is the **pridicted noise**
* Update rules $x_{t - 1} = x_{t} - \alpha_t * model(x_t, t)$

Training will also **add noise**, which depends on the **noise scheduler**

#### The whole process

* Iteratively denoise
* Set default steps (until the input noise is removed)


#### Backward process 

* reversed backward: from `xt` to `x0` 
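* Update rule (with per-step $\alpha_t = 1 - \beta_t$ and cumulative $\bar\alpha_t = \prod_{s \le t} \alpha_s$): $x_{t-1} = \frac{1}{\sqrt{\alpha_t}} \left(x_t - \frac{\beta_t}{\sqrt{1-\bar\alpha_t}}\epsilon_{\theta}(x_t, t)\right) + \sigma_t z$, where $z \sim N(0, I)$ and $\sigma_t = \sqrt{\beta_t}$ (no noise is added at $t = 0$). The simpler expression $\frac{1}{\sqrt{\bar\alpha_t}} (x_t - \sqrt{1-\bar\alpha_t}\epsilon_{\theta})$ is the estimate of $x_0$, not $x_{t-1}$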

**Used funtions**

* `cumprod`: cumulative product of given variables (include t )

In [None]:
class NoiseScheduler:
    """Linear-beta noise scheduler template for diffusion models.

    Args:
        num_steps: number of diffusion steps T used for training.
        beta_start: variance of the first step.
        beta_end: variance of the last step.
        sample_steps: number of steps used at sampling time (stored for
            subclasses; not used by this base class).
    """

    def __init__(self, num_steps, beta_start, beta_end, sample_steps):
        # * beta linear variance
        self.num_steps = num_steps
        self.sample_steps = sample_steps
        self.betas = torch.linspace(beta_start, beta_end, num_steps)
        self.alphas = 1.0 - self.betas
        self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)

        self.timesteps = self.get_timesteps()
        # alias for the original (misspelt) attribute name
        self.timestemps = self.timesteps

    def get_timesteps(self):
        """Hook for subclasses: the timestep sequence depends on ddim / ddpm."""
        return None

    def add_noise(self, x, noise, t):
        """Forward process: x_t = sqrt(abar_t) * x_0 + sqrt(1 - abar_t) * noise."""
        alpha_t = self.alphas_cumprod[t].view(-1, 1, 1, 1)
        return x * alpha_t.sqrt() + noise * (1 - alpha_t).sqrt()

    def remove_noise(self, x, pridicted_noise, t):
        """One DDPM reverse step from x_t to x_{t-1}.

        Args:
            x: noisy sample x_t of shape (B, C, H, W).
            pridicted_noise: noise predicted by the model for x_t.
            t: timestep, an int or an index tensor of shape (B,).

        Returns:
            x_{t-1} = (x_t - beta_t / sqrt(1 - abar_t) * eps) / sqrt(alpha_t)
                      + sqrt(beta_t) * z, with z = 0 when t == 0.
        """
        alpha_bar_t = self.alphas_cumprod[t].view(-1, 1, 1, 1)
        alpha_t = self.alphas[t].view(-1, 1, 1, 1)
        beta_t = self.betas[t].view(-1, 1, 1, 1)

        mean = (x - beta_t / (1 - alpha_bar_t).sqrt() * pridicted_noise) / alpha_t.sqrt()

        # * DDPM: add random noise on every step except the last one (t == 0)
        add_noise_mask = (torch.as_tensor(t) > 0).to(x.dtype).view(-1, 1, 1, 1)
        z = torch.randn_like(x)
        return mean + add_noise_mask * beta_t.sqrt() * z

In [None]:
def inference_loop(model, noise_scheduler, steps, img_size):
    """Generate one sample by iterative denoising.

    Args:
        model: callable (x, t) -> predicted noise, with t a tensor of shape (1,).
        noise_scheduler: object providing remove_noise(x, predicted_noise, t).
        steps: either the number of steps T (timesteps T-1 ... 0 are visited) or
            an iterable of timesteps, visited in the given order.
        img_size: shape of one image without the batch dim, e.g. (C, H, W).

    Returns:
        The denoised sample of shape (1, *img_size).
    """
    # STEP1: input noise
    x = torch.randn((1, *img_size))

    # * denoising runs from the noisiest step down to step 0
    timesteps = reversed(range(steps)) if isinstance(steps, int) else steps
    with torch.no_grad():
        for t in timesteps:
            t = int(t)
            pridict_noise = model(x, torch.tensor([t]))
            # * Add a "minus noise" to the figure
            x = noise_scheduler.remove_noise(x, pridict_noise, t)
    return x

In [None]:
def train_loop(model, noise_scheduler, steps, img, loss, optimizer):
    """Train ``model`` to predict the noise added to ``img`` at each timestep.

    Args:
        model: callable ``model(noised_img, t)`` returning the predicted noise
            (same call convention as the inference loop in this notebook).
        noise_scheduler: object exposing ``add_noise(img, noise, t)``.
        steps: number of timesteps; ``t`` runs over ``range(steps)``.
        img: clean image batch.
        loss: callable ``loss(prediction, target)`` returning a scalar tensor.
        optimizer: optimizer over the model's parameters.

    Returns:
        The (trained) ``model``.
    """
    for t in range(steps):
        noise = torch.randn_like(img)
        noised_img = noise_scheduler.add_noise(img, noise, t)

        optimizer.zero_grad()
        pridicted_noise = model(noised_img, torch.tensor([t]))
        # The model outputs the predicted noise, so the target is the noise that was added.
        # Use a separate name: rebinding `loss` would replace the loss function with a
        # tensor and break the next iteration.
        step_loss = loss(pridicted_noise, noise)
        step_loss.backward()
        optimizer.step()
    return model

### Write a noise Scheduler in Diffusion Model

#### Function of Noise Scheduler 

* Time Step Generator: generate the steps where **noise is inserted**
* Noise Adder: Add noise in specific steps with specific $\alpha$
  * Theoretically it is added step by step
  * But We can compute the **cumulative product**
* Noise Remover: remove input noise from noised figure with specific $\alpha_t$

#### Difference between DDIM and DDPM

* Added noise: the same: Gaussian noise step by step
* The sequence of adding noise: different 
  * DDPM: Each step + added from **no noise** ($\bar{\alpha}_t \approx 1$) to **very noisy** ($\bar{\alpha}_t \approx 0$)
  * DDIM: sample step + added from **Noisy** to **no noise**
* The inference: all from very **noisy** but
  * DDPM: from steps **large to 0**, which means is **reverse** to input + Will add **random noise each time of inference**
  * DDIM: **Deterministic**, also from steps **large to 0**, but sequence are the same with input 

In [None]:
class DDPMNoiseSchduler(NoiseScheduler):
    """DDPM scheduler: every timestep is used and sampling re-injects random noise.

    Note: this class does not call the base ``__init__``; it keeps its own
    ``beta`` / ``alpha`` / ``alpha_cum`` tensors.
    """

    def __init__(self, beta_start, beta_end, timesteps):
        self.beta = torch.linspace(beta_start, beta_end, timesteps)
        self.alpha = 1 - self.beta
        self.alpha_cum = torch.cumprod(self.alpha, dim=0)
        self.timesteps = self.get_timesteps(timesteps)

    def get_timesteps(self, steps):
        """Return every integer timestep ``0 .. steps - 1``."""
        # torch.arange (the original spelling `torch.arrange` does not exist).
        return torch.arange(steps)

    def add_noise(self, x, noise, t):
        """Return ``sqrt(a) * x + sqrt(1 - a) * noise`` with ``a = alpha_cum[t]``.

        Same formula as the template scheduler, written against this class's
        own ``alpha_cum`` so it no longer returns None.
        """
        alpha = self.alpha_cum[t].view(-1, 1, 1, 1)
        return torch.sqrt(alpha) * x + torch.sqrt(1 - alpha) * noise

    def remove_noise(self, x, noise, t):
        """Remove the predicted ``noise`` from ``x`` and add ``sqrt(beta_t)``-scaled random noise."""
        alpha = self.alpha_cum[t].view(-1, 1, 1, 1)
        beta = self.beta[t].view(-1, 1, 1, 1)

        random_noise = torch.randn_like(x)

        return (x - torch.sqrt(1 - alpha) * noise) / torch.sqrt(alpha) + torch.sqrt(beta) * random_noise

# * Training is the same
# * Inference 
# TEST for t in reversed(range(timesteps)):


In [None]:
class DDIMNoiseSchduler(NoiseScheduler):
    """DDIM scheduler: samples a subset of timesteps and denoises deterministically.

    Note: this class does not call the base ``__init__``; it keeps its own
    ``beta`` / ``alpha`` / ``alpha_cum`` tensors.
    """

    def __init__(self, beta_start, beta_end, timesteps, sample_steps):
        # alpha is 1 - beta (the original assigned the beta range to alpha directly).
        self.beta = torch.linspace(beta_start, beta_end, timesteps)
        self.alpha = 1 - self.beta
        self.alpha_cum = torch.cumprod(self.alpha, dim=0)
        # Set before get_timesteps so its default can fall back to it.
        self.sample_steps = sample_steps
        self.timesteps = self.get_timesteps(timesteps, sample_steps)

    def get_timesteps(self, steps, sample_steps=None):
        """Return ``sample_steps`` integer timesteps evenly spaced from ``steps - 1`` down to 0.

        Args:
            steps: length of the full schedule.
            sample_steps: how many timesteps to keep; defaults to ``self.sample_steps``.
        """
        if sample_steps is None:
            sample_steps = self.sample_steps
        # Start at steps - 1 (the last valid index) and round to integers so the
        # result can index alpha_cum.
        return torch.linspace(steps - 1, 0, sample_steps).round().long()

    def add_noise(self, x, noise, t):
        """Return ``sqrt(a) * x + sqrt(1 - a) * noise`` with ``a = alpha_cum[t]``."""
        alpha = self.alpha_cum[t].view(-1, 1, 1, 1)
        return torch.sqrt(alpha) * x + torch.sqrt(1 - alpha) * noise

    def remove_noise(self, x, noise, t):
        """Remove the predicted ``noise`` from ``x``; no random noise is added."""
        # Reshape so a batch of timesteps broadcasts over a 4-D x, as in the DDPM class.
        alpha = self.alpha_cum[t].view(-1, 1, 1, 1)

        return (x - torch.sqrt(1 - alpha) * noise) / torch.sqrt(alpha)
    
# * Training is the same
# * Inference 
# TEST for t in range(timesteps):
    

### Write forward for generating a figure (Simulate pipeline)

The pipeline will need

* input text prompt 
* noise scheduler  
* transformer
* vae
* image processor

as input, then infer for **steps** to generate the figure

The process is 

* get the inference steps based on scheduler and target steps
* Get input noise(latent) compute by rand * sigma of noise scheduler
* In timesteps loop
  * get noised input
  * predict noise
  * denoise by transformer
* decode

In [None]:
# Settings and helpers for the sampling example below.
# NOTE(review): `noise_scheduler` and `accelerator` are not defined in this cell;
# presumably they come from an earlier cell — confirm before running on a fresh kernel.
# `logged_images` is not used anywhere else in this cell.
logged_images = []
# Requested number of denoising steps; passed to retrieve_timesteps below.
num_inference = 30
# Only referenced by the commented-out classifier-free-guidance lines in the sampling loop.
guidance_scale = 7.5
from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion import retrieve_timesteps
from diffusers.image_processor import VaeImageProcessor
from diffusers.utils.torch_utils import randn_tensor
# Ask the scheduler for the timestep sequence to iterate over during sampling.
timesteps, num_inference_steps = retrieve_timesteps(noise_scheduler, num_inference, accelerator.device)
# Clamped at 0; computed here but not read again in this cell.
num_warmup_steps = max(len(timesteps) - num_inference_steps * noise_scheduler.order, 0)
def prepare_latents(batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
    """Build the initial latents for sampling, scaled by the scheduler's initial sigma.

    Reads the notebook globals ``vae``, ``accelerator`` and ``noise_scheduler``.

    Args:
        batch_size: number of samples.
        num_channels_latents: channel count of the latent tensor.
        height, width: floor-divided by ``vae.config.scaling_factor`` to get the latent size.
        dtype, device: passed to ``randn_tensor`` when fresh noise is drawn.
        generator: a generator, or a list of generators whose length must equal ``batch_size``.
        latents: optional pre-made latents; if given they are moved to ``accelerator.device``.

    Raises:
        ValueError: if ``generator`` is a list whose length differs from ``batch_size``.
    """
    latent_height = int(height // vae.config.scaling_factor)
    latent_width = int(width // vae.config.scaling_factor)
    latent_shape = (batch_size, num_channels_latents, latent_height, latent_width)

    generator_count_mismatch = isinstance(generator, list) and len(generator) != batch_size
    if generator_count_mismatch:
        raise ValueError(
            f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
            f" size of {batch_size}. Make sure the batch size matches the length of the generators."
        )

    if latents is not None:
        initial = latents.to(accelerator.device)
    else:
        initial = randn_tensor(latent_shape, generator=generator, device=device, dtype=dtype)

    # scale the initial noise by the standard deviation required by the scheduler
    return initial * noise_scheduler.init_noise_sigma
                
                # latents = noisy_latents.to(torch.bfloat16)
# Sampling script: draw initial latents, denoise them over `timesteps`, then decode.
# NOTE(review): `transformer`, `vae`, `accelerator`, `args`, `device`, `prompt_embeds`,
# `prompt_attention_mask` and `added_cond_kwargs` are not defined in this cell;
# presumably they come from earlier cells — confirm on a fresh kernel.
latent_channels = transformer.config.in_channels
# Seeded generator when a seed is configured, otherwise None.
generator = torch.Generator(device=accelerator.device).manual_seed(args.seed) if args.seed is not None else None

# prepare_latents divides height/width by vae.config.scaling_factor, so multiply by
# it here to end up with a latent of size transformer.config.sample_size.
latents_inference = prepare_latents(
    1,
    latent_channels,
    transformer.config.sample_size * vae.config.scaling_factor,
    transformer.config.sample_size * vae.config.scaling_factor,
    prompt_embeds.dtype,
    device,
    generator,
)

for i, t in enumerate(timesteps):
    # latent_model_input = torch.cat([latents] * 2)
    latent_model_input = noise_scheduler.scale_model_input(latents_inference, t)

    # Normalize the timestep to a 1-D tensor with one entry per batch item.
    current_timestep = t
    if not torch.is_tensor(current_timestep):
        # mps gets the 32-bit dtypes here; every other device gets the 64-bit ones.
        is_mps = latent_model_input.device.type == "mps"
        if isinstance(current_timestep, float):
            dtype = torch.float32 if is_mps else torch.float64
        else:
            dtype = torch.int32 if is_mps else torch.int64
        current_timestep = torch.tensor([current_timestep], dtype=dtype, device=latent_model_input.device)
    elif len(current_timestep.shape) == 0:
        current_timestep = current_timestep[None].to(latent_model_input.device)
    current_timestep = current_timestep.expand(latent_model_input.shape[0])
    # print(i)
    # print(f"latent {latent_model_input.dtype}; embeds {prompt_embeds_content.dtype}, time {current_timestep.dtype}")
    # The original was missing the comma after `encoder_hidden_states=prompt_embeds`,
    # which made the whole cell a SyntaxError.
    noise_pred = transformer(
        latent_model_input,
        encoder_hidden_states=prompt_embeds,
        encoder_attention_mask=prompt_attention_mask,
        timestep=current_timestep,
        added_cond_kwargs=added_cond_kwargs,
        return_dict=False,
    )[0]
    # noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
    # noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
    # When the transformer emits twice the latent channels, keep only the first half
    # along the channel dimension; otherwise use the prediction as is.
    if transformer.config.out_channels // 2 == latent_channels:
        noise_pred = noise_pred.chunk(2, dim=1)[0]

    # compute previous image: x_t -> x_t-1
    latents_inference = noise_scheduler.step(noise_pred, t, latents_inference, return_dict=False)[0]

# Decode the final latents (undoing the VAE scaling factor) and convert to PIL images.
image = vae.decode(latents_inference.to(torch.bfloat16) / vae.config.scaling_factor, return_dict=False)[0]
image_processor = VaeImageProcessor(vae_scale_factor=vae.config.scaling_factor)
image = image_processor.postprocess(image, output_type="pil")

# Distributed Training

## Data Parallel 

List how to implement data parallel by PyTorch. There are several ways, which are different in 
* Whether cross-device 
* Whether 

### Distributed Data Parallel 

Distributed training, so need to use multiple **process** and use package related to **process communication**

* Set Master process: define the main process to log, save checkpoints ... in `main (__name__)` function 
* multi processing launching: send the `running(selected by parameters) function(rank, ...)` to every process by `mp.spawn`, each process is numbered 
  * all process will run concurrently 
* Set up **inter-process connection** in the **running function**: `dist.init_process_group("gloo"/"nccl", rank=rank, world_size=world_size)`
* Set up **DDP** model
  * the model structure
  * The rank number it assigned `.to(rank)`
* Move data to corresponding model `.to(rank)`


In [None]:
# Data Parallel
# * DP
# Single-process data parallelism: wraps `model` (defined elsewhere) so each batch is
# split across the listed GPUs. The keyword is `device_ids`; `device_id` raises TypeError.
one_device_net = torch.nn.DataParallel(model, device_ids=[0, 1])
# Following is the same

# * DDP
import os

import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP


def example(rank, world_size, model, loss, optimizer, input, label):
    """Run one DDP training step inside the worker process numbered ``rank``.

    Args:
        rank: index of this process; also used as its CUDA device index.
        world_size: total number of processes in the group.
        model: module to wrap in DDP (moved to device ``rank`` first).
        loss: callable ``loss(output, label)`` returning a scalar tensor.
        optimizer: optimizer over ``model``'s parameters.
        input, label: tensors for a single step; moved to device ``rank``.
    """
    # Join the inter-process group before building the DDP model.
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    try:
        # DDP expects the module to already live on the device it is assigned to.
        model = model.to(rank)
        ddp_model = DDP(model, device_ids=[rank])
        optimizer.zero_grad()
        output = ddp_model(input.to(rank))
        l = loss(output, label.to(rank))
        # Backward on the loss tensor (the original called it on the loss function).
        l.backward()
        optimizer.step()
    finally:
        dist.destroy_process_group()


def main():
    """Spawn ``world_size`` worker processes, each running ``example`` with its own rank.

    A small linear-regression setup is built here so the demo is self-contained;
    ``mp.spawn`` prepends the rank to ``args`` when calling ``example``.
    """
    # One process per GPU; the "nccl" backend needs this many CUDA devices.
    world_size = 4
    model = torch.nn.Linear(10, 10)
    loss = torch.nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    input = torch.randn(20, 10)
    label = torch.randn(20, 10)
    mp.spawn(example,
             args=(world_size, model, loss, optimizer, input, label),
             nprocs=world_size,
             join=True)


if __name__ == "__main__":
    # Rendezvous address/port read by init_process_group in every worker.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "1234"
    main()


# Quantization

## Manually quantize tensors

In [None]:
def quant_dequant_symmetric(x):
    """Symmetric per-tensor quantization to the int8 range [-127, 127], plus dequantization.

    Args:
        x: input tensor (non-empty).

    Returns:
        ``(x_q, x_dq)``: the rounded, clamped quantized values (kept in floating
        point) and their dequantized reconstruction ``x_q * scale``.
    """
    n_max = x.abs().max()
    scale = n_max / 127
    # An all-zero tensor would give scale == 0 and NaNs from 0 / 0; any non-zero
    # scale maps zeros to zeros, so fall back to 1.
    if scale == 0:
        scale = torch.ones_like(scale)
    # The original divided by an undefined name `scale` while defining `S`.
    x_q = (x / scale).round().clamp(-127, 127)
    x_dq = x_q * scale
    return x_q, x_dq

## Quantize with PyTorch API

# ML Tradition