Claude

In [11]:
import tensorflow as tf
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Data preprocessing
def preprocess_data(data_path):
    df = pd.read_csv(data_path)
    
    # Separate features and target
    X = df.drop('Class', axis=1)
    y = df['Class']
    
    # Standardize 'Time' and 'Amount'
    scaler = StandardScaler()
    X[['Time', 'Amount']] = scaler.fit_transform(X[['Time', 'Amount']])
    
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
    
    # Convert to float32
    X_train = X_train.values.astype('float32')
    X_test = X_test.values.astype('float32')
    y_train = y_train.values.astype('float32')
    y_test = y_test.values.astype('float32')
    
    return X_train, X_test, y_train, y_test

class FeatureAttention(tf.keras.layers.Layer):
    def __init__(self, channels):
        super(FeatureAttention, self).__init__()
        self.global_attention = tf.keras.layers.MultiHeadAttention(num_heads=1, key_dim=channels)
        self.local_attention = tf.keras.layers.MultiHeadAttention(num_heads=1, key_dim=channels)

    def call(self, x):
        # Split channels into global and local groups
        x_global = tf.transpose(tf.reshape(x, (tf.shape(x)[0], -1, 2)), [1, 0, 2])
        x_local = tf.transpose(tf.reshape(x, (tf.shape(x)[0], 2, -1)), [1, 0, 2])

        # Apply attention
        x_global = self.global_attention(x_global, x_global)
        x_local = self.local_attention(x_local, x_local)

        # Reshape and combine
        x_global = tf.reshape(tf.transpose(x_global, [1, 0, 2]), tf.shape(x))
        x_local = tf.reshape(tf.transpose(x_local, [1, 0, 2]), tf.shape(x))
        
        return x_global + x_local

