In [None]:
import tensorflow
print(tensorflow.__version__)

2.17.1


In [None]:
import tensorflow as tf
from tensorflow.keras.layers import Layer, Reshape, Activation, Conv1D, Conv1DTranspose, Dropout
from tensorflow.keras.layers import Input, Add, Concatenate, Embedding, LeakyReLU, Dense, BatchNormalization, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
from tensorflow.keras.initializers import RandomNormal
from functools import partial

# 定义独立的判别器模型
def define_discriminator(in_shape=(200, 67), n_classes=2):
    # 输入
    in_url = Input(shape=in_shape)

    # 下采样到 14x14
    fe = Conv1D(16, 3, strides=1, padding='same')(in_url)
    fe = BatchNormalization()(fe)
    fe = LeakyReLU(negative_slope=0.2)(fe)
    fe = Conv1D(16, 3, strides=2, padding='same')(fe)
    fe = BatchNormalization()(fe)
    fe = LeakyReLU(negative_slope=0.2)(fe)
    fe = Dropout(0.2)(fe)

    # 正常卷积层
    fe = Conv1D(32, 3, strides=1, padding='same')(fe)
    fe = BatchNormalization()(fe)
    fe = LeakyReLU(negative_slope=0.2)(fe)
    fe = Conv1D(32, 3, strides=2, padding='same')(fe)
    fe = BatchNormalization()(fe)
    fe = LeakyReLU(negative_slope=0.2)(fe)

    # 下采样到 7x7
    fe = Conv1D(128, 3, strides=1, padding='same')(fe)
    fe = BatchNormalization()(fe)
    fe = LeakyReLU(negative_slope=0.2)(fe)
    fe = Conv1D(128, 3, strides=2, padding='same')(fe)
    fe = BatchNormalization()(fe)
    fe = LeakyReLU(negative_slope=0.2)(fe)
    fe = Dropout(0.2)(fe)

    # 展平特征图
    fe = Flatten()(fe)
    dense_1 = Dense(256)(fe)
    dense_1 = LeakyReLU()(dense_1)
    dense_1 = Dense(64)(dense_1)

    # 真假输出
    out1 = Dense(1, activation='sigmoid')(dense_1)
    # 类别标签输出
    out2 = Dense(n_classes, activation='softmax')(dense_1)

    # 定义模型
    model = Model(in_url, [out1, out2], name="Discriminator")

    # 编译模型
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss='mse', optimizer=opt, loss_weights=[1, 1])

    return model

# 定义独立的生成器模型
def define_generator(latent_dim=(50,), signal_shape=(200, 67), label_shape=(2,)):
    """
    定义一个生成器模型，用于生成新的样本。

    参数:
    latent_dim (tuple): 潜在空间的维度，默认为 (50,)。
    signal_shape (tuple): 输入信号的形状，默认为 (200, 67)。
    label_shape (tuple): 输入标签的形状，默认为 (2,)。

    返回:
    model (Model): 编译好的生成器模型。
    """
    depth = 4
    dropout = 0.25
    dim = signal_shape[0]

    # 信号输入
    in_signal = Input(shape=signal_shape)
    si = in_signal

    # 标签输入
    in_label = Input(shape=label_shape)
    # 标签嵌入
    li = Embedding(2, 50)(in_label)
    # 线性变换
    n_nodes = 200 * 1
    li = Dense(n_nodes)(li)
    # 重塑为额外通道
    li = Reshape((200, 2))(li)

    # 噪声输入
    in_lat = Input(shape=latent_dim)
    lat = Reshape((1, 50))(in_lat)

    # 基础层
    n_nodes = dim * depth
    gen = Dense(n_nodes)(lat)
    gen = LeakyReLU(negative_slope=0.2)(gen)
    gen = Reshape((dim, depth))(gen)

    # 合并生成器输出和标签输入
    merge = Concatenate()([gen, li, si])

    # 卷积层
    gen = Conv1D(32, 3, strides=1, padding='same')(merge)
    gen = BatchNormalization()(gen)
    gen = LeakyReLU(negative_slope=0.2)(gen)

    gen = Conv1D(32, 3, strides=2, padding='same')(gen)
    gen = BatchNormalization()(gen)
    gen = LeakyReLU(negative_slope=0.2)(gen)

    gen = Conv1D(64, 3, strides=1, padding='same')(gen)
    gen = BatchNormalization()(gen)
    gen = LeakyReLU(negative_slope=0.2)(gen)

    gen = Conv1D(64, 3, strides=2, padding='same')(gen)
    gen = BatchNormalization()(gen)
    gen = LeakyReLU(negative_slope=0.2)(gen)

    gen = Conv1D(128, 3, strides=1, padding='same')(gen)
    gen = BatchNormalization()(gen)
    gen = LeakyReLU(negative_slope=0.2)(gen)

    gen = Conv1D(128, 3, strides=2, padding='same')(gen)
    gen = BatchNormalization()(gen)
    gen = LeakyReLU(negative_slope=0.2)(gen)

    gen = Conv1DTranspose(128, 3, strides=2, padding='same')(gen)
    gen = BatchNormalization()(gen)
    gen = LeakyReLU(negative_slope=0.2)(gen)

    gen = Conv1DTranspose(64, 3, strides=2, padding='same')(gen)
    gen = BatchNormalization()(gen)
    gen = LeakyReLU(negative_slope=0.2)(gen)

    gen = Conv1DTranspose(32, 3, strides=2, padding='same')(gen)
    gen = BatchNormalization()(gen)
    gen = LeakyReLU(negative_slope=0.2)(gen)

    # 输出层
    gen = Conv1D(67, 3, strides=1, padding='same')(gen)
    out_layer = Activation('sigmoid')(gen)

    # 定义模型
    model = Model([in_signal, in_lat, in_label], out_layer, name="Generator")

    # 编译模型
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss='mse', optimizer=opt)

    return model



