# Dense LoRA

In [None]:
import math

class DenseLoraLayer(keras.layers.Layer):
    """LoRA wrapper around a dense layer that fades the frozen layer out.

    While training, the layer returns::

        original_layer(x) * decay + x @ noise + scale * (x @ A^T @ B^T) + C

    where ``decay`` starts at 1 and falls linearly to ``min_decay_factor``
    between ``start_step`` and ``end_step`` (counted in training calls), and
    ``noise`` is sampled from statistics of the frozen kernel.  Outside
    training only ``scale * (x @ A^T @ B^T) + C`` is returned.

    Args:
        original_layer: Layer to wrap.  It is frozen (``trainable = False``);
            ``build`` reads its ``kernel`` and ``units`` attributes.
        total_iteration: Total number of iterations used to place the decay
            window.
        start_percent: Fraction of ``total_iteration`` at which decay starts.
        end_percent: Fraction of ``total_iteration`` at which decay ends.
        min_decay_factor: Smallest value the decay factor can take.
        rank: Rank of the LoRA factorisation.
        alpha: LoRA scaling numerator; the update is scaled by ``alpha / rank``.
        trainable: Whether the LoRA weights are trainable.  Also used as the
            default for ``training`` when ``call`` receives ``training=None``.
        **kwargs: Forwarded to ``keras.layers.Layer``; any ``name`` entry is
            discarded because the wrapped layer's name is reused.
    """

    def __init__(
        self,
        original_layer,
        total_iteration=1000,  # Total number of iterations for the decay
        start_percent=0.05,  # The percentage of total_iteration when decay starts
        end_percent=0.85,  # The percentage of total_iteration when decay ends
        min_decay_factor=0,  # The minimum value that decay factor can take
        rank=64,
        alpha=32,
        trainable=True,
        **kwargs,
    ):
        # Reuse the wrapped layer's name so this layer can stand in for it.
        original_layer_config = original_layer.get_config()
        name = original_layer_config["name"]
        kwargs.pop("name", None)

        super().__init__(name=name, trainable=trainable, **kwargs)

        self.rank = rank
        self.alpha = alpha
        self._scale = alpha / rank

        self.original_layer = original_layer
        self.original_layer.trainable = False

        self.total_iteration = total_iteration
        self.start_step = int(total_iteration * start_percent)
        self.end_step = int(total_iteration * end_percent)
        self.min_decay_factor = min_decay_factor

        # trainable=False: these variables are not touched by automatic
        # differentiation or the optimizer; they are updated manually in
        # ``call``.
        self.current_step = tf.Variable(0, dtype=tf.int32, trainable=False)
        self.decay_factor = tf.Variable(1.0, dtype=tf.float32, trainable=False)

    def build(self, input_shape):
        """Create the LoRA weights A, B and the additive weight C."""
        kernel_shape = self.original_layer.kernel.shape
        self.A_weight = self.add_weight(
            name="lora_A_weight",
            shape=(self.rank, kernel_shape[0]),
            initializer=keras.initializers.VarianceScaling(
                scale=math.sqrt(5), mode="fan_in", distribution="uniform"
            ),
            trainable=self.trainable,
        )

        # B and C start at zero, so the LoRA path contributes nothing at first.
        self.B_weight = self.add_weight(
            name="lora_B_weight",
            shape=(self.original_layer.units, self.rank),
            initializer="zeros",
            trainable=self.trainable,
        )
        self.C_weight = self.add_weight(
            name="lora_C_weight",
            shape=(self.original_layer.units,),
            initializer="zeros",
            trainable=self.trainable,
        )
        super().build(input_shape)

    def _update_decay_factor(self):
        """Recompute ``decay_factor`` from ``current_step`` (linear schedule)."""
        step = tf.cast(self.current_step, dtype=tf.float32)
        floor = tf.cast(self.min_decay_factor, dtype=tf.float32)
        # Guard against a zero-length window (start_step == end_step), which
        # would otherwise evaluate 0 / 0 and poison the output with NaN.
        span = float(max(self.end_step - self.start_step, 1))
        linear = 1.0 - (step - self.start_step) / span * (1.0 - floor)
        # Branch-free form: 1.0 before the window, ``floor`` after it,
        # linear interpolation inside it.
        self.decay_factor.assign(
            tf.where(
                step < self.start_step,
                1.0,
                tf.where(step > self.end_step, floor, linear),
            )
        )

    def call(self, inputs, training=None):
        """Apply the layer.

        Args:
            inputs: Input tensor whose last axis matches the first axis of the
                wrapped layer's kernel.
            training: Training flag; when ``None`` it defaults to
                ``self.trainable``.

        Returns:
            In training mode, the decayed frozen output plus the noise
            projection, the LoRA output and ``C_weight``; otherwise the LoRA
            output plus ``C_weight`` only.
        """
        if training is None:
            training = self.trainable

        self._update_decay_factor()

        # x @ A^T @ B^T.  For 2-D inputs this equals transpose(B @ (A @ x^T));
        # unlike an argument-less tf.transpose (which reverses every axis) it
        # also works when inputs carry extra leading axes.
        lora_A_output = tf.matmul(inputs, self.A_weight, transpose_b=True)
        lora_output = (
            tf.matmul(lora_A_output, self.B_weight, transpose_b=True) * self._scale
        )

        # NOTE: scaling lora_output by (1 - decay_factor) was suggested during
        # mentoring; it is intentionally not applied here.

        if not training:
            # In inference mode only the LoRA output is returned.
            return lora_output + self.C_weight

        original_output = self.original_layer(inputs)

        # Mean and standard deviation of the frozen kernel along axis 0.
        original_weight_matrix = self.original_layer.weights[0]
        original_mean = tf.reduce_mean(original_weight_matrix, axis=0)
        original_variance = tf.reduce_mean(
            tf.square(original_weight_matrix - original_mean), axis=0
        )
        original_stddev = tf.sqrt(original_variance)

        # Once decay_factor drops below 0.3 the noise mean and std are set to 0.
        noise_mean = tf.where(
            self.decay_factor < 0.3, 0.0, original_mean * (1 - self.decay_factor)
        )
        noise_std = tf.where(
            self.decay_factor < 0.3,
            0.0,
            original_stddev * tf.sqrt(1 - tf.square(self.decay_factor)),
        )
        noise = tf.random.normal(
            tf.shape(original_weight_matrix), mean=noise_mean, stddev=noise_std
        )

        # One training call == one schedule step.
        self.current_step.assign_add(1)

        return (
            original_output * self.decay_factor
            + (inputs @ noise)
            + lora_output
            + self.C_weight
        )


