# Create Dataset

In [None]:
import pandas as pd

# Location of the raw transactions CSV, relative to the working directory.
df_path = "data/creditcard.csv"

# Read the whole file into a DataFrame and show its first rows as the cell output.
df = pd.read_csv(df_path)
df.head()

In [None]:
# Summarize the label balance: per-class counts, the share of fraud rows
# (Class == 1) and the normal-to-fraud ratio.
if 'Class' in df.columns:
    class_counts = df['Class'].value_counts()
    print("\nClass distribution:")
    print(class_counts)

    # value_counts() only lists labels that actually occur, so look the counts
    # up with a default instead of indexing; plain indexing raises KeyError
    # when one of the classes is absent from the frame.
    fraud_count = class_counts.get(1, 0)
    normal_count = class_counts.get(0, 0)
    total_rows = len(df)

    # Guard both divisions: an empty frame has no percentage, and a frame
    # without fraud rows has no finite imbalance ratio.
    if total_rows:
        print(f"\nPercentage of fraud cases: {fraud_count / total_rows * 100:.4f}%")
    if fraud_count:
        print(f"Imbalance ratio: 1:{normal_count / fraud_count:.2f}")
    else:
        print("Imbalance ratio: undefined (no fraud cases)")

## Upload dataset to Hugging Face

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
from datasets import Dataset, DatasetDict
from huggingface_hub import HfApi, login
import os
from dotenv import load_dotenv

# Seed NumPy's global random number generator.
np.random.seed(42)

# Read key/value pairs from a local .env file into the process environment.
load_dotenv()

# Look up the Hugging Face API token and report whether it was found.
hf_token = os.getenv("HF_API_TOKEN")
if hf_token:
    print("Successfully loaded HF_API_TOKEN from .env file")
else:
    print("Warning: HF_API_TOKEN not found in .env file")

# 1. Standardize the Time and Amount columns
print("Normalizing Time and Amount columns...")
scaler_time = StandardScaler()
scaler_amount = StandardScaler()

# Each scaler is fitted on the full frame and its mean/scale are recorded so
# the transformation can be reported later.
# NOTE(review): the fit happens before the train/validation/test split, so
# held-out rows contribute to the statistics — confirm this is intended.
normalization_stats = {}
for raw_name, norm_name, fitted_scaler in (
    ('Time', 'Time_norm', scaler_time),
    ('Amount', 'Amount_norm', scaler_amount),
):
    column_values = df[raw_name].values.reshape(-1, 1)
    df[norm_name] = fitted_scaler.fit_transform(column_values)
    normalization_stats[raw_name] = {
        'mean': float(fitted_scaler.mean_[0]),
        'std': float(fitted_scaler.scale_[0]),
    }

# Per-column views into the same statistics dictionaries
time_stats = normalization_stats['Time']
amount_stats = normalization_stats['Amount']

print("Normalization statistics:")
print(f"Time: mean={time_stats['mean']:.2f}, std={time_stats['std']:.2f}")
print(f"Amount: mean={amount_stats['mean']:.2f}, std={amount_stats['std']:.2f}")

# 2. Replace the raw columns with their standardized versions under the original names
df = df.drop(columns=['Time', 'Amount']).rename(
    columns={'Time_norm': 'Time', 'Amount_norm': 'Amount'}
)

# 3. Record each row's position in the full frame so it can be traced after splitting
df['original_index'] = np.arange(len(df))

# 4. Stratified 80/10/10 train/validation/test split
y = df['Class']
X = df.drop(columns=['Class'])

print("\nCreating stratified splits (80% train, 10% validation, 10% test)...")

# Stage one: hold out 20% of the rows, stratified on the label.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# Stage two: halve the held-out rows into validation and test (10% of all rows each).
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42
)

# 5. Re-attach the labels to the features of every split
train_df = pd.concat((X_train, y_train), axis="columns")
val_df = pd.concat((X_val, y_val), axis="columns")
test_df = pd.concat((X_test, y_test), axis="columns")

# 6. Report size and fraud share per split
print("\nClass distribution in splits:")
print(f"Train set - Total: {len(y_train)}, Fraud: {sum(y_train)}, Percentage: {sum(y_train)/len(y_train)*100:.4f}%")
print(f"Validation set - Total: {len(y_val)}, Fraud: {sum(y_val)}, Percentage: {sum(y_val)/len(y_val)*100:.4f}%")
print(f"Test set - Total: {len(y_test)}, Fraud: {sum(y_test)}, Percentage: {sum(y_test)/len(y_test)*100:.4f}%")

# 7. Wrap each split's DataFrame in a Hugging Face Dataset
print("\nConverting to Hugging Face Dataset format...")
train_dataset, val_dataset, test_dataset = (
    Dataset.from_pandas(split_frame) for split_frame in (train_df, val_df, test_df)
)

