In [1]:
%cd drive/MyDrive/

/content/drive/MyDrive


In [2]:
!ls

'Colab Notebooks'   FuzzingTools	        reentracy_versions.csv	 TripletDetection
 contract_sources   GPTLens		        SAPdatasets		 unvul.pickle
 data-v13.json	    ISSRELLM4SCReplicate        SAP-FuzzingwithState	 VerifiedmSC.zip
 data-v4.json	    LLM-quickstart	        SCABI			 vul-llm-finetune
 data-v5.json	    new_contract-versions.csv   SCBytecode		 VulnerableSC.zip
 data-v6.json	    partTime		        SC-V7.zip
 data-v7.json	    Peculiar-main	        SC-V8.zip
 data-v8.json	    Peculiar-main.zip	        testingSC


In [3]:
import random
import pickle
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AdamW
from sklearn.model_selection import train_test_split


In [8]:
#define the noise function
def add_function_noise(code, noise_level=0.1):
    """
    Add noise to a Solidity function at the function level, combining simple perturbations
    and more sophisticated adversarial patterns.

    Args:
        code (str): A clean Solidity function.
        noise_level (float): The proportion of the function to introduce noise.

    Returns:
        str: The noisy Solidity function.
    """
    lines = code.split("\n")
    num_lines = len(lines)
    num_noisy_lines = max(1, int(num_lines * noise_level))

    for _ in range(num_noisy_lines):
        idx = random.randint(0, num_lines - 1)
        noisy_line = lines[idx].strip()

        # Randomly select a noise type
        noise_type = random.choice([
            # Simple perturbations
            "variable_swap",
            "syntax_error",
            "comment_insert",
            "modifier_swap",
            "missing_semicolon",
            # Sophisticated patterns
            "uninitialized_variable",
            "incorrect_visibility",
            "reentrancy_simulation",
            "arithmetic_issue",
            "access_control_misconfig",
            "delegatecall_misuse",
            "dos_pattern"
        ])

        if noise_type == "variable_swap":
            # Swap variable types
            noisy_line = noisy_line.replace("uint256", "int256")
            noisy_line = noisy_line.replace("address", "string")
        elif noise_type == "syntax_error":
            # Introduce a syntax error
            if ";" in noisy_line:
                noisy_line = noisy_line.replace(";", "")  # Remove semicolon
            elif "{" in noisy_line:
                noisy_line = noisy_line.replace("{", "")  # Remove opening brace
            else:
                noisy_line += " if (x > 0)"  # Add incomplete statement
        elif noise_type == "comment_insert":
            noisy_line = f"// This is a noise comment\n{noisy_line}"
        elif noise_type == "modifier_swap":
            modifiers = ["public", "private", "internal", "external"]
            for mod in modifiers:
                if mod in noisy_line:
                    new_mod = random.choice([m for m in modifiers if m != mod])
                    noisy_line = noisy_line.replace(mod, new_mod)
                    break
        elif noise_type == "missing_semicolon":
            if ";" in noisy_line:
                noisy_line = noisy_line.replace(";", "")  # Remove semicolon
        elif noise_type == "uninitialized_variable":
            # Comment out initialization
            if "=" in noisy_line:
                noisy_line = f"// {noisy_line}"
        elif noise_type == "incorrect_visibility":
            # Change visibility specifiers
            visibility_specifiers = ["public", "private", "internal", "external"]
            for specifier in visibility_specifiers:
                if specifier in noisy_line:
                    new_specifier = random.choice([s for s in visibility_specifiers if s != specifier])
                    noisy_line = noisy_line.replace(specifier, new_specifier)
                    break
        elif noise_type == "reentrancy_simulation":
            # Insert a placeholder external call
            noisy_line = f"{noisy_line}\n    externalContract.call();"
        elif noise_type == "arithmetic_issue":
            # Modify arithmetic operations
            noisy_line = noisy_line.replace("+", "-")
            noisy_line = noisy_line.replace("*", "/")
        elif noise_type == "access_control_misconfig":
            # Remove access control modifiers
            noisy_line = noisy_line.replace("onlyOwner", "")
        elif noise_type == "delegatecall_misuse":
            # Insert a placeholder delegatecall usage
            noisy_line = f"{noisy_line}\n    address(this).delegatecall();"
        elif noise_type == "dos_pattern":
            # Add an infinite loop
            noisy_line = f"{noisy_line}\n    while(true) {{}}"

        # Replace the line in the code
        lines[idx] = noisy_line

    # Reconstruct the noisy code
    noisy_code = "\n".join(lines)
    return noisy_code

#define the Dataset class
class SolidityDataset(Dataset):
    def __init__(self, codes, noise_level=0.1):
        """
        Args:
            codes (list of str): List of clean Solidity functions.
            noise_level (float): Proportion of the function to modify for noise simulation.
        """
        self.codes = codes
        self.noise_level = noise_level

    def __len__(self):
        return len(self.codes)

    def __getitem__(self, idx):
        clean_code = self.codes[idx]
        noisy_code = add_function_noise(clean_code, self.noise_level)
        return clean_code, noisy_code


#Load the Dataset
def load_data_from_pickle(file_path):
    with open(file_path, 'rb') as f:
        codes = pickle.load(f)  # Assume it contains a list of strings (smart contract functions)
    return codes

# Load clean dataset
file_path = 'unvul.pickle'
clean_contracts = load_data_from_pickle(file_path)

# Verify the dataset
print(f"Loaded {len(clean_contracts)} clean smart contract functions.")
print("Example function:\n", clean_contracts[0])


Loaded 10000 clean smart contract functions.
Example function:
 function repayBorrowBehalf(address borrower) external payable { (uint err,) = repayBorrowBehalfInternal(borrower, msg.value); \n requireNoError(err, "repayBorrowBehalf failed"); \n } \n


In [9]:
#Prepare DataLoaders
# Split the dataset into training and validation sets
train_contracts, val_contracts = train_test_split(clean_contracts, test_size=0.1, random_state=42)

# Create Datasets and DataLoaders
train_dataset = SolidityDataset(train_contracts, noise_level=0.2)
val_dataset = SolidityDataset(val_contracts, noise_level=0.2)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

#initialize Model and Optimizer
# Initialize Model and Optimizer
tokenizer = AutoTokenizer.from_pretrained("Salesforce/codet5-small")
model = AutoModelForSeq2SeqLM.from_pretrained("Salesforce/codet5-small")
optimizer = AdamW(model.parameters(), lr=5e-5)

# Define the device
device = 'cuda' if torch.cuda.is_available() else 'cpu'

#define the custom Loss function

def custom_loss(outputs_clean, labels_clean, outputs_noisy, labels_noisy, margin=1.0):
    """
    Compute custom loss for clean and noisy inputs.

    Args:
        outputs_clean: Model outputs for clean inputs.
        labels_clean: Tokenized clean code inputs.
        outputs_noisy: Model outputs for noisy inputs.
        labels_noisy: Tokenized noisy code inputs.
        margin: Margin value for the noisy reconstruction loss.

    Returns:
        total_loss: Combined loss for optimization.
    """
    # Cross-Entropy Loss function
    loss_fct = torch.nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

    # Term 1: Reconstruction loss for clean code
    clean_loss = loss_fct(outputs_clean.logits.view(-1, outputs_clean.logits.size(-1)), labels_clean.input_ids.view(-1))

    # Term 2: Reconstruction loss for noisy code
    noisy_loss = loss_fct(outputs_noisy.logits.view(-1, outputs_noisy.logits.size(-1)), labels_noisy.input_ids.view(-1))

    # Margin-based loss: Encourage noisy_loss to be higher than the margin
    margin_loss = torch.clamp(margin - noisy_loss, min=0)

    # Total Loss
    total_loss = clean_loss + margin_loss
    return total_loss

