In [3]:
import math
import torch
import torch.nn as nn

# LORA and QLORA

In [19]:

class LORALayer(nn.Module):
    def __init__(self, input_dim, output_dim, rank, alpha=1):
        super(LORALayer, self).__init__()
        self.rank = rank
        self.alpha = alpha

        # Original weight and bias of the linear layer
        self.weight = nn.Parameter(torch.Tensor(output_dim, input_dim))
        self.bias = nn.Parameter(torch.Tensor(output_dim))

        # LORA specific parameters
        self.A = nn.Parameter(torch.Tensor(input_dim, rank))
        self.B = nn.Parameter(torch.Tensor(rank, output_dim))

        self.reset_parameters()

    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.weight, a=math.sqrt(5))
        nn.init.zeros_(self.bias)
        nn.init.normal_(self.A, 0, 0.02)
        nn.init.normal_(self.B, 0, 0.02)

    def forward(self, x):
        original_size = x.size()
        x = x.reshape(-1, original_size[-1])  # Reshape to [batch_size * seq_length, d_model]

        lora_adjustment = self.alpha * (x @ self.A) @ self.B
        x = nn.functional.linear(x, self.weight + lora_adjustment, self.bias)

        return x.reshape(original_size)  # Reshape back to original size

# Example usage
input_dim = 512
output_dim = 512
rank = 16  # Rank for the low-rank matrices A and B
alpha = 2  # Scaling factor for LORA adjustment

lora_layer = LORALayer(input_dim, output_dim, rank, alpha)
lora_layer

LORALayer()

In [20]:
class QLORALayer(nn.Module):
    def __init__(self, input_dim, output_dim, rank, alpha=1, quantization_bits=8):
        super(QLORALayer, self).__init__()
        self.rank = rank
        self.alpha = alpha
        self.quantization_bits = quantization_bits

        # Original weight and bias
        self.weight = nn.Parameter(torch.Tensor(output_dim, input_dim))
        self.bias = nn.Parameter(torch.Tensor(output_dim))

        # QLORA specific parameters
        self.A = nn.Parameter(torch.Tensor(input_dim, rank))
        self.B = nn.Parameter(torch.Tensor(rank, output_dim))

        self.reset_parameters()

    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.weight, a=math.sqrt(5))
        nn.init.zeros_(self.bias)
        nn.init.normal_(self.A, 0, 0.02)
        nn.init.normal_(self.B, 0, 0.02)

    def quantize(self, x, num_bits):
        # Implement a simple quantization method
        scale = x.abs().max()
        x_quantized = torch.round(x / scale * (2**num_bits - 1))
        return x_quantized, scale

    def forward(self, x):
        original_size = x.size()
        x = x.reshape(-1, original_size[-1])  # Reshape to [batch_size * seq_length, d_model]

        A_quantized, scale_A = self.quantize(self.A, self.quantization_bits)
        B_quantized, scale_B = self.quantize(self.B, self.quantization_bits)

        lora_adjustment = self.alpha * (x @ (A_quantized / scale_A)) @ (B_quantized / scale_B)
        x = nn.functional.linear(x, self.weight + lora_adjustment, self.bias)

        return x.reshape(original_size)

# Example usage
input_dim = 512
output_dim = 512
rank = 16
alpha = 2
quantization_bits = 8

qlora_layer = QLORALayer(input_dim, output_dim, rank, alpha, quantization_bits)
qlora_layer

QLORALayer()

In [12]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)


class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, lora_alpha=2, qlora_alpha=2, rank=16, quantization_bits=8):
        super(TransformerEncoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)

        # Use LORALayer and QLORALayer in place of linear layers
        self.linear1 = LORALayer(d_model, dim_feedforward, rank, lora_alpha)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = QLORALayer(dim_feedforward, d_model, rank, qlora_alpha, quantization_bits)

        self.activation = nn.ReLU()

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, src, src_mask=None, src_key_padding_mask=None, is_causal=False):
        src2 = self.self_attn(src, src, src, attn_mask=src_mask,
                              key_padding_mask=src_key_padding_mask)[0]
        src = src + self.dropout1(src2)
        src = self.norm1(src)

        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = src + self.dropout2(src2)
        src = self.norm2(src)
        return src