# Bundle the three splits under their conventional names
dataset_dict = DatasetDict({
    'train': train_dataset,
    'validation': val_dataset,
    'test': test_dataset,
})

# 8. Show the (rows, columns) shape of every split
print("\nDataset split information:")
print(f"Train: {train_dataset.shape}")
print(f"Validation: {val_dataset.shape}")
print(f"Test: {test_dataset.shape}")

# 9. Show the column names and one sample record
print("\nColumns in the dataset:")
print(train_dataset.column_names)
print("\nFirst example from train set:")
print(train_dataset[0])

# 10. Keep a local copy on disk (handy for verifying what gets uploaded)
print("\nSaving datasets locally...")
dataset_dict.save_to_disk("credit_card_fraud_dataset")

# 11. To upload to HuggingFace
def upload_to_huggingface(dataset_dict, repo_name, token):
    """Publish the dataset splits and a README to a Hugging Face dataset repo.

    The README text is built from the module-level ``time_stats``,
    ``amount_stats``, ``y_train``, ``y_val`` and ``y_test`` objects, so those
    must already exist when this function is called.

    Args:
        dataset_dict: The ``DatasetDict`` whose splits are pushed to the Hub.
        repo_name: Target repository id, e.g. ``"user/dataset-name"``.
        token: Hugging Face API token used to log in.

    Returns:
        None. If the repository cannot be created, the error is printed and
        the function returns without uploading anything.
    """
    # Login to HuggingFace
    login(token)

    # Get HF API
    api = HfApi()

    # Create the repository if it doesn't exist. repo_type must be "dataset":
    # without it the Hub API targets a *model* repo of the same name, which is
    # not where push_to_hub stores the data.
    try:
        api.create_repo(repo_id=repo_name, repo_type="dataset", exist_ok=True)
        print(f"Repository {repo_name} ready.")
    except Exception as e:
        print(f"Error creating repository: {e}")
        return

    # README body with the normalization stats and per-split class balance
    readme_content = f"""# Credit Card Fraud Dataset

This dataset contains normalized credit card transaction data for fraud detection.

## Normalization Statistics
- Time: mean={time_stats['mean']:.4f}, std={time_stats['std']:.4f}
- Amount: mean={amount_stats['mean']:.4f}, std={amount_stats['std']:.4f}

## Class Distribution
- Train set: {sum(y_train)} fraud out of {len(y_train)} ({sum(y_train)/len(y_train)*100:.4f}%)
- Validation set: {sum(y_val)} fraud out of {len(y_val)} ({sum(y_val)/len(y_val)*100:.4f}%)
- Test set: {sum(y_test)} fraud out of {len(y_test)} ({sum(y_test)/len(y_test)*100:.4f}%)

## Features
- Original Time and Amount columns have been normalized
- 'original_index' column refers to the index in the original dataset
"""

    # Upload the README straight from memory (no local README.md is written,
    # so an existing file in the working directory is never overwritten) and
    # into the dataset repo rather than a model repo.
    # It goes up *before* the data: push_to_hub is expected to merge its
    # generated card metadata into an existing README, whereas uploading the
    # README afterwards would replace that metadata.
    api.upload_file(
        path_or_fileobj=readme_content.encode("utf-8"),
        path_in_repo="README.md",
        repo_id=repo_name,
        repo_type="dataset",
        commit_message="Add README with normalization stats"
    )

    # Push dataset to HuggingFace
    dataset_dict.push_to_hub(repo_name)

    print(f"Dataset successfully uploaded to https://huggingface.co/datasets/{repo_name}")

# Publish to the Hub only when an API token was found; otherwise say so and move on.
if not hf_token:
    print("Skipping upload to Hugging Face as no API token was found")
else:
    username = "stanpony"
    repo_name = f"{username}/full_european_credit_card_fraud_dataset"
    upload_to_huggingface(dataset_dict, repo_name, hf_token)

In [None]:
from datasets import load_dataset
import pandas as pd

# Pull the published dataset back from the Hub; loading errors are left to propagate.
dataset = load_dataset("stanpony/full_european_credit_card_fraud_dataset")
print("Successfully loaded the dataset!")

# List the splits that came back
print("\nAvailable splits:", list(dataset.keys()))