#training the Model fo smart contract functions
def train_model(model, train_loader, val_loader, optimizer, tokenizer, num_epochs=3, device='cpu'):
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0
        for clean_code_batch, noisy_code_batch in train_loader:
            optimizer.zero_grad()

            # Tokenize clean inputs and labels
            inputs_clean = tokenizer(clean_code_batch, return_tensors="pt", padding=True, truncation=True).to(device)
            labels_clean = inputs_clean  # For autoencoder-like reconstruction

            # Tokenize noisy inputs and labels
            inputs_noisy = tokenizer(noisy_code_batch, return_tensors="pt", padding=True, truncation=True).to(device)
            labels_noisy = inputs_noisy  # We want to penalize the model if it reconstructs noisy code well

            # Forward pass for clean inputs
            outputs_clean = model(input_ids=inputs_clean.input_ids, attention_mask=inputs_clean.attention_mask, labels=labels_clean.input_ids)

            # Forward pass for noisy inputs
            outputs_noisy = model(input_ids=inputs_noisy.input_ids, attention_mask=inputs_noisy.attention_mask, labels=labels_noisy.input_ids)

            # Compute custom loss
            loss = custom_loss(outputs_clean, labels_clean, outputs_noisy, labels_noisy, margin=1.0)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

        avg_train_loss = total_train_loss / len(train_loader)

        # Validation phase
        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for clean_code_batch, noisy_code_batch in val_loader:
                # Tokenize clean inputs and labels
                inputs_clean = tokenizer(clean_code_batch, return_tensors="pt", padding=True, truncation=True).to(device)
                labels_clean = inputs_clean

                # Tokenize noisy inputs and labels
                inputs_noisy = tokenizer(noisy_code_batch, return_tensors="pt", padding=True, truncation=True).to(device)
                labels_noisy = inputs_noisy

                # Forward pass for clean inputs
                outputs_clean = model(input_ids=inputs_clean.input_ids, attention_mask=inputs_clean.attention_mask, labels=labels_clean.input_ids)

                # Forward pass for noisy inputs
                outputs_noisy = model(input_ids=inputs_noisy.input_ids, attention_mask=inputs_noisy.attention_mask, labels=labels_noisy.input_ids)

                # Compute custom loss
                loss = custom_loss(outputs_clean, labels_clean, outputs_noisy, labels_noisy, margin=1.0)
                total_val_loss += loss.item()

        avg_val_loss = total_val_loss / len(val_loader)

        print(f"Epoch {epoch + 1}/{num_epochs}, Training Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}")

#train the model
train_model(model, train_loader, val_loader, optimizer, tokenizer, num_epochs=3, device=device)

#evlaute the model
# Evaluation function
def evaluate(model, tokenizer, code_list, device='cpu'):
    model.eval()
    inputs = tokenizer(code_list, return_tensors="pt", truncation=True, padding=True).to(device)
    with torch.no_grad():
        outputs = model.generate(inputs['input_ids'], max_length=512)
    reconstructed_codes = [tokenizer.decode(output, skip_special_tokens=True) for output in outputs]
    return reconstructed_codes

# Evaluate a batch of clean and noisy samples
clean_samples = val_contracts[:5]
noisy_samples = [add_function_noise(code, noise_level=0.2) for code in clean_samples]
reconstructed_clean = evaluate(model, tokenizer, clean_samples, device=device)
reconstructed_noisy = evaluate(model, tokenizer, noisy_samples, device=device)

for idx in range(5):
    print(f"Clean Input {idx+1}:\n{clean_samples[idx]}\n")
    print(f"Reconstructed Clean Output {idx+1}:\n{reconstructed_clean[idx]}\n")
    print(f"Noisy Input {idx+1}:\n{noisy_samples[idx]}\n")
    print(f"Reconstructed Noisy Output {idx+1}:\n{reconstructed_noisy[idx]}\n")
    print("="*50)

#save the trained model
# Save the trained model
output_dir = "./function_reconstruction_model"
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)
print(f"Model saved to {output_dir}")



