# Analyzing Financial Transaction Data using Spark ✨
## Topics
- Querying the data with Spark SQL
- Regression
- Classification
- Clustering
- Summarize

# Initialization (Must do)

In [None]:
!pip install pyspark
!pip install findspark



In [None]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context.
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [None]:
# Creating a spark context class.
# Memory settings are passed here because the driver JVM is launched when the
# SparkContext is created; configuring them afterwards on the session builder is too late.
# getOrCreate() reuses an existing context, so re-running this cell does not raise.
spark_conf = (
    SparkConf()
    .set("spark.driver.memory", "8g")
    .set("spark.executor.memory", "6g")
)
sc = SparkContext.getOrCreate(conf=spark_conf)

In [None]:
# Creating a spark session: apply each option to the builder in turn, then
# obtain the session (an existing one is reused by getOrCreate).
session_options = {
    "spark.some.config.option": "some-value",
    "spark.driver.memory": "8g",
    "spark.executor.memory": "6g",
    "spark.sql.shuffle.partitions": "200",
}

session_builder = SparkSession.builder.appName("Python Spark DataFrames Import Student")
for option_name, option_value in session_options.items():
    session_builder = session_builder.config(option_name, option_value)

spark = session_builder.getOrCreate()

## Import Spark Dataframe

In [None]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Load the transactions CSV from the mounted Drive folder.
# header=True uses the first row as column names; inferSchema=True lets Spark infer column types.
df = spark.read.csv("/content/drive/MyDrive/Financial_Dataset/archive/transactions_data_updated.csv", header=True, inferSchema=True)
#df.show()

