## 1. 모델 아키텍처 설정

In [1]:
import torch
import torch.nn as nn
from functools import partial

class CrossAttention(nn.Module):
    """Cross-attention followed by a residual connection and LayerNorm.

    The first input supplies the queries; the second supplies both keys and
    values. Inputs are batch-first.

    Args:
        model_dims: Embedding width used by the attention and the layer norm.
        num_heads: Number of attention heads.
        dtype: Parameter dtype for BOTH sub-modules. Defaults to bfloat16,
            the dtype the attention was previously hard-coded to.
    """

    def __init__(self, model_dims: int, num_heads: int, dtype: torch.dtype = torch.bfloat16):
        super().__init__()
        self.attention = nn.MultiheadAttention(
            embed_dim=model_dims,
            num_heads=num_heads,
            batch_first=True,
            dtype=dtype,
        )
        # Create the norm in the same dtype as the attention so the module is
        # self-consistent even when the caller does not re-cast it afterwards.
        self.layer_norm = nn.LayerNorm(model_dims, dtype=dtype)

    def forward(self, text_features: torch.Tensor, image_features: torch.Tensor) -> torch.Tensor:
        """Return ``LayerNorm(text_features + Attention(text, image, image))``."""
        attn_output, _ = self.attention(text_features, image_features, image_features)
        return self.layer_norm(text_features + attn_output)

class MultimodalPhi2(nn.Module):
    """Wrap a PEFT language model and a vision encoder with cross-attention hooks.

    Forward hooks are registered on the ``self_attn`` sub-module of the last
    four decoder layers. Each hook replaces the first element of that
    sub-module's output with the result of a ``CrossAttention`` module applied
    to it and to the projected image features cached by ``forward``.

    Args:
        peft_llm: Language model exposing ``dtype``, ``device``, ``config`` and
            ``model.model.layers[i].self_attn``.
        vision_encoder: Encoder whose output exposes ``last_hidden_state`` and
            whose ``config.hidden_size`` gives the feature width.
    """

    def __init__(self, peft_llm, vision_encoder):
        super().__init__()

        self.vision_encoder = vision_encoder
        self.llm = peft_llm

        target_dtype = self.llm.dtype
        model_dims = self.llm.config.hidden_size
        vit_dims = self.vision_encoder.config.hidden_size
        num_heads = self.llm.config.num_attention_heads
        num_llm_layers = self.llm.config.num_hidden_layers

        # Last four layers; clamp so a model with fewer than four layers does
        # not produce negative indices.
        self.target_layers = range(max(num_llm_layers - 4, 0), num_llm_layers)

        self.vision_projection = nn.Linear(vit_dims, model_dims)
        self.vision_projection.to(device=self.llm.device, dtype=target_dtype)

        # ModuleDict keys must be strings, hence str(i).
        self.cross_attentions = nn.ModuleDict(
            {str(i): CrossAttention(model_dims, num_heads) for i in self.target_layers}
        )
        self.cross_attentions.to(device=self.llm.device, dtype=target_dtype)

        # Projected image features, visible to the hooks only during a call.
        self.image_features_cache = None

        # Keep the handles so the hooks can be detached again.
        self._hook_handles = []
        for layer_idx in self.target_layers:
            # A PEFT-wrapped LLM needs one extra ``.model`` hop to reach the layers.
            layer = self.llm.model.model.layers[layer_idx]
            handle = layer.self_attn.register_forward_hook(
                partial(self.cross_attention_hook, layer_idx=layer_idx)
            )
            self._hook_handles.append(handle)

    def remove_hooks(self):
        """Detach every cross-attention hook this wrapper registered on the LLM."""
        for handle in self._hook_handles:
            handle.remove()
        self._hook_handles.clear()

    def cross_attention_hook(self, module, input, output, layer_idx):
        """Forward hook that swaps ``output[0]`` for its cross-attended version.

        When no image features are cached the output is returned untouched, so
        calling the wrapped LLM without an image does not crash.
        """
        if self.image_features_cache is None:
            return output

        hidden_states = output[0]

        # ModuleDict keys are strings, so index with str(layer_idx).
        cross_attn_output = self.cross_attentions[str(layer_idx)](
            hidden_states, self.image_features_cache
        )

        return (cross_attn_output,) + output[1:]

    def forward(self, input_ids: torch.Tensor, pixel_values: torch.Tensor, attention_mask: torch.Tensor, labels: torch.Tensor = None):
        """Encode the image, cache its projection for the hooks, and run the LLM.

        Returns:
            Whatever the wrapped LLM returns for ``input_ids``,
            ``attention_mask`` and ``labels``.
        """
        image_outputs = self.vision_encoder(pixel_values)
        image_patch_features = image_outputs.last_hidden_state
        self.image_features_cache = self.vision_projection(
            image_patch_features.to(self.llm.dtype)
        )

        try:
            outputs = self.llm(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels,
            )
        finally:
            # Always drop the cache, even if the LLM call raised, so stale
            # features never leak into a later call.
            self.image_features_cache = None

        return outputs

  cpu = _conversion_method_template(device=torch.device("cpu"))


