<a href="https://colab.research.google.com/github/ImadZerrout/Mini-Projects/blob/master/Packages_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import recall_score, precision_score
import ast  # Safely evaluates the string representation of lists

# --- CONFIGURATION ---
# Hyper-parameters and input path consumed by train_model().
BATCH_SIZE = 32  # samples per mini-batch for both the train and test DataLoader
LEARNING_RATE = 0.001  # learning rate handed to the Adam optimizer
EPOCHS = 50  # number of full passes over the training split
CSV_FILE = "pypi_dataset_features.csv"  # feature table loaded with pandas.read_csv

import re  # used by PyPIDataset to pull the numeric values out of the stored embedding strings

class PyPIDataset(Dataset):
    """Torch dataset exposing one ``(embedding, tabular, label)`` triple per dataframe row.

    * ``embedding`` - float32 tensor of length ``EMBED_DIM`` parsed from the
      ``code_embedding`` column (a stringified list of numbers).
    * ``tabular``   - float32 tensor holding every remaining column except the
      ones listed in ``NON_FEATURE_COLS``, in dataframe column order.
    * ``label``     - float32 tensor of shape ``(1,)`` taken from ``label``.

    Rows whose embedding is missing or does not contain exactly ``EMBED_DIM``
    numbers get an all-zero embedding instead of raising.
    """

    # We expect 768 dimensions from CodeBERT.
    EMBED_DIM = 768

    # Columns that are identifiers/targets/raw text rather than numeric inputs.
    NON_FEATURE_COLS = ('filename', 'label', 'code_embedding')

    # One numeric literal (optional sign, decimals, scientific notation).
    # The lookbehind rejects digits glued to a preceding word character or dot,
    # so the "32" inside an "np.float32(...)" wrapper is NOT mistaken for a
    # value. Without it every wrapped value contributes a spurious 32.0 and
    # half of the real embedding is pushed out of the vector.
    _NUMBER_RE = re.compile(r"(?<![\w.])[-+]?(?:\d*\.\d+|\d+)(?:[eE][-+]?\d+)?")

    def __init__(self, dataframe):
        """Pre-compute all tensors from *dataframe* so ``__getitem__`` is a plain index."""
        self.df = dataframe

        print("Parsing embeddings... (this might take a few seconds)")
        clean_embeddings = [self._parse_embedding(x) for x in dataframe['code_embedding']]
        self.embeddings = torch.tensor(clean_embeddings, dtype=torch.float32)

        # Branch B: the tabular features (signals + metadata).
        feature_cols = [c for c in dataframe.columns if c not in self.NON_FEATURE_COLS]
        self.tabular = torch.tensor(dataframe[feature_cols].values, dtype=torch.float32)

        # The target label (0 or 1), shaped (N, 1) to match the model output.
        self.labels = torch.tensor(dataframe['label'].values, dtype=torch.float32).unsqueeze(1)

    @classmethod
    def _parse_embedding(cls, emb_str):
        """Return the embedding stored in *emb_str* as a list of ``EMBED_DIM`` floats.

        Non-string input (e.g. a missing CSV cell) and strings that do not hold
        at least ``EMBED_DIM`` numbers yield a zero vector. If more numbers are
        present, the last ``EMBED_DIM`` are used.
        """
        zero_vector = [0.0] * cls.EMBED_DIM
        if not isinstance(emb_str, str):
            return zero_vector
        tokens = cls._NUMBER_RE.findall(emb_str)
        # Every token matched by _NUMBER_RE is a valid float literal.
        values = [float(tok) for tok in tokens[-cls.EMBED_DIM:]]
        return values if len(values) == cls.EMBED_DIM else zero_vector

    def __len__(self):
        """Number of rows in the wrapped dataframe."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(embedding, tabular, label)`` tensors for row *idx*."""
        return self.embeddings[idx], self.tabular[idx], self.labels[idx]

