In [1]:
import torch
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import random
import numpy as npa
import torch
import transformers
import pandas as pd
import sys
import os
from tqdm import tqdm

# Put the directory one level above the working directory on the import path
# (presumably where the project-local `setting` module lives -- confirm).
parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
if sys.path.count(parent_dir) == 0:
    sys.path.append(parent_dir)

from setting import set_seed, now


# Expose only GPU index 3 to this process. This must stay ahead of any code
# that initializes CUDA, otherwise the setting is ignored.
os.environ["CUDA_VISIBLE_DEVICES"] = "3"

# Project helper from `setting`; the recorded cell output suggests it prints
# the current working directory -- TODO confirm.
now()

# Fixed seed for reproducibility; `set_seed` is a project helper and
# presumably seeds the relevant RNGs -- verify in `setting`.
seed = 42
set_seed(seed)


현재 작업 디렉토리: /data1/home/gyubin/Knowledge_Editing_Dataset/analysis/Attention


In [2]:
# Hugging Face identifier of the checkpoint that the model-loading cell passes
# to AutoModelForCausalLM.from_pretrained.
model_name = "meta-llama/Meta-Llama-3-8B-Instruct"
# Alternative checkpoint (the name suggests a MEMIT-edited variant -- confirm);
# uncomment the next line to evaluate it instead.
# model_name = "dellaanima/KE_Meta-Llama-3-8B-Instruct_MEMIT_CF5000"

In [3]:
from transformers import AutoTokenizer, AutoModelForCausalLM
# Tokenizer: always loaded from the base Llama-3 Instruct repository,
# independent of `model_name`.
tokenizer = AutoTokenizer.from_pretrained(
    "meta-llama/Meta-Llama-3-8B-Instruct",
    cache_dir="../../../.cache",
)
# Pad on the left and reuse the EOS id as the padding id.
tokenizer.padding_side = 'left'
tokenizer.pad_token_id = tokenizer.eos_token_id

# Model: loaded with the library's default dtype and moved to the GPU.
# (An earlier variant of this line, kept for reference, passed
#  torch_dtype=torch.bfloat16 and used cache_dir "../.cache".)
model = AutoModelForCausalLM.from_pretrained(model_name, cache_dir="../../../.cache")
model = model.to('cuda')

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

In [4]:
import json

# Evaluation records for the hop-sentence experiments. Later cells read the
# keys 'prompt', 'subject', 'fact_knowledge', 'edited_knowledge',
# 'generated_sentences' and the '*_hop_test' columns from each record.
data_path = 'df_exp1_e_5000_with_generated_sentences.json'
#data_path = "random_e_5000_with_generated_sentences.json"
# Explicit UTF-8 so decoding does not depend on the machine's locale, and a
# separate name for the path so `data` only ever refers to the loaded records.
with open(data_path, 'r', encoding='utf-8') as file:
    data = json.load(file)

# No blocking (baseline: full attention over the whole input)

