In [None]:
import os
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from datasets import load_dataset
import evaluate
import EIDA
import matplotlib.pyplot as plt


# 허깅페이스 패키지로 RoBERTa-base 모델, 토크나이저 로드
model_name = "roberta-base"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2).to(device) # layer 12개, d_model 768차원
# GLUE의 SST-2는 영화 리뷰에서 가져온 문장으로 감정 분석을 학습하는 데이터셋. label의 1은 긍정, 0은 부정
# 예시: {'sentence': "hide new secretions from the parental units", 'label': 0}
# label의 종류에 맞춰 num_labels=2 옵션으로 RoBERTa 모델을 생성. RoBERTa의 masked language modeling 사전학습 과정에서 쓰이던 모델 마지막의 pooler가 없어지고, 랜덤으로 초기화된 파라미터인 classifier가 생김. classifier는 두 계층(768x768, 768x2)으로 구성되어 있음.


# 허깅페이스 패키지로로 GLUE 벤치마크의 SST-2 감정분석 데이터셋 로드
max_length = 72 # 모든 데이터를 잘라내지 않고 다룰 수 있는 충분한 길이
batch_size = 16
dataset = load_dataset("glue", "sst2")
def preprocess_function(examples):
    return tokenizer(
        examples["sentence"],
        truncation=True,
        padding="max_length",
        max_length=max_length
    )
tokenized_dataset = dataset.map(preprocess_function, batched=True)

train_dataset = tokenized_dataset['train']
dev_dataset = tokenized_dataset['validation']


metric = evaluate.load("glue", "sst2") # GLUE 벤치마크에서 SST-2 셋에 대한 점수는 accuracy.
def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = predictions.argmax(axis=1)
    return metric.compute(predictions=predictions, references=labels)

In [2]:
list_plane_dim = [4, 8, 16, 32, 64] # token representation들로 추정할 부분공간의 차원
dict_color = {4: 'violet', 8: 'blue', 16: 'green', 32: 'orange', 64: 'red'}

list_sample_size = [128, 256, 512] # 평면을 추정하는 데에 이용할 training set 표본 데이터 크기를 몇으로 할 건가

num_iter = 5 # 부분공간을 생성할 횟수
for t in range(num_iter):
    train_dataset = train_dataset.shuffle() # train set 섞기
    input_ids = torch.tensor(train_dataset['input_ids'])
    attention_mask = torch.tensor(train_dataset['attention_mask'])
    label = torch.tensor(train_dataset['label'])

    EIDA.forward_roberta_with_save(model, input_ids, attention_mask, label, begin=0, end=list_sample_size[-1], batch_size=batch_size, max_length=max_length, N=2, dir=os.path.join("roberta", "sample"))
    # 부분공간 추정을 위한 token representation 표본 추출을 512개의 데이터에서 수행하고 "sample"라는 폴더에 저장해두기
    # 데이터(=문장) 각각에 대해, 표본추출은 모델을 통과하는 과정에서 파라미터의 input일 때의 위치 49곳, 파라미터의 output일 때의 위치 73곳에서 수행함. 한 시퀀스에서 N=2개의 token representation을 랜덤으로 추출함.

    for i in range(4*12+1):
        # 파라미터들의 input이 존재하는 latent space 49곳 중 한 곳(i번째)의 표본들을 sample_inputs 배열에 로드
        sample_inputs = []
        dir1 = os.path.join("roberta", "sample", "inputs", f"{i}")
        for f in os.listdir(dir1):
            if f.endswith('.pt'):
                sample_inputs.append(torch.load(os.path.join(dir1, f), weights_only=True))

        # 로드한 벡터들 중 첫 (sample_size * N)개를 이용해서 PCA를 수행하여 평면을 추정하고, 결과는 [64, 768(W_fc1에서는 3072)] 크기의 텐서인 plane_inputs로 반환. plane_inputs[0:k, :]가 k차원 부분공간 추정의 결과임.
        for sample_size in list_sample_size:
            plane_inputs = EIDA.PCA(sample_inputs, begin=0, end=sample_size*2, plane_dim=list_plane_dim[-1], device=device)
            os.makedirs(os.path.join("roberta", f"plane_{sample_size}_{t}", "inputs"), exist_ok=True)
            torch.save(plane_inputs, os.path.join("roberta", f"plane_{sample_size}_{t}", "inputs", f"{i}.pt"))
        del sample_inputs

    for i in range(6*12+1):
        # 파라미터들의 output이 존재하는 latent space 73곳 중 한 곳의 표본들을 sample_delta_outputs 배열에 로드
        sample_delta_outputs = []
        dir1 = os.path.join("roberta", "sample", "delta_outputs", f"{i}")
        for f in os.listdir(dir1):
            if f.endswith('.pt'):
                sample_delta_outputs.append(torch.load(os.path.join(dir1, f), weights_only=True))

        # 로드한 벡터들 중 첫 size개를 이용해서 PCA를 수행하여 평면을 추정하고, 결과는 [64, 768(W_fc2에서는 3072)] 크기의 텐서인 plane_delta_outputs로 반환. plane_delta_outputs[0:k, :]가 k차원 부분공간 추정의 결과임.
        for sample_size in list_sample_size:
            plane_delta_outputs = EIDA.PCA(sample_delta_outputs, begin=0, end=sample_size*2, plane_dim=list_plane_dim[-1], device=device)
            os.makedirs(os.path.join("roberta", f"plane_{sample_size}_{t}", "delta_outputs"), exist_ok=True)
            torch.save(plane_delta_outputs, os.path.join("roberta", f"plane_{sample_size}_{t}", "delta_outputs", f"{i}.pt"))
        del sample_delta_outputs

