In [6]:
import os

# Switch to the project directory. The location can be overridden with the
# PEFT_PLAYGROUND_DIR environment variable so the notebook is not tied to one
# machine; when the variable is unset the original path is used.
os.chdir(os.environ.get("PEFT_PLAYGROUND_DIR", "/root/dev/playground/knowledges/peft"))
os.getcwd()

'/root/dev/playground/knowledges/peft'

In [7]:
# Show per-GPU temperature, utilisation and memory via the `gpustat` command-line tool.
!gpustat

[1m[37maa4173e0a5f2              [m  Thu May 30 21:08:10 2024  [1m[30m535.129.03[m
[36m[0][m [34mNVIDIA GeForce RTX 4090[m |[31m 40°C[m, [1m[32m 55 %[m | [36m[1m[33m16701[m / [33m24564[m MB |
[36m[1][m [34mNVIDIA GeForce RTX 4090[m |[31m 48°C[m, [1m[32m 53 %[m | [36m[1m[33m17013[m / [33m24564[m MB |
[36m[2][m [34mNVIDIA GeForce RTX 4090[m |[31m 46°C[m, [1m[32m 91 %[m | [36m[1m[33m18281[m / [33m24564[m MB |
[36m[3][m [34mNVIDIA GeForce RTX 4090[m |[31m 49°C[m, [1m[32m 66 %[m | [36m[1m[33m18305[m / [33m24564[m MB |
[36m[4][m [34mNVIDIA GeForce RTX 4090[m |[1m[31m 50°C[m, [1m[32m 45 %[m | [36m[1m[33m17927[m / [33m24564[m MB |
[36m[5][m [34mNVIDIA GeForce RTX 4090[m |[1m[31m 51°C[m, [1m[32m 60 %[m | [36m[1m[33m18163[m / [33m24564[m MB |


In [39]:
import torch

# Pin the process to GPU 0 only when CUDA is actually present. Calling
# torch.cuda.set_device() unconditionally raised on machines without CUDA,
# which made the CPU fallback on the next line unreachable.
if torch.cuda.is_available():
    torch.cuda.set_device(0)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hugging Face PEFT tutorial

In [4]:
import peft
import accelerate
import datasets
import transformers
import diffusers

# Report the installed version of every library imported above, one per line.
for library in (peft, accelerate, datasets, transformers, diffusers):
    print(f"{library.__name__} version: ", library.__version__)

peft version:  0.10.0
accelerate version:  0.29.1
datasets version:  2.18.0
transformers version:  4.39.3
diffusers version:  0.27.2


In [5]:
from diffusers import UNet2DConditionModel

# Load only the UNet sub-model of the Stable Diffusion 2 base checkpoint.
# Alternative checkpoint that was tried here:
# unet = UNet2DConditionModel.from_pretrained(
#     "stabilityai/stable-diffusion-xl-base-1.0", subfolder="unet"
# )
unet = UNet2DConditionModel.from_pretrained(
    pretrained_model_name_or_path="stabilityai/stable-diffusion-2-base",
    subfolder="unet",
)

In [6]:
# unet.train()

In [7]:
# unet.to(device)
# x = torch.rand((8, 4, 64, 64)).to(device)
# time_step = torch.randn((8,)).to(device)
# enc_h = torch.rand((8, 77, 1024)).to(device)
# output = unet(x, timestep=time_step, encoder_hidden_states=enc_h).sample

In [8]:
# `model_size` is the total number of parameters in the UNet.
model_size = sum(param.numel() for param in unet.parameters())
# The printed figure is labelled "trainable", so count only parameters that
# actually require gradients (previously every parameter was counted).
trainable_size = sum(param.numel() for param in unet.parameters() if param.requires_grad)
print("trainable params: ", trainable_size)

trainable params:  865910724


In [9]:
# find modules
def find_modules(module, module_types):
    """Collect names of all sub-modules of ``module`` that are instances of ``module_types``.

    For each match, the last dotted component of its ``named_modules()`` name is
    used (e.g. ``"to_q"``). When that component is purely numeric -- i.e. the
    module is addressed by its index inside a container, as in ``"to_out.0"`` or
    ``"layers.10"`` -- the full dotted name is used instead, because a bare index
    does not identify the layer.

    Args:
        module: Root ``torch.nn.Module`` to inspect.
        module_types: A type or tuple of types, as accepted by ``isinstance``.

    Returns:
        list[str]: One entry per matching sub-module, in ``named_modules()``
        order. Duplicates are kept; de-duplicate at the call site if needed.
    """
    matching_modules = []
    for name, mod in module.named_modules():
        if not isinstance(mod, module_types):
            continue
        module_name = name.rsplit(".", 1)[-1]
        # isdigit() also catches multi-digit indices such as "10", which the
        # previous `len(module_name) == 1` test let through as a bare "10".
        if module_name.isdigit():
            module_name = name
        matching_modules.append(module_name)
    return matching_modules

In [10]:
module_types = (torch.nn.Linear, torch.nn.Embedding, torch.nn.Conv2d)
# De-duplicate, then sort: iterating a bare set() yields the names in an
# arbitrary order, so the printed list differed from run to run.
target_modules = sorted(set(find_modules(unet, module_types)))
print(target_modules)

['up_blocks.2.attentions.2.transformer_blocks.0.attn1.to_out.0', 'conv_out', 'up_blocks.1.attentions.2.transformer_blocks.0.ff.net.2', 'down_blocks.0.attentions.1.transformer_blocks.0.attn2.to_out.0', 'up_blocks.2.attentions.0.transformer_blocks.0.attn1.to_out.0', 'up_blocks.2.attentions.1.transformer_blocks.0.attn2.to_out.0', 'conv_shortcut', 'proj_in', 'up_blocks.2.attentions.0.transformer_blocks.0.attn2.to_out.0', 'up_blocks.3.attentions.1.transformer_blocks.0.attn1.to_out.0', 'up_blocks.1.attentions.0.transformer_blocks.0.attn1.to_out.0', 'down_blocks.1.attentions.0.transformer_blocks.0.attn2.to_out.0', 'down_blocks.1.attentions.1.transformer_blocks.0.ff.net.2', 'to_v', 'down_blocks.1.attentions.1.transformer_blocks.0.attn1.to_out.0', 'down_blocks.2.attentions.1.transformer_blocks.0.ff.net.2', 'time_emb_proj', 'conv1', 'up_blocks.1.attentions.0.transformer_blocks.0.ff.net.2', 'linear_2', 'up_blocks.2.attentions.0.transformer_blocks.0.ff.net.2', 'mid_block.attentions.0.transformer_b

## LoRA

In [6]:
from peft import LoraConfig, get_peft_model

import copy

# Adapt a private copy of the base UNet. `unet.to(device)` returns the very same
# module object, so the previous `unet_lora = unet.to(device)` injected the LoRA
# adapter into `unet` itself -- the model the LoKr section adapts again later.
# NOTE: the copy costs one extra set of UNet weights in memory.
unet_lora = copy.deepcopy(unet).to(device)

# freeze params of models to save more memory
unet_lora.requires_grad_(False)

config = LoraConfig(
    r=4,
    lora_alpha=4,
    lora_dropout=0.1,
    target_modules=["to_q", "to_k", "to_v", "to_out.0"],
    init_lora_weights="gaussian",
    bias="none",
)  # scale = alpha / r

unet_lora.add_adapter(config)
# Materialise the trainable parameters as a list: a bare filter() object is a
# one-shot iterator and would be empty the second time it is consumed.
lora_layers = [param for param in unet_lora.parameters() if param.requires_grad]

# Parameter statistics of the adapted model (read from `unet_lora`, not `unet`).
trainable_params = sum(param.numel() for param in lora_layers)
all_params = sum(param.numel() for param in unet_lora.parameters())
print(f"trainable params: {trainable_params:,d} || all params: {all_params:,d} || trainable%: {100 * trainable_params / all_params}")
unet_lora.train();

trainable params: 829,952 || all params: 866,740,676 || trainable%: 0.09575551522864031


In [7]:
# Smoke-test forward pass through the LoRA-adapted UNet using random tensors.
x = torch.rand(8, 4, 64, 64).to(device)
time_step = torch.randn(8).to(device)
enc_h = torch.rand(8, 77, 1024).to(device)
lora_prediction = unet_lora(x, timestep=time_step, encoder_hidden_states=enc_h)
output = lora_prediction.sample

## LoKr (slightly different to KAdaptation)

In [6]:
from peft import LoKrConfig, get_peft_model

# NOTE: Module.to() moves the module in place and returns it, so `unet_lokr`
# is the same object as `unet`; the adapter below is injected into `unet`.
unet_lokr = unet.to(device)

# freeze params of models to save more memory
unet_lokr.requires_grad_(False)

config = LoKrConfig(
    r=4,
    alpha=4,
    # rank_dropout=0.1,
    module_dropout=0.1,
    use_effective_conv2d=True,
    target_modules=["to_q", "to_k", "to_v", "to_out.0"],
)

unet_lokr.add_adapter(config)
# Materialise the trainable parameters as a list: a bare filter() object is a
# one-shot iterator and would be empty the second time it is consumed.
lokr_layers = [param for param in unet_lokr.parameters() if param.requires_grad]

# Parameter statistics of the adapted model, read through the section's own handle.
trainable_params = sum(param.numel() for param in lokr_layers)
all_params = sum(param.numel() for param in unet_lokr.parameters())
print(f"trainable params: {trainable_params:,d} || all params: {all_params:,d} || trainable%: {100 * trainable_params / all_params}")
unet_lokr.train();
# Alternative that was tried: wrap the model with PEFT instead of `add_adapter`.
# unet = get_peft_model(unet, config).to(device)
# unet.print_trainable_parameters()
# unet.train();

trainable params: 112,448 || all params: 866,023,172 || trainable%: 0.012984410075346113


In [7]:
# Smoke-test forward pass through the LoKr-adapted UNet using random tensors.
x = torch.rand((8, 4, 64, 64)).to(device)
time_step = torch.randn((8,)).to(device)
enc_h = torch.rand((8, 77, 1024)).to(device)
# Call the model through `unet_lokr`, the handle this section created, instead
# of `unet`; the two only happened to be the same object.
output = unet_lokr(x, timestep=time_step, encoder_hidden_states=enc_h).sample

# Layer replication

In [14]:
# Build the index tensor on the notebook-wide `device` rather than a hard-coded
# 'cuda', so the cell also runs when the CPU fallback is in effect.
torch.tensor([0,1,0,2,0,3,1,2,1,3,2,3], dtype=torch.long, device=device).shape

torch.Size([12])

# Adapt LoRA for Conv2D and Linear layers

In [22]:
from typing import Optional, Union, Tuple

import torch
from torch import nn

class LoRALinearLayer(nn.Module):
    r"""
    Low-rank branch for a linear layer: a bias-free ``in_features -> rank`` projection (``down``)
    followed by a bias-free ``rank -> out_features`` projection (``up``).

    Parameters:
        in_features (`int`):
            Number of input features.
        out_features (`int`):
            Number of output features.
        rank (`int`, `optional`, defaults to 4):
            The rank of the LoRA layer.
        network_alpha (`float`, `optional`, defaults to `None`):
            When given, the output is multiplied by `network_alpha / rank`. This value has the same
            meaning as the `--network_alpha` option in the kohya-ss trainer script. See
            https://github.com/darkstorm2150/sd-scripts/blob/main/docs/train_network_README-en.md#execute-learning
        device (`torch.device`, `optional`, defaults to `None`):
            The device to use for the layer's weights.
        dtype (`torch.dtype`, `optional`, defaults to `None`):
            The dtype to use for the layer's weights.
    """

    def __init__(
        self,
        in_features: int,
        out_features: int,
        rank: int = 4,
        network_alpha: Optional[float] = None,
        device: Optional[Union[torch.device, str]] = None,
        dtype: Optional[torch.dtype] = None,
    ):
        super().__init__()

        factory_kwargs = {"device": device, "dtype": dtype}
        self.down = nn.Linear(in_features, rank, bias=False, **factory_kwargs)
        self.up = nn.Linear(rank, out_features, bias=False, **factory_kwargs)

        self.in_features = in_features
        self.out_features = out_features
        self.rank = rank
        # Same meaning as `--network_alpha` in the kohya-ss trainer script, see
        # https://github.com/darkstorm2150/sd-scripts/blob/main/docs/train_network_README-en.md#execute-learning
        self.network_alpha = network_alpha

        # `up` starts at zero, so a freshly created layer outputs zeros.
        nn.init.normal_(self.down.weight, std=1 / rank)
        nn.init.zeros_(self.up.weight)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Apply down then up projection in the weights' dtype; return in the input's dtype."""
        input_dtype = hidden_states.dtype
        weight_dtype = self.down.weight.dtype

        low_rank = self.down(hidden_states.to(weight_dtype))
        delta = self.up(low_rank)

        if self.network_alpha is not None:
            delta *= self.network_alpha / self.rank

        return delta.to(input_dtype)


class LoRAConv2dLayer(nn.Module):
    r"""
    Low-rank branch for a 2D convolution: a bias-free ``in_features -> rank`` convolution (``down``)
    followed by a bias-free 1x1 ``rank -> out_features`` convolution (``up``).

    Parameters:
        in_features (`int`):
            Number of input features.
        out_features (`int`):
            Number of output features.
        rank (`int`, `optional`, defaults to 4):
            The rank of the LoRA layer.
        kernel_size (`int` or `tuple` of two `int`, `optional`, defaults to 1):
            The kernel size of the `down` convolution.
        stride (`int` or `tuple` of two `int`, `optional`, defaults to 1):
            The stride of the `down` convolution.
        padding (`int` or `tuple` of two `int` or `str`, `optional`, defaults to 0):
            The padding of the `down` convolution.
        network_alpha (`float`, `optional`, defaults to `None`):
            When given, the output is multiplied by `network_alpha / rank`. This value has the same
            meaning as the `--network_alpha` option in the kohya-ss trainer script. See
            https://github.com/darkstorm2150/sd-scripts/blob/main/docs/train_network_README-en.md#execute-learning
    """

    def __init__(
        self,
        in_features: int,
        out_features: int,
        rank: int = 4,
        kernel_size: Union[int, Tuple[int, int]] = (1, 1),
        stride: Union[int, Tuple[int, int]] = (1, 1),
        padding: Union[int, Tuple[int, int], str] = 0,
        network_alpha: Optional[float] = None,
    ):
        super().__init__()

        down_kwargs = {"kernel_size": kernel_size, "stride": stride, "padding": padding, "bias": False}
        self.down = nn.Conv2d(in_features, rank, **down_kwargs)
        # according to the official kohya_ss trainer kernel_size are always fixed for the up layer
        # see: https://github.com/bmaltais/kohya_ss/blob/2accb1305979ba62f5077a23aabac23b4c37e935/networks/lora_diffusers.py#L129
        self.up = nn.Conv2d(rank, out_features, kernel_size=(1, 1), stride=(1, 1), bias=False)

        self.rank = rank
        # Same meaning as `--network_alpha` in the kohya-ss trainer script, see
        # https://github.com/darkstorm2150/sd-scripts/blob/main/docs/train_network_README-en.md#execute-learning
        self.network_alpha = network_alpha

        # `up` starts at zero, so a freshly created layer outputs zeros.
        nn.init.normal_(self.down.weight, std=1 / rank)
        nn.init.zeros_(self.up.weight)

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Apply down then up convolution in the weights' dtype; return in the input's dtype."""
        input_dtype = hidden_states.dtype
        weight_dtype = self.down.weight.dtype

        low_rank = self.down(hidden_states.to(weight_dtype))
        delta = self.up(low_rank)

        if self.network_alpha is not None:
            delta *= self.network_alpha / self.rank

        return delta.to(input_dtype)


class LoRACompatibleConv(nn.Conv2d):
    """
    A convolutional layer that can be used with LoRA.

    Behaves like ``nn.Conv2d``. When ``lora_layer`` is set, ``forward`` adds
    ``scale * lora_layer(hidden_states)`` to the convolution output. The LoRA
    weights can also be merged into the kernel (``_fuse_lora``) and subtracted
    again (``_unfuse_lora``).
    """

    # The LoRA layer type is annotated as a forward reference so that defining
    # this class does not require `LoRAConv2dLayer` to exist yet.
    def __init__(self, *args, lora_layer: Optional["LoRAConv2dLayer"] = None, **kwargs):
        super().__init__(*args, **kwargs)
        self.lora_layer = lora_layer

    def set_lora_layer(self, lora_layer: Optional["LoRAConv2dLayer"]):
        """Attach the given LoRA branch, or detach the current one with ``None``."""
        self.lora_layer = lora_layer

    def _fuse_lora(self, lora_scale: float = 1.0, safe_fusing: bool = False):
        """Merge ``lora_scale * (up @ down)`` into ``self.weight`` and drop the LoRA branch.

        Does nothing when no LoRA layer is attached. The (alpha-scaled) ``up`` and
        ``down`` matrices are kept on the CPU so that ``_unfuse_lora`` can undo the merge.

        Args:
            lora_scale: Multiplier applied to the low-rank update before merging.
            safe_fusing: When true, raise instead of writing a fused weight containing NaNs.

        Raises:
            ValueError: If ``safe_fusing`` is set and the fused weight contains NaN.
        """
        if self.lora_layer is None:
            return

        dtype, device = self.weight.data.dtype, self.weight.data.device

        # All arithmetic is done in float32, then cast back to the kernel's dtype.
        w_orig = self.weight.data.float()
        w_up = self.lora_layer.up.weight.data.float()
        w_down = self.lora_layer.down.weight.data.float()

        if self.lora_layer.network_alpha is not None:
            w_up = w_up * self.lora_layer.network_alpha / self.lora_layer.rank

        # Flatten both kernels to 2D, multiply, and fold the result back into the
        # shape of the convolution kernel.
        fusion = torch.mm(w_up.flatten(start_dim=1), w_down.flatten(start_dim=1))
        fusion = fusion.reshape((w_orig.shape))
        fused_weight = w_orig + (lora_scale * fusion)

        if safe_fusing and torch.isnan(fused_weight).any().item():
            raise ValueError(
                "This LoRA weight seems to be broken. "
                f"Encountered NaN values when trying to fuse LoRA weights for {self}."
                "LoRA weights will not be fused."
            )

        self.weight.data = fused_weight.to(device=device, dtype=dtype)

        # we can drop the lora layer now
        self.lora_layer = None

        # offload the up and down matrices to CPU to not blow the memory
        self.w_up = w_up.cpu()
        self.w_down = w_down.cpu()
        self._lora_scale = lora_scale

    def _unfuse_lora(self):
        """Subtract the update stored by ``_fuse_lora`` from ``self.weight``.

        Does nothing unless both stored matrices are present. The LoRA layer
        itself is not re-attached.
        """
        if getattr(self, "w_up", None) is None or getattr(self, "w_down", None) is None:
            return

        fused_weight = self.weight.data
        dtype, device = fused_weight.dtype, fused_weight.device

        # Work on local copies, matching LoRACompatibleLinear._unfuse_lora.
        w_up = self.w_up.to(device=device).float()
        w_down = self.w_down.to(device).float()

        fusion = torch.mm(w_up.flatten(start_dim=1), w_down.flatten(start_dim=1))
        fusion = fusion.reshape((fused_weight.shape))
        unfused_weight = fused_weight.float() - (self._lora_scale * fusion)
        self.weight.data = unfused_weight.to(device=device, dtype=dtype)

        self.w_up = None
        self.w_down = None

    def forward(self, hidden_states: torch.Tensor, scale: float = 1.0) -> torch.Tensor:
        """Run the convolution and, if a LoRA layer is attached, add ``scale`` times its output."""
        # `nn.functional` is used directly: the former `F.pad` / `F.conv2d` calls
        # referenced an alias `F` that is never imported, so every call raised
        # NameError.
        if self.padding_mode != "zeros":
            hidden_states = nn.functional.pad(
                hidden_states, self._reversed_padding_repeated_twice, mode=self.padding_mode
            )
            padding = (0, 0)
        else:
            padding = self.padding

        original_outputs = nn.functional.conv2d(
            hidden_states, self.weight, self.bias, self.stride, padding, self.dilation, self.groups
        )

        if self.lora_layer is None:
            return original_outputs
        else:
            return original_outputs + (scale * self.lora_layer(hidden_states))


class LoRACompatibleLinear(nn.Linear):
    """
    A Linear layer that can be used with LoRA.

    Behaves like ``nn.Linear``. When ``lora_layer`` is set, ``forward`` adds
    ``scale * lora_layer(hidden_states)`` to the linear output.
    """

    def __init__(self, *args, lora_layer: Optional[LoRALinearLayer] = None, **kwargs):
        super().__init__(*args, **kwargs)
        self.lora_layer = lora_layer

    def set_lora_layer(self, lora_layer: Optional[LoRALinearLayer]):
        """Attach the given LoRA branch, or detach the current one with ``None``."""
        self.lora_layer = lora_layer

    def _fuse_lora(self, lora_scale: float = 1.0, safe_fusing: bool = False):
        """Merge ``lora_scale * (up @ down)`` into ``self.weight`` and drop the LoRA branch."""
        lora = self.lora_layer
        if lora is None:
            return

        target_dtype = self.weight.data.dtype
        target_device = self.weight.data.device

        # Arithmetic happens in float32; the result is cast back afterwards.
        w_orig = self.weight.data.float()
        w_up = lora.up.weight.data.float()
        w_down = lora.down.weight.data.float()

        if lora.network_alpha is not None:
            w_up = w_up * lora.network_alpha / lora.rank

        low_rank_update = torch.bmm(w_up[None, :], w_down[None, :])[0]
        fused_weight = w_orig + (lora_scale * low_rank_update)

        if safe_fusing and torch.isnan(fused_weight).any().item():
            raise ValueError(
                "This LoRA weight seems to be broken. "
                f"Encountered NaN values when trying to fuse LoRA weights for {self}."
                "LoRA weights will not be fused."
            )

        self.weight.data = fused_weight.to(device=target_device, dtype=target_dtype)

        # The branch is now part of the weight, so it can be dropped.
        self.lora_layer = None

        # Keep the matrices on the CPU so the merge can be undone without holding device memory.
        self.w_up = w_up.cpu()
        self.w_down = w_down.cpu()
        self._lora_scale = lora_scale

    def _unfuse_lora(self):
        """Subtract the update stored by ``_fuse_lora`` from ``self.weight``."""
        if getattr(self, "w_up", None) is None or getattr(self, "w_down", None) is None:
            return

        fused_weight = self.weight.data
        target_dtype = fused_weight.dtype
        target_device = fused_weight.device

        w_up = self.w_up.to(device=target_device).float()
        w_down = self.w_down.to(target_device).float()

        low_rank_update = torch.bmm(w_up[None, :], w_down[None, :])[0]
        unfused_weight = fused_weight.float() - (self._lora_scale * low_rank_update)
        self.weight.data = unfused_weight.to(device=target_device, dtype=target_dtype)

        self.w_up = None
        self.w_down = None

    def forward(self, hidden_states: torch.Tensor, scale: float = 1.0) -> torch.Tensor:
        """Run the linear layer and, if a LoRA layer is attached, add ``scale`` times its output."""
        base_out = super().forward(hidden_states)
        if self.lora_layer is None:
            return base_out
        return base_out + (scale * self.lora_layer(hidden_states))

In [23]:
# these two functions are used in the forward of the model
# use case (UNet): https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/unets/unet_2d_condition.py
def scale_lora_layers(model, weight):
    """
    Adjust the weightage given to the LoRA layers of the model.

    Calls ``scale_layer(weight)`` on every sub-module of ``model`` that is a
    peft ``BaseTunerLayer``. A weight of exactly ``1.0`` is a no-op.

    Args:
        model (`torch.nn.Module`):
            The model to scale.
        weight (`float`):
            The weight to be given to the LoRA layers.
    """
    # Return before importing peft: the no-op path is taken on every forward
    # pass that uses the default scale and should not depend on the import.
    if weight == 1.0:
        return

    from peft.tuners.tuners_utils import BaseTunerLayer

    for module in model.modules():
        if isinstance(module, BaseTunerLayer):
            module.scale_layer(weight)


def unscale_lora_layers(model, weight: Optional[float] = None):
    """
    Removes the previously passed weight given to the LoRA layers of the model.

    Args:
        model (`torch.nn.Module`):
            The model to scale.
        weight (`float`, *optional*):
            The weight that was previously given to the LoRA layers. If it is `None` or `1.0`, nothing is done.
            If it is `0.0`, the scale of every active adapter is re-set via `set_scale(adapter_name, 1.0)`,
            because a zero weight cannot be undone by `unscale_layer`. Any other value is passed to
            `unscale_layer` on every peft `BaseTunerLayer` in the model.
    """
    # Return before importing peft: the no-op path should not depend on the import.
    if weight is None or weight == 1.0:
        return

    from peft.tuners.tuners_utils import BaseTunerLayer

    for module in model.modules():
        if not isinstance(module, BaseTunerLayer):
            continue
        if weight != 0:
            module.unscale_layer(weight)
        else:
            for adapter_name in module.active_adapters:
                # if weight == 0 unscale should re-set the scale to the original value.
                module.set_scale(adapter_name, 1.0)

In [29]:
# example: Convolution Network (1 block)
class ConvBlock(nn.Module):
    """Two un-padded, bias-free 3x3 convolutions, each followed by batch norm.

    The first convolution doubles the channel count and takes ``stride``; a
    ReLU follows each conv/batch-norm pair.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        hidden_channels = in_channels * 2
        layers = [
            nn.Conv2d(in_channels, hidden_channels, kernel_size=3, stride=stride, padding=0, bias=False),
            nn.BatchNorm2d(hidden_channels),
            nn.ReLU(),
            nn.Conv2d(hidden_channels, out_channels, kernel_size=3, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(out_channels),
        ]
        self.residual_function = nn.Sequential(*layers)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply the conv stack, then the final ReLU."""
        return self.relu(self.residual_function(x))


class LoRAConvBlock(nn.Module):
    """Same layout as ``ConvBlock``, with each convolution replaced by a ``LoRAConv2dLayer``.

    Note that each ``LoRAConv2dLayer`` holds only the low-rank ``down``/``up``
    pair; there is no full-rank ``nn.Conv2d`` kernel in this block.
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        self.residual_function = nn.Sequential(
            LoRAConv2dLayer(in_channels, in_channels*2, kernel_size=3, stride=stride, padding=0),
            nn.BatchNorm2d(in_channels*2),
            nn.ReLU(),
            LoRAConv2dLayer(in_channels*2, out_channels, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(out_channels)
        )

        self.relu = nn.ReLU()

    def forward(self, x, lora_scale=1.0):
        """Run the block with the peft tuner layers temporarily scaled by ``lora_scale``.

        Args:
            x: Input tensor for the first convolution.
            lora_scale: Weight handed to ``scale_lora_layers`` before the pass and
                to ``unscale_lora_layers`` after it. The default of 1.0 makes both
                calls no-ops, which is what the previously hard-coded ``1.`` did.
        """
        scale_lora_layers(self, lora_scale)
        try:
            x = self.residual_function(x)
            x = self.relu(x)
        finally:
            # Always restore the scale, even if the forward pass raises.
            unscale_lora_layers(self, lora_scale)
        return x

In [37]:
def find_modules(module, module_types):
    """Collect names of all sub-modules of ``module`` that are instances of ``module_types``.

    For each match, the last dotted component of its ``named_modules()`` name is
    used (e.g. ``"up"``). When that component is purely numeric -- i.e. the
    module is addressed by its index inside a container, as in ``"layers.10"`` --
    the full dotted name is used instead, because a bare index does not identify
    the layer.

    Args:
        module: Root ``torch.nn.Module`` to inspect.
        module_types: A type or tuple of types, as accepted by ``isinstance``.

    Returns:
        list[str]: One entry per matching sub-module, in ``named_modules()``
        order. Duplicates are kept; de-duplicate at the call site if needed.
    """
    matching_modules = []
    for name, mod in module.named_modules():
        if not isinstance(mod, module_types):
            continue
        module_name = name.rsplit(".", 1)[-1]
        # isdigit() also catches multi-digit indices such as "10", which the
        # previous `len(module_name) == 1` test let through as a bare "10".
        if module_name.isdigit():
            module_name = name
        matching_modules.append(module_name)
    return matching_modules

module_types = (torch.nn.Linear, torch.nn.Conv2d)
# Inspect a throwaway LoRAConvBlock instead of `example_cnn_lora`: that variable
# is only created in a later cell, so this cell failed on a clean top-to-bottom
# run. The names come from named_modules(), so a fresh instance is sufficient.
# sorted() makes the printed order deterministic (a bare set() is not ordered).
target_modules = sorted(set(find_modules(LoRAConvBlock(3, 8), module_types)))
print(target_modules)

['up', 'down']


In [42]:
# Plain block and its LoRA counterpart, both with 3 input and 8 output channels.
example_cnn = ConvBlock(3, 8).to(device)
example_cnn_lora = LoRAConvBlock(3, 8).to(device)

# Parameter statistics of the plain block.
all_params = sum(p.numel() for p in example_cnn.parameters())
trainable_params = sum(p.numel() for p in example_cnn.parameters() if p.requires_grad)
print(f"trainable params: {trainable_params:,d} || all params: {all_params:,d} || trainable%: {100 * trainable_params / all_params}")

trainable params: 622 || all params: 622 || trainable%: 100.0


In [43]:
from peft import LoraConfig, get_peft_model

# The model was already moved to `device` when it was created; the call is kept
# so this cell does not depend on that. (The target name used to be misspelled
# `exmaple_cnn_lora`, which only created a stray variable.)
example_cnn_lora = example_cnn_lora.to(device)

# freeze params of models to save more memory
example_cnn_lora.requires_grad_(False)

config = LoraConfig(
    r=4,
    lora_alpha=4,
    lora_dropout=0.1,
    target_modules=["up", "down"],
    init_lora_weights="gaussian",
    bias="none",
)  # scale = alpha / r

# Wrap the block with PEFT; adapters are attached to the modules named "up" and "down".
example_cnn_lora = get_peft_model(example_cnn_lora, config).to(device)
example_cnn_lora.print_trainable_parameters()

trainable params: 444 || all params: 852 || trainable%: 52.1127


In [46]:
# Random image batch laid out as (B, C, H, W).
img = torch.rand(8, 3, 224, 224).to(device)

In [47]:
# Each un-padded 3x3 convolution trims 2 pixels per spatial dimension: 224 -> 222 -> 220.
example_cnn(img).shape

torch.Size([8, 8, 220, 220])

In [48]:
# Output shape of the PEFT-wrapped LoRA block for the same input batch.
example_cnn_lora(img).shape

torch.Size([8, 8, 220, 220])