In [None]:
import json
import torch
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from sentencepiece import SentencePieceProcessor
import os
import torch.nn as nn
from dataclasses import dataclass
from typing import Optional
import torch.nn.functional as F
import math
torch.set_printoptions(sci_mode=False)

In [None]:

def pad_tensors(list_of_tensors):
    """Right-pad tensors with zeros along dim 0 and stack them into one batch.

    Args:
        list_of_tensors: A non-empty iterable of tensors (a tensor is also
            accepted and is iterated along its first dimension). All items
            must agree on every dimension except the first.

    Returns:
        A tensor of shape ``(count, max_len, *trailing_dims)`` where
        ``max_len`` is the largest first-dimension size among the inputs.
        Positions past an item's own length are filled with zeros.

    Raises:
        ValueError: If no tensors are supplied.
    """
    tensors = list(list_of_tensors)
    if not tensors:
        raise ValueError("pad_tensors() requires at least one tensor")

    max_dim = max(t.shape[0] for t in tensors)  # longest item decides the padded length
    padded = []
    for t in tensors:
        # new_zeros keeps the dtype and device of the tensor being padded
        res_t = t.new_zeros((max_dim, *t.shape[1:]))
        res_t[: t.shape[0]] = t
        padded.append(res_t)

    # Every padded item now has the same shape, so stacking adds the batch dimension.
    return torch.stack(padded)

class MyDataset(Dataset):
    """Toy question/answer corpus that also acts as a word-level tokenizer.

    The vocabulary is built from every whitespace-separated, lower-cased word
    of the full corpus. Index 0 is the empty string (used as padding by
    ``pad_tensors``) and the last index is ``<end>``. The remaining word
    indices follow set iteration order, so they are not guaranteed to be the
    same in a different interpreter session.

    Each dataset item is one row of token ids: the zero-padded prompt followed
    by the zero-padded answer. Three prompts are held out of the training
    rows but still contribute to the vocabulary.
    """

    def __init__(self):
        super().__init__()
        training_data = {
        "how are you": "i am fine <end>",
        "who is john": "a nice person <end>",
        "who is nice": "john <end>",
        "where is john": "at home <end>",
        "how is john": "i dont know <end>",
        "who are you": "mini gpt model <end>",
        "what is your name": "i am a GPT model <end>",
        "what is GPT": "Generative Pre-trained Transformer <end>",
        "how old are you": "I was created recently <end>",
        "what can you do": "I can generate text <end>",
        "are you intelligent": "I try to be <end>",
        "what is AI": "Artificial Intelligence <end>",
        "who created you": "programmers <end>",
        "where do you live": "in the cloud <end>",
        "do you like humans": "I like helping them <end>",
        "what is your purpose": "to assist and provide information <end>",
        "can you learn": "I can be trained on data <end>",
        "are you alive": "I am just a program <end>",
        "what is your favorite color": "I do not have a preference <end>",
        "do you have emotions": "I do not feel emotions <end>",
        "what is love": "a complex human emotion <end>",
        "can you think": "I process information <end>",
        "do you dream": "I do not dream <end>",
        "what is happiness": "a state of well-being <end>",
        "can you make decisions": "I can make choices based on data <end>",
        "are you human": "I am an AI model <end>"
        }

        # Extract input and target phrases
        data_words = list(training_data.keys())
        target_words = list(training_data.values())

        # Build vocabulary from the full corpus (prompt words first, then answer words).
        all_words = (
            [word.lower() for phrase in data_words for word in phrase.split(" ")]
            + [word.lower() for phrase in target_words for word in phrase.split(" ")]
        )
        self.vocabulary_words = list(set(all_words))

        # Ensure <end> token is at the end of vocabulary list, and there's a blank at the beginning
        self.vocabulary_words.remove("<end>")
        self.vocabulary_words.append("<end>")
        self.vocabulary_words.insert(0, "")

        # Create mappings from word to index and index to word
        self.word_to_ix = {self.vocabulary_words[k].lower(): k for k in range(len(self.vocabulary_words))}
        self.ix_to_word = {v: k for k, v in self.word_to_ix.items()}

        # Training rows: the full corpus minus the held-out prompts, in corpus order.
        held_out_prompts = ("what is happiness", "can you make decisions", "are you human")
        training_data2 = {
            prompt: answer
            for prompt, answer in training_data.items()
            if prompt not in held_out_prompts
        }

        data_words = list(training_data2.keys())
        target_words = list(training_data2.values())
        data_words_tensor = self.Encode(data_words)
        target_words_tensor = self.Encode(target_words)

        # One row per pair: [padded prompt ids | padded answer ids]
        self.input_ids = torch.cat((data_words_tensor, target_words_tensor), 1)

    def Decode(self, x : torch.Tensor):
        """Turn a batch of index sequences into space-joined, lower-cased strings.

        Args:
            x: A 2-D tensor (or nested list) of vocabulary indices, one
                sequence per row.

        Returns:
            One string per row. Decoding of a row stops right after the first
            ``<end>`` token, which is included in the output.
        """
        # as_tensor avoids the copy-construct UserWarning that torch.tensor()
        # raises when it is handed an existing tensor.
        index_batch = torch.as_tensor(x).detach().cpu().tolist()
        end_ix = self.word_to_ix["<end>"]
        res = []
        for indices in index_batch:
            words = []
            for ix in indices:
                words.append(self.ix_to_word[ix].lower())  # Convert index to word
                if ix == end_ix:
                    break  # Stop when <end> token is encountered
            res.append(" ".join(words))
        return res

    def Encode(self, seq_batch, device = None):
        """Convert a batch of phrases into a zero-padded tensor of indices.

        Args:
            seq_batch: Iterable of strings; each is lower-cased and split on
                single spaces.
            device: Optional device to move each sequence to before padding.

        Returns:
            The result of ``pad_tensors`` on the per-phrase index tensors.
            Words that are not in the vocabulary are skipped silently.
        """
        index_batch = []

        # Loop over sequences in the batch
        for seq in seq_batch:
            word_list = seq.lower().split(" ")
            indices = [self.word_to_ix[word] for word in word_list if word in self.word_to_ix]
            t = torch.tensor(indices)
            if device is not None:
                t = t.to(device)  # Transfer tensor to the specified device
            index_batch.append(t)

        # Pad tensors to have the same length
        return pad_tensors(index_batch)

    def vocab_size(self):
        """Return the number of vocabulary entries, including padding and ``<end>``."""
        return len(self.vocabulary_words)

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx]






