In [None]:
# Importing libraries

import pandas as pd 
import numpy as np
from numpy import float32
from tqdm import tqdm

from typing import Optional
from sklearn.model_selection import train_test_split
from sklearn.metrics import recall_score, confusion_matrix, precision_score,roc_auc_score,f1_score

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.multiprocessing import set_start_method

import pytorch_lightning as pl

from torch.utils.data import DataLoader, Dataset
import os

import seaborn as sns
import matplotlib.pyplot as plt


In [None]:
# Seed every RNG the notebook draws from so a fresh "Run All" is repeatable.
random_seed = 123
torch.manual_seed(random_seed)
# scikit-learn's train_test_split and pandas' DataFrame.sample fall back to
# NumPy's global RNG when no random_state is given, so seed that one as well.
np.random.seed(random_seed)

BATCH_SIZE=1000
# 1 when at least one CUDA device is visible, otherwise 0.
AVAIL_GPUS = min(1, torch.cuda.device_count())
NUM_WORKERS=0

# Device used for models that are moved by hand (CPU fallback when CUDA is absent).
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

In [None]:
#df = pd.read_csv("/kaggle/input/credit-card-fraud/card_transdata.csv")
df = pd.read_csv("/input/creditcardfraud/creditcard.csv")
#df.rename(columns={'fraud':'Class'},inplace=True)
# df = df.drop(['type','nameDest','nameOrign'],axis=1)
from sklearn.preprocessing import StandardScaler
Mx = df.iloc[:, :-1]  # all rows, every column except the last
nx = df.iloc[:, -1]   # all rows, last column only

# Create the StandardScaler object
scaler = StandardScaler()

# Fit the scaler on the feature columns and transform them
# NOTE(review): the scaler is fit on every row of df, i.e. before the
# train/test split performed in a later cell.
Mx_scaled = pd.DataFrame(scaler.fit_transform(Mx), columns=Mx.columns)

# Re-attach the untouched last column to the scaled features
scaled_data = pd.concat([Mx_scaled, nx], axis=1)

# scaled_data is now the standardised dataset; preview the first rows
# NOTE(review): scaled_data is not referenced by the later cells visible here,
# which normalise `df` themselves — confirm whether this cell is still needed.
scaled_data.head()


In [None]:
# Creating function to plot confusion metrics for evaluation
def plot_cm(labels, predictions, p=0.5):
    """Draw an annotated heatmap of the confusion matrix on a new 5x5 figure.

    Args:
        labels: ground-truth labels, passed straight to ``confusion_matrix``.
        predictions: predicted labels, passed straight to ``confusion_matrix``.
        p: number printed in the title with two decimals; it is not applied
            to ``predictions`` inside this function.
    """
    counts = confusion_matrix(labels, predictions)
    title = 'Confusion matrix @{:.2f}'.format(p)

    plt.figure(figsize=(5,5))
    sns.heatmap(counts, annot=True, fmt="d")
    plt.title(title)
    plt.ylabel('Actual label')
    plt.xlabel('Predicted label')

In [None]:
# Stratified 80/20 hold-out split. random_state pins the shuffle so the split,
# and every statistic derived from it below, is identical on each run.
train, test = train_test_split(df, test_size=0.2, stratify=df['Class'], random_state=random_seed)
train.shape, test.shape

In [None]:
# Z-score every column except the last, using statistics computed on the
# training split only, then copy the last column back in as 'Class'.
data_mean = train.iloc[:, :-1].mean()
data_std = train.iloc[:, :-1].std()
train_norm = train.iloc[:, :-1].sub(data_mean).div(data_std)
test_norm = test.iloc[:, :-1].sub(data_mean).div(data_std)
train_norm['Class'] = train.iloc[:, -1]
test_norm['Class'] = test.iloc[:, -1]

In [None]:
# Show how many rows of each 'Class' value the normalised training split holds.
train_norm['Class'].value_counts()