Epoch 1/3, Training Loss: 0.1358, Validation Loss: 0.0702
Epoch 2/3, Training Loss: 0.0329, Validation Loss: 0.0168
Epoch 3/3, Training Loss: 0.0278, Validation Loss: 0.0090
Clean Input 1:
function _reduceReservesFresh(uint reduceAmount) internal returns (uint) { uint totalReservesNew; \n if (msg.sender != admin) { \n return fail(Error.UNAUTHORIZED, FailureInfo.REDUCE_RESERVES_ADMIN_CHECK); \n } \n

Reconstructed Clean Output 1:
function _reduceReservesFresh(uint reduceAmount) internal returns (uint) { uint totalReservesNew; \n if (msg.sender != admin) { \n return fail(Error.UNAUTHORIZED, FailureInfo.REDUCE_RESERVES_ADMIN_CHECK); \n } \n

Noisy Input 1:
function _reduceReservesFresh(uint reduceAmount) internal returns (uint) { uint totalReservesNew \n if (msg.sender != admin) { \n return fail(Error.UNAUTHORIZED, FailureInfo.REDUCE_RESERVES_ADMIN_CHECK) \n } \n

Reconstructed Noisy Output 1:
function _reduceReservesFresh(uint reduceAmount) internal returns (uint) { uint totalReservesNew

In [10]:
import pickle
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

def compute_reconstruction_loss(model, tokenizer, function_code, device='cpu'):
    model.eval()
    with torch.no_grad():
        # Tokenize input
        inputs = tokenizer(function_code, return_tensors="pt", padding=True, truncation=True).to(device)
        labels = inputs.input_ids.clone()
        labels[labels == tokenizer.pad_token_id] = -100  # Ignore padding tokens in loss

        # Forward pass
        outputs = model(input_ids=inputs.input_ids, attention_mask=inputs.attention_mask, labels=labels)
        loss = outputs.loss.item()
    return loss

def identify_vulnerable_functions(functions, model, tokenizer, loss_threshold, device='cpu'):
    vulnerable_functions = []
    for idx, func_code in enumerate(functions):
        loss = compute_reconstruction_loss(model, tokenizer, func_code, device)
        if loss > loss_threshold:
            vulnerable_functions.append((idx + 1, func_code, loss))
    return vulnerable_functions

def print_vulnerable_functions(vulnerable_functions):
    for func_num, func_code, loss in vulnerable_functions:
        print(f"Function Number: {func_num}")
        print(f"Reconstruction Loss: {loss:.4f}")
        print("Function Code:")
        print(func_code)
        print("Vulnerability Type: Unknown vulnerability type")
        print("=" * 50)

def main():
    # Load the trained model and tokenizer
    output_dir = "./function_reconstruction_model"
    tokenizer = AutoTokenizer.from_pretrained(output_dir)
    model = AutoModelForSeq2SeqLM.from_pretrained(output_dir)
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model.to(device)

    # Load the vulnerable functions from 'vul.pickle'
    vul_file_path = 'vul.pickle'  # Ensure this is the correct path to your file
    with open(vul_file_path, 'rb') as f:
        vul_functions = pickle.load(f)

    print(f"Total vulnerable functions loaded: {len(vul_functions)}")

    # Select the top 100 functions
    top_100_functions = vul_functions[:100]
    print(f"Testing on the top {len(top_100_functions)} functions.")

    # Set the reconstruction loss threshold
    loss_threshold = 0.5  # Adjust based on your model's performance

    # Identify functions with high reconstruction loss
    vulnerable_functions = identify_vulnerable_functions(top_100_functions, model, tokenizer, loss_threshold, device)

    # Print the functions with large reconstruction loss
    print_vulnerable_functions(vulnerable_functions)

if __name__ == "__main__":
    main()


Total vulnerable functions loaded: 5000
Testing on the top 100 functions.


ValueError: text input must be of type `str` (single example), `List[str]` (batch or single pretokenized example) or `List[List[str]]` (batch of pretokenized examples).

In [17]:
!pip install solidity-parser




In [33]:
import pickle
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

def compute_reconstruction_loss_batch(model, tokenizer, function_codes, device='cpu'):
    model.eval()
    with torch.no_grad():
        # Tokenize inputs
        inputs = tokenizer(function_codes, return_tensors="pt", padding=True, truncation=True).to(device)
        labels = inputs.input_ids.clone()
        labels[labels == tokenizer.pad_token_id] = -100  # Ignore padding tokens in loss

        # Forward pass
        outputs = model(input_ids=inputs.input_ids, attention_mask=inputs.attention_mask, labels=labels)
        # Compute individual losses
        loss_fct = torch.nn.CrossEntropyLoss(ignore_index=-100, reduction='none')
        logits = outputs.logits.view(-1, outputs.logits.size(-1))
        labels_flat = labels.view(-1)
        losses = loss_fct(logits, labels_flat)
        # Reshape to batch size
        losses = losses.view(labels.size(0), -1).mean(dim=1)
        return losses.cpu().numpy()

def identify_vulnerable_functions(functions, model, tokenizer, loss_threshold, device='cpu', batch_size=16):
    vulnerable_functions = []
    for i in range(0, len(functions), batch_size):
        batch_funcs = functions[i:i+batch_size]
        func_codes = []
        for func in batch_funcs:
            if isinstance(func, dict):
                func_code = func.get('function_code', None)
                if func_code is None:
                    continue  # Skip if function code is missing
            elif isinstance(func, tuple):
                func_code = func[0]
            elif isinstance(func, str):
                func_code = func
            else:
                continue  # Skip unsupported data types
            func_codes.append(func_code)
        if not func_codes:
            continue
        losses = compute_reconstruction_loss_batch(model, tokenizer, func_codes, device)
        for idx, loss in enumerate(losses):
            func_num = i + idx + 1
            func_code = func_codes[idx]
            if loss > loss_threshold:
                vulnerable_functions.append((func_num, func_code, loss))
    return vulnerable_functions

def print_vulnerable_functions(vulnerable_functions):
    for func_num, func_code, loss in vulnerable_functions:
        print(f"Function Number: {func_num}")
        print(f"Reconstruction Loss: {loss:.4f}")
        print("Function Code:")
        print(func_code)
        print("Vulnerability Type: Unknown vulnerability type")
        print("=" * 50)

def main():
    # Load the trained model and tokenizer
    output_dir = "./function_reconstruction_model"
    tokenizer = AutoTokenizer.from_pretrained(output_dir)
    model = AutoModelForSeq2SeqLM.from_pretrained(output_dir)
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model.to(device)

    # Load the vulnerable functions from 'vul.pickle'
    vul_file_path = 'vul.pickle'  # Ensure this is the correct path to your file
    with open(vul_file_path, 'rb') as f:
        vul_functions = pickle.load(f)

    print(f"Total vulnerable functions loaded: {len(vul_functions)}")

    # Select the top 100 functions
    top_100_functions = vul_functions[:100]
    print(f"Testing on the top {len(top_100_functions)} functions.")
    print(f"Type of vul_functions: {type(vul_functions)}")
    print(f"Type of vul_functions[0]: {type(vul_functions[0])}")
    print(f"Content of vul_functions[0]: {vul_functions[0]}")


    # Set the reconstruction loss threshold
    loss_threshold = 0.1  # Adjust based on your model's performance

    # Identify functions with high reconstruction loss
    vulnerable_functions = identify_vulnerable_functions(top_100_functions, model, tokenizer, loss_threshold, device)

    # Print the functions with large reconstruction loss
    print_vulnerable_functions(vulnerable_functions)

if __name__ == "__main__":
    main()


Total vulnerable functions loaded: 5000
Testing on the top 100 functions.
Type of vul_functions: <class 'list'>
Type of vul_functions[0]: <class 'dict'>
Content of vul_functions[0]: {'data_key': 'function isContract(\\n address _addr\\n )\\n internal\\n view\\n returns (bool)\\n {\\n uint256 size;\\n assembly { size := extcodesize(_addr) }\\n return size > 0;\\n }\\n', 'data_query': 'function mul(uint256 a, uint256 b) internal pure returns (uint256) {\\n if (a == 0) {\\n return 0;\\n }\\n uint256 c = a * b;\\n require(c / a == b);\\n return c;\\n }\\n'}


In [32]:
import random
import pickle
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AdamW
from sklearn.model_selection import train_test_split

#define the noise function
def add_function_noise(code, noise_level=0.1):
    """
    Add noise to a Solidity function at the function level, combining simple perturbations
    and more sophisticated adversarial patterns.

    Args:
        code (str): A clean Solidity function.
        noise_level (float): The proportion of the function to introduce noise.

    Returns:
        str: The noisy Solidity function.
    """
    lines = code.split("\n")
    num_lines = len(lines)
    num_noisy_lines = max(1, int(num_lines * noise_level))

    for _ in range(num_noisy_lines):
        idx = random.randint(0, num_lines - 1)
        noisy_line = lines[idx].strip()

        # Randomly select a noise type
        noise_type = random.choice([
            # Simple perturbations
            "variable_swap",
            "syntax_error",
            "comment_insert",
            "modifier_swap",
            "missing_semicolon",
            # Sophisticated patterns
            "uninitialized_variable",
            "incorrect_visibility",
            "reentrancy_simulation",
            "arithmetic_issue",
            "access_control_misconfig",
            "delegatecall_misuse",
            "dos_pattern"
        ])

        if noise_type == "variable_swap":
            # Swap variable types
            noisy_line = noisy_line.replace("uint256", "int256")
            noisy_line = noisy_line.replace("address", "string")
        elif noise_type == "syntax_error":
            # Introduce a syntax error
            if ";" in noisy_line:
                noisy_line = noisy_line.replace(";", "")  # Remove semicolon
            elif "{" in noisy_line:
                noisy_line = noisy_line.replace("{", "")  # Remove opening brace
            else:
                noisy_line += " if (x > 0)"  # Add incomplete statement
        elif noise_type == "comment_insert":
            noisy_line = f"// This is a noise comment\n{noisy_line}"
        elif noise_type == "modifier_swap":
            modifiers = ["public", "private", "internal", "external"]
            for mod in modifiers:
                if mod in noisy_line:
                    new_mod = random.choice([m for m in modifiers if m != mod])
                    noisy_line = noisy_line.replace(mod, new_mod)
                    break
        elif noise_type == "missing_semicolon":
            if ";" in noisy_line:
                noisy_line = noisy_line.replace(";", "")  # Remove semicolon
        elif noise_type == "uninitialized_variable":
            # Comment out initialization
            if "=" in noisy_line:
                noisy_line = f"// {noisy_line}"
        elif noise_type == "incorrect_visibility":
            # Change visibility specifiers
            visibility_specifiers = ["public", "private", "internal", "external"]
            for specifier in visibility_specifiers:
                if specifier in noisy_line:
                    new_specifier = random.choice([s for s in visibility_specifiers if s != specifier])
                    noisy_line = noisy_line.replace(specifier, new_specifier)
                    break
        elif noise_type == "reentrancy_simulation":
            # Insert a placeholder external call
            noisy_line = f"{noisy_line}\n    externalContract.call();"
        elif noise_type == "arithmetic_issue":
            # Modify arithmetic operations
            noisy_line = noisy_line.replace("+", "-")
            noisy_line = noisy_line.replace("*", "/")
        elif noise_type == "access_control_misconfig":
            # Remove access control modifiers
            noisy_line = noisy_line.replace("onlyOwner", "")
        elif noise_type == "delegatecall_misuse":
            # Insert a placeholder delegatecall usage
            noisy_line = f"{noisy_line}\n    address(this).delegatecall();"
        elif noise_type == "dos_pattern":
            # Add an infinite loop
            noisy_line = f"{noisy_line}\n    while(true) {{}}"

        # Replace the line in the code
        lines[idx] = noisy_line

    # Reconstruct the noisy code
    noisy_code = "\n".join(lines)
    return noisy_code

#define the Dataset class
class SolidityDataset(Dataset):
    def __init__(self, codes, noise_level=0.1):
        """
        Args:
            codes (list of str): List of clean Solidity functions.
            noise_level (float): Proportion of the function to modify for noise simulation.
        """
        self.codes = codes
        self.noise_level = noise_level

    def __len__(self):
        return len(self.codes)

    def __getitem__(self, idx):
        clean_code = self.codes[idx]
        noisy_code = add_function_noise(clean_code, self.noise_level)
        return noisy_code, clean_code  # Return (input, target)

#Load the Dataset
def load_data_from_pickle(file_path, data_keys=['data_key', 'data_query']):
    with open(file_path, 'rb') as f:
        data = pickle.load(f)
        # Check if the data is a list of dicts
        if isinstance(data, list) and isinstance(data[0], dict):
            codes = []
            for item in data:
                for key in data_keys:
                    if key in item:
                        codes.append(item[key])
        else:
            codes = data  # Assume it's a list of strings
    return codes

# Load clean dataset
file_path = 'unvul.pickle'
clean_contracts = load_data_from_pickle(file_path)

# Verify the dataset
print(f"Loaded {len(clean_contracts)} clean smart contract functions.")
print("Example function:\n", clean_contracts[0])

#Prepare DataLoaders
# Split the dataset into training and validation sets
train_contracts, val_contracts = train_test_split(clean_contracts, test_size=0.1, random_state=42)

# Create Datasets and DataLoaders
train_dataset = SolidityDataset(train_contracts, noise_level=0.2)
val_dataset = SolidityDataset(val_contracts, noise_level=0.2)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

#initialize Model and Optimizer
# Initialize Model and Optimizer
tokenizer = AutoTokenizer.from_pretrained("Salesforce/codet5-small")
model = AutoModelForSeq2SeqLM.from_pretrained("Salesforce/codet5-small")
optimizer = AdamW(model.parameters(), lr=5e-5)

# Define the device
device = 'cuda' if torch.cuda.is_available() else 'cpu'

#training the Model fo smart contract functions
def train_model(model, train_loader, val_loader, optimizer, tokenizer, num_epochs=3, device='cpu'):
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0
        for noisy_code_batch, clean_code_batch in train_loader:
            optimizer.zero_grad()

            # Tokenize inputs (noisy code) and labels (clean code)
            inputs = tokenizer(noisy_code_batch, return_tensors="pt", padding=True, truncation=True).to(device)
            labels = tokenizer(clean_code_batch, return_tensors="pt", padding=True, truncation=True).to(device).input_ids
            labels[labels == tokenizer.pad_token_id] = -100  # Ignore padding tokens in loss

            # Forward pass
            outputs = model(input_ids=inputs.input_ids, attention_mask=inputs.attention_mask, labels=labels)

            # Compute loss
            loss = outputs.loss

            # Backward pass and optimization
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

        avg_train_loss = total_train_loss / len(train_loader)

        # Validation phase
        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for noisy_code_batch, clean_code_batch in val_loader:
                # Tokenize inputs (noisy code) and labels (clean code)
                inputs = tokenizer(noisy_code_batch, return_tensors="pt", padding=True, truncation=True).to(device)
                labels = tokenizer(clean_code_batch, return_tensors="pt", padding=True, truncation=True).to(device).input_ids
                labels[labels == tokenizer.pad_token_id] = -100  # Ignore padding tokens in loss

                # Forward pass
                outputs = model(input_ids=inputs.input_ids, attention_mask=inputs.attention_mask, labels=labels)

                # Compute loss
                loss = outputs.loss
                total_val_loss += loss.item()

        avg_val_loss = total_val_loss / len(val_loader)

        print(f"Epoch {epoch + 1}/{num_epochs}, Training Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}")

#train the model
train_model(model, train_loader, val_loader, optimizer, tokenizer, num_epochs=3, device=device)

#evlaute the model
# Evaluation function
def compute_reconstruction_loss(model, tokenizer, function_code, device='cpu'):
    model.eval()
    with torch.no_grad():
        # Tokenize input
        inputs = tokenizer(function_code, return_tensors="pt", padding=True, truncation=True).to(device)
        labels = inputs.input_ids.clone()
        labels[labels == tokenizer.pad_token_id] = -100  # Ignore padding tokens in loss

        # Forward pass
        outputs = model(input_ids=inputs.input_ids, attention_mask=inputs.attention_mask, labels=labels)
        loss = outputs.loss.item()
    return loss

def main():
    # Load the trained model and tokenizer
    output_dir = "./function_reconstruction_model"
    tokenizer = AutoTokenizer.from_pretrained(output_dir)
    model = AutoModelForSeq2SeqLM.from_pretrained(output_dir)
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model.to(device)

    # Load the vulnerable functions from 'vul.pickle'
    vul_file_path = 'vul.pickle'  # Ensure this is the correct path to your file
    vul_functions = load_data_from_pickle(vul_file_path)

    # Load the clean functions from 'unvul.pickle'
    unvul_file_path = 'unvul.pickle'
    unvul_functions = load_data_from_pickle(unvul_file_path)

    print(f"Total vulnerable functions loaded: {len(vul_functions)}")
    print(f"Total clean functions loaded: {len(unvul_functions)}")
    print(f"Type of vul_functions[0]: {type(vul_functions[0])}")

    # Select a sample of functions
    num_samples = 100
    sample_vul_functions = vul_functions[:num_samples]
    sample_unvul_functions = unvul_functions[:num_samples]

    # Compute reconstruction loss for vulnerable functions
    vul_losses = []
    for func_code in sample_vul_functions:
        if not isinstance(func_code, str):
            print(f"Skipping non-string function code in vulnerable functions: {func_code}")
            continue
        loss = compute_reconstruction_loss(model, tokenizer, func_code, device)
        vul_losses.append(loss)

    # Compute reconstruction loss for clean functions
    unvul_losses = []
    for func_code in sample_unvul_functions:
        if not isinstance(func_code, str):
            print(f"Skipping non-string function code in clean functions: {func_code}")
            continue
        loss = compute_reconstruction_loss(model, tokenizer, func_code, device)
        unvul_losses.append(loss)

    # Print average losses
    avg_vul_loss = sum(vul_losses) / len(vul_losses) if vul_losses else 0
    avg_unvul_loss = sum(unvul_losses) / len(unvul_losses) if unvul_losses else 0

    print(f"Average reconstruction loss for vulnerable functions: {avg_vul_loss:.4f}")
    print(f"Average reconstruction loss for clean functions: {avg_unvul_loss:.4f}")

    # Determine loss threshold
    loss_threshold = (avg_vul_loss + avg_unvul_loss) / 2

    # Identify functions with high reconstruction loss
    identified_vul_functions = [(idx + 1, func_code, loss) for idx, (func_code, loss) in enumerate(zip(sample_vul_functions, vul_losses)) if loss > loss_threshold]
    identified_unvul_functions = [(idx + 1, func_code, loss) for idx, (func_code, loss) in enumerate(zip(sample_unvul_functions, unvul_losses)) if loss > loss_threshold]

    print(f"Identified {len(identified_vul_functions)} out of {len(sample_vul_functions)} vulnerable functions as vulnerable.")
    print(f"Misidentified {len(identified_unvul_functions)} out of {len(sample_unvul_functions)} clean functions as vulnerable.")

    # Optionally, print the identified vulnerable functions
    for func_num, func_code, loss in identified_vul_functions:
        print(f"Function Number: {func_num}")
        print(f"Reconstruction Loss: {loss:.4f}")
        print("Function Code:")
        print(func_code)
        print("Vulnerability Type: Unknown vulnerability type")
        print("=" * 50)

if __name__ == "__main__":
    main()

#save the trained model
# Save the trained model
output_dir = "./function_reconstruction_model"
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)
print(f"Model saved to {output_dir}")


Loaded 10000 clean smart contract functions.
Example function:
 function repayBorrowBehalf(address borrower) external payable { (uint err,) = repayBorrowBehalfInternal(borrower, msg.value); \n requireNoError(err, "repayBorrowBehalf failed"); \n } \n
Epoch 1/3, Training Loss: 0.0505, Validation Loss: 0.0112
Epoch 2/3, Training Loss: 0.0143, Validation Loss: 0.0065
Epoch 3/3, Training Loss: 0.0104, Validation Loss: 0.0069
Total vulnerable functions loaded: 10000
Total clean functions loaded: 10000
Type of vul_functions[0]: <class 'str'>
Average reconstruction loss for vulnerable functions: 0.0187
Average reconstruction loss for clean functions: 0.0008
Identified 41 out of 100 vulnerable functions as vulnerable.
Misidentified 1 out of 100 clean functions as vulnerable.
Function Number: 5
Reconstruction Loss: 0.0102
Function Code:
function randomItIs() internal returns (uint) {\n uint screen = uint(keccak256(abi.encodePacked(now, msg.sender, violet))) % 4;\n violet++;\n return screen;\n }\

In [47]:
#这是普通的自回归损失来训练的
import random
import pickle
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AdamW
from sklearn.model_selection import train_test_split

# Removed the add_function_noise function since we're not introducing noise.

# Updated SolidityDataset class
class SolidityDataset(Dataset):
    def __init__(self, codes):
        """
        Args:
            codes (list of str): List of Solidity functions.
        """
        self.codes = codes

    def __len__(self):
        return len(self.codes)

    def __getitem__(self, idx):
        code = self.codes[idx]
        return code  # Return the code itself (autoencoder setup)

# Load data from pickle files
def load_data_from_pickle(file_path, data_keys=['data_key', 'data_query']):
    with open(file_path, 'rb') as f:
        data = pickle.load(f)
        # Check if the data is a list of dicts
        if isinstance(data, list) and isinstance(data[0], dict):
            codes = []
            for item in data:
                for key in data_keys:
                    if key in item:
                        codes.append(item[key])
        else:
            codes = data  # Assume it's a list of strings
    return codes

# Load clean dataset
clean_file_path = 'unvul.pickle'
clean_contracts = load_data_from_pickle(clean_file_path)

# Verify the dataset
print(f"Loaded {len(clean_contracts)} clean smart contract functions.")
print("Example clean function:\n", clean_contracts[0])

# Prepare DataLoaders
# Split the dataset into training and validation sets
train_contracts, val_contracts = train_test_split(clean_contracts, test_size=0.1, random_state=42)

# Create Datasets and DataLoaders
train_dataset = SolidityDataset(train_contracts)
val_dataset = SolidityDataset(val_contracts)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

# Initialize Model and Optimizer
tokenizer = AutoTokenizer.from_pretrained("Salesforce/codet5-small")
model = AutoModelForSeq2SeqLM.from_pretrained("Salesforce/codet5-small")
optimizer = AdamW(model.parameters(), lr=5e-5)

# Define the device
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Training the Model for smart contract functions
def train_model(model, train_loader, val_loader, optimizer, tokenizer, num_epochs=3, device='cpu'):
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0
        for code_batch in train_loader:
            optimizer.zero_grad()

            # Tokenize inputs and labels (autoencoder setup)
            inputs = tokenizer(code_batch, return_tensors="pt", padding=True, truncation=True).to(device)
            labels = inputs.input_ids.clone()
            labels[labels == tokenizer.pad_token_id] = -100  # Ignore padding tokens in loss

            # Forward pass
            outputs = model(input_ids=inputs.input_ids, attention_mask=inputs.attention_mask, labels=labels)

            # Compute loss
            loss = outputs.loss

            # Backward pass and optimization
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

        avg_train_loss = total_train_loss / len(train_loader)

        # Validation phase
        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for code_batch in val_loader:
                # Tokenize inputs and labels (autoencoder setup)
                inputs = tokenizer(code_batch, return_tensors="pt", padding=True, truncation=True).to(device)
                labels = inputs.input_ids.clone()
                labels[labels == tokenizer.pad_token_id] = -100  # Ignore padding tokens in loss

                # Forward pass
                outputs = model(input_ids=inputs.input_ids, attention_mask=inputs.attention_mask, labels=labels)

                # Compute loss
                loss = outputs.loss
                total_val_loss += loss.item()

        avg_val_loss = total_val_loss / len(val_loader)

        print(f"Epoch {epoch + 1}/{num_epochs}, Training Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}")

# Train the model
train_model(model, train_loader, val_loader, optimizer, tokenizer, num_epochs=3, device=device)

# Evaluation function
def compute_reconstruction_loss(model, tokenizer, code, device='cpu'):
    model.eval()
    with torch.no_grad():
        # Tokenize input and labels (autoencoder setup)
        inputs = tokenizer([code], return_tensors="pt", padding=True, truncation=True).to(device)
        labels = inputs.input_ids.clone()
        labels[labels == tokenizer.pad_token_id] = -100  # Ignore padding tokens in loss

        # Forward pass
        outputs = model(input_ids=inputs.input_ids, attention_mask=inputs.attention_mask, labels=labels)
        loss = outputs.loss.item()
    return loss

def main():
    # Load the trained model and tokenizer
    output_dir = "./function_reconstruction_model"
    # Uncomment these lines if you have saved the model previously
    # tokenizer = AutoTokenizer.from_pretrained(output_dir)
    # model = AutoModelForSeq2SeqLM.from_pretrained(output_dir)
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model.to(device)

    # Load the vulnerable functions from 'vul.pickle'
    vul_file_path = 'vul.pickle'  # Ensure this is the correct path to your file
    vul_functions = load_data_from_pickle(vul_file_path)

    # Load the clean functions from 'unvul.pickle'
    unvul_file_path = 'unvul.pickle'
    unvul_functions = load_data_from_pickle(unvul_file_path)

    print(f"Total vulnerable functions loaded: {len(vul_functions)}")
    print(f"Total clean functions loaded: {len(unvul_functions)}")

    # Compute reconstruction loss for vulnerable functions
    vul_losses = []
    for func_code in vul_functions:
        if not isinstance(func_code, str):
            continue
        loss = compute_reconstruction_loss(model, tokenizer, func_code, device)
        vul_losses.append(loss)

    # Compute reconstruction loss for clean functions
    unvul_losses = []
    for func_code in unvul_functions:
        if not isinstance(func_code, str):
            continue
        loss = compute_reconstruction_loss(model, tokenizer, func_code, device)
        unvul_losses.append(loss)

    # Print average losses
    avg_vul_loss = sum(vul_losses) / len(vul_losses) if vul_losses else 0
    avg_unvul_loss = sum(unvul_losses) / len(unvul_losses) if unvul_losses else 0

    print(f"Average reconstruction loss for vulnerable functions: {avg_vul_loss:.4f}")
    print(f"Average reconstruction loss for clean functions: {avg_unvul_loss:.4f}")

    # Determine loss threshold
    loss_threshold = (avg_vul_loss + avg_unvul_loss) / 2

    # Identify functions with high reconstruction loss
    identified_vul_functions = [
        (idx + 1, func_code, loss) for idx, (func_code, loss) in enumerate(zip(vul_functions, vul_losses)) if loss > loss_threshold
    ]
    identified_unvul_functions = [
        (idx + 1, func_code, loss) for idx, (func_code, loss) in enumerate(zip(unvul_functions, unvul_losses)) if loss > loss_threshold
    ]

    print(f"Identified {len(identified_vul_functions)} out of {len(vul_functions)} vulnerable functions as vulnerable.")
    print(f"Misidentified {len(identified_unvul_functions)} out of {len(unvul_functions)} clean functions as vulnerable.")

    # Optionally, print the identified vulnerable functions
    for func_num, func_code, loss in identified_vul_functions:
        print(f"Function Number: {func_num}")
        print(f"Reconstruction Loss: {loss:.4f}")
        print("Function Code:")
        print(func_code)
        print("Vulnerability Type: Unknown vulnerability type")
        print("=" * 50)

if __name__ == "__main__":
    main()

# Save the trained model
output_dir = "./function_reconstruction_model"
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)
print(f"Model saved to {output_dir}")


Loaded 10000 clean smart contract functions.
Example clean function:
 function repayBorrowBehalf(address borrower) external payable { (uint err,) = repayBorrowBehalfInternal(borrower, msg.value); \n requireNoError(err, "repayBorrowBehalf failed"); \n } \n




Epoch 1/3, Training Loss: 0.0282, Validation Loss: 0.0014
Epoch 2/3, Training Loss: 0.0124, Validation Loss: 0.0008
Epoch 3/3, Training Loss: 0.0047, Validation Loss: 0.0007
Total vulnerable functions loaded: 10000
Total clean functions loaded: 10000


KeyboardInterrupt: 

In [51]:
import random
import pickle
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AdamW
from sklearn.model_selection import train_test_split

# Updated SolidityDataset class
class SolidityDataset(Dataset):
    def __init__(self, codes, labels):
        """
        Args:
            codes (list of str): List of Solidity functions (clean and vulnerable).
            labels (list of int): List of labels (1 for clean, -1 for vulnerable).
        """
        self.codes = codes
        self.labels = labels

    def __len__(self):
        return len(self.codes)

    def __getitem__(self, idx):
        code = self.codes[idx]
        label = self.labels[idx]
        return code, label  # Return code and its label

# Load data from pickle files
def load_data_from_pickle(file_path, data_keys=['data_key', 'data_query']):
    with open(file_path, 'rb') as f:
        data = pickle.load(f)
        # Check if the data is a list of dicts
        if isinstance(data, list) and isinstance(data[0], dict):
            codes = []
            for item in data:
                for key in data_keys:
                    if key in item:
                        codes.append(item[key])
        else:
            codes = data  # Assume it's a list of strings
    return codes

# Load clean and vulnerable datasets
clean_file_path = 'unvul.pickle'
vul_file_path = 'vul.pickle'

clean_contracts = load_data_from_pickle(clean_file_path)
vul_contracts = load_data_from_pickle(vul_file_path)

# Verify the datasets
print(f"Loaded {len(clean_contracts)} clean smart contract functions.")
print(f"Loaded {len(vul_contracts)} vulnerable smart contract functions.")
print("Example clean function:\n", clean_contracts[0])
print("Example vulnerable function:\n", vul_contracts[0])

# Combine datasets and create labels
codes = clean_contracts + vul_contracts
labels = [1] * len(clean_contracts) + [-1] * len(vul_contracts)

# Split the dataset into training and validation sets
train_codes, val_codes, train_labels, val_labels = train_test_split(
    codes, labels, test_size=0.1, random_state=42
)

# Create Datasets and DataLoaders
train_dataset = SolidityDataset(train_codes, train_labels)
val_dataset = SolidityDataset(val_codes, val_labels)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

# Initialize Model and Optimizer
tokenizer = AutoTokenizer.from_pretrained("Salesforce/codet5-small")
model = AutoModelForSeq2SeqLM.from_pretrained("Salesforce/codet5-small")
optimizer = AdamW(model.parameters(), lr=5e-5)

# Define the device
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Training the Model with Destructive Training
def train_model(model, train_loader, val_loader, optimizer, tokenizer, num_epochs=3, device='cpu'):
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0
        for code_batch, label_batch in train_loader:
            optimizer.zero_grad()

            # Tokenize inputs and labels
            inputs = tokenizer(code_batch, return_tensors="pt", padding=True, truncation=True).to(device)
            outputs = model(**inputs, labels=inputs.input_ids)
            reconstruction_loss = outputs.loss

            # Convert labels to tensor
            labels_tensor = torch.tensor(label_batch, dtype=torch.float, device=device)

            # Compute the custom loss
            # For clean contracts (label=1), minimize the reconstruction loss
            # For vulnerable contracts (label=-1), maximize the reconstruction loss
            loss = torch.mean(labels_tensor * reconstruction_loss)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

        avg_train_loss = total_train_loss / len(train_loader)

        # Validation phase
        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for code_batch, label_batch in val_loader:
                # Tokenize inputs and labels
                inputs = tokenizer(code_batch, return_tensors="pt", padding=True, truncation=True).to(device)
                outputs = model(**inputs, labels=inputs.input_ids)
                reconstruction_loss = outputs.loss

                # Convert labels to tensor
                labels_tensor = torch.tensor(label_batch, dtype=torch.float, device=device)

                # Compute the custom loss
                loss = torch.mean(labels_tensor * reconstruction_loss)
                total_val_loss += loss.item()

        avg_val_loss = total_val_loss / len(val_loader)

        print(f"Epoch {epoch + 1}/{num_epochs}, Training Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}")

# Train the model
train_model(model, train_loader, val_loader, optimizer, tokenizer, num_epochs=3, device=device)

# Evaluation function
def compute_reconstruction_loss(model, tokenizer, code, device='cpu'):
    model.eval()
    with torch.no_grad():
        # Tokenize input and labels
        inputs = tokenizer([code], return_tensors="pt", padding=True, truncation=True).to(device)
        outputs = model(**inputs, labels=inputs.input_ids)
        loss = outputs.loss.item()
    return loss

def main():
    # Load the trained model and tokenizer
    output_dir = "./function_reconstruction_model"
    # Uncomment these lines if you have saved the model previously
    # tokenizer = AutoTokenizer.from_pretrained(output_dir)
    # model = AutoModelForSeq2SeqLM.from_pretrained(output_dir)
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model.to(device)

    # Load the vulnerable and clean functions
    vul_functions = vul_contracts
    unvul_functions = clean_contracts

    # Compute reconstruction loss for vulnerable functions
    vul_losses = []
    for func_code in vul_functions:
        if not isinstance(func_code, str):
            continue
        loss = compute_reconstruction_loss(model, tokenizer, func_code, device)
        vul_losses.append(loss)

    # Compute reconstruction loss for clean functions
    unvul_losses = []
    for func_code in unvul_functions:
        if not isinstance(func_code, str):
            continue
        loss = compute_reconstruction_loss(model, tokenizer, func_code, device)
        unvul_losses.append(loss)

    # Print average losses
    avg_vul_loss = sum(vul_losses) / len(vul_losses) if vul_losses else 0
    avg_unvul_loss = sum(unvul_losses) / len(unvul_losses) if unvul_losses else 0

    print(f"Average reconstruction loss for vulnerable functions: {avg_vul_loss:.4f}")
    print(f"Average reconstruction loss for clean functions: {avg_unvul_loss:.4f}")

    # Determine loss threshold
    loss_threshold = (avg_vul_loss + avg_unvul_loss) / 2
    print(f"the loss threshold is:{loss_threshold:.4f}")

    # Identify functions with high reconstruction loss
    identified_vul_functions = [
        (idx + 1, func_code, loss) for idx, (func_code, loss) in enumerate(zip(vul_functions, vul_losses)) if loss > loss_threshold
    ]
    identified_unvul_functions = [
        (idx + 1, func_code, loss) for idx, (func_code, loss) in enumerate(zip(unvul_functions, unvul_losses)) if loss > loss_threshold
    ]

    print(f"Identified {len(identified_vul_functions)} out of {len(vul_functions)} vulnerable functions as vulnerable.")
    print(f"Misidentified {len(identified_unvul_functions)} out of {len(unvul_functions)} clean functions as vulnerable.")

    # # Optionally, print the identified vulnerable functions
    # for func_num, func_code, loss in identified_vul_functions:
    #     print(f"Function Number: {func_num}")
    #     print(f"Reconstruction Loss: {loss:.4f}")
    #     print("Function Code:")
    #     print(func_code)
    #     print("Vulnerability Type: Unknown vulnerability type")
    #     print("=" * 50)

if __name__ == "__main__":
    main()

# Save the trained model
output_dir = "./function_reconstruction_model"
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)
print(f"Model saved to {output_dir}")


Loaded 10000 clean smart contract functions.
Loaded 10000 vulnerable smart contract functions.
Example clean function:
 function repayBorrowBehalf(address borrower) external payable { (uint err,) = repayBorrowBehalfInternal(borrower, msg.value); \n requireNoError(err, "repayBorrowBehalf failed"); \n } \n
Example vulnerable function:
 function isContract(\n address _addr\n )\n internal\n view\n returns (bool)\n {\n uint256 size;\n assembly { size := extcodesize(_addr) }\n return size > 0;\n }\n


  labels_tensor = torch.tensor(label_batch, dtype=torch.float, device=device)
  labels_tensor = torch.tensor(label_batch, dtype=torch.float, device=device)


Epoch 1/3, Training Loss: -3.0081, Validation Loss: -2.7253
Epoch 2/3, Training Loss: -3.9321, Validation Loss: -2.8482
Epoch 3/3, Training Loss: -3.4884, Validation Loss: -2.9665
Average reconstruction loss for vulnerable functions: 61.7260
Average reconstruction loss for clean functions: 2.4859
the loss threshold is:32.1060
Identified 9999 out of 10000 vulnerable functions as vulnerable.
Misidentified 5 out of 10000 clean functions as vulnerable.
Model saved to ./function_reconstruction_model


In [68]:
import os
import glob
import re
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

def split_contract_into_functions(contract_code):
    """
    Splits a Solidity smart contract into individual functions.

    Args:
        contract_code (str): The Solidity contract code as a string.

    Returns:
        List[Tuple[int, str]]: A list of tuples containing function numbers and their code.
    """
    functions = []
    current_function = ''
    open_braces_count = 0
    func_num = 1

    for line in contract_code.splitlines():
        line_stripped = line.strip()

        if line_stripped.startswith('function '):
            current_function = line
            open_braces_count = line.count('{') - line.count('}')
        elif current_function:
            current_function += '\n' + line
            open_braces_count += line.count('{') - line.count('}')

        if current_function and open_braces_count == 0:
            functions.append((func_num, current_function.strip()))
            current_function = ''
            func_num += 1

    return functions


def compute_reconstruction_loss(model, tokenizer, function_code, device='cpu'):
    model.eval()
    with torch.no_grad():
        # Tokenize input
        inputs = tokenizer(function_code, return_tensors="pt", padding=True, truncation=True).to(device)
        labels = inputs.input_ids.clone()
        labels[labels == tokenizer.pad_token_id] = -100  # Ignore padding tokens in loss

        # Forward pass
        outputs = model(input_ids=inputs.input_ids, attention_mask=inputs.attention_mask, labels=labels)
        loss = outputs.loss.item()
    return loss


def identify_top_vulnerable_function(functions, model, tokenizer, device='cpu'):
    top_vulnerable_function = None
    max_loss = -float('inf')

    for func_num, func_code in functions:
        loss = compute_reconstruction_loss(model, tokenizer, func_code, device)
        if loss > max_loss:
            max_loss = loss
            top_vulnerable_function = (func_num, func_code, loss)

    return top_vulnerable_function


def print_top_vulnerable_function(top_vulnerable_function):
    if top_vulnerable_function:
        func_num, func_code, loss = top_vulnerable_function
        print(f"Function Number: {func_num}")
        print(f"Reconstruction Loss: {loss:.4f}")
        print("Function Code:")
        print(func_code)
        print("Vulnerability Type: Unknown vulnerability type")
        print("=" * 50)
    else:
        print("No vulnerable functions detected.")


# Load the trained model and tokenizer
output_dir = "./function_reconstruction_model"
tokenizer = AutoTokenizer.from_pretrained(output_dir)
model = AutoModelForSeq2SeqLM.from_pretrained(output_dir)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)

