### import library

In [None]:
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm
import random
import re
import os
import glob

from transformers import (AutoTokenizer,
                         AutoModelForSequenceClassification,
                         AutoConfig,
                         get_linear_schedule_with_warmup)
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import torch


### make train dataset 

In [None]:
import os
import glob
import re

def _strip_cpp_comments(content):
    """Return `content` without C/C++ comments, joined into a single line.

    `/* ... */` blocks and `// ...` line comments are removed by a plain
    textual scan (string literals are not treated specially, so a `//` inside
    a literal is also cut). Lines that are empty after stripping are dropped
    and the remaining lines are joined with a single space.

    NOTE: this intentionally produces the same output as `del_comment`, which
    is used for the test set; keep the two in sync.
    """
    kept = []
    pos = 0
    size = len(content)
    while pos < size:
        # Both comment openers start with '/', so jump straight to the next one.
        slash = content.find('/', pos)
        if slash == -1:
            kept.append(content[pos:])
            break
        kept.append(content[pos:slash])
        marker = content[slash:slash + 2]
        if marker == '/*':
            close = content.find('*/', slash + 2)
            # An unterminated block comment swallows the rest of the file.
            pos = size if close == -1 else close + 2
        elif marker == '//':
            newline = content.find('\n', slash + 2)
            # Stop *at* the newline so the line break itself is preserved.
            pos = size if newline == -1 else newline
        else:
            kept.append('/')
            pos = slash + 1

    stripped = ''.join(kept)
    return ' '.join(line for line in stripped.split('\n') if line.strip() != '')


def read_cpp_codes_from_folders(root_dir):
    """
    Read the .cpp files in every folder directly under `root_dir`.

    The first run of digits in a folder name is used as that folder's label;
    folders without digits and non-directory entries are skipped. Each file is
    stripped of comments and blank lines, flattened to one line, and every
    whitespace character is replaced by a space.

    Args:
        root_dir: directory that contains one sub-folder per label.

    Returns:
        dict mapping label (int) to a list with one cleaned source string per
        .cpp file. Folders that resolve to the same label are merged.
    """
    label_to_code = {}

    # Walk over every label folder.
    for folder in tqdm(os.listdir(root_dir)):
        folder_path = os.path.join(root_dir, folder)
        if not os.path.isdir(folder_path):
            continue

        # Extract the numeric part of the folder name as the label.
        label_match = re.search(r'\d+', folder)
        if not label_match:
            continue
        label = int(label_match.group(0))

        code_list = []
        # glob order depends on the filesystem; sort for a reproducible dataset.
        for cpp_file in sorted(glob.glob(os.path.join(folder_path, '*.cpp'))):
            with open(cpp_file, 'r', encoding='utf-8') as file:
                content = file.read()

            code = _strip_cpp_comments(content)
            # \s already covers \n and \t, so one raw-string pattern is enough.
            code = re.sub(r'\s', ' ', code)
            code_list.append(code)

        # Store the sources under their label, merging with earlier folders.
        label_to_code.setdefault(label, []).extend(code_list)

    return label_to_code

# Set this to the root directory that holds the per-label folders.
root_dir = './train_code'
cpp_codes = read_cpp_codes_from_folders(root_dir)

# Sanity check: report how many sources were collected for each label.
for label, codes in cpp_codes.items():
    file_count = len(codes)
    print(f"Label: {label}, Number of CPP files: {file_count}")

In [None]:
# Make sure every dictionary key is a plain int.
cpp_codes_dict_int_keys = {int(key): value for key, value in cpp_codes.items()}

# Flatten the mapping into one record per source file.
data = []
for label, codes in cpp_codes_dict_int_keys.items():
    data.extend({'label': label, 'code': code} for code in codes)

# Build the frame, order it by label and shift the labels down by one.
df = pd.DataFrame(data).sort_values(by=['label'], axis=0)
df['label'] = df['label'] - 1
df.head()  # show the first few rows as a quick check

In [None]:
# Write the training table to disk without the index column.
df.to_csv(path_or_buf='train_data.csv', index=False)

### test dataset preprocessing