# Sanity-check every split: size, label balance, scaling, index column, missing values
for split_name, split_dataset in dataset.items():
    print(f"\n{split_name.upper()} split:")
    print(f"- Number of samples: {len(split_dataset)}")

    # pandas makes the remaining checks one-liners
    split_df = split_dataset.to_pandas()
    split_columns = split_df.columns

    # Label balance
    if 'Class' in split_columns:
        class_counts = split_df['Class'].value_counts()
        print(f"- Class distribution: {dict(class_counts)}")
        fraud_percentage = class_counts.get(1, 0) / len(split_df) * 100
        print(f"- Fraud percentage: {fraud_percentage:.4f}%")

    # Mean/std of the two scaled columns, reported only when both are present
    if all(column in split_columns for column in ('Time', 'Amount')):
        print(f"- Time column stats: mean={split_df['Time'].mean():.4f}, std={split_df['Time'].std():.4f}")
        print(f"- Amount column stats: mean={split_df['Amount'].mean():.4f}, std={split_df['Amount'].std():.4f}")

    # Presence of the traceability column
    index_status = (
        "- original_index column exists ✓"
        if 'original_index' in split_columns
        else "- original_index column missing ✗"
    )
    print(index_status)

    # Total count of missing cells across the whole split
    missing_values = split_df.isnull().sum().sum()
    print(f"- Missing values: {missing_values}")

# Dataset Exploration

In [10]:
from datasets import load_dataset
import pandas as pd

# Fetch the published dataset from the Hugging Face Hub
dataset = load_dataset("stanpony/full_european_credit_card_fraud_dataset")
print("Successfully loaded the dataset!")

# Materialize every split as its own pandas DataFrame
df_train, df_validation, df_test = (
    dataset[split_key].to_pandas() for split_key in ('train', 'validation', 'test')
)

# Stack train, validation and test (in that order) under a fresh 0..n-1 index
df_all = pd.concat([df_train, df_validation, df_test], ignore_index=True)


  from .autonotebook import tqdm as notebook_tqdm


Successfully loaded the dataset!


In [None]:
# Report the (rows, columns) size of the combined frame, then render its first rows
combined_shape = df_all.shape
print("Combined DataFrame shape:", combined_shape)
display(df_all.head())

In [None]:
# Bare expression as the last line of the cell so the notebook renders the frame.
df_all

In [13]:
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp
import matplotlib.pyplot as plt
import seaborn as sns

# df_all is expected to be the combined DataFrame with a 'Class' label column
# (0 = normal, 1 = anomalous).
# Drop the bookkeeping columns; errors='ignore' skips any that are absent.
columns_to_remove = ['original_index', '__index_level_0__']
df_all = df_all.drop(columns=columns_to_remove, errors='ignore')

# Partition the rows by label
class_labels = df_all['Class']
normal = df_all.loc[class_labels == 0]
anomalous = df_all.loc[class_labels == 1]



In [None]:
import pandas as pd
import numpy as np
from scipy.stats import ks_2samp, anderson_ksamp

import numpy as np
import pandas as pd
from scipy.stats import ks_2samp
import matplotlib.pyplot as plt
import seaborn as sns


# Features to compare: every 'V*' column, followed by 'Time' and 'Amount'
feature_columns = [name for name in normal.columns if name.startswith('V')]
feature_columns = feature_columns + ['Time', 'Amount']

print("Performing KS and Anderson-Darling tests on each feature:\n")

# One (feature, KS stat, KS p, AD stat, AD significance) tuple per feature
results = []

for feature in feature_columns:
    normal_values = normal[feature]
    anomalous_values = anomalous[feature]

    # Two-sample Kolmogorov-Smirnov test between the normal and anomalous rows
    ks_stat, ks_p = ks_2samp(normal_values, anomalous_values)

    # k-sample Anderson-Darling test on the same pair of samples
    ad_result = anderson_ksamp([normal_values.values, anomalous_values.values])

    result_row = (feature, ks_stat, ks_p, ad_result.statistic, ad_result.significance_level)
    results.append(result_row)

    print(f"Feature: {feature}")
    print(f"  KS test:  Statistic = {ks_stat:.4f}, p-value = {ks_p:.4g}")
    print(f"  AD test:  Statistic = {ad_result.statistic:.4f}, significance level = {ad_result.significance_level:.4f}\n")

# Tabulate the collected results for further inspection
result_columns = ["Feature", "KS_stat", "KS_p", "AD_stat", "AD_sig"]
results_df = pd.DataFrame(results, columns=result_columns)



In [None]:
import pandas as pd
import numpy as np
from scipy.stats import ks_2samp, anderson_ksamp
import matplotlib.pyplot as plt

# --- The `normal` and `anomalous` DataFrames are expected to exist already ---

# Features to compare: all 'V*' columns, then 'Time' and 'Amount'
v_columns = [name for name in normal.columns if name.startswith('V')]
feature_columns = v_columns + ['Time', 'Amount']

# Run the KS and AD tests per feature, collecting one result tuple each
results = []
for feature in feature_columns:
    normal_col, anomalous_col = normal[feature], anomalous[feature]
    ks_stat, ks_p = ks_2samp(normal_col, anomalous_col)
    ad_result = anderson_ksamp([normal_col.values, anomalous_col.values])
    results.append(
        (feature, ks_stat, ks_p, ad_result.statistic, ad_result.significance_level)
    )

