In [1]:
# Install PySpark and related dependencies
!pip install pyspark torch tensorflow




[notice] A new release of pip is available: 24.0 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, to_date, unix_timestamp
from pyspark.ml.feature import StringIndexer, MinMaxScaler, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
from torch.nn import TransformerEncoder, TransformerEncoderLayer
import torch.nn as nn
import torch.optim as optim


In [3]:
# Build the Spark session (getOrCreate reuses one if it is already running),
# giving the executor and the driver 4g of memory each.
spark = (
    SparkSession.builder
    .appName("CreditCardFraudDetection")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)


In [4]:
# Read both CSV files; the first row is the header and Spark infers column types.
train_df, test_df = (
    spark.read.csv(path, header=True, inferSchema=True)
    for path in ("Train.csv", "Test.csv")
)

# Show the inferred schema of each frame
train_df.printSchema()
test_df.printSchema()

# Summary statistics of the training frame
train_df.describe().show()


root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- cc_num: long (nullable = true)
 |-- merchant: string (nullable = true)
 |-- category: string (nullable = true)
 |-- amt: double (nullable = true)
 |-- first: string (nullable = true)
 |-- last: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- city_pop: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- dob: date (nullable = true)
 |-- trans_num: string (nullable = true)
 |-- unix_time: integer (nullable = true)
 |-- merch_lat: double (nullable = true)
 |-- merch_long: double (nullable = true)
 |-- is_fraud: integer (nullable = true)

root
 |-- _c0: integer (nullable = true)
 |-- trans_date_trans_time: timestamp (nullable = true)
 |-- cc_n

In [5]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler

# Numeric columns that are packed into a single vector column
numerical_cols = ['amt', 'lat', 'long', 'merch_lat', 'merch_long', 'city_pop']
assembler = VectorAssembler(outputCol="features_num", inputCols=numerical_cols)

# Add the "features_num" column to both frames
train_df, test_df = (assembler.transform(frame) for frame in (train_df, test_df))

# Fit the min-max scaler on the training frame, then apply it to both frames
scaler = MinMaxScaler(outputCol="scaled_features", inputCol="features_num")
scaler_model = scaler.fit(train_df)
train_df, test_df = (scaler_model.transform(frame) for frame in (train_df, test_df))

# Show raw and scaled vectors side by side for the first rows
train_df.select("features_num", "scaled_features").show(5, truncate=False)


+---------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------+
|features_num                                                   |scaled_features                                                                                                           |
+---------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------+
|[4.97,36.0788,-81.1781,36.011293,-82.048315,3495.0]            |[1.3714293610244608E-4,0.34396843968439694,0.8646384642148135,0.35030195030031674,0.8486024716722786,0.001194491166373147]|
|[107.23,48.8878,-118.2105,49.159046999999994,-118.186462,149.0]|[0.003669696247396184,0.6184497559261308,0.48568183213605953,0.6214876127835203,0.4862075279727285,4.33484697474126E-5]   |
|[220.11,42.1808,-112.262,43.150704,-112.154481,4154.0]

In [6]:
# Hold out 20% of the training frame for validation (fixed seed)
train_data, val_data = train_df.randomSplit(weights=[0.8, 0.2], seed=42)


