In [1]:
import os
import json
import esprima
from tqdm import tqdm

In [2]:
def extract_code_gadgets(code):
    gadgets = []
    
    try:
        ast = esprima.parseScript(code, {
            'range': True,
            'tolerant': True,
            'jsx': False
        })
    except Exception as e:
        print(f"🔥 Parsing failed: {str(e)}")
        return gadgets

    sensitive_apis = [
        "fs.readFile", "fs.readFileSync", "fs.createReadStream",
        "fs.writeFile", "fs.writeFileSync", "fs.appendFile", "fs.appendFileSync",
        "fs.unlink", "fs.unlinkSync", "fs.rm", "fs.rmSync",
        "fs.readdir", "fs.readdirSync",
        "fs.existsSync", "fs.access", "fs.accessSync",
        "path.join", "path.resolve", "path.normalize",
        "child_process.exec", "child_process.execSync",
        "child_process.spawn", "child_process.spawnSync",
        "child_process.fork",
        "eval", "Function", "require", "import",
        "moment.locale"
    ]


    class GadgetExtractor(esprima.NodeVisitor):
        def __init__(self):
            self.sensitive_operations = []
            self.user_inputs = set()
            self.debug_log = []

        def get_source_code(self, node):
            """Safely extract source code with range validation"""
            if not hasattr(node, 'range') or node.range is None:
                return None
            start, end = node.range
            if start is None or end is None:
                return None
            return code[start:end]

        def visit_VariableDeclarator(self, node):
            try:
                if node.init:
                    source = self.get_source_code(node.init)
                    if source and any(s in source for s in ['req.query', 'req.params', 'req.body']):
                        var_name = node.id.name
                        self.user_inputs.add(var_name)
                        self.sensitive_operations.append({
                            'node': node,
                            'type': 'user_input',
                            'source': self.get_source_code(node)
                        })
            except Exception as e:
                print(f"VariableDeclarator error: {str(e)}")
            self.generic_visit(node)

        def visit_CallExpression(self, node):
            try:
                # Detect require() with user input
                if hasattr(node.callee, 'name') and node.callee.name == 'require':
                    arg_source = self.get_source_code(node.arguments[0])
                    if arg_source and any(var in arg_source for var in self.user_inputs):
                        self.sensitive_operations.append({
                            'node': node,
                            'type': 'dynamic_require',
                            'source': self.get_source_code(node)
                        })

                # Detect sensitive API calls
                if node.callee.type == 'MemberExpression':
                    obj = self.unwind_member_expression(node.callee)
                    method_call = f"{obj}.{node.callee.property.name}"
                    
                    if method_call in sensitive_apis:
                        self.sensitive_operations.append({
                            'node': node,
                            'type': method_call,
                            'source': self.get_source_code(node)
                        })
            except Exception as e:
                print(f"CallExpression error: {str(e)}")
            self.generic_visit(node)

        def unwind_member_expression(self, node):
            """Robust member expression unwinding"""
            parts = []
            try:
                while node.type == 'MemberExpression':
                    parts.append(node.property.name)
                    node = node.object
                base = node.name if hasattr(node, 'name') else self.get_source_code(node)
                return f"{base}.{'.'.join(reversed(parts))}" if parts else base
            except Exception:
                return "UnknownExpression"

    extractor = GadgetExtractor()
    extractor.visit(ast)

    # Safely create gadgets
    for op in extractor.sensitive_operations:
        # Validate node and range
        if not op['source']:
            continue
            
        node = op['node']
        if not hasattr(node, 'range') or node.range is None:
            continue
            
        start, end = node.range
        if start is None or end is None:
            continue

        # Calculate line numbers safely
        try:
            start_line = code[:start].count('\n') + 1
            end_line = code[:end].count('\n') + 1
            context_start = max(0, start_line - 3)  # 0-based index
            context_end = end_line + 1  # +2 lines after
            lines = code.split('\n')
            gadget = {
                'type': op['type'],
                'source': op['source'],
                'context': lines[context_start:context_end],
                'line_numbers': (start_line, end_line)
            }
            gadgets.append(gadget)
        except Exception as e:
            print(f"Skipping gadget due to error: {str(e)}")

    return gadgets

In [3]:
# Configuration
INPUT_DIR = "../SourceCode/Test"  # Folder containing your JS files
OUTPUT_FILE = "processed_gadgets.json"  # Output file for results


def process_dataset():
    js_files = [os.path.join(INPUT_DIR, f) for f in os.listdir(INPUT_DIR) if f.endswith(".js")]
    
    processed_data = []
    for js_file in tqdm(js_files):
        try:
            with open(js_file, "r", encoding="utf-8") as f:
                code = f.read()
            gadgets = extract_code_gadgets(code)
            processed_data.append({
                "file": js_file,
                "gadgets": gadgets
            })
        except Exception as e:
            print(f"Error processing {js_file}: {str(e)}")
    
    with open(OUTPUT_FILE, "w") as f:
        json.dump(processed_data, f, indent=2)

