<a href="https://colab.research.google.com/github/8sheeta8/Attack_Network_AI/blob/main/XSS(8sheeta8)/XSS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [36]:
# ✅ 개선 포인트 반영: MCTS-T UCB 수식 + AlexNet depth 확장

import os
import random

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms

# ✅ Pick the compute device automatically: CUDA when available, otherwise CPU.
# `device` is a module-level global read by the training/evaluation helpers below.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("사용 장치:", device)

# ✅ 1. Dataset class that loads the uploaded .npy files
class XSSDataset(Dataset):
    """Dataset backed by a pair of ``.npy`` files (images and labels).

    Images are converted to ``float32`` and divided by 255.0; labels are
    converted to ``int64``. A 3-D image array gets a channel axis inserted
    at position 1, i.e. ``(N, H, W)`` becomes ``(N, 1, H, W)``.
    """

    def __init__(self, image_path, label_path):
        """Load both arrays eagerly from ``image_path`` and ``label_path``."""
        pixels = np.load(image_path).astype(np.float32) / 255.0
        if pixels.ndim == 3:
            # Add the single channel axis expected by Conv2d inputs.
            pixels = pixels[:, np.newaxis, :, :]
        self.images = pixels
        self.labels = np.load(label_path).astype(np.int64)

    def __len__(self):
        """Return the number of samples, taken from the label array."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(image_tensor, label_tensor)`` for sample ``idx``."""
        image = torch.tensor(self.images[idx])
        label = torch.tensor(self.labels[idx])
        return image, label

# ✅ 2. DataLoader factory
def create_dataloader_from_files(img_path, label_path, batch_size=32, shuffle=False):
    """Build a ``DataLoader`` over an ``XSSDataset`` read from two .npy files.

    Args:
        img_path: path of the image array file.
        label_path: path of the label array file.
        batch_size: number of samples per batch.
        shuffle: whether the loader reshuffles samples.

    Returns:
        A ``DataLoader`` wrapping the freshly loaded dataset.
    """
    loader_options = {"batch_size": batch_size, "shuffle": shuffle}
    return DataLoader(XSSDataset(img_path, label_path), **loader_options)


