In [1]:
from peft import PeftModel
import torch
from transformers import BitsAndBytesConfig, AutoModelForCausalLM, AutoProcessor, AutoConfig
import warnings
import os
import json

# Replace the default weight initialisers with no-ops: the layers are filled
# from a checkpoint afterwards, so the default init work would be discarded.
torch.nn.Linear.reset_parameters = lambda self: None
torch.nn.LayerNorm.reset_parameters = lambda self: None

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Checkpoint to evaluate and the base model it was fine-tuned from.
model_path = 'output/lora_vision_all_ft_only'
model_base = 'microsoft/Phi-3.5-vision-instruct'

# The model name is the last "/"-separated component of the checkpoint path.
model_paths = model_path.split("/")
model_name = model_path.rsplit("/", 1)[-1]

# Quantisation switches (both off) and device placement settings.
load_8bit = load_4bit = False
device_map, device = "auto", "cuda"

In [3]:
from peft import PeftModel
import torch
from transformers import BitsAndBytesConfig, AutoModelForCausalLM, AutoProcessor, AutoConfig
import warnings
import os
import json

def disable_torch_init():
    """
    Turn torch's default `reset_parameters` for Linear and LayerNorm into no-ops.

    Skipping the default initialisation speeds up model construction when the
    weights are going to be loaded from a checkpoint anyway.
    """
    torch.nn.Linear.reset_parameters = lambda self: None
    torch.nn.LayerNorm.reset_parameters = lambda self: None

# This code is borrowed from LLaVA
def load_pretrained_model(model_path, model_base, model_name, load_8bit=False, load_4bit=False, 
                          device_map="auto", device="cuda", use_flash_attn=False, **kwargs):
    """Load a processor and causal-LM model, merging LoRA weights when applicable.

    Args:
        model_path: Checkpoint location. On the LoRA path it must contain the
            adapter files and a `non_lora_state_dict.bin` file.
        model_base: Base model to load before applying the adapter. Only used
            when `model_name` contains "lora".
        model_name: Name inspected (case-insensitively) for the substring
            "lora" to decide which loading path to take.
        load_8bit: Load with 8-bit bitsandbytes quantisation. Takes precedence
            over `load_4bit`.
        load_4bit: Load with 4-bit NF4 bitsandbytes quantisation.
        device_map: Forwarded to `from_pretrained`; replaced by
            `{"": device}` whenever `device` is not "cuda".
        device: Target device name.
        use_flash_attn: Accepted for compatibility but currently ignored (the
            flash-attention switch below is disabled).
        **kwargs: Extra keyword arguments forwarded to
            `AutoModelForCausalLM.from_pretrained`. Settings derived from the
            named parameters above take precedence over same-named entries.

    Returns:
        A `(processor, model)` tuple.
    """
    # Keep caller-supplied options instead of discarding them; the explicit
    # `device_map` parameter is layered underneath them as the default entry.
    kwargs = {"device_map": device_map, **kwargs}

    if device != "cuda":
        kwargs['device_map'] = {"": device}

    if load_8bit:
        # Passing `load_in_8bit` straight to `from_pretrained` is deprecated;
        # use a quantization config like the 4-bit branch does.
        kwargs['quantization_config'] = BitsAndBytesConfig(load_in_8bit=True)
    elif load_4bit:
        kwargs['quantization_config'] = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type='nf4'
        )
    else:
        kwargs['torch_dtype'] = torch.float16

    # Flash attention is intentionally left disabled; `use_flash_attn` has no effect.
    #if use_flash_attn:
    #    kwargs['_attn_implementation'] = 'flash_attention_2'

    is_lora = 'lora' in model_name.lower()

    if is_lora and model_base is None:
        warnings.warn('There is `lora` in model name but no `model_base` is provided. If you are loading a LoRA model, please provide the `model_base` argument.')
    if is_lora and model_base is not None:
        # Use the fine-tuned checkpoint's config, minus any quantization
        # settings saved with it, so quantisation is controlled only by the
        # arguments of this function.
        lora_cfg_pretrained = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
        if hasattr(lora_cfg_pretrained, 'quantization_config'):
            del lora_cfg_pretrained.quantization_config
        processor = AutoProcessor.from_pretrained(model_base, trust_remote_code=True)
        print('Loading Phi3-Vision from base model...')
        model = AutoModelForCausalLM.from_pretrained(model_base, low_cpu_mem_usage=True, config=lora_cfg_pretrained, trust_remote_code=True, **kwargs)
        token_num, token_dim = model.lm_head.out_features, model.lm_head.in_features
        # Re-allocate the output head and input embeddings if the loaded
        # weight does not match the head's declared output size.
        if model.lm_head.weight.shape[0] != token_num:
            model.lm_head.weight = torch.nn.Parameter(torch.empty(token_num, token_dim, device=model.device, dtype=model.dtype))
            model.model.embed_tokens.weight = torch.nn.Parameter(torch.empty(token_num, token_dim, device=model.device, dtype=model.dtype))

        print('Loading additional Phi3-Vision weights...')
        # NOTE: torch.load unpickles the file, so only load checkpoints from a trusted source.
        non_lora_trainables = torch.load(os.path.join(model_path, 'non_lora_state_dict.bin'), map_location='cpu')
        # Strip the wrapper prefixes so the keys line up with the bare model's state dict.
        non_lora_trainables = {(k[11:] if k.startswith('base_model.') else k): v for k, v in non_lora_trainables.items()}
        if any(k.startswith('model.model.') for k in non_lora_trainables):
            non_lora_trainables = {(k[6:] if k.startswith('model.') else k): v for k, v in non_lora_trainables.items()}
        # strict=False: this file only holds a subset of the model's parameters.
        model.load_state_dict(non_lora_trainables, strict=False)

        print('Loading LoRA weights...')
        model = PeftModel.from_pretrained(model, model_path)

        print('Merging LoRA weights...')

        model = model.merge_and_unload()

        print('Model Loaded!!!')

    else:
        # Plain checkpoint (or LoRA name without a base model): load it directly.
        processor = AutoProcessor.from_pretrained(model_path, trust_remote_code=True)
        model = AutoModelForCausalLM.from_pretrained(model_path, low_cpu_mem_usage=True, trust_remote_code=True, **kwargs)

    return processor, model


