# GRAPHCODEBERT

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install transformers torch pandas tqdm

In [2]:
import pandas as pd
import numpy as np
import os
import random
from itertools import combinations, product
import re
import sklearn
from sklearn.model_selection import train_test_split
import torch
torch.set_float32_matmul_precision('high')
from torch.utils.data import Dataset, DataLoader
import transformers
from transformers import AutoTokenizer, AutoModelForSequenceClassification, AdamW
from tqdm import tqdm

import warnings
warnings.filterwarnings('ignore')
from google.colab import files

In [3]:
# Seed 고정 함수
def seed_everything(seed: int = 42, contain_cuda: bool = False):
  os.environ['PYTHONHASHSEED'] = str(seed)
  random.seed(seed)
  np.random.seed(seed)

  torch.backends.cudnn.deterministic = True
  torch.backends.cudnn.benchmark = False

  torch.manual_seed(seed)
  torch.cuda.manual_seed(seed)
  torch.cuda.manual_seed_all(seed)
  print(f"Seed set as {seed}")

seed = 42
seed_everything(seed)

Seed set as 42


In [None]:
# CUDA 사용 가능 여부 확인 및 GPU 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


# 데이터 생성

In [None]:
code_folder = "/content/drive/MyDrive/코드 유사성 판단/train_code"
problem_folders = os.listdir(code_folder)

In [None]:
scripts_contents = []
problem_nums = []

# 각 문제 폴더와 코드 파일을 순회
for problem_folder in tqdm(problem_folders):
    scripts = os.listdir(os.path.join(code_folder, problem_folder))
    for script in scripts:
        if script == ".ipynb_checkpoints":
            continue  # .ipynb_checkpoints 폴더는 무시
        script_file_path = os.path.join(code_folder, problem_folder, script)
        if os.path.isfile(script_file_path):  # 파일이 실제 존재하는지 확인
            with open(script_file_path, 'r', encoding='utf-8') as file:
                content = file.read()
            scripts_contents.append(content)
            problem_nums.append(problem_folder)



100%|██████████| 500/500 [1:48:19<00:00, 13.00s/it]


In [None]:
df = pd.DataFrame(data= {'code':scripts_contents, 'problem_num':problem_nums})

In [None]:
# train과 validation data set 분리
train_df, valid_df, train_label, valid_label = train_test_split(
        df,
        df['problem_num'],
        random_state=42,
        test_size=0.1,
        stratify=df['problem_num']
    )

train_df = train_df.reset_index(drop=True) # Reindexing
valid_df = valid_df.reset_index(drop=True)

# train data - 200만쌍

In [None]:
codes = train_df['code'].to_list() # code 컬럼을 list로 변환 - codes는 code가 쭉 나열된 형태임
problems = train_df['problem_num'].unique().tolist() # 문제 번호를 중복을 제외하고 list로 변환
problems.sort()

In [None]:
train_positive_pairs = []
train_negative_pairs = []

In [None]:
for problem in tqdm(problems):
    # 각각의 문제에 대한 code를 골라 정답 코드로 저장, 아닌 문제는 other_codes로 저장
    # 이때 train_df에는 problem_num이 정렬된 상태가 아니기 때문에 index가 다를 수 있음
    solution_codes = train_df[train_df['problem_num'] == problem]['code'].to_list()
    other_codes = train_df[train_df['problem_num'] != problem]['code'].to_list()

    # positive_pairs 2000개 (총 500 * 2000 = 1000,000개) 추출
    # negative_pairs 2000개 (총 500 * 2000 = 1000,000개) 추출
    positive_pairs = list(combinations(solution_codes,2))
    random.shuffle(positive_pairs)
    positive_pairs = positive_pairs[:2000]
    random.shuffle(other_codes)
    other_codes = other_codes[:2000]

    negative_pairs = []
    for pos_codes, others in zip(positive_pairs, other_codes):
        negative_pairs.append((pos_codes[0], others))

    train_positive_pairs.extend(positive_pairs)
    train_negative_pairs.extend(negative_pairs)

