# Base Model

A feedforward multilayer ANN for classification task of CVEs - in their vectorization format - into their corresponding CWE.

To do:
- Import the data
- Train the model!

### Import the necessary libraries

Install libraries

In [None]:
# !pip install Pinecone
# !pip install torch

Import libraries

In [11]:
import numpy as np
import matplotlib.pyplot as plt

from pinecone import Pinecone

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset, random_split

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import LabelEncoder

### Import data for supervised learning

Import data in the database, with the CVE in vector format and it's corresponding CWE number

In [21]:
# Database with the CVEs in their vector format
pc = Pinecone(api_key="pcsk_3fsLLS_KMLjRJ9jybj773qMy13aawRBFZveGrn2LrZhVP3GtMWet7PdZgNubyB8xRM6JQ7")
index = pc.Index("mc959")

In [35]:
# Incomplete
def fetch_data_from_pinecone(index, num_samples):
    X_data = []
    y_data = []
    namespace = "ns1"
    # Loop until we have collected the desired number of samples
    while len(X_data) < num_samples:
        # Generate a random query vector (or use a specific vector if needed)
        query_vector = np.random.rand(input_size).tolist()
        response = index.query(
            namespace=namespace,
            vector=query_vector,
            top_k=5,  # Adjust as needed to fetch more matches per query
            include_values=True,
            include_metadata=True,
            # Add any filters if necessary
            # filter={"genre": {"$eq": "action"}}
        )
        if response and 'matches' in response:
            for match in response['matches']:
                vector = match.get('values', [])
                metadata = match.get('metadata', {})
                if vector and metadata:
                    X_data.append(vector)
                    y_data.append(metadata.get('label', 0))  # Assuming label is in 'label'
                    # Stop if we've collected enough samples
                    if len(X_data) >= num_samples:
                        break
    return np.array(X_data), np.array(y_data)

In [22]:
query_vector = np.random.rand(768).tolist()

# Query the index for the top 5 most similar vectors
response = index.query(
    vector=query_vector,   # Your query vector with 768 dimensions
    top_k=5,               # Retrieve top 5 closest vectors
    include_values=True,   # Include the vector values in the response
    include_metadata=True, # Include any metadata associated with the vectors
)

In [2]:
# Vector size encoder CVE
V_CVE_size = 768

# Number of possible CWEs to return
N_CWE = 1365

### ANN Definition

Hyperparameters

In [8]:
input_size = V_CVE_size          # Input vector size (V_CVE_size)
num_classes = N_CWE              # Number of output classes (N_CWE)
hidden_sizes = [256, 128, 64]    # Sizes of hidden layers
activation_function = nn.ReLU    # Activation function to be used
batch_size = 32                  # Batch size
learning_rate = 1e-3             # Learning rate
num_epochs = 100                  # Number of training epochs
dropout_prob = 0.3              # Dropout probability for regularization (not used!)

Neural network model

In [9]:
# Build the neural network model dynamically
layers = []

# Input layer
layers.append(nn.Linear(input_size, hidden_sizes[0]))
layers.append(activation_function())

# Hidden layers
for i in range(len(hidden_sizes) - 1):
    layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i+1]))
    layers.append(activation_function())
    layers.append(nn.Dropout(dropout_prob))

# Output layer
layers.append(nn.Linear(hidden_sizes[-1], num_classes))
layers.append(nn.Softmax(dim=1))

# Create the sequential model
model = nn.Sequential(*layers)

In [10]:
print("Neural Network Model:")
print(model)

Neural Network Model:
Sequential(
  (0): Linear(in_features=768, out_features=256, bias=True)
  (1): ReLU()
  (2): Linear(in_features=256, out_features=128, bias=True)
  (3): ReLU()
  (4): Dropout(p=0.3, inplace=False)
  (5): Linear(in_features=128, out_features=64, bias=True)
  (6): ReLU()
  (7): Dropout(p=0.3, inplace=False)
  (8): Linear(in_features=64, out_features=1365, bias=True)
  (9): Softmax(dim=1)
)


Loss function

In [31]:
# Loss function definiton

loss_fn = nn.CrossEntropyLoss()

