Train V2

In [None]:
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.neural_network import MLPClassifier  # Import MLPClassifier
from sklearn.metrics import classification_report, accuracy_score
from imblearn.over_sampling import SMOTE
import joblib

# Function to load and label datasets
def load_data(normal_path, xss_path):
    """Read both URL lists and return them as one labelled DataFrame.

    Each file is read as UTF-8 text (undecodable bytes are ignored); every
    line is stripped and blank lines are skipped.

    Args:
        normal_path: Text file with one benign URL per line (label 0).
        xss_path: Text file with one XSS URL per line (label 1).

    Returns:
        pandas.DataFrame with columns 'url' and 'label'; benign rows come
        first, followed by the XSS rows.
    """
    def _read_nonblank(path):
        # Collect the stripped, non-empty lines of one file.
        kept = []
        with open(path, 'r', encoding='utf-8', errors='ignore') as handle:
            for raw in handle:
                text = raw.strip()
                if text:
                    kept.append(text)
        return kept

    benign = _read_nonblank(normal_path)
    malicious = _read_nonblank(xss_path)

    return pd.DataFrame({
        'url': benign + malicious,
        'label': [0] * len(benign) + [1] * len(malicious),
    })

# Load the data
data = load_data(r'PATH/TO/Train_NonXSS.txt', r'PATH/TO/Train_XSS.txt')

# Sample a small portion of the data for quicker testing (optional)
# Uncomment the following line to sample 10% of the data
# data = data.sample(frac=0.1, random_state=42)

# Preprocess the data using TF-IDF
# NOTE(review): the vectorizer is fitted on the whole dataset before the
# split, so the held-out URLs also shape the vocabulary and IDF weights.
vectorizer = TfidfVectorizer(max_features=5000)
X_tfidf = vectorizer.fit_transform(data['url']).toarray()

# Only TF-IDF features are used (no custom URL-based features)
X_custom = X_tfidf
y = data['label']

# Split the data into training and testing sets.
# stratify=y keeps the normal/XSS ratio identical in both parts.
X_train, X_test, y_train, y_test = train_test_split(
    X_custom, y, test_size=0.3, random_state=42, stratify=y
)

# Oversample the minority class with SMOTE *inside* the cross-validated
# pipeline. Resampling before GridSearchCV would put synthetic points derived
# from the training folds into the validation folds and inflate the CV scores.
# The imblearn Pipeline applies SMOTE during fit only, never when predicting.
from imblearn.pipeline import Pipeline  # local import: only this cell needs it

smote = SMOTE(random_state=42)
mlp_clf = MLPClassifier(random_state=42, early_stopping=True, verbose=True)  # Early stopping added
resample_then_fit = Pipeline([('smote', smote), ('mlp', mlp_clf)])

# Hyperparameter grid; the 'mlp__' prefix routes each entry to the MLP step
param_grid = {
    'mlp__hidden_layer_sizes': [(100,), (128,)],  # Smaller layers for faster training
    'mlp__alpha': [0.001],  # Keep alpha fixed to reduce complexity
    'mlp__max_iter': [300]   # Maximum iterations
}

# Perform Grid Search to tune hyperparameters with fewer folds
grid_search = GridSearchCV(resample_then_fit, param_grid, cv=2, scoring='accuracy', verbose=10)
grid_search.fit(X_train, y_train)

# Best parameters found from the grid search (step prefix stripped for display)
best_params = {name.split('__', 1)[1]: value for name, value in grid_search.best_params_.items()}
print("Best parameters:", best_params)

# GridSearchCV (refit=True by default) has already refitted the best pipeline
# on the full training split, SMOTE included, so no extra fit() call is needed.
# Only the fitted MLP step is kept: the saved artifact stays a plain classifier.
best_mlp_clf = grid_search.best_estimator_.named_steps['mlp']