# --- 2. THE HYBRID MODEL ---
class HybridMalwareDetector(nn.Module):
    """Two-branch binary classifier over a code embedding and a tabular feature vector.

    The embedding goes through Linear -> ReLU -> Dropout(0.3), the tabular
    features through Linear -> ReLU; both results are concatenated and fed to
    a small head that ends in a sigmoid, so the output lies in [0, 1].

    Args:
        embed_dim: length of the code-embedding input.
        tab_dim: number of tabular features.
    """

    def __init__(self, embed_dim=768, tab_dim=12):
        super().__init__()
        code_width, tab_width = 128, 32

        # Branch A: code embeddings.
        self.branch_code = nn.Sequential(
            nn.Linear(embed_dim, code_width), nn.ReLU(), nn.Dropout(0.3)
        )
        # Branch B: metadata / static signals.
        self.branch_tab = nn.Sequential(nn.Linear(tab_dim, tab_width), nn.ReLU())
        # Fusion head: 128 + 32 = 160 inputs -> 64 -> 1 probability.
        self.classifier = nn.Sequential(
            nn.Linear(code_width + tab_width, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid(),
        )

    def forward(self, embedding, tabular):
        """Return the sigmoid score for a batch of (embedding, tabular) inputs."""
        fused = torch.cat(
            [self.branch_code(embedding), self.branch_tab(tabular)], dim=1
        )
        return self.classifier(fused)

# --- 3. TRAINING LOOP ---
def train_model():
    """Train the hybrid detector on CSV_FILE, print test metrics and save the weights.

    Steps: load the CSV, make a seeded 80/20 train/test split, train for
    EPOCHS epochs with Adam and binary cross-entropy (printing the mean batch
    loss after every epoch), then threshold the test-set scores at 0.5 to
    report recall and precision, and finally write the model's state dict to
    "malware_detector_model.pth".
    """
    print("Loading Data...")
    frame = pd.read_csv(CSV_FILE)

    # Fixed random_state keeps the split reproducible between runs.
    train_df, test_df = train_test_split(frame, test_size=0.2, random_state=42)
    train_dataset, test_dataset = PyPIDataset(train_df), PyPIDataset(test_df)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

    # The tabular input width follows whatever feature columns the CSV holds.
    model = HybridMalwareDetector(tab_dim=train_dataset.tabular.shape[1])
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    print(f"Starting training on {len(train_df)} samples...")

    for epoch_no in range(1, EPOCHS + 1):
        model.train()
        running_loss = 0

        for embeddings, tabular, targets in train_loader:
            optimizer.zero_grad()
            batch_loss = criterion(model(embeddings, tabular), targets)
            batch_loss.backward()
            optimizer.step()
            running_loss += batch_loss.item()

        mean_loss = running_loss / len(train_loader)
        print(f"Epoch {epoch_no}/{EPOCHS} | Loss: {mean_loss:.4f}")

    # --- 4. EVALUATION ---
    print("\n--- Final Evaluation ---")
    model.eval()
    predictions, truths = [], []

    with torch.no_grad():
        for embeddings, tabular, targets in test_loader:
            scores = model(embeddings, tabular)
            # Decision threshold: anything above 0.5 counts as malicious.
            hard_labels = (scores > 0.5).float()
            predictions.extend(hard_labels.numpy())
            truths.extend(targets.numpy())

    recall = recall_score(truths, predictions)
    precision = precision_score(truths, predictions)

    print(f"Recall (Detection Rate): {recall:.2%}")
    print(f"Precision (False Alarm Control): {precision:.2%}")

    # Persist only the weights; the architecture must be rebuilt by the loader.
    torch.save(model.state_dict(), "malware_detector_model.pth")
    print("Model saved to malware_detector_model.pth")

# Run the train / evaluate / save pipeline only when this code is executed
# directly, not when it is imported.
if __name__ == "__main__":
    train_model()

Loading Data...
Parsing embeddings... (this might take a few seconds)
Parsing embeddings... (this might take a few seconds)
Starting training on 7952 samples...
Epoch 1/50 | Loss: 0.4598
Epoch 2/50 | Loss: 0.3895
Epoch 3/50 | Loss: 0.3455
Epoch 4/50 | Loss: 0.2897
Epoch 5/50 | Loss: 0.2739
Epoch 6/50 | Loss: 0.2794
Epoch 7/50 | Loss: 0.2726
Epoch 8/50 | Loss: 0.3026
Epoch 9/50 | Loss: 0.2680
Epoch 10/50 | Loss: 0.2648
Epoch 11/50 | Loss: 0.2740
Epoch 12/50 | Loss: 0.2510
Epoch 13/50 | Loss: 0.2605
Epoch 14/50 | Loss: 0.4605
Epoch 15/50 | Loss: 0.2839
Epoch 16/50 | Loss: 0.2550
Epoch 17/50 | Loss: 0.2495
Epoch 18/50 | Loss: 0.2381
Epoch 19/50 | Loss: 0.2462
Epoch 20/50 | Loss: 0.2471
Epoch 21/50 | Loss: 0.2439
Epoch 22/50 | Loss: 0.2428
Epoch 23/50 | Loss: 0.2670
Epoch 24/50 | Loss: 0.4401
Epoch 25/50 | Loss: 0.2902
Epoch 26/50 | Loss: 0.2846
Epoch 27/50 | Loss: 0.2699
Epoch 28/50 | Loss: 0.2486
Epoch 29/50 | Loss: 0.2573
Epoch 30/50 | Loss: 0.2439
Epoch 31/50 | Loss: 0.2436
Epoch 32/50

In [None]:

import os
import tarfile
import zipfile
import pandas as pd
import re
import math
from tqdm import tqdm
from transformers import AutoTokenizer, AutoModel
import torch
import email
from email.policy import default
# --- 1. DEEP LEARNING MODEL (CodeBERT) ---
# Load the pretrained "microsoft/codebert-base" tokenizer and encoder once at
# module level; get_code_embedding() uses them to convert code to numbers.
print("Loading CodeBERT model...")
tokenizer = AutoTokenizer.from_pretrained("microsoft/codebert-base")
model = AutoModel.from_pretrained("microsoft/codebert-base")
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)



