Train

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
from sklearn.utils.class_weight import compute_class_weight
from sklearn.feature_extraction.text import TfidfVectorizer
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization, LeakyReLU
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
import tensorflow as tf

# Set up TensorFlow to use a specific GPU (e.g., GPU 1)
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        # Restrict TensorFlow to only use GPU 1
        tf.config.set_visible_devices(gpus[1], 'GPU')  # Using GPU 1 (index starts from 0)
        tf.config.experimental.set_memory_growth(gpus[1], True)
        print(f"Using GPU: {gpus[1].name}")
    except RuntimeError as e:
        print(e)

# --- Train Model Function ---
def train_model(train_data_path, model_save_path='xss_detection_model.h5', vectorizer_save_path='tfidf_vectorizer.npy'):
    # Load the dataset
    df_new = pd.read_csv(train_data_path)

    # --- Feature Extraction using TF-IDF ---
    urls = df_new['url']
    tfidf_vectorizer = TfidfVectorizer(max_features=2000, ngram_range=(1, 4), analyzer='char')  # Expanded features
    X_new_tfidf = tfidf_vectorizer.fit_transform(urls).toarray()

    # --- Additional Features ---
    df_new['url_length'] = df_new['url'].apply(len)
    df_new['special_char_count'] = df_new['url'].apply(lambda x: sum(1 for char in x if char in ['<', '>', '"', '&']))
    df_new['keyword_presence'] = df_new['url'].apply(lambda x: 1 if any(kw in x.lower() for kw in ['script', 'alert', 'img', 'onerror']) else 0)
    df_new['encoded_chars'] = df_new['url'].apply(lambda x: sum(1 for char in x if '%' in x))
    X_new_features = np.hstack([X_new_tfidf, df_new[['url_length', 'special_char_count', 'keyword_presence', 'encoded_chars']].values])

    # Define labels
    y_new = df_new['label'].values

    # Handling class imbalance
    class_weights = compute_class_weight(class_weight='balanced', classes=np.array([0, 1]), y=y_new)
    class_weight_dict = {0: class_weights[0], 1: class_weights[1]}

    # Split the dataset into training and validation sets
    X_train_new, X_val_new, y_train_new, y_val_new = train_test_split(X_new_features, y_new, test_size=0.3, random_state=42)

    # Build the neural network model with Dropout layers and enhancements
    model = Sequential()
    model.add(Dense(1024, input_dim=X_train_new.shape[1]))
    model.add(LeakyReLU(alpha=0.1))
    model.add(BatchNormalization())
    model.add(Dropout(0.5))

    model.add(Dense(512, kernel_regularizer='l2'))
    model.add(LeakyReLU(alpha=0.1))
    model.add(BatchNormalization())
    model.add(Dropout(0.5))

    model.add(Dense(256, kernel_regularizer='l2'))
    model.add(LeakyReLU(alpha=0.1))
    model.add(BatchNormalization())
    model.add(Dropout(0.5))

    model.add(Dense(1, activation='sigmoid'))

    # Compile the model with Adam optimizer and binary cross-entropy loss
    model.compile(optimizer=Adam(learning_rate=0.0001), loss='binary_crossentropy', metrics=['accuracy'])

    # Early stopping to prevent overfitting
    early_stopping = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

    # Train the model
    history = model.fit(X_train_new, y_train_new, epochs=100, batch_size=4096, class_weight=class_weight_dict,
                        validation_data=(X_val_new, y_val_new), callbacks=[early_stopping])

    # Save the model and vectorizer to avoid retraining
    model.save(model_save_path)
    np.save(vectorizer_save_path, tfidf_vectorizer)
    print("Training complete. Model and vectorizer have been saved.")

    # Evaluate on the validation set
    y_pred_nn = (model.predict(X_val_new) > 0.5).astype(int)
    accuracy_val = accuracy_score(y_val_new, y_pred_nn)
    classification_rep_val = classification_report(y_val_new, y_pred_nn)

    print(f'Accuracy on validation set: {accuracy_val}')
    print('Classification Report on validation set:')
    print(classification_rep_val)

# Example usage:
train_model(r'C:\Users\Omen\Desktop\XSS\Cleaned_XSS_Dataset.csv')


Test without featured

In [3]:
import numpy as np
import tensorflow as tf
from sklearn.feature_extraction.text import TfidfVectorizer