# One object plays both roles: tokenizer (Encode/Decode) and training Dataset.
tokenizer = MyDataset()
# Rows of input_ids are batched four at a time in random order.
train_dataloader = DataLoader(tokenizer, batch_size=4, shuffle=True)
# Display the vocabulary size; the same value is later copied into ModelArgs.vocab_size.
tokenizer.vocab_size()
# train_dataloader = DataLoader(train_dataset, batch_size=3, collate_fn=collate_fn, shuffle=True, pin_memory=True)

87

In [None]:

@dataclass
class ModelArgs:
    """Hyperparameters shared by the model components defined in this notebook."""
    # Width of the token embeddings and of every block's input/output.
    dim: int = 512
    # Number of EncoderBlock layers stacked by Transformer.
    n_layers: int = 4
    # Number of attention heads; each head gets dim // n_heads features.
    n_heads: int = 4
    # Vocabulary size; -1 is a "not set" sentinel that Transformer rejects with an assert.
    vocab_size: int = -1
    # FeedForward rounds its hidden width up to a multiple of this value.
    multiple_of: int = 256
    # Epsilon passed to the RMSNorm layers.
    norm_eps: float = 1e-5

def look_ahead_mask(seq_len):
    """Build the additive causal mask for a sequence of ``seq_len`` tokens.

    Returns:
        A ``(seq_len, seq_len)`` float tensor with ``-inf`` strictly above the
        main diagonal and ``0`` on and below it, so that adding it to
        attention scores blocks every position from looking ahead.
    """
    blocked = torch.full((seq_len, seq_len), float("-inf"))
    # Keep -inf only above the diagonal; everything else becomes 0.
    return blocked.triu(diagonal=1)

class RMSNorm(nn.Module):
    """Root-mean-square normalisation over the last dimension with a learnable gain.

    Args:
        dim: Size of the last dimension of the inputs (length of the gain vector).
        eps: Constant added to the mean square before the reciprocal square root.
    """

    def __init__(self, dim: int, eps:float = 1e-6):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(dim))
        self.eps = eps

    def _norm(self, x: torch.Tensor):
        """Scale ``x`` by ``1 / sqrt(mean(x**2) + eps)`` computed over the last dim."""
        mean_square = x.pow(2).mean(-1, keepdim=True)
        inv_rms = torch.rsqrt(mean_square + self.eps)
        return x * inv_rms

    def forward(self, x: torch.Tensor):
        """Normalise in float32, cast back to the dtype of ``x``, then apply the gain."""
        normed = self._norm(x.float())
        return self.weight * normed.type_as(x)

class SelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention without bias terms.

    Args:
        args: Object exposing ``dim`` (model width) and ``n_heads``. Each head
            works on ``dim // n_heads`` features.
    """

    def __init__(self, args: "ModelArgs"):
        super().__init__()

        self.n_heads = args.n_heads
        self.head_dim = args.dim // args.n_heads

        self.wq = nn.Linear(args.dim, args.n_heads * self.head_dim, bias=False)
        self.wk = nn.Linear(args.dim, self.n_heads * self.head_dim, bias=False)
        self.wv = nn.Linear(args.dim, self.n_heads * self.head_dim, bias=False)
        self.wo = nn.Linear(args.n_heads * self.head_dim, args.dim, bias=False)

    def _split_heads(self, t: torch.Tensor, batch_size: int, seq_len: int) -> torch.Tensor:
        # (B, S, H * head_dim) -> (B, H, S, head_dim)
        return t.view(batch_size, seq_len, self.n_heads, self.head_dim).transpose(1, 2)

    def forward(self, x: torch.Tensor , LH_mask = False, padding_mask = False):
        """Apply self-attention to ``x``.

        Args:
            x: Input of shape ``(batch, seq_len, dim)``.
            LH_mask: When truthy, a causal (look-ahead) mask is added so a
                position cannot attend to later positions.
            padding_mask: Optional additive mask tensor (``0`` to keep,
                ``-inf`` to block). A 3-D ``(batch, seq_len, seq_len)`` mask is
                expanded over the head dimension; other ranks must already
                broadcast against ``(batch, heads, seq_len, seq_len)``. Any
                non-tensor value (such as the default ``False``) means
                "no padding mask".

        Returns:
            Tensor of shape ``(batch, seq_len, dim)``.
        """
        batch_size, seq_len, _ = x.shape

        xq = self._split_heads(self.wq(x), batch_size, seq_len)
        xk = self._split_heads(self.wk(x), batch_size, seq_len)
        xv = self._split_heads(self.wv(x), batch_size, seq_len)

        # Attention scores are query-key similarities:
        # (B, H, S, head_dim) @ (B, H, head_dim, S) -> (B, H, S, S)
        scores = torch.matmul(xq, xk.transpose(2, 3)) / math.sqrt(self.head_dim)

        if LH_mask:
            causal = torch.full([seq_len, seq_len], float("-inf"), device=x.device)
            scores = scores + torch.triu(causal, diagonal=1)

        # Explicit tensor check: comparing a tensor with `!= False` cannot be used in `if`.
        if torch.is_tensor(padding_mask):
            if padding_mask.dim() == 3:
                # Insert the head axis so the batch axis is not matched against heads.
                padding_mask = padding_mask.unsqueeze(1)
            scores = scores + padding_mask.to(scores.device)

        # Softmax in float32, then back to the working dtype.
        scores = F.softmax(scores.float(), dim=-1).type_as(xq)

        # (B, H, S, S) @ (B, H, S, head_dim) -> (B, H, S, head_dim)
        output = torch.matmul(scores, xv)
        # (B, H, S, head_dim) -> (B, S, H * head_dim)
        output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, -1)

        return self.wo(output)


class FeedForward(nn.Module):
    """Gated feed-forward block computing ``w2(silu(w1(x)) * w3(x))``.

    The hidden width is two thirds of ``4 * dim`` (truncated to an integer),
    rounded up to the next multiple of ``args.multiple_of``. With the notebook
    defaults (dim=512, multiple_of=256): 2048 -> 1365 -> 1536.
    """

    def __init__(self, args: ModelArgs):
        super().__init__()
        two_thirds = int(2 * (4 * args.dim) / 3)

        # Round up to a whole number of `multiple_of`-sized steps.
        step = args.multiple_of
        n_steps = (two_thirds + step - 1) // step
        hidden_dim = step * n_steps

        self.w1 = nn.Linear(args.dim, hidden_dim, bias = False)
        self.w2 = nn.Linear(hidden_dim, args.dim, bias = False)
        self.w3 = nn.Linear(args.dim, hidden_dim, bias = False)

    def forward(self, x: torch.Tensor):
        """Return the gated projection of ``x``; the last dimension stays ``dim``."""
        gate = F.silu(self.w1(x))
        value = self.w3(x)
        return self.w2(gate * value)



class EncoderBlock(nn.Module):
    """Pre-norm transformer block: self-attention then feed-forward, each with a residual.

    Args:
        args: Model hyperparameters; ``dim``, ``n_heads`` and ``norm_eps`` are
            read here, and ``args`` is also handed to the sub-layers.
    """

    def __init__(self, args: ModelArgs):
        super().__init__()

        self.n_hjead = args.n_heads  # attribute name kept exactly as in the original
        self.dim = args.dim
        self.head_dim = args.dim // args.n_heads

        self.attention = SelfAttention(args)
        self.feed_forward = FeedForward(args)

        # RMSNorm applied to the input of the attention layer
        self.attention_norm = RMSNorm(args.dim, eps = args.norm_eps)
        # RMSNorm applied to the input of the feed-forward layer
        self.ffn_norm = RMSNorm(args.dim, eps = args.norm_eps)

    def forward(self, x:torch.Tensor,  LH_mask = False, Pad_mask = False):
        """Run the block on ``x`` of shape ``(batch, seq_len, dim)``.

        ``LH_mask`` is passed through to the attention layer.
        NOTE(review): ``Pad_mask`` is accepted but is not passed on, so no
        padding mask reaches the attention layer from here.
        """
        attn_out = self.attention.forward(self.attention_norm(x), LH_mask)
        h = x + attn_out
        ffn_out = self.feed_forward.forward(self.ffn_norm(h))
        return h + ffn_out

In [None]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to token embeddings.

    Args:
        dim_model: Embedding width.
        dropout_p: Dropout probability applied after the addition.
        max_len: Number of positions to precompute.
        batch_first: When True (default) inputs are ``(batch, seq_len, dim)``
            and the encoding is selected by sequence position. When False
            inputs are ``(seq_len, batch, dim)``.
    """

    def __init__(self, dim_model, dropout_p, max_len, batch_first=True):
        super().__init__()

        self.batch_first = batch_first
        self.dropout = nn.Dropout(dropout_p)

        # Encoding - From formula
        pos_encoding = torch.zeros(max_len, dim_model)
        positions_list = torch.arange(0, max_len, dtype=torch.float).view(-1, 1) # 0, 1, 2, 3, 4, 5
        division_term = torch.exp(torch.arange(0, dim_model, 2).float() * (-math.log(10000.0)) / dim_model) # 1 / 10000^(2i/dim_model)

        pos_encoding[:, 0::2] = torch.sin(positions_list * division_term)
        # For an odd dim_model there is one cosine column fewer than sine columns.
        pos_encoding[:, 1::2] = torch.cos(positions_list * division_term[: dim_model // 2])

        # Saving buffer (same as parameter without gradients needed).
        # Stored as (max_len, 1, dim_model).
        pos_encoding = pos_encoding.unsqueeze(0).transpose(0, 1)
        self.register_buffer("pos_encoding", pos_encoding)

    def forward(self, token_embedding: torch.tensor) -> torch.tensor:
        """Add the positional signal to ``token_embedding`` and apply dropout."""
        if self.batch_first:
            # Slice by sequence length (dim 1) and move positions onto that axis:
            # (seq_len, 1, dim) -> (1, seq_len, dim), broadcast over the batch.
            seq_len = token_embedding.size(1)
            positional = self.pos_encoding[:seq_len].transpose(0, 1)
        else:
            # (seq_len, 1, dim), broadcast over the batch axis in dim 1.
            positional = self.pos_encoding[:token_embedding.size(0)]
        # Residual connection + pos encoding
        return self.dropout(token_embedding + positional)

In [None]:
class Transformer(nn.Module):
    """Decoder-style language model: embeddings, positional encoding, stacked blocks, output head.

    Args:
        args: Model hyperparameters. ``vocab_size`` must have been set
            (the ``-1`` default is rejected).
    """

    def __init__(self, args: "ModelArgs") -> None:
        super().__init__()

        assert args.vocab_size != -1, "Vocab sisze must be set"

        self.args = args
        self.vocab_size = args.vocab_size
        self.n_layers = args.n_layers
        self.tok_embeddings = nn.Embedding(self.vocab_size, args.dim)
        self.positional_encoder = PositionalEncoding(dim_model=args.dim, dropout_p=0, max_len=5000)

        self.layers = nn.ModuleList()
        for _ in range(args.n_layers):
            self.layers.append(EncoderBlock(args))

        self.norm = RMSNorm(args.dim, eps = args.norm_eps)
        self.output = nn.Linear(args.dim, self.vocab_size, bias=False)

    def forward(self, tokens: torch.Tensor, LH_mask = False, Pad_mask = False):
        """Compute next-token logits.

        Args:
            tokens: Integer token ids of shape ``(batch, seq_len)``.
            LH_mask: Passed to every layer; truthy enables the causal mask.
            Pad_mask: When truthy, an additive padding mask is built from
                ``tokens`` and passed to every layer. Token id 0 is assumed
                to be padding.

        Returns:
            Float tensor of shape ``(batch, seq_len, vocab_size)``.
        """
        # (B, Seq_len)
        batch_size, seq_len = tokens.shape

        # `False` is the "no mask" value the layers use as their default, so it is
        # what gets passed when no padding mask is requested.
        padding_mask = False
        if Pad_mask:
            is_pad = tokens == 0  # assumes the padding token id is 0
            # Additive key mask of shape (B, 1, 1, Seq_len): 0 for real tokens,
            # -inf for padded keys. masked_fill is used because multiplying a
            # boolean mask by -inf yields NaN (0 * -inf) at unmasked positions.
            # Only keys are masked so that no query row ends up entirely -inf.
            padding_mask = torch.zeros(
                batch_size, 1, 1, seq_len, device=tokens.device
            ).masked_fill(is_pad[:, None, None, :], float("-inf"))

        # (B, Seq_len) -> (B, Seq_len, Dim)
        h = self.tok_embeddings(tokens)
        h = self.positional_encoder(h)

        # Consecutively apply all the layers
        for layer in self.layers:
            h = layer(h,  LH_mask, padding_mask)
        h = self.norm(h)
        output = self.output(h).float()
        return output

In [None]:
# Start from the dataclass defaults; vocab_size is filled in from the tokenizer in the next cell.
model_args: ModelArgs = ModelArgs()

In [None]:
device = "cpu"
# if device == "cuda":
    # torch.set_default_tensor_type(torch.cuda.HalfTensor)
# The embedding table and output projection are sized from the tokenizer's vocabulary.
model_args.vocab_size = tokenizer.vocab_size()

model = Transformer(model_args).to(device)
# model = nn.DataParallel(model, device_ids = [0,1])
opt = torch.optim.Adam(model.parameters(), lr=0.0001)
# NOTE: train_loop accepts loss_fn but computes its loss with F.cross_entropy directly.
loss_fn = nn.CrossEntropyLoss()

In [None]:
def train_loop(model, opt, loss_fn, dataloader, device, print_random=False):
    """Train ``model`` for one pass over ``dataloader`` with teacher forcing.

    Each batch ``Y`` of token ids is split into inputs ``Y[:, :-1]`` and
    targets ``Y[:, 1:]``. The loss is the cross-entropy averaged over target
    positions whose id is not 0 (0 is treated as padding).

    Args:
        model: Callable as ``model(tokens, LH_mask=..., Pad_mask=...)`` and
            returning logits of shape ``(batch, seq_len, vocab)``.
        opt: Optimizer stepped once per batch.
        loss_fn: Unused; kept so existing call sites keep working.
        dataloader: Iterable of ``(batch, seq_len)`` integer tensors that
            supports ``len()``.
        device: Device the batches are moved to.
        print_random: When True, print decoded predictions next to the decoded
            targets for two randomly drawn rows of every batch. Uses the
            notebook-level ``tokenizer``.

    Returns:
        The mean of the per-batch losses as a Python float.
    """
    model.train()
    total_loss = 0
    for labels in dataloader:
        Y = labels.to(device)
        # Shift by one: the model sees tokens [0, n-1) and predicts tokens [1, n).
        y_input = Y[:, :-1]
        y_expected = Y[:, 1:]
        padding_mask = y_expected != 0

        pred = model(y_input, LH_mask = True, Pad_mask = True)

        if print_random == True:
            random_index = torch.randint(0, pred.size(0), (2,))
            # Compare predictions with the shifted targets they are trained against.
            Y_target = y_expected[random_index].to("cpu")
            predicted_classes = torch.argmax(pred[random_index], dim=-1).to("cpu")
            print("pred_",[tokenizer.Decode(pred_.unsqueeze(1)) for pred_ in predicted_classes])
            print("Y_",[tokenizer.Decode(Y_.unsqueeze(1)) for Y_ in Y_target])

        # Per-token loss, reshaped back to (batch, seq_len - 1)
        loss = F.cross_entropy(pred.reshape(-1, pred.size(2)), y_expected.reshape(-1), reduction='none')
        loss = loss.view(y_expected.size())

        # Average only over the non-padding target positions.
        loss = torch.sum(loss * padding_mask) / torch.sum(padding_mask)

        opt.zero_grad()
        loss.backward()
        opt.step()

        total_loss += loss.detach().item()

    return total_loss / len(dataloader)

In [None]:
epoch = 100
for i in range(epoch):
    loss=train_loop(model, opt, loss_fn, train_dataloader, device)
    # Report the mean batch loss every third epoch.
    if (i+1) % 3 == 0:
        print(f"epoch {i+1} || " +str(loss))

    # NOTE: this is one more full training pass (train_loop always steps the
    # optimizer), run with sample predictions printed.
    if (i+1) % 100 == 0:
        train_loop(model, opt, loss_fn, train_dataloader, device, print_random=True)

epoch 3 || 2.63858163356781
epoch 6 || 1.3259855310122173
epoch 9 || 0.7188119490941366
epoch 12 || 0.4775961736838023
epoch 15 || 0.32857658962408703
epoch 18 || 0.2835540895660718


KeyboardInterrupt: 

In [None]:
prompt_ = ["what is happiness", "can you make decisions","are you human" ,"how are you","who is john","who is nice"]
model.eval()
prompt_tok = tokenizer.Encode(prompt_)
end_tok = tokenizer.Encode(["<end>"])[-1,-1]
max_length = 10
total_output = prompt_tok.to(device)

# Greedy decoding: append the arg-max token of the last position, max_length times.
# Generation does not stop at <end>; Decode truncates each row at its first <end>.
# no_grad avoids building an autograd graph for every forward pass during inference.
with torch.no_grad():
    for _ in range(max_length):
        out = model(total_output, LH_mask = True, Pad_mask = True)
        out = torch.argmax(out, dim=-1)
        total_output = torch.cat((total_output, out[:,-1].unsqueeze(-1)), 1)

tokenizer.Decode(total_output)

  index_batch = torch.tensor(x).cpu().numpy().tolist()


['what is happiness  a complex human emotion <end>',
 'can you make decisions learn can be trained on data <end>',
 'are you human  i am just a program <end>',
 'how are you  i am fine <end>',
 'who is john  a nice person <end>',
 'who is nice  john <end>']

# LoRA

In [None]:

class MyDataset(Dataset):
    """Fine-tuning variant of the toy question/answer corpus and tokenizer.

    The vocabulary is built from the full corpus exactly as in the earlier
    ``MyDataset`` definition in this notebook (index 0 is the empty padding
    string, the last index is ``<end>``), but the dataset rows contain only
    the three question/answer pairs selected for fine-tuning.

    NOTE: this definition replaces the earlier ``MyDataset`` name once the
    cell has run.
    """

    def __init__(self):
        super().__init__()
        training_data = {
        "how are you": "i am fine <end>",
        "who is john": "a nice person <end>",
        "who is nice": "john <end>",
        "where is john": "at home <end>",
        "how is john": "i dont know <end>",
        "who are you": "mini gpt model <end>",
        "what is your name": "i am a GPT model <end>",
        "what is GPT": "Generative Pre-trained Transformer <end>",
        "how old are you": "I was created recently <end>",
        "what can you do": "I can generate text <end>",
        "are you intelligent": "I try to be <end>",
        "what is AI": "Artificial Intelligence <end>",
        "who created you": "programmers <end>",
        "where do you live": "in the cloud <end>",
        "do you like humans": "I like helping them <end>",
        "what is your purpose": "to assist and provide information <end>",
        "can you learn": "I can be trained on data <end>",
        "are you alive": "I am just a program <end>",
        "what is your favorite color": "I do not have a preference <end>",
        "do you have emotions": "I do not feel emotions <end>",
        "what is love": "a complex human emotion <end>",
        "can you think": "I process information <end>",
        "do you dream": "I do not dream <end>",
        "what is happiness": "a state of well-being <end>",
        "can you make decisions": "I can make choices based on data <end>",
        "are you human": "I am an AI model <end>"
        }

        # Extract input and target phrases
        data_words = list(training_data.keys())
        target_words = list(training_data.values())

        # Build vocabulary from the full corpus (prompt words first, then answer words).
        all_words = (
            [word.lower() for phrase in data_words for word in phrase.split(" ")]
            + [word.lower() for phrase in target_words for word in phrase.split(" ")]
        )
        self.vocabulary_words = list(set(all_words))

        # Ensure <end> token is at the end of vocabulary list, and there's a blank at the beginning
        self.vocabulary_words.remove("<end>")
        self.vocabulary_words.append("<end>")
        self.vocabulary_words.insert(0, "")

        # Create mappings from word to index and index to word
        self.word_to_ix = {self.vocabulary_words[k].lower(): k for k in range(len(self.vocabulary_words))}
        self.ix_to_word = {v: k for k, v in self.word_to_ix.items()}

        # Fine-tuning rows: only these prompts, taken from the corpus above.
        finetune_prompts = ("what is happiness", "can you make decisions", "are you human")
        training_data2 = {prompt: training_data[prompt] for prompt in finetune_prompts}

        data_words = list(training_data2.keys())
        target_words = list(training_data2.values())

        data_words_tensor = self.Encode(data_words)
        target_words_tensor = self.Encode(target_words)

        # One row per pair: [padded prompt ids | padded answer ids]
        self.input_ids = torch.cat((data_words_tensor, target_words_tensor), 1)

    def Decode(self, x : torch.Tensor):
        """Turn a batch of index sequences into space-joined, lower-cased strings.

        Args:
            x: A 2-D tensor (or nested list) of vocabulary indices, one
                sequence per row.

        Returns:
            One string per row. Decoding of a row stops right after the first
            ``<end>`` token, which is included in the output.
        """
        # as_tensor avoids the copy-construct UserWarning that torch.tensor()
        # raises when it is handed an existing tensor.
        index_batch = torch.as_tensor(x).detach().cpu().tolist()
        end_ix = self.word_to_ix["<end>"]
        res = []
        for indices in index_batch:
            words = []
            for ix in indices:
                words.append(self.ix_to_word[ix].lower())  # Convert index to word
                if ix == end_ix:
                    break  # Stop when <end> token is encountered
            res.append(" ".join(words))
        return res

    def Encode(self, seq_batch, device = None):
        """Convert a batch of phrases into a zero-padded tensor of indices.

        Args:
            seq_batch: Iterable of strings; each is lower-cased and split on
                single spaces.
            device: Optional device to move each sequence to before padding.

        Returns:
            The result of ``pad_tensors`` on the per-phrase index tensors.
            Words that are not in the vocabulary are skipped silently.
        """
        index_batch = []

        # Loop over sequences in the batch
        for seq in seq_batch:
            word_list = seq.lower().split(" ")
            indices = [self.word_to_ix[word] for word in word_list if word in self.word_to_ix]
            t = torch.tensor(indices)
            if device is not None:
                t = t.to(device)  # Transfer tensor to the specified device
            index_batch.append(t)

        # Pad tensors to have the same length
        return pad_tensors(index_batch)

    def vocab_size(self):
        """Return the number of vocabulary entries, including padding and ``<end>``."""
        return len(self.vocabulary_words)

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx]






# Rebind `tokenizer` and `train_dataloader` to the fine-tuning dataset defined in this cell.
tokenizer = MyDataset()
train_dataloader = DataLoader(tokenizer, batch_size=4, shuffle=True)

In [None]:
# 기존 모델에서 lora 연결 가능한 레이어 리턴
def lora_connect_able(model):
    """Collect the attribute names of the ``nn.Linear`` layers found under ``model``.

    Direct ``nn.Linear`` children are reported by name. Children that are an
    ``nn.ModuleList``, ``EncoderBlock``, ``SelfAttention`` or ``FeedForward``
    are searched recursively; every other child type is skipped. Progress is
    printed while walking.

    Returns:
        A set of attribute names (not dotted paths), so layers that share a
        name in different parents appear once.
    """
    linear_list = set()
    for name, child in model.named_children():
        if isinstance(child, nn.Linear):
            print( "Found Linear layer:", child)
            print( "Found Linear layer name:", name)
            linear_list.add(name)
        elif isinstance(child, (nn.ModuleList, EncoderBlock, SelfAttention, FeedForward)):
            # Known container / custom layer: merge in whatever it holds.
            print("Traversing inside container or custom layer")
            linear_list.update(lora_connect_able(child))
    return linear_list
# Show which Linear attribute names exist in the base model.
print(lora_connect_able(model))

Traversing inside container or custom layer
Traversing inside container or custom layer
Traversing inside container or custom layer
Found Linear layer: Linear(in_features=512, out_features=512, bias=False)
Found Linear layer name: wq
Found Linear layer: Linear(in_features=512, out_features=512, bias=False)
Found Linear layer name: wk
Found Linear layer: Linear(in_features=512, out_features=512, bias=False)
Found Linear layer name: wv
Found Linear layer: Linear(in_features=512, out_features=512, bias=False)
Found Linear layer name: wo
Traversing inside container or custom layer
Found Linear layer: Linear(in_features=512, out_features=1536, bias=False)
Found Linear layer name: w1
Found Linear layer: Linear(in_features=1536, out_features=512, bias=False)
Found Linear layer name: w2
Found Linear layer: Linear(in_features=512, out_features=1536, bias=False)
Found Linear layer name: w3
Traversing inside container or custom layer
Traversing inside container or custom layer
Found Linear layer:

In [None]:
#LoRA 정의
# 기본 모델 정의
class layer_test(nn.Module):
    """Minimal two-layer linear network (no activation between the layers)."""

    def __init__(self, input_dim,hidden_dim, output_dim):
        super().__init__()
        self.layer1 = nn.Linear(input_dim, hidden_dim)
        self.layer2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Apply ``layer1`` and then ``layer2`` to ``x``."""
        hidden = self.layer1(x)
        return self.layer2(hidden)

class LoRALayer(torch.nn.Module):
    """Low-rank adapter computing ``alpha * (x @ A @ B)``.

    ``A`` has shape ``(in_dim, rank)`` and starts as scaled Gaussian noise;
    ``B`` has shape ``(rank, out_dim)`` and starts at zero, so a fresh adapter
    contributes nothing to the output.

    Args:
        in_dim: Size of the last dimension of the input.
        out_dim: Size of the last dimension of the output.
        rank: Inner dimension of the factorisation.
        alpha: Scalar multiplier applied to the adapter output.
        device: Device on which ``A`` and ``B`` are created and to which the
            output of ``forward`` is moved.
    """

    def __init__(self, in_dim, out_dim, rank, alpha, device):
        super().__init__()
        self.device = device
        std_dev = 1 / torch.sqrt(torch.tensor(rank).float())
        # Move the data first and wrap it afterwards: calling .to() on an
        # nn.Parameter returns a plain tensor whenever a copy is made, which
        # would leave A and B unregistered (and invisible to .parameters()).
        self.A = torch.nn.Parameter((torch.randn(in_dim, rank) * std_dev).to(self.device))
        self.B = torch.nn.Parameter(torch.zeros(rank, out_dim).to(self.device))

        self.alpha = alpha

    def forward(self, x):
        """Return the scaled low-rank update for ``x`` on ``self.device``."""
        x = self.alpha * (x @ self.A @ self.B)

        return x.to(self.device)


class LinearWithLoRA(torch.nn.Module):
    """Wrap a linear layer and add a LoRA adapter's output to its result.

    Args:
        linear: The layer to wrap; its ``in_features`` / ``out_features`` size
            the adapter.
        rank: Adapter rank.
        alpha: Adapter scaling factor.
        model_device: Device the adapter output is moved to before the sum.
        lora_device: Device the adapter lives on; inputs are moved there
            before being fed to it.

    Setting ``enabled`` to anything that does not equal ``True`` makes the
    wrapper return the plain linear output.
    """

    def __init__(self, linear, rank, alpha, model_device="cpu", lora_device="cpu"):
        super().__init__()
        self.linear = linear
        self.lora = LoRALayer(
            linear.in_features,
            linear.out_features,
            rank,
            alpha,
            device=lora_device,
        )
        self.model_device = model_device
        self.lora_device = lora_device
        self.enabled = True

    def forward(self, x):
        """Return ``linear(x)``, plus the adapter update when enabled."""
        base_out = self.linear(x)
        if not (self.enabled == True):
            return base_out
        delta = self.lora(x.to(self.lora_device))
        return base_out + delta.to(self.model_device)

In [None]:
from functools import partial
import copy

# Freeze the base model, then work on a deep copy so `model` keeps its weights
# and structure while `model_lora` receives the adapters.
model.to('cpu')
for param in model.parameters():
    param.requires_grad = False
model_lora=copy.deepcopy(model)

lora_params = []
lora_device = "cpu"
assign_lora = partial(LinearWithLoRA, rank=2, alpha=1, model_device="cpu", lora_device=lora_device)

# Switches selecting which Linear layers get an adapter.
# Available names: 'wq', 'wk', 'wv', 'wo' (attention), 'w1', 'w2', 'w3'
# (feed-forward) and 'output' (the final projection, controlled by `out`).
wq = True
wk = True
wv = True
w1 = True
w2 = True
w3 = True
wo = True

out = True


def _attach_lora(parent, attr_name):
    """Replace ``parent.<attr_name>`` with a LoRA-wrapped layer and record the adapter parameters."""
    wrapped = assign_lora(getattr(parent, attr_name))
    wrapped.lora.to(lora_device)
    wrapped.lora_device = lora_device
    setattr(parent, attr_name, wrapped)
    lora_params.extend(wrapped.lora.parameters())


for layer in model_lora.layers:
    print(layer)
    # Same wrapping order as before: attention projections, then feed-forward.
    for attr_name, enabled in (("wq", wq), ("wk", wk), ("wv", wv), ("wo", wo)):
        if enabled:
            _attach_lora(layer.attention, attr_name)
    for attr_name, enabled in (("w1", w1), ("w2", w2), ("w3", w3)):
        if enabled:
            _attach_lora(layer.feed_forward, attr_name)

if out == True:
    print(model_lora.output)
    _attach_lora(model_lora, "output")

# Total parameter count of the adapted model (frozen base + adapters).
total_params = sum(p.numel() for p in model_lora.parameters())
total_params


EncoderBlock(
  (attention): SelfAttention(
    (wq): Linear(in_features=512, out_features=512, bias=False)
    (wk): Linear(in_features=512, out_features=512, bias=False)
    (wv): Linear(in_features=512, out_features=512, bias=False)
    (wo): Linear(in_features=512, out_features=512, bias=False)
  )
  (feed_forward): FeedForward(
    (w1): Linear(in_features=512, out_features=1536, bias=False)
    (w2): Linear(in_features=1536, out_features=512, bias=False)
    (w3): Linear(in_features=512, out_features=1536, bias=False)
  )
  (attention_norm): RMSNorm()
  (ffn_norm): RMSNorm()
)
EncoderBlock(
  (attention): SelfAttention(
    (wq): Linear(in_features=512, out_features=512, bias=False)
    (wk): Linear(in_features=512, out_features=512, bias=False)
    (wv): Linear(in_features=512, out_features=512, bias=False)
    (wo): Linear(in_features=512, out_features=512, bias=False)
  )
  (feed_forward): FeedForward(
    (w1): Linear(in_features=512, out_features=1536, bias=False)
    (w2): 

13808302

In [None]:
# NOTE: this prints the generator object's repr, not the parameters themselves.
print(model.parameters())

# Parameter count of the base model (without LoRA adapters), for comparison with model_lora.
total_params = sum(p.numel() for p in model.parameters())
total_params

<generator object Module.parameters at 0x7eb1c03c4740>


13725184

In [None]:
# Optimise only the LoRA adapter parameters collected in lora_params.
# NOTE(review): the original comment spoke of selecting GPU-resident parameters;
# in this notebook the adapters are created with lora_device = "cpu".
lora_opt = torch.optim.Adam(lora_params, lr=0.001)

In [None]:
epoch = 100
device = "cpu"
for i in range(epoch):
    # Only the parameters handed to lora_opt are updated by each step.
    loss=train_loop(model_lora, lora_opt, loss_fn, train_dataloader, device)
    # Report the mean batch loss every third epoch.
    if (i+1) % 3 == 0:
        print(f"epoch {i+1} || " +str(loss))

    # NOTE: this is one more full training pass (train_loop always steps the
    # optimizer), run with sample predictions printed.
    if (i+1) % 100 == 0:
        train_loop(model_lora, lora_opt, loss_fn, train_dataloader, device, print_random=True)

epoch 3 || 2.602139472961426
epoch 6 || 1.2802064418792725
epoch 9 || 0.46481025218963623
epoch 12 || 0.16624602675437927
epoch 15 || 0.10203269869089127
epoch 18 || 0.06499260663986206
epoch 21 || 0.03792702034115791
epoch 24 || 0.023019518703222275
epoch 27 || 0.01480775885283947
epoch 30 || 0.01159036997705698


KeyboardInterrupt: 

In [None]:
prompt_ = ["what is happiness", "can you make decisions","are you human" ,"how are you","who is john","who is nice"]
model_lora.eval()
prompt_tok = tokenizer.Encode(prompt_)
end_tok = tokenizer.Encode(["<end>"])[-1,-1]
max_length = 10
total_output = prompt_tok.to(device)

# Greedy decoding: append the arg-max token of the last position, max_length times.
# Generation does not stop at <end>; Decode truncates each row at its first <end>.
# no_grad avoids building an autograd graph for every forward pass during inference.
with torch.no_grad():
    for _ in range(max_length):
        out = model_lora(total_output, LH_mask = True, Pad_mask = True)
        out = torch.argmax(out, dim=-1)
        total_output = torch.cat((total_output, out[:,-1].unsqueeze(-1)), 1)

tokenizer.Decode(total_output)


###
# "what is happiness": "a state of well-being <end>",
# "can you make decisions": "I can make choices based on data <end>",
# "are you human": "I am an AI model <end>"

# "how are you": "i am fine <end>",
# "who is john": "a nice person <end>",
# "who is nice": "john <end>",

  index_batch = torch.tensor(x).cpu().numpy().tolist()


['what is happiness  a state of well-being <end>',
 'can you make decisions i can make choices based on data <end>',
 'are you human  i am an ai model <end>',
 'how are you  i am an ai model <end>',
 'who is john  a state of well-being <end>',
 'who is nice  john <end>']