# Path to the directory containing Solidity contracts
contracts_dir = 'function_reconstruction_model/reentrant_contracts/'

# Get a list of all Solidity files in the directory
contract_files = glob.glob(os.path.join(contracts_dir, '*.sol'))
for contract_file in contract_files:
    print(f"\nProcessing contract: {contract_file}")
    try:
        with open(contract_file, 'r', encoding='utf-8') as f:
            contract_code = f.read()
    except UnicodeDecodeError:
        # Retry with a different encoding if UTF-8 fails
        with open(contract_file, 'r', encoding='latin-1') as f:
            contract_code = f.read()
    except Exception as e:
        print(f"Error reading {contract_file}: {e}")
        continue  # Skip this file and proceed to the next one

    # Split the contract into functions
    functions = split_contract_into_functions(contract_code)
    print(f"Total functions found: {len(functions)}")

    # Identify the top-1 vulnerable function
    top_vulnerable_function = identify_top_vulnerable_function(functions, model, tokenizer, device)

    # Print the top-1 vulnerable function with the highest reconstruction loss
    print("Top-1 vulnerable function:")
    print_top_vulnerable_function(top_vulnerable_function)



Processing contract: function_reconstruction_model/reentrant_contracts/0x4c67b3db1d4474c0ebb2db8bec4e345526d9e2fd.sol
Total functions found: 1
Top-1 vulnerable function:
Function Number: 1
Reconstruction Loss: 3.6723
Function Code:
function sendCall(
Vulnerability Type: Unknown vulnerability type

Processing contract: function_reconstruction_model/reentrant_contracts/0x33813c2f2aab62ac88c234858a1f08448424828f.sol
Total functions found: 36
Top-1 vulnerable function:
Function Number: 31
Reconstruction Loss: 7.9236
Function Code:
function _open() internal {
        newRound();
    }
Vulnerability Type: Unknown vulnerability type

Processing contract: function_reconstruction_model/reentrant_contracts/0x0459ebad0ba09901fda1441ee72e6cb664257f61.sol
Total functions found: 4
Top-1 vulnerable function:
Function Number: 3
Reconstruction Loss: 7.0905
Function Code:
function ALFA_bank(address log) public{
        LogFile = Log(log);
    }
Vulnerability Type: Unknown vulnerability type

Processing

In [62]:
%cd ..

/content/drive/MyDrive


In [None]:
import random
import pickle
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AdamW
from sklearn.model_selection import train_test_split

class SolidityDataset(Dataset):
    def __init__(self, codes, noise_level=0.1):
        self.codes = codes
        self.noise_level = noise_level

    def __len__(self):
        return len(self.codes)

    def __getitem__(self, idx):
        clean_code = self.codes[idx]
        noisy_code = add_function_noise(clean_code, self.noise_level)
        return clean_code, noisy_code

def load_data_from_pickle(file_path):
    with open(file_path, 'rb') as f:
        codes = pickle.load(f)
    return codes

file_path = 'unvul.pickle'
clean_contracts = load_data_from_pickle(file_path)

print(f"Loaded {len(clean_contracts)} clean smart contract functions.")
print("Example function:\n", clean_contracts[0])
train_contracts, val_contracts = train_test_split(clean_contracts, test_size=0.1, random_state=42)

train_dataset = SolidityDataset(train_contracts, noise_level=0.2)
val_dataset = SolidityDataset(val_contracts, noise_level=0.2)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)