## 2. 모델 불러오기 
    4번 학습한 3epoch 폴더의 성능이 가장 좋음

In [None]:
import torch
import os
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer, CLIPVisionModel, CLIPImageProcessor
def load_my_trained_model(save_directory, is_trainable=False):
    """Rebuild the multimodal model from the artifacts in ``save_directory``.

    Loads the phi-2 base model (4-bit) and the CLIP vision encoder, attaches the
    LoRA adapters from ``llm_adapters3``, then restores ``vision_projection3.pt``
    and ``cross_attentions3.pt``.

    Args:
        save_directory: Directory holding the three saved artifacts.
        is_trainable: Forwarded to ``PeftModel.from_pretrained``. Leave False
            for inference; pass True when the adapters should keep training,
            since they are loaded frozen otherwise.

    Returns:
        ``(model, tokenizer, image_processor)`` with ``model`` in eval mode.
    """
    print(f"Loading our trained model from {save_directory}...")
    device = "cuda" if torch.cuda.is_available() else "cpu"

    llm = AutoModelForCausalLM.from_pretrained(
        "microsoft/phi-2", load_in_4bit=True, torch_dtype=torch.bfloat16
    ).to(device)
    tokenizer = AutoTokenizer.from_pretrained("microsoft/phi-2", trust_remote_code=True)
    vision_encoder = CLIPVisionModel.from_pretrained(
        "openai/clip-vit-base-patch32", torch_dtype=torch.bfloat16
    ).to(device)
    image_processor = CLIPImageProcessor.from_pretrained("openai/clip-vit-base-patch32")

    # File names must match the ones used when the checkpoint was saved.
    lora_save_path = os.path.join(save_directory, "llm_adapters3")
    peft_llm = PeftModel.from_pretrained(llm, lora_save_path, is_trainable=is_trainable)

    model = MultimodalPhi2(peft_llm, vision_encoder)

    # weights_only=True restricts unpickling to tensors and plain containers,
    # which is all a state_dict needs.
    vision_projection_path = os.path.join(save_directory, "vision_projection3.pt")
    model.vision_projection.load_state_dict(
        torch.load(vision_projection_path, map_location="cpu", weights_only=True)
    )

    cross_attentions_path = os.path.join(save_directory, "cross_attentions3.pt")
    model.cross_attentions.load_state_dict(
        torch.load(cross_attentions_path, map_location="cpu", weights_only=True)
    )

    model.to(device)
    model.eval()

    print("\nOur final trained model has been successfully loaded and is ready for inference!")
    return model, tokenizer, image_processor

#model, tokenizer, image_processor = load_my_trained_model("./save_model/3epoch")

  from .autonotebook import tqdm as notebook_tqdm


## trainDataProcessing.py

In [None]:
import json
from transformers import CLIPImageProcessor, AutoTokenizer
from datasets import Dataset
from tqdm.auto import tqdm
from PIL import Image

# Read the whole instruction annotation file into memory.
with open("llava_instruct_150k.json", mode="r", encoding="utf-8") as annotation_file:
    instruct_data = json.loads(annotation_file.read())

# Image preprocessor and text tokenizer used by the preprocessing step.
image_processor = CLIPImageProcessor.from_pretrained(
    "openai/clip-vit-base-patch32",
)
tokenizer = AutoTokenizer.from_pretrained(
    "microsoft/phi-2",
    trust_remote_code=True,
)