In [None]:
# 추정된 4, 8, 16, 32, 64차원의 평면이 token representation의 분포를 얼마나 잘 설명하는지 측정하기 위해, 크기 1024의 표본을 다시 추출하여 이걸 평면에 정사영시켜보고 정사영과 원본벡터의 cosine similarity를 측정하는 과정.
train_dataset = train_dataset.shuffle() # train set 섞기
input_ids = torch.tensor(train_dataset['input_ids'])
attention_mask = torch.tensor(train_dataset['attention_mask'])
label = torch.tensor(train_dataset['label'])

EIDA.forward_roberta_with_save(model, input_ids, attention_mask, label, begin=0, end=1024, batch_size=batch_size, max_length=max_length, N=2, dir=os.path.join("roberta", "sample"))


avg_input_proj_length = {size: {k: [0.0 for i in range(4*12+1)] for k in list_plane_dim} for size in list_sample_size} # avg_input_proj_length[size][k][i]는 크기 size인 input token representation 표본 추출을 수행한 i번째 공간에서 추정된 k차원 평면에 다른 표본 토큰들을 정사영해보고 얻어진 원래 토큰과 정사영된 토큰 간의 cosine similarity 평균값
avg_output_proj_length = {size: {k: [0.0 for i in range(6*12+1)] for k in list_plane_dim} for size in list_sample_size} # avg_output_proj_length[size][k][i]는 크기 size인 Δ(output) 표본 추출을 수행한 i번째 공간에서 추정된 k차원 평면에 다른 표본 토큰들을 정사영해보고 얻어진 원래 토큰과 정사영된 토큰 간의 cosine similarity 평균값


