<a href="https://colab.research.google.com/github/KAVYANSHTYAGI/Ransomware-Analysis-using-Machine-Learning-and-Deep-Learning/blob/main/dynamic_json_embeddings_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; later cells read the dataset pickles
# from paths under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Load datasets from pickle files
# `pickle` is imported in this cell so it no longer fails with
# "NameError: name 'pickle' is not defined" when run on a fresh kernel.
# NOTE(review): the pickles contain `JSONDataset` instances, so the class
# definition found further down in this notebook must already exist in
# `__main__` before this cell can unpickle them.
# pickle.load executes code embedded in the file - only load trusted files.
import pickle

with open("/content/drive/MyDrive/ransomware_analysis_files/dynamic analysis/embeddings file/train_dataset.pkl", "rb") as f:
    train_dataset = pickle.load(f)
with open("/content/drive/MyDrive/ransomware_analysis_files/dynamic analysis/embeddings file/test_dataset.pkl", "rb") as f:
    test_dataset = pickle.load(f)

NameError: name 'pickle' is not defined

In [None]:
# Remove the preinstalled transformers package, then install the newest release.
# NOTE(review): no version is pinned, so a re-run may install a different release
# than the one recorded in this cell's output.
!pip uninstall transformers -y
!pip install --upgrade transformers


Found existing installation: transformers 4.48.3
Uninstalling transformers-4.48.3:
  Successfully uninstalled transformers-4.48.3