class TransformerModel(nn.Module):
    def __init__(self, ntoken, d_model, nhead, nlayers, dropout=0.1):
        super(TransformerModel, self).__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, nlayers)
        self.encoder = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.decoder = nn.Linear(d_model, ntoken)

        self.init_weights()

    def init_weights(self):
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        src = self.encoder(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_mask, src_key_padding_mask)
        output = self.decoder(output)
        return output

# Example usage
ntoken = 1000  # size of vocabulary
d_model = 512  # embedding dimension
nhead = 8      # number of heads in nn.MultiheadAttention
nlayers = 2    # number of TransformerEncoderLayer
dropout = 0.2  # dropout probability

model = TransformerModel(ntoken, d_model, nhead, nlayers, dropout)


In [24]:
class LORALayer(nn.Module):
    def __init__(self, input_dim, output_dim, rank, alpha=1):
        super(LORALayer, self).__init__()
        self.rank = rank
        self.alpha = alpha

        # Original weight and bias of the linear layer
        self.weight = nn.Parameter(torch.Tensor(output_dim, input_dim))
        self.bias = nn.Parameter(torch.Tensor(output_dim))

        # LORA specific parameters
        self.A = nn.Parameter(torch.Tensor(input_dim, rank))
        self.B = nn.Parameter(torch.Tensor(rank, output_dim))

        self.reset_parameters()

    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.weight, a=math.sqrt(5))
        nn.init.zeros_(self.bias)
        nn.init.normal_(self.A, 0, 0.02)
        nn.init.normal_(self.B, 0, 0.02)

    def forward(self, x):
        print("LORALayer Input Shape:", x.shape)
        original_size = x.size()
        x = x.reshape(-1, original_size[-1])

        lora_adjustment = self.alpha * (x @ self.A) @ self.B
        adjusted_weight = self.weight + lora_adjustment
        print("Adjusted Weight Shape:", adjusted_weight.shape)
        x = nn.functional.linear(x, adjusted_weight, self.bias)
        print("LORALayer Output Shape:", x.shape)

        return x.reshape(original_size)

class QLORALayer(nn.Module):
    def __init__(self, input_dim, output_dim, rank, alpha=1, quantization_bits=8):
        super(QLORALayer, self).__init__()
        self.rank = rank
        self.alpha = alpha
        self.quantization_bits = quantization_bits

        # Original weight and bias
        self.weight = nn.Parameter(torch.Tensor(output_dim, input_dim))
        self.bias = nn.Parameter(torch.Tensor(output_dim))

        # QLORA specific parameters
        self.A = nn.Parameter(torch.Tensor(input_dim, rank))
        self.B = nn.Parameter(torch.Tensor(rank, output_dim))

        self.reset_parameters()

    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.weight, a=math.sqrt(5))
        nn.init.zeros_(self.bias)
        nn.init.normal_(self.A, 0, 0.02)
        nn.init.normal_(self.B, 0, 0.02)

    def quantize(self, x, num_bits):
        # Implement a simple quantization method
        scale = x.abs().max()
        x_quantized = torch.round(x / scale * (2**num_bits - 1))
        return x_quantized, scale

    def forward(self, x):
        print("QLORALayer Input Shape:", x.shape)
        original_size = x.size()
        x = x.reshape(-1, original_size[-1])

        A_quantized, scale_A = self.quantize(self.A, self.quantization_bits)
        B_quantized, scale_B = self.quantize(self.B, self.quantization_bits)

        lora_adjustment = self.alpha * (x @ (A_quantized / scale_A)) @ (B_quantized / scale_B)
        adjusted_weight = self.weight + lora_adjustment
        print("Adjusted Weight Shape:", adjusted_weight.shape)
        x = nn.functional.linear(x, adjusted_weight, self.bias)
        print("QLORALayer Output Shape:", x.shape)

        return x.reshape(original_size)

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)