def instruction_generator(dataset):
    """Yield one record per (human, gpt) turn pair of every conversation.

    Args:
        dataset: Iterable of items with a ``'conversations'`` list, whose
            entries carry a ``'value'`` string, and an ``'image'`` path.

    Yields:
        Dicts with ``'image_path'``, ``'question'`` and ``'answer'``.
    """
    for item in tqdm(dataset, desc="Generating instruction pairs"):
        conv = item['conversations']

        # Pair each even-indexed (human) turn with the turn that follows it.
        # zip stops at the shorter slice, so a trailing unanswered turn is
        # skipped instead of raising IndexError.
        for human_turn, gpt_turn in zip(conv[0::2], conv[1::2]):
            # Keep the fields separate so later steps can process them easily.
            yield {
                'image_path': item['image'],
                'question': human_turn['value'],
                'answer': gpt_turn['value'],
            }

# Materialise the generator's records as a Dataset.
generator_kwargs = {"dataset": instruct_data}
expanded_instruction_dataset = Dataset.from_generator(
    instruction_generator,
    gen_kwargs=generator_kwargs,
)

# Padding is requested during preprocessing, so a pad token must exist;
# fall back to the end-of-sequence token when none is defined.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

def preprocess_function(examples):
    """Turn a batch of (image_path, question, answer) rows into model inputs.

    Images are read from ``coco/train2017/`` and run through the module-level
    ``image_processor``; each question+answer string is tokenized to a fixed
    length of 512. Labels copy ``input_ids`` with the question prefix and all
    padding positions set to -100 so only answer tokens contribute to the loss.

    Args:
        examples: Batched mapping with ``'image_path'``, ``'question'`` and
            ``'answer'`` lists of equal length.

    Returns:
        Dict with ``pixel_values``, ``input_ids``, ``attention_mask``, ``labels``.
    """
    max_length = 512

    image_paths = examples['image_path']
    questions = examples['question']
    answers = examples['answer']

    # Image processing. convert() returns a new in-memory image, so the file
    # handle can be closed right away instead of being left to the GC.
    images = []
    for path in image_paths:
        with Image.open(f"coco/train2017/{path}") as raw_image:
            images.append(raw_image.convert("RGB"))
    processed_images = image_processor(images, return_tensors="pt")

    # Question lengths from an unpadded tokenization: no pad tokens to filter
    # out (pad may equal eos), and the same cap as the full text.
    question_ids = tokenizer(questions, truncation=True, max_length=max_length)['input_ids']
    question_lengths = [len(ids) for ids in question_ids]

    # Append EOS explicitly so the end of the answer stays a training target
    # once padding is masked out below.
    eos = tokenizer.eos_token or ""
    full_texts = [q + a + eos for q, a in zip(questions, answers)]
    model_inputs = tokenizer(
        full_texts,
        padding="max_length",
        truncation=True,
        max_length=max_length,
        return_tensors="pt",
    )

    # Mask the question prefix with -100.
    # NOTE(review): the prefix slice assumes the tokenizer pads on the right — confirm.
    labels = model_inputs['input_ids'].clone()
    for i, question_len in enumerate(question_lengths):
        labels[i, :question_len] = -100
    # Padding must not be a loss target; use the attention mask rather than
    # the token id because the pad token may be the EOS token.
    labels[model_inputs['attention_mask'] == 0] = -100

    # Assemble the final result.
    return {
        "pixel_values": processed_images.pixel_values,
        "input_ids": model_inputs.input_ids,
        "attention_mask": model_inputs.attention_mask,
        "labels": labels
    }

# Run the preprocessing in batches and drop the raw columns it consumed.
raw_columns = expanded_instruction_dataset.column_names
instruction_dataset = expanded_instruction_dataset.map(
    preprocess_function,
    batched=True,
    remove_columns=raw_columns,
)

# Persist the processed dataset so later cells can reload it from disk.
save_path_instructions = "./instruction_dataset"
print(f"Saving the instruction dataset to '{save_path_instructions}'...")
instruction_dataset.save_to_disk(save_path_instructions)
print("Instruction dataset saved successfully!")

## 3. 데이터셋 불러오기

In [None]:
from datasets import load_from_disk

# Reload the preprocessed dataset written by the preprocessing step.
load_path = "./instruction_dataset"
print(f"Loading instruction dataset from '{load_path}'...")
instruction_dataset = load_from_disk(load_path)

# One call, two lines of output: the status message, then the dataset summary.
print("Instruction dataset reloaded successfully!", instruction_dataset, sep="\n")

## 4. 최적의 파라미터 찾기

In [None]:
import wandb
# Log in to Weights & Biases; the training code below calls wandb.init.
wandb.login()

In [None]:
import torch
from torch.utils.data import DataLoader
from torch.optim import AdamW 
from transformers import get_linear_schedule_with_warmup
from transformers import default_data_collator
from tqdm.auto import tqdm
import wandb 
import gc


