In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
from pyspark.sql.functions import col, to_date, dayofmonth, dayofweek, month, when, lit, isnull, count
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, Imputer
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, GBTClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from datetime import datetime
import time
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

# Initialize Spark Session optimized for local Jupyter execution
spark = SparkSession.builder \
    .appName("FraudDetection") \
    .master("local[*]") \
    .config("spark.sql.debug.maxToStringFields", "100") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.maxResultSize", "2g") \
    .getOrCreate()

print("Spark session created successfully!")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/20 13:59:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark session created successfully!


In [2]:
# Define schema
schema = StructType([
    StructField("Transaction ID", StringType(), True),
    StructField("Customer ID", StringType(), True),
    StructField("Transaction Amount", FloatType(), True),
    StructField("Transaction Date", StringType(), True),
    StructField("Payment Method", StringType(), True),
    StructField("Product Category", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Customer Age", IntegerType(), True),
    StructField("Customer Location", StringType(), True),
    StructField("Device Used", StringType(), True),
    StructField("IP Address", StringType(), True),
    StructField("Shipping Address", StringType(), True),
    StructField("Billing Address", StringType(), True),
    StructField("Is Fraudulent", IntegerType(), True),
    StructField("Account Age Days", IntegerType(), True),
    StructField("Transaction Hour", IntegerType(), True)
])

# Load data with error handling
try:
    # For local Jupyter environment, use local file path - adjust this to your file location
    # If you're using actual HDFS, keep the HDFS path
    try:
        # First try local file
        file_path = "transaction_data.csv"  # Change this to your local file path
        df = spark.read.option("header", "true") \
                       .option("multiLine", "true") \
                       .schema(schema) \
                       .csv(file_path)
    except:
        # If local fails, try HDFS path
        today = datetime.today()
        hdfs_path = f"hdfs://namenode:9000/user/root/transactions/YYYY={today.year}/MM={today.month:02d}/DD={today.day:02d}/transaction_data.csv"
        df = spark.read.option("header", "true") \
                       .option("multiLine", "true") \
                       .schema(schema) \
                       .csv(hdfs_path)
    
    print("Data loaded successfully. Row count:", df.count())
except Exception as e:
    print(f"Error loading data: {str(e)}")
    print("Please update the file path to point to your transaction data.")

Data loaded successfully. Row count: 23634


In [3]:
# Data Quality Check
print("\nData Quality Check:")
print("Null values per column:")
null_counts = df.select([count(when(isnull(c), c)).alias(c) for c in df.columns])
null_counts.show(vertical=True)

# Show sample data
print("\nSample data:")
df.show(5)

# Check class distribution
fraud_count = df.filter(col("Is Fraudulent") == 1).count()
non_fraud_count = df.filter(col("Is Fraudulent") == 0).count()
total = fraud_count + non_fraud_count
fraud_ratio = fraud_count / total

print(f"\nFraudulent transactions: {fraud_count} ({fraud_ratio:.2%})")
print(f"Non-fraudulent transactions: {non_fraud_count} ({1-fraud_ratio:.2%})")
print(f"Imbalance ratio: 1:{non_fraud_count/fraud_count:.1f}")


Data Quality Check:
Null values per column:
-RECORD 0-----------------
 Transaction ID     | 0   
 Customer ID        | 0   
 Transaction Amount | 0   
 Transaction Date   | 0   
 Payment Method     | 0   
 Product Category   | 0   
 Quantity           | 0   
 Customer Age       | 0   
 Customer Location  | 0   
 Device Used        | 0   
 IP Address         | 0   
 Shipping Address   | 0   
 Billing Address    | 0   
 Is Fraudulent      | 0   
 Account Age Days   | 0   
 Transaction Hour   | 0   


Sample data:
+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-------------------+-----------+---------------+--------------------+--------------------+-------------+----------------+----------------+
|      Transaction ID|         Customer ID|Transaction Amount|   Transaction Date|Payment Method|Product Category|Quantity|Customer Age|  Customer Location|Device Used|     IP Address|    Shipping Address| 

In [4]:
# Data Preprocessing
print("\nStarting Data Preprocessing...")
start_time = time.time()

# Convert Transaction Date and extract features
df = df.withColumn("Transaction Date", to_date(col("Transaction Date")))
df = df.withColumn("DayOfMonth", dayofmonth(col("Transaction Date")))
df = df.withColumn("DayOfWeek", dayofweek(col("Transaction Date")))
df = df.withColumn("Month", month(col("Transaction Date")))

# Create feature: is shipping different from billing
df = df.withColumn("AddressMismatch", 
                  when(col("Shipping Address") != col("Billing Address"), 1).otherwise(0))

# Create feature: transaction amount per quantity
df = df.withColumn("AmountPerQuantity", 
                  when(col("Quantity") > 0, col("Transaction Amount") / col("Quantity")).otherwise(col("Transaction Amount")))

# Create time-based features
df = df.withColumn("IsWeekend", 
                  when((col("DayOfWeek") == 1) | (col("DayOfWeek") == 7), 1).otherwise(0))

df = df.withColumn("IsNightTime", 
                  when((col("Transaction Hour") >= 22) | (col("Transaction Hour") <= 5), 1).otherwise(0))

# Calculate class weights (to handle imbalance)
# Higher weight for minority class (fraud)
weight_multiplier = 5.0  # Increase this for better fraud detection
fraud_weight = (non_fraud_count / total) * weight_multiplier
non_fraud_weight = (fraud_count / total)

df = df.withColumn("classWeight", 
                  when(col("Is Fraudulent") == 1, fraud_weight)
                  .otherwise(non_fraud_weight))

# Fill nulls in categorical columns
categorical_cols = ["Payment Method", "Product Category", "Customer Location", "Device Used"]
for col_name in categorical_cols:
    df = df.fillna("unknown", subset=[col_name])

print(f"\nPreprocessing completed in {time.time() - start_time:.2f} seconds")
print("Sample of weights (fraud cases should have higher weight):")
df.select("Is Fraudulent", "classWeight").distinct().orderBy("Is Fraudulent").show(5)

# Show all features
print("\nAll features after preprocessing:")
df.printSchema()


Starting Data Preprocessing...

Preprocessing completed in -2.01 seconds
Sample of weights (fraud cases should have higher weight):
+-------------+------------------+
|Is Fraudulent|       classWeight|
+-------------+------------------+
|            0|0.0517051705170517|
|            1| 4.741474147414742|
+-------------+------------------+


All features after preprocessing:
root
 |-- Transaction ID: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Transaction Amount: float (nullable = true)
 |-- Transaction Date: date (nullable = true)
 |-- Payment Method: string (nullable = false)
 |-- Product Category: string (nullable = false)
 |-- Quantity: integer (nullable = true)
 |-- Customer Age: integer (nullable = true)
 |-- Customer Location: string (nullable = false)
 |-- Device Used: string (nullable = false)
 |-- IP Address: string (nullable = true)
 |-- Shipping Address: string (nullable = true)
 |-- Billing Address: string (nullable = true)
 |-- Is Fraudulent:

In [6]:
# 1. Clear existing stages if any
stages = []

# 2. First, handle categorical columns
categorical_cols = ["Payment Method", "Product Category", "Customer Location", "Device Used"]
categorical_features = []

# For each categorical column, create an indexed column and then one-hot encode it
for col_name in categorical_cols:
    # Create a string indexer
    indexer = StringIndexer(
        inputCol=col_name, 
        outputCol=f"{col_name}_indexed", 
        handleInvalid="keep"
    )
    stages.append(indexer)
    
    # Create a one-hot encoder
    encoder = OneHotEncoder(
        inputCol=f"{col_name}_indexed", 
        outputCol=f"{col_name}_encoded",
        handleInvalid="keep"
    )
    stages.append(encoder)
    categorical_features.append(f"{col_name}_encoded")

# 3. Handle numerical columns
numerical_cols = ["Transaction Amount", "Quantity", "Customer Age", 
                  "Account Age Days", "Transaction Hour", "DayOfMonth",
                  "DayOfWeek", "Month", "AddressMismatch", "AmountPerQuantity",
                  "IsWeekend", "IsNightTime"]

# Create an imputer for numerical columns
imputer = Imputer(
    inputCols=numerical_cols,
    outputCols=[f"{col}_imputed" for col in numerical_cols],
    strategy="median"
)
stages.append(imputer)
numerical_features = [f"{col}_imputed" for col in numerical_cols]

# 4. Assemble all features
assembler = VectorAssembler(
    inputCols=categorical_features + numerical_features,
    outputCol="raw_features",
    handleInvalid="keep"
)
stages.append(assembler)

# 5. Scale features
scaler = StandardScaler(
    inputCol="raw_features", 
    outputCol="features", 
    withStd=True, 
    withMean=True
)
stages.append(scaler)

print("Feature pipeline rebuilt with stages:")
for i, stage in enumerate(stages):
    print(f"{i+1}. {type(stage).__name__}")

# Now let's try to train a simple model without hyperparameter tuning
rf = RandomForestClassifier(
    featuresCol="features", 
    labelCol="Is Fraudulent", 
    weightCol="classWeight",
    numTrees=100,
    maxDepth=10,
    seed=42
)

# Split data (80% train, 20% test)
train, test = df.randomSplit([0.8, 0.2], seed=42)
print(f"\nTrain rows: {train.count()}, Test rows: {test.count()}")

# Add the model to stages
model_pipeline = Pipeline(stages=stages + [rf])

# Train the model
print("\nTraining Random Forest model...")
start_time = time.time()
try:
    model = model_pipeline.fit(train)
    training_time = time.time() - start_time
    print(f"Training completed in {training_time:.2f} seconds")
    
    # Make predictions
    predictions = model.transform(test)
    
    # Evaluate model
    evaluator = BinaryClassificationEvaluator(labelCol="Is Fraudulent")
    auc = evaluator.evaluate(predictions)
    recall_eval = MulticlassClassificationEvaluator(
        labelCol="Is Fraudulent", 
        metricName="weightedRecall"
    )
    recall = recall_eval.evaluate(predictions)
    
    print(f"AUC: {auc:.4f}")
    print(f"Recall: {recall:.4f}")
    
except Exception as e:
    print(f"Error during training: {str(e)}")
    
    # Let's examine the DataFrame structure
    print("\nExamining DataFrame schema:")
    train.printSchema()
    
    # Count the number of features to ensure we don't have too many
    print(f"\nNumber of categorical features: {len(categorical_features)}")
    print(f"Number of numerical features: {len(numerical_features)}")
    print(f"Total features: {len(categorical_features) + len(numerical_features)}")
    
    # Display a sample of the categorical and numerical features
    print("\nSample of feature columns:")
    if train.count() > 0:
        sample_row = train.limit(1)
        for col_name in categorical_cols[:2]:
            print(f"{col_name}: {sample_row.select(col_name).collect()[0][0]}")
        for col_name in numerical_cols[:2]:
            print(f"{col_name}: {sample_row.select(col_name).collect()[0][0]}")

Feature pipeline rebuilt with stages:
1. StringIndexer
2. OneHotEncoder
3. StringIndexer
4. OneHotEncoder
5. StringIndexer
6. OneHotEncoder
7. StringIndexer
8. OneHotEncoder
9. Imputer
10. VectorAssembler
11. StandardScaler

Train rows: 18916, Test rows: 4718

Training Random Forest model...


25/04/20 14:03:07 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
25/04/20 14:03:08 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
25/04/20 14:03:09 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
25/04/20 14:03:10 WARN DAGScheduler: Broadcasting large task binary with size 2.5 MiB
25/04/20 14:03:14 WARN DAGScheduler: Broadcasting large task binary with size 2.6 MiB
25/04/20 14:03:58 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
25/04/20 14:04:14 WARN DAGScheduler: Broadcasting large task binary with size 2.8 MiB
25/04/20 14:04:17 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/04/20 14:04:22 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/04/20 14:04:25 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
25/04/20 14:04:28 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
25/04/20 14:04:31 WARN DAGScheduler: Broadcasting larg

Training completed in 107.35 seconds


25/04/20 14:04:51 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
25/04/20 14:04:54 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB
[Stage 77:>                                                         (0 + 1) / 1]

AUC: 0.7829
Recall: 0.0466


                                                                                

In [None]:

# Check class distribution in splits
train_fraud_ratio = train.filter(col("Is Fraudulent") == 1).count() / train.count()
test_fraud_ratio = test.filter(col("Is Fraudulent") == 1).count() / test.count()
print(f"Train fraud ratio: {train_fraud_ratio:.2%}")
print(f"Test fraud ratio: {test_fraud_ratio:.2%}")

# Instead of using CrossValidator, let's manually try different parameter combinations
rf_params = [
    {"numTrees": 50, "maxDepth": 5, "minInstancesPerNode": 1, "impurity": "gini"},
    {"numTrees": 100, "maxDepth": 10, "minInstancesPerNode": 2, "impurity": "entropy"},
    {"numTrees": 200, "maxDepth": 15, "minInstancesPerNode": 4, "impurity": "gini"}
]

rf_results = []
evaluator = BinaryClassificationEvaluator(labelCol="Is Fraudulent", metricName="areaUnderROC")

print("\nTraining Random Forest with multiple parameter combinations...")
for params in rf_params:
    start_time = time.time()
    
    # Create Random Forest with specific parameters
    rf = RandomForestClassifier(
        featuresCol="features", 
        labelCol="Is Fraudulent", 
        weightCol="classWeight",
        **params
    )
    
    # Create pipeline with preprocessing stages plus classifier
    rf_pipeline = Pipeline(stages=stages + [rf])
    
    # Train model
    rf_model = rf_pipeline.fit(train)
    predictions = rf_model.transform(test)
    
    # Evaluate
    auc = evaluator.evaluate(predictions)
    recall_eval = MulticlassClassificationEvaluator(
        labelCol="Is Fraudulent", 
        metricName="weightedRecall"
    )
    recall = recall_eval.evaluate(predictions)
    
    training_time = time.time() - start_time
    
    rf_results.append({
        "numTrees": params["numTrees"],
        "maxDepth": params["maxDepth"],
        "minInstancesPerNode": params["minInstancesPerNode"],
        "impurity": params["impurity"],
        "AUC": auc,
        "Recall": recall,
        "Time": training_time
    })
    
    print(f"RF with {params} - AUC: {auc:.4f}, Recall: {recall:.4f}, Time: {training_time:.1f}s")

# Find best model
best_rf_params = max(rf_results, key=lambda x: x["AUC"])
print("\nBest Random Forest parameters:")
print(f"Number of trees: {best_rf_params['numTrees']}")
print(f"Max depth: {best_rf_params['maxDepth']}")
print(f"Min instances per node: {best_rf_params['minInstancesPerNode']}")
print(f"Impurity: {best_rf_params['impurity']}")
print(f"AUC: {best_rf_params['AUC']:.4f}")

In [None]:
# Define models to test
models = {
    "Logistic Regression": LogisticRegression(
        featuresCol="features", 
        labelCol="Is Fraudulent", 
        weightCol="classWeight",
        maxIter=20,
        regParam=0.01,
        elasticNetParam=0.5
    ),
    "Random Forest": RandomForestClassifier(
        featuresCol="features", 
        labelCol="Is Fraudulent", 
        weightCol="classWeight",
        numTrees=best_rf_model.getNumTrees(),
        maxDepth=best_rf_model.getMaxDepth(),
        minInstancesPerNode=best_rf_model.getMinInstancesPerNode(),
        impurity=best_rf_model.getImpurity()
    ),
    "Gradient-Boosted Trees": GBTClassifier(
        featuresCol="features", 
        labelCol="Is Fraudulent", 
        weightCol="classWeight",
        maxIter=50,
        maxDepth=8,
        stepSize=0.1
    ),
    "Decision Tree": DecisionTreeClassifier(
        featuresCol="features", 
        labelCol="Is Fraudulent", 
        weightCol="classWeight",
        maxDepth=10,
        impurity="entropy"
    )
}

# Metrics to evaluate
metrics = {
    "AUC": BinaryClassificationEvaluator(labelCol="Is Fraudulent", metricName="areaUnderROC"),
    "PR AUC": BinaryClassificationEvaluator(labelCol="Is Fraudulent", metricName="areaUnderPR"),
    "Recall": MulticlassClassificationEvaluator(labelCol="Is Fraudulent", metricName="weightedRecall"),
    "Precision": MulticlassClassificationEvaluator(labelCol="Is Fraudulent", metricName="weightedPrecision"),
    "F1": MulticlassClassificationEvaluator(labelCol="Is Fraudulent", metricName="f1")
}

results = []

# Train and evaluate each model
for model_name, model in models.items():
    print(f"\nTraining {model_name}...")
    start_time = time.time()
    
    # Create pipeline with preprocessing stages plus classifier
    pipeline = Pipeline(stages=stages + [model])
    
    # Fit model
    trained_model = pipeline.fit(train)
    predictions = trained_model.transform(test)
    
    # Evaluate with all metrics
    model_results = {"Model": model_name}
    for metric_name, evaluator in metrics.items():
        score = evaluator.evaluate(predictions)
        model_results[metric_name] = score
    
    training_time = time.time() - start_time
    model_results["Time (s)"] = training_time
    
    results.append(model_results)
    
    # Print model performance
    print(f"{model_name} evaluation:")
    for metric, value in model_results.items():
        if metric != "Model" and metric != "Time (s)":
            print(f"  - {metric}: {value:.4f}")
    print(f"  - Training time: {training_time:.1f}s")

# Show results table
results_df = spark.createDataFrame(results).orderBy("F1", ascending=False)
print("\nModel Performance Comparison:")
results_df.show(truncate=False)

# Convert to pandas for visualization
results_pd = results_df.toPandas()

In [None]:
# Plot model comparison
import matplotlib.pyplot as plt

# Set figure size
plt.figure(figsize=(14, 8))

# Get models and metrics
models = results_pd['Model'].tolist()
metrics_to_plot = ['AUC', 'PR AUC', 'Recall', 'Precision', 'F1']

# Set width of bars
barWidth = 0.15
r = range(len(models))

# Plot bars
for i, metric in enumerate(metrics_to_plot):
    plt.bar([x + i*barWidth for x in r], results_pd[metric], width=barWidth, label=metric)

# Add labels and legend
plt.xlabel('Models', fontweight='bold', fontsize=12)
plt.ylabel('Score', fontweight='bold', fontsize=12)
plt.xticks([r + barWidth*2 for r in range(len(models))], models, rotation=45)
plt.legend()
plt.title('Model Performance Comparison', fontsize=14)
plt.tight_layout()

# Show plot
plt.show()

# Identify best model based on F1 score
best_model_row = results_pd.loc[results_pd['F1'].idxmax()]
print(f"\nBest model based on F1 score: {best_model_row['Model']}")
print(f"F1 score: {best_model_row['F1']:.4f}")
print(f"AUC: {best_model_row['AUC']:.4f}")
print(f"Recall: {best_model_row['Recall']:.4f}")

In [None]:
rf_results_list = [r for r in results if r["Model"] == rf_name]
if rf_results_list:
    rf_results = rf_results_list[0]
    print(f"\nAnalyzing {rf_name} model (F1: {rf_results['F1']:.4f})")
else:
    print(f"Model '{rf_name}' not found in results.")

# Get the trained Random Forest model from pipeline
rf_pipeline = Pipeline(stages=stages + [models[rf_name]])
rf_trained = rf_pipeline.fit(train)
rf_model = rf_trained.stages[-1]

# Get feature importance
feature_importances = rf_model.featureImportances
print("Feature importances:", feature_importances)

# Get feature names (the order matches the VectorAssembler's inputCols)
feature_cols = assembler_inputs

# Create a DataFrame with feature names and importances
if len(feature_cols) == len(feature_importances):
    importance_list = [(feature_cols[i], float(feature_importances[i])) 
                     for i in range(len(feature_cols))]
    
    # Convert to Spark DataFrame and then to Pandas
    importance_df = spark.createDataFrame(importance_list, ["feature", "importance"])
    importance_pd = importance_df.orderBy("importance", ascending=False).toPandas()
    
    # Plot top 15 features
    plt.figure(figsize=(10, 8))
    top_n = min(15, len(importance_pd))
    plt.barh(importance_pd['feature'][:top_n], importance_pd['importance'][:top_n])
    plt.xlabel('Importance')
    plt.ylabel('Feature')
    plt.title('Top 15 Important Features')
    plt.gca().invert_yaxis()  # To have the highest importance at the top
    plt.tight_layout()
    plt.show()
else:
    print(f"Warning: Feature count mismatch. Features: {len(feature_cols)}, Importances: {len(feature_importances)}")

In [None]:
# from pyspark.sql import SparkSession
# from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
# from pyspark.sql.functions import col, to_date, dayofmonth, dayofweek, month, when, lit, isnull, count
# from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, Imputer
# from pyspark.ml.classification import RandomForestClassifier
# from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
# from pyspark.ml import Pipeline
# from datetime import datetime
# import time

# # Initialize Spark Session with optimized configurations
# spark = SparkSession.builder \
#     .appName("FraudDetection") \
#     .master("local[*]") \
#     .config("spark.sql.debug.maxToStringFields", "100") \
#     .config("spark.driver.memory", "4g") \
#     .getOrCreate()

# print("Spark session created successfully!")

# # Define schema
# schema = StructType([
#     StructField("Transaction ID", StringType(), True),
#     StructField("Customer ID", StringType(), True),
#     StructField("Transaction Amount", FloatType(), True),
#     StructField("Transaction Date", StringType(), True),
#     StructField("Payment Method", StringType(), True),
#     StructField("Product Category", StringType(), True),
#     StructField("Quantity", IntegerType(), True),
#     StructField("Customer Age", IntegerType(), True),
#     StructField("Customer Location", StringType(), True),
#     StructField("Device Used", StringType(), True),
#     StructField("IP Address", StringType(), True),
#     StructField("Shipping Address", StringType(), True),
#     StructField("Billing Address", StringType(), True),
#     StructField("Is Fraudulent", IntegerType(), True),
#     StructField("Account Age Days", IntegerType(), True),
#     StructField("Transaction Hour", IntegerType(), True)
# ])

# # Load data from local file with error handling
# try:
#     today = datetime.today()
#     hdfs_path = f"hdfs://namenode:9000/user/root/transactions/YYYY={today.year}/MM={today.month:02d}/DD={today.day:02d}/transaction_data.csv"
    
#     df = spark.read.option("header", "true") \
#                    .option("multiLine", "true") \
#                    .schema(schema) \
#                    .csv(hdfs_path)
    
#     print("Data loaded successfully. Row count:", df.count())
# except Exception as e:
#     print(f"Error loading data: {str(e)}")
#     spark.stop()
#     exit()
# ####################################################################################################
# # Data Quality Check
# print("\nData Quality Check:")
# print("Null values per column:")
# null_counts = df.select([count(when(isnull(c), c)).alias(c) for c in df.columns])
# null_counts.show(vertical=True)
# ####################################################################################################
# # Data Quality Check
# print("\nData Quality Check:")
# print("Null values per column:")
# null_counts = df.select([count(when(isnull(c), c)).alias(c) for c in df.columns])
# null_counts.show(vertical=True)
# ####################################################################################################
# # Data Preprocessing
# print("\nStarting Data Preprocessing...")
# start_time = time.time()
# from pyspark.sql.functions import col, to_date, dayofmonth, dayofweek, month, when

# # Convert Transaction Date and extract features
# df = df.withColumn("Transaction Date", to_date(col("Transaction Date")))
# df = df.withColumn("DayOfMonth", dayofmonth(col("Transaction Date")))
# df = df.withColumn("DayOfWeek", dayofweek(col("Transaction Date")))
# df = df.withColumn("Month", month(col("Transaction Date")))

# # Create feature: is shipping different from billing
# df = df.withColumn("AddressMismatch", 
#                   when(col("Shipping Address") != col("Billing Address"), 1).otherwise(0))

# # Create feature: transaction amount per quantity
# df = df.withColumn("AmountPerQuantity", 
#                   col("Transaction Amount") / col("Quantity"))

# # Handle class imbalance
# fraud_count = df.filter(col("Is Fraudulent") == 1).count()
# non_fraud_count = df.filter(col("Is Fraudulent") == 0).count()
# fraud_ratio = fraud_count / (fraud_count + non_fraud_count)

# print(f"\nFraudulent transactions: {fraud_count} ({fraud_ratio:.2%})")
# print(f"Non-fraudulent transactions: {non_fraud_count}")
# ####################################################################################################
# # 1. Calculate class weights (to handle imbalance)
# fraud_weight = non_fraud_count / (fraud_count + non_fraud_count)  # Weight for fraud class
# non_fraud_weight = fraud_count / (fraud_count + non_fraud_count)  # Weight for non-fraud

# df = df.withColumn("classWeight", 
#                   when(col("Is Fraudulent") == 1, fraud_weight)
#                   .otherwise(non_fraud_weight))

# # 2. Fill nulls in categorical columns (defensive programming)
# categorical_cols = ["Payment Method", "Product Category", "Customer Location", "Device Used"]
# for col_name in categorical_cols:
#     df = df.fillna("unknown", subset=[col_name])

# print("\nClass weights applied and categorical nulls handled (if any existed).")
# print("Sample of weights (fraud cases should have higher weight):")
# df.select("Is Fraudulent", "classWeight").show(5)
# ####################################################################################################
# from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, Imputer

# # Define pipeline stages
# stages = []

# # 1. Categorical encoding
# categorical_cols = ["Payment Method", "Product Category", "Customer Location", "Device Used"]
# for col_name in categorical_cols:
#     # Convert strings to numerical indices
#     string_indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_Index", handleInvalid="keep")
#     # One-hot encode indices
#     encoder = OneHotEncoder(inputCols=[f"{col_name}_Index"], outputCols=[f"{col_name}_OHE"], handleInvalid="keep")
#     stages += [string_indexer, encoder]

# # 2. Numerical columns (impute missing values with median)
# numerical_cols = ["Transaction Amount", "Quantity", "Customer Age", 
#                  "Account Age Days", "Transaction Hour", "DayOfMonth",
#                  "DayOfWeek", "Month", "AddressMismatch", "AmountPerQuantity"]

# for num_col in numerical_cols:
#     imputer = Imputer(inputCol=num_col, outputCol=f"{num_col}_imputed", strategy="median")
#     stages.append(imputer)
#     numerical_cols[numerical_cols.index(num_col)] = f"{num_col}_imputed"  # Update column name

# # 3. Assemble all features into a vector
# assembler_inputs = [f"{c}_OHE" for c in categorical_cols] + numerical_cols
# assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="rawFeatures", handleInvalid="keep")
# stages.append(assembler)

# # 4. Scale features (mean=0, std=1)
# scaler = StandardScaler(inputCol="rawFeatures", outputCol="features", withStd=True, withMean=True)
# stages.append(scaler)

# # 5. Random Forest with class weights
# rf = RandomForestClassifier(
#     featuresCol="features",
#     labelCol="Is Fraudulent",
#     weightCol="classWeight",  # Critical for imbalance!
#     numTrees=50,
#     maxDepth=5,
#     seed=42
# )
# stages.append(rf)

# # Create the pipeline
# pipeline = Pipeline(stages=stages)
# print("\nPipeline built successfully. Ready for training!")
# ####################################################################################################
# # Split data (80% train, 20% test)
# train, test = df.randomSplit([0.8, 0.2], seed=42)
# print(f"\nTrain rows: {train.count()}, Test rows: {test.count()}")

# # Train the model
# print("\nTraining started...")
# start_time = time.time()
# model = pipeline.fit(train)
# print(f"Training completed in {time.time() - start_time:.2f} seconds")

# # Generate predictions on test set
# predictions = model.transform(test)
# print("\nPredictions ready for evaluation.")
# ####################################################################################################
# from pyspark.ml.classification import (
#     LogisticRegression, 
#     RandomForestClassifier, 
#     GBTClassifier, 
#     DecisionTreeClassifier
# )
# from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
# from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# import time

# # Define models to test
# models = {
#     "Logistic Regression": LogisticRegression(
#         featuresCol="features", 
#         labelCol="Is Fraudulent", 
#         weightCol="classWeight"
#     ),
#     "Random Forest": RandomForestClassifier(
#         featuresCol="features", 
#         labelCol="Is Fraudulent", 
#         weightCol="classWeight",
#         numTrees=50
#     ),
#     "Gradient-Boosted Trees": GBTClassifier(
#         featuresCol="features", 
#         labelCol="Is Fraudulent", 
#         weightCol="classWeight",
#         maxIter=20
#     ),
#     "Decision Tree": DecisionTreeClassifier(
#         featuresCol="features", 
#         labelCol="Is Fraudulent", 
#         weightCol="classWeight"
#     )
# }

# # Metrics to evaluate
# metrics = {
#     "AUC": BinaryClassificationEvaluator(labelCol="Is Fraudulent"),
#     "Recall": MulticlassClassificationEvaluator(
#         labelCol="Is Fraudulent", 
#         metricName="weightedRecall"
#     )
# }

# results = []

# # Train and evaluate each model
# for model_name, model in models.items():
#     print(f"\nTraining {model_name}...")
#     start_time = time.time()
    
#     # Create pipeline (reuse your existing stages, replace classifier)
#     pipeline = Pipeline(stages=stages[:-1] + [model])  # Keep all but last stage
    
#     # Fit model
#     trained_model = pipeline.fit(train)
#     predictions = trained_model.transform(test)
    
#     # Evaluate
#     auc = metrics["AUC"].evaluate(predictions)
#     recall = metrics["Recall"].evaluate(predictions)
#     training_time = time.time() - start_time
    
#     results.append({
#         "Model": model_name,
#         "AUC": auc,
#         "Recall": recall,
#         "Time (s)": training_time
#     })
    
#     print(f"{model_name} - AUC: {auc:.4f}, Recall: {recall:.4f}, Time: {training_time:.1f}s")

# # Show results
# results_df = spark.createDataFrame(results).orderBy("Recall", ascending=False)
# print("\nModel Performance Comparison:")
# results_df.show(truncate=False)
# ####################################################################################################
# spark.stop()