### Importing Libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.types import DoubleType
from geopy.distance import geodesic

### Initializing Spark

In [None]:
# Build (or reuse) the Spark session used by the feature-engineering cells.
spark = (
    SparkSession.builder
    .appName("FraudDetection")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Load the raw training CSV: the first row is the header and column types are inferred.
raw_csv_path = r"D:\Data Science\Big Data Technology\Project\Streaming-Fraud-Detection\Streaming-Fraud-Detection\data\raw\fraudTrain.csv"
df = spark.read.csv(raw_csv_path, header=True, inferSchema=True)
df.printSchema()

In [None]:
# Preview the first 5 rows of the loaded data.
df.show(5)

### Feature Engineering

In [None]:
# Calculate the distance using a UDF
def calculate_distance(lat1, lon1, lat2, lon2):
    """Return the geodesic distance in kilometres between two coordinates.

    Args:
        lat1: Latitude of the first point.
        lon1: Longitude of the first point.
        lat2: Latitude of the second point.
        lon2: Longitude of the second point.

    Returns:
        The distance in kilometres, or ``None`` when any coordinate is missing.
    """
    # Spark hands SQL NULLs to Python UDFs as None; geodesic() cannot handle
    # them, so propagate the null instead of failing the whole job.
    if any(value is None for value in (lat1, lon1, lat2, lon2)):
        return None
    return geodesic((lat1, lon1), (lat2, lon2)).km

In [None]:
# Wrap calculate_distance as a Spark UDF whose result column is a double.
distance_udf = F.udf(calculate_distance, returnType=DoubleType())

In [None]:
# Add "distance": the UDF applied to (lat, long) and (merch_lat, merch_long).
coordinate_columns = ("lat", "long", "merch_lat", "merch_long")
df = df.withColumn("distance", distance_udf(*coordinate_columns))

In [None]:
# Preview 5 rows of the current DataFrame.
df.show(5)

In [None]:
# Parse the raw transaction time string into a timestamp column.
df = df.withColumn("trans_date", F.to_timestamp("trans_date_trans_time"))

# Derive calendar features from the parsed timestamp.
# Note: "age" is the difference between calendar years, not an exact age.
trans_ts = F.col("trans_date")
df = (
    df.withColumn("hour", F.hour(trans_ts))
    .withColumn("day_of_week", F.dayofweek(trans_ts))
    .withColumn("month", F.month(trans_ts))
    .withColumn("age", F.year(trans_ts) - F.year(F.col("dob")))
)

In [None]:
# Remove the columns that are not needed for modeling in Spark.
columns_to_drop = ['first', 'last', 'street', 'city', 'state', 'zip', 'trans_num']
df = df.drop(*columns_to_drop)

In [None]:
# Ratio of each transaction amount to the mean amount of its category.
# The mean is taken over every row that shares the same "category" value.
windowSpec = Window.partitionBy('category')
category_avg_amt = F.avg('amt').over(windowSpec)

df = df.withColumn('amt_vs_category_avg', F.col('amt') / category_avg_amt)

In [None]:
# List the column names currently present in the DataFrame.
print(df.columns)

### Data Processing

##### Encoding object columns

In [None]:
# One StringIndexer per categorical column; each writes to "<column>_index".
categorical_cols = ["merchant", "category", "gender", "job"]
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index")
    for name in categorical_cols
]

### 2. Splitting Data into Training and Testing sets

In [None]:
def train_test_split(df, test_size=0.2):
    """Shuffle a pandas DataFrame and split it into train and test parts.

    Args:
        df: pandas DataFrame to split. The pandas ``sample``/``reset_index``
            API is used, so other DataFrame types must be converted first.
        test_size: Fraction of rows, between 0 and 1, placed in the test set.

    Returns:
        A ``(df_train, df_test)`` tuple. The test part holds the last
        ``int(len(df) * test_size)`` rows of the shuffled frame and the
        train part holds the rest.

    Raises:
        ValueError: If ``test_size`` is outside the range ``[0, 1]``.
    """
    if not 0 <= test_size <= 1:
        raise ValueError(f"test_size must be between 0 and 1, got {test_size!r}")

    # Shuffle with a fixed seed so the split is reproducible.
    shuffled = df.sample(frac=1, random_state=42).reset_index(drop=True)

    # Calculate the number of test samples
    test_count = int(len(shuffled) * test_size)

    # Split on an explicit position. Negative slicing (df[:-test_count]) is
    # wrong when test_count is 0: it yields an empty train set and puts every
    # row in the test set.
    split_at = len(shuffled) - test_count
    df_train = shuffled.iloc[:split_at]
    df_test = shuffled.iloc[split_at:]

    return df_train, df_test