# Have the dataset return these four model-input columns as torch tensors.
columns_to_tensorize = ['pixel_values', 'input_ids', 'attention_mask', 'labels']
instruction_dataset.set_format(type='torch', columns=columns_to_tensorize)

def model_train_eval(model, dataset, config, device):
    """Train ``model`` on a fixed slice of ``dataset`` and log losses to wandb.

    Rows 0-9,999 are used for training and rows 10,000-10,999 for validation,
    so ``dataset`` must hold at least 11,000 rows.

    Args:
        model: Module called as ``model(**batch)`` whose output exposes ``.loss``.
        dataset: Dataset supporting ``select``; batches are collated with
            ``default_data_collator``.
        config: Mapping with ``"learning_rate"``, ``"batch_size"`` and
            ``"num_epochs"``; also passed to ``wandb.init``.
        device: Device to train on. ``None`` selects CUDA when available,
            otherwise CPU.
    """
    # Honor the caller's device instead of silently re-detecting it.
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device)

    wandb.init(
        project="instruction-tuning-test",
        name=f"lr={config['learning_rate']}",
        config=config,
    )

    try:
        train_subset = dataset.select(range(10000))  # adjust to the training setup
        eval_subset = dataset.select(range(10000, 11000))  # held-out rows

        # default_data_collator stacks the per-sample tensors into batches.
        train_dataloader = DataLoader(
            train_subset,
            batch_size=config["batch_size"],
            collate_fn=default_data_collator,
            shuffle=True,
        )
        # Validation order does not need to be random; keep it deterministic.
        eval_dataloader = DataLoader(
            eval_subset,
            batch_size=config["batch_size"],
            collate_fn=default_data_collator,
            shuffle=False,
        )

        # Optimize only the parameters that require gradients.
        trainable_params = [p for p in model.parameters() if p.requires_grad]
        optimizer = AdamW(trainable_params, lr=config["learning_rate"])

        num_training_steps = config["num_epochs"] * len(train_dataloader)
        num_warmup_steps = int(num_training_steps * 0.05)

        lr_scheduler = get_linear_schedule_with_warmup(
            optimizer=optimizer,
            num_warmup_steps=num_warmup_steps,  # steps of linear warm-up
            num_training_steps=num_training_steps,  # total steps of the schedule
        )
        print("Optimizer and Scheduler have been set up.")

        print(f"\n--- Starting Training for {config['num_epochs']} epoch(s) ---")
        model.to(device)

        for epoch in range(config["num_epochs"]):
            # Training loop
            model.train()
            progress_bar = tqdm(train_dataloader, desc="Training")
            for step, batch in enumerate(progress_bar):
                batch = {k: v.to(device) for k, v in batch.items()}
                loss = model(**batch).loss

                loss.backward()

                optimizer.step()
                lr_scheduler.step()
                optimizer.zero_grad()

                # Read the scalar once; each .item() call syncs with the device.
                loss_value = loss.item()
                if step % 10 == 0:
                    wandb.log({"train/loss": loss_value})
                progress_bar.set_postfix({"loss": loss_value})

            # Validation loop
            model.eval()
            eval_loss_total = 0.0
            print(f"\n--- Validating Epoch {epoch + 1} ---")

            with torch.no_grad():
                for batch in tqdm(eval_dataloader, desc="Validation"):
                    batch = {k: v.to(device) for k, v in batch.items()}
                    eval_loss_total += model(**batch).loss.item()

            # Guard against an empty validation loader.
            num_eval_batches = len(eval_dataloader)
            avg_eval_loss = (
                eval_loss_total / num_eval_batches if num_eval_batches else float("nan")
            )

            wandb.log({
                "eval/loss": avg_eval_loss,
                "epoch": epoch + 1
            })

        print("Training complete!")
    finally:
        # Close the wandb run even when training fails part-way through.
        wandb.finish()

if __name__ == "__main__":
    # Have the dataset return the model-input columns as torch tensors.
    columns_to_tensorize = ['pixel_values', 'input_ids', 'attention_mask', 'labels']
    instruction_dataset.set_format("torch", columns=columns_to_tensorize)

    parameters = dict(learning_rate=[1e-4, 5e-5, 3e-5, 1e-5, 5e-6, 1e-6, 5e-7])
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"

    # Evaluate each candidate; only the first learning rate is currently run.
    for i, learning_rate in enumerate(parameters["learning_rate"][:1]):
        # Start every trial from a freshly loaded checkpoint.
        model, tokenizer, image_processor = load_my_trained_model("./save_model/3epoch")

        config = dict(num_epochs=1, batch_size=7, learning_rate=learning_rate)

        model_train_eval(model, instruction_dataset, config, device)

        # Drop the references, then run the garbage collector and empty the
        # CUDA cache before the next trial.
        del model, image_processor, tokenizer
        gc.collect()
        torch.cuda.empty_cache()
