In [1]:
from google.colab import files
uploaded = files.upload()

Saving validation.txt to validation.txt


In [None]:
!pip install -U pip setuptools wheel
!pip install -U rust
!pip install tokenizers

In [None]:
!git clone https://github.com/squaresLab/VarCLR.git
%cd VarCLR
!pip install -e .

In [None]:
!pip install pytorch_lightning

In [None]:
!pip install gdown sentencepiece black isort sacremoses --no-deps

In [None]:
!pip install transformers==4.38.2 tokenizers==0.14.1 --no-deps

In [8]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [21]:
from torch import nn
from transformers import AutoModel
from safetensors.torch import load_file
import os

class diffcse_varclr(nn.Module):
  def __init__(self, model_path):
    super().__init__()
    self.encoder = AutoModel.from_pretrained(model_path, local_files_only=True, use_safetensors=True)
    state_dict = load_file(os.path.join(model_path, "model.safetensors"))

    self.dense = nn.Linear(768, 768)
    self.dense.weight.data.copy_(state_dict["mlp.dense.weight"])
    self.dense.bias.data.copy_(state_dict["mlp.dense.bias"])

    self.activation = nn.Tanh()

  def forward(self, inputs):
    outputs = self.encoder(**inputs)
    cls = outputs.last_hidden_state[:, 0]
    embedding = self.dense(cls)
    embedding = self.activation(embedding)
    return embedding

In [27]:
import pandas as pd
import torch
import torch.nn.functional as F
from sklearn.metrics import roc_auc_score, accuracy_score, precision_recall_fscore_support
from transformers import RobertaTokenizer, RobertaModel, BertTokenizer, BertModel, BartTokenizer, BartModel
from transformers import AutoTokenizer, AutoModel
import logging
import os
import safetensors

from varclr.models.model import Encoder


# GPU 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"사용 장치: {device}")

# 데이터 로드 및 전처리
def load_and_prepare_data(file_path=None):

    if file_path is None:
        raise FileNotFoundError(f"{file_path} 파일을 찾을 수 없습니다.")

    print(f"파일 로드: {file_path}")

    # 데이터 로드
    separators = ['\t', ',', ' ', '|', ';']
    val_df = None

    for sep in separators:
        try:
            temp_df = pd.read_csv(file_path, sep=sep, nrows=5, encoding='utf-8')
            if len(temp_df.columns) > 1:
                val_df = pd.read_csv(file_path, sep=sep, encoding='utf-8')
                print(f"구분자 '{sep}'로 데이터 로드 성공")
                break
        except Exception as e:
            continue

    if val_df is None:
        raise ValueError("데이터를 로드할 수 없습니다.")

    # 컬럼명 매핑
    val_df_renamed = val_df.rename(columns={
        val_df.columns[0]: 'path1',
        val_df.columns[1]: 'path2',
        val_df.columns[2]: 'label'
    })

    print(f"데이터 형태: {val_df_renamed.shape}")
    print(f"컬럼명: {val_df_renamed.columns.tolist()}")
    print(f"라벨 분포: {val_df_renamed['label'].value_counts().to_dict()}")

    return val_df_renamed

