In [3]:
import os
import angr
import torch
from sentence_transformers import SentenceTransformer
from torch_geometric.data import Data
from sklearn.model_selection import train_test_split  # Added import for data splitting

binary_dir = './binaries'

data_list = []

# Use a code-aware embedding model if available
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')  # Can replace with a code-specific model

for binary_file in os.listdir(binary_dir):
    if binary_file.endswith('.exe'):
        angr_project = angr.Project(os.path.join(binary_dir, binary_file), auto_load_libs=False)
        cfg = angr_project.analyses.CFGFast()  # Changed to CFGFast for faster analysis and to use FCG

        functions = list(angr_project.kb.functions.values())
        function_addr_to_index = {function.addr: idx for idx, function in enumerate(functions)}  # Simplified mapping

        nodes = []
        for function in functions:
            # Extract function features
            # For example, get instruction mnemonics
            instructions = []
            for block in function.blocks:
                capstone_block = block.capstone
                for insn in capstone_block.insns:
                    instructions.append(insn.mnemonic)
            instruction_sequence = ' '.join(instructions)

            # Use instruction sequence or function name for embedding
            embedding = model.encode(instruction_sequence)
            nodes.append(embedding)

        # Build edges based on function calls using the call graph
        edge_index = []
        callgraph = angr_project.kb.callgraph  # Use the call graph from the knowledge base
        for src_addr, dst_addr in callgraph.edges():
            src_idx = function_addr_to_index.get(src_addr)
            dst_idx = function_addr_to_index.get(dst_addr)
            if src_idx is not None and dst_idx is not None:
                edge_index.append([src_idx, dst_idx])

        node_embeddings = torch.tensor(nodes, dtype=torch.float)
        if len(edge_index) == 0:
            edge_index = torch.empty((2, 0), dtype=torch.long)
        else:
            edge_index = torch.tensor(edge_index, dtype=torch.long).t().contiguous()

        label = 1 if "malware" in binary_file.lower() else 0  # Changed labels to 0 and 1
        data = Data(x=node_embeddings, edge_index=edge_index, y=torch.tensor([label], dtype=torch.long))
        data_list.append(data)

# Split data into train and test sets
train_data, test_data = train_test_split(
    data_list, test_size=0.2, random_state=42,
    stratify=[d.y.item() for d in data_list]
)  # Added data splitting

ERROR    | 2024-09-23 13:52:26,776 | angr.analyses.propagator.engine_vex.SimEnginePropagatorVEX | Unsupported statement type CAS.
ERROR    | 2024-09-23 13:52:32,472 | angr.analyses.propagator.engine_vex.SimEnginePropagatorVEX | Unsupported statement type CAS.


In [6]:
train_data

[Data(x=[3155, 384], edge_index=[2, 4358], y=[1])]

In [7]:
test_data

[Data(x=[455, 384], edge_index=[2, 800], y=[1])]

In [4]:
import torch
import torch.nn.functional as F
from torch_geometric.nn import GINConv, global_add_pool

class GIN(torch.nn.Module):
    def __init__(self):
        super(GIN, self).__init__()
        self.conv1 = GINConv(
            torch.nn.Sequential(
                torch.nn.Linear(384, 128),  # Changed input dimension to match embedding size and output to 128
                torch.nn.ReLU(),
                torch.nn.Linear(128, 128)
            )
        )
        self.conv2 = GINConv(
            torch.nn.Sequential(
                torch.nn.Linear(128, 128),
                torch.nn.ReLU(),
                torch.nn.Linear(128, 128)
            )
        )
        self.conv3 = GINConv(  # Added an additional convolutional layer
            torch.nn.Sequential(
                torch.nn.Linear(128, 128),
                torch.nn.ReLU(),
                torch.nn.Linear(128, 128)
            )
        )
        self.fc1 = torch.nn.Linear(128, 64)  # Adjusted dimensions
        self.fc2 = torch.nn.Linear(64, 2)  # Output remains 2 for binary classification

    def forward(self, data):
        x, edge_index, batch = data.x, data.edge_index, data.batch  # Added batch extraction
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = self.conv2(x, edge_index)
        x = F.relu(x)
        x = self.conv3(x, edge_index)  # Added an additional layer
        x = F.relu(x)

        x = global_add_pool(x, batch)
        
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)

model = GIN()


In [5]:
from torch_geometric.loader import DataLoader
from sklearn.metrics import classification_report  # Added import for evaluation

train_loader = DataLoader(train_data, batch_size=32, shuffle=True)  # Changed to use train_data
test_loader = DataLoader(test_data, batch_size=32, shuffle=False)  # Added test_loader

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
loss_fn = torch.nn.CrossEntropyLoss()

epochs = 10

for epoch in range(epochs):
    model.train()  # Set model to training mode
    for batch in train_loader:
        optimizer.zero_grad()
        output = model(batch)
        loss = loss_fn(output, batch.y)
        loss.backward()
        optimizer.step()
    # Evaluate on test set after each epoch
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for batch in test_loader:
            output = model(batch)
            preds = output.argmax(dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch.y.cpu().numpy())
    print(f"Epoch {epoch+1}/{epochs}")
    print(classification_report(all_labels, all_preds))  # Added evaluation


Epoch 1/10
              precision    recall  f1-score   support

           1       1.00      1.00      1.00         1

    accuracy                           1.00         1
   macro avg       1.00      1.00      1.00         1
weighted avg       1.00      1.00      1.00         1

Epoch 2/10
              precision    recall  f1-score   support

           1       1.00      1.00      1.00         1

    accuracy                           1.00         1
   macro avg       1.00      1.00      1.00         1
weighted avg       1.00      1.00      1.00         1

Epoch 3/10
              precision    recall  f1-score   support

           1       1.00      1.00      1.00         1

    accuracy                           1.00         1
   macro avg       1.00      1.00      1.00         1
weighted avg       1.00      1.00      1.00         1

Epoch 4/10
              precision    recall  f1-score   support

           1       1.00      1.00      1.00         1

    accuracy               