Optimization method

In [33]:
# Defining the optimization method
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

### ANN training

Prepare data for training

In [None]:
num_samples = 10

# Get the data
X_array, y_array = fetch_data_from_pinecone(index, num_samples)

# Ensure no missing values
if np.isnan(X_array).any():
    print("Data contains NaN values. Removing rows with NaN values...")
    # Create a mask for rows without NaNs
    mask = ~np.isnan(X_array).any(axis=1)
    
    # Filter X_array and y_array using the mask
    X_array = X_array[mask]
    y_array = y_array[mask]

# Convert data to PyTorch tensors
X_tensor = torch.tensor(X_array, dtype=torch.float32)
y_tensor = torch.tensor(y_array, dtype=torch.long)  # CrossEntropyLoss expects labels of type Long

# Define split proportions
train_size = int(0.7 * len(full_dataset))
test_size = int(0.3 * len(full_dataset))

# Split the dataset
train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

Store data for training and the accuracy of the model

In [None]:
# Lists to store loss and accuracy for plotting
train_losses = []
train_accuracies = []
val_accuracies = []

Choose the GPU, if avaliable

In [14]:
# Check if CUDA is available and use the GPU if possible
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Move the model to the GPU
model = model.to(device)

Training the model

In [None]:
for epoch in range(num_epochs):
    # Training phase
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    for batch_X, batch_y in train_loader:
        # # Move the batch to the GPU
        # batch_X, batch_y = batch_X.to(device), batch_y.to(device)

        # Forward pass
        outputs = model(batch_X)
        loss = loss_fn(outputs, batch_y)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(outputs.data, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()

    avg_train_loss = total_loss / len(train_loader)
    train_accuracy = 100 * correct / total
    train_losses.append(avg_train_loss)
    train_accuracies.append(train_accuracy)

    # # Validation phase (removed because of time issues)
    # model.eval()
    # val_loss = 0
    # val_correct = 0
    # val_total = 0
    # with torch.no_grad():
    #     for val_X, val_y in val_loader:
    #         outputs = model(val_X)
    #         loss = loss_fn(outputs, val_y)
    #         val_loss += loss.item()
    #         _, predicted = torch.max(outputs.data, 1)
    #         val_total += val_y.size(0)
    #         val_correct += (predicted == val_y).sum().item()

    avg_val_loss = val_loss / len(val_loader)
    val_accuracy = 100 * val_correct / val_total
    val_losses.append(avg_val_loss)
    val_accuracies.append(val_accuracy)

    # Save the model after each epoch
    torch.save(model.state_dict(), "base_model.pth")

    print(f'Epoch [{epoch+1}/{num_epochs}], '
          f'Train Loss: {avg_train_loss:.4f}, Train Acc: {train_accuracy:.2f}%, '
          f'Val Loss: {avg_val_loss:.4f}, Val Acc: {val_accuracy:.2f}%')

### ANN analysis

Data from training process

In [None]:
# Visualization of Loss and Accuracy

# Plot Loss over epochs
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Train Loss')
plt.title('Loss over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

# Plot Accuracy over epochs
plt.subplot(1, 2, 2)
plt.plot(train_accuracies, label='Train Accuracy')
plt.title('Accuracy over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.legend()

plt.show()

Model final evaluation

In [None]:
# Evaluate on Test Data and Compute Performance Metrics

model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
    for test_X, test_y in test_loader:
        outputs = model(test_X)
        _, predicted = torch.max(outputs.data, 1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(test_y.cpu().numpy())

# Compute metrics
test_accuracy = accuracy_score(all_labels, all_preds) * 100
precision = precision_score(all_labels, all_preds, average='weighted', zero_division=0)
recall = recall_score(all_labels, all_preds, average='weighted', zero_division=0)
f1 = f1_score(all_labels, all_preds, average='weighted', zero_division=0)

print(f'Test Accuracy: {test_accuracy:.2f}%')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1 Score: {f1:.4f}')

Save the base model

In [34]:
# Save the trained model
torch.save(model.state_dict(), "base_model.pth")
print("Model saved as 'base_model.pth'")

Model saved as 'base_model.pth'
