# In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import precision_score, recall_score, roc_auc_score, roc_curve
from imblearn.over_sampling import ADASYN
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import logging
from typing import Tuple, Dict, List, Optional

# Configure logging once at import time: INFO level, timestamp-prefixed messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
# Module-level logger used by the pipeline methods below.
logger = logging.getLogger(__name__)

class FraudDataset(Dataset):
    """
    Custom dataset for credit card fraud data.

    Stores the feature matrix and the label vector as float32 tensors so
    that indexing yields ready-to-use ``(features, label)`` pairs.
    """
    def __init__(self, features: np.ndarray, labels: np.ndarray):
        """
        Args:
            features (np.ndarray): Feature matrix, one row per sample.
            labels (np.ndarray): Target labels (0 or 1). A pandas Series or a
                plain sequence is accepted as well.

        Raises:
            ValueError: If features and labels hold a different number of samples.
        """
        # np.asarray handles ndarray, pandas Series and lists alike; the
        # previous ``labels.values`` only worked for pandas objects.
        label_array = np.asarray(labels)
        feature_array = np.asarray(features)
        if len(feature_array) != len(label_array):
            raise ValueError(
                f"features and labels differ in length: "
                f"{len(feature_array)} vs {len(label_array)}"
            )
        self.features = torch.tensor(feature_array, dtype=torch.float32)
        self.labels = torch.tensor(label_array, dtype=torch.float32)

    def __len__(self) -> int:
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return the ``(features, label)`` pair stored at position ``idx``."""
        return self.features[idx], self.labels[idx]

class FraudDetector(nn.Module):
    """
    Neural network model for fraud detection.

    A feed-forward stack of ``Linear -> ReLU -> BatchNorm1d -> Dropout``
    groups followed by a single-unit ``Linear`` layer and a ``Sigmoid``, so
    the forward pass yields one value in [0, 1] per input row.
    """
    def __init__(
        self,
        input_dim: int,
        hidden_dims: Optional[List[int]] = None,
        dropout: float = 0.3,
    ):
        """
        Args:
            input_dim (int): Number of input features.
            hidden_dims (Optional[List[int]]): Width of each hidden layer.
                Defaults to ``[64, 32]`` when omitted.
            dropout (float): Dropout probability applied after each hidden layer.
        """
        super().__init__()
        # Resolve the default here rather than in the signature so that no
        # list object is shared between calls.
        if hidden_dims is None:
            hidden_dims = [64, 32]

        layers: List[nn.Module] = []
        prev_dim = input_dim
        for dim in hidden_dims:
            # Layer order is kept as-is so saved state_dict keys stay valid.
            layers.extend([
                nn.Linear(prev_dim, dim),
                nn.ReLU(),
                nn.BatchNorm1d(dim),
                nn.Dropout(dropout)
            ])
            prev_dim = dim
        layers.append(nn.Linear(prev_dim, 1))
        layers.append(nn.Sigmoid())
        self.model = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through the sequential stack and return the sigmoid outputs."""
        return self.model(x)