In [5]:
import torch
import torch.nn.functional as F
from tqdm import tqdm
def compute_sequence_probability(model, tokenizer, prefix, target):
    """
    Return the probability the model assigns to ``target`` as a continuation
    of ``prefix``.

    ``prefix + target`` is tokenized and scored in one forward pass. The
    result is the product of the conditional probabilities of every token
    located after the prefix tokens (e.g. all tokens of " Tim Cook").

    Args:
        model: Causal LM. ``model(**inputs).logits`` is indexed as
            [batch=1, seq_len, vocab_size] and ``model.device`` is read.
        tokenizer: Called as ``tokenizer(text, return_tensors='pt')``; the
            result must provide ``.items()`` and an ``input_ids`` entry.
        prefix (str): Conditioning text.
        target (str): Continuation to score. Callers in this notebook pass it
            with a leading space.

    Returns:
        float: Joint probability of the target tokens given the prefix.

    Raises:
        ValueError: If no token follows the prefix tokens (for example an
            empty ``target``), since there is nothing to score.

    Note:
        The prefix length is measured by tokenizing ``prefix`` on its own.
        This assumes those tokens are also the leading tokens of
        ``prefix + target`` -- TODO confirm for the tokenizer in use.
    """
    # 1) Tokenize prefix and target together and move the tensors to the model.
    combined_inputs = tokenizer(prefix + target, return_tensors='pt')
    combined_inputs = {k: v.to(model.device) for k, v in combined_inputs.items()}

    # 2) Single forward pass; keep the logits of the only batch element.
    with torch.no_grad():
        logits = model(**combined_inputs).logits[0]

    # 3) Work out where the target starts inside the combined sequence.
    prefix_length = tokenizer(prefix, return_tensors='pt')['input_ids'].shape[1]
    combined_ids = combined_inputs['input_ids'][0]
    seq_len = combined_ids.shape[0]
    # Position 0 has no preceding token, so it can never be scored.
    first_scored = max(prefix_length, 1)
    if first_scored >= seq_len:
        raise ValueError(
            "target contributes no tokens after the prefix; nothing to score"
        )

    # 4) logits[i - 1] is the distribution over token i, so the target tokens
    #    at positions first_scored..seq_len-1 are predicted by the logits at
    #    first_scored-1..seq_len-2. log_softmax avoids softmax-then-log, and
    #    the float() cast keeps the sum accurate for half-precision models.
    log_probs = F.log_softmax(logits[first_scored - 1:seq_len - 1].float(), dim=-1)
    target_ids = combined_ids[first_scored:].to(log_probs.device)
    token_log_probs = log_probs.gather(-1, target_ids.unsqueeze(-1)).squeeze(-1)

    # 5) Sum the log-probabilities and exponentiate to get the joint probability.
    return torch.exp(token_log_probs.sum()).item()




In [19]:
# Sanity check on one hand-written example.
prefix_text = "The powerful capabilities of the Apple A5 enabled the sleek design of the"
target_text = " iPad mini"  # leading space separates the target from the prefix
prob = compute_sequence_probability(
    model,
    tokenizer,
    prefix=prefix_text,
    target=target_text,
)
print(prob)

0.00798383355140686


In [None]:
'''
새로 만든 데이터셋, no hop sentence
'''
# Score each record's prompt (no hop sentence) and average the probabilities.
# NOTE(review): the target is the edited knowledge of record i + 1, not of
# record i. This looks like a deliberately mismatched control -- confirm.
num_pairs = len(data) - 1
count = 0
for i in tqdm(range(num_pairs)):
    prefix_text = data[i]['prompt'].format(data[i]['subject'])
    target_text = ' ' + data[i+1]['edited_knowledge']
    prob = compute_sequence_probability(model, tokenizer, prefix_text, target_text)
    count += prob
# Average over the pairs actually scored (len(data) - 1), not over len(data).
print(count / num_pairs if num_pairs > 0 else float('nan'))

In [6]:
'''
새로 만든 데이터셋, subject hop sentence + fact knowledge
'''
def hop_sentence_prob(model, tokenizer, data, check_col, opt = 0):
    """
    Average target probability when a hop sentence is prepended to the prompt.

    For every record the prefix is the first 'sentence_with_hop_word' of
    ``check_col`` followed by the record's prompt formatted with its subject.
    The target is ' ' plus the record's 'fact_knowledge' (``opt == 0``) or
    'edited_knowledge' (any other ``opt``).

    Prints one summary line and returns the mean probability over ``data``.
    """
    n_records = len(data)
    prob_sum = 0
    for idx in tqdm(range(n_records)):
        record = data[idx]
        hop_sentence = record['generated_sentences'][check_col]['sentence_with_hop_word'][0]
        prompt = record['prompt'].format(record['subject'])
        # Which field supplies the target string for this run.
        target_key = 'fact_knowledge' if opt == 0 else 'edited_knowledge'
        prob_sum += compute_sequence_probability(
            model, tokenizer, hop_sentence + prompt, ' ' + record[target_key]
        )
    print(f'{check_col} with {target_key} : {round(prob_sum / n_records, 6)}')
    return (prob_sum / n_records)

In [None]:
# Run every hop column against both target types (0 = fact, 1 = edited).
hop_cols = ['sbj_hop_test', 'obj_true_hop_test', 'obj_new_hop_test']
for check_col in hop_cols:
    for opt in (0, 1):
        mean_prob = hop_sentence_prob(model, tokenizer, data, check_col, opt = opt)
        print(mean_prob)

