In [1]:
import os
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from transformers import BertModel, AdamW, get_linear_schedule_with_warmup
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

In [2]:
from Loader import shuffled_contracts, shuffled_labels, shuffled_type_labels
from GadgetEmbedder import CodeEmbedder
from GadgetExtract import CodeExtractor

In [3]:
# Peek at the first sample: its label, its type label, and the contract source.
first_sample = (shuffled_labels[0], shuffled_type_labels[0], shuffled_contracts[0])
print(*first_sample)

0 0 pragma solidity 0.5.4;




interface IERC165 {
    
    function supportsInterface(bytes4 interfaceId) external view returns (bool);
}




contract IERC721 is IERC165 {
    event Transfer(address indexed from, address indexed to, uint256 indexed tokenId);
    event Approval(address indexed owner, address indexed approved, uint256 indexed tokenId);
    event ApprovalForAll(address indexed owner, address indexed operator, bool approved);

    function balanceOf(address owner) public view returns (uint256 balance);
    function ownerOf(uint256 tokenId) public view returns (address owner);

    function approve(address to, uint256 tokenId) public;
    function getApproved(uint256 tokenId) public view returns (address operator);

    function setApprovalForAll(address operator, bool _approved) public;
    function isApprovedForAll(address owner, address operator) public view returns (bool);

    function transferFrom(address from, address to, uint256 tokenId) pu

In [4]:
# Number of entries in the type-label list (displayed as the cell output).
len(shuffled_type_labels)

960

In [5]:
class ContractClassificationDataset(Dataset):
    """Map-style dataset that encodes contracts lazily, one item at a time.

    Each item is produced by running the extractor on the contract and
    handing the resulting gadgets to the embedder.

    Args:
        contracts: Indexable collection of contracts (source-code strings in
            this notebook).
        labels: Per-contract class labels, index-aligned with ``contracts``.
        type_labels: Per-contract type labels. Stored on the instance but not
            returned by ``__getitem__``.
        embedder: Object exposing
            ``generate_input_ids_and_attention_mask(gadgets, max_length)`` and
            returning an ``(input_ids, attention_mask)`` pair.
        extractor: Object exposing ``run(contract)``.
        max_length: Forwarded unchanged to the embedder.
    """

    def __init__(self, contracts, labels, type_labels, embedder, extractor, max_length):
        self.contracts = contracts
        self.type_labels = type_labels
        self.labels = labels
        self.embedder = embedder
        self.extractor = extractor
        self.max_length = max_length

    def __getgadgets__(self, contract):
        """Return whatever ``self.extractor.run`` yields for ``contract``."""
        # Use the injected extractor, not a notebook-level global, so the
        # dataset works regardless of cell execution order.
        return self.extractor.run(contract)

    def __len__(self):
        """Number of contracts in the dataset."""
        return len(self.contracts)

    def __getitem__(self, idx):
        """Encode the contract at ``idx``.

        Returns:
            dict with keys ``'input_ids'`` and ``'attention_mask'`` (exactly as
            returned by the embedder) and ``'label'`` (the label wrapped in a
            tensor).
        """
        contract = self.contracts[idx]
        label = self.labels[idx]
        # Same reasoning as in __getgadgets__: rely on the injected embedder.
        input_ids, attention_masks = self.embedder.generate_input_ids_and_attention_mask(
            self.__getgadgets__(contract), self.max_length)
        return {'input_ids': input_ids, 'attention_mask': attention_masks, 'label': torch.tensor(label)}

In [6]:
# Shared helper instances: the embedder first, then the gadget extractor.
embedder, extractor = CodeEmbedder(), CodeExtractor()

In [11]:
# Dataset over the complete shuffled corpus; encodings are capped at 512.
train_dataset = ContractClassificationDataset(
    contracts=shuffled_contracts,
    labels=shuffled_labels,
    type_labels=shuffled_type_labels,
    embedder=embedder,
    extractor=extractor,
    max_length=512,
)

In [16]:
# Drop every contract whose encoded `input_ids` are entirely zero.
# All three per-contract lists are filtered together so they stay aligned by index.
filtered_contracts = []
filtered_labels = []
filtered_type_labels = []

for idx in range(len(train_dataset)):
    # Indexing runs extraction + embedding for this contract.
    data = train_dataset[idx]
    input_ids = data['input_ids']

    # Keep the sample only if at least one token id is non-zero.
    if not torch.equal(input_ids, torch.zeros_like(input_ids)):
        filtered_contracts.append(train_dataset.contracts[idx])
        filtered_labels.append(train_dataset.labels[idx])
        filtered_type_labels.append(train_dataset.type_labels[idx])

# Create a new dataset from the filtered, index-aligned lists.
filtered_train_dataset = ContractClassificationDataset(
    filtered_contracts, filtered_labels, filtered_type_labels, embedder, extractor, 512)

In [None]:
class BERTClassifier(nn.Module):
    """Linear classification head on top of a BERT encoder.

    The encoder is always run under ``torch.no_grad()``, so no gradients flow
    into it; only the linear head receives gradients.

    Args:
        bert_model_name: Name or path passed to ``BertModel.from_pretrained``.
        num_classes: Output size of the linear head.
    """

    def __init__(self, bert_model_name, num_classes):
        super().__init__()
        self.bert = BertModel.from_pretrained(bert_model_name)
        self.dropout = nn.Dropout(0.1)
        self.fc = nn.Linear(self.bert.config.hidden_size, num_classes)

    def forward(self, input_ids, attention_mask):
        """Return class logits for one sample or for a batch of samples.

        Args:
            input_ids: Either a 2-D ``(rows, seq_len)`` tensor, treated as a
                single sample whose rows are mean-pooled, or a 3-D
                ``(batch, rows, seq_len)`` tensor of stacked samples.
                NOTE(review): presumably each row is one code gadget of a
                contract; confirm against the embedder's output shape.
            attention_mask: Tensor with the same shape as ``input_ids``.

        Returns:
            ``(num_classes,)`` logits for 2-D input, or
            ``(batch, num_classes)`` logits for 3-D input.
        """
        batched = input_ids.dim() == 3
        if batched:
            # The encoder only accepts 2-D input, so fold the batch and row
            # axes together and unfold them again after encoding.
            batch_size, num_rows, seq_len = input_ids.shape
            input_ids = input_ids.reshape(batch_size * num_rows, seq_len)
            attention_mask = attention_mask.reshape(batch_size * num_rows, seq_len)

        with torch.no_grad():
            outputs = self.bert(input_ids, attention_mask=attention_mask)

        # Hidden state of the first token of every row.
        cls_embedding = outputs.last_hidden_state[:, 0, :]
        if batched:
            cls_embedding = cls_embedding.reshape(batch_size, num_rows, -1)
            mean_pooled = cls_embedding.mean(dim=1)
        else:
            mean_pooled = cls_embedding.mean(dim=0)

        return self.fc(self.dropout(mean_pooled))

In [None]:
def train(model, data_loader, optimizer, scheduler, device):
    """Run one pass over ``data_loader``, updating ``model`` in place.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``
            and returning class logits (as ``BERTClassifier.forward`` does).
        data_loader: Iterable of dicts with ``'input_ids'``,
            ``'attention_mask'`` and ``'label'`` tensors.
        optimizer: Optimizer over the model parameters.
        scheduler: Learning-rate scheduler, stepped once per batch.
        device: Device the batch tensors are moved to.

    Returns:
        None.
    """
    model.train()
    criterion = nn.CrossEntropyLoss()  # built once, not once per batch
    for batch in data_loader:
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device).reshape(-1)

        # The model already returns logits; there is no `last_hidden_state`
        # to unpack or pool here.
        logits = model(input_ids=input_ids, attention_mask=attention_mask)
        if logits.dim() == 1:
            # A single un-batched prediction: add the batch axis so the
            # logits line up with the flattened labels.
            logits = logits.unsqueeze(0)

        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()