In [None]:
def del_comment(content):
    """Return `content` without C/C++ comments, joined into a single line.

    `/* ... */` blocks and `// ...` line comments are removed by a plain
    textual scan; string literals are not treated specially, so a `//` inside
    a literal is cut as well. An unterminated block comment removes everything
    up to the end of the input. Lines that are empty (or whitespace only)
    after stripping are dropped and the rest are joined with a single space.

    Args:
        content: source text as a str.

    Returns:
        The cleaned source as a str.
    """
    kept = []
    pos = 0
    size = len(content)
    while pos < size:
        # Both comment openers start with '/', so jump straight to the next
        # one instead of inspecting (and concatenating) every character.
        slash = content.find('/', pos)
        if slash == -1:
            kept.append(content[pos:])
            break
        kept.append(content[pos:slash])
        marker = content[slash:slash + 2]
        if marker == '/*':
            close = content.find('*/', slash + 2)
            # Skip past the closing marker, or to the end if there is none.
            pos = size if close == -1 else close + 2
        elif marker == '//':
            newline = content.find('\n', slash + 2)
            # Stop *at* the newline so the line break itself is preserved.
            pos = size if newline == -1 else newline
        else:
            # A lone '/' (e.g. division) is ordinary code.
            kept.append('/')
            pos = slash + 1

    # Re-assemble the lines that still contain something after stripping.
    stripped = ''.join(kept)
    return ' '.join(line for line in stripped.split('\n') if line.strip() != '')

def del_function(text):
    """Replace every whitespace character in `text` with a single space.

    The replacement is one-for-one: runs of whitespace are not collapsed.
    `\\s` already matches newlines and tabs, so a single raw-string pattern
    covers everything (a non-raw '\\s' is an invalid escape sequence).
    """
    return re.sub(r'\s', ' ', text)

df = pd.read_csv('./test.csv')

# Clean only the two source-code columns: strip comments, then flatten
# whitespace. Running the regex over every column (DataFrame.applymap, which
# is also deprecated in recent pandas) raises TypeError on any non-string
# column and is not needed for the identifier columns.
df['code1'] = df['code1'].apply(del_comment).apply(del_function)
df['code2'] = df['code2'].apply(del_comment).apply(del_function)

# NOTE: the index is written as well, so reading this file back yields an
# extra unnamed column next to the original ones.
df.to_csv('test_data.csv')

### seed

In [None]:
def seed_all(seed):
    """Seed Python's `random`, NumPy and PyTorch (CPU and CUDA) with `seed`.

    Also exports PYTHONHASHSEED. That variable is read at interpreter start-up,
    so it affects processes launched afterwards, not the hashing of the
    already-running interpreter.

    Args:
        seed: integer seed shared by all generators.
    """
    random.seed(seed)
    # The variable name is PYTHONHASHSEED; a misspelt name is silently ignored.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed every visible GPU, not just the current one (no-op without CUDA).
    torch.cuda.manual_seed_all(seed)


seed_all(42)

### custom dataset

In [None]:
class NewsDataset(Dataset):
    """Training dataset that builds a random code pair for every requested row.

    For row `idx` the first snippet is `data['code'][idx]`. With probability
    0.5 the second snippet is drawn from the rows sharing the same label
    (target 1, "similar"); otherwise it is drawn from the rows with a
    different label (target 0). The positive partner may be the row itself.

    Sampling uses NumPy's global random generator, so a new pair is produced
    every time an item is fetched.

    Args:
        data: DataFrame with a 'code' column (source text) and a 'label'
            column (problem id).
        tokenizer: callable invoked as
            `tokenizer(code1, code2, max_length=..., truncation=True,
            return_tensors="pt", padding='max_length')` that returns a mapping
            with 'input_ids' and 'attention_mask' of leading dimension 1.
        max_len: value forwarded to the tokenizer as `max_length`.
    """

    def __init__(self, data, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.code = data['code'].to_numpy()
        self.problem = data['label'].to_numpy()

        # Group row positions by label once. Looking partners up through this
        # table avoids assuming a fixed number of rows per label or that the
        # frame is sorted into contiguous label blocks.
        rows_by_label = {}
        for row, label in enumerate(self.problem.tolist()):
            rows_by_label.setdefault(label, []).append(row)
        self._rows_by_label = {
            label: np.asarray(rows) for label, rows in rows_by_label.items()
        }

    def __len__(self):
        return len(self.code)

    def __getitem__(self, idx):
        """Return one tokenized pair and its 0/1 similarity target.

        Raises:
            ValueError: if a negative pair is requested but every row carries
                the same label.
        """
        code1 = self.code[idx]
        anchor_label = self.problem[idx]
        same_label_rows = self._rows_by_label[anchor_label]
        total = len(self.code)

        if np.random.random() < 0.5:
            # similar: any row that shares the anchor's label
            partner = same_label_rows[np.random.randint(len(same_label_rows))]
            label = 1
        else:
            # not similar: uniform over the rows with a different label
            if len(same_label_rows) == total:
                raise ValueError(
                    'cannot sample a negative pair: every row has the same label'
                )
            partner = np.random.randint(total)
            while self.problem[partner] == anchor_label:
                partner = np.random.randint(total)
            label = 0
        code2 = self.code[partner]

        encoding = self.tokenizer(
            code1,
            code2,
            max_length=self.max_len,
            truncation=True,
            return_tensors="pt",
            padding='max_length'
        )

        # Index 0 drops the leading dimension added by return_tensors="pt".
        return {'input_ids': encoding['input_ids'][0],
                'attention_mask': encoding['attention_mask'][0],
                'labels': torch.tensor(label, dtype=torch.long)}

    
class testDataset(Dataset):
    """Inference dataset that tokenizes the fixed (code1, code2) pair of each row.

    Args:
        data: DataFrame with 'code1' and 'code2' columns.
        tokenizer: callable invoked with both snippets plus `max_length`,
            `truncation=True`, `return_tensors="pt"` and
            `padding='max_length'`.
        max_len: value forwarded to the tokenizer as `max_length`.
    """

    def __init__(self, data, tokenizer, max_len):
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.code1, self.code2 = (
            data[column].to_numpy() for column in ('code1', 'code2')
        )

    def __len__(self):
        return len(self.code1)

    def __getitem__(self, idx):
        """Return 'input_ids' and 'attention_mask' for the pair stored at `idx`."""
        tokenizer_options = {
            'max_length': self.max_len,
            'truncation': True,
            'return_tensors': "pt",
            'padding': 'max_length',
        }
        encoded = self.tokenizer(self.code1[idx], self.code2[idx], **tokenizer_options)

        # Index 0 drops the leading dimension of each returned entry.
        return {key: encoded[key][0] for key in ('input_ids', 'attention_mask')}

### prepare train

In [None]:
# Run configuration.
model_name = 'Lazyhope/unixcoder-clone-detection'
BATCH_SIZE = 3
EPOCH = 30

train_data = pd.read_csv('./train_data.csv')

# Truncate from the left, i.e. drop leading tokens of over-long inputs.
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.truncation_side = 'left'

# Pairs are sampled on the fly by the dataset; the loader only batches them.
train_dataset = NewsDataset(train_data, tokenizer, max_len=1024)
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    pin_memory=True,
    num_workers=4,
)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = AutoModelForSequenceClassification.from_pretrained(model_name)
model.to(device)