def extract_metadata_features(content):
    """Turn the text of a PKG-INFO / METADATA file into four binary heuristics.

    Args:
        content: full text of the metadata file (RFC 822 style headers,
            optionally followed by a body).

    Returns:
        dict with integer 0/1 values for the keys ``has_homepage``,
        ``has_description``, ``uses_free_email`` and ``has_license``.

    Notes:
        * The description is looked up in the ``Description`` header first and
          then in the message body, because newer metadata versions store the
          long description as the body rather than as a header.
        * A license counts as declared when any of ``License``,
          ``License-Expression`` or a ``License ::`` classifier is present.
        * The literal placeholder ``UNKNOWN`` (written by older build tools for
          missing fields) is treated as "not provided".
    """
    # PKG-INFO is formatted like an email header block, so the email parser fits.
    msg = email.message_from_string(content, policy=default)

    def field(*names):
        """Return the first real (non-empty, non-placeholder) value among *names*."""
        for name in names:
            value = str(msg.get(name, "") or "").strip()
            if value and value.upper() != "UNKNOWN":
                return value
        return ""

    author_email = field("Author-Email", "Maintainer-Email")
    home_page = field("Home-page", "Project-URL")

    body = msg.get_payload()
    if not isinstance(body, str):  # defensive: multipart / missing payload
        body = ""
    description = field("Description", "Long-Description") or body.strip()

    license_declared = bool(field("License", "License-Expression")) or any(
        str(classifier).strip().startswith("License ::")
        for classifier in msg.get_all("Classifier", [])
    )

    features = {
        # Suspicious: no homepage often means a throwaway account.
        "has_homepage": 1 if home_page and len(home_page) > 5 else 0,

        # Suspicious: malicious packages often omit the description.
        "has_description": 1 if len(description) > 50 else 0,

        # Suspicious: free email providers are common in malware (but also in benign).
        # More domains such as 'protonmail' or 'qq.com' can be added here.
        "uses_free_email": 1 if any(x in author_email.lower() for x in ["gmail.com", "yahoo.com", "hotmail.com"]) else 0,

        # Suspicious: no license declared.
        "has_license": 1 if license_declared else 0,
    }

    return features