# Einsum LoRA

In [None]:
# EinsumLoraLayer_O
import math
from tensorflow import keras

class EinsumLoraLayer_O(keras.layers.Layer):
    """LoRA wrapper for an einsum-based layer, factorising the kernel's last axis.

    ``A`` replaces the kernel's last axis with ``rank`` and is applied with the
    wrapped layer's own einsum ``equation``; ``B`` then maps ``rank`` back to
    the kernel's last axis with a matmul.

    While training, the layer returns::

        original_layer(x) * decay + einsum(eq, x, noise) + lora_output + C

    where ``decay`` starts at 1 and falls linearly to ``min_decay_factor``
    between ``start_step`` and ``end_step`` (counted in training calls).
    Outside training only ``lora_output + C`` is returned.

    Args:
        original_layer: Layer to wrap.  It is frozen (``trainable = False``);
            its ``kernel``, ``bias`` and ``equation`` attributes are read, so
            it must expose all three (a ``None`` bias is not supported).
        total_iteration: Total number of iterations used to place the decay
            window.
        start_percent: Fraction of ``total_iteration`` at which decay starts.
        end_percent: Fraction of ``total_iteration`` at which decay ends.
        min_decay_factor: Smallest value the decay factor can take.
        rank: Rank of the LoRA factorisation.
        alpha: LoRA scaling numerator; the update is scaled by ``alpha / rank``.
        trainable: Whether the LoRA weights are trainable.  Also used as the
            default for ``training`` when ``call`` receives ``training=None``.
        **kwargs: Forwarded to ``keras.layers.Layer``; any ``name`` entry is
            discarded because the wrapped layer's name is reused.
    """

    def __init__(
        self,
        original_layer,
        total_iteration=1000,  # Total number of iterations for the decay
        start_percent=0.05,  # The percentage of total_iteration when decay starts
        end_percent=0.85,  # The percentage of total_iteration when decay ends
        min_decay_factor=0,  # The minimum value that decay factor can take
        rank=64,
        alpha=32,
        trainable=True,
        **kwargs,
    ):
        # Reuse the wrapped layer's name so this layer can stand in for it.
        original_layer_config = original_layer.get_config()
        name = original_layer_config["name"]
        kwargs.pop("name", None)

        super().__init__(name=name, trainable=trainable, **kwargs)

        self.rank = rank
        self.alpha = alpha
        self._scale = alpha / rank

        self.original_layer = original_layer
        self.original_layer.trainable = False

        self.total_iteration = total_iteration
        self.start_step = int(total_iteration * start_percent)
        self.end_step = int(total_iteration * end_percent)
        self.min_decay_factor = min_decay_factor

        # trainable=False: these variables are not touched by automatic
        # differentiation or the optimizer; they are updated manually in
        # ``call``.
        self.current_step = tf.Variable(0, dtype=tf.int32, trainable=False)
        self.decay_factor = tf.Variable(1.0, dtype=tf.float32, trainable=False)

    def build(self, inputs_shape):
        """Create the LoRA weights A, B and the additive weight C."""
        # Plain tuples concatenate the same way whether the backend reports
        # shapes as TensorShape or as tuple.
        kernel_shape = tuple(self.original_layer.kernel.shape)
        bias_shape = self.original_layer.bias.shape

        self.A_weight = self.add_weight(
            name="lora_A_weight",
            shape=kernel_shape[:-1] + (self.rank,),
            initializer=keras.initializers.VarianceScaling(
                scale=math.sqrt(5), mode="fan_in", distribution="uniform"
            ),
            trainable=self.trainable,
        )

        # B and C start at zero, so the LoRA path contributes nothing at first.
        self.B_weight = self.add_weight(
            name="lora_B_weight",
            shape=(self.rank, kernel_shape[-1]),
            initializer="zeros",
            trainable=self.trainable,
        )
        self.C_weight = self.add_weight(
            name="lora_C_weight",
            shape=bias_shape,
            initializer="zeros",
            trainable=self.trainable,
        )
        super().build(inputs_shape)

    def _update_decay_factor(self):
        """Recompute ``decay_factor`` from ``current_step`` (linear schedule)."""
        step = tf.cast(self.current_step, dtype=tf.float32)
        floor = tf.cast(self.min_decay_factor, dtype=tf.float32)
        # Guard against a zero-length window (start_step == end_step), which
        # would otherwise evaluate 0 / 0 and poison the output with NaN.
        span = float(max(self.end_step - self.start_step, 1))
        linear = 1.0 - (step - self.start_step) / span * (1.0 - floor)
        # Branch-free form: 1.0 before the window, ``floor`` after it,
        # linear interpolation inside it.
        self.decay_factor.assign(
            tf.where(
                step < self.start_step,
                1.0,
                tf.where(step > self.end_step, floor, linear),
            )
        )

    def call(self, inputs, training=None):
        """Apply the layer.

        Args:
            inputs: Input tensor accepted by the wrapped layer's einsum
                equation.
            training: Training flag; when ``None`` it defaults to
                ``self.trainable``.

        Returns:
            In training mode, the decayed frozen output plus the noise
            projection, the LoRA output and ``C_weight``; otherwise the LoRA
            output plus ``C_weight`` only.
        """
        if training is None:
            training = self.trainable

        lora_A_output = tf.einsum(self.original_layer.equation, inputs, self.A_weight)
        lora_output = tf.matmul(lora_A_output, self.B_weight) * self._scale

        if not training:
            # In inference mode only the LoRA output is returned.
            return lora_output + self.C_weight

        self._update_decay_factor()

        # The decay factor is applied exactly once, in the return expression
        # below.  It used to be applied here as well, which scaled the frozen
        # output by decay_factor ** 2 and disagreed with the Dense/Conv layers.
        original_output = self.original_layer(inputs)

        # Mean and standard deviation of the frozen kernel along axis 0.
        original_weight_matrix = self.original_layer.weights[0]
        original_mean = tf.reduce_mean(original_weight_matrix, axis=0)
        original_variance = tf.reduce_mean(
            tf.square(original_weight_matrix - original_mean), axis=0
        )
        original_stddev = tf.sqrt(original_variance)

        # Once decay_factor drops below 0.3 the noise mean and std are set to 0.
        noise_mean = tf.where(
            self.decay_factor < 0.3, 0.0, original_mean * (1 - self.decay_factor)
        )
        noise_std = tf.where(
            self.decay_factor < 0.3,
            0.0,
            original_stddev * tf.sqrt(1 - tf.square(self.decay_factor)),
        )
        noise = tf.random.normal(
            tf.shape(original_weight_matrix), mean=noise_mean, stddev=noise_std
        )

        # Increment the step counter: one training call == one schedule step.
        self.current_step.assign_add(1)

        return (
            original_output * self.decay_factor
            + tf.einsum(self.original_layer.equation, inputs, noise)
            + lora_output
            + self.C_weight
        )