# Perform the split.
# `df` is a Spark DataFrame here, while train_test_split and the `.shape`
# calls below use the pandas API, so collect it to pandas first.
# NOTE(review): toPandas() loads every row into driver memory; confirm the
# driver is sized for the full dataset.
df_for_split = df.toPandas() if hasattr(df, "toPandas") else df
df_train, df_test = train_test_split(df_for_split, test_size=0.2)

# Display the shapes of the resulting dataframes
print(f"Training set shape: {df_train.shape}")
print(f"Testing set shape: {df_test.shape}")

In [None]:
# The label column, and the input columns: everything except the raw
# timestamp string and the label itself.
target_col = "is_fraud"
excluded_cols = ("trans_date_trans_time", "is_fraud")
feature_cols = [name for name in df.columns if name not in excluded_cols]

In [None]:
# Separate model inputs (X) from the label (y) for each split.
X_train, y_train = df_train[feature_cols], df_train[target_col]
X_test, y_test = df_test[feature_cols], df_test[target_col]

### 3. OverSampling (Process Imbalanced Data)

The rule of thumb is: never tamper with your test set. Always split into train and test sets BEFORE applying oversampling/undersampling techniques!

Oversampling before splitting the data can allow the exact same observations to be present in both the train and test sets. This lets the model simply memorize specific data points, causing overfitting and poor generalization to genuinely unseen data. Such data leakage can produce overly optimistic, if not completely invalid, predictive models.

![](https://dataaspirant.com/wp-content/uploads/2020/08/10-oversampling.png)
Picture Credit: https://dataaspirant.com

In [None]:
# pandas is needed for pd.Series below and was not imported anywhere earlier.
import pandas as pd
from imblearn.over_sampling import SMOTE

# Resample the training split only; the test split is left untouched.
smote = SMOTE(random_state=0)
X_train_smote, y_train_smote = smote.fit_resample(X_train, y_train)

# Compare dataset shapes before and after resampling, then show the label counts.
print('Feature/label dataset for training before applying SMOTE: ', X_train.shape, y_train.shape)
print('Feature/label dataset for training after applying SMOTE: ', X_train_smote.shape, y_train_smote.shape)
print('Distribution of label values after applying SMOTE:\n', pd.Series(y_train_smote).value_counts())

# VI. Building Model (Spark ML)

In [None]:
# Initialize Spark session
# NOTE(review): a session is already created earlier in this notebook.
# getOrCreate() presumably returns that existing session, in which case these
# launch-time settings (memory, cores, GPU resources) may not take effect.
# Confirm, or stop the earlier session first.
spark_settings = {
    "spark.executor.memory": "16g",
    "spark.executor.cores": "4",
    "spark.task.cpus": "1",
    "spark.driver.memory": "8g",
    "spark.driver.cores": "4",
    "spark.executor.resource.gpu.amount": "1",
    "spark.executor.resource.gpu.discoveryScript": "/usr/bin/nvidia-smi",
    "spark.rapids.sql.enabled": "true",
    "spark.rapids.memory.pinnedPool.size": "2G",
    "spark.sql.shuffle.partitions": "200",
}

session_builder = SparkSession.builder.appName("Streaming Fraud Detection")
for option, setting in spark_settings.items():
    session_builder = session_builder.config(option, setting)
spark = session_builder.getOrCreate()

In [None]:
# Rebuild one pandas frame from the resampled features, using the original
# training column names, then attach the resampled label as "is_fraud".
training_columns = X_train.columns
pandas_df = pd.DataFrame(X_train_smote, columns=training_columns)
pandas_df['is_fraud'] = y_train_smote

In [None]:
# Show the first rows of the resampled training frame.
pandas_df.head()

In [None]:
# Print a summary of the resampled training frame.
pandas_df.info()

In [None]:
# Convert the resampled pandas frame into a Spark DataFrame.
spark_df = spark.createDataFrame(pandas_df)

In [None]:
# Assemble every column except the target into a single "features" vector column.
feature_cols = [name for name in spark_df.columns if name != "is_fraud"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# The target column is named "is_fraud"; selecting "label" directly fails
# because no such column exists. Expose the target under the name "label".
spark_df = assembler.transform(spark_df).select(
    "features",
    F.col("is_fraud").alias("label"),
)