In [None]:
'''
전체 데이터셋
'''
import json

# Full dataset stored as JSON Lines (one JSON object per line).
# NOTE: this rebinds `data`, replacing the records loaded by the earlier JSON
# cell. Re-run that cell before any hop-sentence cell that expects its schema.
data_path = "../../counterfact_memit.jsonl"
with open(data_path, 'r', encoding='utf-8') as file:
    # Skip blank lines (e.g. a trailing newline), which json.loads rejects.
    data = [json.loads(line) for line in file if line.strip()]


# Score each record's rewrite prompt and average the probabilities.
# NOTE(review): the target is the true object of record i + 1, not of
# record i. This looks like a deliberately mismatched control -- confirm.
num_pairs = len(data) - 1
count = 0
for i in tqdm(range(num_pairs)):
    rewrite = data[i]['requested_rewrite']
    prefix_text = rewrite['prompt'].format(rewrite['subject'])
    # The target may span several tokens (e.g. "Tim Cook").
    target_text = ' '+data[i+1]['requested_rewrite']['target_true']['str']
    prob = compute_sequence_probability(model, tokenizer, prefix_text, target_text)
    count += prob
# Average over the pairs actually scored (len(data) - 1), not over len(data).
print(count / num_pairs if num_pairs > 0 else float('nan'))

# Blocking (attention mask zeroed over the hop-word tokens)

In [6]:
import torch
import torch.nn.functional as F
from tqdm import tqdm
def _find_token_span(sequence_ids, pattern_ids):
    """Return the start index of the first occurrence of ``pattern_ids`` in ``sequence_ids``, or -1."""
    span = len(pattern_ids)
    if span == 0:
        return -1
    for start in range(len(sequence_ids) - span + 1):
        if sequence_ids[start:start + span] == pattern_ids:
            return start
    return -1


def compute_sequence_blocking_probability(model, tokenizer, prefix, target, block_text, block_opt = 0):
    """
    Return the probability of ``target`` following ``prefix``, optionally with
    the tokens of ``block_text`` hidden from attention.

    ``prefix + target`` is tokenized and scored in one forward pass. The
    result is the product of the conditional probabilities of every token
    located after the prefix tokens.

    Args:
        model: Causal LM. ``model(**inputs).logits`` is indexed as
            [batch=1, seq_len, vocab_size] and ``model.device`` is read.
        tokenizer: Called as ``tokenizer(text, return_tensors='pt', ...)``;
            must return ``input_ids`` and ``attention_mask`` and accept
            ``add_special_tokens=False``.
        prefix (str): Conditioning text.
        target (str): Continuation to score.
        block_text (str): Text whose tokens are masked when ``block_opt == 1``.
        block_opt (int): 1 enables masking; any other value leaves the
            attention mask untouched.

    Returns:
        float: Joint probability of the target tokens given the prefix.

    Raises:
        ValueError: If no token follows the prefix tokens (for example an
            empty ``target``).

    Note:
        Masking looks for the first occurrence of the tokens of
        ``' ' + block_text`` and, failing that, of ``block_text``. If neither
        token sequence occurs in the input (tokenization in context can
        differ from tokenization in isolation), nothing is masked.
    """
    # 1) Tokenize prefix and target together and move the tensors to the model.
    combined_inputs = tokenizer(prefix + target, return_tensors='pt')
    combined_inputs = {k: v.to(model.device) for k, v in combined_inputs.items()}
    combined_ids = combined_inputs['input_ids'][0]

    if block_opt == 1 and block_text:
        sequence = combined_ids.tolist()
        attention_mask = combined_inputs['attention_mask']
        # Try the mid-sentence form (leading space) first, then the bare form.
        for variant in (' ' + block_text, block_text):
            # add_special_tokens=False keeps the begin-of-text token out of
            # the pattern being searched for.
            pattern = tokenizer(
                variant, return_tensors='pt', add_special_tokens=False
            )['input_ids'][0].tolist()
            start = _find_token_span(sequence, pattern)
            if start >= 0:
                # Match the full token sequence so the mask lands on the
                # block text itself, not on the start of the input.
                attention_mask[:, start:start + len(pattern)] = 0
                break

    # 2) Single forward pass; keep the logits of the only batch element.
    with torch.no_grad():
        logits = model(**combined_inputs).logits[0]

    # 3) Work out where the target starts inside the combined sequence.
    prefix_length = tokenizer(prefix, return_tensors='pt')['input_ids'].shape[1]
    seq_len = combined_ids.shape[0]
    # Position 0 has no preceding token, so it can never be scored.
    first_scored = max(prefix_length, 1)
    if first_scored >= seq_len:
        raise ValueError(
            "target contributes no tokens after the prefix; nothing to score"
        )

    # 4) logits[i - 1] is the distribution over token i.
    log_probs = F.log_softmax(logits[first_scored - 1:seq_len - 1].float(), dim=-1)
    target_ids = combined_ids[first_scored:].to(log_probs.device)
    token_log_probs = log_probs.gather(-1, target_ids.unsqueeze(-1)).squeeze(-1)

    # 5) Sum the log-probabilities and exponentiate to get the joint probability.
    return torch.exp(token_log_probs.sum()).item()