for t in range(num_iter):
    with torch.no_grad():
        for i in range(4*12+1):
            list_x = []
            dir2 = os.path.join("roberta", "sample", "inputs", f"{i}")
            for f in os.listdir(dir2):
                if f.endswith('.pt'):
                    list_x.append(torch.load(os.path.join(dir2, f), weights_only=True)) # 표본 한 개 로드

            for sample_size in list_sample_size:
                plane_inputs = torch.load(os.path.join("roberta", f"plane_{sample_size}_{t}", "inputs", f"{i}.pt"), weights_only=True).to(device)
                X = torch.stack(list_x).to(device) # GPU 병렬연산을 위해 한 텐서로 합치기. 로드한 i번째 벡터는 X의 i행으로 있음.
            
                X = X / torch.norm(X, dim=1, keepdim=True) # 각 벡터를 길이 1로 정규화
                X = plane_inputs @ X.T # proj[i, j]는 로드한 j번째 표본벡터가 평면의 i번째 기저벡터인 plane_inputs[i, :] 방향의 성분을 얼마나 지녔는지를 의미. 이 시점부터는 개별 벡터의 정보가 X의 각 열로 있게 됨.
            
                for k in list_plane_dim:
                    avg_input_proj_length[sample_size][k][i] += torch.norm(X[0:k, :], dim=0).sum() / len(list_x) # 각 벡터를 k차원 평면에 정사영했을 때의 길이의 평균
            del X
            del list_x

        for i in range(6*12+1):
            list_y = []
            dir2 = os.path.join("roberta", "sample", "delta_outputs", f"{i}")
            for f in os.listdir(dir2):
                if f.endswith('.pt'):
                    list_y.append(torch.load(os.path.join(dir2, f), weights_only=True)) # 표본 한 개 로드
            
            for sample_size in list_sample_size:
                plane_delta_outputs = torch.load(os.path.join("roberta", f"plane_{sample_size}_{t}", "delta_outputs", f"{i}.pt"), weights_only=True).to(device)
                Y = torch.stack(list_y).to(device) # GPU 병렬연산을 위해 한 텐서로 합치기. 로드한 i번째 벡터는 Y의 i행으로 있음.

                Y = Y / torch.norm(Y, dim=1, keepdim=True) # 각 벡터를 길이 1로 정규화
                Y = plane_delta_outputs @ Y.T # proj[i, j]는 로드한 j번째 표본벡터가 평면의 i번째 기저벡터인 plane_delta_outputs[i, :] 방향의 성분을 얼마나 지녔는지를 의미. 이 시점부터는 개별 벡터의 정보가 Y의 각 열로 있게 됨.

                for k in list_plane_dim:
                    avg_output_proj_length[sample_size][k][i] += torch.norm(Y[0:k, :], dim=0).sum() / len(list_y) # 각 벡터를 k차원 평면에 정사영했을 때의 길이의 평균
            del Y
            del list_y


for sample_size in list_sample_size:
    for k in list_plane_dim:
        for i in range(4*12+1):
            avg_input_proj_length[sample_size][k][i] /= num_iter
        for i in range(6*12+1):
            avg_output_proj_length[sample_size][k][i] /= num_iter

In [None]:
# 그래프 그려서 저장하기
os.makedirs(os.path.join("roberta", "graph"), exist_ok=True)

index_and_title = [(0, "Inputs of W_Q, W_K, W_V  (768 dimensions)"), (1, "Inputs of W_O  (768 dimensions)"), (2, "Inputs of W_fc1  (768 dimensions)"), (3, "Inputs of W_fc2  (3072 dimensions)")]
for sample_size in list_sample_size:
    for index, title in index_and_title:
        plt.figure(figsize=(10, 6))
        for k in list_plane_dim:
            plt.plot(list(range(12)), [avg_input_proj_length[sample_size][k][4*l+index].item() for l in range(12)], label=f"on {k}-dim", marker='o', linestyle='-', color=dict_color[k])
        plt.ylim(0.2, 1.0)
        plt.xticks(ticks=range(0, 12), labels=range(0, 12))
        plt.grid(color='gray', linestyle='--', linewidth=0.5, alpha=0.7)
        plt.xlabel("layers")
        plt.ylabel("average cosine similarity")
        plt.title(title)
        plt.legend()

        plt.savefig(os.path.join("roberta", "graph", f"{sample_size}_input_{index}.png"), dpi=300, bbox_inches='tight')
        plt.show()