# Tabulate the results
result_columns = ["Feature", "KS_stat", "KS_p", "AD_stat", "AD_sig"]
results_df = pd.DataFrame(results, columns=result_columns)

# --- Visualization: Dual y-axis Grouped Bar Chart ---
def _label_bars(ax, bars, values, template, lift, color):
    """Write one rotated, %-formatted label just above each bar in *bars*."""
    for bar, value in zip(bars, values):
        ax.text(
            bar.get_x() + bar.get_width() / 2,
            bar.get_height() + lift,
            template % value,
            ha='center',
            va='bottom',
            fontsize=8,
            color=color,
            rotation=90,
        )


x = np.arange(len(results_df))  # one slot per feature
width = 0.35  # bar width; the KS and AD bars sit half a width either side of the slot

# Tall figure so the rotated labels fit
fig, ax1 = plt.subplots(figsize=(16, 8))

# Left y-axis: KS statistic, with headroom above the tallest bar for its label
bars1 = ax1.bar(
    x - width/2, results_df["KS_stat"], width, label="KS Statistic", color="skyblue"
)
ax1.set_ylabel("KS Statistic (0–1 scale)")
max_ks = results_df["KS_stat"].max()
ax1.set_ylim(0, max_ks + 0.2)

ax1.set_xticks(x)
ax1.set_xticklabels(results_df["Feature"], rotation=90)

# Right y-axis (shared x): AD statistic, scaled to 120% of its maximum
ax2 = ax1.twinx()
bars2 = ax2.bar(
    x + width/2, results_df["AD_stat"], width, label="AD Statistic", color="salmon"
)
ax2.set_ylabel("AD Statistic (integrated difference)")
max_ad = results_df["AD_stat"].max()
ax2.set_ylim(0, max_ad * 1.2)

# Bold vertical annotations: KS p-values in blue, AD significance levels in red.
# The AD offset is relative because that axis has a data-dependent scale.
_label_bars(ax1, bars1, results_df["KS_p"], r"$\mathbf{p=%.4f}$", 0.02, 'blue')
offset = max_ad * 0.02
_label_bars(ax2, bars2, results_df["AD_sig"], r"$\mathbf{sig=%.4f}$", offset, 'red')

# One legend covering the artists of both axes
ks_handles, ks_labels = ax1.get_legend_handles_labels()
ad_handles, ad_labels = ax2.get_legend_handles_labels()
ax1.legend(ks_handles + ad_handles, ks_labels + ad_labels, loc="upper left")

plt.title("Comparison of KS and Anderson–Darling Test Statistics by Feature")

# Auto-adjust the layout, then reserve extra space at the top
plt.tight_layout()
plt.subplots_adjust(top=0.90)  # If still clipped, try 0.85 or 0.8
plt.show()


# Reconstruction error analysis

In [None]:
import matplotlib.pyplot as plt

import os

# Optional: Set global font sizes (can be overridden locally)
plt.rcParams.update({
    "axes.titlesize": 14,
    "axes.labelsize": 12,
    "xtick.labelsize": 10,
    "ytick.labelsize": 10,
    "legend.fontsize": 11,
    "font.size": 11  # general default font size
})

# Create a 4x2 grid of subplots (room for at most eight keys; a ninth key
# would index past the grid and raise IndexError)
fig, axes = plt.subplots(nrows=4, ncols=2, figsize=(14, 18))

# `order` and `data` must be defined by an earlier cell. Each key unpacks into
# (model_name, eval_type), and data[key] holds "normal" and "fraud" entries.
# NOTE(review): those entries are presumably precomputed box statistics in the
# dict format Axes.bxp expects — confirm against the cell that builds `data`.
for idx, key in enumerate(order):
    # Fill the grid row by row, two panels per row
    row_idx = idx // 2
    col_idx = idx % 2
    ax = axes[row_idx, col_idx]

    stats_normal = data[key]["normal"]
    stats_fraud  = data[key]["fraud"]
    box_data = [stats_normal, stats_fraud]

    # One box per class, drawn from the supplied statistics
    ax.bxp(box_data, showmeans=True)
    ax.set_xticklabels(["Normal Samples", "Fraud Samples"], fontsize=10)

    model_name, eval_type = key
    ax.set_title(f"{model_name} - {eval_type}", fontsize=14)
    ax.set_ylabel("Reconstruction Loss", fontsize=12)
    ax.set_yscale("log")

plt.tight_layout()

# savefig does not create missing directories, so make sure the output folder
# exists before writing the image.
output_path = "recon_error_analysis/detailed_grouped_boxplots_log.png"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
plt.savefig(output_path, dpi=300)
plt.show()

