In [24]:
from pyspark.sql import SparkSession
from math import exp
from pyspark.rdd import RDD

spark = SparkSession.builder\
    .appName("credit-card-fraud-detection")\
    .master("local[*]")\
    .config("spark.log.level", "ERROR")\
    .getOrCreate()

sc = spark.sparkContext

# Data preparation

In [25]:
# Change the path to the CSV file as needed
# Load the CSV file as a text file and filter out the header
lines = sc.textFile("../../data/creditcard.csv")
header = lines.first()
data_rdd = lines.filter(lambda line: line != header)

# Parse each line: split by comma and convert each element to float
data_rdd = data_rdd.map(lambda line: [float(x.strip("\"")) for x in line.split(",")])

# Data preprocessing

**Understanding the data**:
- According to the dataset description, the input variables are the result of a PCA transformation except "Time" and "Amount" so the features are previously scaled. 
- Every value in the dataset is not null so imputing is also not needed.
- The dataset is highly unbalanced, the positive class (frauds) account for 0.172% of all transactions. To deal with this problem, we have 2 methods:
    - Cost-sensitive learning: the lost function will be adjusted to favor the detection of the minority class.
    - Undersampling, oversampling technique or a combination of the two.

Because of the reasons above and the fact that I will choose the oversampling method to deal with the highly unbalanced nature of the dataset, this data processing step will include:
- Create an RDD where each record is a tuple (label, features)
- Splitting the dataset into train and test set.
- Oversample the minority class (Class = 1) 

When using DataFrame-based MLlib, the model will standardize the Time and Amount column first. With the low-level implementation, obviously this is not the case so I will need to standardize them by myself.

In [None]:
# Create an RDD where each record is a tuple (label, features)
data_rdd = data_rdd.map(lambda x: (x[-1], x[:-1]))

# Define indices for Time and Amount columns
time_idx = 0 
amount_idx = 29

# Calculate min and max values for scaling
time_min = data_rdd.map(lambda x: x[1][time_idx]).min()
time_max = data_rdd.map(lambda x: x[1][time_idx]).max()
amount_min = data_rdd.map(lambda x: x[1][amount_idx]).min()
amount_max = data_rdd.map(lambda x: x[1][amount_idx]).max()

# Apply min-max scaling to Time and Amount columns
def scale_time_amount(row):
    label, features = row
    features_copy = features.copy()
    # Scale Time: (x - min) / (max - min)
    features_copy[time_idx] = (features[time_idx] - time_min) / (time_max - time_min) 
    # Scale Amount: (x - min) / (max - min)
    features_copy[amount_idx] = (features[amount_idx] - amount_min) / (amount_max - amount_min)
    return (label, features_copy)

# Apply the scaling
data_rdd = data_rdd.map(scale_time_amount)

# Split the data into train and test sets in a stratified fashion
# Convert features to tuples (immutable) for the subtract operation
data_rdd_with_tuples = data_rdd.map(lambda x: (x[0], tuple(x[1])))

# Split the data into train and test sets in a stratified fashion
train_rdd_with_tuples = data_rdd_with_tuples.sampleByKey(
    withReplacement=False, 
    fractions={0.0: 0.8, 1.0: 0.8}, 
    seed=42
)

# Perform the subtract with hashable elements
test_rdd_with_tuples = data_rdd_with_tuples.subtract(train_rdd_with_tuples)

# Convert back to lists for further processing
train_rdd = train_rdd_with_tuples.map(lambda x: (x[0], list(x[1])))
test_rdd = test_rdd_with_tuples.map(lambda x: (x[0], list(x[1])))

# Oversample the train RDD to deal with class imbalance
# Calculate class counts in the training data
count_dict = train_rdd.countByKey()
major_count, minor_count = count_dict[0], count_dict[1]
# Calculate the desired oversampling ratio
ratio = float(major_count) / minor_count
# Filter out and oversample the minor class
oversampled_minor_rdd = train_rdd\
    .filter(lambda x : x[0] == 1)\
    .sample(withReplacement=True, fraction=ratio, seed=42)
# Combine the oversampled minor with the train RDD
train_rdd = train_rdd\
    .filter(lambda x : x[0] == 0)\
    .union(oversampled_minor_rdd)

                                                                                

# Implement and train the model using low-level operations