In [None]:
# 定义 GAN 模型
def define_gan(g_model, d_model, latent_dim=(50,), signal_shape=(200, 67), label_shape=(2,)):
    """
    定义一个 GAN 模型，将生成器和判别器连接起来。

    参数:
    g_model (Model): 生成器模型。
    d_model (Model): 判别器模型。
    latent_dim (tuple): 潜在空间的维度，默认为 (50,)。
    signal_shape (tuple): 输入信号的形状，默认为 (200, 67)。
    label_shape (tuple): 输入标签的形状，默认为 (2,)。

    返回:
    model (Model): 编译好的 GAN 模型。
    """
    # 冻结判别器的权重
    d_model.trainable = False

    # 连接生成器的输出到判别器的输入
    [out1, out2] = d_model(g_model.output)

    # 定义 GAN 模型
    model = Model([g_model.input[0], g_model.input[1], g_model.input[2]], [out1, out2], name="Gan")

    # 编译模型
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss='mse', optimizer=opt, loss_weights=[1, 10])

    return model



In [None]:
import numpy as np
from numpy import load
import tensorflow as tf
from numpy.random import randint
from numpy import zeros, ones
from scipy.signal import savgol_filter
from scipy.signal import butter, filtfilt
from scipy.ndimage import gaussian_filter1d

# 生成潜在空间中的点作为生成器的输入
def generate_latent_points(latent_dim, n_samples):
    """
    生成潜在空间中的随机点，并应用高斯滤波平滑数据。

    参数:
    latent_dim (int): 潜在空间的维度。
    n_samples (int): 需要生成的样本数量。

    返回:
    z_input (ndarray): 形状为 (n_samples, latent_dim) 的潜在空间点。
    """
    # 确保 latent_dim 是一个整数
    latent_dim = latent_dim[0] if isinstance(latent_dim, tuple) else latent_dim
    # 生成标准正态分布的随机数
    x_input = np.random.randn(n_samples, latent_dim)
    # 应用高斯滤波平滑数据
    x_input = gaussian_filter1d(x_input, 4, axis=1)
    return x_input

# 使用生成器生成 n 个假样本，并附带类别标签
def generate_fake_samples(generator, real_sample, labels_input, latent_dim, n_samples):
    """
    使用生成器生成假样本，并附带类别标签。

    参数:
    generator (Model): 生成器模型。
    real_sample (ndarray): 真实样本，用于条件生成。
    labels_input (ndarray): 类别标签。
    latent_dim (int): 潜在空间的维度。
    n_samples (int): 需要生成的样本数量。

    返回:
    url (list): 包含生成的url和类别标签的列表。
    y (ndarray): 假样本的标签，全为 0。
    """
    # 生成潜在空间中的点
    z_input = generate_latent_points(latent_dim, n_samples)
    # 使用生成器生成图像
    url = generator.predict([real_sample, z_input, labels_input])
    # 创建假样本的标签
    y = zeros((n_samples, 1))
    return [url, labels_input], y

# 从数据集中随机生成真实样本及其标签
def generate_real_random(dataset, n_samples):
    """
    从数据集中随机生成真实样本及其标签。

    参数:
    dataset (tuple): 数据集。
    n_samples (int): 需要生成的样本数量。

    返回:
    X (ndarray): 真实样本。
    labels (ndarray): 真实样本的标签。
    y (ndarray): 真实样本的标签，全为 1。
    """
    url, labels = dataset
    # 随机选择索引
    ix = randint(0, url.shape[0], n_samples)
    # 根据索引提取样本和标签
    X, labels = url[ix], labels[ix]
    # 创建真实样本的标签
    y = ones((n_samples, 1))
    return [X, labels], y