In [None]:
#EinsumLoraLayer_QKV
import math
from tensorflow import keras

class EinsumLoraLayer_QKV(keras.layers.Layer):
    """LoRA wrapper for an einsum-based layer, factorising the kernel's first axis.

    ``A`` projects the inputs' last axis down to ``rank`` with a matmul; ``B``
    has the kernel's shape with its first axis replaced by ``rank`` and is
    applied with the wrapped layer's own einsum ``equation``.

    While training, the layer returns::

        original_layer(x) * decay + einsum(eq, x, noise) + lora_output + C

    where ``decay`` starts at 1 and falls linearly to ``min_decay_factor``
    between ``start_step`` and ``end_step`` (counted in training calls).
    Outside training only ``lora_output + C`` is returned.

    Args:
        original_layer: Layer to wrap.  It is frozen (``trainable = False``);
            its ``kernel``, ``bias`` and ``equation`` attributes are read, so
            it must expose all three (a ``None`` bias is not supported).
        total_iteration: Total number of iterations used to place the decay
            window.
        start_percent: Fraction of ``total_iteration`` at which decay starts.
        end_percent: Fraction of ``total_iteration`` at which decay ends.
        min_decay_factor: Smallest value the decay factor can take.
        rank: Rank of the LoRA factorisation.
        alpha: LoRA scaling numerator; the update is scaled by ``alpha / rank``.
        trainable: Whether the LoRA weights are trainable.  Also used as the
            default for ``training`` when ``call`` receives ``training=None``.
        **kwargs: Forwarded to ``keras.layers.Layer``; any ``name`` entry is
            discarded because the wrapped layer's name is reused.
    """

    def __init__(
        self,
        original_layer,
        total_iteration=1000,  # Total number of iterations for the decay
        start_percent=0.05,  # The percentage of total_iteration when decay starts
        end_percent=0.85,  # The percentage of total_iteration when decay ends
        min_decay_factor=0,  # The minimum value that decay factor can take
        rank=64,
        alpha=32,
        trainable=True,
        **kwargs,
    ):
        # Reuse the wrapped layer's name so this layer can stand in for it.
        original_layer_config = original_layer.get_config()
        name = original_layer_config["name"]
        kwargs.pop("name", None)

        super().__init__(name=name, trainable=trainable, **kwargs)

        self.rank = rank
        self.alpha = alpha
        self._scale = alpha / rank

        self.original_layer = original_layer
        self.original_layer.trainable = False

        self.total_iteration = total_iteration
        self.start_step = int(total_iteration * start_percent)
        self.end_step = int(total_iteration * end_percent)
        self.min_decay_factor = min_decay_factor

        # trainable=False: these variables are not touched by automatic
        # differentiation or the optimizer; they are updated manually in
        # ``call``.
        self.current_step = tf.Variable(0, dtype=tf.int32, trainable=False)
        self.decay_factor = tf.Variable(1.0, dtype=tf.float32, trainable=False)

    def build(self, inputs_shape):
        """Create the LoRA weights A, B and the additive weight C."""
        # Plain tuples concatenate the same way whether the backend reports
        # shapes as TensorShape or as tuple.
        kernel_shape = tuple(self.original_layer.kernel.shape)
        bias_shape = self.original_layer.bias.shape

        self.A_weight = self.add_weight(
            name="lora_A_weight",
            shape=(self.rank, kernel_shape[0]),
            initializer=keras.initializers.VarianceScaling(
                scale=math.sqrt(5), mode="fan_in", distribution="uniform"
            ),
            trainable=self.trainable,
        )

        # B and C start at zero, so the LoRA path contributes nothing at first.
        self.B_weight = self.add_weight(
            name="lora_B_weight",
            shape=(self.rank,) + kernel_shape[1:],
            initializer="zeros",
            trainable=self.trainable,
        )
        self.C_weight = self.add_weight(
            name="lora_C_weight",
            shape=bias_shape,
            initializer="zeros",
            trainable=self.trainable,
        )
        super().build(inputs_shape)

    def _update_decay_factor(self):
        """Recompute ``decay_factor`` from ``current_step`` (linear schedule)."""
        step = tf.cast(self.current_step, dtype=tf.float32)
        floor = tf.cast(self.min_decay_factor, dtype=tf.float32)
        # Guard against a zero-length window (start_step == end_step), which
        # would otherwise evaluate 0 / 0 and poison the output with NaN.
        span = float(max(self.end_step - self.start_step, 1))
        linear = 1.0 - (step - self.start_step) / span * (1.0 - floor)
        # Branch-free form: 1.0 before the window, ``floor`` after it,
        # linear interpolation inside it.
        self.decay_factor.assign(
            tf.where(
                step < self.start_step,
                1.0,
                tf.where(step > self.end_step, floor, linear),
            )
        )

    def call(self, inputs, training=None):
        """Apply the layer.

        Args:
            inputs: Input tensor accepted by the wrapped layer's einsum
                equation; its last axis must match the kernel's first axis.
            training: Training flag; when ``None`` it defaults to
                ``self.trainable``.

        Returns:
            In training mode, the decayed frozen output plus the noise
            projection, the LoRA output and ``C_weight``; otherwise the LoRA
            output plus ``C_weight`` only.
        """
        if training is None:
            training = self.trainable

        # x @ A^T projects the last input axis down to ``rank``; the wrapped
        # layer's equation then applies B in place of the frozen kernel.
        lora_A_output = tf.matmul(inputs, self.A_weight, transpose_b=True)
        lora_output = (
            tf.einsum(self.original_layer.equation, lora_A_output, self.B_weight)
            * self._scale
        )

        if not training:
            # In inference mode only the LoRA output is returned.
            return lora_output + self.C_weight

        self._update_decay_factor()

        # The decay factor is applied exactly once, in the return expression
        # below.  It used to be applied here as well, which scaled the frozen
        # output by decay_factor ** 2 and disagreed with the Dense/Conv layers.
        original_output = self.original_layer(inputs)

        # Mean and standard deviation of the frozen kernel along axis 0.
        original_weight_matrix = self.original_layer.weights[0]
        original_mean = tf.reduce_mean(original_weight_matrix, axis=0)
        original_variance = tf.reduce_mean(
            tf.square(original_weight_matrix - original_mean), axis=0
        )
        original_stddev = tf.sqrt(original_variance)

        # Once decay_factor drops below 0.3 the noise mean and std are set to 0.
        noise_mean = tf.where(
            self.decay_factor < 0.3, 0.0, original_mean * (1 - self.decay_factor)
        )
        noise_std = tf.where(
            self.decay_factor < 0.3,
            0.0,
            original_stddev * tf.sqrt(1 - tf.square(self.decay_factor)),
        )
        noise = tf.random.normal(
            tf.shape(original_weight_matrix), mean=noise_mean, stddev=noise_std
        )

        # Increment the step counter: one training call == one schedule step.
        self.current_step.assign_add(1)

        return (
            original_output * self.decay_factor
            + tf.einsum(self.original_layer.equation, inputs, noise)
            + lora_output
            + self.C_weight
        )