# Make predictions on the test data with a custom decision threshold
y_pred_proba = best_mlp_clf.predict_proba(X_test)[:, 1]
threshold = 0.95  # A URL is flagged as XSS only when P(XSS) >= threshold
y_pred = (y_pred_proba >= threshold).astype(int)

# Evaluate the model on the untouched (non-resampled) test split
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy * 100:.2f}%")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

# Save the model and vectorizer for later use
joblib.dump(best_mlp_clf, 'mlpc_xss_model_without_custom_features.pkl')
joblib.dump(vectorizer, 'tfidf_vectorizer_without_custom_features.pkl')

Fitting 3 folds for each of 24 candidates, totalling 72 fits


Train V2 with K-Fold

In [None]:
import os
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import classification_report, accuracy_score
from imblearn.over_sampling import SMOTE
import joblib

# Function to load and label datasets
def load_data(normal_path, xss_path):
    """Loads URLs from text files and assigns labels (0 for normal, 1 for XSS).

    Each file is read as UTF-8 text (undecodable bytes are ignored); every
    line is stripped and blank lines are skipped.

    Args:
        normal_path: Text file with one normal URL per line.
        xss_path: Text file with one XSS URL per line.

    Returns:
        pandas.DataFrame with columns 'url' and 'label'; normal rows come
        first, followed by the XSS rows.

    Raises:
        FileNotFoundError: If either path is not an existing regular file.
            The message lists every offending path.
    """
    # isfile (not exists) so that a directory is reported here instead of
    # failing later inside open() with a different exception type.
    missing = [path for path in (normal_path, xss_path) if not os.path.isfile(path)]
    if missing:
        raise FileNotFoundError(
            "One or both dataset files not found. Please check the file paths. "
            f"Missing: {', '.join(str(path) for path in missing)}"
        )

    def _read_urls(path):
        # Stripped, non-empty lines of one file.
        with open(path, 'r', encoding='utf-8', errors='ignore') as file:
            return [line.strip() for line in file if line.strip()]

    normal_urls = _read_urls(normal_path)
    xss_urls = _read_urls(xss_path)

    # Labels: 0 = normal, 1 = XSS
    urls = normal_urls + xss_urls
    labels = [0] * len(normal_urls) + [1] * len(xss_urls)

    return pd.DataFrame({'url': urls, 'label': labels})


# Load dataset
normal_file_path = r'PATH/TO/Train_NonXSS.txt'
xss_file_path = r'PATH/TO/Train_XSS.txt'

# load_data raises FileNotFoundError when a file is missing. Let it propagate:
# the cell then stops with a traceback, whereas calling exit() inside a
# notebook shuts the kernel down and loses all state.
data = load_data(normal_file_path, xss_file_path)
print(f"Dataset loaded successfully. Total samples: {len(data)}")

# Uncomment the following line to sample 10% of the data for faster testing
# data = data.sample(frac=0.1, random_state=42)

# Convert labels to integer type
data['label'] = data['label'].astype(int)

# The recorded run of this cell ended with
#   AttributeError: module 'scipy.sparse' has no attribute 'linalg'
# Importing the submodule explicitly binds that attribute in this process.
# NOTE(review): confirm this clears the error in your environment; worker
# processes started by n_jobs=-1 perform their own imports.
import scipy.sparse.linalg  # noqa: F401

# TF-IDF Vectorization (converts URLs into numerical features, kept sparse)
# NOTE(review): the vectorizer is fitted on the whole dataset before the
# split, so the held-out URLs also shape the vocabulary and IDF weights.
vectorizer = TfidfVectorizer(max_features=5000)
X_tfidf = vectorizer.fit_transform(data['url'])

# Define features and target variable
X = X_tfidf
y = data['label']

# Split data into training and testing sets (class ratio preserved)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

# Oversample the minority class with SMOTE *inside* the cross-validated
# pipeline. Resampling before GridSearchCV would put synthetic points derived
# from the training folds into the validation folds and inflate the CV scores.
# The imblearn Pipeline applies SMOTE during fit only, never when predicting.
from imblearn.pipeline import Pipeline  # local import: only this cell needs it