In [None]:
# Creating data model to use with the pytorch Lightning package
class CreditCardDataFinal(Dataset):
    """Row-wise dataset over a 2-D table whose last column is the label.

    Args:
        data: a float tensor, a NumPy array or a pandas DataFrame of shape
            (n_rows, n_features + 1). It is converted to a float32 tensor once,
            here, instead of being copy-constructed on every item access.

    Each item is a ``(features, label)`` pair of float32 tensors: ``features``
    holds every column but the last, ``label`` is the 0-dim last column.
    Both are views into ``self.data``.
    """
    def __init__(self, data):
        if isinstance(data, pd.DataFrame):
            # Positional tensor indexing is used below, so drop the pandas index.
            data = data.to_numpy()
        # No copy is made when `data` is already a float32 tensor.
        self.data = torch.as_tensor(data, dtype=torch.float32)
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        sample = self.data[index]
        return sample[:-1], sample[-1]
    
class CreditCardDataModel(pl.LightningDataModule):
    """Lightning data module that splits a labelled frame into train/test loaders.

    Args:
        data: DataFrame containing the feature columns and a 'Class' label column.
        batch_size: batch size used by both loaders.
        num_workers: worker processes used by both loaders.
    """
    def __init__(self, data: pd.DataFrame, batch_size=BATCH_SIZE, num_workers=0):
        super().__init__()
        self.data = data
        self.batch_size = batch_size
        self.num_workers = num_workers

    @staticmethod
    def _to_tensor(frame: pd.DataFrame) -> torch.Tensor:
        """Convert a frame to float32 with 'Class' as the last column."""
        # CreditCardDataFinal reads the label from the last column.
        ordered = [col for col in frame.columns if col != 'Class'] + ['Class']
        return torch.tensor(frame[ordered].to_numpy(float32), dtype=torch.float32)
    
    def setup(self, stage: Optional[str] = None):
        """Create the stratified 80/20 split of the frame given to the constructor."""
        train_df, test_df = train_test_split(self.data, random_state=123, test_size=0.2, stratify=self.data['Class'])
        # Build the tensors from this module's own split, not from notebook
        # globals, so the loaders really serve the data that was passed in.
        self.train_df = self._to_tensor(train_df)
        self.test_df = self._to_tensor(test_df)

    def train_dataloader(self):
        return DataLoader(dataset=CreditCardDataFinal(self.train_df), batch_size=self.batch_size, num_workers=self.num_workers)
    
    def test_dataloader(self):
        return DataLoader(dataset=CreditCardDataFinal(self.test_df), batch_size=self.batch_size, num_workers=self.num_workers)
    

In [None]:
# Building the main neural network to predict if the data is fraud or not. 
class ModelCreditCard(nn.Module):
    """Fully connected binary classifier: input -> 100 -> 50 -> 25 -> 1.

    Hidden layers use LeakyReLU(0.1); ``forward`` applies a sigmoid to the
    single output unit, so the result lies in (0, 1).
    """
    def __init__(self, input_size):
        super().__init__()
        widths = (input_size, 100, 50, 25)
        stack = []
        # Linear layers are created in the same order as their position in the
        # stack, so parameter names and seeded initialisation stay unchanged.
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            stack.append(nn.LeakyReLU(0.1))
        stack.append(nn.Linear(widths[-1], 1))
        self.sequential = nn.Sequential(*stack)
    
    def forward(self, x):
        logits = self.sequential(x)
        return torch.sigmoid(logits)
    

class ModelTraining(pl.LightningModule):
    """Lightning wrapper that trains a probability-output classifier with BCE.

    Args:
        model: module mapping a feature batch to probabilities of shape (batch, 1).
        lr: Adam learning rate; stored in ``self.hparams.lr``.
    """
    def __init__(self, model, lr=1e-3):
        super().__init__()
        # The network is already a submodule (and so already checkpointed);
        # keep it out of hparams so only plain values such as lr are stored.
        self.save_hyperparameters(ignore=['model'])
        self.model = model
        
    
    def training_step(self, batch, batch_idx):
        """Compute the BCE loss for one ``(features, labels)`` batch."""
        x, y = batch
        # Cast rather than re-wrap with torch.tensor(): the batch items are
        # already tensors, and re-wrapping copies them and emits a UserWarning.
        x = x.float()
        y = y.float().unsqueeze(1)  # (batch,) -> (batch, 1) to match the model output
        y_hat = self.model(x)
        loss = self.binary_loss(y_hat, y)
        return {"loss":loss}
    
    def binary_loss(self, y_hat,y):
        """Binary cross-entropy between predicted probabilities and targets."""
        return F.binary_cross_entropy(y_hat, y)
    
    def configure_optimizers(self):
        """Return a single Adam optimizer over the wrapped model; no schedulers."""
        lr = self.hparams.lr
        opt_g = torch.optim.Adam(self.model.parameters(), lr )
        return [opt_g], []