In [None]:
def predict_sentiment(text, model, tokenizer, device, max_length=128):
    """Classify a single text and return ``"vulnerable"`` or ``"clean"``.

    Args:
        text: Input passed to ``tokenizer``.
        model: Module called as ``model(input_ids=..., attention_mask=...)``
            and returning class logits.
        tokenizer: Callable returning a mapping with ``'input_ids'`` and
            ``'attention_mask'`` tensors.
        device: Device the encoded tensors are moved to.
        max_length: Padding/truncation length forwarded to the tokenizer.

    Returns:
        ``"vulnerable"`` when the highest-scoring class index is 0, otherwise
        ``"clean"``.
        NOTE(review): confirm that class 0 really denotes vulnerable contracts.

    NOTE(review): this tokenizes raw text, whereas the dataset class in this
    notebook encodes contracts through the extractor/embedder pipeline;
    confirm both paths produce comparable inputs.
    """
    model.eval()
    encoding = tokenizer(text, return_tensors='pt', max_length=max_length, padding='max_length', truncation=True)
    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)

    with torch.no_grad():
        # The model returns logits directly; no hidden states to pool here.
        logits = model(input_ids=input_ids, attention_mask=attention_mask)

    # Accept both un-batched (num_classes,) and batched (1, num_classes) logits.
    predicted_class = int(logits.reshape(-1, logits.shape[-1])[0].argmax())
    return "vulnerable" if predicted_class == 0 else "clean"