print("Model test complete!")

# train.py

In [None]:
import torch
import torch.nn as nn
from functools import partial
import os
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer, CLIPVisionModel, CLIPImageProcessor
from datasets import load_from_disk

class CrossAttention(nn.Module):
    """Cross-attention followed by a residual connection and LayerNorm.

    The first input supplies the queries; the second supplies both keys and
    values. Inputs are batch-first.

    Args:
        model_dims: Embedding width used by the attention and the layer norm.
        num_heads: Number of attention heads.
        dtype: Parameter dtype for BOTH sub-modules. Defaults to bfloat16,
            the dtype the attention was previously hard-coded to.
    """

    def __init__(self, model_dims: int, num_heads: int, dtype: torch.dtype = torch.bfloat16):
        super().__init__()
        self.attention = nn.MultiheadAttention(
            embed_dim=model_dims,
            num_heads=num_heads,
            batch_first=True,
            dtype=dtype,
        )
        # Create the norm in the same dtype as the attention so the module is
        # self-consistent even when the caller does not re-cast it afterwards.
        self.layer_norm = nn.LayerNorm(model_dims, dtype=dtype)

    def forward(self, text_features: torch.Tensor, image_features: torch.Tensor) -> torch.Tensor:
        """Return ``LayerNorm(text_features + Attention(text, image, image))``."""
        attn_output, _ = self.attention(text_features, image_features, image_features)
        return self.layer_norm(text_features + attn_output)

class MultimodalPhi2(nn.Module):
    """Wrap a PEFT language model and a vision encoder with cross-attention hooks.

    Forward hooks are registered on the ``self_attn`` sub-module of the last
    four decoder layers. Each hook replaces the first element of that
    sub-module's output with the result of a ``CrossAttention`` module applied
    to it and to the projected image features cached by ``forward``.

    Args:
        peft_llm: Language model exposing ``dtype``, ``device``, ``config`` and
            ``model.model.layers[i].self_attn``.
        vision_encoder: Encoder whose output exposes ``last_hidden_state`` and
            whose ``config.hidden_size`` gives the feature width.
    """

    def __init__(self, peft_llm, vision_encoder):
        super().__init__()

        self.vision_encoder = vision_encoder
        self.llm = peft_llm

        target_dtype = self.llm.dtype
        model_dims = self.llm.config.hidden_size
        vit_dims = self.vision_encoder.config.hidden_size
        num_heads = self.llm.config.num_attention_heads
        num_llm_layers = self.llm.config.num_hidden_layers

        # Last four layers; clamp so a model with fewer than four layers does
        # not produce negative indices.
        self.target_layers = range(max(num_llm_layers - 4, 0), num_llm_layers)

        self.vision_projection = nn.Linear(vit_dims, model_dims)
        self.vision_projection.to(device=self.llm.device, dtype=target_dtype)

        # ModuleDict keys must be strings, hence str(i).
        self.cross_attentions = nn.ModuleDict(
            {str(i): CrossAttention(model_dims, num_heads) for i in self.target_layers}
        )
        self.cross_attentions.to(device=self.llm.device, dtype=target_dtype)

        # Projected image features, visible to the hooks only during a call.
        self.image_features_cache = None

        # Keep the handles so the hooks can be detached again.
        self._hook_handles = []
        for layer_idx in self.target_layers:
            # A PEFT-wrapped LLM needs one extra ``.model`` hop to reach the layers.
            layer = self.llm.model.model.layers[layer_idx]
            handle = layer.self_attn.register_forward_hook(
                partial(self.cross_attention_hook, layer_idx=layer_idx)
            )
            self._hook_handles.append(handle)

    def remove_hooks(self):
        """Detach every cross-attention hook this wrapper registered on the LLM."""
        for handle in self._hook_handles:
            handle.remove()
        self._hook_handles.clear()

    def cross_attention_hook(self, module, input, output, layer_idx):
        """Forward hook that swaps ``output[0]`` for its cross-attended version.

        When no image features are cached the output is returned untouched, so
        calling the wrapped LLM without an image does not crash.
        """
        if self.image_features_cache is None:
            return output

        hidden_states = output[0]

        # ModuleDict keys are strings, so index with str(layer_idx).
        cross_attn_output = self.cross_attentions[str(layer_idx)](
            hidden_states, self.image_features_cache
        )

        return (cross_attn_output,) + output[1:]

    def forward(self, input_ids: torch.Tensor, pixel_values: torch.Tensor, attention_mask: torch.Tensor, labels: torch.Tensor = None):
        """Encode the image, cache its projection for the hooks, and run the LLM.

        Returns:
            Whatever the wrapped LLM returns for ``input_ids``,
            ``attention_mask`` and ``labels``.
        """
        image_outputs = self.vision_encoder(pixel_values)
        image_patch_features = image_outputs.last_hidden_state
        self.image_features_cache = self.vision_projection(
            image_patch_features.to(self.llm.dtype)
        )

        try:
            outputs = self.llm(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels,
            )
        finally:
            # Always drop the cache, even if the LLM call raised, so stale
            # features never leak into a later call.
            self.image_features_cache = None

        return outputs