100%|██████████| 500/500 [04:09<00:00,  2.00it/s]


In [None]:
# total_positive_pairs와 negative_pairs의 정답 코드를 묶어 code1로 지정
# total_positive_pairs와 negative_pairs의 비교 대상 코드를 묶어 code2로 지정
# 해당 코드에 맞는 label 설정
code1 = [code[0] for code in train_positive_pairs] + [code[0] for code in train_negative_pairs]
code2 = [code[1] for code in train_positive_pairs] + [code[1] for code in train_negative_pairs]
label = [1]*len(train_positive_pairs) + [0]*len(train_negative_pairs)

# DataFrame으로 선언
train_data = pd.DataFrame(data={'code1':code1, 'code2':code2, 'similar':label})
train_data = train_data.sample(frac=1).reset_index(drop=True) # frac: 추출할 표본 비율
train_data.to_csv('create2_train_data.csv',index=False)
files.download('create2_train_data.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# valid data

In [None]:
codes = valid_df['code'].to_list() # code 컬럼을 list로 변환 - codes는 code가 쭉 나열된 형태임
problems = valid_df['problem_num'].unique().tolist() # 문제 번호를 중복을 제외하고 list로 변환
problems.sort()

In [None]:
valid_positive_pairs = []
valid_negative_pairs = []

In [None]:
for problem in tqdm(problems):
    # 각각의 문제에 대한 code를 골라 정답 코드로 저장, 아닌 문제는 other_codes로 저장
    # 이때 train_df에는 problem_num이 정렬된 상태가 아니기 때문에 index가 다를 수 있음
    solution_codes = valid_df[valid_df['problem_num'] == problem]['code'].to_list()
    other_codes = valid_df[valid_df['problem_num'] != problem]['code'].to_list()

    # positive_pairs 200개 (총 500 * 200 = 100,000개) 추출
    # negative_pairs 200개 (총 500 * 200 = 100,000개) 추출
    positive_pairs = list(combinations(solution_codes,2))
    random.shuffle(positive_pairs)
    positive_pairs = positive_pairs[:200]
    random.shuffle(other_codes)
    other_codes = other_codes[:200]

    negative_pairs = []
    for pos_codes, others in zip(positive_pairs, other_codes):
        negative_pairs.append((pos_codes[0], others))

    valid_positive_pairs.extend(positive_pairs)
    valid_negative_pairs.extend(negative_pairs)

100%|██████████| 500/500 [00:25<00:00, 19.35it/s]


In [None]:
# total_positive_pairs와 negative_pairs의 정답 코드를 묶어 code1로 지정
# total_positive_pairs와 negative_pairs의 비교 대상 코드를 묶어 code2로 지정
# 해당 코드에 맞는 label 설정
code1 = [code[0] for code in valid_positive_pairs] + [code[0] for code in valid_negative_pairs]
code2 = [code[1] for code in valid_positive_pairs] + [code[1] for code in valid_negative_pairs]
label = [1]*len(valid_positive_pairs) + [0]*len(valid_negative_pairs)

# DataFrame으로 선언
valid_data = pd.DataFrame(data={'code1':code1, 'code2':code2, 'similar':label})
valid_data = valid_data.sample(frac=1).reset_index(drop=True) # frac: 추출할 표본 비율
valid_data.to_csv('valid_data.csv',index=False)
files.download('valid_data.csv')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# 모델링

In [4]:
create_train_data_path = '/content/drive/My Drive/코드 유사성 판단/create2_train_data.csv' # 첫 모델과 다르게, 200만쌍의 train 데이터
sample_data_path = '/content/drive/My Drive/코드 유사성 판단/sample_train.csv' # 여기서도 sample데이터 동일하게 사용
valid_data_path = '/content/drive/My Drive/코드 유사성 판단/valid_data.csv' # 동일하게 20만쌍 사용
test_data_path = '/content/drive/My Drive/코드 유사성 판단/test.csv'


In [5]:
create_train_df = pd.read_csv(create_train_data_path)
sample_df = pd.read_csv(sample_data_path)
train_df = pd.concat([create_train_df, sample_df], ignore_index=True) # 두 학습데이터를 결합
val_df = pd.read_csv(valid_data_path)


In [8]:
create_train_df.describe()

Unnamed: 0,similar
count,2000000.0
mean,0.5
std,0.5
min,0.0
25%,0.0
50%,0.5
75%,1.0
max,1.0


In [9]:
train_df.describe()

Unnamed: 0,similar
count,2020000.0
mean,0.5
std,0.5
min,0.0
25%,0.0
50%,0.5
75%,1.0
max,1.0


In [None]:
#  전처리 적용
def remove_extras(code):
    code = re.sub(re.compile("/\*.*?\*/", re.DOTALL), "", code) # 멀티 라인 주석 제거
    code = re.sub(re.compile("//.*?\n"), "", code) # 싱글 라인 주석 제거
    code = re.sub(re.compile("#include <.*?>\n"), "", code)  # angle brackets를 사용하는 include 제거
    code = re.sub(re.compile("#include \".*?\"\n"), "", code)  # double quotes를 사용하는 include 제거
    code = re.sub(re.compile("#define .*?\n"), "", code)  # 매크로 정의 제거
    code = re.sub(re.compile("[\t ]+"), " ", code)  # 탭과 여러 공백을 하나의 공백으로
    code = re.sub(re.compile("\n\s*\n"), "\n", code)  # 여러 줄바꿈을 하나로

    return code.strip()


In [None]:
# 데이터셋 클래스 정의
class CodePairDataset(Dataset):
    def __init__(self, tokenizer, data, max_length=512, include_labels=True):
        self.tokenizer = tokenizer
        self.data = data  # 이제 data는 DataFrame 객체입니다.
        self.max_length = max_length
        self.include_labels = include_labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        record = self.data.iloc[idx]
        code1 = remove_extras(record['code1'])
        code2 = remove_extras(record['code2'])

        inputs = self.tokenizer(
            code1, code2,
            padding='max_length', truncation=True, max_length=self.max_length, return_tensors="pt"
        )
        inputs = {key: val.squeeze() for key, val in inputs.items()}
        if self.include_labels:
            inputs['labels'] = torch.tensor(record['similar'], dtype=torch.long)
        return inputs


In [None]:
# GraphCodeBERT 모델 및 토크나이저 로드
model_name = "microsoft/graphcodebert-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.truncation_side = 'left'
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2).to(device)