In [4]:
def main(args):
    """Legacy script-style entry point; it only disables torch's default init.

    `args` is accepted but not used. No call to `main` is visible in this
    notebook; it is kept so that any external caller keeps working.
    """
    # Model
    disable_torch_init()


# `main` is not invoked by this cell, so apply the init patch explicitly
# before the model is constructed. The call is idempotent.
disable_torch_init()

# NOTE(review): `device_map=device` is passed here, so the notebook-level
# `device_map` setting is not used by this call - confirm this is intended.
# NOTE(review): `use_flash_attn=True` is passed, but the handling of that flag
# inside load_pretrained_model is commented out, so it has no effect.
processor, model = load_pretrained_model(
    model_path=model_path,
    model_base=model_base,
    model_name=model_name,
    device_map=device,
    load_4bit=load_4bit,
    load_8bit=load_8bit,
    device=device,
    use_flash_attn=True,
)

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


Loading Phi3-Vision from base model...


Loading checkpoint shards: 100%|██████████| 2/2 [00:03<00:00,  1.77s/it]


Loading additional Phi3-Vision weights...
Loading LoRA weights...
Merging LoRA weights...
Model Loaded!!!


In [5]:
import json

# Read the first JSON file
with open('RAF-DB/all_ft_valid.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

# Records whose id is exactly four characters long go to basic_data;
# every other record goes to cmpd_data.
basic_data, cmpd_data = [], []

for d in data:
    (basic_data if len(d['id']) == 4 else cmpd_data).append(d)

In [6]:
# BASIC INFERENCE

from transformers import TextStreamer
from PIL import Image
from tqdm import tqdm 

def do_basic(imagepath):
    """Ask the model for one basic-emotion label for the image at `imagepath`.

    Uses the notebook-level `processor`, `model` and `device`.

    Args:
        imagepath: Path of the image file to classify.

    Returns:
        The decoded text generated by the model (not stripped).
    """
    # Close the file handle as soon as the pixels have been converted.
    with Image.open(imagepath) as img:
        image = img.convert("RGB")

    inp = "<|image_1|>\nSelect an emotion label from the following options: 'Surprise', 'Fear', 'Disgust', 'Happiness', 'Sadness', 'Anger', or 'Neutral'."
    messages = [{"role": "user", "content": inp}]

    prompt = processor.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = processor(prompt, [image], return_tensors="pt").to(device)

    with torch.inference_mode():
        generate_ids = model.generate(
            **inputs,
            max_new_tokens=1000,
            # Greedy decoding, requested explicitly: a temperature of 0 is not a
            # valid sampling temperature and only worked because sampling was off.
            do_sample=False,
            repetition_penalty=1.0,
            use_cache=True,
            eos_token_id=processor.tokenizer.eos_token_id
        )
    # Drop the prompt tokens so only the newly generated answer is decoded.
    generate_ids = generate_ids[:, inputs['input_ids'].shape[1]:]
    outputs = processor.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]

    return outputs
    
    