def load_my_trained_model(save_directory, is_trainable=False):
    """Rebuild the multimodal model from the artifacts in ``save_directory``.

    Loads the phi-2 base model (4-bit) and the CLIP vision encoder, attaches the
    LoRA adapters from ``llm_adapters3``, then restores ``vision_projection3.pt``
    and ``cross_attentions3.pt``.

    Args:
        save_directory: Directory holding the three saved artifacts.
        is_trainable: Forwarded to ``PeftModel.from_pretrained``. Leave False
            for inference; pass True when the adapters should keep training,
            since they are loaded frozen otherwise.

    Returns:
        ``(model, tokenizer, image_processor)`` with ``model`` in eval mode.
    """
    print(f"Loading our trained model from {save_directory}...")
    device = "cuda" if torch.cuda.is_available() else "cpu"

    llm = AutoModelForCausalLM.from_pretrained(
        "microsoft/phi-2", load_in_4bit=True, torch_dtype=torch.bfloat16
    ).to(device)
    tokenizer = AutoTokenizer.from_pretrained("microsoft/phi-2", trust_remote_code=True)
    vision_encoder = CLIPVisionModel.from_pretrained(
        "openai/clip-vit-base-patch32", torch_dtype=torch.bfloat16
    ).to(device)
    image_processor = CLIPImageProcessor.from_pretrained("openai/clip-vit-base-patch32")

    # File names must match the ones used when the checkpoint was saved.
    lora_save_path = os.path.join(save_directory, "llm_adapters3")
    peft_llm = PeftModel.from_pretrained(llm, lora_save_path, is_trainable=is_trainable)

    model = MultimodalPhi2(peft_llm, vision_encoder)

    # weights_only=True restricts unpickling to tensors and plain containers,
    # which is all a state_dict needs.
    vision_projection_path = os.path.join(save_directory, "vision_projection3.pt")
    model.vision_projection.load_state_dict(
        torch.load(vision_projection_path, map_location="cpu", weights_only=True)
    )

    cross_attentions_path = os.path.join(save_directory, "cross_attentions3.pt")
    model.cross_attentions.load_state_dict(
        torch.load(cross_attentions_path, map_location="cpu", weights_only=True)
    )

    model.to(device)
    model.eval()

    print("\nOur final trained model has been successfully loaded and is ready for inference!")
    return model, tokenizer, image_processor

# Restore the checkpoint that training continues from.
checkpoint_dir = "./save_model/3epoch"
model, tokenizer, image_processor = load_my_trained_model(checkpoint_dir)

# Reload the preprocessed instruction dataset from disk.
load_path = "./instruction_dataset"
print(f"Loading instruction dataset from '{load_path}'...")
instruction_dataset = load_from_disk(load_path)

# One call, two lines of output: the status message, then the dataset summary.
print("Instruction dataset reloaded successfully!", instruction_dataset, sep="\n")

import torch
from torch.utils.data import DataLoader
from torch.optim import AdamW 
from transformers import get_linear_schedule_with_warmup
from transformers import default_data_collator
from tqdm.auto import tqdm
import wandb 
import os