In [None]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_id: integer (nullable = true)
 |-- amount: string (nullable = true)
 |-- use_chip: string (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- merchant_state: string (nullable = true)
 |-- zip: double (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- errors: string (nullable = true)
 |-- is_fraud: string (nullable = true)



### Merge json labels of fraud detection into the csv file (ignore this)

In [None]:
#import json

In [None]:
# Load the JSON file as a dictionary
'''
with open('/content/drive/My Drive/Financial_Dataset/archive/train_fraud_labels.json') as f:
    fraud_labels = json.load(f)
'''

In [None]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import lit, col, create_map
from itertools import chain
'''
# Convert the fraud_labels dictionary to a Spark DataFrame
fraud_df = spark.createDataFrame(fraud_labels['target'].items(), ["id", "is_fraud"])

# Cast the 'id' column in fraud_df to integer to match the transaction DataFrame
fraud_df = fraud_df.withColumn("id", col("id").cast("integer"))

# Join the transaction DataFrame with the fraud_df on the 'id' column (on the right)
df = df.join(fraud_df, on="id", how="right")'''

In [None]:
## I think we should convert the amount into float first
from pyspark.sql.functions import udf
def clean(s):
  """Strip a single leading '$' from an amount string.

  Args:
    s: the raw amount text (e.g. "$3.51"), or None / "" for a missing value.

  Returns:
    The text without its leading '$'. None and empty strings are returned
    unchanged so that missing amounts do not raise when indexing s[0].
  """
  if not s:
    return s
  if s.startswith('$'):
    return s[1:]
  return s

amountToFloatUDF = udf(lambda s:clean(s))

In [None]:
from pyspark.sql.types import DoubleType

df = df.withColumn("amount",amountToFloatUDF("amount").cast(DoubleType()))

In [None]:
df.printSchema()

In [None]:
df.show()

# Spark SQL

In [None]:
from pyspark.sql.functions import sum,count,avg

# Per-city statistics: total amount, number of rows and mean amount
city_metrics = [
    sum("amount").alias("total_amount"),
    count("*").alias("entry_count"),
    avg("amount").alias("average_amount"),
]
city_amount_df = df.groupBy("merchant_city").agg(*city_metrics)

# Largest total first; show the top 10 rows without truncating cell contents
sorted_city_amount_df = city_amount_df.orderBy("total_amount", ascending=False)
sorted_city_amount_df.show(10, truncate=False)

In [None]:
# Don't use anymore found a more efficient way
'''def getYear(date):
  columns = date.split("-")
  try:
    year = int(columns)
    if year < 2000 and year < 2025:
      return year
  except:
    pass
  return -1
getYearUDF = udf(lambda date : getYear(date))
#year_amount_df = df.withColumn("year", getYearUDF("date")).groupBy("year").agg(sum("amount").alias("total_amount"))'''

In [None]:
from pyspark.sql.functions import sum, year

# Use the built-in year() function from the Spark SQL library instead of the getYearUDF.
# count and avg come from the import in the earlier Spark SQL cell.
year_amount_df = df.withColumn("year", year("date")).groupBy("year").agg(
    sum("amount").alias("total_amount"),
    count("*").alias("entry_count"),
    avg("amount").alias("average_amount")
)

# Sort the result by 'year' in descending order (ascending=False)
sorted_year_amount_df = year_amount_df.orderBy("year",ascending=False)
sorted_year_amount_df.show()

# Regression

In [None]:
import pandas as pd

In [None]:
df.show(20)

+-------+-------------------+---------+-------+-------+------------------+-----------+-------------+--------------+-------+----+------+--------+
|     id|               date|client_id|card_id| amount|          use_chip|merchant_id|merchant_city|merchant_state|    zip| mcc|errors|is_fraud|
+-------+-------------------+---------+-------+-------+------------------+-----------+-------------+--------------+-------+----+------+--------+
|7475338|2010-01-01 00:23:00|      554|   3912|  $3.51| Swipe Transaction|      67570|     Pearland|            TX|77581.0|5311|  NULL|      No|
|7475353|2010-01-01 00:43:00|      301|   3742| $10.17|Online Transaction|      39021|       ONLINE|          NULL|   NULL|4784|  NULL|      No|
|7475365|2010-01-01 01:01:00|      820|    127|$270.22|Online Transaction|      73186|       ONLINE|          NULL|   NULL|4814|  NULL|      No|
|7475379|2010-01-01 01:22:00|      986|   4755|  $1.85| Swipe Transaction|      14528|  Garden City|            NY|11530.0|5499|  

### Extract feature from date

In [None]:
from pyspark.sql.functions import hour, dayofweek, col

# Derive hour-of-day and day-of-week features from the transaction timestamp.
df = df.withColumn("hour", hour("date"))
df = df.withColumn("dayofweek", dayofweek("date"))
# use_chip is deliberately left as a string: casting text such as "Swipe Transaction"
# to int turns every value into NULL. It is indexed with StringIndexer in later cells.
df.show(20)

+-------+-------------------+---------+-------+-------+--------+-----------+-------------+--------------+-------+----+------+--------+----+---------+
|     id|               date|client_id|card_id| amount|use_chip|merchant_id|merchant_city|merchant_state|    zip| mcc|errors|is_fraud|hour|dayofweek|
+-------+-------------------+---------+-------+-------+--------+-----------+-------------+--------------+-------+----+------+--------+----+---------+
|7475338|2010-01-01 00:23:00|      554|   3912|  $3.51|    NULL|      67570|     Pearland|            TX|77581.0|5311|  NULL|      No|   0|        6|
|7475353|2010-01-01 00:43:00|      301|   3742| $10.17|    NULL|      39021|       ONLINE|          NULL|   NULL|4784|  NULL|      No|   0|        6|
|7475365|2010-01-01 01:01:00|      820|    127|$270.22|    NULL|      73186|       ONLINE|          NULL|   NULL|4814|  NULL|      No|   1|        6|
|7475379|2010-01-01 01:22:00|      986|   4755|  $1.85|    NULL|      14528|  Garden City|          

### Remove unnecessary columns

In [None]:
# Only 'date' is dropped (its hour/dayofweek features were extracted above).
# 'id' is kept because later cells select it when saving predictions; it is not
# part of any feature list, so keeping it does not affect the models.
df = df.drop("date")

### Clean \$ sign from the amount column

In [None]:
from pyspark.sql.functions import regexp_replace, col
# Remove every literal '$' from 'amount' and cast the result to float.
# NOTE(review): an earlier cell already strips '$' and casts 'amount' to DoubleType;
# if that cell has run, this step only narrows the column to float — confirm it is still needed.
df = df.withColumn("amount", regexp_replace("amount", "\\$", "").cast("float"))

### Split just a fraction of data for faster training

In [None]:
df = df.sample(withReplacement=False, fraction=0.005, seed=42)

In [None]:
df.count()

44435

### Encoding categorical columns

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# Columns treated as categorical; each one gets a "<name>_idx" index column
# and a "<name>_vec" one-hot column.
categorical_cols = ["client_id", "card_id", "merchant_id", "merchant_city", "merchant_state", "zip", "mcc"]

indexers = []
encoders = []
for column_name in categorical_cols:
    idx_name = column_name + "_idx"
    indexers.append(StringIndexer(inputCol=column_name, outputCol=idx_name, handleInvalid="keep"))
    encoders.append(OneHotEncoder(inputCol=idx_name, outputCol=column_name + "_vec"))


In [None]:
# Fit and apply each StringIndexer in turn, reporting progress as "n/total".
indexed_df = df
total_indexers = len(indexers)
for position, indexer in enumerate(indexers, start=1):
    print(f"Indexing: {position}/{total_indexers}")
    fitted_indexer = indexer.fit(indexed_df)
    indexed_df = fitted_indexer.transform(indexed_df)

Indexing: 1/7
Indexing: 2/7
Indexing: 3/7
Indexing: 4/7
Indexing: 5/7
Indexing: 6/7
Indexing: 7/7


In [None]:
# Fit and apply each OneHotEncoder in turn, reporting progress as "n/total".
encoder_df = indexed_df
total_encoders = len(encoders)
for position, encoder in enumerate(encoders, start=1):
  print(f"Encoding: {position}/{total_encoders}")
  fitted_encoder = encoder.fit(encoder_df)
  encoder_df = fitted_encoder.transform(encoder_df)

Encoding: 1/7
Encoding: 2/7
Encoding: 3/7
Encoding: 4/7
Encoding: 5/7
Encoding: 6/7
Encoding: 7/7


In [None]:
# from pyspark.ml.feature import VectorAssembler
# from pyspark.sql.functions import col

# # Define the feature columns
# included_columns = ["merchant_city"]
# indexed_columns = [column+"_idx" for column in included_columns]
# featuresColumns = indexed_columns + ["mcc","amount"]
# indexed_df_filled = indexed_df.na.fill(0, subset=featuresColumns)

# assembler = VectorAssembler(inputCols=featuresColumns, outputCol="features")
# assembled_df = assembler.transform(indexed_df_filled)

# # Show the schema of the assembled dataframe
# assembled_df.printSchema()

### Assembling features

In [None]:
feature_cols = ["hour", "dayofweek"] + [col+"_vec" for col in categorical_cols]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [None]:
# Apply vector assembler to encoder_df
assembled_df = assembler.transform(encoder_df)

In [None]:
assembled_df.show(5)

+---------+-------+------+--------+-----------+-------------+--------------+-------+----+------+--------+----+---------+-------------+-----------+---------------+-----------------+------------------+-------+-------+------------------+-------------------+------------------+------------------+------------------+-------------------+----------------+--------------------+
|client_id|card_id|amount|use_chip|merchant_id|merchant_city|merchant_state|    zip| mcc|errors|is_fraud|hour|dayofweek|client_id_idx|card_id_idx|merchant_id_idx|merchant_city_idx|merchant_state_idx|zip_idx|mcc_idx|     client_id_vec|        card_id_vec|   merchant_id_vec| merchant_city_vec|merchant_state_vec|            zip_vec|         mcc_vec|            features|
+---------+-------+------+--------+-----------+-------------+--------------+-------+----+------+--------+----+---------+-------------+-----------+---------------+-----------------+------------------+-------+-------+------------------+-------------------+------

### Regressor

In [None]:
from pyspark.ml.regression import RandomForestRegressor
# Random forest predicting 'amount' from the assembled feature vector.
# An explicit seed makes the trained forest reproducible across kernel restarts.
rf = RandomForestRegressor(labelCol="amount", featuresCol="features", numTrees=5, seed=42)

In [None]:
train, test = assembled_df.randomSplit([0.8, 0.2], seed=12345678)

In [None]:
model = rf.fit(train)

In [None]:
# overwrite() lets this cell be re-run; a plain save() fails when the path already exists.
model.write().overwrite().save("model_regression")

### Spark pipeline

In [None]:
from pyspark.ml import Pipeline

# Single pipeline: index -> one-hot encode -> assemble features -> random forest.
pipeline_stages = [*indexers, *encoders, assembler, rf]
pipeline = Pipeline(stages=pipeline_stages)

# 80/20 split of the un-encoded frame; the pipeline performs all encoding itself.
split_weights = [0.8, 0.2]
train, test = df.randomSplit(split_weights, seed=12345678)
model = pipeline.fit(train)

### Predict

In [None]:
predictions = model.transform(test)
predictions.select("amount", "prediction").show()

In [None]:
# overwrite() lets this cell be re-run; a plain save() fails when the path already exists.
model.write().overwrite().save("regressionModel")

In [None]:
import time
# Prints a counter once per second for 3600 iterations (about one hour).
# NOTE(review): this cell blocks "Run All" for that whole time before the
# classification section starts — consider skipping it when re-running the notebook.
for i in range(3600):
  print(i)
  # Keep colab alive
  time.sleep(1)

# Classification
- Binary classification (logistic regression) to classify whether a transaction is fraudulent, based on the specified features

In [None]:
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, StandardScaler
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

df.printSchema()

In [None]:
from pyspark.ml.feature import StringIndexer

# String-valued features (use_chip, merchant_city, errors) are indexed first.
included_columns = ["use_chip", "merchant_city", "errors"]
indexed_columns = [f"{name}_indexed" for name in included_columns]

# One StringIndexer per categorical column; values it cannot index are kept
# in an extra bucket rather than raising.
indexers = [
    StringIndexer(inputCol=source, outputCol=target, handleInvalid="keep")
    for source, target in zip(included_columns, indexed_columns)
]

# Fit and apply the indexers one after another so each adds its "<name>_indexed" column.
indexed_df = df
for stage in indexers:
    fitted_stage = stage.fit(indexed_df)
    indexed_df = fitted_stage.transform(indexed_df)

# Print the schema to confirm that the new indexed columns exist.
indexed_df.printSchema()

In [None]:
# is_fraud is a string column, so index it into a numeric 'label' column; this is the
# target the classifier predicts.
# NOTE(review): the confusion-matrix cell later treats 'Yes' as class 1 — confirm that the
# index StringIndexer assigns to 'Yes' is 1 for this data.
label_indexer = StringIndexer(inputCol="is_fraud", outputCol="label")
indexed_df = label_indexer.fit(indexed_df).transform(indexed_df)
indexed_df.printSchema()

In [None]:
# feature columns for classification (use_chip, merchant_city, errors, mcc, amount)
featuresColumns = indexed_columns + ["mcc","amount"]

In [None]:
# I decided to use pipeline
assembler = VectorAssembler(inputCols=featuresColumns, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
lr = LogisticRegression(labelCol="label", featuresCol="scaled_features")

In [None]:
# Creating and fitting pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])

In [None]:
(trainingData, testData) = indexed_df.randomSplit([0.8,0.2], seed = 6580043)
model = pipeline.fit(trainingData)

## Hyperparameter Tuning (Model Tuning)
Improve the algorithm's performance on this data by trying different settings and picking the one that performs best under cross-validation.

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Grid of logistic-regression settings: with/without intercept x three iteration caps.
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(lr.fitIntercept, [False, True])
grid_builder = grid_builder.addGrid(lr.maxIter, [5, 10,20])
paramGrid = grid_builder.build()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Cross-validate the pipeline over paramGrid, scoring each candidate with
# MulticlassClassificationEvaluator. The seed fixes the fold assignment.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=3,  # use 3+ folds in practice
                          seed=6580043)

# Run cross-validation, and choose the best set of parameters.
# Fit on the training split only: fitting on indexed_df would include testData,
# so the test-set metrics computed afterwards would be leaked/optimistic.
cvModel = crossval.fit(trainingData)

In [None]:
# list of average evaluation scores (e.g., accuracy) for each hyperparameter combination tested
cvModel.avgMetrics

In [None]:
predictions = cvModel.transform(testData)

In [None]:
predictions.select("probability","prediction","label").show(truncate=False)

## Evaluations

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
# Evaluate model performance
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy =", accuracy)

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
recall = evaluator.evaluate(predictions)
print("Recall =", recall)

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator.evaluate(predictions)
print("F1 score = ", f1_score)

In [None]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="label")
# true positive rate vs false positive
AUC = evaluator.evaluate(predictions)
print("AUC = %g " % (AUC))

In [None]:
predictions.select("probability","prediction", "label").show(truncate=False)

In [None]:
# Save the transaction id (when present), the predicted class and the true label.
# 'id' is dropped earlier in the notebook, so only columns that still exist are selected
# instead of failing on a missing column.
wanted_columns = ["id", "prediction", "is_fraud"]
columns_to_save = [name for name in wanted_columns if name in predictions.columns]
# mode("overwrite") lets the cell be re-run when the output directory already exists.
predictions.select(columns_to_save).write.mode("overwrite").csv("classification_predictions", header=True)

In [None]:
import pandas as pd
import glob

# Spark writes one CSV part file per partition; merge them into a single CSV.
# The result is kept in its own variable: rebinding `df` here would replace the Spark
# DataFrame that the clustering section still uses with a pandas DataFrame.
files = glob.glob("classification_predictions/part-*")
classification_predictions_pdf = pd.concat([pd.read_csv(f) for f in files])
classification_predictions_pdf.to_csv("merged_predictions.csv", index=False)

## Confusion Matrix

In [None]:
import pandas as pd
from sklearn.metrics import confusion_matrix
import numpy as np

# Read the merged predictions written by the previous cell. 'classification_predictions'
# itself is a directory of Spark part files, so there is no 'classification_predictions.csv'.
csvfile = './merged_predictions.csv'
test_df = pd.read_csv(csvfile)

# Columns used: prediction (predicted class) and is_fraud (actual 'Yes'/'No').
# 0 = no fraud, 1 = fraud
result = test_df['prediction'].tolist()
actual = (test_df['is_fraud'] == 'Yes').astype(int).tolist()

cnf_mat=confusion_matrix(actual,result)

In [None]:
from sklearn.metrics import accuracy_score

print(cnf_mat)
print('Test Accuracy:', accuracy_score(actual,result))

In [None]:
import pandas as pd
import seaborn as sn
import matplotlib.pyplot as plt

# Wrap the confusion matrix in a labelled frame so both heatmap axes show the class names.
classes = ["0", "1"]
df_cfm = pd.DataFrame(cnf_mat, index = classes, columns = classes)

# Draw the annotated heatmap on an explicit 10x7 figure and save it to disk.
fig, ax = plt.subplots(figsize = (10,7))
cfm_plot = sn.heatmap(df_cfm, annot=True, ax=ax)
cfm_plot.figure.savefig("cfm.png")

In [None]:
# Cell labels follow the row-major order of the 2x2 matrix:
# True Neg, False Pos on the first row; False Neg, True Pos on the second.
group_names = ["True Neg","False Pos","False Neg", "True Pos"]

flat_counts = cnf_mat.flatten()
flat_shares = flat_counts/np.sum(cnf_mat)
group_counts = ["{0:0.0f}".format(count_value) for count_value in flat_counts]
group_percentages = ["{0:.2%}".format(share_value) for share_value in flat_shares]

# Each cell shows its name, raw count and share of all predictions on separate lines.
labels = []
for cell_name, cell_count, cell_share in zip(group_names,group_counts,group_percentages):
    labels.append(f"{cell_name}\n{cell_count}\n{cell_share}")

labels = np.asarray(labels).reshape(2,2)
sn.heatmap(cnf_mat, annot=labels, fmt='', cmap='Blues')

# Clustering

In [None]:
df.describe().toPandas().transpose()

### Next step is choosing relevant data to include as a features column

Identifiers carry no meaning for clustering, so id, client_id, card_id, and merchant_id will be excluded.

Before the remaining columns can be used for analysis or training, all string values must be converted to numerical values with an indexer.

In [None]:
from pyspark.ml.feature import StringIndexer

# String-valued columns to index for clustering (is_fraud is included as a feature here).
included_columns = ["use_chip", "merchant_city", "errors", "is_fraud"]
indexed_columns = [f"{name}_indexed" for name in included_columns]

# One StringIndexer per categorical column; values it cannot index are kept
# in an extra bucket rather than raising.
indexers = [
    StringIndexer(inputCol=source, outputCol=target, handleInvalid="keep")
    for source, target in zip(included_columns, indexed_columns)
]

# Fit and apply the indexers one after another so each adds its "<name>_indexed" column.
indexed_df = df
for stage in indexers:
    fitted_stage = stage.fit(indexed_df)
    indexed_df = fitted_stage.transform(indexed_df)

# Print the schema to confirm that the new indexed columns exist.
indexed_df.printSchema()

In [None]:
print(indexed_columns)

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Define the feature columns: the indexed categorical columns plus mcc and amount
featuresColumns = indexed_columns + ["mcc","amount"]
# Replace nulls in the feature columns with 0 before assembling them
indexed_df_filled = indexed_df.na.fill(0, subset=featuresColumns)

# Combine the feature columns into a single vector column named "features"
assembler = VectorAssembler(inputCols=featuresColumns, outputCol="features")
assembled_df = assembler.transform(indexed_df_filled)

# Show the schema of the assembled dataframe
assembled_df.printSchema()

In [None]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol = "features", outputCol = "scaled_features")
scaled_df = scaler.fit(assembled_df).transform(assembled_df)