index_and_title = [(0, "Δ(outputs) of W_Q  (768 dimensions)"), (1, "Δ(outputs) of W_K  (768 dimensions)"), (2, "Δ(outputs) of W_V  (768 dimensions)"), (3, "Δ(outputs) of W_O  (768 dimensions)"), (4, "Δ(outputs) of W_fc1  (3072 dimensions)"), (5, "Δ(outputs) of W_fc2  (768 dimensions)")]
for sample_size in list_sample_size:
    for index, title in index_and_title:
        plt.figure(figsize=(10, 6))
        for k in list_plane_dim:
            plt.plot(list(range(12)), [avg_output_proj_length[sample_size][k][6*l+index].item() for l in range(12)], label=f"on {k}-dim", marker='o', linestyle='-', color=dict_color[k])
        plt.ylim(0.2, 1.0)
        plt.xticks(ticks=range(0, 12), labels=range(0, 12))
        plt.grid(color='gray', linestyle='--', linewidth=0.5, alpha=0.7)
        plt.xlabel("layers")
        plt.ylabel("average cosine similarity")
        plt.title(title)
        plt.legend()

        plt.savefig(os.path.join("roberta", "graph", f"{sample_size}_delta_output_{index}.png"), dpi=300, bbox_inches='tight')
        plt.show()

In [None]:
avg_input_proj_length_all_param = {size: {k: 0.0 for k in list_plane_dim} for size in list_sample_size} 
avg_output_proj_length_all_param = {size: {k: 0.0 for k in list_plane_dim} for size in list_sample_size} 

for sample_size in list_sample_size:
    for k in list_plane_dim:
        for i in range(4*12+1):
            avg_input_proj_length_all_param[sample_size][k] += avg_input_proj_length[sample_size][k][i]
        for i in range(6*12+1):
            avg_output_proj_length_all_param[sample_size][k] += avg_output_proj_length[sample_size][k][i]
        avg_input_proj_length_all_param[sample_size][k] /= (4*12+1)
        avg_output_proj_length_all_param[sample_size][k] /= (6*12+1)


plt.figure(figsize=(10, 6))
for k in list_plane_dim:
    y_values = [avg_input_proj_length_all_param[size][k].item() for size in list_sample_size]
    plt.plot(list_sample_size, y_values, label=f"on {k}-dim", marker='o', linestyle='-', color=dict_color[k])
    for x, y in zip(list_sample_size, y_values):
        plt.text(x, y, f"{y:.3f}", fontsize=9, ha='center', va='bottom')

plt.ylim(0.5, 1.0)
plt.xticks(ticks=list_sample_size, labels=list_sample_size)
plt.grid(color='gray', linestyle='--', linewidth=0.5, alpha=0.7)
plt.xlabel("sample size")
plt.ylabel("average cosine similarity over all parameters")
plt.title("Cosine Similarity by Input Sample Size")
plt.legend()

plt.savefig(os.path.join("roberta", "graph", "all_inputs.png"), dpi=300, bbox_inches='tight')
plt.show()


plt.figure(figsize=(10, 6))
for k in list_plane_dim:
    y_values = [avg_output_proj_length_all_param[size][k].item() for size in list_sample_size]
    plt.plot(list_sample_size, [avg_output_proj_length_all_param[size][k].item() for size in list_sample_size], label=f"on {k}-dim", marker='o', linestyle='-', color=dict_color[k])
    for x, y in zip(list_sample_size, y_values):
        plt.text(x, y, f"{y:.3f}", fontsize=9, ha='center', va='bottom')

plt.ylim(0.5, 1.0)
plt.xticks(ticks=list_sample_size, labels=list_sample_size)
plt.grid(color='gray', linestyle='--', linewidth=0.5, alpha=0.7)
plt.xlabel("sample size")
plt.ylabel("average cosine similarity over all parameters")
plt.title("Cosine Similarity by Δ(Output) Sample Size")
plt.legend()

plt.savefig(os.path.join("roberta", "graph", "all_delta_outputs.png"), dpi=300, bbox_inches='tight')
plt.show()