# 훈련 세트와 검증 세트에 대한 데이터셋 생성
train_dataset = CodePairDataset(tokenizer, train_df, max_length=512)
val_dataset = CodePairDataset(tokenizer, val_df, max_length=512, include_labels=True)

# 데이터 로더 준비
train_loader = DataLoader(train_dataset, batch_size=48, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=48, shuffle=False)

# 파인 튜닝을 위한 옵티마이저 설정
optimizer = AdamW(model.parameters(), lr=2e-5)

# 훈련 루프 수정
model.train()
for epoch in range(3):  # 에폭 수 필요에 따라 조정
    total_loss = 0
    model.train()
    for batch in tqdm(train_loader):
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        loss = outputs.loss
        total_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # 에폭당 평균 훈련 손실 계산
    epoch_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch+1}, Loss: {epoch_loss}")

    # 검증 세트를 이용한 모델 평가
    model.eval()
    total_eval_accuracy = 0
    for batch in tqdm(val_loader):
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.no_grad():
            outputs = model(**batch)

        logits = outputs.logits
        predictions = torch.argmax(logits, dim=-1)
        labels = batch["labels"]

        # 정확도 계산
        accuracy = (predictions == labels).cpu().numpy().mean() * 100
        total_eval_accuracy += accuracy

    # 에폭당 평균 검증 정확도 계산
    avg_val_accuracy = total_eval_accuracy / len(val_loader)
    print(f"Validation Accuracy: {avg_val_accuracy:.2f}%")


In [None]:
# 두번째 모델 저장
torch.save(model.state_dict(), '/content/drive/My Drive/코드 유사성 판단/model_graphcodebert6.pth')