def test_model(test_data_path, vectorizer_path='tfidf_vectorizer.npy', model_path='xss_detection_model.h5'):
    # Load the saved model and vectorizer
    model = tf.keras.models.load_model(model_path)
    tfidf_vectorizer = np.load(vectorizer_path, allow_pickle=True).item()

    # Load and prepare the test dataset
    with open(test_data_path, 'r', encoding='utf-8') as file:
        xss_test_lines = file.readlines()

    # Apply cleaning
    xss_test_cleaned = [url.strip() for url in xss_test_lines]

    # Convert the test data to TF-IDF features and add the same new features
    X_test_real_tfidf = tfidf_vectorizer.transform(xss_test_cleaned).toarray()

    # Ensure the test dataset has the same additional features as the training dataset
    url_length = np.array([len(url) for url in xss_test_cleaned])
    special_char_count = np.array([sum(1 for char in url if char in ['<', '>', '"', '&']) for url in xss_test_cleaned])
    keyword_presence = np.array([1 if any(kw in url.lower() for kw in ['script', 'alert', 'img', 'onerror']) else 0 for url in xss_test_cleaned])
    encoded_chars = np.array([sum(1 for char in url if '%' in url) for url in xss_test_cleaned])

    # Stack the TF-IDF features with the additional features
    X_test_real_features = np.hstack([X_test_real_tfidf, url_length.reshape(-1, 1), special_char_count.reshape(-1, 1), keyword_presence.reshape(-1, 1), encoded_chars.reshape(-1, 1)])

    # Predict using the neural network
    y_pred_test_real = (model.predict(X_test_real_features) > 0.5).astype(int)

    # Count how many XSS payloads are detected
    xss_detected_count = np.sum(y_pred_test_real)
    xss_total_count = len(y_pred_test_real)
    xss_detected_percentage = (xss_detected_count / xss_total_count) * 100

    # Print the results
    print(f'XSS payloads detected: {xss_detected_count}/{xss_total_count} ({xss_detected_percentage:.2f}%)')

# Example usage:
#test_model(r'C:\Users\Omen\Desktop\XSS\all_merge_xss.txt', vectorizer_path='tfidf_vectorizer.npy', model_path='xss_detection_model.h5')
#test_model(r'C:\Users\Omen\Desktop\XSS\xxs_payloads_01.txt', vectorizer_path='tfidf_vectorizer.npy', model_path='xss_detection_model.h5')
#test_model(r'C:\Users\Omen\Desktop\XSS\xxs_payloads_02.txt', vectorizer_path='tfidf_vectorizer.npy', model_path='xss_detection_model.h5')
#test_model(r'C:\Users\Omen\Desktop\XSS\xxs_payloads_03.txt', vectorizer_path='tfidf_vectorizer.npy', model_path='xss_detection_model.h5')
#test_model(r'C:\Users\Omen\Desktop\XSS\xxs_payloads_04.txt', vectorizer_path='tfidf_vectorizer.npy', model_path='xss_detection_model.h5')
test_model(r'C:\Users\Omen\Desktop\XSS\Payload_Big.txt', vectorizer_path='tfidf_vectorizer.npy', model_path='xss_detection_model.h5')