In [None]:
# Test data, just for debuggin code and don't have to wait 5mins
(trainData,testData) = scaled_df.randomSplit([0.7,0.3], seed = 6580081)

In [None]:
# Increase parallelism, making the job faster and more memory-efficient
trainData = trainData.repartition(200)

## KMeans
K-means is an iterative, centroid-based clustering algorithm that partitions a dataset into similar groups based on the distance between their centroids. Default k is 2

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import col

# Fit KMeans on the processed trainData using the existing 'scaled_features' column.
# k is left at its default; the explicit seed makes the initial centroids, and
# therefore the resulting clusters, reproducible between runs.
kmeans = KMeans(featuresCol='scaled_features', seed=42)
model = kmeans.fit(trainData)

In [None]:
predictions = model.transform(testData)
predictions.show()

In [None]:
predictions.groupBy('prediction').count().show()

## elbow method to determine the best k

In [None]:
import matplotlib.pyplot as plt

# Fit one model per candidate k and record its training cost for the elbow plot.
# The fixed seed keeps the curve reproducible between runs.
k_values = list(range(2, 11))
cost = []
for k in k_values:
    kmeans = KMeans(featuresCol='scaled_features', k=k, maxIter=10, seed=42)
    model = kmeans.fit(trainData)
    cost.append(model.summary.trainingCost)