tokenizer = AutoTokenizer.from_pretrained("Salesforce/codet5-small")
model = AutoModelForSeq2SeqLM.from_pretrained("Salesforce/codet5-small")
optimizer = AdamW(model.parameters(), lr=5e-5)

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Define a set of noise codes
noise_codes = [
    "function placeholder() public {}",
    "function unknown() external view {}",
    "function dummy() internal {}",
    "function empty() private {}"
]

# Tokenize noise codes
noise_code_tokenized_list = [tokenizer(noise_code, return_tensors="pt", padding=True, truncation=True).to(device) for noise_code in noise_codes]

def custom_loss(outputs_clean, labels_clean, outputs_noisy, labels_noisy, lambda_coeff=1.0):
    """
    Compute custom loss for clean and noisy inputs.

    Args:
        outputs_clean: Model outputs for clean inputs.
        labels_clean: Tokenized clean code inputs.
        outputs_noisy: Model outputs for noisy inputs.
        labels_noisy: Tokenized noise code inputs.
        lambda_coeff: Coefficient to balance the loss terms.

    Returns:
        total_loss: Combined loss for optimization.
    """
    loss_fct = torch.nn.CrossEntropyLoss(ignore_index=-100)

    # Term 1: Reconstruction loss for clean code
    clean_loss = loss_fct(outputs_clean.logits.view(-1, outputs_clean.logits.size(-1)), labels_clean.view(-1))

    # Term 2: Loss between model's output for noisy input and the noise code
    noisy_loss = loss_fct(outputs_noisy.logits.view(-1, outputs_noisy.logits.size(-1)), labels_noisy.view(-1))

    # Total Loss
    total_loss = clean_loss + lambda_coeff * noisy_loss
    return total_loss