class FraudDetectionPipeline:
    """
    Pipeline for credit card fraud detection.
    """
    def __init__(self, data_path: str, output_dir: str = "fraud_results"):
        """
        Store paths, create the output directory and set up empty pipeline state.

        Args:
            data_path (str): Path to the credit card dataset CSV.
            output_dir (str): Directory to save outputs. Created (including
                any missing parent directories) if it does not exist.
        """
        self.data_path = Path(data_path)
        self.output_dir = Path(output_dir)
        # parents=True so a nested output path such as "runs/exp1" works too.
        self.output_dir.mkdir(parents=True, exist_ok=True)
        # Use the GPU when one is available, otherwise fall back to the CPU.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.scaler = StandardScaler()
        # Populated later by initialize_model / preprocess_data / load_data.
        self.model = None
        self.train_loader = None
        self.test_loader = None
        self.data = None

    def load_data(self) -> None:
        """
        Load and validate the dataset.

        Reads the CSV at ``self.data_path`` into ``self.data`` and checks that
        the columns ``Time``, ``V1``..``V28``, ``Amount`` and ``Class`` are
        all present. Additional columns are not rejected.

        Raises:
            ValueError: If any required column is missing; the message lists
                the missing column names.
        """
        logger.info(f"Loading data from {self.data_path}")
        self.data = pd.read_csv(self.data_path)

        expected_columns = ['Time', *(f'V{i}' for i in range(1, 29)), 'Amount', 'Class']
        missing = [col for col in expected_columns if col not in self.data.columns]
        if missing:
            # Name the offending columns so the caller can fix the input file.
            raise ValueError(f"Dataset missing required columns: {missing}")
        logger.info(f"Dataset shape: {self.data.shape}")

    def visualize_data(self) -> None:
        """
        Write two diagnostic plots into the output directory.

        Produces ``class_distribution.png`` (count of rows per ``Class``
        value) and ``correlation_heatmap.png`` (pairwise correlations of a
        fixed subset of columns).
        """
        logger.info("Generating visualizations")

        frame = self.data
        heatmap_columns = ['V1', 'V2', 'V3', 'V4', 'Amount', 'Class']

        def _finish(title: str, filename: str) -> None:
            # Title the current figure, write it to disk and release it.
            plt.title(title)
            plt.savefig(self.output_dir / filename)
            plt.close()

        # Plot 1: how many rows fall into each class.
        plt.figure(figsize=(6, 4))
        sns.countplot(x='Class', data=frame)
        _finish("Class Distribution (0: Non-Fraud, 1: Fraud)", "class_distribution.png")

        # Plot 2: correlation matrix of the selected columns.
        plt.figure(figsize=(10, 8))
        correlations = frame[heatmap_columns].corr()
        sns.heatmap(correlations, annot=True, cmap="viridis", fmt=".2f")
        _finish("Correlation Heatmap", "correlation_heatmap.png")

    def preprocess_data(
        self,
        test_size: float = 0.2,
        random_state: int = 42,
        batch_size: int = 64,
    ) -> None:
        """
        Preprocess data: split, scale features, balance classes, and create data loaders.

        The data is split first and the scaler is fitted on the training
        portion only, so no statistics of the held-out rows influence the
        features the model is trained on. ADASYN oversampling is applied to
        the training portion only; the test portion is left untouched.

        Args:
            test_size (float): Proportion of data for testing.
            random_state (int): Seed for reproducibility.
            batch_size (int): Batch size used by both data loaders.

        Raises:
            RuntimeError: If no data has been loaded yet.
        """
        if self.data is None:
            raise RuntimeError("load_data() must be called before preprocess_data()")

        logger.info("Preprocessing data")

        # Split features and target
        X = self.data.drop('Class', axis=1)
        y = self.data['Class']

        # Split BEFORE scaling; stratify keeps the class ratio in both parts.
        X_train_raw, X_test_raw, y_train, y_test = train_test_split(
            X, y, test_size=test_size, stratify=y, random_state=random_state
        )

        # Fit the scaler on training rows only, then reuse it for the test rows.
        X_train = self.scaler.fit_transform(X_train_raw)
        X_test = self.scaler.transform(X_test_raw)

        # Balance training data with ADASYN
        logger.info("Balancing training data with ADASYN")
        adasyn = ADASYN(random_state=random_state)
        X_train_bal, y_train_bal = adasyn.fit_resample(X_train, y_train)

        # Create datasets
        train_dataset = FraudDataset(X_train_bal, y_train_bal)
        test_dataset = FraudDataset(X_test, y_test)

        # Create data loaders; only the training loader is shuffled.
        self.train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        self.test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        logger.info(f"Train samples: {len(train_dataset)}, Test samples: {len(test_dataset)}")

    def initialize_model(self, input_dim: int) -> None:
        """
        Build a fresh ``FraudDetector`` and place it on the pipeline's device.

        Args:
            input_dim (int): Number of input features.
        """
        network = FraudDetector(input_dim)
        self.model = network.to(self.device)
        logger.info("Model initialized")

    def train_model(
        self,
        epochs: int = 50,
        lr: float = 0.001,
        weight_decay: float = 0.01,
    ) -> Dict[str, List[float]]:
        """
        Train the model and track metrics.

        Each epoch runs one optimisation pass over ``self.train_loader`` and
        one gradient-free pass over ``self.test_loader`` (reported as
        "val"). After the last epoch the model weights are written to
        ``fraud_model.pth`` in the output directory.

        Args:
            epochs (int): Number of training epochs.
            lr (float): Learning rate.
            weight_decay (float): Weight-decay coefficient passed to Adam.

        Returns:
            Dict[str, List[float]]: Training and validation metrics history
            with keys ``train_loss``, ``train_recall``, ``val_loss`` and
            ``val_recall``, one entry per epoch.

        Raises:
            RuntimeError: If the model or the data loaders are not set up yet.
        """
        if self.model is None or self.train_loader is None or self.test_loader is None:
            raise RuntimeError(
                "preprocess_data() and initialize_model() must be called before train_model()"
            )

        logger.info("Starting training")
        criterion = nn.BCELoss()
        optimizer = optim.Adam(self.model.parameters(), lr=lr, weight_decay=weight_decay)

        history = {"train_loss": [], "train_recall": [], "val_loss": [], "val_recall": []}

        for epoch in range(epochs):
            train_loss, train_recall = self._run_epoch(self.train_loader, criterion, optimizer)
            val_loss, val_recall = self._run_epoch(self.test_loader, criterion)

            history["train_loss"].append(train_loss)
            history["train_recall"].append(train_recall)
            history["val_loss"].append(val_loss)
            history["val_recall"].append(val_recall)

            logger.info(
                f"Epoch {epoch+1}/{epochs}: "
                f"Train Loss: {train_loss:.4f}, Train Recall: {train_recall:.4f}, "
                f"Val Loss: {val_loss:.4f}, Val Recall: {val_recall:.4f}"
            )

        # Save model
        torch.save(self.model.state_dict(), self.output_dir / "fraud_model.pth")
        return history

    def _run_epoch(self, loader, criterion, optimizer=None) -> Tuple[float, float]:
        """
        Run one full pass over ``loader`` and return ``(mean batch loss, recall)``.

        When ``optimizer`` is given the model is put in training mode and
        updated after every batch; otherwise the model is put in eval mode
        and gradients are disabled.
        """
        training = optimizer is not None
        self.model.train(training)

        total_loss, preds, targets = 0.0, [], []
        with torch.set_grad_enabled(training):
            for features, labels in loader:
                features, labels = features.to(self.device), labels.to(self.device)

                if training:
                    optimizer.zero_grad()
                # squeeze(-1) drops only the trailing unit dimension; a bare
                # squeeze() would also drop the batch dimension for a batch
                # of one sample and break the loss/extend calls below.
                outputs = self.model(features).squeeze(-1)
                loss = criterion(outputs, labels)
                if training:
                    loss.backward()
                    optimizer.step()

                total_loss += loss.item()
                # Threshold probabilities at 0.5 to obtain hard predictions.
                preds.extend((outputs >= 0.5).float().cpu().numpy())
                targets.extend(labels.cpu().numpy())

        return total_loss / len(loader), recall_score(targets, preds)

    def evaluate_model(self) -> Dict[str, float]:
        """
        Evaluate the model on test data.

        Computes precision and recall at a 0.5 decision threshold, the ROC
        AUC from the raw probabilities, and the threshold that maximises
        ``tpr - fpr`` on the ROC curve. Also writes ``roc_curve.png`` to the
        output directory.

        Returns:
            Dict[str, float]: Evaluation metrics with keys ``precision``,
            ``recall``, ``roc_auc`` and ``optimal_threshold``.

        Raises:
            RuntimeError: If the model or the test loader is not set up yet.
        """
        if self.model is None or self.test_loader is None:
            raise RuntimeError(
                "preprocess_data() and initialize_model() must be called before evaluate_model()"
            )

        logger.info("Evaluating model")
        self.model.eval()
        preds, probs, labels = [], [], []

        with torch.no_grad():
            for features, lbls in self.test_loader:
                features = features.to(self.device)
                # squeeze(-1) keeps the batch dimension even for a batch of
                # one sample, so the extend() calls below always get a 1-D array.
                outputs = self.model(features).squeeze(-1)
                preds.extend((outputs >= 0.5).float().cpu().numpy())
                probs.extend(outputs.cpu().numpy())
                labels.extend(lbls.numpy())

        # Cast to built-in float so the dict matches the declared return type.
        metrics = {
            "precision": float(precision_score(labels, preds)),
            "recall": float(recall_score(labels, preds)),
            "roc_auc": float(roc_auc_score(labels, probs))
        }

        # Compute ROC curve and pick the point with the largest tpr - fpr gap.
        fpr, tpr, thresholds = roc_curve(labels, probs)
        optimal_idx = np.argmax(tpr - fpr)
        optimal_threshold = float(thresholds[optimal_idx])
        metrics["optimal_threshold"] = optimal_threshold

        # Plot ROC curve
        plt.figure(figsize=(6, 6))
        plt.plot(fpr, tpr, label=f"ROC Curve (AUC = {metrics['roc_auc']:.2f})")
        plt.plot([0, 1], [0, 1], 'k--')  # diagonal reference line
        plt.scatter(fpr[optimal_idx], tpr[optimal_idx], marker='o', color='red', label=f"Threshold = {optimal_threshold:.2f}")
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.title("ROC Curve")
        plt.legend()
        plt.savefig(self.output_dir / "roc_curve.png")
        plt.close()

        logger.info(f"Test Metrics: {metrics}")
        return metrics

    def save_results(self, history: Dict[str, List[float]], metrics: Dict[str, float]) -> None:
        """
        Persist the training curves and the evaluation metrics.

        Writes ``training_history.png`` (loss and recall per epoch, side by
        side) and ``test_metrics.csv`` (a single-row table of ``metrics``)
        into the output directory.

        Args:
            history (Dict[str, List[float]]): Training history.
            metrics (Dict[str, float]): Evaluation metrics.
        """
        logger.info("Saving results")

        # One entry per subplot: (title, y-axis label, ((history key, legend label), ...)).
        panels = (
            ("Loss Over Epochs", "Loss",
             (("train_loss", "Train Loss"), ("val_loss", "Validation Loss"))),
            ("Recall Over Epochs", "Recall",
             (("train_recall", "Train Recall"), ("val_recall", "Validation Recall"))),
        )

        plt.figure(figsize=(12, 4))
        for position, (title, y_label, curves) in enumerate(panels, start=1):
            plt.subplot(1, 2, position)
            for key, legend_label in curves:
                plt.plot(history[key], label=legend_label)
            plt.title(title)
            plt.xlabel("Epoch")
            plt.ylabel(y_label)
            plt.legend()

        plt.tight_layout()
        plt.savefig(self.output_dir / "training_history.png")
        plt.close()

        # The metrics dict becomes a one-row CSV.
        metrics_path = self.output_dir / "test_metrics.csv"
        pd.DataFrame([metrics]).to_csv(metrics_path, index=False)