plt.plot(k_values, cost, marker='o')
plt.xlabel('Number of Clusters (k)')
plt.ylabel('Within Set Sum of Squared Errors (WSS)')
plt.title('Elbow Method For Optimal k')
plt.show()


The graph drops steeply from k = 2 to k = 5.
After k = 5 the reduction slows down (adding more clusters gives only small improvements), so extra clusters are not worth it. We therefore choose k = 5.

In [None]:
# clusters = 5 (chosen from the elbow plot); the explicit seed makes the clustering reproducible
kmeans = KMeans(featuresCol='scaled_features', k=5, seed=42)
model = kmeans.fit(trainData)

In [None]:
# make predictions
predictions = model.transform(trainData)

In [None]:
predictions.groupBy('prediction').count().show()

Cost (based on Silhouette Score)
- Close to 1 → well-separated clusters
- Close to 0 → overlapping clusters, bad

In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Evaluate on the same column the model was trained on. Without featuresCol the evaluator
# reads the unscaled 'features' column, which is not the space KMeans clustered in.
evaluator = ClusteringEvaluator(featuresCol="scaled_features")
cost = evaluator.evaluate(predictions)

In [None]:
print(cost)

In [None]:
centers = model.clusterCenters()

In [None]:
print(centers)

In [None]:
featuresColumns