smote = SMOTE(random_state=42)

# Initialize MLPClassifier with early stopping enabled
mlp_clf = MLPClassifier(random_state=42, early_stopping=True, verbose=True)
resample_then_fit = Pipeline([('smote', smote), ('mlp', mlp_clf)])

# Define hyperparameter grid for tuning; 'mlp__' routes entries to the MLP step
param_grid = {
    'mlp__hidden_layer_sizes': [(100,), (128,)],  # Smaller layers for efficiency
    'mlp__alpha': [0.001],  # Regularization strength
    'mlp__max_iter': [300]  # Maximum number of iterations
}

# Use Stratified K-Fold Cross-Validation (ensures balanced splits)
k_folds = 5
stratified_kfold = StratifiedKFold(n_splits=k_folds, shuffle=True, random_state=42)

# Perform Grid Search with K-Fold Cross-Validation
grid_search = GridSearchCV(resample_then_fit, param_grid, cv=stratified_kfold, scoring='accuracy', verbose=10, n_jobs=-1)
grid_search.fit(X_train, y_train)

# Retrieve best hyperparameters (step prefix stripped for display)
best_params = {name.split('__', 1)[1]: value for name, value in grid_search.best_params_.items()}
print("\nBest parameters found:", best_params)

# GridSearchCV (refit=True by default) has already refitted the best pipeline
# on the full training split, SMOTE included, so no extra fit() call is needed.
# Only the fitted MLP step is kept: the saved artifact stays a plain classifier.
best_mlp_clf = grid_search.best_estimator_.named_steps['mlp']

# Make predictions on test data
y_pred_proba = best_mlp_clf.predict_proba(X_test)[:, 1]

# Define custom threshold for classification: flag as XSS only when P(XSS) >= threshold
threshold = 0.95
y_pred = (y_pred_proba >= threshold).astype(int)

# Evaluate model performance on the untouched (non-resampled) test split
accuracy = accuracy_score(y_test, y_pred)
print(f"\nModel Accuracy: {accuracy * 100:.2f}%")
print("\nClassification Report:")
print(classification_report(y_test, y_pred))

# Save the trained model and TF-IDF vectorizer for later use
joblib.dump(best_mlp_clf, 'mlpc_xss_model_KFold.pkl')
joblib.dump(vectorizer, 'tfidf_vectorizer_KFold.pkl')
print("\nModel and vectorizer saved successfully.")


AttributeError: module 'scipy.sparse' has no attribute 'linalg'

Test without custom features

In [None]:
import numpy as np
import joblib
from sklearn.metrics import accuracy_score

# Function to process data using TF-IDF only (without additional features)
def process_data(urls, vectorizer):
    """Vectorize URLs with the fitted TF-IDF vectorizer only (no extra features).

    Args:
        urls: Iterable of URL strings.
        vectorizer: Fitted object exposing ``transform``; its result must
            expose ``toarray``.

    Returns:
        The dense array returned by ``toarray()`` on the transformed URLs.
    """
    tfidf_matrix = vectorizer.transform(urls)
    return tfidf_matrix.toarray()