def model_train_eval(model, dataset, config, device):
    """Train ``model``, validate after every epoch, and save a checkpoint each epoch.

    ``dataset`` is split with a fixed seed into a 10,000-row validation set and
    a training set holding the remainder. After each epoch the LoRA adapters,
    the vision projection and the cross-attention weights are written under
    ``./save_instruction_train/{epoch}epoch`` (``epoch`` is zero-based).

    Args:
        model: Module called as ``model(**batch)`` whose output exposes
            ``.loss``, with ``llm``, ``vision_projection`` and
            ``cross_attentions`` attributes.
        dataset: Dataset supporting ``train_test_split``; batches are collated
            with ``default_data_collator``.
        config: Mapping with ``"learning_rate"``, ``"batch_size"`` and
            ``"num_epochs"``; also passed to ``wandb.init``.
        device: Device to train on. ``None`` selects CUDA when available,
            otherwise CPU.
    """
    save_directory = "./save_instruction_train"

    # Honor the caller's device instead of silently re-detecting it.
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    device = torch.device(device)

    wandb.init(
        project="instruction-tuning",
        name=f"lr={config['learning_rate']} instruction-tuning",
        config=config,
    )

    try:
        split_dataset = dataset.train_test_split(test_size=10000, shuffle=True, seed=42)
        train_subset = split_dataset['train']
        eval_subset = split_dataset['test']

        # default_data_collator stacks the per-sample tensors into batches.
        train_dataloader = DataLoader(
            train_subset,
            batch_size=config["batch_size"],
            collate_fn=default_data_collator,
            shuffle=True,
        )
        # Validation order does not need to be random; keep it deterministic.
        eval_dataloader = DataLoader(
            eval_subset,
            batch_size=config["batch_size"],
            collate_fn=default_data_collator,
            shuffle=False,
        )

        # Optimize only the parameters that require gradients.
        trainable_params = [p for p in model.parameters() if p.requires_grad]
        optimizer = AdamW(trainable_params, lr=config["learning_rate"])

        num_training_steps = config["num_epochs"] * len(train_dataloader)
        num_warmup_steps = int(num_training_steps * 0.05)

        lr_scheduler = get_linear_schedule_with_warmup(
            optimizer=optimizer,
            num_warmup_steps=num_warmup_steps,  # steps of linear warm-up
            num_training_steps=num_training_steps,  # total steps of the schedule
        )
        print("Optimizer and Scheduler have been set up.")

        print(f"\n--- Starting Training for {config['num_epochs']} epoch(s) ---")
        model.to(device)

        for epoch in range(config["num_epochs"]):
            # Training loop
            model.train()
            progress_bar = tqdm(train_dataloader, desc="Training")
            for step, batch in enumerate(progress_bar):
                batch = {k: v.to(device) for k, v in batch.items()}
                loss = model(**batch).loss

                loss.backward()

                optimizer.step()
                lr_scheduler.step()
                optimizer.zero_grad()

                # Read the scalar once; each .item() call syncs with the device.
                loss_value = loss.item()
                if step % 10 == 0:
                    wandb.log({"train/loss": loss_value})
                progress_bar.set_postfix({"loss": loss_value})

            # Validation loop
            model.eval()
            eval_loss_total = 0.0
            print(f"\n--- Validating Epoch {epoch + 1} ---")

            with torch.no_grad():
                for batch in tqdm(eval_dataloader, desc="Validation"):
                    batch = {k: v.to(device) for k, v in batch.items()}
                    eval_loss_total += model(**batch).loss.item()

            # Guard against an empty validation loader.
            num_eval_batches = len(eval_dataloader)
            avg_eval_loss = (
                eval_loss_total / num_eval_batches if num_eval_batches else float("nan")
            )

            wandb.log({
                "eval/loss": avg_eval_loss,
                "epoch": epoch + 1
            })

            # Per-epoch checkpoint; directory and file names use the zero-based
            # epoch index.
            checkpoint_dir = os.path.join(save_directory, f"{epoch}epoch")
            os.makedirs(checkpoint_dir, exist_ok=True)

            lora_save_path = os.path.join(checkpoint_dir, f"llm_adapters{epoch}")
            model.llm.save_pretrained(lora_save_path)
            print(f"LoRA adapters saved to {lora_save_path}")

            vision_projection_path = os.path.join(checkpoint_dir, f"vision_projection{epoch}.pt")
            torch.save(model.vision_projection.state_dict(), vision_projection_path)
            print(f"Vision projection saved to {vision_projection_path}")

            cross_attentions_path = os.path.join(checkpoint_dir, f"cross_attentions{epoch}.pt")
            torch.save(model.cross_attentions.state_dict(), cross_attentions_path)
            print(f"Cross attentions saved to {cross_attentions_path}")

        print("Training complete!")
    finally:
        # Close the wandb run even when training fails part-way through.
        wandb.finish()