# AdamW whose learning rate is multiplied by 0.1 after every 10 scheduler steps.
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)
scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer, step_size=10, gamma=0.1, last_epoch=-1
)

### train unixcoder

In [None]:
model.zero_grad()
model.train()
for epoch in tqdm(range(EPOCH)):
    model.train()
    # Per-epoch running totals: summed loss, correct predictions, predicted positives.
    train_loss, acc, cnt = 0, 0, 0
    print(f'\nepoch : {epoch+1}')

    for batch in tqdm(train_loader, leave=False, total=len(train_loader)):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        # Passing labels makes the model return the loss alongside the logits.
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        logits = outputs.logits
        preds = logits.argmax(dim=1)
        acc += (labels == preds).sum()
        cnt += preds.sum()

        loss = outputs.loss
        train_loss += loss.item()

        # One optimizer update per batch, then clear the gradients.
        loss.backward()
        optimizer.step()
        model.zero_grad()

    # Checkpoint after every epoch, then advance the learning-rate schedule.
    model.save_pretrained(f"unixcoder/{epoch}", from_pt=True)
    scheduler.step()
    print(f'train_loss : {train_loss}\nacc = {acc / len(train_data)}\ncount = {cnt}')

### prepare test

In [None]:
model_name = 'Lazyhope/unixcoder-clone-detection'
test_data = pd.read_csv('./test_data.csv')

# Same tokenizer setup as for training: truncate over-long inputs from the left.
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.truncation_side = 'left'

# Keep the rows in file order so predictions line up with the pair ids.
test_dataset = testDataset(test_data, tokenizer, max_len=1024)
test_loader = DataLoader(
    test_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=4,
)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

### evaluate test data

In [None]:
# Collect the raw logits of every test pair, one array per pair.
model.to(device)
model.eval()
test_preds = []
with torch.no_grad():
    for batch in tqdm(test_loader, total=len(test_loader)):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        outputs = model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        # Move the batch to host memory and append its rows individually.
        test_preds += list(logits.cpu().numpy())

### threshold

In [None]:
# Turn the collected logits into per-class probabilities, shape (num_pairs, 2).
# Stacking with np.array first avoids building a tensor from a list of arrays.
test = F.softmax(torch.tensor(np.array(test_preds)), dim=1)

# Class 1 is the "similar" target used during training. Threshold only that
# column: comparing the whole (num_pairs, 2) matrix yields a 2-D array, which
# cannot be stored in a single DataFrame column.
similar_prob = test[:, 1].numpy()
test_preds = np.where(similar_prob > 0.1, 1, 0)

submission = pd.DataFrame({'pair_id': test_data['pair_id'], 'similar': test_preds})
submission.to_csv('submission.csv',index=False)