def train_model(model, train_loader, val_loader, optimizer, tokenizer, noise_code_tokenized_list, num_epochs=3, lambda_coeff=1.0, device='cpu'):
    model.to(device)
    for epoch in range(num_epochs):
        model.train()
        total_train_loss = 0
        for clean_code_batch, noisy_code_batch in train_loader:
            optimizer.zero_grad()

            # Tokenize clean inputs and labels
            inputs_clean = tokenizer(clean_code_batch, return_tensors="pt", padding=True, truncation=True).to(device)
            labels_clean = inputs_clean.input_ids.clone()
            labels_clean[labels_clean == tokenizer.pad_token_id] = -100  # Ignore padding tokens

            # Tokenize noisy inputs
            inputs_noisy = tokenizer(noisy_code_batch, return_tensors="pt", padding=True, truncation=True).to(device)

            # Create labels_noisy by randomly selecting a noise code for each sample
            batch_size = inputs_noisy.input_ids.size(0)
            noise_labels = []
            for _ in range(batch_size):
                noise_code_tokenized = random.choice(noise_code_tokenized_list)
                noise_labels.append(noise_code_tokenized.input_ids.squeeze(0))
            # Pad noise labels to the same length
            noise_labels_padded = torch.nn.utils.rnn.pad_sequence(noise_labels, batch_first=True, padding_value=tokenizer.pad_token_id).to(device)
            labels_noisy = noise_labels_padded.clone()
            labels_noisy[labels_noisy == tokenizer.pad_token_id] = -100  # Ignore padding tokens

            # Forward pass for clean inputs
            outputs_clean = model(input_ids=inputs_clean.input_ids, attention_mask=inputs_clean.attention_mask, labels=labels_clean)

            # Forward pass for noisy inputs
            outputs_noisy = model(input_ids=inputs_noisy.input_ids, attention_mask=inputs_noisy.attention_mask, labels=labels_noisy)

            # Compute custom loss
            loss = custom_loss(outputs_clean, labels_clean, outputs_noisy, labels_noisy, lambda_coeff)

            # Backward pass and optimization
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

        avg_train_loss = total_train_loss / len(train_loader)

        # Validation phase
        model.eval()
        total_val_loss = 0
        with torch.no_grad():
            for clean_code_batch, noisy_code_batch in val_loader:
                # Tokenize clean inputs and labels
                inputs_clean = tokenizer(clean_code_batch, return_tensors="pt", padding=True, truncation=True).to(device)
                labels_clean = inputs_clean.input_ids.clone()
                labels_clean[labels_clean == tokenizer.pad_token_id] = -100

                # Tokenize noisy inputs
                inputs_noisy = tokenizer(noisy_code_batch, return_tensors="pt", padding=True, truncation=True).to(device)

                # Create labels_noisy by randomly selecting a noise code for each sample
                batch_size = inputs_noisy.input_ids.size(0)
                noise_labels = []
                for _ in range(batch_size):
                    noise_code_tokenized = random.choice(noise_code_tokenized_list)
                    noise_labels.append(noise_code_tokenized.input_ids.squeeze(0))
                # Pad noise labels to the same length
                noise_labels_padded = torch.nn.utils.rnn.pad_sequence(noise_labels, batch_first=True, padding_value=tokenizer.pad_token_id).to(device)
                labels_noisy = noise_labels_padded.clone()
                labels_noisy[labels_noisy == tokenizer.pad_token_id] = -100  # Ignore padding tokens

                # Forward pass for clean inputs
                outputs_clean = model(input_ids=inputs_clean.input_ids, attention_mask=inputs_clean.attention_mask, labels=labels_clean)

                # Forward pass for noisy inputs
                outputs_noisy = model(input_ids=inputs_noisy.input_ids, attention_mask=inputs_noisy.attention_mask, labels=labels_noisy)

                # Compute custom loss
                loss = custom_loss(outputs_clean, labels_clean, outputs_noisy, labels_noisy, lambda_coeff)
                total_val_loss += loss.item()

        avg_val_loss = total_val_loss / len(val_loader)

        print(f"Epoch {epoch + 1}/{num_epochs}, Training Loss: {avg_train_loss:.4f}, Validation Loss: {avg_val_loss:.4f}")