def get_code_embedding(code_snippet):
    """Embed a source-code string with CodeBERT.

    The snippet is tokenized (truncated to at most 512 tokens), run through the
    module-level model without gradient tracking, and the first row of the
    model's ``pooler_output`` is returned as a NumPy array.

    Any failure along the way is swallowed and a list of 768 zeros is returned
    instead, so callers always get a 768-long vector.
    """
    try:
        encoded = tokenizer(code_snippet, return_tensors="pt", truncation=True, max_length=512)
        # Move every input tensor to the same device as the model.
        on_device = {name: tensor.to(device) for name, tensor in encoded.items()}
        with torch.no_grad():
            result = model(**on_device)
        # 'pooler_output' is a summary vector of the whole code snippet.
        pooled = result.pooler_output
        return pooled.cpu().numpy()[0]
    except Exception:
        # Zero vector on error.
        return [0] * 768

# --- 2. STATIC ANALYSIS HELPERS ---

def calculate_entropy(text):
    """Return the Shannon entropy of *text* in bits per symbol.

    High values hint at obfuscated / encrypted strings. Empty input and input
    made of a single repeated symbol both yield ``0.0``.

    Symbol frequencies are gathered in one pass, so the cost is linear in
    ``len(text)`` regardless of how many distinct symbols occur.
    """
    if not text:
        return 0.0

    # Local import keeps this helper self-contained.
    from collections import Counter

    counts = Counter(text)
    if len(counts) == 1:
        # Avoids returning a negative zero for single-symbol input.
        return 0.0

    total = len(text)
    return sum(-(n / total) * math.log2(n / total) for n in counts.values())

def extract_signals(file_content):
    """Compute keyword flags and obfuscation metrics for a piece of source text.

    Returns a dict with six 0/1 substring flags (``has_eval``, ``has_exec``,
    ``has_b64decode``, ``has_subprocess``, ``has_socket``, ``has_requests``)
    plus ``max_entropy`` (entropy of the whole text) and ``longest_line_len``
    (length of the longest newline-separated line). For empty input the two
    metrics stay at ``0.0`` and ``0``.
    """
    uses_web = "requests.get" in file_content or "urllib" in file_content
    signals = {
        "has_eval": int("eval(" in file_content),
        "has_exec": int("exec(" in file_content),
        "has_b64decode": int("b64decode" in file_content),
        "has_subprocess": int("subprocess" in file_content),
        "has_socket": int("socket" in file_content),
        "has_requests": int(uses_web),
        "max_entropy": 0.0,
        "longest_line_len": 0
    }

    if not file_content:
        return signals

    # Obfuscation hints: high entropy or a very long single line.
    signals["max_entropy"] = calculate_entropy(file_content)
    signals["longest_line_len"] = max(map(len, file_content.split('\n')))
    return signals

