In [2]:
import torch

print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU detected:", torch.cuda.get_device_name(0))
else:
    print("⚠️ No GPU detected.")


CUDA available: True
GPU detected: NVIDIA GeForce RTX 2080 Super with Max-Q Design


# Imports

In [19]:
import os
import pandas as pd
import torch
from torch.utils.data import Dataset
from transformers import RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments, EarlyStoppingCallback
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report
from torch.nn.functional import softmax


# Training

In [20]:
df = pd.read_csv("consolidated.csv", sep=";")

# check solidity exist, remove "nan" value
def has_source(fp_sol):
    sol_path = os.path.join(r"C:\Users\dafel\OneDrive - The Pennsylvania State University\Y4S2\DS 440\source", f"{fp_sol}.sol")
    return os.path.isfile(sol_path)

df = df[df["fp_sol"].apply(has_source)]

# transfer property_holds to label： "t" -> 1， "f" -> 0
df = df[df['property_holds'].isin(['t', 'f'])]
df['label'] = df['property_holds'].map({'t': 1, 'f': 0})

# read Solidity Source code, sol as txt
def read_source(fp_sol):
    sol_path = os.path.join(r"C:\Users\dafel\OneDrive - The Pennsylvania State University\Y4S2\DS 440\source", f"{fp_sol}.sol")
    try:
        with open(sol_path, "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        print(f"file {sol_path} missing, continue")
        return ""

df['code'] = df['fp_sol'].apply(read_source)

# filter "nan" out，select code and label
data = df.dropna(subset=['code', 'label'])
data = data[data['code'] != ""]
solidity_data = []
for _, row in tqdm(data.iterrows(), total=len(data), desc="Loading Solidity files"):
    fp_sol = row["fp_sol"]
    sol_path = os.path.join("cgt-main", "source", f"{fp_sol}.sol")
    try:
        with open(sol_path, "r", encoding="utf-8") as f:
            code = f.read()
        solidity_data.append({
            "contract_name": row["contractname"],
            "code": code,
            "bug_type": row["property"],
            "swc_id": row["swc"],
            "dasp_id": row["dasp"]
        })
    except FileNotFoundError:
        pass
        
# JSON file saved
df_solidity = pd.DataFrame(solidity_data)
df_solidity.to_json("aaaaa.json", indent=4)

# train-validate separate
train_texts, val_texts, train_labels, val_labels = train_test_split(
    data['code'].tolist(), data['label'].tolist(), test_size=0.2, random_state=42
)

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = logits.argmax(axis=-1)
    return {
        "accuracy": accuracy_score(labels, predictions)
    }

# define PyTorch Dataset class
class SolidityDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length=512):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        text = self.texts[idx]
        # auto cut code lenth
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors="pt"
        )
        item = {key: val.squeeze(0) for key, val in encoding.items()}
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

# Define weighted loss inside a custom Trainer
class WeightedTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        labels = inputs.get("labels")
        outputs = model(**inputs)
        logits = outputs.get("logits")

        # Manually set class weights here
        weight = torch.tensor([1.0, 2.5]).to(logits.device)
        loss_fct = nn.CrossEntropyLoss(weight=weight)
        loss = loss_fct(logits, labels)

        return (loss, outputs) if return_outputs else loss


# load CodeBERT tokenizer and model form（microsoft/codebert-base）
model_name = "microsoft/codebert-base"
tokenizer = RobertaTokenizer.from_pretrained(model_name)
model = RobertaForSequenceClassification.from_pretrained(model_name, num_labels=2)

# !!!!!!!!!!!!!!!!!!!!!!! need further explore
train_dataset = SolidityDataset(train_texts, train_labels, tokenizer)
val_dataset = SolidityDataset(val_texts, val_labels, tokenizer)

# Trainer define and tune
training_args = TrainingArguments(
    output_dir="./codebert-finetuned",  
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    evaluation_strategy="steps",
    eval_steps=500,
    save_steps=500,
    warmup_steps=100,
    weight_decay=0.01,
    learning_rate=2e-5,
    logging_dir='./logs',
    logging_steps=100,
    save_total_limit=2,
    disable_tqdm=False,
    load_best_model_at_end=True,
    metric_for_best_model="eval_accuracy",
    greater_is_better=True
)

trainer = WeightedTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)]
)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# TRAINING
trainer.train()

predictions = trainer.predict(val_dataset)
logits = predictions.predictions
labels = predictions.label_ids

# Use custom threshold on softmax probs
probs = softmax(torch.tensor(logits), dim=1).numpy()
custom_preds = (probs[:, 1] > 0.4).astype(int)  # try 0.4 or lower

print(classification_report(labels, custom_preds, zero_division=0))

# Show result
eval_result = trainer.evaluate()
print("Evaluation results:", eval_result)

Loading Solidity files: 100%|██████████| 19456/19456 [00:01<00:00, 10014.71it/s]
Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at microsoft/codebert-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Step,Training Loss,Validation Loss,Accuracy
500,0.6983,0.684771,0.725848
1000,0.6698,0.651669,0.672148
1500,0.6621,0.647318,0.74409
2000,0.6322,0.64071,0.742549
2500,0.6609,0.639613,0.743063


              precision    recall  f1-score   support

           0       0.81      0.61      0.70      2825
           1       0.37      0.61      0.46      1067

    accuracy                           0.61      3892
   macro avg       0.59      0.61      0.58      3892
weighted avg       0.69      0.61      0.63      3892



Evaluation results: {'eval_loss': 0.6473181247711182, 'eval_accuracy': 0.7440904419321686, 'eval_runtime': 137.6695, 'eval_samples_per_second': 28.271, 'eval_steps_per_second': 3.537, 'epoch': 1.2846865364850977}


# Testing

In [21]:
predictions = trainer.predict(val_dataset)
preds = predictions.predictions.argmax(axis=-1)
true_labels = predictions.label_ids

# Generate classification report
report = classification_report(true_labels, preds)
print(report)

              precision    recall  f1-score   support

           0       0.74      0.98      0.85      2825
           1       0.73      0.11      0.19      1067

    accuracy                           0.74      3892
   macro avg       0.74      0.55      0.52      3892
weighted avg       0.74      0.74      0.67      3892