if __name__ == "__main__":
    # Have the dataset return the model-input columns as torch tensors.
    columns_to_tensorize = ['pixel_values', 'input_ids', 'attention_mask', 'labels']
    instruction_dataset.set_format("torch", columns=columns_to_tensorize)

    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"

    # Hyperparameters for this training run.
    config = dict(num_epochs=5, batch_size=7, learning_rate=1e-4)

    model_train_eval(model, instruction_dataset, config, device)

print("Model test complete!")

## 5. 추론

In [None]:
import pandas as pd

# Load the test table; later cells read its img_path, Question and A-D columns.
test_dataset = pd.read_csv("./open/test.csv")

In [None]:
## 추론
import matplotlib.pyplot as plt
from PIL import Image
from torch.functional import F

def inference(model, tokenizer, image_processor, prompt, image_path):
    """Generate a greedy-decoded answer to ``prompt`` conditioned on one image.

    The image is encoded and projected, and the result is stored in
    ``model.image_features_cache`` for the duration of ``model.llm.generate``.

    Args:
        model: Object exposing ``vision_encoder``, ``vision_projection``,
            ``llm`` and a writable ``image_features_cache`` attribute.
        tokenizer: Tokenizer for ``prompt``; its pad token is set to the EOS
            token when missing.
        image_processor: Callable turning a PIL image into ``pixel_values``.
        prompt: Full text prompt.
        image_path: Path of the image file to condition on.

    Returns:
        The newly generated text (prompt excluded), stripped of surrounding
        whitespace.
    """
    model.eval()

    # Follow the model's own placement and dtype rather than guessing from
    # CUDA availability.
    projection_weight = model.vision_projection.weight
    device = projection_weight.device

    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
        print("Tokenizer pad_token is set to eos_token.")

    prompt_tokens = tokenizer(prompt, return_tensors="pt")
    input_ids = prompt_tokens.input_ids.to(device)
    attention_mask = prompt_tokens.attention_mask.to(device)

    # convert() returns a new in-memory image, so the file can be closed here.
    with Image.open(image_path) as raw_image:
        pil_image = raw_image.convert("RGB")
    pixel_values = image_processor(images=pil_image, return_tensors="pt").pixel_values.to(device)

    try:
        with torch.no_grad():
            image_patch_features = model.vision_encoder(pixel_values).last_hidden_state
            # Cast to the projection's dtype, as the training forward pass
            # casts before projecting.
            model.image_features_cache = model.vision_projection(
                image_patch_features.to(projection_weight.dtype)
            )

            generated_ids = model.llm.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                max_new_tokens=128,  # maximum number of new tokens
                do_sample=False,     # greedy decoding; sampling knobs would be ignored
                eos_token_id=tokenizer.eos_token_id,
                pad_token_id=tokenizer.pad_token_id,
            )
    finally:
        # Never leave this image's features behind, even when generation fails.
        model.image_features_cache = None

    # Keep only the tokens produced after the prompt.
    input_token_len = input_ids.shape[1]
    generated_text_ids = generated_ids[:, input_token_len:]

    generated_text = tokenizer.batch_decode(generated_text_ids, skip_special_tokens=True)[0]
    return generated_text.strip()

# Existing question content

# Row of the test table to run.
index = 1

image_path = "./open/"+test_dataset['img_path'][index]

# Question text followed by the four answer options labelled A-D.
question = test_dataset['Question'][index]+" A."+ test_dataset['A'][index] + " B." + test_dataset['B'][index] + " C." + test_dataset['C'][index] + " D." + test_dataset['D'][index]
# NOTE(review): the triple-quoted prompt keeps the source indentation and both
# the "\n" escapes and the real line breaks — confirm this whitespace is intended.
prompt = f"""ROLE: You're an ASSISTANT, I give you an image and a question, and you answer it for me.\n 
            RULE: Please select only one correct answer from A,B,C,D\n
            USER: {question}\n
            ASSISTANT:"""

result = inference(model, tokenizer, image_processor, prompt, image_path)

# Show the image, then print the question and the generated answer.
plt.imshow(plt.imread(image_path))
plt.axis('off')
plt.show()

print(f"Question: {question}")
print(f"Generated text: {result}")