In [7]:
class PyTorchTransformer(nn.Module):
    """Transformer-encoder classifier over batch-first sequences.

    Input tensors are laid out as (batch, sequence_length, input_dim). The
    encoder is built with ``batch_first=True`` so that attention runs within
    each sample's own sequence; with the default sequence-first layout a
    (batch, 1, input_dim) tensor would be read as one long sequence and the
    samples of a batch would attend to each other.

    Args:
        input_dim: Feature size per time step; also used as the encoder's d_model.
        num_heads: Number of attention heads; must divide ``input_dim``.
        num_layers: Number of stacked encoder layers.
        num_classes: Number of output classes.

    Raises:
        AssertionError: If ``input_dim`` is not divisible by ``num_heads``.
    """

    def __init__(self, input_dim, num_heads, num_layers, num_classes):
        super().__init__()

        assert input_dim % num_heads == 0, "input_dim must be divisible by num_heads"

        # Transformer encoder operating on (batch, seq, feature) tensors
        self.encoder_layer = TransformerEncoderLayer(
            d_model=input_dim, nhead=num_heads, batch_first=True
        )
        self.transformer = TransformerEncoder(self.encoder_layer, num_layers=num_layers)

        # Classification head; softmax is kept for predict_proba only
        self.fc = nn.Linear(input_dim, num_classes)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return raw class logits of shape (batch, num_classes).

        Logits (not probabilities) are returned because nn.CrossEntropyLoss
        applies log-softmax itself; feeding it softmax outputs would squash
        the gradients.
        """
        x = self.transformer(x)  # (batch, seq, input_dim)
        x = x.mean(dim=1)        # average over the sequence axis
        return self.fc(x)

    def predict_proba(self, x):
        """Return class probabilities of shape (batch, num_classes)."""
        return self.softmax(self.forward(x))

# Hyperparameters: the model width equals the number of numeric features and
# must be a multiple of the head count.
input_dim = len(numerical_cols)
num_heads = 2     # attention heads (has to divide input_dim)
num_layers = 2    # stacked encoder layers
num_classes = 2   # Fraud (1) or Not Fraud (0)

# Build the model together with its loss and optimizer
model = PyTorchTransformer(
    input_dim=input_dim,
    num_heads=num_heads,
    num_layers=num_layers,
    num_classes=num_classes,
)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)




In [8]:
# Collect the scaled feature vector and the label of each split into pandas
train_pandas, val_pandas = (
    split.select("scaled_features", "is_fraud").toPandas()
    for split in (train_data, val_data)
)

# Helper that turns a collected pandas frame into PyTorch tensors
def to_tensor(df):
    """Convert a frame with "scaled_features" and "is_fraud" columns to tensors.

    Returns:
        A (features, labels) pair: a float32 tensor stacked from the
        "scaled_features" column and an int64 tensor from "is_fraud".
    """
    feature_rows = df["scaled_features"].tolist()
    features = torch.tensor(np.array(feature_rows), dtype=torch.float32)
    targets = torch.tensor(df["is_fraud"].values, dtype=torch.long)
    return features, targets

# Tensors for the training split, then for the validation split
(X_train, y_train), (X_val, y_val) = map(to_tensor, (train_pandas, val_pandas))


In [9]:
from torch.utils.data import DataLoader, TensorDataset

# Mini-batch size shared by both loaders
batch_size = 32

# Pair features with labels
train_dataset, val_dataset = TensorDataset(X_train, y_train), TensorDataset(X_val, y_val)

# Only the training loader shuffles
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_loader = DataLoader(val_dataset, batch_size=batch_size)


In [11]:
def train_transformer(model, train_loader, val_loader, optimizer, criterion, epochs=10, device='cpu'):
    """Train ``model`` and report the mean per-batch loss after every epoch.

    Args:
        model: Module to train; moved to ``device`` before training starts.
        train_loader: Iterable of (inputs, targets) batches used for optimisation.
        val_loader: Iterable of (inputs, targets) batches used for evaluation only.
        optimizer: Optimizer that already holds ``model``'s parameters.
        criterion: Callable ``criterion(outputs, targets)`` returning a scalar loss.
        epochs: Number of passes over ``train_loader``.
        device: Device the model and every batch are moved to.

    Returns:
        A list with one dict per epoch, with keys ``"epoch"`` (1-based),
        ``"train_loss"`` and ``"val_loss"``. A loss is NaN when the
        corresponding loader produced no batches.
    """
    def _mean(total, count):
        # Batches are counted while iterating, so loaders without __len__ work
        # and an empty loader yields NaN instead of a ZeroDivisionError.
        return total / count if count else float("nan")

    model.to(device)
    history = []
    for epoch in range(epochs):
        # Optimisation pass
        model.train()
        train_loss, train_batches = 0.0, 0
        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            train_batches += 1

        # Evaluation pass: no gradients, dropout disabled
        model.eval()
        val_loss, val_batches = 0.0, 0
        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                val_outputs = model(X_batch)
                val_loss += criterion(val_outputs, y_batch).item()
                val_batches += 1

        epoch_train = _mean(train_loss, train_batches)
        epoch_val = _mean(val_loss, val_batches)
        print(f"Epoch {epoch+1}, Train Loss: {epoch_train:.4f}, Val Loss: {epoch_val:.4f}")
        history.append({"epoch": epoch + 1, "train_loss": epoch_train, "val_loss": epoch_val})

    return history


In [12]:
# Each transaction is treated as a sequence of length 1:
# reshape to (num_samples, sequence_length, input_dim).
X_train, X_val = (t.view(-1, 1, input_dim) for t in (X_train, X_val))


In [13]:
# Both shapes are expected to read (num_samples, sequence_length, input_dim)
for label, tensor in (("X_train shape:", X_train), ("X_val shape:", X_val)):
    print(label, tensor.shape)


X_train shape: torch.Size([1037731, 1, 6])
X_val shape: torch.Size([258944, 1, 6])


In [14]:
from torch.utils.data import DataLoader, TensorDataset

# Rebind train_dataset / train_loader to a features-only, unshuffled loader
batch_size = 128  # Adjust batch size based on available memory
train_dataset = TensorDataset(X_train)
train_loader = DataLoader(train_dataset, shuffle=False, batch_size=batch_size)

# Run the model batch by batch in evaluation mode, without gradients
model.eval()
with torch.no_grad():
    train_features = [model(inputs).detach().numpy() for (inputs,) in train_loader]

# Stack the per-batch outputs into a single array
train_features = np.concatenate(train_features, axis=0)


In [15]:
from sklearn.decomposition import PCA

# Collapse (n_samples, sequence_length, input_dim) to (n_samples, sequence_length * input_dim).
# Note that this rebinds input_dim from the tensor's last dimension.
n_samples, sequence_length, input_dim = X_train.shape
X_train_flattened = X_train.reshape(n_samples, -1)
X_val_flattened = X_val.reshape(len(X_val), -1)

# PCA cannot keep more components than min(n_samples, n_features); cap at 128
n_components = min(128, *X_train_flattened.shape)
print(f"Using n_components for PCA: {n_components}")

# Fit on the training matrix, then project the validation matrix with the same basis
pca = PCA(n_components=n_components)
X_train_reduced = pca.fit_transform(X_train_flattened)
X_val_reduced = pca.transform(X_val_flattened)

for label, reduced in (("Reduced X_train shape:", X_train_reduced), ("Reduced X_val shape:", X_val_reduced)):
    print(label, reduced.shape)


Using n_components for PCA: 6
Reduced X_train shape: (1037731, 6)
Reduced X_val shape: (258944, 6)


In [16]:
# Use the GPU when one is visible to PyTorch, otherwise stay on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Put the model and both feature tensors on that device
model.to(device)
X_train, X_val = X_train.to(device), X_val.to(device)


In [17]:
# Cast the inputs and the model weights to float16.
# NOTE(review): this is applied regardless of device; confirm that half
# precision is intended when running on CPU.
X_train, X_val = X_train.half(), X_val.half()
model = model.half()


In [18]:
from torch.utils.data import DataLoader, TensorDataset

# Pick the device again and make sure model and tensors live on it
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
X_train, X_val = X_train.to(device), X_val.to(device)

# Features-only, unshuffled loader over the training tensor
batch_size = 128
train_dataset = TensorDataset(X_train)
train_loader = DataLoader(train_dataset, shuffle=False, batch_size=batch_size)

# Evaluate batch by batch; each output is copied back to the CPU before
# it is converted to NumPy.
model.eval()
with torch.no_grad():
    train_features = [model(inputs).cpu().detach().numpy() for (inputs,) in train_loader]

# Stack the per-batch outputs into a single array
train_features = np.concatenate(train_features, axis=0)


In [19]:
from pyspark.ml.feature import StringIndexer

# List of categorical columns
categorical_cols = ['merchant', 'category', 'state', 'job']

# One StringIndexer per categorical column. handleInvalid="keep" assigns labels
# that were not present at fit time to an extra index instead of failing when
# the fitted model is applied to another frame.
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid="keep")
    for name in categorical_cols
]

# Fit the indexers ONCE, on the training frame, and reuse the fitted model for
# the test frame. Fitting a second time on test_df would build a different
# label -> index mapping, so the same category would get different codes in
# the two frames.
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=indexers)
indexer_model = pipeline.fit(train_df)
train_df = indexer_model.transform(train_df)
test_df = indexer_model.transform(test_df)

# Verify encoded columns
train_df.select(*[name + "_index" for name in categorical_cols]).show(5)


+--------------+--------------+-----------+---------+
|merchant_index|category_index|state_index|job_index|
+--------------+--------------+-----------+---------+
|         571.0|          11.0|       12.0|    141.0|
|          81.0|           1.0|       29.0|     61.0|
|         319.0|           6.0|       45.0|    457.0|
|          40.0|           0.0|       39.0|    232.0|
|         519.0|          10.0|       14.0|    297.0|
+--------------+--------------+-----------+---------+
only showing top 5 rows



In [20]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

# MinMaxScaler works on vector columns, so wrap "amt" in a one-element vector first
scaler_assembler = VectorAssembler(outputCol="amt_vector", inputCols=["amt"])
train_df, test_df = (scaler_assembler.transform(frame) for frame in (train_df, test_df))

# Fit the scaler on the training frame only
scaler = MinMaxScaler(outputCol="amt_scaled", inputCol="amt_vector")
scaler_model = scaler.fit(train_df)

# Scale both frames and discard the temporary vector column
train_df, test_df = (
    scaler_model.transform(frame).drop("amt_vector")
    for frame in (train_df, test_df)
)

# Compare the raw and the scaled amount
train_df.select("amt", "amt_scaled").show(5)


+------+--------------------+
|   amt|          amt_scaled|
+------+--------------------+
|  4.97|[1.37142936102446...|
|107.23|[0.00366969624739...|
|220.11|[0.00756911554896...|
|  45.0|[0.00151997208778...|
| 41.96|[0.00141495583444...|
+------+--------------------+
only showing top 5 rows



In [21]:
# Model inputs: the scaled amount followed by every indexed categorical column
feature_cols = ["amt_scaled", *(name + "_index" for name in categorical_cols)]

# Pack them into one "features" vector column on both frames
# (this rebinds `assembler` to the new VectorAssembler)
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
train_df, test_df = (assembler.transform(frame) for frame in (train_df, test_df))

# Inspect the assembled vectors
train_df.select("features").show(5, truncate=False)


+---------------------------------------------+
|features                                     |
+---------------------------------------------+
|[1.3714293610244608E-4,571.0,11.0,12.0,141.0]|
|[0.003669696247396184,81.0,1.0,29.0,61.0]    |
|[0.0075691155489690095,319.0,6.0,45.0,457.0] |
|[0.001519972087785297,40.0,0.0,39.0,232.0]   |
|[0.0014149558344474036,519.0,10.0,14.0,297.0]|
+---------------------------------------------+
only showing top 5 rows



In [22]:
# 80/20 split of train_df with a fixed seed. Both names are rebound: test_df
# now holds the 20% slice of the training frame, not the frame read from Test.csv.
train_df, test_df = train_df.randomSplit(weights=[0.8, 0.2], seed=42)


In [23]:
from pyspark.ml.classification import LogisticRegression

# Logistic-regression baseline on the assembled "features" vector
lr = LogisticRegression(labelCol="is_fraud", featuresCol="features")

# Fit on the training frame
lr_model = lr.fit(train_df)

# Score the held-out frame
predictions = lr_model.transform(test_df)

# Show label, predicted class and class probabilities for a few rows
predictions.select("is_fraud", "prediction", "probability").show(5, truncate=False)


+--------+----------+------------------------------------------+
|is_fraud|prediction|probability                               |
+--------+----------+------------------------------------------+
|0       |0.0       |[0.9849165636632622,0.01508343633673781]  |
|0       |0.0       |[0.9973706242638098,0.0026293757361901537]|
|0       |0.0       |[0.9962710139264893,0.0037289860735106872]|
|0       |0.0       |[0.994004059840046,0.005995940159953972]  |
|0       |0.0       |[0.9932827313393577,0.006717268660642262] |
+--------+----------+------------------------------------------+
only showing top 5 rows



In [25]:
# Building the Transformer Model Architecture
import torch
import torch.nn as nn

class FraudDetectionTransformer(nn.Module):
    """Transformer encoder with a learned positional table and a sigmoid head.

    All tensors are batch-first: the input is (batch, seq_len, input_dim) and
    the output is (batch, output_dim) with values in [0, 1].

    Args:
        input_dim: Number of features per time step.
        n_heads: Number of attention heads; must divide ``hidden_dim``.
        n_layers: Number of stacked encoder layers.
        hidden_dim: Width of the embedding and of the encoder.
        output_dim: Number of sigmoid outputs.
        dropout: Dropout probability used in the encoder and the head.
        max_len: Longest sequence the learned positional table supports.
    """

    def __init__(self, input_dim, n_heads, n_layers, hidden_dim, output_dim, dropout=0.1, max_len=500):
        super().__init__()

        self.embedding = nn.Linear(input_dim, hidden_dim)
        # Learned positional table laid out as (1, position, hidden) so that it
        # broadcasts over the batch axis of a batch-first tensor.
        self.positional_encoding = nn.Parameter(torch.zeros(1, max_len, hidden_dim))

        # batch_first=True keeps the encoder consistent with the positional
        # table layout and with the pooling over dim=1 in forward().
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim,
            nhead=n_heads,
            dim_feedforward=hidden_dim * 2,
            dropout=dropout,
            batch_first=True,
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=n_layers)

        self.fc = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, output_dim),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Map (batch, seq_len, input_dim) inputs to (batch, output_dim) scores.

        Raises:
            ValueError: If ``x`` is not 3-dimensional or its sequence length
                exceeds the size of the positional table.
        """
        if x.dim() != 3:
            raise ValueError(f"expected a (batch, seq_len, input_dim) tensor, got shape {tuple(x.shape)}")
        seq_len = x.size(1)
        max_len = self.positional_encoding.size(1)
        if seq_len > max_len:
            raise ValueError(f"sequence length {seq_len} exceeds max_len {max_len}")

        x = self.embedding(x)
        # Out-of-place add: broadcasts the (1, seq_len, hidden) slice over the batch
        x = x + self.positional_encoding[:, :seq_len, :]
        x = self.transformer(x)
        x = x.mean(dim=1)  # Pool across sequence length
        return self.fc(x)


