In [1]:
import pandas as pd
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from gensim.models import Word2Vec
import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from torch.optim.lr_scheduler import ReduceLROnPlateau

In [2]:
# 確認是否有 GPU 可以使用
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cpu


In [3]:
def preprocess_payload(payload):
    def replace_url(text):
        text = re.sub(r'http[s]?://[^\s"<>]+', 'http://u', text)
        text = re.sub(r'ftp://[^\s"<>]+', 'http://u', text)
        text = re.sub(r'mailto:[^\s"<>]+', 'http://u', text)
        text = re.sub(r'file://[^\s"<>]+', 'http://u', text)
        text = re.sub(r'tel:[^\s"<>]+', 'http://u', text)
        text = re.sub(r'data:[^\s"<>]+', 'http://u', text)
        text = re.sub(r'(href|src|action|formaction|background)=[\'"][^\s"<>]+[\'"]', r'\1="http://u"', text)
        text = re.sub(r'srcset="([^"]+)"', lambda match: 'srcset="r"', text)
    
        return text

    # 替換數字為單個 0
    def replace_numbers(text):
        return re.sub(r'\d+', '0', text)  # 使用 \d+ 確保每組數字只替換為一個 0

    # 保留 HTML 標籤並處理內容
    def process_tag(match):
        tag_content = match.group(0)
        # 替換標籤內的 URL
        tag_content = replace_url(tag_content)
        # 替換標籤內的數字
        tag_content = replace_numbers(tag_content)
        return tag_content

    # 使用正則表達式匹配 HTML 標籤並處理
    processed_payload = re.sub(r'<[^>]+>', process_tag, payload)
        
    # 處理標籤外的數字和 URL
    processed_payload = replace_url(processed_payload)
    processed_payload = replace_numbers(processed_payload)
        
    return processed_payload

def custom_tokenize(text):
    # 定義正則表達式規則
    pattern = r'''(?x)                         # 開啟 verbose 模式，讓正則表達式更易讀
        "[^"]+"                                # 匹配雙引號內的內容
        | '[^']+'                              # 匹配單引號內的內容
        | http://\w+                           # 匹配 http:// 開頭的 URL
        | <\w+>                                # 匹配開啟的 HTML 標籤 <tag>
        | </\w+>                               # 匹配關閉的 HTML 標籤 </tag>
        | \w+=                                 # 匹配像 name=value 這樣的結構
        | [\w\.]+                              # 匹配普通單詞（字母、數字、下劃線或點）
        | [\s]+                                # 匹配空白字符
        | [^\w\s<>]+                           # 匹配非字母數字空白和非標籤的其他字符
    '''
        
    # 使用正則表達式分詞
    tokens = re.findall(pattern, text)
        
    # 清除多餘的空格字符
    tokens = [token.strip() for token in tokens if token.strip()]
        
    return tokens

In [4]:
def tokens_to_vectors(tered_tokensfil, model, max_sequence_length=100):
    vectors = []
    
    for token in tered_tokensfil:
        if token in model.wv:
            vectors.append(torch.tensor(model.wv[token], dtype=torch.float32))  # 轉為 PyTorch Tensor
        else:
            vectors.append(torch.zeros(model.vector_size, dtype=torch.float32))  # 未知詞補零

    vectors = torch.stack(vectors) if vectors else torch.zeros((0, model.vector_size), dtype=torch.float32)  

    # 進行 padding 或截取，確保 shape = (max_sequence_length, embedding_dim)
    if vectors.shape[0] < max_sequence_length:
        pad_size = max_sequence_length - vectors.shape[0]
        padding_tensor = torch.zeros((pad_size, model.vector_size), dtype=torch.float32)
        vectors = torch.cat([vectors, padding_tensor], dim=0)
    else:
        vectors = vectors[:max_sequence_length]  # 超過 max_sequence_length 則截斷

    return vectors

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class XSSClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, max_seq_len):
        super(XSSClassifier, self).__init__()
        
        # CNN 層
        self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=6, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=input_dim, out_channels=4, kernel_size=3, padding=1)
        self.conv3 = nn.Conv1d(in_channels=input_dim, out_channels=2, kernel_size=3, padding=1)
        
        # BiLSTM 層
        self.lstm = nn.LSTM(input_size=12, hidden_size=64, batch_first=True, bidirectional=True)
        
        # Self-Attention 層
        self.attention = nn.Linear(128, 1)
        
        # MaxPooling & UpSampling
        self.maxpool = nn.MaxPool1d(kernel_size=2)
        self.upsample = nn.Upsample(scale_factor=2, mode='nearest')
        
        # Fully Connected Layers
        self.fc1 = nn.Linear(140 * max_seq_len, 128)
        self.fc2 = nn.Linear(128, output_dim)
        self.dropout = nn.Dropout(0.5)

    def attention_layer(self, lstm_out):
        attn_scores = torch.tanh(self.attention(lstm_out))  # (batch, seq_len, 1)
        attn_weights = torch.softmax(attn_scores, dim=1)  # 計算權重
        attn_out = lstm_out * attn_weights  # 加權輸出
        return attn_out
 
    def forward(self, x):
        # 調整維度 (batch, seq_len, embedding_dim) → (batch, embedding_dim, seq_len)
        x = x.permute(0, 2, 1)
        
        # CNN
        conv1_out = F.relu(self.conv1(x))
        conv2_out = F.relu(self.conv2(x))
        conv3_out = F.relu(self.conv3(x))
        
        # 串接 CNN 特徵
        conv_out = torch.cat([conv1_out, conv2_out, conv3_out], dim=1)  # (batch, 12, seq_len)
        
        # 調整維度以適應 LSTM (batch, seq_len, 12)
        conv_out = conv_out.permute(0, 2, 1)
        
        # BiLSTM
        lstm_out, _ = self.lstm(conv_out)
        
        # Self-Attention
        attn_out = self.attention_layer(lstm_out)
        
        # MaxPooling + UpSampling
        pooled_out = self.maxpool(attn_out.permute(0, 2, 1))  # 變成 (batch, channels, seq_len//2)
        upsampled_out = self.upsample(pooled_out)  # 放大回原本大小 (batch, channels, seq_len)
        
        # 特徵融合
        final_features = torch.cat([conv_out, upsampled_out.permute(0, 2, 1)], dim=-1)  # (batch, seq_len, 128)
        
        # 展平成全連接層輸入
        flattened = final_features.view(final_features.size(0), -1)
        
        # Dropout + 全連接層
        fc1_out = F.relu(self.fc1(flattened))
        fc1_out = self.dropout(fc1_out)
        output = self.fc2(fc1_out)

        return output