In [None]:
# Use the GPU only when one is visible (AVAIL_GPUS comes from the config cell);
# otherwise train on CPU instead of failing when the Trainer is constructed.
# NOTE(review): `trainer` is reassigned in the next cell before it is used.
trainer = pl.Trainer(max_epochs=10, accelerator='gpu' if AVAIL_GPUS else 'cpu', devices=1)

In [None]:
# Baseline classifier trained on the real (normalised) training data.
# The input width is every column of train_norm except the 'Class' label.
model_card = ModelCreditCard(train_norm.shape[1] - 1)
model = ModelTraining(model_card)

dm = CreditCardDataModel(train_norm)

# Use the GPU only when one is visible; fall back to CPU otherwise.
trainer = pl.Trainer(max_epochs=5, accelerator='gpu' if AVAIL_GPUS else 'cpu', devices=1)
trainer.fit(model, dm)

In [None]:
# Held-out features (despite the name, `test_pred` holds the model inputs)
# and the matching labels as an (n, 1) float column.
test_pred = torch.tensor(test_norm.drop(columns=["Class"]).to_numpy()).float()
test_true = torch.tensor(test_norm['Class'].to_numpy()).unsqueeze(1).float()

In [None]:
# Inference only: switch to eval mode and skip autograd bookkeeping.
model_card.eval()
with torch.no_grad():
    test_output_real = model_card(test_pred)
# Threshold all probabilities at once; squeeze the (n, 1) column to a flat 0/1 list.
test_pred_real = (test_output_real > 0.5).int().squeeze(1).tolist()
recall_score(test_true, test_pred_real)

In [None]:
class CreditCardData(Dataset):
    """Unlabelled dataset that serves one DataFrame row per item.

    Each item is the positionally indexed row converted to a float32 tensor.
    """
    def __init__(self, data: pd.DataFrame):
        self.data = data
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        record = self.data.iloc[index]
        return torch.tensor(record.values).to(torch.float32)

In [None]:
class DataModule(pl.LightningDataModule):
    """Data module for unlabelled rows: 80/20 split plus z-score normalisation.

    After ``setup()``, ``data_mean`` and ``data_std`` hold the per-column
    statistics of the training part of the split, and both parts have been
    standardised with them.

    Args:
        data: DataFrame of feature columns only.
        batch_size: batch size used by every loader.
        num_workers: worker processes used by every loader.
    """
    def __init__(self, data: pd.DataFrame, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS):
        super().__init__()
        self.data = data
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.data_mean = None
        self.data_std = None
        # Populated by setup(); declared here so the loaders never hit an
        # undefined attribute.
        self.train_df = None
        self.val_df = None
        self.test_df = None
        
    def prepare_data(self):
        pass
    
    def setup(self, stage: Optional[str] = None):
        train_df, test_df = train_test_split(self.data, random_state=123, test_size=0.2)
        self.data_mean = train_df.mean()
        self.data_std = train_df.std()
        train_norm = (train_df - self.data_mean) / self.data_std
        test_norm = (test_df - self.data_mean) / self.data_std
        self.train_df = train_norm
        self.test_df = test_norm
        # There is no separate validation split; valid_dataloader() reuses the
        # held-out rows so it no longer fails on a missing attribute.
        self.val_df = test_norm

    def _loader(self, frame):
        """Build a DataLoader over `frame` with this module's shared settings."""
        # Pinned host memory only helps host-to-GPU copies, so skip it on CPU.
        return DataLoader(dataset=CreditCardData(frame), batch_size=self.batch_size, num_workers=self.num_workers, pin_memory=torch.cuda.is_available())
    
    def train_dataloader(self):
        return self._loader(self.train_df)
    
    def valid_dataloader(self):
        # NOTE(review): Lightning's validation hook is named `val_dataloader`,
        # so this method only runs when called explicitly.
        return self._loader(self.val_df)
    
    def test_dataloader(self):
        return self._loader(self.test_df)
        