train_model(model, train_loader, val_loader, optimizer, tokenizer, noise_code_tokenized_list, num_epochs=3, lambda_coeff=1.0, device=device)

def evaluate(model, tokenizer, code_list, device='cpu'):
    model.eval()
    inputs = tokenizer(code_list, return_tensors="pt", padding=True, truncation=True).to(device)
    with torch.no_grad():
        outputs = model.generate(inputs['input_ids'], max_length=512)
    reconstructed_codes = [tokenizer.decode(output, skip_special_tokens=True) for output in outputs]
    return reconstructed_codes

# Evaluate a batch of clean and noisy samples
clean_samples = val_contracts[:5]
noisy_samples = [add_function_noise(code, noise_level=0.2) for code in clean_samples]
reconstructed_clean = evaluate(model, tokenizer, clean_samples, device=device)
reconstructed_noisy = evaluate(model, tokenizer, noisy_samples, device=device)

for idx in range(5):
    print(f"Clean Input {idx+1}:\n{clean_samples[idx]}\n")
    print(f"Reconstructed Clean Output {idx+1}:\n{reconstructed_clean[idx]}\n")
    print(f"Noisy Input {idx+1}:\n{noisy_samples[idx]}\n")
    print(f"Reconstructed Noisy Output {idx+1}:\n{reconstructed_noisy[idx]}\n")
    print("=" * 50)

output_dir = "./function_reconstruction_model"
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)
print(f"Model saved to {output_dir}")