# Function to test the XSS detection model
def test_model(test_data_path, vectorizer_path='tfidf_vectorizer_without_custom_features.pkl', model_path='mlpc_xss_model_without_custom_features.pkl', threshold=0.4):
    """Score every URL in a text file with the saved model and report the XSS count.

    Args:
        test_data_path: Text file with one URL per line; blank lines are skipped.
        vectorizer_path: joblib file holding the fitted TF-IDF vectorizer.
        model_path: joblib file holding the trained classifier.
        threshold: A URL is labelled 1 when its class-1 probability is
            greater than or equal to this value.

    Returns:
        Array of 0/1 predictions, or None when the file contains no URLs or
        any exception occurs (the error is printed, not raised).
    """
    print("\nLoading model and vectorizer...\n")

    try:
        # joblib.load unpickles its input: only point these at trusted files.
        model = joblib.load(model_path)
        vectorizer = joblib.load(vectorizer_path)

        # Gather the stripped, non-empty lines of the test file.
        urls = []
        with open(test_data_path, 'r', encoding='utf-8', errors='ignore') as file:
            for raw_line in file:
                candidate = raw_line.strip()
                if candidate:
                    urls.append(candidate)

        # Nothing to score: report and bail out.
        if len(urls) == 0:
            print("Error: No URLs found in the test file!")
            return None

        # TF-IDF features -> class-1 probability -> thresholded label.
        features = process_data(urls, vectorizer)
        positive_proba = model.predict_proba(features)[:, 1]
        y_pred = (positive_proba >= threshold).astype(int)

        # Summary statistics.
        total_xss_detected = np.sum(y_pred)
        total_payloads = len(y_pred)
        if total_payloads > 0:
            percentage_detected = (total_xss_detected / total_payloads) * 100
        else:
            percentage_detected = 0

        print(f"Total XSS payloads detected: {total_xss_detected} / {total_payloads} ({percentage_detected:.2f}%)\n")

        return y_pred

    except Exception as e:
        # Best-effort run: surface the problem as text and signal failure with None.
        print(f"Error during execution: {e}")
        return None

# Main Execution
if __name__ == "__main__":
    # Path to the test file (all URLs in this file should be XSS attempts).
    # Forward slashes only: they work on Windows and POSIX alike, whereas a
    # backslash is a directory separator on Windows only.
    test_data_path = r'PATH/TO/Test_Dataset/XSS_20000_Line.txt'

    # Run the test function and count detected XSS payloads
    y_pred = test_model(
        test_data_path=test_data_path,
        vectorizer_path='tfidf_vectorizer_without_custom_features.pkl',
        model_path='mlpc_xss_model_without_custom_features.pkl',
        threshold=0.99  # Adjust based on model performance
    )



Loading model and vectorizer...

Total XSS payloads detected: 19830 / 20000 (99.15%)



Test with prefetch and batch processing

In [None]:
import numpy as np
import joblib
from sklearn.metrics import accuracy_score
from typing import List, Optional, Tuple
import numpy.typing as npt
from tqdm import tqdm