class ModelEvaluator:
    def __init__(self, model_name, device='cuda'):
        self.model_name = model_name
        self.device = device
        self.model = None
        self.tokenizer = None
        self._load_model()

    def _load_model(self):
        """모델별로 적절한 토크나이저와 모델을 로드"""
        try:
            if self.model_name.lower() == 'codebert':
                self.tokenizer = RobertaTokenizer.from_pretrained('microsoft/codebert-base')
                self.model = RobertaModel.from_pretrained('microsoft/codebert-base')
                self.model.to(self.device)
                self.model.eval()

            elif self.model_name.lower() == 'bert':
                self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
                self.model = BertModel.from_pretrained('bert-base-uncased')
                self.model.to(self.device)
                self.model.eval()

            elif self.model_name.lower() == 'bart':
                self.tokenizer = BartTokenizer.from_pretrained('facebook/bart-base')
                self.model = BartModel.from_pretrained('facebook/bart-base')
                self.model.to(self.device)
                self.model.eval()

            elif self.model_name.lower() == 'varclr':
                self.tokenizer = RobertaTokenizer.from_pretrained('microsoft/codebert-base')
                self.model = Encoder.from_pretrained("varclr-codebert")
                self.model.eval()

            # Drive 경로 맞게 수정할것!!
            elif self.model_name.lower() == 'diffcse_varclr':
                self.tokenizer = AutoTokenizer.from_pretrained("microsoft/codebert-base")
                self.model = diffcse_varclr(model_path="/content/drive/MyDrive/models/diffcse_varclr")
                self.model.to(self.device)
                self.model.eval()

            else:
                raise ValueError(f"Unsupported model: {self.model_name}")

            # self.model.to(self.device)
            # self.model.eval()

            print(f"✅ {self.model_name} 모델 로드 완료 (GPU: {next(self.model.parameters()).is_cuda})")

        except Exception as e:
            print(f"❌ {self.model_name} 모델 로드 실패: {str(e)}")
            raise

    def get_embeddings(self, texts, max_length=512, batch_size=16):
        """배치 처리로 임베딩 생성 (GPU 최적화)"""
        embeddings = []

        with torch.no_grad():
            for i in range(0, len(texts), batch_size):
                batch_texts = texts[i:i+batch_size]
                batch_embeddings = []

                for text in batch_texts:
                    try:
                        inputs = self.tokenizer(
                            str(text),
                            return_tensors='pt',
                            max_length=max_length,
                            truncation=True,
                            padding=True
                        ).to(self.device)

                        if self.model_name.lower() == 'bart':
                            outputs = self.model.encoder(**inputs)
                            embedding = outputs.last_hidden_state[:, 0]
                        elif self.model_name.lower() == 'varclr':
                            embedding = self.model.encode(text)
                            embedding = torch.tensor(embedding, device=self.device)
                        elif self.model_name.lower() == 'diffcse_varclr':
                            embedding = self.model(inputs)
                        else:
                            outputs = self.model(**inputs)
                            embedding = outputs.last_hidden_state[:, 0]

                        batch_embeddings.append(embedding)

                    except Exception as e:
                        print(f"임베딩 생성 오류: {str(e)}")
                        embedding = torch.zeros(1, 768).to(self.device)
                        batch_embeddings.append(embedding)

                if batch_embeddings:
                    batch_tensor = torch.cat(batch_embeddings, dim=0)
                    embeddings.append(batch_tensor)

                if (i // batch_size + 1) % 5 == 0:
                    print(f"  진행률: {min(i+batch_size, len(texts))}/{len(texts)}")

        return torch.cat(embeddings, dim=0)

    def calculate_metrics(self, val_df, labels, threshold=0.5, metric_key_prefix="val"):
        """메트릭 계산 (GPU 최적화)"""
        print(f"\n{self.model_name.upper()} 평가 시작...")
        print(f"데이터 수: {len(val_df)}, 라벨 수: {len(labels)}")

        # 임베딩 생성
        print("Path1 임베딩 생성 중...")
        path1_embeddings = self.get_embeddings(val_df["path1"].tolist())

        print("Path2 임베딩 생성 중...")
        path2_embeddings = self.get_embeddings(val_df["path2"].tolist())

        # 코사인 유사도 계산
        similarities = F.cosine_similarity(path1_embeddings, path2_embeddings)

        # 라벨을 텐서로 변환
        labels_tensor = torch.tensor([float(l) for l in labels], device=self.device)

        predictions = (similarities > threshold).float()

        # 정확도 계산
        accuracy = (predictions == labels_tensor).float().mean()

        # Precision, Recall, F1 계산
        true_positives = ((predictions == 1) & (labels_tensor == 1)).sum()
        false_positives = ((predictions == 1) & (labels_tensor == 0)).sum()
        false_negatives = ((predictions == 0) & (labels_tensor == 1)).sum()

        precision = true_positives / (true_positives + false_positives + 1e-8)
        recall = true_positives / (true_positives + false_negatives + 1e-8)
        f1 = 2 * (precision * recall) / (precision + recall + 1e-8)

        # AUC 계산
        try:
            auc = roc_auc_score([float(l) for l in labels], similarities.cpu().numpy())
        except Exception as e:
            print(f"AUC 계산 실패: {str(e)}")
            auc = 0.0

        metrics = {
            f"{metric_key_prefix}_{self.model_name.lower()}_accuracy": accuracy.item(),
            f"{metric_key_prefix}_{self.model_name.lower()}_precision": precision.item(),
            f"{metric_key_prefix}_{self.model_name.lower()}_recall": recall.item(),
            f"{metric_key_prefix}_{self.model_name.lower()}_f1": f1.item(),
            f"{metric_key_prefix}_{self.model_name.lower()}_auc": auc,
        }

        return metrics

def evaluate_all_models(val_df, labels):
    """모든 모델에 대해 평가 수행"""
    models = ['diffcse_varclr', 'varclr', 'codebert', 'bert', 'bart']
    all_metrics = {}

    for model_name in models:
        print(f"\n{'='*60}")
        print(f"Evaluating {model_name.upper()}")
        print(f"{'='*60}")

        try:
            evaluator = ModelEvaluator(model_name, device=device)
            metrics = evaluator.calculate_metrics(val_df, labels, threshold=0.6)
            all_metrics.update(metrics)

            print(f"\n{model_name.upper()} Results:")
            for key, value in metrics.items():
                print(f"  {key}: {value:.4f}")

            # GPU 메모리 정리
            del evaluator
            torch.cuda.empty_cache()

        except Exception as e:
            print(f"❌ Error evaluating {model_name}: {str(e)}")
            continue

    return all_metrics

def compare_models(results):
    """모델 성능 비교"""
    models = ['diffcse_varclr', 'varclr', 'codebert', 'bert', 'bart']
    metrics = ['accuracy', 'precision', 'recall', 'f1', 'auc']

    print(f"\n{'='*80}")
    print("MODEL COMPARISON SUMMARY")
    print(f"{'='*80}")
    print(f"{'Model':<12} {'Accuracy':<10} {'Precision':<10} {'Recall':<10} {'F1':<10} {'AUC':<10}")
    print("-" * 80)

    for model in models:
        row = f"{model.upper():<12}"
        for metric in metrics:
            key = f"val_{model}_{metric}"
            if key in results:
                row += f"{results[key]:<10.4f}"
            else:
                row += f"{'N/A':<10}"
        print(row)


사용 장치: cuda


In [28]:
# 실행
if __name__ == "__main__":
    try:
        # 데이터 로드
        # 파일 경로 지정!!
        val_df_renamed = load_and_prepare_data(file_path='/content/test.txt')
        val_df_renamed = val_df_renamed.dropna(inplace=False, axis=0)
        labels = val_df_renamed['label'].tolist()

        print(f"\n총 데이터 수: {len(labels)}")
        print(f"GPU 사용 가능: {torch.cuda.is_available()}")

        # 모델 평가 실행
        results = evaluate_all_models(val_df_renamed, labels)

        # 결과 비교
        compare_models(results)

    except Exception as e:
        print(f"실행 오류: {str(e)}")
        print("데이터 파일 경로를 확인하세요.")

파일 로드: /content/validation.txt
구분자 '	'로 데이터 로드 성공
데이터 형태: (678, 3)
컬럼명: ['path1', 'path2', 'label']
라벨 분포: {0.0: 374, 1.0: 303}

총 데이터 수: 677
GPU 사용 가능: True

Evaluating DIFFCSE_VARCLR


Some weights of RobertaModel were not initialized from the model checkpoint at /content/drive/MyDrive/models/varclr and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


✅ diffcse_varclr 모델 로드 완료 (GPU: True)

DIFFCSE_VARCLR 평가 시작...
데이터 수: 677, 라벨 수: 677
Path1 임베딩 생성 중...
Path2 임베딩 생성 중...

DIFFCSE_VARCLR Results:
  val_diffcse_varclr_accuracy: 0.6411
  val_diffcse_varclr_precision: 1.0000
  val_diffcse_varclr_recall: 0.1980
  val_diffcse_varclr_f1: 0.3306
  val_diffcse_varclr_auc: 0.8917

Evaluating VARCLR




✅ varclr 모델 로드 완료 (GPU: False)

VARCLR 평가 시작...
데이터 수: 677, 라벨 수: 677
Path1 임베딩 생성 중...


  embedding = torch.tensor(embedding, device=self.device)


KeyboardInterrupt: 