In [8]:
import numpy as np
import lightgbm as lgb
import transformers
import re
import torch
import torch.nn as nn
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import safetensors
from safetensors import safe_open
import pandas as pd
from transformers import Trainer, TrainingArguments, DataCollatorWithPadding
device = 'cuda' if torch.cuda.is_available() else 'cpu'
from tqdm import tqdm


In [9]:
class Dataset(torch.utils.data.Dataset):
    """Sentence-pair dataset that tokenizes one DataFrame row per item.

    Each item joins the ``sentence_1`` and ``sentence_2`` columns of a row,
    cleans the joined text and tokenizes it to a fixed ``max_length``.

    Args:
        inputs: DataFrame-like object indexed with ``.iloc``; it must provide
            the ``sentence_1`` and ``sentence_2`` columns, plus ``label`` when
            ``mode == 'train'``.
        tokenizer: Callable invoked as
            ``tokenizer(text, padding=..., max_length=..., truncation=...)``
            that returns a mapping with ``input_ids`` and ``attention_mask``
            (``token_type_ids`` is optional).
        max_length: Length every sequence is padded / truncated to.
        mode: ``'train'`` adds a ``labels`` entry to every item; any other
            value omits it.
    """

    def __init__(self, inputs, tokenizer, max_length, mode = 'train'):
        self.inputs = inputs
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.text_columns = ['sentence_1', 'sentence_2']
        self.mode = mode

    def cleaning_text(self, text):
        """Return ``text`` with everything except Hangul syllables, ASCII
        letters, digits and whitespace removed."""
        # The first pattern only covers the jamo range U+314B..U+314E; the
        # second pattern would remove those characters as well.
        cleaned_text = re.sub(r'[ㅋ-ㅎ]+', '', text)
        cleaned_text = re.sub(r"[^가-힣a-zA-Z0-9\s]", "", cleaned_text)
        return cleaned_text

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        """Tokenize row ``idx`` and return a dict of ``torch.long`` tensors
        (plus the raw ``labels`` value in train mode)."""
        row = self.inputs.iloc[idx]

        # NOTE(review): cleaning runs after the '[SEP]' join and strips the
        # square brackets, so the tokenizer receives the literal letters 'SEP'
        # rather than a separator marker. Left unchanged on purpose: saved
        # checkpoints may have been trained on exactly this preprocessing —
        # confirm before changing it.
        text = '[SEP]'.join([row[col] for col in self.text_columns])
        text = self.cleaning_text(text)
        encoded = self.tokenizer(text,
                                 padding='max_length',
                                 max_length=self.max_length,
                                 truncation=True)

        input_ids = encoded['input_ids']
        # Not every tokenizer emits segment ids; a single-segment input is
        # all zeros, so fall back to that instead of raising KeyError.
        if 'token_type_ids' in encoded:
            segment_ids = encoded['token_type_ids']
        else:
            segment_ids = [0] * len(input_ids)

        item = {'input_ids' : torch.tensor(input_ids, dtype = torch.long),
                'attention_mask' : torch.tensor(encoded['attention_mask'], dtype = torch.long),
                'token_type_ids' : torch.tensor(segment_ids, dtype = torch.long)}
        if self.mode == 'train':
            item['labels'] = row['label']
        return item
    

    


In [10]:

class MyModel(nn.Module):
    """Pretrained transformer encoder with a two-layer 1D-CNN regression head.

    The encoder's token embeddings are passed through Conv1 -> ReLU ->
    BatchNorm -> MaxPool and Conv2 -> ReLU -> BatchNorm -> global average
    pooling, then projected to one scalar per sequence.

    The attribute names (``model``, ``Conv1``, ``Conv2``, ``output_layer``,
    ``batchnorm1``, ``batchnorm2``) determine the state_dict keys, so they
    must not be renamed while saved checkpoints are loaded by key.

    Args:
        model_name: Identifier passed to ``transformers.AutoModel.from_pretrained``.
    """

    def __init__(self, model_name):
        super(MyModel, self).__init__()
        # NOTE(review): trust_remote_code=True lets the model repository run
        # its own Python code at load time; only use it with trusted repos.
        self.model = transformers.AutoModel.from_pretrained(
            model_name,
            trust_remote_code=True
        )

        # Size the first convolution from the encoder's configuration instead
        # of assuming 768, so encoders with another hidden width also work.
        # 768 stays as the fallback when no hidden_size is exposed.
        hidden_size = getattr(getattr(self.model, 'config', None), 'hidden_size', 768)

        # First Conv1D layer
        self.Conv1 = nn.Conv1d(
            in_channels=hidden_size,  # encoder output dimension
            out_channels=256,
            kernel_size=3,
            padding=1
        )

        # Second Conv1D layer
        self.Conv2 = nn.Conv1d(
            in_channels=256,  # output dimension of Conv1
            out_channels=128,  # output dimension of Conv2
            kernel_size=3,
            padding=1
        )

        self.output_layer = nn.Linear(128, 1)
        self.relu = nn.ReLU()
        # Defined but not applied anywhere in forward().
        self.dropout = nn.Dropout(0.3)
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        self.batchnorm1 = nn.BatchNorm1d(256)
        self.batchnorm2 = nn.BatchNorm1d(128)
        self.maxpool = nn.MaxPool1d(kernel_size = 2)
        # self.sigmoid = nn.Sigmoid()
        self.loss_fn = nn.MSELoss()

    def forward(self, input_ids, attention_mask, token_type_ids, labels = None):
        """Score a batch of tokenized sequences.

        Args:
            input_ids, attention_mask, token_type_ids: Forwarded unchanged to
                the encoder as keyword arguments.
            labels: Optional targets; when given they are cast to float and an
                MSE loss against the predictions is returned as well.

        Returns:
            ``{'output': scores}`` with one score per sequence, plus
            ``'loss'`` when ``labels`` is not None.
        """
        output = self.model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)

        # (B, L, H) -> (B, H, L): Conv1d convolves over the last axis.
        output = output.last_hidden_state.permute(0, 2, 1)

        cnn_output = self.Conv1(output)  # Shape: (B, 256, L)
        cnn_output = self.relu(cnn_output)
        cnn_output = self.batchnorm1(cnn_output)
        cnn_output = self.maxpool(cnn_output)  # Shape: (B, 256, L/2)

        cnn_output = self.Conv2(cnn_output)  # Shape: (B, 128, L/2)
        cnn_output = self.relu(cnn_output)
        cnn_output = self.batchnorm2(cnn_output)
        # Averages over every position, padded ones included (the attention
        # mask is not consulted here).
        cnn_output = self.avg_pool(cnn_output)  # Shape: (B, 128, 1)

        cnn_output = cnn_output.view(cnn_output.size(0), -1)  # Shape: (B, 128)
        output = self.output_layer(cnn_output).squeeze(-1)  # Shape: (B,)

        if labels is not None:
            loss = self.loss_fn(output, labels.float())
            return {'output' : output, 'loss' : loss}

        else:
            return {'output' : output}



In [11]:
def load_model(name):
    """Build a ``MyModel`` for ``name`` and restore its fine-tuned weights.

    Weights are read from
    ``./results/best_model_<last path component of name>/model.safetensors``.

    Args:
        name: Model identifier; also used to load the tokenizer.

    Returns:
        ``(model, tokenizer)``. The model is left on the CPU; the caller moves
        it to the target device.

    Raises:
        RuntimeError: from ``load_state_dict`` when the checkpoint keys or
            shapes do not match the model (strict loading).
    """
    model_name = name.split('/')[-1]
    model = MyModel(name)
    tokenizer = AutoTokenizer.from_pretrained(name)
    model_path = f"./results/best_model_{model_name}"
    # Read the tensors onto the CPU rather than onto CUDA device 0: the freshly
    # built model lives on the CPU at this point, and hard-coding a GPU index
    # fails on CPU-only hosts.
    with safe_open(model_path + "/model.safetensors", framework="pt", device="cpu") as f:
        tensors = {key: f.get_tensor(key) for key in f.keys()}
    # Strict by default, so reaching the print means every key matched.
    model.load_state_dict(tensors)
    print(f'matched all parameters.[{model_name}]')
    return model, tokenizer