In [27]:
import numpy as np

# Collect the selected numeric / indexed columns of the training frame into pandas
features = train_df.select(
    'amt', 'city_pop', 'lat', 'long', 'unix_time',
    'merch_lat', 'merch_long', 'merchant_index', 'category_index',
).toPandas()

# The target column is collected separately
labels = train_df.select('is_fraud').toPandas()

# NumPy views used to build the PyTorch datasets
train_sequences, train_labels = features.values, labels.values


In [28]:
# Same column selection as for training, taken from test_df
val_features = test_df.select(
    'amt', 'city_pop', 'lat', 'long', 'unix_time',
    'merch_lat', 'merch_long', 'merchant_index', 'category_index',
).toPandas()
val_sequences = val_features.values

# Labels go straight to a NumPy array
val_labels = test_df.select('is_fraud').toPandas().values


In [29]:
import torch
from torch.utils.data import Dataset

class FraudDataset(Dataset):
    """Index-aligned (sequence, label) pairs served as float32 tensors."""

    def __init__(self, sequences, labels):
        # Both containers are stored as given and indexed by the same position
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        """Number of samples, taken from the sequences container."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a (features, label) pair of float32 tensors."""
        row = torch.tensor(self.sequences[idx], dtype=torch.float32)
        target = torch.tensor(self.labels[idx], dtype=torch.float32)
        return row, target