# 从数据集中按顺序生成真实样本及其标签
def generate_real_samples(dataset, batch_id, n_samples):
    """
    从数据集中按顺序生成真实样本及其标签。

    参数:
    dataset (tuple): 数据集。
    batch_id (int): 当前批次的编号。
    n_samples (int): 每个批次的样本数量。

    返回:
    X (ndarray): 真实样本。
    labels (ndarray): 真实样本的标签。
    y (ndarray): 真实样本的标签，全为 1。
    """
    url, labels = dataset
    # 计算当前批次的起始和结束索引
    start = batch_id * n_samples
    end = start + n_samples
    # 根据索引提取样本和标签
    X, labels = url[start:end], labels[start:end]
    # 创建真实样本的标签
    y = ones((n_samples, 1))
    return [X, labels], y

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import os
import random

def summarize_performance_fixed(reverse_dictionary, step, g_model,d_model ,dataset, n_samples=3,latent_dim=128,  savedir='weights_plots'):
    if not os.path.exists(savedir):
            os.makedirs(savedir)
    # select a sample of input images
    [X,labels],_ = generate_real_random(dataset, n_samples)
    # generate a batch of fake samples
    [X_fake, _], _ = generate_fake_samples(g_model,X, labels, latent_dim, n_samples)

    for url in X_fake:
                this_url_gen = ""
                for position in url:
                    this_index = np.argmax(position)
                    if this_index != 0:
                        this_url_gen += reverse_dictionary[this_index]

                print(this_url_gen)
    # save the generator model
    filename2 = savedir+'/'+'gmodel_%001d.h5' % (step+1)
    g_model.save(filename2)
    filename3 = savedir+'/'+'dmodel_%001d.h5' % (step+1)
    d_model.save(filename3)
    print('>Saved: %s and %s' % (filename2, filename3))


def to_csv(dr_hist, df_hist, g_hist, gan_hist,savedir='dummy'):
    if not os.path.exists(savedir):
        os.makedirs(savedir)
    dr = np.array(dr_hist)
    df = np.array(df_hist)
    g = np.array(g_hist)
    gan = np.array(gan_hist)
    df = pd.DataFrame(data=(dr,df,g,gan)).T
    df.columns=["dr","df","g","gan"]
    filename = savedir+"/ecg-atk-loss.csv"
    df.to_csv(filename)


def plot_history(dr_hist, df_hist, g_hist, gan_hist,savedir='dummy'):
    if not os.path.exists(savedir):
        os.makedirs(savedir)
    plt.plot(dr_hist, label='dr')
    plt.plot(df_hist, label='dfm')
    plt.plot(g_hist, label='g_loss')
    plt.plot(gan_hist, label='gan_loss')
    plt.legend()
    filename = savedir+'/plot_line_plot_loss.png'
    plt.savefig(filename)
    plt.close()
    print('Saved %s' % (filename))

In [21]:
import os
import argparse
import string
import gc
import tensorflow.keras.backend as K
from sklearn.utils import shuffle