In [56]:
def get_predictions(model_list, mode = 'train', data = None, max_length = 160, batch_size = 32):
    """Run every model in ``model_list`` over a dataset and collect its scores.

    Args:
        model_list: Model identifiers accepted by ``load_model``.
        mode: ``'train'`` keeps only ``sentence_1``, ``sentence_2`` and
            ``label`` and drops rows with missing values; any other value uses
            the data as-is in test mode.
        data: Optional DataFrame to score. When None, the original hard-coded
            CSV for the selected mode is read. Pass the same frame the labels
            are taken from to guarantee that rows and labels line up.
        max_length: Tokenizer padding / truncation length.
        batch_size: Inference batch size.

    Returns:
        DataFrame with one float64 column per model name and one row per
        input row, in input order (the loaders do not shuffle).
    """
    if data is None:
        if mode == 'train':
            data = pd.read_csv('/data/ephemeral/home/data/lgbdata10000.csv')
        else:
            data = pd.read_csv('/data/ephemeral/home/data/test.csv')
    if mode == 'train':
        data = data[['sentence_1', 'sentence_2', 'label']].dropna()

    preds = {}
    for name in model_list:
        model, tokenizer = load_model(name)
        if mode == 'train':
            dataset = Dataset(data, tokenizer, max_length)
            data_collator = DataCollatorWithPadding(
                    tokenizer = tokenizer,
                    padding = True,
                    return_tensors = 'pt'
                )
            dataloader = torch.utils.data.DataLoader(dataset, collate_fn = data_collator, batch_size = batch_size)
        else:
            dataset = Dataset(data, tokenizer, max_length, mode = 'test')
            dataloader = torch.utils.data.DataLoader(dataset, batch_size = batch_size)

        model = model.to(device)
        model.eval()
        # Collect per-batch arrays and join them once; stacking onto a growing
        # array inside the loop would recopy all earlier results every batch.
        batch_outputs = []
        with torch.no_grad():
            for batch in tqdm(dataloader):
                input_ids = batch['input_ids'].to(device)
                attention_mask = batch['attention_mask'].to(device)
                token_type_ids = batch['token_type_ids'].to(device)

                output = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)['output']
                batch_outputs.append(output.cpu().numpy())

        if batch_outputs:
            # float64 matches what the previous hstack-onto-empty-array produced.
            preds[name] = np.concatenate(batch_outputs).astype(np.float64)
        else:
            preds[name] = np.array([])

        # Release this model before the next one is loaded so two models never
        # occupy the device at the same time.
        del model
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    return pd.DataFrame(preds)


In [75]:

def lgb_train(preds, label):
    """Fit a LightGBM regression booster on stacked model predictions.

    Args:
        preds: Feature matrix handed to ``lgb.Dataset`` (one column per base
            model in this notebook).
        label: Regression targets aligned row-for-row with ``preds``.

    Returns:
        The booster returned by ``lgb.train``. No ``num_boost_round`` is
        passed, so the library default applies.
    """
    hyperparams = dict(
        objective='regression',
        metric='mse',
        boosting_type='gbdt',
        learning_rate=0.05,
        num_leaves=31,
        max_depth=-1,
        min_data_in_leaf=20,
        # Row / column subsampling, re-drawn every iteration.
        feature_fraction=0.8,
        bagging_fraction=0.8,
        bagging_freq=1,
        verbose=-1,
        random_state=42,
    )
    return lgb.train(hyperparams, lgb.Dataset(preds, label=label))



In [90]:
# Labels for the stacking model: same column selection and NaN filtering that
# get_predictions applies in train mode.
train = pd.read_csv('/data/ephemeral/home/moruka/lgbdata5000.csv')
train = train[['sentence_1', 'sentence_2', 'label']].dropna()
labels = train['label']
# Base models whose predictions become the LightGBM features; commented-out
# entries are alternatives that are currently disabled.
model_list = [#'klue/roberta-small',
            'klue/roberta-base',
            #'snunlp/KR-SBERT-Medium-extended-klueNLItriplet_PARpair_QApair-klueSTS',
            'snunlp/KR-SBERT-Medium-klueNLItriplet_PARpair-klueSTS',
            'klue/bert-base',
            #'sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2',
            #  'Alibaba-NLP/gte-multilingual-base',
            ]

preds = get_predictions(model_list)
# NOTE(review): get_predictions loads its own training CSV in train mode, and
# that path differs from the file the labels above were read from. Fail fast
# on a row-count mismatch instead of fitting on misaligned features / labels.
# Equal counts do not prove the rows match — confirm both read the same file.
if len(preds) != len(labels):
    raise ValueError(
        f'feature/label row mismatch: {len(preds)} prediction rows vs '
        f'{len(labels)} labels; make sure both come from the same CSV'
    )
lgb_model = lgb_train(preds, labels)


Some weights of RobertaModel were not initialized from the model checkpoint at klue/roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


matched all parameters.[roberta-base]


  0%|          | 1/313 [00:00<02:11,  2.37it/s]


KeyboardInterrupt: 

In [80]:
# Score the test set with every base model, then blend with the LightGBM model.
test = pd.read_csv('/data/ephemeral/home/data/test.csv')
test_dataset = get_predictions(model_list, mode = 'test')
predictions = lgb_model.predict(test_dataset)
submission = pd.DataFrame({'id' : test['id'], 'target' : predictions.round(2)})
# index=False keeps the file to the two columns id,target; the default would
# prepend the DataFrame index as an extra unnamed column.
submission.to_csv('lgb_ensemble.csv', index = False)