In [None]:
# --- Model configuration ---
bert_model_name = 'bert-base-uncased'  # pretrained checkpoint to load
num_classes = 2                        # size of the classification head

# --- Data configuration ---
max_length = 512                       # forwarded to the embedder
batch_size = 16

# --- Optimisation configuration ---
learning_rate = 2e-5
num_epochs = 4

In [None]:
# Hold out 20% of the samples for validation; the seed is fixed so the split
# is reproducible.
# NOTE(review): this splits the unfiltered `shuffled_*` lists, although an
# earlier cell builds `filtered_contracts` / `filtered_labels` — confirm which
# set is meant to be trained on.
(
    train_texts,
    val_texts,
    train_labels,
    val_labels,
) = train_test_split(
    shuffled_contracts,
    shuffled_labels,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Datasets for the two splits.
# NOTE(review): `shuffled_type_labels` is the full, unsplit list, so it is not
# index-aligned with the split texts/labels.
train_dataset = ContractClassificationDataset(
    contracts=train_texts,
    labels=train_labels,
    type_labels=shuffled_type_labels,
    embedder=embedder,
    extractor=extractor,
    max_length=max_length,
)
val_dataset = ContractClassificationDataset(
    contracts=val_texts,
    labels=val_labels,
    type_labels=shuffled_type_labels,
    embedder=embedder,
    extractor=extractor,
    max_length=max_length,
)

# Only the training loader shuffles.
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size)

In [None]:
# Prefer the GPU when one is visible to PyTorch, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Build the classifier and move its parameters to the chosen device.
model = BERTClassifier(bert_model_name, num_classes)
model = model.to(device)

In [None]:
# Use PyTorch's AdamW: the `transformers.AdamW` class is deprecated in favour
# of it. `eps` and `weight_decay` are set explicitly to reproduce the defaults
# of the deprecated implementation, so the optimisation behaviour is unchanged.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate, eps=1e-6, weight_decay=0.0)

# The scheduler is stepped once per batch, so the horizon is batches * epochs.
total_steps = len(train_dataloader) * num_epochs
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

In [None]:
def evaluate(model, data_loader, device):
    """Score ``model`` on ``data_loader`` without updating it.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``
            and returning class logits.
        data_loader: Iterable of dicts with ``'input_ids'``,
            ``'attention_mask'`` and ``'label'`` tensors.
        device: Device the batch tensors are moved to.

    Returns:
        Tuple ``(accuracy, report)``: the ``accuracy_score`` value and the
        ``classification_report`` text for all evaluated samples.
    """
    model.eval()
    predictions, actual_labels = [], []
    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device).reshape(-1)
            logits = model(input_ids=input_ids, attention_mask=attention_mask)
            if logits.dim() == 1:
                # Single un-batched prediction: add the batch axis.
                logits = logits.unsqueeze(0)
            predictions.extend(logits.argmax(dim=1).cpu().tolist())
            actual_labels.extend(labels.cpu().tolist())
    return accuracy_score(actual_labels, predictions), classification_report(actual_labels, predictions)


# One training pass followed by one validation pass per epoch.
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    train(model, train_dataloader, optimizer, scheduler, device)
    accuracy, report = evaluate(model, val_dataloader, device)
    print(f"Validation Accuracy: {accuracy:.4f}")
    print(report)