def process_archive(file_path, label):
    """Build one feature row for a package archive (``.tar.gz`` sdist or ``.whl``).

    Args:
        file_path: path to the archive on disk.
        label: value stored unchanged under the ``label`` key.

    Returns:
        dict whose keys are, in this order: ``filename``, ``label``,
        ``code_embedding``, the four metadata flags, then the static signals
        produced by ``extract_signals``. The key order is kept stable because
        rows of this dict may be written to a table column-by-column.

    Behavior:
        * Every ``setup.py`` / ``__init__.py`` in the archive is scanned and
          the signals are merged with ``max``, so a flag raised by any file
          is kept instead of being overwritten by the next file.
        * The embedding is computed once, from a single representative file:
          a non-empty ``setup.py`` is preferred over ``__init__.py``, and the
          shallowest path wins within each kind. If no such file has content,
          ``code_embedding`` stays ``None``.
        * The embedding is stored as a string of plain Python floats.
        * Metadata is read from ``PKG-INFO`` (sdist) or ``*.dist-info/METADATA``
          (wheel); if several match, the last one read wins.
        * Archives with another extension, or archives that fail to open, give
          the default row; failures are reported on stdout, not raised.
    """
    features = {
        "filename": os.path.basename(file_path),
        "label": label,
        "code_embedding": None,
        # Metadata flags default to 0 in case no metadata file is found.
        "has_homepage": 0,
        "has_description": 0,
        "uses_free_email": 0,
        "has_license": 0,
    }
    # Signal defaults (all zero) come from scanning an empty string.
    features.update(extract_signals(""))

    code_names = ("setup.py", "__init__.py")  # order doubles as embedding priority
    best_code = None  # ((priority, path depth), decoded text) of the file to embed

    def visit(name, read_bytes, is_metadata):
        """Fold one archive member into ``features``; *read_bytes* loads it lazily."""
        nonlocal best_code
        base_name = name.rsplit("/", 1)[-1]

        if base_name in code_names:
            text = read_bytes().decode('utf-8', errors='ignore')
            for key, value in extract_signals(text).items():
                features[key] = max(features[key], value)
            if text.strip():
                rank = (code_names.index(base_name), name.count("/"))
                if best_code is None or rank < best_code[0]:
                    best_code = (rank, text)

        if is_metadata:
            meta_text = read_bytes().decode('utf-8', errors='ignore')
            features.update(extract_metadata_features(meta_text))

    try:
        # --- LOGIC FOR TAR.GZ ---
        if file_path.endswith(".tar.gz"):
            with tarfile.open(file_path, "r:gz") as tar:
                for member in tar.getmembers():
                    # extractfile() returns None for directories and other
                    # non-regular members, so skip them up front.
                    if not member.isfile():
                        continue
                    visit(
                        member.name,
                        lambda m=member: tar.extractfile(m).read(),
                        member.name.endswith("PKG-INFO"),
                    )

        # --- LOGIC FOR WHEELS (.whl) ---
        elif file_path.endswith(".whl"):
            with zipfile.ZipFile(file_path, "r") as z:
                for name in z.namelist():
                    visit(
                        name,
                        lambda n=name: z.read(n),
                        "dist-info" in name and name.endswith("METADATA"),
                    )

    except Exception as e:
        # Best effort: keep whatever was extracted so far, but say what went wrong.
        print(f"[-] Warning: Could not fully process '{features['filename']}': {e}")

    if best_code is not None:
        # Plain floats keep the stored string free of "np.float32(...)" wrappers.
        embedding = get_code_embedding(best_code[1])
        features["code_embedding"] = str([float(v) for v in embedding])

    return features

Loading CodeBERT model...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/498 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/25.0 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]



special_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/499M [00:00<?, ?B/s]

Loading weights:   0%|          | 0/199 [00:00<?, ?it/s]

model.safetensors:   0%|          | 0.00/499M [00:00<?, ?B/s]

In [None]:
import sys
import os
import re
import torch
import torch.nn as nn