In [None]:
class GatedSelfAttention(nn.Module):
    """Single-head scaled dot-product self-attention with sigmoid-gated values.

    ``forward`` expects a 3-D input (``torch.bmm`` is used) and returns a
    tensor of the same shape. The value projection is multiplied element-wise
    by a sigmoid gate before the attention weights are applied.
    """

    def __init__(self, input_dim):
        super().__init__()
        # Four square projections, registered in this order: query, key,
        # value, and the gate that modulates the values.
        for name in ("query", "key", "value", "gate"):
            setattr(self, name, nn.Linear(input_dim, input_dim))

    def forward(self, x):
        scale = x.size(-1) ** 0.5
        queries = self.query(x)
        keys = self.key(x)

        # Gate the value vectors element-wise with a sigmoid signal.
        gated_values = self.value(x) * torch.sigmoid(self.gate(x))

        # Attention weights over the second dimension, scaled by sqrt(feature size).
        scores = torch.bmm(queries, keys.transpose(1, 2)) / scale
        weights = F.softmax(scores, dim=-1)

        # Weighted sum of the gated values.
        return torch.bmm(weights, gated_values)

In [None]:
class Discriminator(nn.Module):
    """Conv1d + gated self-attention critic that outputs a probability per row.

    Args:
        input_dim: number of features per row; it sizes the first fully
            connected layer (``64 * input_dim`` inputs).
    """

    def __init__(self, input_dim=30):
        super().__init__()

        # Three length-preserving convolutions (kernel 3, padding 1) widening
        # the channels 1 -> 16 -> 32 -> 64, each followed by LeakyReLU(0.2).
        conv_stack = []
        for c_in, c_out in ((1, 16), (16, 32), (32, 64)):
            conv_stack.append(nn.Conv1d(c_in, c_out, kernel_size=3, stride=1, padding=1))
            conv_stack.append(nn.LeakyReLU(0.2))
        self.conv_layers = nn.Sequential(*conv_stack)

        # Gated self-attention over the 64-channel feature vectors.
        self.attention = GatedSelfAttention(64)

        self.fc_layers = nn.Sequential(
            nn.Linear(64 * input_dim, 128),
            nn.LeakyReLU(0.2),
            nn.Linear(128, 64),
            nn.LeakyReLU(0.2),
            nn.Linear(64, 1),
        )

    def forward(self, x):
        # Add a channel axis for Conv1d, then move channels last so the
        # attention module sees one 64-d vector per position.
        features = self.conv_layers(x.unsqueeze(1))
        attended = self.attention(features.transpose(1, 2))
        flat = attended.view(attended.size(0), -1)
        return torch.sigmoid(self.fc_layers(flat))

In [None]:
class Generator(nn.Module):
    """VAE-style generator: an encoder to latent statistics plus a decoder.

    Args:
        latent_dim: size of the latent vector.
        output_dim: size of a data row (encoder input and decoder output).
    """

    def __init__(self, latent_dim, output_dim):
        super().__init__()
        self.latent_dim = latent_dim
        self.output_dim = output_dim

        hidden = 100

        # Encoder: maps a data row to a hidden representation.
        self.encoder = nn.Sequential(
            nn.Linear(output_dim, hidden),
            nn.LeakyReLU(0.2),
            nn.Linear(hidden, hidden),
            nn.LeakyReLU(0.2),
        )
        self.fc_mu = nn.Linear(hidden, latent_dim)       # mean vector
        self.fc_logvar = nn.Linear(hidden, latent_dim)   # log-variance vector

        # Decoder (the generator proper): rebuilds a data row from a latent vector.
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden),
            nn.LeakyReLU(0.2),
            nn.Linear(hidden, hidden),
            nn.LeakyReLU(0.2),
            nn.Linear(hidden, output_dim),
        )

    def reparameterize(self, mu, log_var):
        """Reparameterisation trick: return ``mu + eps * exp(0.5 * log_var)``."""
        sigma = torch.exp(0.5 * log_var)
        noise = torch.randn_like(sigma)
        return mu + noise * sigma

    def forward(self, x):
        """Encode ``x``, sample a latent vector and decode it.

        Returns:
            Tuple ``(x_recon, mu, log_var)``.
        """
        hidden = self.encoder(x)
        mu, log_var = self.fc_mu(hidden), self.fc_logvar(hidden)
        latent = self.reparameterize(mu, log_var)
        return self.decoder(latent), mu, log_var

    def sample(self, num_samples, device):
        """Decode ``num_samples`` standard-normal latent vectors on ``device``."""
        latent = torch.randn(num_samples, self.latent_dim).to(device)
        return self.decoder(latent)