In [None]:
# get stats for each cluster
# The number of clusters is taken from the fitted model instead of a hard-coded 5,
# so this cell stays correct if k is changed above.
cluster_count = len(model.clusterCenters())
for i in range(cluster_count):
  predictions.filter(predictions['prediction'] == i) \
        .select(featuresColumns) \
        .describe() \
        .show()

In [None]:
# prediction is cluster number
# 'id' is dropped earlier in the notebook, so only columns that still exist are selected
# instead of failing on a missing column.
wanted_columns = ["id", "prediction"]
columns_to_save = [name for name in wanted_columns if name in predictions.columns]
# mode("overwrite") lets the cell be re-run when the output directory already exists.
predictions.select(columns_to_save).write.mode("overwrite").csv("clustering_predictions", header=True)

In [None]:
import pandas as pd
import glob

# Spark writes one CSV part file per partition; merge them into a single CSV.
# The result gets its own variable (so the Spark `df` is not replaced by a pandas frame)
# and its own file name (so the classification export "merged_predictions.csv" is not overwritten).
files = glob.glob("clustering_predictions/part-*")
clustering_predictions_pdf = pd.concat([pd.read_csv(f) for f in files])
clustering_predictions_pdf.to_csv("merged_clustering_predictions.csv", index=False)

In [None]:
sc.stop()

In [None]:
# Merge the two dataframes based on the 'id' column
# FIXME(review): df1 and df2 are not defined anywhere in the visible notebook, and the
# SparkContext is stopped in the preceding cell — confirm where these frames come from
# before relying on this cell.
merged_df = df1.join(df2, on="id", how="inner")

# Show the first few rows of the merged dataframe to verify
merged_df.show()