def main(data_path: str = "creditcard.csv", output_dir: str = "fraud_results") -> None:
    """
    Run the full fraud-detection pipeline end to end.

    Loads the data, writes the exploratory plots, preprocesses, trains for
    20 epochs, evaluates on the test split and saves the results.

    Args:
        data_path (str): Path to the credit card dataset CSV; adjust to your path.
        output_dir (str): Directory to save outputs.
    """
    pipeline = FraudDetectionPipeline(
        data_path=data_path,
        output_dir=output_dir
    )
    pipeline.load_data()
    pipeline.visualize_data()
    pipeline.preprocess_data()
    # Every column except 'Class' is a model input, so derive the width from
    # the loaded frame instead of hard-coding it.
    input_dim = pipeline.data.shape[1] - 1
    pipeline.initialize_model(input_dim=input_dim)
    history = pipeline.train_model(epochs=20)
    metrics = pipeline.evaluate_model()
    pipeline.save_results(history, metrics)


if __name__ == "__main__":
    main()

# In [None]:
# Stand-alone run of the pipeline against an explicit dataset path.
pipeline = FraudDetectionPipeline(
    data_path="/path/to/creditcard.csv",
    output_dir="fraud_results"
)
pipeline.load_data()
pipeline.visualize_data()
pipeline.preprocess_data()
# Every column except 'Class' is fed to the model, so take the input width
# from the loaded frame rather than assuming exactly 30 features.
pipeline.initialize_model(input_dim=pipeline.data.shape[1] - 1)
history = pipeline.train_model(epochs=20)
metrics = pipeline.evaluate_model()
pipeline.save_results(history, metrics)