# --- 1. THE MODEL ARCHITECTURE ---
class HybridMalwareDetector(nn.Module):
    """Inference-side copy of the two-branch detector.

    The attribute names and layer positions define the state-dict keys, so
    they must stay in sync with whatever produced the saved weights.

    Args:
        embed_dim: length of the code-embedding input.
        tab_dim: number of tabular features.
    """

    def __init__(self, embed_dim=768, tab_dim=12):
        super().__init__()
        code_layers = [nn.Linear(embed_dim, 128), nn.ReLU(), nn.Dropout(0.3)]
        tab_layers = [nn.Linear(tab_dim, 32), nn.ReLU()]
        head_layers = [
            nn.Linear(128 + 32, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
            nn.Sigmoid(),
        ]
        self.branch_code = nn.Sequential(*code_layers)
        self.branch_tab = nn.Sequential(*tab_layers)
        self.classifier = nn.Sequential(*head_layers)

    def forward(self, embedding, tabular):
        """Return sigmoid scores for a batch of (embedding, tabular) inputs."""
        code_out = self.branch_code(embedding)
        tab_out = self.branch_tab(tabular)
        merged = torch.cat((code_out, tab_out), dim=1)
        return self.classifier(merged)

# --- 2. DATA CLEANING HELPER ---
# One numeric literal (optional sign, decimals, scientific notation). The
# lookbehind rejects digits glued to a preceding word character or dot, so the
# "32" inside an "np.float32(...)" wrapper is not mistaken for a value.
_EMBEDDING_NUMBER_RE = re.compile(r"(?<![\w.])[-+]?(?:\d*\.\d+|\d+)(?:[eE][-+]?\d+)?")


def parse_embedding(emb_str, dim=768):
    """Parse a stringified embedding into a list of *dim* floats.

    Accepts plain lists (``"[0.1, -2e-05]"``) as well as lists whose items are
    wrapped like ``np.float32(0.1)``.

    Args:
        emb_str: the stored embedding text; anything that is not a ``str``
            (e.g. ``None``) is treated as missing.
        dim: expected vector length (768 by default).

    Returns:
        A list of *dim* floats. A zero vector is returned when the input is
        missing or holds fewer than *dim* numbers; when it holds more, the
        last *dim* numbers are used.
    """
    zero_vector = [0.0] * dim
    if not isinstance(emb_str, str):
        return zero_vector
    tokens = _EMBEDDING_NUMBER_RE.findall(emb_str)
    # Every token matched by the pattern is a valid float literal.
    parsed = [float(tok) for tok in tokens[-dim:]]
    return parsed if len(parsed) == dim else zero_vector

# --- 3. REPORT GENERATOR ---
def print_detailed_report(features):
    """Print a human-readable summary of an extracted feature dict.

    The report has three sections: metadata flags, static code signals
    (including entropy and longest-line warnings), and whether a non-zero
    code embedding is present. Nothing is returned.
    """

    def yn(val, bad_condition=1):
        """Render a flag as Yes/No, with the alarm emoji when it equals *bad_condition*."""
        if val != bad_condition:
            return "Yes ✅" if val == 1 else "No ✅"
        return "Yes 🚨" if bad_condition == 1 else "No 🚨"

    # (line prefix, feature key, value that counts as a red flag)
    metadata_rows = (
        ("  - Has Homepage:       ", 'has_homepage', 0),
        ("  - Has Description:    ", 'has_description', 0),
        ("  - Has License:        ", 'has_license', 0),
        ("  - Uses Free Email:    ", 'uses_free_email', 1),
    )
    signal_rows = (
        ("  - Uses eval():        ", 'has_eval', 1),
        ("  - Uses exec():        ", 'has_exec', 1),
        ("  - Uses b64decode():   ", 'has_b64decode', 1),
        ("  - Uses subprocess:    ", 'has_subprocess', 1),
        ("  - Uses socket:        ", 'has_socket', 1),
        ("  - Uses web requests:  ", 'has_requests', 1),
    )

    banner = "=" * 50
    print("\n" + banner)
    print(f"📦 PACKAGE ANALYSIS: {features.get('filename', 'Unknown')}")
    print(banner)

    print("\n🔍 METADATA INSPECTION")
    for prefix, key, bad in metadata_rows:
        print(f"{prefix}{yn(features.get(key), bad_condition=bad)}")

    print("\n💻 CODE HEURISTICS (STATIC SIGNALS)")
    for prefix, key, bad in signal_rows:
        print(f"{prefix}{yn(features.get(key), bad_condition=bad)}")

    entropy = features.get('max_entropy', 0)
    if entropy > 5.5:
        entropy_warn = "⚠️ (Likely Obfuscated)"
    else:
        entropy_warn = "✅ (Normal)"
    print(f"  - Max String Entropy: {entropy:.2f} {entropy_warn}")

    line_len = features.get('longest_line_len', 0)
    if line_len > 1000:
        len_warn = "⚠️ (Suspiciously Long)"
    else:
        len_warn = "✅ (Normal)"
    print(f"  - Longest Line:       {line_len} chars {len_warn}")

    print("\n🧠 DEEP LEARNING (CodeBERT)")
    vector = parse_embedding(features.get('code_embedding'))
    # An all-zero vector is what the parser returns when nothing usable was stored.
    has_signal = any(component != 0.0 for component in vector)
    status = "Generated successfully [✓]" if has_signal else "Failed / No code found [❌]"
    print(f"  - Semantic Vector:    {status}")
    print("-" * 50)

# --- 4. MAIN SCANNING ENGINE ---
def scan_package(file_path, model_path="malware_detector_model.pth", threshold=0.5):
    """Extract features from a package archive, score it and print a verdict.

    Args:
        file_path: path to the archive to analyse.
        model_path: path to the saved state dict of ``HybridMalwareDetector``.
        threshold: scores strictly above this value are reported as malicious.

    Returns:
        The model score as a float in [0, 1], or ``None`` when the archive or
        the weights file is missing or the extracted features do not fit the
        saved model.
    """
    if not os.path.exists(file_path):
        print(f"[-] Error: Could not find file at '{file_path}'")
        return None

    print(f"[*] Extracting features from {os.path.basename(file_path)}...")
    raw_features = process_archive(file_path, label=0)

    # Print the detailed report before scoring.
    print_detailed_report(raw_features)

    # Select tabular inputs with the same rule the training dataset uses:
    # every key except the identifier, target and raw embedding, in the order
    # the extractor emits them.
    # NOTE(review): this assumes the training CSV columns were written in the
    # extractor's key order - confirm against the dataset-building code. The
    # previous hard-coded list put the code signals before the metadata flags,
    # which does not match that order.
    non_feature_keys = ("filename", "label", "code_embedding")
    tabular_feature_names = [k for k in raw_features if k not in non_feature_keys]

    try:
        tab_data = [float(raw_features[feat]) for feat in tabular_feature_names]
    except (TypeError, ValueError) as e:
        print(f"[-] Extraction Error: Non-numeric feature value ({e})")
        return None
    tab_tensor = torch.tensor([tab_data], dtype=torch.float32)

    emb_list = parse_embedding(raw_features["code_embedding"])
    emb_tensor = torch.tensor([emb_list], dtype=torch.float32)

    try:
        state_dict = torch.load(model_path, map_location=torch.device('cpu'), weights_only=True)
    except FileNotFoundError:
        print(f"[-] Error: Model weights not found at '{model_path}'.")
        return None

    # Read the tabular width from the checkpoint instead of hard-coding it, and
    # refuse to score when the extracted feature count does not match.
    expected_tab_dim = state_dict["branch_tab.0.weight"].shape[1]
    if len(tab_data) != expected_tab_dim:
        print(
            f"[-] Extraction Error: Model expects {expected_tab_dim} tabular "
            f"features but {len(tab_data)} were extracted"
        )
        return None

    model = HybridMalwareDetector(embed_dim=768, tab_dim=expected_tab_dim)
    model.load_state_dict(state_dict)
    model.eval()

    with torch.no_grad():
        score = model(emb_tensor, tab_tensor).item()

    print(f"📊 THREAT SCORE: {score:.4f} (Range: 0.0 -> 1.0)")

    if score > threshold:
        print("🚨 VERDICT: DANGER (Malicious) 🚨")
        print("    Recommendation: DO NOT INSTALL. Review the red flags 🚨 above.")
    else:
        print("✅ VERDICT: SAFE (Benign) ✅")
        print("    Recommendation: Looks safe to proceed.")
    print("="*50 + "\n")

    return score

# Demo entry point: when run directly, scan a numpy sdist expected in the
# current working directory.
if __name__ == "__main__":

      scan_package("numpy-2.4.2.tar.gz")


[*] Extracting features from numpy-2.4.2.tar.gz...

📦 PACKAGE ANALYSIS: numpy-2.4.2.tar.gz

🔍 METADATA INSPECTION
  - Has Homepage:       Yes ✅
  - Has Description:    No 🚨
  - Has License:        No 🚨
  - Uses Free Email:    No ✅

💻 CODE HEURISTICS (STATIC SIGNALS)
  - Uses eval():        No ✅
  - Uses exec():        No ✅
  - Uses b64decode():   No ✅
  - Uses subprocess:    No ✅
  - Uses socket:        No ✅
  - Uses web requests:  No ✅
  - Max String Entropy: 4.86 ✅ (Normal)
  - Longest Line:       101 chars ✅ (Normal)

🧠 DEEP LEARNING (CodeBERT)
  - Semantic Vector:    Generated successfully [✓]
--------------------------------------------------
📊 THREAT SCORE: 1.0000 (Range: 0.0 -> 1.0)
🚨 VERDICT: DANGER (Malicious) 🚨
    Recommendation: DO NOT INSTALL. Review the red flags 🚨 above.