# epoch 5 : 65.4
import pandas as pd
from IPython.display import clear_output
cnt = 0          # samples evaluated so far
hit = 0          # samples whose prediction matched the reference label
res_dic = {}     # reference label -> number of correct predictions
res_dic2 = {}    # reference label -> number of samples seen

res_lb = []      # raw (stripped) prediction for every sample, in order

for r in tqdm(basic_data):

    tr_lb = r['conversations'][1]['value']
    print('[', tr_lb, ']')

    lb = do_basic('RAF-DB/all/valid/' + r['image']).strip()
    print(lb)
    res_lb.append(lb)

    # Replace the previous iteration's report with the refreshed one below.
    clear_output(wait=True)
    if lb.lower() == tr_lb.lower():
        hit += 1
        res_dic[tr_lb] = res_dic.get(tr_lb, 0) + 1

    cnt += 1
    res_dic2[tr_lb] = res_dic2.get(tr_lb, 0) + 1

    # Iterate over every label seen so far (not only labels with at least one
    # hit), so a class with no correct prediction is reported as 0.0 instead
    # of being left out of the per-class accuracy listing.
    for k in res_dic2:
        print(k)
        print(res_dic.get(k, 0) / res_dic2[k])
    print(hit / cnt)
    print()
    

100%|██████████| 3068/3068 [17:49<00:00,  2.87it/s]

Sadness
0.8870292887029289
Surprise
0.8936170212765957
Happiness
0.9620253164556962
Disgust
0.76875
Anger
0.8148148148148148
Fear
0.7432432432432432
Neutral
0.8808823529411764
0.901890482398957






In [7]:
import pandas as pd
from tqdm import tqdm
from PIL import Image
# Compound-emotion validation table (one row per image).
df = pd.read_csv('RAF-DB/poster_cmpd_valid.csv')

# Compound label -> the two basic emotions it is made of.
edic_cmpd = {
    'Happily Surprised': ['Happiness', 'Surprise'],
    'Happily Disgusted': ['Happiness', 'Disgust'],
    'Sadly Fearful': ['Sadness', 'Fear'],
    'Sadly Angry': ['Sadness', 'Anger'],
    'Sadly Surprised': ['Sadness', 'Surprise'],
    'Sadly Disgusted': ['Sadness', 'Disgust'],
    'Fearfully Angry': ['Fear', 'Anger'],
    'Fearfully Surprised': ['Fear', 'Surprise'],
    'Angrily Surprised': ['Anger', 'Surprise'],
    'Angrily Disgusted': ['Anger', 'Disgust'],
    'Disgustedly Surprised': ['Disgust', 'Surprise'],
}

# Basic emotion labels, in the order they are listed in the prompts.
edic_basic = [
    'Surprise',
    'Fear',
    'Disgust',
    'Happiness',
    'Sadness',
    'Anger',
    'Neutral',
]

df

Unnamed: 0,fpath,cmpd_label,basic_pred_top1,basic_pred_top2,basic_pred_top3,basic_pred_top4
0,/mnt/storage1/Research/vlm_research/Phi3-Visio...,Happily Surprised,Happiness,Surprise,Sadness,Fear
1,/mnt/storage1/Research/vlm_research/Phi3-Visio...,Sadly Disgusted,Sadness,Disgust,Fear,Neutral
2,/mnt/storage1/Research/vlm_research/Phi3-Visio...,Sadly Disgusted,Happiness,Disgust,Sadness,Anger
3,/mnt/storage1/Research/vlm_research/Phi3-Visio...,Happily Disgusted,Happiness,Sadness,Fear,Disgust
4,/mnt/storage1/Research/vlm_research/Phi3-Visio...,Sadly Disgusted,Neutral,Sadness,Disgust,Fear
...,...,...,...,...,...,...
787,/mnt/storage1/Research/vlm_research/Phi3-Visio...,Sadly Fearful,Sadness,Neutral,Fear,Disgust
788,/mnt/storage1/Research/vlm_research/Phi3-Visio...,Angrily Surprised,Anger,Disgust,Surprise,Fear
789,/mnt/storage1/Research/vlm_research/Phi3-Visio...,Angrily Disgusted,Disgust,Anger,Sadness,Fear
790,/mnt/storage1/Research/vlm_research/Phi3-Visio...,Angrily Surprised,Anger,Fear,Sadness,Happiness