In [30]:
from torch.utils.data import DataLoader

# Wrap the NumPy arrays in datasets
train_dataset, val_dataset = (
    FraudDataset(train_sequences, train_labels),
    FraudDataset(val_sequences, val_labels),
)

# Batches of 64; only the training loader shuffles
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=64)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=64)


In [33]:
import torch
from torch.utils.data import Dataset, DataLoader

class FraudDataset(Dataset):
    """Index-aligned (sequence, label) pairs; each sample becomes a length-1 sequence."""

    def __init__(self, sequences, labels):
        # Both containers are stored as given and indexed by the same position
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        """Number of samples, taken from the sequences container."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return sample ``idx`` as float32 tensors, with a leading axis of size 1 on the features."""
        row = torch.tensor(self.sequences[idx], dtype=torch.float32)
        # Insert the dummy sequence axis in front of the feature axis
        sequence = torch.unsqueeze(row, 0)
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return sequence, label


# Rebuild the datasets with the FraudDataset defined above (adds the sequence axis)
train_dataset, val_dataset = (
    FraudDataset(train_sequences, train_labels),
    FraudDataset(val_sequences, val_labels),
)

# Batches of 64; only the training loader shuffles
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=64)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=64)


In [35]:
import torch
import torch.nn as nn

class TransformerModel(nn.Module):
    """Linear embedding + Transformer encoder + sigmoid output.

    Input is (batch_size, seq_len, input_dim); output is
    (batch_size, output_dim) with values in [0, 1].

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Width of the embedding and of the encoder.
        output_dim: Number of sigmoid outputs.
        n_heads: Number of attention heads; must divide ``hidden_dim``.
        n_layers: Number of stacked encoder layers.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, n_heads, n_layers):
        super().__init__()
        self.embedding = nn.Linear(input_dim, hidden_dim)  # Embed input features
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=n_heads)
        self.transformer = nn.TransformerEncoder(self.encoder_layer, num_layers=n_layers)
        self.fc = nn.Linear(hidden_dim, output_dim)  # Final classification layer
        self.hidden_dim = hidden_dim

    def _positional_encoding(self, seq_len, device, dtype):
        """Return a (seq_len, hidden_dim) sinusoidal position table.

        Even feature indices hold sin(pos / 10000^(i / hidden_dim)), odd ones
        the matching cosine, so every position receives a distinct, bounded
        vector. (Writing the same arange(hidden_dim) vector to every position
        would carry no positional information.)
        """
        positions = torch.arange(seq_len, device=device, dtype=torch.float32).unsqueeze(1)
        even_dims = torch.arange(0, self.hidden_dim, 2, device=device, dtype=torch.float32)
        inv_freq = torch.pow(10000.0, -even_dims / self.hidden_dim)
        angles = positions * inv_freq  # (seq_len, ceil(hidden_dim / 2))

        table = torch.zeros(seq_len, self.hidden_dim, device=device, dtype=torch.float32)
        table[:, 0::2] = torch.sin(angles)
        # An odd hidden_dim has one fewer odd slot than even slots
        table[:, 1::2] = torch.cos(angles)[:, : self.hidden_dim // 2]
        return table.to(dtype)

    def forward(self, x):
        """Score a batch of sequences; see the class docstring for shapes."""
        # x shape: (batch_size, seq_len, feature_dim)
        seq_len = x.size(1)

        # Embed the input features
        x = self.embedding(x)

        # Skip positional encoding if seq_len = 1
        if seq_len > 1:
            # Out-of-place add; the table broadcasts over the batch axis
            x = x + self._positional_encoding(seq_len, x.device, x.dtype)

        # Transformer expects input of shape (seq_len, batch_size, hidden_dim)
        x = x.transpose(0, 1)
        x = self.transformer(x)
        x = x.mean(dim=0)  # Pool across sequence length
        return torch.sigmoid(self.fc(x))


In [37]:
# input_dim has to equal the number of feature columns fed to the dataset
model = TransformerModel(
    input_dim=9,
    hidden_dim=128,
    output_dim=1,
    n_heads=4,
    n_layers=2,
)

# Binary cross-entropy on the model's sigmoid outputs, optimised with Adam
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Run ten epochs on the selected device
train_transformer(model, train_loader, val_loader, optimizer, criterion, device=device, epochs=10);


Epoch 1, Train Loss: 0.0359, Val Loss: 0.0361
Epoch 2, Train Loss: 0.0358, Val Loss: 0.0357
Epoch 3, Train Loss: 0.0358, Val Loss: 0.0361
Epoch 4, Train Loss: 0.0358, Val Loss: 0.0357
Epoch 5, Train Loss: 0.0358, Val Loss: 0.0359
Epoch 6, Train Loss: 0.0358, Val Loss: 0.0361
Epoch 7, Train Loss: 0.0357, Val Loss: 0.0358
Epoch 8, Train Loss: 0.0357, Val Loss: 0.0365
Epoch 9, Train Loss: 0.0357, Val Loss: 0.0361
Epoch 10, Train Loss: 0.0357, Val Loss: 0.0361


In [38]:
# Score the training set in its stored order. train_loader was built with
# shuffle=True, so iterating it directly would collect the outputs in a random
# order that no longer lines up row-by-row with train_labels.
ordered_loader = DataLoader(
    train_loader.dataset,
    batch_size=train_loader.batch_size,
    shuffle=False,
)

model.eval()
deep_features = []
with torch.no_grad():
    for X_batch, _ in ordered_loader:
        X_batch = X_batch.to(device)
        outputs = model(X_batch)
        # Copy to the CPU before converting to NumPy
        deep_features.append(outputs.cpu().numpy())