def prefetch_data(file_path: str, batch_size: int = 1000) -> List[str]:
    """
    Prefetch URLs from file into memory in batches.

    The file is read as UTF-8 text (undecodable bytes are ignored). Every line
    is stripped and blank lines are skipped. All lines are returned, including
    a final batch that is shorter than ``batch_size``.

    Args:
        file_path: Path to the file containing URLs
        batch_size: Size of each batch to read (must be >= 1)

    Returns:
        List of URLs

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    from itertools import islice  # function-scope import keeps the block self-contained

    if batch_size < 1:
        # A non-positive size can never make progress through the file.
        raise ValueError("batch_size must be a positive integer")

    urls: List[str] = []
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
        while True:
            # islice stops quietly at end of file, so a short last batch is
            # still returned instead of being lost to a StopIteration raised
            # halfway through building it.
            batch = [line.strip() for line in islice(file, batch_size)]
            if not batch:
                break
            urls.extend(url for url in batch if url)
    return urls

def process_batch(urls: List[str], vectorizer) -> npt.NDArray:
    """
    Vectorize one batch of URLs with the fitted TF-IDF vectorizer.

    Args:
        urls: URLs belonging to the current batch
        vectorizer: Fitted object exposing ``transform``; its result must
            expose ``toarray``

    Returns:
        Dense feature array for the batch
    """
    tfidf_matrix = vectorizer.transform(urls)
    dense_features = tfidf_matrix.toarray()
    return dense_features

def predict_batch(model, X_batch: npt.NDArray, threshold: float) -> Tuple[npt.NDArray, npt.NDArray]:
    """
    Score one batch and turn the scores into 0/1 labels.

    Args:
        model: Object exposing ``predict_proba``
        X_batch: Feature vectors for the batch
        threshold: A row is labelled 1 when its column-1 probability is
            greater than or equal to this value

    Returns:
        Tuple of (predictions, probabilities), where probabilities is
        column 1 of the ``predict_proba`` output
    """
    class_scores = model.predict_proba(X_batch)
    positive_scores = class_scores[:, 1]
    labels = (positive_scores >= threshold).astype(int)
    return labels, positive_scores

def test_model(
    test_data_path: str,
    vectorizer_path: str = 'tfidf_vectorizer_without_custom_features.pkl',
    model_path: str = 'mlpc_xss_model_without_custom_features.pkl',
    threshold: float = 0.4,
    batch_size: int = 1000
) -> Optional[Tuple[npt.NDArray, float]]:
    """
    Test the XSS detection model with batch processing.

    Args:
        test_data_path: Path to test data file
        vectorizer_path: Path to saved vectorizer
        model_path: Path to saved model
        threshold: Classification threshold
        batch_size: Size of batches for processing

    Returns:
        Tuple of (predictions, detection_percentage), or None when the file
        yields no URLs or any exception occurs (the error is printed, not
        raised)
    """
    print("\nLoading model and vectorizer...\n")

    try:
        # Load model and vectorizer.
        # joblib.load unpickles its input: only point these at trusted files.
        model = joblib.load(model_path)
        vectorizer = joblib.load(vectorizer_path)

        # Prefetch all URLs
        print("Prefetching URLs...")
        urls = prefetch_data(test_data_path, batch_size)

        if not urls:
            print("Error: No URLs found in the test file!")
            return None

        # One prediction array per batch; joined once after the loop instead
        # of growing a Python list element by element.
        batch_predictions = []

        # Process data in batches with progress bar
        print("\nProcessing batches...")

        for i in tqdm(range(0, len(urls), batch_size)):
            batch_urls = urls[i:i + batch_size]
            X_batch = process_batch(batch_urls, vectorizer)

            # Only the thresholded labels are needed for the summary below.
            predictions, _ = predict_batch(model, X_batch, threshold)
            batch_predictions.append(predictions)

        all_predictions = np.concatenate(batch_predictions)

        # Compute statistics. urls is non-empty here, so total_payloads >= 1.
        total_xss_detected = int(np.sum(all_predictions))
        total_payloads = len(all_predictions)
        percentage_detected = (total_xss_detected / total_payloads) * 100

        # Print results
        print(f"\nTotal XSS payloads detected: {total_xss_detected} / {total_payloads} ({percentage_detected:.2f}%)\n")

        return all_predictions, percentage_detected

    except Exception as e:
        # Best-effort run: surface the problem as text and signal failure with None.
        print(f"Error during execution: {e}")
        return None

if __name__ == "__main__":
    # Path to the test file.
    # Forward slashes only: they work on Windows and POSIX alike, whereas a
    # backslash is a directory separator on Windows only.
    # NOTE(review): this file is named Train_XSS.txt, like the file the
    # training cells read. If it is the same data, the detection rate printed
    # here is measured on training samples — confirm that is intended.
    test_data_path = r'PATH/TO/XSS_Final/Train_XSS.txt'

    # Run the test function with batch processing
    results = test_model(
        test_data_path=test_data_path,
        vectorizer_path='tfidf_vectorizer_without_custom_features.pkl',
        model_path='mlpc_xss_model_without_custom_features.pkl',
        threshold=0.99,  # Adjust based on model performance
        batch_size=1000  # Adjust based on available memory
    )


Loading model and vectorizer...

Prefetching URLs...

Processing batches...


100%|██████████| 42/42 [00:02<00:00, 17.93it/s]


Total XSS payloads detected: 41658 / 42000 (99.19%)