[1m315/315[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 4ms/step
XSS payloads detected: 8265/10080 (81.99%)


Test with Prefetch

In [6]:
import numpy as np
import tensorflow as tf
from sklearn.feature_extraction.text import TfidfVectorizer

# Function to calculate features for a batch of URLs
def calculate_features(urls):
    url_length = np.array([len(url) for url in urls])
    special_char_count = np.array([sum(1 for char in url if char in ['<', '>', '"', '&']) for url in urls])
    keyword_presence = np.array([1 if any(kw in url.lower() for kw in ['script', 'alert', 'img', 'onerror']) else 0 for url in urls])
    encoded_chars = np.array([url.count('%') for url in urls])
    
    return url_length, special_char_count, keyword_presence, encoded_chars

# Function to process data in batches using TensorFlow's tf.data API
def process_batch(urls, vectorizer):
    # Convert the test data to TF-IDF features
    X_test_real_tfidf = vectorizer.transform(urls).toarray()

    # Calculate additional features
    url_length, special_char_count, keyword_presence, encoded_chars = calculate_features(urls)

    # Stack the TF-IDF features with the additional features
    X_test_real_features = np.hstack([X_test_real_tfidf, url_length.reshape(-1, 1), special_char_count.reshape(-1, 1), keyword_presence.reshape(-1, 1), encoded_chars.reshape(-1, 1)])

    return X_test_real_features

# --- Optimized Test Model Function ---
def test_model(test_data_path, vectorizer_path='tfidf_vectorizer.npy', model_path='xss_detection_model.h5', batch_size=4096, prefetch_buffer_size=2):
    # Load the saved model and vectorizer
    model = tf.keras.models.load_model(model_path)
    tfidf_vectorizer = np.load(vectorizer_path, allow_pickle=True).item()

    # Load and prepare the test dataset, handle encoding errors
    with open(test_data_path, 'r', encoding='utf-8', errors='ignore') as file:
        xss_test_lines = file.readlines()

    # Apply cleaning
    xss_test_cleaned = [url.strip() for url in xss_test_lines]

    # Create a TensorFlow dataset for batching and prefetching
    dataset = tf.data.Dataset.from_tensor_slices(xss_test_cleaned)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(prefetch_buffer_size)

    y_pred_test_real = []

    # Process the data and make predictions
    for batch_urls in dataset:
        # Decode batch URLs properly and process the batch
        batch_urls = [url.decode('utf-8') if isinstance(url, bytes) else url for url in batch_urls.numpy()]

        # Process the batch features
        X_test_real_features = process_batch(batch_urls, tfidf_vectorizer)

        # Predict for the current batch
        batch_preds = (model.predict(X_test_real_features, batch_size=batch_size) > 0.5).astype(int)
        y_pred_test_real.extend(batch_preds)

    y_pred_test_real = np.array(y_pred_test_real)

    # Count how many XSS payloads are detected
    xss_detected_count = np.sum(y_pred_test_real)
    xss_total_count = len(y_pred_test_real)
    xss_detected_percentage = (xss_detected_count / xss_total_count) * 100

    # Print the results
    print(f'XSS payloads detected: {xss_detected_count}/{xss_total_count} ({xss_detected_percentage:.2f}%)')

# Example usage without profiling
test_model(r'C:\Users\Omen\Desktop\XSS\Payload_Big.txt', vectorizer_path='tfidf_vectorizer.npy', model_path='xss_detection_model.h5')




[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 444ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 243ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 219ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 222ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 232ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 223ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 220ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 252ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 179ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 165ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 239ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 180ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 176ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m 

Test with prefetch with multiprocessing

In [4]:
import numpy as np
import tensorflow as tf
from concurrent.futures import ThreadPoolExecutor, as_completed
import os

# Enable TensorFlow XLA for optimization
tf.config.optimizer.set_jit(True)

# Function to calculate features for a batch of URLs
def calculate_features(urls):
    url_length = np.array([len(url) for url in urls])
    special_char_count = np.array([sum(1 for char in url if char in ['<', '>', '"', '&']) for url in urls])
    keyword_presence = np.array([1 if any(kw in url.lower() for kw in ['script', 'alert', 'img', 'onerror']) else 0 for url in urls])
    encoded_chars = np.array([url.count('%') for url in urls])
    return url_length, special_char_count, keyword_presence, encoded_chars

# Function to process data in batches using TensorFlow's tf.data API
def process_batch(urls, vectorizer):
    X_test_real_tfidf = vectorizer.transform(urls).toarray()
    url_length, special_char_count, keyword_presence, encoded_chars = calculate_features(urls)
    X_test_real_features = np.hstack([X_test_real_tfidf, url_length.reshape(-1, 1), special_char_count.reshape(-1, 1), keyword_presence.reshape(-1, 1), encoded_chars.reshape(-1, 1)])
    return X_test_real_features

# Worker function to process a batch of URLs
def worker_process(batch_urls, vectorizer, model, batch_size):
    try:
        X_test_real_features = process_batch(batch_urls, vectorizer)
        batch_preds = (model.predict(X_test_real_features, batch_size=batch_size) > 0.5).astype(int)
        return batch_preds
    except Exception as e:
        print(f"Error in worker process: {e}")
        return None

# Optimized Test Model Function using ThreadPoolExecutor and TensorFlow Dataset
def test_model_with_threads(test_data_path, vectorizer_path='tfidf_vectorizer.npy', model_path='xss_detection_model.h5', num_threads=16, batch_size=20480, chunk_size=10000):
    print("Loading model and vectorizer")
    model = tf.keras.models.load_model(model_path)
    vectorizer = np.load(vectorizer_path, allow_pickle=True).item()

    def process_file_in_chunks():
        with open(test_data_path, 'r', encoding='utf-8', errors='ignore') as file:
            chunk = []
            for line in file:
                chunk.append(line.strip())
                if len(chunk) == chunk_size:
                    yield chunk
                    chunk = []
            if chunk:
                yield chunk

    total_xss_detected = 0
    total_payloads = 0
    chunk_count = 0

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        for batch_chunk in process_file_in_chunks():
            chunk_count += 1
            futures.append(executor.submit(worker_process, batch_chunk, vectorizer, model, batch_size))

        for future in as_completed(futures):
            result = future.result()
            if result is not None:
                xss_detected_count = np.sum(result)
                xss_total_count = len(result)
                total_xss_detected += xss_detected_count
                total_payloads += xss_total_count

    final_percentage = (total_xss_detected / total_payloads) * 100 if total_payloads > 0 else 0
    print(f"\nFinal results: Total XSS payloads detected: {total_xss_detected}/{total_payloads} ({final_percentage:.2f}%)")

# Example usage with threads and XLA optimization
test_model_with_threads(r'C:\Users\Omen\Desktop\XSS\Payload_Big.txt', vectorizer_path='tfidf_vectorizer.npy', model_path='xss_detection_model.h5', num_threads=16, batch_size=20480, chunk_size=10000)




Loading model and vectorizer
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step   
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 961ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step   
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 681ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 774ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step   
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 721ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 744ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 862ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 857ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 631ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 789ms/step
[1m1/1[0m [32m━━━━━━━━