#classic generator
#class Generator(nn.Module):
    #def __init__(self, latent_dim, output_dim):
        #super().__init__()
        #self.latent_dim = latent_dim
        #self.output_dim = output_dim
        
        #self.model = nn.Sequential(
            #nn.Linear(latent_dim, output_dim)
            #nn.Linear(latent_dim, 100),
            #nn.LeakyReLU(0.2),
            #nn.Linear(100, 100),
            #nn.LeakyReLU(0.2),
            #nn.Linear(100, output_dim)
        #)

    #def forward(self, z):
        #return self.model(z)

In [None]:
class GAN(pl.LightningModule):
    """GAN over 30-feature rows, trained with manual optimization.

    Synthetic rows are produced by ``self.generator.decoder`` from latent
    noise; ``self.discriminator`` scores rows as real (1) or fake (0).

    Args:
        latent_dim: size of the latent noise vector fed to the decoder.
        lr: Adam learning rate for both optimizers.
    """
    def __init__(self, latent_dim=100, lr=0.002):
        super().__init__()
        self.save_hyperparameters()
        
        # Generated rows have 30 features, matching Discriminator's default input_dim.
        self.generator = Generator(latent_dim=self.hparams.latent_dim, output_dim=30)
        self.discriminator = Discriminator()
        
        self.validation_z = torch.randn(6, self.hparams.latent_dim).to(self.device)

        # Per-column statistics of the training data; filled in by setup().
        self.data_mean = None
        self.data_std = None

        self.automatic_optimization = False  # Disable automatic optimization
        
    def setup(self, stage):
        """Copy the normalisation statistics from the attached datamodule, if any."""
        data_module = getattr(self.trainer, "datamodule", None)
        if data_module is None or getattr(data_module, "data_mean", None) is None:
            # Trained from plain dataloaders: no statistics are available.
            return
        
        self.data_mean = torch.tensor(data_module.data_mean.values, dtype=torch.float32)
        self.data_std = torch.tensor(data_module.data_std.values, dtype=torch.float32)

    def forward(self, z, *, denormalize=False):
        """Generate rows from latent noise ``z`` of shape (n, latent_dim).

        Args:
            z: latent vectors.
            denormalize: when True, map the rows back with the datamodule's
                statistics (``x * data_std + data_mean``). The default keeps
                the rows in the normalised space the GAN was trained in.

        Raises:
            RuntimeError: if ``denormalize`` is requested before ``setup()``
                has stored the statistics.
        """
        # Decode the noise directly: Generator.forward() expects a data row
        # (encoder input) and returns a tuple, so it cannot consume z.
        x = self.generator.decoder(z)
        if denormalize:
            if self.data_mean is None or self.data_std is None:
                raise RuntimeError("denormalize=True requires data_mean/data_std; run setup() with a datamodule first")
            x = x * self.data_std.to(x.device) + self.data_mean.to(x.device)
        return x
    
    def adversarial_loss(self, y_hat, y):
        """Binary cross-entropy between discriminator outputs and targets."""
        return F.binary_cross_entropy(y_hat, y)
    
    def training_step(self, batch, batch_idx):
        """Run one generator update followed by one discriminator update."""
        real_data = batch
        z = torch.randn(real_data.size(0), self.hparams.latent_dim).to(self.device)
        
        # Access optimizers manually
        opt_g, opt_d = self.optimizers()
        
        # Train Complementary GAN:
        
        # Train CCFD-generator: max log(D(G(z)))
        fake_data = self(z)
        y_hat_fake = self.discriminator(fake_data)
        g_loss = self.adversarial_loss(y_hat_fake, torch.ones_like(y_hat_fake))
        
        opt_g.zero_grad()
        self.manual_backward(g_loss)
        opt_g.step()
        
        # Train CCFD-discriminator: max log(D(x)) + log(1- D(G(z)))
        y_hat_real = self.discriminator(real_data)
        y_real = torch.ones(real_data.size(0), 1).to(self.device)
        real_loss = self.adversarial_loss(y_hat_real, y_real)
        
        # detach() keeps the discriminator loss from reaching the generator.
        y_hat_fake = self.discriminator(fake_data.detach())
        y_fake = torch.zeros(real_data.size(0), 1).to(self.device)
        fake_loss = self.adversarial_loss(y_hat_fake, y_fake)
        
        d_loss = (real_loss + fake_loss) / 2
        
        # zero_grad here also clears the discriminator gradients left over
        # from the generator's backward pass above.
        opt_d.zero_grad()
        self.manual_backward(d_loss)
        opt_d.step()
        
        self.log("g_loss", g_loss, prog_bar=True, logger=True)
        self.log("d_loss", d_loss, prog_bar=True, logger=True)
        
    def configure_optimizers(self):
        """Return ``[generator_optimizer, discriminator_optimizer]`` (both Adam)."""
        lr = self.hparams.lr
        opt_g = torch.optim.Adam(self.generator.parameters(), lr)
        opt_d = torch.optim.Adam(self.discriminator.parameters(), lr)
        return [opt_g, opt_d]