In [8]:
def _generate_label(image, inp):
    """Run one single-turn greedy generation and return the stripped answer.

    Uses the notebook-level `processor`, `model` and `device`.
    """
    messages = [{"role": "user", "content": inp}]

    prompt = processor.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = processor(prompt, [image], return_tensors="pt").to(device)

    with torch.inference_mode():
        generate_ids = model.generate(
            **inputs,
            max_new_tokens=1000,
            # Greedy decoding, requested explicitly: a temperature of 0 is not a
            # valid sampling temperature and only worked because sampling was off.
            do_sample=False,
            repetition_penalty=1.0,
            use_cache=True,
            eos_token_id=processor.tokenizer.eos_token_id
        )
    # Drop the prompt tokens so only the newly generated answer is decoded.
    generate_ids = generate_ids[:, inputs['input_ids'].shape[1]:]
    return processor.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0].strip()


def do_cmpd(imagepath):
    """Predict the primary and the secondary emotion for one image.

    The model is queried twice: first for a single label out of the basic
    emotions, then for the next strongest emotion among the remaining entries
    of the notebook-level `edic_basic` list.

    Args:
        imagepath: Path of the image file to classify.

    Returns:
        A `(lb1, lb2)` tuple with the stripped first and second answers.
    """
    # Close the file handle as soon as the pixels have been converted.
    with Image.open(imagepath) as img:
        image = img.convert("RGB")

    inp = "<|image_1|>\nSelect an emotion label from the following options: 'Surprise', 'Fear', 'Disgust', 'Happiness', 'Sadness', 'Anger' or 'Neutral'."
    lb1 = _generate_label(image, inp)

    # Offer every basic emotion except the one already chosen. If the first
    # answer is not an exact member of edic_basic, all entries remain.
    lb_cand = [e for e in edic_basic if e != lb1]
    lb_cand = ', '.join(lb_cand[:-1]) + ', or ' + lb_cand[-1]

    inp = f"<|image_1|>\nThe primary emotion conveyed by this image was {lb1}. Please select the next most strongly felt emotion from the following options: {lb_cand}."

    print(inp)

    lb2 = _generate_label(image, inp)

    return lb1, lb2


In [9]:
# COMPOUND INFERENCE
import pandas as pd
from IPython.display import clear_output
cnt = 0          # samples evaluated so far
hit = 0          # samples where both predicted emotions were correct
res_dic = {}     # compound label -> number of correct predictions
res_dic2 = {}    # compound label -> number of samples seen

for i, r in df.iterrows():

    tr_lb = r['cmpd_label']
    print('[', tr_lb, ']')

    lb1, lb2 = do_cmpd(r['fpath'])

    # Replace the previous iteration's report with the refreshed one below.
    clear_output(wait=True)

    # A hit needs two different predictions that are both components of the
    # reference compound label (the order of the two does not matter).
    if lb1 != lb2 and lb1 in edic_cmpd[tr_lb] and lb2 in edic_cmpd[tr_lb]:
        hit += 1
        res_dic[tr_lb] = res_dic.get(tr_lb, 0) + 1

    cnt += 1
    res_dic2[tr_lb] = res_dic2.get(tr_lb, 0) + 1

    # Iterate over every label seen so far (not only labels with at least one
    # hit), so a class with no correct prediction is reported as 0.0 instead
    # of being left out of the per-class accuracy listing.
    for k in res_dic2:
        print(k)
        print(res_dic.get(k, 0) / res_dic2[k])
    print(hit / cnt)
    print()
    
    

Happily Surprised
0.8740740740740741
Sadly Disgusted
0.7588652482269503
Happily Disgusted
0.6170212765957447
Angrily Disgusted
0.7413793103448276
Angrily Surprised
0.6578947368421053
Fearfully Surprised
0.7672413793103449
Disgustedly Surprised
0.4
Sadly Fearful
0.3181818181818182
Fearfully Angry
0.7272727272727273
Sadly Surprised
0.5
Sadly Angry
0.3333333333333333
0.7095959595959596