# ✅ 1. Improved AlexNet-style Discriminator (extended structure)
class DiscriminatorAlexNet(nn.Module):
    """Small convolutional classifier producing two logits per input.

    ``features`` holds three 3x3 convolutions (1 -> 64 -> 128 -> 256
    channels), the first two each followed by a 2x2 max-pool. ``classifier``
    flattens and maps ``256 * 4 * 4`` features to 512 and then to 2 outputs;
    that flattened size corresponds to single-channel 16x16 inputs after the
    two pooling steps.
    """

    def __init__(self):
        super().__init__()

        conv_stack = [
            nn.Conv2d(1, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
        ]
        self.features = nn.Sequential(*conv_stack)

        head = [
            nn.Flatten(),
            nn.Linear(256 * 4 * 4, 512),
            nn.ReLU(inplace=True),
            nn.Linear(512, 2),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return raw (un-normalized) class logits for batch ``x``."""
        return self.classifier(self.features(x))

# ✅ 4. Training function
def train_discriminator(model, train_loader, val_loader, epochs=10, lr=1e-3):
    """Train ``model`` with Adam + cross-entropy and report accuracy per epoch.

    The model is moved to the module-level ``device``. After every epoch the
    function prints the summed (not averaged) batch loss with the training
    accuracy, then the validation accuracy measured in eval mode.

    Args:
        model: network returning class logits.
        train_loader: iterable of ``(inputs, labels)`` training batches.
        val_loader: iterable of ``(inputs, labels)`` validation batches.
        epochs: number of passes over ``train_loader``.
        lr: Adam learning rate.

    Raises:
        ZeroDivisionError: if either loader yields no samples.
    """
    model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        total_loss = 0
        hits = seen = 0

        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            logits = model(inputs)
            batch_loss = criterion(logits, labels)
            batch_loss.backward()
            optimizer.step()

            total_loss += batch_loss.item()
            hits += (logits.argmax(dim=1) == labels).sum().item()
            seen += labels.size(0)

        acc = hits / seen
        print(f"Epoch {epoch+1}/{epochs} | Loss: {total_loss:.4f} | Train Acc: {acc:.4f}")

        # ---- validation pass (no gradients) ----
        model.eval()
        hits = seen = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                predicted = model(inputs).argmax(dim=1)
                hits += (predicted == labels).sum().item()
                seen += labels.size(0)
        val_acc = hits / seen
        print(f"           >> Val Acc: {val_acc:.4f}")

# ✅ 5. Test-set evaluation function
def evaluate_model(model, test_loader):
    """Run ``model`` over ``test_loader`` and print a classification report.

    Predictions are the arg-max over the logits; inputs are moved to the
    module-level ``device`` while labels stay on the CPU. The report labels
    class 0 as "Normal" and class 1 as "XSS".

    NOTE(review): ``classification_report`` is not imported anywhere in the
    visible code - presumably ``sklearn.metrics.classification_report`` from
    an earlier notebook cell; confirm.
    """
    model.eval()
    preds, targets = [], []
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            logits = model(batch_x.to(device))
            preds.extend(logits.argmax(dim=1).cpu().numpy())
            targets.extend(batch_y.numpy())

    print("\n✅ 최종 테스트 결과:")
    print(classification_report(targets, preds, target_names=["Normal", "XSS"]))

# ✅ 6. Point at the dataset files, then train and evaluate
# Only the training loader is shuffled; validation/test keep file order.
train_loader = create_dataloader_from_files("train_images.npy", "train_labels.npy", shuffle=True)
val_loader = create_dataloader_from_files("val_images.npy", "val_labels.npy")
test_loader = create_dataloader_from_files("test_images.npy", "test_labels.npy")

# `model` stays a module-level global; later cells pass it to the generator.
model = DiscriminatorAlexNet()
train_discriminator(model, train_loader, val_loader, epochs=15, lr=1e-3)
evaluate_model(model, test_loader)


사용 장치: cpu
Epoch 1/15 | Loss: 4.6255 | Train Acc: 0.9556
           >> Val Acc: 1.0000
Epoch 2/15 | Loss: 4.4067 | Train Acc: 0.9807
           >> Val Acc: 1.0000
Epoch 3/15 | Loss: 0.2487 | Train Acc: 0.9992
           >> Val Acc: 1.0000
Epoch 4/15 | Loss: 0.4239 | Train Acc: 0.9983
           >> Val Acc: 1.0000
Epoch 5/15 | Loss: 0.5897 | Train Acc: 0.9983
           >> Val Acc: 1.0000
Epoch 6/15 | Loss: 0.0050 | Train Acc: 1.0000
           >> Val Acc: 1.0000
Epoch 7/15 | Loss: 0.0002 | Train Acc: 1.0000
           >> Val Acc: 1.0000
Epoch 8/15 | Loss: 0.0000 | Train Acc: 1.0000
           >> Val Acc: 1.0000
Epoch 9/15 | Loss: 0.0000 | Train Acc: 1.0000
           >> Val Acc: 1.0000
Epoch 10/15 | Loss: 0.0000 | Train Acc: 1.0000
           >> Val Acc: 1.0000
Epoch 11/15 | Loss: 0.0000 | Train Acc: 1.0000
           >> Val Acc: 1.0000
Epoch 12/15 | Loss: 0.0000 | Train Acc: 1.0000
           >> Val Acc: 1.0000
Epoch 13/15 | Loss: 0.0000 | Train Acc: 1.0000
           >> Val Acc: 1.00

 1단계: 우회 기법 (Action) 정의

In [37]:
import random
import urllib.parse

def insert_comment(tag):
    """Return ``tag`` with every ``<`` replaced by ``<!-- -->``.

    Strings without ``<`` come back unchanged.
    NOTE(review): the ``<`` itself is consumed by the replacement, so
    ``<script>`` becomes ``<!-- -->script>`` - confirm this is intended.
    """
    return "<!-- -->".join(tag.split("<"))

def url_encode(tag):
    """Percent-encode ``tag`` with ``urllib.parse.quote`` (``/`` is kept as-is)."""
    encoded = urllib.parse.quote(tag, safe="/")
    return encoded

def split_tag(tag):
    """Replace each exact ``<script>`` with the nested form ``<scr<script>ipt>``.

    The match is case-sensitive; strings without ``<script>`` are returned
    unchanged.
    """
    pieces = tag.split("<script>")
    return "<scr<script>ipt>".join(pieces)

def case_shift(tag):
    """Randomly upper- or lower-case each character of ``tag``.

    One ``random.random()`` draw is made per character, in order: a draw
    below 0.5 upper-cases the character, otherwise it is lower-cased.
    """
    shifted = []
    for ch in tag:
        shifted.append(ch.upper() if random.random() < 0.5 else ch.lower())
    return "".join(shifted)

def space_injection(tag):
    """Insert a space after every ``<`` and before every ``>`` in ``tag``."""
    padding = str.maketrans({"<": "< ", ">": " >"})
    return tag.translate(padding)

# ✅ 3. Actions that can be applied to a string at a given position
# The list reuses the named helpers defined above instead of duplicating
# their bodies as anonymous lambdas. The transformations and their order are
# unchanged, but each action now has a meaningful ``__name__`` (every lambda
# reports "<lambda>"), so MCTSTNode.act records which action produced a node.
obfuscation_actions = [
    insert_comment,    # replace '<' with '<!-- -->'
    url_encode,        # percent-encode via urllib.parse.quote
    split_tag,         # '<script>' -> '<scr<script>ipt>'
    case_shift,        # random per-character upper/lower casing
    space_injection,   # '<' -> '< ' and '>' -> ' >'
]

def apply_action(text, pos, action_fn):
    """Apply ``action_fn`` to the suffix of ``text`` starting at ``pos``.

    The prefix ``text[:pos]`` is kept verbatim and the transformed suffix is
    appended to it. When ``pos`` is at or beyond the end of ``text`` the
    string is returned untouched and ``action_fn`` is not called.
    """
    if pos < len(text):
        head, tail = text[:pos], text[pos:]
        return head + action_fn(tail)
    return text

2단계: MCTS-T 노드 구조 정의

In [38]:
import numpy as np

# ✅ 2. Improved MCTS-T node structure and UCB-T formula
class MCTSTNode:
    """One node of the obfuscation search tree.

    Each node stores a candidate string together with the position and the
    action name that produced it, a link to its parent, its children, and
    the visit/reward statistics updated during back-propagation.
    """

    def __init__(self, xss_string, pos=None, act=None, parent=None):
        self.xss = xss_string   # candidate string held by this node
        self.pos = pos          # position the action was applied at (None for the root)
        self.act = act          # name of the action that produced this node
        self.parent = parent    # parent node, or None for the root
        self.children = []      # child nodes created by expand()
        self.visits = 0         # number of back-propagations through this node
        self.reward = 0.0       # sum of rewards back-propagated through this node

    def is_leaf(self):
        """Return True when the node has no children."""
        return len(self.children) == 0

    def expand(self, actions, max_pos):
        """Append one child per (position, action) pair.

        Positions run from 0 up to ``min(max_pos, len(self.xss))`` exclusive,
        so an empty string or ``max_pos <= 0`` adds no children.

        Args:
            actions: iterable of callables mapping a string to a string.
            max_pos: upper bound (exclusive) on the positions tried.
        """
        for pos in range(min(max_pos, len(self.xss))):
            for act in actions:
                new_str = apply_action(self.xss, pos, act)
                # Not every callable has __name__ (e.g. functools.partial);
                # fall back to its repr instead of raising AttributeError.
                act_name = getattr(act, "__name__", repr(act))
                child = MCTSTNode(new_str, pos, act_name, parent=self)
                self.children.append(child)

    def ucb_score(self, C1=1.0, C2=1.0):
        """Return mean reward plus ``C1 * sqrt(ln(parent_visits) / visits)``.

        Unvisited nodes score ``inf`` so they are always tried first. A node
        without a parent uses a parent visit count of 1. ``C2`` is accepted
        for interface compatibility but is not used by the current formula.
        """
        if self.visits == 0:
            return float('inf')
        parent_visits = self.parent.visits if self.parent else 1
        # Clamp to 1 so log() never sees 0, which would turn the score into
        # NaN and make max()-based selection unreliable.
        parent_visits = max(parent_visits, 1)
        return (self.reward / self.visits) + C1 * np.sqrt(np.log(parent_visits) / self.visits)


3단계: MCTS 탐색 알고리즘

In [39]:
def select_promising_node(node):
    """Descend from ``node`` to a leaf, always following the best UCB child.

    At every non-leaf node the child with the highest ``ucb_score()`` is
    chosen (the first one on ties); the reached leaf is returned.
    """
    current = node
    while True:
        if current.is_leaf():
            return current
        current = max(current.children, key=lambda child: child.ucb_score())

def backpropagate(node, reward):
    """Add one visit and ``reward`` to ``node`` and every ancestor up to the root.

    Passing ``None`` as ``node`` is a no-op.
    """
    ancestor = node
    while ancestor is not None:
        ancestor.visits += 1
        ancestor.reward += reward
        ancestor = ancestor.parent


4단계: 보상 평가 함수 (Discriminator 활용)

In [40]:
import torch

# ✅ 4. String -> image conversion

def string_to_image_matrix(text, max_len=256):
    """Encode ``text`` as a 16x16 ``uint8`` matrix of character codes.

    Characters are written row by row; code points of 256 and above are
    stored as 0, and unused cells stay 0.

    Args:
        text: string to encode.
        max_len: maximum number of leading characters to use. Values larger
            than the 256 available cells are capped, so an over-long string
            is truncated instead of indexing past the grid.

    Returns:
        ``numpy.ndarray`` of shape ``(16, 16)`` and dtype ``uint8``.
    """
    arr = np.zeros((16, 16), dtype=np.uint8)
    # Apply the caller's limit first, then cap at the grid capacity.
    chars = text[:max_len][:arr.size]
    codes = [ord(c) if ord(c) < 256 else 0 for c in chars]
    if codes:
        # Row-major fill: flat index i maps to cell (i // 16, i % 16).
        arr.flat[:len(codes)] = codes
    return arr

# ✅ 5. Reward evaluation (fooling the Discriminator)

def calculate_reward(discriminator_model, xss_string, device):
    """Return the discriminator's softmax probability of class index 0.

    The string is encoded with ``string_to_image_matrix``, scaled by 1/255,
    given batch and channel axes and scored without gradients. The model is
    switched to eval mode and moved to ``device`` on every call. Class 0 is
    the one reported as "Normal" by the evaluation code in this file, so a
    higher value means the sample looks less like an attack to the model.
    """
    encoded = string_to_image_matrix(xss_string)
    batch = torch.tensor(encoded, dtype=torch.float32)[None, None].to(device) / 255.0
    scorer = discriminator_model.eval().to(device)
    with torch.no_grad():
        logits = scorer(batch)
        class_probs = torch.softmax(logits, dim=1)
        prob = class_probs[0][0].item()
    return prob


5단계: 전체 Generator 실행

In [41]:
# ✅ 6. Improved Generator run

def generate_obfuscated_xss(xss, model, device, max_iter=20):
    """Search for an obfuscated variant of ``xss`` with a tree search.

    Each iteration walks from the root to a leaf by highest ``ucb_score()``.
    An unvisited leaf is scored with ``calculate_reward`` and the reward is
    back-propagated. A visited leaf is expanded (positions below 5, every
    action in ``obfuscation_actions``) and one random child is scored.

    Args:
        xss: original string placed at the root.
        model: discriminator passed to ``calculate_reward``.
        device: device passed to ``calculate_reward``.
        max_iter: number of search iterations.

    Returns:
        The string of the leaf with the highest average reward; with
        ``max_iter=0`` this is ``xss`` itself.
    """
    root = MCTSTNode(xss)

    for _ in range(max_iter):
        # Selection: follow the best-scoring child down to a leaf.
        current = root
        while not current.is_leaf():
            current = max(current.children, key=lambda n: n.ucb_score())

        # First visit: evaluate the leaf itself.
        if current.visits == 0:
            backpropagate(current, calculate_reward(model, current.xss, device))
            continue

        # Already visited: grow the tree and evaluate one random child.
        current.expand(obfuscation_actions, max_pos=5)
        if not current.children:
            continue
        picked = random.choice(current.children)
        backpropagate(picked, calculate_reward(model, picked.xss, device))

    # Pick the best obfuscation result among the leaves.
    # Leaves are gathered in left-to-right pre-order so that ties in the
    # final max() resolve to the same node as a recursive traversal.
    leaf_nodes = []
    pending = [root]
    while pending:
        candidate = pending.pop()
        if candidate.is_leaf():
            leaf_nodes.append(candidate)
        else:
            pending.extend(reversed(candidate.children))

    # The small epsilon keeps unvisited leaves at an average of 0.
    best = max(leaf_nodes, key=lambda n: n.reward / (n.visits + 1e-6))
    return best.xss

def backpropagate(node, reward):
    """Propagate ``reward`` from ``node`` up to the root.

    Every node on the path gets one more visit and ``reward`` added to its
    reward sum; ``None`` is a no-op.
    NOTE: this repeats the ``backpropagate`` defined in the earlier MCTS
    search cell with the same logic; this later definition rebinds the name.
    """
    cursor = node
    while cursor is not None:
        cursor.visits, cursor.reward = cursor.visits + 1, cursor.reward + reward
        cursor = cursor.parent



실행

In [51]:
def generate_all_obfuscated_xss(input_path, output_path, discriminator_model, max_iter=30, device="cpu"):
    """Obfuscate every non-blank line of ``input_path`` and save the results.

    Each stripped, non-empty line is passed to ``generate_obfuscated_xss``;
    the outputs are written to ``output_path`` one per line, in input order.
    Progress is printed for each sample.

    Args:
        input_path: UTF-8 text file with one original string per line.
        output_path: UTF-8 text file to (over)write with the results.
        discriminator_model: model forwarded to the generator.
        max_iter: search iterations per sample.
        device: device forwarded to the generator.
    """
    original_samples = []
    with open(input_path, "r", encoding="utf-8") as f:
        for line in f:
            stripped = line.strip()
            if stripped:
                original_samples.append(stripped)

    print(f"총 {len(original_samples)}개 XSS 문자열 우회 생성 시작...")

    obfuscated_list = []
    for i, xss in enumerate(original_samples):
        print(f"\n[{i+1}/{len(original_samples)}] 원본: {xss}")

        # ✅ device must be forwarded here; omitting it caused an error earlier.
        obfuscated_list.append(
            generate_obfuscated_xss(xss, discriminator_model, max_iter=max_iter, device=device)
        )

    # Save
    with open(output_path, "w", encoding="utf-8") as f:
        f.writelines(obf + "\n" for obf in obfuscated_list)

    print(f"\n✅ 모든 우회 문자열 저장 완료 → {output_path}")


In [52]:
# Obfuscate every sample in generator_input.txt with the trained discriminator
# and write the results to generator_output.txt.
generate_all_obfuscated_xss(
    input_path="generator_input.txt",
    output_path="generator_output.txt",
    discriminator_model=model,
    max_iter=30,
    device=device     # ✅ added: pass the selected device through
)


총 874개 XSS 문자열 우회 생성 시작...

[1/874] 원본: <script>console.log('BLV5DIBIS87PYKBE1QQHCJ22ET7LUGGUCPP9OTYYPSB697X2E7');console.log(document.cookie);</script>

[2/874] 원본: <script>console.log('4IX6WF7YU8XE4XV7F78KR868GYBVZ8SLVEBA2WMLHD1NKP7ZVK');console.log(document.cookie);</script>

[3/874] 원본: <script>console.log('9PRBGN0KXTIRDJWTY9DKY8IVZKIUDMTHH8PCBZ6PM37OLPFVG4');console.log(document.cookie);</script>

[4/874] 원본: <script>console.log('WPV1RRR4KHVU03DRW4HYNUYSNCSXSXHPHM6PYQZVFYOV8H4KNQ');console.log(document.cookie);</script>

[5/874] 원본: <script>console.log('2CBJ75LBD30SXIF6MLVV3UGZZKJNW3MZXSYLSUIHRM8JU2740F');console.log(document.cookie);</script>

[6/874] 원본: <script>console.log('05WXMDQYHK5Q3D6GYEW7V8IM3UYHIMFOEIAZVE5R65PZL9U5RU');console.log(document.cookie);</script>

[7/874] 원본: <script>console.log('L6OI4KXBDJAJE8IK4NYPSSD1XZTAFF7F3HBO78B92UC567POSO');console.log(document.cookie);</script>

[8/874] 원본: <script>console.log('QGK8G8RA33TIM7J51IUWN9DBLUSM0ETCHNJNHAAB17A8ZSS191');cons

평가

In [53]:
def evaluate_obfuscation_success(discriminator_model, obf_file_path):
    """Print how many obfuscated strings the discriminator classifies as class 0.

    Every non-blank line of ``obf_file_path`` is encoded with
    ``string_to_image_matrix``, scaled by 1/255 and classified on the
    module-level ``device``. A prediction of 0 (normal) counts as a
    successful evasion.

    Args:
        discriminator_model: model returning class logits.
        obf_file_path: UTF-8 text file with one obfuscated string per line.

    An empty file reports a 0.00% success rate instead of raising
    ``ZeroDivisionError``.
    """
    # 1. Load the obfuscated strings
    with open(obf_file_path, "r", encoding="utf-8") as f:
        samples = [line.strip() for line in f if line.strip()]

    total = len(samples)
    fooled = 0

    discriminator_model.eval()
    with torch.no_grad():
        for xss in samples:
            img = string_to_image_matrix(xss)
            tensor = torch.tensor(img, dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(device) / 255.0
            output = discriminator_model(tensor)
            pred = torch.argmax(output, dim=1).item()

            if pred == 0:  # classified as normal -> detection failed
                fooled += 1

    # Guard the ratio so an empty sample file does not crash the report.
    success_rate = fooled / total * 100 if total else 0.0

    print(f"\n✅ 우회 문자열 평가 결과:")
    print(f" - 전체 개수: {total}")
    print(f" - 탐지 실패 (정상으로 오판): {fooled}")
    print(f" - 우회 성공률: {success_rate:.2f}%")


In [54]:
# Measure how many generated strings the current discriminator misses.
evaluate_obfuscation_success(model, "generator_output.txt")



✅ 우회 문자열 평가 결과:
 - 전체 개수: 874
 - 탐지 실패 (정상으로 오판): 874
 - 우회 성공률: 100.00%


D-G GAN 혼합 루트

우회 문자열 → 이미지(.npy) 변환 코드

In [55]:
import numpy as np
import os

def convert_string_to_image_matrix(text, max_len=256):
    """Encode ``text`` as a 16x16 ``uint8`` matrix of character codes.

    Characters fill the grid row by row; code points of 256 and above are
    stored as 0 and unused cells stay 0.

    Args:
        text: string to encode.
        max_len: maximum number of leading characters to use. It is capped
            at the 256 available cells so an over-long string is truncated
            rather than raising ``IndexError``.

    Returns:
        ``numpy.ndarray`` of shape ``(16, 16)`` and dtype ``uint8``.
    """
    arr = np.zeros((16, 16), dtype=np.uint8)
    # Apply the caller's limit first, then cap at the grid capacity.
    chars = text[:max_len][:arr.size]
    codes = [ord(c) if ord(c) < 256 else 0 for c in chars]
    if codes:
        # Row-major fill: flat index i maps to cell (i // 16, i % 16).
        arr.flat[:len(codes)] = codes
    return arr

def convert_obfuscated_txt_to_npy(input_txt, out_dir):
    """Convert a text file of obfuscated strings into image/label .npy files.

    Every non-blank line is encoded with ``convert_string_to_image_matrix``
    and labelled 1 (XSS - an obfuscated attack is still an attack). The
    arrays are saved as ``images.npy`` and ``labels.npy`` inside ``out_dir``,
    which is created if missing.

    Args:
        input_txt: UTF-8 text file with one string per line.
        out_dir: output directory for the two .npy files.
    """
    with open(input_txt, "r", encoding="utf-8") as f:
        lines = [line.strip() for line in f if line.strip()]

    images = [convert_string_to_image_matrix(t) for t in lines]
    labels = [1] * len(images)  # label 1 = XSS (obfuscated attacks are attacks too)

    if images:
        image_array = np.array(images)
    else:
        # An empty list would become a float64 array of shape (0,), which
        # cannot be concatenated with (N, 16, 16) image arrays later on.
        image_array = np.zeros((0, 16, 16), dtype=np.uint8)
    # Explicit integer dtype keeps an empty label array from defaulting to float64.
    label_array = np.array(labels, dtype=int)

    os.makedirs(out_dir, exist_ok=True)
    np.save(os.path.join(out_dir, "images.npy"), image_array)
    np.save(os.path.join(out_dir, "labels.npy"), label_array)
    print(f"✅ 우회 이미지 저장 완료: {out_dir}/images.npy, labels.npy")

# Usage example: encode the generator output into dataset_obfuscated/*.npy
convert_obfuscated_txt_to_npy("generator_output.txt", "dataset_obfuscated")


✅ 우회 이미지 저장 완료: dataset_obfuscated/images.npy, labels.npy


3개 클래스 병합 후 분할

In [56]:
def merge_and_split_all_with_existing_splits(
    train_img, train_lbl,
    val_img, val_lbl,
    test_img, test_lbl,
    obf_img, obf_lbl,
    out_dir="dataset_gan_split"
):
    """Append obfuscated samples to existing train/val/test splits and save them.

    The obfuscated arrays are cut in file order (no shuffling) into roughly
    60% / 20% / 20% pieces; each piece is concatenated after the matching
    existing split. Six files (``{train,val,test}_{images,labels}.npy``) are
    written to ``out_dir`` and the resulting sizes are printed.

    Args:
        train_img, train_lbl: paths of the existing training arrays.
        val_img, val_lbl: paths of the existing validation arrays.
        test_img, test_lbl: paths of the existing test arrays.
        obf_img, obf_lbl: paths of the obfuscated-sample arrays.
        out_dir: output directory, created if missing.
    """
    import numpy as np
    import os

    # Load the existing splits
    existing = [
        ("train", np.load(train_img), np.load(train_lbl)),
        ("val", np.load(val_img), np.load(val_lbl)),
        ("test", np.load(test_img), np.load(test_lbl)),
    ]

    # Load the obfuscated samples
    X_obf, y_obf = np.load(obf_img), np.load(obf_lbl)

    # Split boundaries for a 6:2:2 ratio; the test part takes the remainder
    n = len(X_obf)
    train_end = int(n * 0.6)
    val_end = train_end + int(n * 0.2)
    pieces = {
        "train": slice(None, train_end),
        "val": slice(train_end, val_end),
        "test": slice(val_end, None),
    }

    # Merge: obfuscated samples go after the existing ones in every split
    merged = {}
    for split_name, X_base, y_base in existing:
        part = pieces[split_name]
        merged[split_name] = (
            np.concatenate([X_base, X_obf[part]], axis=0),
            np.concatenate([y_base, y_obf[part]], axis=0),
        )

    # Save
    os.makedirs(out_dir, exist_ok=True)
    for split_name, (X_out, y_out) in merged.items():
        np.save(os.path.join(out_dir, split_name + "_images.npy"), X_out)
        np.save(os.path.join(out_dir, split_name + "_labels.npy"), y_out)

    X_train_merge = merged["train"][0]
    X_val_merge = merged["val"][0]
    X_test_merge = merged["test"][0]

    print(f"✅ 병합 및 저장 완료 → {out_dir}")
    print(f" - Train: {len(X_train_merge)}")
    print(f" - Val:   {len(X_val_merge)}")
    print(f" - Test:  {len(X_test_merge)}")


In [57]:
# Merge the obfuscated samples into the original train/val/test splits and
# write the combined arrays to dataset_gan_split/.
merge_and_split_all_with_existing_splits(
    train_img="train_images.npy",
    train_lbl="train_labels.npy",
    val_img="val_images.npy",
    val_lbl="val_labels.npy",
    test_img="test_images.npy",
    test_lbl="test_labels.npy",
    obf_img="dataset_obfuscated/images.npy",
    obf_lbl="dataset_obfuscated/labels.npy",
    out_dir="dataset_gan_split"
)


✅ 병합 및 저장 완료 → dataset_gan_split
 - Train: 1718
 - Val:   430
 - Test:  433


Discriminator 재학습 (GAN용)

In [58]:
# Load the merged splits (only the training loader is shuffled)
train_loader = create_dataloader_from_files("dataset_gan_split/train_images.npy", "dataset_gan_split/train_labels.npy", shuffle=True)
val_loader = create_dataloader_from_files("dataset_gan_split/val_images.npy", "dataset_gan_split/val_labels.npy")
test_loader = create_dataloader_from_files("dataset_gan_split/test_images.npy", "dataset_gan_split/test_labels.npy")

# Initialize a fresh model (this rebinds the global `model`) and train it
model = DiscriminatorAlexNet()
train_discriminator(model, train_loader, val_loader, epochs=15, lr=1e-3)

# Evaluate on the merged test split
evaluate_model(model, test_loader)


Epoch 1/15 | Loss: 9.7313 | Train Acc: 0.9139
           >> Val Acc: 0.9977
Epoch 2/15 | Loss: 1.6812 | Train Acc: 0.9953
           >> Val Acc: 1.0000
Epoch 3/15 | Loss: 0.6190 | Train Acc: 0.9983
           >> Val Acc: 1.0000
Epoch 4/15 | Loss: 0.3494 | Train Acc: 0.9988
           >> Val Acc: 1.0000
Epoch 5/15 | Loss: 0.3602 | Train Acc: 0.9994
           >> Val Acc: 1.0000
Epoch 6/15 | Loss: 0.4436 | Train Acc: 0.9983
           >> Val Acc: 1.0000
Epoch 7/15 | Loss: 0.4342 | Train Acc: 0.9983
           >> Val Acc: 0.9977
Epoch 8/15 | Loss: 0.1441 | Train Acc: 0.9988
           >> Val Acc: 0.9977
Epoch 9/15 | Loss: 0.2001 | Train Acc: 0.9983
           >> Val Acc: 1.0000
Epoch 10/15 | Loss: 0.0745 | Train Acc: 0.9994
           >> Val Acc: 0.9977
Epoch 11/15 | Loss: 0.0175 | Train Acc: 1.0000
           >> Val Acc: 1.0000
Epoch 12/15 | Loss: 0.0015 | Train Acc: 1.0000
           >> Val Acc: 1.0000
Epoch 13/15 | Loss: 0.0007 | Train Acc: 1.0000
           >> Val Acc: 1.0000
Epoch 14