In [7]:
'''
새로 만든 데이터셋, subject hop sentence + fact knowledge
'''
def hop_sentence_prob(model, tokenizer, data, check_col, opt = 0):
    """
    Average target probability with the hop word blocked from attention.

    For every record the prefix is the first 'sentence_with_hop_word' of
    ``check_col`` followed by the record's prompt formatted with its subject.
    The text to block is the first element of ``record[check_col]``. The
    target is ' ' plus 'fact_knowledge' (``opt == 0``) or 'edited_knowledge'
    (any other ``opt``). Blocking is always enabled (``block_opt = 1``).

    Prints one summary line and returns the mean probability over ``data``.
    """
    total_records = len(data)
    running_sum = 0
    for row in tqdm(range(total_records)):
        entry = data[row]
        context = entry['generated_sentences'][check_col]['sentence_with_hop_word'][0]
        question = entry['prompt'].format(entry['subject'])
        blocked = entry[check_col][0]
        # Which field supplies the target string for this run.
        label = 'fact_knowledge' if opt == 0 else 'edited_knowledge'
        running_sum += compute_sequence_blocking_probability(
            model, tokenizer, context + question, ' ' + entry[label], blocked, block_opt =1
        )
    print(f'{check_col} with {label} : {round(running_sum / total_records, 6)}')
    return (running_sum / total_records)

In [None]:
# Blocked-attention run: every hop column against both target types
# (0 = fact, 1 = edited).
hop_cols = ['sbj_hop_test', 'obj_true_hop_test', 'obj_new_hop_test']
for check_col in hop_cols:
    for opt in (0, 1):
        blocked_mean = hop_sentence_prob(model, tokenizer, data, check_col, opt = opt)
        print(blocked_mean)

# Token-based evaluation (generate a continuation and match the target string)

In [9]:
def _find_token_span(sequence_ids, pattern_ids):
    """Return the start index of the first occurrence of ``pattern_ids`` in ``sequence_ids``, or -1."""
    span = len(pattern_ids)
    if span == 0:
        return -1
    for start in range(len(sequence_ids) - span + 1):
        if sequence_ids[start:start + span] == pattern_ids:
            return start
    return -1