In [None]:
# Feature columns of the training rows labelled Class == 1 (label column removed).
only_pos = train_norm.loc[train_norm['Class'] == 1].drop(columns=['Class'])
only_pos.shape

In [None]:
# Build the GAN with its default hyper-parameters and place it on `device`.
# Note: this rebinds `model`, which held the baseline ModelTraining wrapper.
model = GAN().to(device)

In [None]:
# Train the GAN on the Class == 1 rows only, for 500 epochs.
dm = DataModule(only_pos)
trainer = pl.Trainer(max_epochs=500)
trainer.fit(model, datamodule=dm)

In [None]:
# Draw one synthetic Class == 1 row per real Class == 0 training row, so the
# augmented training set built below ends up class-balanced.
n_synthetic = int((train_norm['Class'] == 0).sum())
z = torch.randn(n_synthetic, model.hparams.latent_dim)

# Decode the noise directly: Generator.forward() takes data rows, whereas
# Generator.decoder maps latent vectors to feature rows.
# NOTE(review): DataModule re-standardises `only_pos` with its own mean/std, so
# these rows are in that scale rather than train_norm's — confirm whether they
# should be mapped back with dm.data_mean / dm.data_std before being combined.
model.eval()
with torch.no_grad():  # inference only; avoids building a graph for every row
    output = model.generator.decoder(z.to(model.device)).cpu()

In [None]:
# Display the generated rows produced by the previous cell.
output

In [None]:
# Wrap the generated rows in a DataFrame (columns are 0..n-1 at this point).
only_pos_df =  pd.DataFrame(output.detach().numpy())
only_pos_df

In [None]:
# Label every generated row as the positive class.
only_pos_df['Class'] = 1

In [None]:
# Real training rows labelled Class == 0.
only_neg_real_df = train_norm.loc[train_norm['Class'].eq(0)]

In [None]:
# Give the generated frame the real column names so concat aligns by column.
only_pos_df.columns = only_neg_real_df.columns
# Synthetic positives + real negatives, re-indexed from 0.
train_combined_fake_pos_only = pd.concat([only_pos_df, only_neg_real_df], ignore_index=True)
# Shuffle the dataset; random_state makes the row order repeatable.
train_combined_fake_pos_only = train_combined_fake_pos_only.sample(frac=1, random_state=random_seed)

In [None]:
# Show how many rows of each 'Class' value the combined training set holds.
train_combined_fake_pos_only['Class'].value_counts()

In [None]:
# Classifier trained on synthetic positives + real negatives.
# The input width is every column of the combined frame except 'Class'.
model_card_with_gan = ModelCreditCard(train_combined_fake_pos_only.shape[1] - 1)
model_gan = ModelTraining(model_card_with_gan)
dm_only_gan = CreditCardDataModel(train_combined_fake_pos_only)
# Use the GPU only when one is visible; fall back to CPU otherwise.
trainer = pl.Trainer(max_epochs=10, accelerator='gpu' if AVAIL_GPUS else 'cpu', devices=1)
trainer.fit(model_gan, dm_only_gan)