class Encoder(tf.keras.layers.Layer):
    def __init__(self, hidden_dims, output_dim):
        super(Encoder, self).__init__()
        self.layers = []
        for dim in hidden_dims:
            self.layers.append(tf.keras.layers.Dense(dim, activation='leaky_relu'))
        self.layers.append(tf.keras.layers.Dense(output_dim))

    def call(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

class Decoder(tf.keras.layers.Layer):
    def __init__(self, hidden_dims, output_dim):
        super(Decoder, self).__init__()
        self.layers = []
        for dim in hidden_dims:
            self.layers.append(tf.keras.layers.Dense(dim, activation='leaky_relu'))
        self.layers.append(tf.keras.layers.Dense(output_dim, activation='tanh'))

    def call(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

class Discriminator(tf.keras.layers.Layer):
    def __init__(self, hidden_dims):
        super(Discriminator, self).__init__()
        self.layers = []
        for dim in hidden_dims:
            self.layers.append(tf.keras.layers.Dense(dim, activation='leaky_relu'))
        self.layers.append(tf.keras.layers.Dense(1, activation='sigmoid'))

    def call(self, x):
        for layer in self.layers:
            x = layer(x)
        return x

class UAAD_FDNet(tf.keras.Model):
    def __init__(self, input_dim, hidden_dims, latent_dim):
        super(UAAD_FDNet, self).__init__()
        self.encoder = Encoder(hidden_dims, latent_dim)
        self.decoder = Decoder(hidden_dims[::-1], input_dim)
        self.encoder_reconstructed = Encoder(hidden_dims, latent_dim)
        self.discriminator = Discriminator(hidden_dims)
        self.feature_attention = FeatureAttention(input_dim)

    def call(self, x):
        z = self.encoder(x)
        x_reconstructed = self.decoder(z)
        x_attention = self.feature_attention(x_reconstructed)
        z_reconstructed = self.encoder_reconstructed(x_attention)
        return z, x_reconstructed, z_reconstructed

# Loss functions
def adversarial_loss(D_real, D_fake):
    return -tf.reduce_mean(tf.math.log(D_real) + tf.math.log(1 - D_fake))

def context_loss(x, x_reconstructed):
    return tf.reduce_mean(tf.abs(x - x_reconstructed))

def latent_loss(z, z_reconstructed):
    return tf.reduce_mean((z - z_reconstructed) ** 2)

# Training step
@tf.function
def train_step(model, x, optimizer_G, optimizer_D, lambda_con, lambda_lat):
    with tf.GradientTape() as tape_G, tf.GradientTape() as tape_D:
        z, x_reconstructed, z_reconstructed = model(x)
        D_fake = model.discriminator(x_reconstructed)
        D_real = model.discriminator(x)

        loss_adv = adversarial_loss(D_real, D_fake)
        loss_con = context_loss(x, x_reconstructed)
        loss_lat = latent_loss(z, z_reconstructed)

        loss_G = loss_adv + lambda_con * loss_con + lambda_lat * loss_lat
        loss_D = adversarial_loss(D_real, D_fake)

    gradients_G = tape_G.gradient(loss_G, model.trainable_variables)
    gradients_D = tape_D.gradient(loss_D, model.discriminator.trainable_variables)

    optimizer_G.apply_gradients(zip(gradients_G, model.trainable_variables))
    optimizer_D.apply_gradients(zip(gradients_D, model.discriminator.trainable_variables))

    return loss_G, loss_D

# Training loop
def train(model, train_data, val_data, num_epochs, batch_size, lr, lambda_con, lambda_lat):
    optimizer_G = tf.keras.optimizers.Adam(lr)
    optimizer_D = tf.keras.optimizers.Adam(lr)

    train_dataset = tf.data.Dataset.from_tensor_slices(train_data).shuffle(buffer_size=1024).batch(batch_size)
    val_dataset = tf.data.Dataset.from_tensor_slices(val_data).batch(batch_size)

    for epoch in range(num_epochs):
        # Training
        train_loss_G, train_loss_D = 0, 0
        for batch in train_dataset:
            loss_G, loss_D = train_step(model, batch, optimizer_G, optimizer_D, lambda_con, lambda_lat)
            train_loss_G += loss_G
            train_loss_D += loss_D
        train_loss_G /= len(train_dataset)
        train_loss_D /= len(train_dataset)

        # Validation
        val_loss_G, val_loss_D = 0, 0
        for batch in val_dataset:
            z, x_reconstructed, z_reconstructed = model(batch)
            D_fake = model.discriminator(x_reconstructed)
            D_real = model.discriminator(batch)

            loss_adv = adversarial_loss(D_real, D_fake)
            loss_con = context_loss(batch, x_reconstructed)
            loss_lat = latent_loss(z, z_reconstructed)

            val_loss_G += loss_adv + lambda_con * loss_con + lambda_lat * loss_lat
            val_loss_D += adversarial_loss(D_real, D_fake)
        val_loss_G /= len(val_dataset)
        val_loss_D /= len(val_dataset)

        print(f"Epoch {epoch+1}/{num_epochs}")
        print(f"Train Loss G: {train_loss_G:.4f}, Train Loss D: {train_loss_D:.4f}")
        print(f"Val Loss G: {val_loss_G:.4f}, Val Loss D: {val_loss_D:.4f}")

# Fraud detection function
def detect_fraud(model, data, threshold):
    z, x_reconstructed, z_reconstructed = model(data)
    reconstruction_error = tf.reduce_mean((data - x_reconstructed) ** 2, axis=1)
    latent_distance = tf.reduce_mean((z - z_reconstructed) ** 2, axis=1)
    anomaly_score = reconstruction_error + latent_distance
    predictions = tf.cast(anomaly_score > threshold, tf.float32)
    return predictions, anomaly_score

# Main execution
if __name__ == "__main__":
    # Hyperparameters
    input_dim = 30  # 28 PCA features + Time + Amount
    hidden_dims = [64, 128, 256, 512]
    latent_dim = 1024
    batch_size = 256
    num_epochs = 100
    lr = 0.001
    lambda_con = 1.0
    lambda_lat = 0.1

    # Load and preprocess data
    data_path = "creditcard.csv"
    X_train, X_test, y_train, y_test = preprocess_data(data_path)

    # Initialize and train the model
    model = UAAD_FDNet(input_dim, hidden_dims, latent_dim)
    train(model, X_train, X_test, num_epochs, batch_size, lr, lambda_con, lambda_lat)

    # Detect fraud on test set
    threshold = 0.5  # This threshold should be tuned based on your specific requirements
    test_predictions, test_scores = detect_fraud(model, X_test, threshold)

    # Evaluate the model (you can add more metrics as needed)
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

    accuracy = accuracy_score(y_test, test_predictions.numpy())
    precision = precision_score(y_test, test_predictions.numpy())
    recall = recall_score(y_test, test_predictions.numpy())
    f1 = f1_score(y_test, test_predictions.numpy())

    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

Epoch 1/100
Train Loss G: 0.4256, Train Loss D: 0.0145
Val Loss G: 0.3678, Val Loss D: 0.0010
Epoch 2/100
Train Loss G: 0.3368, Train Loss D: 0.0022
Val Loss G: 0.3299, Val Loss D: 0.0002
Epoch 3/100
Train Loss G: 0.3129, Train Loss D: 0.0021
Val Loss G: 0.3545, Val Loss D: 0.0011
Epoch 4/100
Train Loss G: nan, Train Loss D: nan
Val Loss G: nan, Val Loss D: nan
Epoch 5/100
Train Loss G: nan, Train Loss D: nan
Val Loss G: nan, Val Loss D: nan
Epoch 6/100
Train Loss G: nan, Train Loss D: nan
Val Loss G: nan, Val Loss D: nan
Epoch 7/100
Train Loss G: nan, Train Loss D: nan
Val Loss G: nan, Val Loss D: nan
Epoch 8/100
Train Loss G: nan, Train Loss D: nan
Val Loss G: nan, Val Loss D: nan
Epoch 9/100
Train Loss G: nan, Train Loss D: nan
Val Loss G: nan, Val Loss D: nan
Epoch 10/100
Train Loss G: nan, Train Loss D: nan
Val Loss G: nan, Val Loss D: nan