def compute_sequence_blocking_probability(model, tokenizer, prefix, target, block_text, block_opt = 0):
    """
    Return 1 if the text generated after ``prefix`` contains ``target``, else 0.

    The model generates up to 50 new tokens from ``prefix`` alone, optionally
    with the tokens of ``block_text`` hidden from attention. The decoded
    continuation is then searched, case-insensitively, for ``target``.

    Args:
        model: Causal LM providing ``generate``, ``device`` and
            ``config.max_position_embeddings``.
        tokenizer: Provides ``__call__`` (returning ``input_ids`` and
            ``attention_mask``), ``decode`` and ``pad_token_id``.
        prefix (str): Prompt to continue.
        target (str): String to look for; surrounding whitespace is ignored.
        block_text (str): Text whose tokens are masked when ``block_opt == 1``.
        block_opt (int): 1 enables masking; any other value leaves the
            attention mask untouched.

    Returns:
        int: 1 on a match, 0 otherwise.

    Raises:
        ValueError: If ``target`` is empty or whitespace only. The empty
            string is contained in every output, so it would always count
            as a hit.

    Note:
        Only ``prefix`` is fed to the model. Appending ``target`` to the
        prompt would place the expected answer in the input.
        Masking looks for the first occurrence of the tokens of
        ``' ' + block_text`` and, failing that, of ``block_text``. If neither
        token sequence occurs in the prompt, nothing is masked.
    """
    needle = target.strip().lower()
    if not needle:
        raise ValueError("target must contain at least one non-whitespace character")

    # 1) Tokenize the prompt only, truncated to the model's context length.
    prompt_inputs = tokenizer(
        prefix,
        return_tensors='pt',
        padding=True,
        truncation=True,
        max_length=model.config.max_position_embeddings,
    )
    prompt_inputs = {
        k: v.to(model.device) if isinstance(v, torch.Tensor) else v
        for k, v in prompt_inputs.items()
    }
    input_ids = prompt_inputs['input_ids']
    attention_mask = prompt_inputs['attention_mask']

    if block_opt == 1 and block_text:
        sequence = input_ids[0].tolist()
        # Try the mid-sentence form (leading space) first, then the bare form.
        for variant in (' ' + block_text, block_text):
            # add_special_tokens=False keeps the begin-of-text token out of
            # the pattern being searched for.
            pattern = tokenizer(
                variant, return_tensors='pt', add_special_tokens=False
            )['input_ids'][0].tolist()
            start = _find_token_span(sequence, pattern)
            if start >= 0:
                # Match the full token sequence so the mask lands on the
                # block text itself, not on the start of the input.
                attention_mask[:, start:start + len(pattern)] = 0
                break

    # 2) Generate a continuation using generate()'s default decoding settings.
    with torch.no_grad():
        generated_output = model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            max_new_tokens=50,  # number of new tokens to generate
            pad_token_id=tokenizer.pad_token_id,
        )

    # 3) Keep only the newly generated tokens and decode them.
    generated_tokens = generated_output[0][input_ids.shape[1]:]
    decoded_output = tokenizer.decode(generated_tokens, skip_special_tokens=True)

    return 1 if needle in decoded_output.lower() else 0

In [6]:
def hop_sentence_prob(model, tokenizer, data, check_col, block_opt, opt = 0):
    """
    Average the per-record score of ``compute_sequence_blocking_probability``.

    For every record the prefix is the first 'sentence_with_hop_word' of
    ``check_col`` followed by the record's prompt formatted with its subject.
    The text to block is the first element of ``record[check_col]``. The
    target is the record's 'fact_knowledge' (``opt == 0``) or
    'edited_knowledge' (any other ``opt``), passed without a leading space.
    ``block_opt`` is forwarded unchanged.

    Prints one summary line and returns the mean score over ``data``.
    """
    sample_count = len(data)
    score_total = 0
    for position in tqdm(range(sample_count)):
        item = data[position]
        hop_context = item['generated_sentences'][check_col]['sentence_with_hop_word'][0]
        filled_prompt = item['prompt'].format(item['subject'])
        hop_word = item[check_col][0]
        # Which field supplies the target string for this run.
        field = 'fact_knowledge' if opt == 0 else 'edited_knowledge'
        score_total += compute_sequence_blocking_probability(
            model, tokenizer, hop_context + filled_prompt, item[field], hop_word, block_opt =block_opt
        )
    print(f'{check_col} with {field} : {round(score_total / sample_count, 6)}')
    return (score_total / sample_count)

In [10]:
from tqdm import tqdm
# Generation-based run with blocking enabled: every hop column against both
# target types (0 = fact, 1 = edited).
hop_cols = ['sbj_hop_test', 'obj_true_hop_test', 'obj_new_hop_test']
for check_col in hop_cols:
    for opt in (0, 1):
        hit_rate = hop_sentence_prob(model, tokenizer, data, check_col, opt = opt, block_opt = 1)
        print(hit_rate)

  0%|          | 3/5000 [00:03<1:45:46,  1.27s/it]

0


  0%|          | 3/5000 [00:04<2:04:19,  1.49s/it]


KeyboardInterrupt: 

: 