Collecting transformers
  Downloading transformers-4.49.0-py3-none-any.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.0/44.0 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Downloading transformers-4.49.0-py3-none-any.whl (10.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.0/10.0 MB[0m [31m69.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: transformers
Successfully installed transformers-4.49.0


In [None]:
import pandas as pd
import json
import os
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, GPT2Model
import torch.optim as optim
import numpy as np
import joblib
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score
import pickle

# ✅ Define JSONDataset before unpickling
class JSONDataset(Dataset):
    """Map-style dataset that tokenizes one text sample per lookup.

    Args:
        texts: indexable collection of raw text samples.
        labels: indexable collection of integer class labels, aligned with ``texts``.
        tokenizer: callable accepting the text plus ``truncation``, ``padding``,
            ``max_length`` and ``return_tensors`` keyword arguments.
        max_length: length every sample is padded/truncated to (default 1024).
    """

    def __init__(self, texts, labels, tokenizer, max_length=1024):
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_length = tokenizer, max_length

    def __len__(self):
        # The number of samples is taken from the label collection.
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for sample ``idx``."""
        encoded = self.tokenizer(
            self.texts[idx],
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )
        # Drop the leading batch axis the tokenizer adds for return_tensors='pt'.
        sample = {
            field: encoded[field].squeeze(0)
            for field in ("input_ids", "attention_mask")
        }
        sample["labels"] = torch.tensor(self.labels[idx], dtype=torch.long)
        return sample

# ✅ Load Pretrained GPT-2 Tokenizer
# use_fast=False requests the slow (pure-Python) tokenizer implementation.
tokenizer = AutoTokenizer.from_pretrained("gpt2", use_fast=False)
# Fall back to the end-of-sequence token when the tokenizer defines no pad token,
# so that padding='max_length' can be used by the cells below.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

# Keep the numeric pad id in sync with whichever pad token is now set.
tokenizer.pad_token_id = tokenizer.convert_tokens_to_ids(tokenizer.pad_token)
# Pad on the right, i.e. real tokens come first and padding fills the tail.
tokenizer.padding_side = "right"

# ✅ Define GPT-2 Feature Extractor
class GPT2FeatureExtractor(torch.nn.Module):
    """Wrap a GPT-2 backbone and expose one hidden-state vector per sequence.

    Args:
        model_name: checkpoint identifier passed to ``GPT2Model.from_pretrained``.
    """

    def __init__(self, model_name="gpt2"):
        super(GPT2FeatureExtractor, self).__init__()
        self.gpt2 = GPT2Model.from_pretrained(model_name)

    def forward(self, input_ids, attention_mask=None):
        """Return the hidden state of the last attended token of each sequence.

        Args:
            input_ids: ``(batch, seq_len)`` token ids.
            attention_mask: optional ``(batch, seq_len)`` mask with 1 on real
                tokens and 0 on padding.

        Returns:
            Tensor of shape ``(batch, hidden_size)``.
        """
        outputs = self.gpt2(input_ids=input_ids, attention_mask=attention_mask)
        hidden_states = outputs.last_hidden_state

        if attention_mask is None:
            # Without a mask every position counts as real, so the final one is used.
            return hidden_states[:, -1, :]

        # With padding='max_length', position -1 is a pad slot for every input
        # shorter than max_length. Pick the highest position whose mask is 1
        # instead; this works for right- and left-padded batches alike.
        mask = attention_mask.to(hidden_states.device).long()
        positions = torch.arange(hidden_states.size(1), device=hidden_states.device)
        # A fully masked row yields index 0 rather than an out-of-range index.
        last_real_idx = (mask * positions).argmax(dim=1)
        batch_idx = torch.arange(hidden_states.size(0), device=hidden_states.device)
        return hidden_states[batch_idx, last_real_idx]

# ✅ Instantiate GPT-2 Feature Extractor
# Prefer the GPU when one is visible; otherwise stay on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = GPT2FeatureExtractor().to(device)

# Unpickle the train and test datasets from Drive.
# NOTE: pickle.load runs code embedded in the file - only use trusted pickles.
with open("/content/drive/MyDrive/ransomware_analysis_files/dynamic analysis/embeddings file/train_dataset.pkl", "rb") as f:
    train_dataset = pickle.load(f)

with open("/content/drive/MyDrive/ransomware_analysis_files/dynamic analysis/embeddings file/test_dataset.pkl", "rb") as f:
    test_dataset = pickle.load(f)

print("✅ Datasets loaded successfully. Extracting embeddings...")

# ✅ Function to Extract Embeddings
def extract_gpt2_embeddings(dataset, model, tokenizer, max_length=1024):
    """Embed every text of ``dataset`` with ``model`` and collect the labels.

    Args:
        dataset: object exposing parallel ``texts`` and ``labels`` sequences.
        model: module called as ``model(input_ids, attention_mask)`` that
            returns one feature tensor per input.
        tokenizer: callable producing ``input_ids`` / ``attention_mask`` with
            a ``.to(device)`` method on its result.
        max_length: padding/truncation length handed to the tokenizer
            (default 1024, the value previously hard-coded here).

    Returns:
        ``(embeddings, labels)`` as NumPy arrays; each embedding is flattened
        to one dimension.
    """
    model.eval()

    # Follow the model's own device instead of depending on a notebook global.
    try:
        target_device = next(model.parameters()).device
    except StopIteration:
        # A parameter-free model has no device of its own; stay on the CPU.
        target_device = torch.device("cpu")

    embeddings, labels = [], []
    for text, label in zip(dataset.texts, dataset.labels):
        inputs = tokenizer(
            text,
            truncation=True,
            padding="max_length",
            max_length=max_length,
            return_tensors="pt"
        ).to(target_device)
        # Inference only: skip autograd bookkeeping.
        with torch.no_grad():
            embedding = model(inputs["input_ids"], inputs["attention_mask"]).cpu().numpy()
        embeddings.append(embedding.flatten())
        labels.append(label)
    return np.array(embeddings), np.array(labels)

# ✅ Extract Features
# Embed both splits with the GPT-2 feature extractor.
train_embeddings, train_labels = extract_gpt2_embeddings(train_dataset, model, tokenizer)
test_embeddings, test_labels = extract_gpt2_embeddings(test_dataset, model, tokenizer)

# Export features and labels as comma-separated text in the working directory.
for csv_name, csv_array in (
    ("train_embeddings.csv", train_embeddings),
    ("test_embeddings.csv", test_embeddings),
    ("train_labels.csv", train_labels),
    ("test_labels.csv", test_labels),
):
    np.savetxt(csv_name, csv_array, delimiter=",")

# Fit a linear-kernel SVM; probability=True enables predict_proba later on.
svm_clf = SVC(kernel='linear', probability=True).fit(train_embeddings, train_labels)

# Score the classifier on the held-out split.
y_pred_svm = svm_clf.predict(test_embeddings)
accuracy = accuracy_score(test_labels, y_pred_svm)
print("SVM Accuracy on GPT-2 Embeddings:", accuracy)

# Persist the fitted classifier for predict_json_with_svm.
joblib.dump(svm_clf, "svm_ransomware_detector.pkl")

# ✅ Prediction Function
def predict_json_with_svm(json_text):
    """Classify one text sample with the saved SVM on top of GPT-2 features.

    Uses the notebook-level ``model``, ``tokenizer`` and ``device`` and loads
    the classifier from ``svm_ransomware_detector.pkl`` on every call.

    Args:
        json_text: text to classify.

    Returns:
        dict with ``"label"`` ("Ransomware" when the predicted class is 1,
        otherwise "Benign") and ``"confidence"``, the estimated probability of
        the predicted class as a percentage.
    """
    model.eval()
    inputs = tokenizer(
        json_text,
        truncation=True,
        padding="max_length",
        max_length=1024,
        return_tensors="pt"
    ).to(device)
    with torch.no_grad():
        embedding = model(inputs["input_ids"], inputs["attention_mask"]).cpu().numpy().flatten()

    svm_clf = joblib.load("svm_ransomware_detector.pkl")
    features = [embedding]
    predicted_class = svm_clf.predict(features)[0]
    class_probs = svm_clf.predict_proba(features)[0]

    # For SVC, predict_proba can disagree with predict, so max(class_probs) may
    # belong to the other class. Report the probability of the class that was
    # actually predicted; predict_proba columns follow the order of classes_.
    class_index = list(svm_clf.classes_).index(predicted_class)
    confidence = class_probs[class_index] * 100

    label = "Ransomware" if predicted_class == 1 else "Benign"
    return {"label": label, "confidence": confidence}

# ✅ Example Usage
# Smoke-test the prediction helper on a hand-written string of API-call names.
sample_json_text = "CreateFile WriteFile CryptEncrypt DeleteVolumeShadowCopy ModifyRegistry ContactMaliciousDomain"
prediction = predict_json_with_svm(json_text=sample_json_text)
print("🔍 Prediction Result:", prediction)


✅ Datasets loaded successfully. Extracting embeddings...
SVM Accuracy on GPT-2 Embeddings: 0.75
🔍 Prediction Result: {'label': 'Benign', 'confidence': 57.20443397075422}


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import pickle
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# ✅ Load GPT-2 embeddings (numerical data)
# The *_dataset.pkl files on Drive hold JSONDataset objects (raw texts and
# labels), not (embeddings, labels) tuples, so unpacking pickle.load(...) into
# two arrays cannot work. Read the CSV files that the SVM cell wrote with
# np.savetxt instead.
# ndmin=2 keeps a single-sample file two-dimensional.
train_embeddings = np.loadtxt("train_embeddings.csv", delimiter=",", ndmin=2)
test_embeddings = np.loadtxt("test_embeddings.csv", delimiter=",", ndmin=2)
# np.savetxt stored the labels as floats; restore integer class ids.
train_labels = np.loadtxt("train_labels.csv", delimiter=",", ndmin=1).astype(np.int64)
test_labels = np.loadtxt("test_labels.csv", delimiter=",", ndmin=1).astype(np.int64)

# Every embedding row needs exactly one label.
assert len(train_embeddings) == len(train_labels), "train embeddings/labels length mismatch"
assert len(test_embeddings) == len(test_labels), "test embeddings/labels length mismatch"

print(f"✅ Loaded embeddings: Train={train_embeddings.shape}, Test={test_embeddings.shape}")

# ✅ Convert embeddings into tensors
train_embeddings_tensor = torch.tensor(train_embeddings, dtype=torch.float32)
train_labels_tensor = torch.tensor(train_labels, dtype=torch.long)
test_embeddings_tensor = torch.tensor(test_embeddings, dtype=torch.float32)
test_labels_tensor = torch.tensor(test_labels, dtype=torch.long)

# ✅ Create DataLoader
# Only the training loader is shuffled; evaluation keeps the stored order.
batch_size = 16
train_loader = DataLoader(TensorDataset(train_embeddings_tensor, train_labels_tensor), batch_size=batch_size, shuffle=True)
test_loader = DataLoader(TensorDataset(test_embeddings_tensor, test_labels_tensor), batch_size=batch_size, shuffle=False)

# ✅ Define Pretrained LSTM Model
class PretrainedLSTM(nn.Module):
    """Single-layer LSTM followed by a linear classification head.

    Args:
        input_size: dimensionality of each input vector.
        hidden_size: LSTM hidden-state size.
        num_classes: number of output logits.
        pretrained_model: optional path to a saved ``state_dict``; when given,
            the weights are loaded right after the layers are built.

    Raises:
        Whatever ``torch.load`` / ``load_state_dict`` raise when the path is
        missing or the checkpoint does not match the layer shapes.
    """

    def __init__(self, input_size, hidden_size, num_classes, pretrained_model=None):
        super(PretrainedLSTM, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

        # Load pretrained model if provided
        if pretrained_model:
            # map_location="cpu" lets a checkpoint saved from a CUDA model load
            # on a CPU-only runtime; callers move the module to a device afterwards.
            state_dict = torch.load(pretrained_model, map_location="cpu")
            self.load_state_dict(state_dict)

    def forward(self, x):
        """Map ``(batch, input_size)`` inputs to ``(batch, num_classes)`` logits."""
        # Each vector is treated as a length-1 sequence for the LSTM.
        _, (hidden, _) = self.lstm(x.unsqueeze(1))  # Add sequence dimension
        # hidden[-1] is the final hidden state of the last LSTM layer.
        return self.fc(hidden[-1])

# ✅ Load Pretrained Model if Available
pretrained_path = "lstm_ransomware_detector.pth"
# Use the GPU when present, otherwise fall back to the CPU instead of crashing.
lstm_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
lstm_kwargs = dict(input_size=train_embeddings.shape[1], hidden_size=512, num_classes=2)
try:
    model = PretrainedLSTM(pretrained_model=pretrained_path, **lstm_kwargs)
    print("✅ Loaded pretrained LSTM model!")
except (OSError, RuntimeError, pickle.UnpicklingError) as load_error:
    # Only checkpoint problems (missing file, unreadable file, shape mismatch)
    # trigger a fresh model. A bare `except:` would also hide interrupts and
    # unrelated bugs.
    print(f"Could not load {pretrained_path}: {load_error}")
    model = PretrainedLSTM(**lstm_kwargs)
    print("🚀 Training new LSTM model...")
model = model.to(lstm_device)

# ✅ Define Loss and Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# ✅ Train LSTM
num_epochs = 10
# Send batches to wherever the model lives rather than assuming CUDA, so the
# loop also runs on a CPU-only runtime.
train_device = next(model.parameters()).device
for epoch in range(num_epochs):
    total_loss = 0
    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(train_device), labels.to(train_device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # The reported loss is the sum over all batches of the epoch, not the mean.
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss:.4f}")

# ✅ Evaluate LSTM Model
model.eval()
# Follow the model's device instead of hard-coding CUDA.
eval_device = next(model.parameters()).device
all_preds, all_labels = [], []
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(eval_device), labels.to(eval_device)
        outputs = model(inputs)
        # The predicted class is the index of the largest logit.
        predicted = torch.argmax(outputs, dim=1)
        all_preds.extend(predicted.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# ✅ Compute Metrics
# Score the collected predictions with each metric, in a fixed order.
accuracy, precision, recall, f1 = (
    metric_fn(all_labels, all_preds)
    for metric_fn in (accuracy_score, precision_score, recall_score, f1_score)
)

# Report: accuracy as a percentage, the remaining metrics as fractions.
print("\n📊 **Final Model Evaluation:**")
print(f"✅ Accuracy: {accuracy * 100:.2f}%")
print(f"✅ Precision: {precision:.4f}")
print(f"✅ Recall: {recall:.4f}")
print(f"✅ F1 Score: {f1:.4f}")

# Store only the weights (state_dict) in the working directory.
torch.save(model.state_dict(), "lstm_ransomware_detector.pth")
print("\n💾 Model saved successfully!")


AttributeError: LongformerTokenizerFast has no attribute pad_token

In [None]:

import torch
import pickle
from torch.utils.data import Dataset
from transformers import AutoTokenizer

# ✅ Define JSONDataset before unpickling
class JSONDataset(Dataset):
    """Map-style dataset that tokenizes a text sample on access.

    Args:
        texts: indexable collection of raw text samples.
        labels: indexable collection of integer class labels, aligned with ``texts``.
        tokenizer: callable accepting the text plus ``truncation``, ``padding``,
            ``max_length`` and ``return_tensors`` keyword arguments.
        max_length: length every sample is padded/truncated to (default 4096).
    """

    def __init__(self, texts, labels, tokenizer, max_length=4096):
        self.texts, self.labels = texts, labels
        self.tokenizer, self.max_length = tokenizer, max_length

    def __len__(self):
        # The number of samples is taken from the label collection.
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for sample ``idx``."""
        tokenizer_kwargs = dict(
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt',
        )
        encoded = self.tokenizer(self.texts[idx], **tokenizer_kwargs)
        # squeeze(0) removes the batch axis added by return_tensors='pt'.
        input_ids = encoded["input_ids"].squeeze(0)
        attention_mask = encoded["attention_mask"].squeeze(0)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": label}

# ✅ Load Pretrained Tokenizer and Fix `pad_token`
# NOTE(review): this cell appears to be a workaround for the recorded
# "LongformerTokenizerFast has no attribute pad_token" error - confirm.
tokenizer = AutoTokenizer.from_pretrained("allenai/longformer-base-4096")
# Fall back to the end-of-sequence token when no pad token is defined.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
# Keep the numeric pad id in sync with the pad token chosen above.
tokenizer.pad_token_id = tokenizer.convert_tokens_to_ids(tokenizer.pad_token)

# ✅ Now, load the pickle file safely
# JSONDataset must already be defined in this session for unpickling to work.
# pickle.load runs code embedded in the file - only load trusted pickles.
with open("/content/drive/MyDrive/ransomware_analysis_files/dynamic analysis/embeddings file/test_dataset.pkl", "rb") as f:
    test_dataset = pickle.load(f)

# ✅ Print structure of loaded dataset
# Shows the unpickled object's type and its number of samples.
print("✅ Pickle file loaded successfully!")
print(f"Dataset Type: {type(test_dataset)}")
print(f"Dataset Length: {len(test_dataset)}")



config.json:   0%|          | 0.00/694 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/899k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

✅ Pickle file loaded successfully!
Dataset Type: <class '__main__.JSONDataset'>
Dataset Length: 20