# Conv LoRA

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, initializers
from tensorflow.keras.layers import Conv2D, Conv1D, Conv3D

class ConvLoRALayer(layers.Layer):
    """LoRA wrapper around a 2-D convolution that fades the frozen layer out.

    The low-rank product ``B @ A`` is reshaped into a convolution kernel and
    applied with ``tf.nn.conv2d`` using the wrapped layer's strides, padding
    and dilation, so its output lines up with the frozen layer's output.

    While training, the layer returns::

        original(x) * decay + conv(x, noise) + scale * conv(x, BA) + C

    where ``decay`` starts at 1 and falls linearly to ``min_decay_factor``
    between ``start_step`` and ``end_step`` (counted in training calls).
    Outside training only ``scale * conv(x, BA) + C`` is returned.

    Args:
        original_conv_layer: Convolution layer to wrap.  It is frozen
            (``trainable = False``); its ``filters``, ``kernel_size``,
            ``strides``, ``padding`` and ``dilation_rate`` attributes are read.
        total_iteration: Total number of iterations used to place the decay
            window.
        start_percent: Fraction of ``total_iteration`` at which decay starts.
        end_percent: Fraction of ``total_iteration`` at which decay ends.
        min_decay_factor: Smallest value the decay factor can take.
        rank: Rank of the LoRA factorisation.
        alpha: LoRA scaling numerator; the update is scaled by ``alpha / rank``.
        trainable: Whether the LoRA weights are trainable.  Also used as the
            default for ``training`` when ``call`` receives ``training=None``.
        **kwargs: Forwarded to ``layers.Layer``; any ``name`` entry is
            discarded because the wrapped layer's name is reused.
    """

    def __init__(
        self,
        original_conv_layer,
        total_iteration=1000,  # Total number of iterations for the decay
        start_percent=0.1,  # The percentage of total_iteration when decay starts
        end_percent=0.9,  # The percentage of total_iteration when decay ends
        min_decay_factor=0,  # The minimum value that decay factor can take
        rank=32,
        alpha=32,
        trainable=True,
        **kwargs
    ):
        # Capture the original layer's configuration and reuse its name.
        original_layer_config = original_conv_layer.get_config()
        name = original_layer_config["name"]
        kwargs.pop("name", None)

        super().__init__(name=name, trainable=trainable, **kwargs)

        self.rank = rank
        self.alpha = alpha
        self._scale = alpha / rank

        # The original convolutional layer is set to non-trainable to freeze
        # its weights.
        self.original_conv_layer = original_conv_layer
        self.original_conv_layer.trainable = False

        self.kernel = None
        self.filters = original_conv_layer.filters
        self.kernel_size = original_conv_layer.kernel_size[0]
        self.in_channels = None  # filled in by build()

        self.total_iteration = total_iteration
        self.start_step = int(total_iteration * start_percent)
        self.end_step = int(total_iteration * end_percent)
        self.min_decay_factor = min_decay_factor

        # trainable=False: these variables are not touched by automatic
        # differentiation or the optimizer; they are updated manually in
        # ``call``.
        self.current_step = tf.Variable(0, dtype=tf.int32, trainable=False)
        self.decay_factor = tf.Variable(1.0, dtype=tf.float32, trainable=False)

    def _kernel_hw(self):
        """Return the wrapped layer's kernel (height, width)."""
        kernel_size = self.original_conv_layer.kernel_size
        # A length-1 kernel_size yields the same value for both axes, which
        # matches the previous square-kernel behaviour.
        return kernel_size[0], kernel_size[-1]

    def build(self, input_shape):
        """Create the LoRA weights A, B and the additive weight C."""
        # The channel count is read from the last input axis (channels-last).
        self.in_channels = input_shape[-1]

        in_channels = self.in_channels
        out_channels = self.filters
        kernel_h, kernel_w = self._kernel_hw()

        # B @ A has out_channels * kernel_h * kernel_w * in_channels elements,
        # exactly enough to be reshaped into a full convolution kernel.  For a
        # square kernel these shapes equal the original ones.
        self.A_weight = self.add_weight(
            name="lora_A_weight",
            shape=(self.rank * kernel_h, in_channels * kernel_w),
            initializer=initializers.VarianceScaling(
                scale=1.0, mode='fan_in', distribution='uniform'
            ),
            trainable=self.trainable
        )

        # B and C start at zero, so the LoRA path contributes nothing at first.
        self.B_weight = self.add_weight(
            name="lora_B_weight",
            shape=(out_channels * kernel_h, self.rank * kernel_h),
            initializer="zeros",
            trainable=self.trainable
        )

        # Mirror the wrapped layer's bias shape; when it has no bias, fall
        # back to one value per output channel.
        original_bias = getattr(self.original_conv_layer, "bias", None)
        bias_shape = (
            original_bias.shape if original_bias is not None else (out_channels,)
        )
        self.C_weight = self.add_weight(
            name="lora_C_weight",
            shape=bias_shape,
            initializer="zeros",
            trainable=self.trainable
        )

        super().build(input_shape)

    def _update_decay_factor(self):
        """Recompute ``decay_factor`` from ``current_step`` (linear schedule)."""
        step = tf.cast(self.current_step, dtype=tf.float32)
        floor = tf.cast(self.min_decay_factor, dtype=tf.float32)
        # Guard against a zero-length window (start_step == end_step), which
        # would otherwise evaluate 0 / 0 and poison the output with NaN.
        span = float(max(self.end_step - self.start_step, 1))
        linear = 1.0 - (step - self.start_step) / span * (1.0 - floor)
        # Branch-free form: 1.0 before the window, ``floor`` after it,
        # linear interpolation inside it.
        self.decay_factor.assign(
            tf.where(
                step < self.start_step,
                1.0,
                tf.where(step > self.end_step, floor, linear),
            )
        )

    def call(self, inputs, training=None):
        """Apply the layer.

        Args:
            inputs: Input tensor for ``tf.nn.conv2d`` in its default data
                format, with ``in_channels`` on the last axis.
            training: Training flag; when ``None`` it defaults to
                ``self.trainable``.

        Returns:
            In training mode, the decayed frozen output plus the noise
            convolution, the LoRA output and ``C_weight``; otherwise the LoRA
            output plus ``C_weight`` only.
        """
        if training is None:
            training = self.trainable

        self._update_decay_factor()

        conv = self.original_conv_layer
        kernel_h, kernel_w = self._kernel_hw()
        # Reuse the wrapped layer's geometry so every convolution below yields
        # the same output shape as the frozen layer itself (hard-coded stride 1
        # / 'SAME' only matched layers configured that way).
        strides = list(conv.strides)
        padding = conv.padding.upper()
        dilations = list(conv.dilation_rate)

        # B @ A has shape (out_channels * kernel_h, in_channels * kernel_w).
        # Reshape it to (out_channels, kernel_h, kernel_w, in_channels) and
        # move the output channels last to get the
        # (kernel_h, kernel_w, in_channels, out_channels) layout conv2d expects.
        lora_BA = self.B_weight @ self.A_weight
        lora_kernel = tf.reshape(
            lora_BA, (self.filters, kernel_h, kernel_w, self.in_channels)
        )
        lora_kernel = tf.transpose(lora_kernel, [1, 2, 3, 0])
        lora_output = tf.nn.conv2d(
            inputs,
            lora_kernel,
            strides=strides,
            padding=padding,
            dilations=dilations,
        ) * self._scale

        if not training:
            # In inference mode only the LoRA output is returned.
            return lora_output + self.C_weight

        original_output = conv(inputs)

        # Mean and standard deviation over the whole frozen kernel.
        original_weight_matrix = conv.weights[0]
        original_mean = tf.reduce_mean(original_weight_matrix)
        original_variance = tf.reduce_mean(
            tf.square(original_weight_matrix - original_mean)
        )
        original_stddev = tf.sqrt(original_variance)

        # Once decay_factor drops below 0.3 the noise mean and std are set to 0.
        noise_mean = tf.where(
            self.decay_factor < 0.3, 0.0, original_mean * (1 - self.decay_factor)
        )
        noise_std = tf.where(
            self.decay_factor < 0.3,
            0.0,
            original_stddev * tf.sqrt(1 - tf.square(self.decay_factor)),
        )
        noise = tf.random.normal(
            tf.shape(original_weight_matrix), mean=noise_mean, stddev=noise_std
        )
        noise_output = tf.nn.conv2d(
            inputs,
            noise,
            strides=strides,
            padding=padding,
            dilations=dilations,
        )

        # One training call == one schedule step.
        self.current_step.assign_add(1)

        return (
            original_output * self.decay_factor
            + noise_output
            + lora_output
            + self.C_weight
        )