class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, lora_alpha=2, qlora_alpha=2, rank=16, quantization_bits=8):
        super(TransformerEncoderLayer, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)

        # Use LORALayer and QLORALayer in place of linear layers
        self.linear1 = LORALayer(d_model, dim_feedforward, rank, lora_alpha)  # d_model -> dim_feedforward
        self.linear2 = QLORALayer(dim_feedforward, d_model, rank, qlora_alpha, quantization_bits)  # dim_feedforward -> d_model

        self.activation = nn.ReLU()
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, src, src_mask=None, src_key_padding_mask=None, is_causal=False):
        print("Encoder Layer Input Shape:", src.shape)
        src2 = self.self_attn(src, src, src, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)[0]
        src = src + self.dropout1(src2)
        src = self.norm1(src)

        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = src + self.dropout2(src2)
        src = self.norm2(src)
        print("Encoder Layer Output Shape:", src.shape)
        return src


class TransformerModel(nn.Module):
    def __init__(self, ntoken, d_model, nhead, nlayers, dropout=0.1):
        super(TransformerModel, self).__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        encoder_layers = TransformerEncoderLayer(d_model, nhead, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, nlayers)
        self.encoder = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.decoder = nn.Linear(d_model, ntoken)

        self.init_weights()

    def init_weights(self):
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, src, src_mask=None, src_key_padding_mask=None):
        src = self.encoder(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_mask, src_key_padding_mask)
        output = self.decoder(output)
        return output

# Example usage
ntoken = 1000  # size of vocabulary
d_model = 2048  # embedding dimension
nhead = 8      # number of heads in nn.MultiheadAttention
nlayers = 2    # number of TransformerEncoderLayer
dropout = 0.2  # dropout probability

model = TransformerModel(ntoken, d_model, nhead, nlayers, dropout)




In [25]:
# Create dummy input data
seq_length = 10
dummy_input = torch.randint(ntoken, (seq_length, 1))  # Batch size of 1

# Forward pass
output = model(dummy_input)

# Check output shape
expected_output_shape = (seq_length, 1, ntoken)
assert output.shape == expected_output_shape, f"Output shape is incorrect: expected {expected_output_shape}, got {output.shape}"
print("Basic forward pass test passed.")


Encoder Layer Input Shape: torch.Size([10, 1, 2048])
LORALayer Input Shape: torch.Size([10, 1, 2048])


RuntimeError: The size of tensor a (2048) must match the size of tensor b (10) at non-singleton dimension 0

In [23]:
# 1. Forward Pass test

# Create dummy input data
vocab_size = 1000  # Adjust as per your model's vocabulary size
seq_length = 10    # Adjust sequence length as needed
dummy_input = torch.randint(vocab_size, (seq_length, 1))  # Batch size of 1

# Forward pass
output = model(dummy_input)

# Check output shape
expected_output_shape = (seq_length, 1, vocab_size)
assert output.shape == expected_output_shape, f"Output shape is incorrect: expected {expected_output_shape}, got {output.shape}"
print("Basic forward pass test passed.")

'''
# 2. Test Learning Capability

# Simple training loop
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

for epoch in range(1):  # Run for a few epochs
    optimizer.zero_grad()
    output = model(dummy_input)
    loss = criterion(output.view(-1, vocab_size), dummy_input.view(-1))
    loss.backward()
    optimizer.step()

    print(f"Epoch {epoch}, Loss: {loss.item()}")

# 3. Check Weights Update
    
# Save initial state of a specific weight for comparison
initial_weight = model.encoder.weight.data.clone()

# Perform a training step
optimizer.zero_grad()
output = model(dummy_input)
loss = criterion(output.view(-1, vocab_size), dummy_input.view(-1))
loss.backward()
optimizer.step()

# Check if the weights have been updated
assert not torch.equal(initial_weight, model.encoder.weight.data), "Weights did not update"
print("Weights update test passed.")
'''





RuntimeError: The size of tensor a (2048) must match the size of tensor b (10) at non-singleton dimension 0