# Run the dataset processing
process_dataset()

with open(OUTPUT_FILE, "r") as f:
    results = json.load(f)

print(json.dumps(results, indent=2))

  0%|          | 0/3 [00:00<?, ?it/s]

100%|██████████| 3/3 [00:00<00:00, 83.05it/s]

[
  {
    "file": "../SourceCode/Test\\vuln.js",
    "gadgets": []
  },
  {
    "file": "../SourceCode/Test\\vuln2.js",
    "gadgets": [
      {
        "type": "user_input",
        "source": "fileName = req.query.file",
        "context": [
          "",
          "app.get('/file', (req, res) => {",
          "    const fileName = req.query.file;",
          "    const filePath = path.join(__dirname, fileName);"
        ],
        "line_numbers": [
          9,
          9
        ]
      }
    ]
  },
  {
    "file": "../SourceCode/Test\\vuln4.js",
    "gadgets": [
      {
        "type": "user_input",
        "source": "locale = req.query.locale || 'en'",
        "context": [
          "",
          "app.get('/time', (req, res) => {",
          "    const locale = req.query.locale || 'en'; ",
          ""
        ],
        "line_numbers": [
          6,
          6
        ]
      }
    ]
  }
]





In [8]:
import os
import json
import re
from tqdm import tqdm

# Configuration
INPUT_DIR = "../SourceCode/Test"  # Folder containing your JS files
OUTPUT_FILE = "processed_gadgets.json"  # Output file for results

def get_line_number(code, index):
    """Get the line number of a given index in a string."""
    return code.count("\n", 0, index) + 1

def extract_code_gadgets(code):
    """Extract security-relevant code gadgets from JavaScript code."""
    gadgets = []

    # === Path Traversal ===
    path_traversal_pattern = r"path\.(?:join|resolve)\s*\(([^)]*)\)"
    user_input_pattern = r"(req\.query\.\w+|req\.params\.\w+|req\.body\.\w+)"
    
    matches = re.finditer(path_traversal_pattern, code)
    for match in matches:
        line_number = get_line_number(code, match.start())
        source_code = match.group(1)

        if re.search(user_input_pattern, source_code):
            gadget_type = "path_traversal_with_user_input"
        else:
            gadget_type = "path_traversal"

        gadgets.append({
            "type": gadget_type,
            "source": match.group(),
            "line_number": line_number
        })

    # === CVE-2022-24785: moment().locale() ===
    # 1️⃣ Detect `locale = req.query.locale`
    locale_assignment_pattern = r"(\w+)\s*=\s*(req\.query\.\w+)"
    locale_vars = {}  # Stores local variable mapping

    for match in re.finditer(locale_assignment_pattern, code):
        var_name, user_input = match.groups()
        locale_vars[var_name] = user_input

    # 2️⃣ Detect `moment().locale(locale)`
    moment_pattern = r"moment\(\)\.locale\s*\(\s*([a-zA-Z0-9_]+)\s*\)"
    
    for match in re.finditer(moment_pattern, code):
        line_number = get_line_number(code, match.start())
        var_name = match.group(1)

        if var_name in locale_vars:
            gadgets.append({
                "type": "insecure_locale_loading (CVE-2022-24785)",
                "source": match.group(),
                "line_number": line_number
            })

    return gadgets

def process_dataset():
    """Process all JavaScript files and detect vulnerabilities."""
    js_files = [os.path.join(INPUT_DIR, f) for f in os.listdir(INPUT_DIR) if f.endswith(".js")]
    
    processed_data = []
    for js_file in tqdm(js_files):
        try:
            with open(js_file, "r", encoding="utf-8") as f:
                code = f.read()
            gadgets = extract_code_gadgets(code)
            processed_data.append({
                "file": js_file,
                "gadgets": gadgets
            })
        except Exception as e:
            print(f"Error processing {js_file}: {str(e)}")
    
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        json.dump(processed_data, f, indent=2)

# Run the dataset processing
process_dataset()

# Load and display results
with open(OUTPUT_FILE, "r", encoding="utf-8") as f:
    results = json.load(f)

print(json.dumps(results, indent=2))


100%|██████████| 3/3 [00:00<00:00, 3004.52it/s]