def train(g_model, d_model, gan_model, dataset, latent_dim, n_epochs=2, n_batch=64, savedir="dummy"):
    # calculate the number of batches per training epoch
    bat_per_epo = 100
    print('batch per epoch: %d' % bat_per_epo)
    # calculate the number of training iterations
    n_steps = bat_per_epo * n_epochs
    print('number of steps: %d' % n_steps)
    dr_hist, df_hist = list(), list()
    g_hist, gan_hist = list(), list()

    alphabet = string.ascii_lowercase + string.digits + "!#$%&()*+,-./:;<=>?@[\\]^_`{|}~"
    reverse_dictionary = {}
    for i, c in enumerate(alphabet):
        reverse_dictionary[i+1] = c
    b = 0
    for e in range(n_epochs):
        for i in range(bat_per_epo):
            for j in range(2):
                d_model.trainable = True
                g_model.trainable = False
                # 获取随机选择的真实样本
                [X_real, labels_real], y_real = generate_real_samples(dataset, i, n_batch)
                # 更新判别器模型权重
                d_loss_real = d_model.train_on_batch(X_real, [y_real, labels_real])
                # 生成虚假样本
                [X_fake, labels_fake], y_fake = generate_fake_samples(g_model, X_real, labels_real, latent_dim, n_batch)
                # 更新判别器模型权重
                d_loss_fake = d_model.train_on_batch(X_fake, [y_fake, labels_fake])
            # 更新
            d_model.trainable = False
            g_model.trainable = True
            # 准备潜在空间中的点作为生成器的输入
            z_input = generate_latent_points(latent_dim, n_batch)
            # 数据样本批次
            [X_real, labels_real], y_real = generate_real_samples(dataset, i, n_batch)
            g_loss = g_model.train_on_batch([X_real, z_input, labels_real], X_real)
            gan_loss = gan_model.train_on_batch([X_real, z_input, labels_real], [y_real, labels_real])
            # 打印该批次的损失
            print('>%d, dr[%.3f], df[%.3f], g[%.3f], gan[%.3f]' % (i+1, d_loss_real[0], d_loss_fake[0], g_loss, gan_loss[0]))
            dr_hist.append(d_loss_real[0])
            df_hist.append(d_loss_fake[0])
            g_hist.append(g_loss)
            gan_hist.append(gan_loss[0])
            # 释放不必要的内存
            gc.collect()
            K.clear_session()

            # evaluate the model performance every 'epoch'
            #if (i+1) % (bat_per_epo * 1) == 0:
        summarize_performance_fixed(reverse_dictionary, b, g_model, d_model, dataset, 3, latent_dim, savedir=savedir)
        b = b + 1
        #summarize_performance(i, g_model, latent_dim,X_real,n_samples=n_batch,savedir=savedir)
    plot_history(dr_hist, df_hist, g_hist, gan_hist, savedir=savedir)
    to_csv(dr_hist, df_hist, g_hist, gan_hist, savedir=savedir)


if __name__ == "__main__":

    epochs = 6
    batch_size = 64
    npz_file = 'phishing.npz'
    latent_dim = (50,)
    savedir = 'PhishGan'
    resume_training = 'no'
    weight_name_dis = None
    weight_name_gen = None


    K.clear_session()
    if not os.path.exists(savedir):
        os.makedirs(savedir)
    #create the discriminator
    discriminator = define_discriminator()
    #create the generator
    generator = define_generator(latent_dim)

    if resume_training == 'yes':
        discriminator.load_weights(weight_name_dis)
        generator.load_weights(weight_name_gen)

    # create the gan
    gan_model = define_gan(generator, discriminator)
    # load image data
    data = np.load(npz_file)
    dataset = shuffle(data['X_train'], data['y_train'])
    # train model
    train(generator, discriminator, gan_model, dataset, latent_dim=latent_dim, n_epochs=epochs, n_batch=batch_size, savedir=savedir)

In [None]:
import numpy as np
import gc
import tensorflow.keras.backend as K
from sklearn.metrics import accuracy_score
from tensorflow.keras.models import load_model
from tensorflow.keras.losses import MeanSquaredError


def test_model_accuracy(npz_file, latent_dim, savedir):
    # 加载数据
    data = np.load(npz_file)
    X_test = data['X_test']
    y_test = data['y_test']

    # 明确指定mse损失函数
    custom_objects = {'mse': MeanSquaredError()}

    # 加载判别器和生成器的权重文件
    discriminator = load_model(f"{savedir}/dmodel_66.h5", custom_objects=custom_objects)
    generator = load_model(f"{savedir}/gmodel_66.h5", custom_objects=custom_objects)

    # 真实样本的预测
    real_preds = []
    batch_size = 64
    num_test_samples = X_test.shape[0]


    for batch_start in range(0, num_test_samples, batch_size):
        batch_end = min(batch_start + batch_size, num_test_samples)
        batch_x_test = X_test[batch_start:batch_end]
        batch_y_test = y_test[batch_start:batch_end]

        discriminator.trainable = True
        generator.trainable = False

        # 获取判别器对真实样本的预测输出
        [real_pred_1, real_pred_2] = discriminator.predict(batch_x_test)
        real_preds.append(real_pred_2)


    real_preds = np.concatenate(real_preds, axis=0)
    real_predicted_labels = np.argmax(real_preds, axis=1)

    # 确保true_labels与处理后的预测标签数量一致
    true_labels = np.argmax(y_test[:num_test_samples], axis=1)

    # 计算真实样本预测准确率
    accuracy = accuracy_score(true_labels, real_predicted_labels)

    print(f"模型的准确率: {accuracy}")

    return accuracy


if __name__ == "__main__":

    gc.collect()
    K.clear_session()

    npz_file = '/content/drive/MyDrive/phishing.npz'
    latent_dim = (50,)
    savedir = '/content/drive/MyDrive/phishing_detection'

    test_model_accuracy(npz_file, latent_dim, savedir)



[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step  
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step 
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step 
[1m2/2[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4