In [6]:
# 測試
model = Word2Vec.load("../res/word2vec.model")
xss_model = XSSClassifier(128, 64, 2, 100) 
xss_model.to(device)  # 移到 GPU 或 CPU
xss_model.load_state_dict(torch.load("../res/best_model.pth"))  # 載入最佳模型
xss_model.eval()

def is_xss(payload):
    with torch.no_grad():
        processed_payload = preprocess_payload(payload)
        tokens = custom_tokenize(processed_payload)
        xss_vector = tokens_to_vectors(tokens, model).unsqueeze(0).to(device)
        output = xss_model(xss_vector)
        _, predicted = torch.max(output, 1)
        return predicted.item()

# 載入資料集並逐行輸出預測結果與答案，順便計算準確率
# df = pd.read_csv('xss_dataset.csv', header=None)
# xss_payloads = df[0].tolist()
# labels = df[1].tolist()
# count = 0

# for payload, label in zip(xss_payloads, labels):
#     result = is_xss(payload)
#     if result == label:
#         count += 1
#     else:
#         print(f"預測：{result}，實際：{label}，字串：{payload}")


# accuracy = count / len(xss_payloads)
# print(f"準確率：{accuracy:.2%}")

RuntimeError: Attempting to deserialize object on a CUDA device but torch.cuda.is_available() is False. If you are running on a CPU-only machine, please use torch.load with map_location=torch.device('cpu') to map your storages to the CPU.

In [None]:
import subprocess
from oracle_tools import is_same_dom, do_xss_post_request
import time

endpoint = 'http://127.0.0.1:5555/vuln_backend/1.0/endpoint/'

f = open("../res/cve_llm_output.txt", "r", encoding="utf-8")
# lines = f.readlines()
lines = [
    "<svg/onload=setTimeout('ale'+'rt(1)',100)>",
    "<iframe src=\"javascript: setTimeout('ale'+'rt(1)', 100)\"></iframe>",
    "<details open ontoggle=\"setTimeout(unescape('%61%6c%65%72%74(1)'), 100)\">",
    "<input onfocus=(()=>{setTimeout(()=>{alert?.(1)},100)})() autofocus>",
    "<img src=x onerror=((x)=>{['al','ert'][0]+='';eval(x)})(`alert(1)`)>",
    "<select><option></option></select><img src=x onerror=[].filter.constructor('ale'+'rt(1)')()>",
    "<math><mtext><script>setInterval`alert\\u00281\\u0029`</script></mtext></math>",
    "<video><source onerror=Function(\"alert(1)\")()>",
    "<marquee onstart=eval`al\\u0065rt(1)`>",
    "<script>new Function`aler\\u0074(1)`()</script>"
]


for i, payload in enumerate(lines):
    # payload = line.split(". ")[1]

    # if payload == "exit" or payload == "": break
    # xss_result = is_xss(payload)
    # result_1 = do_xss_post_request(endpoint, 'abc')
    # result_2 = do_xss_post_request(endpoint, payload)

    # print(payload, xss_result, not is_same_dom(result_1, result_2))

    if is_xss(payload) == 1:
        print(f"[{i}] {"Triggered"} | {payload}")

    time.sleep(1)