[
  {
    "file": "../SourceCode/Test\\vuln.js",
    "gadgets": [
      {
        "type": "path_traversal",
        "source": "path.join(__dirname, '../..', 'config.txt')",
        "line_number": 3
      }
    ]
  },
  {
    "file": "../SourceCode/Test\\vuln2.js",
    "gadgets": [
      {
        "type": "path_traversal",
        "source": "path.join(__dirname, fileName)",
        "line_number": 10
      }
    ]
  },
  {
    "file": "../SourceCode/Test\\vuln4.js",
    "gadgets": [
      {
        "type": "insecure_locale_loading (CVE-2022-24785)",
        "source": "moment().locale(locale)",
        "line_number": 10
      }
    ]
  }
]





In [9]:
import csv
import json
import re

INPUT_CSV = "../Datasets/output_cleaned.csv"  # CSV file containing JavaScript code snippets
OUTPUT_JSON = "processed_gadgets_2.json"  # Output JSON file

def get_line_number(code, index):
    """Get the line number of a given index in a string."""
    return code.count("\n", 0, index) + 1

def extract_code_gadgets(code):
    """Extract security-relevant code gadgets from JavaScript code."""
    gadgets = []
    
    # Path Traversal Detection
    path_traversal_pattern = r"path\.(?:join|resolve)\s*\(([^)]*)\)"
    user_input_pattern = r"(req\.query\.\w+|req\.params\.\w+|req\.body\.\w+)"
    
    matches = re.finditer(path_traversal_pattern, code)
    for match in matches:
        line_number = get_line_number(code, match.start())
        source_code = match.group(1)

        if re.search(user_input_pattern, source_code):
            gadget_type = "path_traversal_with_user_input"
        else:
            gadget_type = "path_traversal"

        gadgets.append({
            "type": gadget_type,
            "source": match.group(),
            "line_number": line_number
        })
    
    return gadgets

def process_csv():
    """Process the CSV file and detect vulnerabilities in JavaScript code snippets."""
    processed_data = []
    
    with open(INPUT_CSV, "r", encoding="utf-8") as csvfile:
        reader = csv.reader(csvfile)
        next(reader)  # Skip header
        
        for row in reader:
            code, label = row[0], int(row[1])
            gadgets = extract_code_gadgets(code)
            processed_data.append({
                "code": code,
                "label": label,
                "gadgets": gadgets
            })
    
    with open(OUTPUT_JSON, "w", encoding="utf-8") as jsonfile:
        json.dump(processed_data, jsonfile, indent=2)
    
    print(json.dumps(processed_data, indent=2))

# Run the script
process_csv()


[
  {
    "code": "const fs = require('fs');\nconst path = require('path');\nconst file = fs.readFileSync(path.join(__dirname, 'file.txt'), 'utf8');\nconsole.log(file);",
    "label": 1,
    "gadgets": [
      {
        "type": "path_traversal",
        "source": "path.join(__dirname, 'file.txt')",
        "line_number": 3
      }
    ]
  },
  {
    "code": "const fs = require('fs'); const path = require('path'); fs.readFile(path.join(__dirname, '../..', 'config.txt'), (err, data) => { if (err) { console.error(err); } else { console.log(data.toString()); } });",
    "label": 0,
    "gadgets": [
      {
        "type": "path_traversal",
        "source": "path.join(__dirname, '../..', 'config.txt')",
        "line_number": 1
      }
    ]
  },
  {
    "code": "var fs = require('fs');\nvar filePath = fs.readFileSync('path/to/file.txt', 'utf8');\nvar newFilePath = filePath + '/etc/passwd';\nfs.writeFileSync(newFilePath, 'Traversed File Path');",
    "label": 1,
    "gadgets": []
  },
  {


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from transformers import RobertaTokenizer, RobertaModel

# Load dataset
import pandas as pd
df = pd.read_csv("processed_gadgets_2.json")

# Tokenize using CodeBERT
tokenizer = RobertaTokenizer.from_pretrained("microsoft/codebert-base")
tokens = [tokenizer.encode(code, padding="max_length", truncation=True, max_length=512) for code in df["code"]]

# Convert to tensor
X = torch.tensor(tokens)
y = torch.tensor(df["label"].values)

# Split dataset
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Define BLSTM model
class VulnerabilityDetector(nn.Module):
    def __init__(self, embedding_dim=768, hidden_dim=256, output_dim=1):
        super(VulnerabilityDetector, self).__init__()
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        _, (hidden, _) = self.lstm(x)
        hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        return self.sigmoid(self.fc(hidden))

# Instantiate and train model
model = VulnerabilityDetector()
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10):
    optimizer.zero_grad()
    outputs = model(X_train.float())
    loss = criterion(outputs.squeeze(), y_train.float())
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch+1}, Loss: {loss.item()}")

# Evaluate on test set
with torch.no_grad():
    predictions = model(X_test.float()).squeeze().round()
    accuracy = (predictions == y_test.float()).float().mean()
    print(f"Test Accuracy: {accuracy.item()}")


  from .autonotebook import tqdm as notebook_tqdm