In [None]:
class MyLogisticRegressionModel:
    def __init__(self, learning_rate=5.0, num_iterations=50, convergence_tol=1e-4):
        self.learning_rate = learning_rate
        self.num_iterations = num_iterations
        self.convergence_tol = convergence_tol
        self.weights = None
        self.converged_at = None
        
    def predictProb(self, features):
        """Computes the prediction for a single feature vector."""
        s = sum(w * f for w, f in zip(self.weights, features))
        return 1 / (1 + exp(-s)) if s >= 0 else exp(s) / (1 + exp(s))
    
    def updateWeights(self, train_rdd, iteration):
        """Computes the gradient using mapPartitions for better performance."""
        # Use adaptive learning rate
        current_lr = self.learning_rate / (1 + 0.01 * iteration)
        
        def process_partition(partition):
            """Process all records in a partition at once for better efficiency."""
            local_gradients = [0.0] * len(self.weights)
            count = 0
            
            # Process all records in the partition
            for label, features in partition:
                # Calculate prediction
                s = sum(w * f for w, f in zip(self.weights, features))
                pred = 1 / (1 + exp(-s)) if s >= 0 else exp(s) / (1 + exp(s))
                
                # Update gradient for this record
                error = pred - label
                for i, feature in enumerate(features):
                    local_gradients[i] += error * feature
                count += 1
            
            # Return normalized gradients from this partition
            if count > 0:
                local_gradients = [g / count for g in local_gradients]
            
            yield local_gradients
        
        # Calculate gradients across all partitions
        gradients = train_rdd\
            .mapPartitions(process_partition)\
            .reduce(lambda a, b: [x + y for x, y in zip(a, b)])
        
        # Update weights using current learning rate
        updated_weights = [w - current_lr * g for w, g in zip(self.weights, gradients)]
        return updated_weights
    
    def fit(self, train_rdd):
        """Fits the Logistic Regression model with optimizations."""
        # Add intercept and cache the data
        train_rdd_with_intercept = train_rdd.map(lambda x: (x[0], [1.0] + x[1])).cache()
        
        # Initialize weights
        self.weights = [0.0] * len(train_rdd_with_intercept.first()[1])
        
        # Iterate until convergence or max iterations
        for i in range(self.num_iterations):
            # Update weights
            new_weights = self.updateWeights(train_rdd_with_intercept, i)
            
            # Check for convergence
            weight_diff = sum(abs(new - old) for new, old in zip(new_weights, self.weights))
            self.weights = new_weights
            
            if weight_diff < self.convergence_tol * len(self.weights):
                self.converged_at = i
                break
                
        if not hasattr(self, 'converged_at') or self.converged_at is None:
            self.converged_at = self.num_iterations
            
        # Clean up cached RDD
        train_rdd_with_intercept.unpersist()
        
    def predict(self, features):
        """Predicts class label."""
        features_with_intercept = [1.0] + features
        return 0.0 if self.predictProb(features_with_intercept) < 0.5 else 1.0
        
# Initialize and train with optimized model
model = MyLogisticRegressionModel(learning_rate=5.0, num_iterations=50, convergence_tol=1e-4)
model.fit(train_rdd)

                                                                                

# Evaluate on test set

In [31]:
# Import BinaryClassificationMetrics for ROC calculation
from pyspark.mllib.evaluation import BinaryClassificationMetrics

print("Coefficients:", model.weights[1:])
print("Intercept:", model.weights[0])
print("Converged at:", model.converged_at)

# Get predictions and labels for evaluation
predictionAndLabels = test_rdd.map(lambda p: (model.predict(p[1]), p[0]))

# Calculate accuracy
accuracy = predictionAndLabels.filter(lambda x: x[0] == x[1]).count() / predictionAndLabels.count()

# Calculate confusion matrix values
true_positives = predictionAndLabels.filter(lambda x: x[0] == 1.0 and x[1] == 1.0).count()
false_positives = predictionAndLabels.filter(lambda x: x[0] == 1.0 and x[1] == 0.0).count()
true_negatives = predictionAndLabels.filter(lambda x: x[0] == 0.0 and x[1] == 0.0).count()
false_negatives = predictionAndLabels.filter(lambda x: x[0] == 0.0 and x[1] == 1.0).count()

# Calculate precision for each label
precision_0 = true_negatives / (true_negatives + false_negatives) if (true_negatives + false_negatives) > 0 else 0
precision_1 = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0

# Calculate recall for each label
recall_0 = true_negatives / (true_negatives + false_positives) if (true_negatives + false_positives) > 0 else 0
recall_1 = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0

# For ROC calculation, we need prediction scores (probabilities) instead of binary predictions
# Create RDD with (score, label) format required by BinaryClassificationMetrics
scoreAndLabels = test_rdd.map(lambda x: (model.predictProb([1.0] + x[1]), x[0]))

# Create binary classification metrics object
metrics = BinaryClassificationMetrics(scoreAndLabels)

# Get area under ROC
auc_roc = metrics.areaUnderROC
auc_pr = metrics.areaUnderPR

# Print evaluation metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Precision by label: [0: {precision_0:.4f}, 1: {precision_1:.4f}]")
print(f"Recall by label: [0: {recall_0:.4f}, 1: {recall_1:.4f}]")
print(f"Area under ROC: {auc_roc:.4f}")
print(f"Area under PR: {auc_pr:.4f}")

Coefficients: [-3.4292099748116573, -9.802899192074307, 9.696726839609024, -27.818595614355036, 21.408690192345475, -19.53336699438556, -7.407869845508147, -39.40542508060768, 4.935976623038762, -24.352598367969605, -53.109896020676814, 35.28457868072276, -64.36455530481766, -0.46322655112228733, -66.45665150061443, -1.5828028978673325, -58.489120381694086, -104.56813100275232, -38.46713176496969, 14.084508086021499, 4.802675362058377, 9.36926524675539, 2.5781994898848266, -2.274474631475483, 0.9319434869757415, -0.5558624105814621, -3.6953273573100223, 10.241343912665242, 5.000428271361841, 0.039141697375986634]
Intercept: -6.866025234934795
Converged at: 50


                                                                                

Accuracy: 0.9710
Precision by label: [0: 0.9998, 1: 0.0491]
Recall by label: [0: 0.9711, 1: 0.9032]
Area under ROC: 0.9785
Area under PR: 0.4404
