In [None]:
# Load Data
import kagglehub

# Download latest version
path = kagglehub.dataset_download("ismetsemedov/transactions")

print("Path to dataset files:", path)

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('FinAnalysis').config("spark.driver.memory", "8g").getOrCreate()
spark

In [None]:
txn_df = spark.read.csv(
    "./data/transactions_dataset.csv",
    header=True,
    inferSchema=True
)
txn_df = txn_df.repartition(30)

In [3]:
txn_df.show()

+--------------+-----------+----------------+--------------------+-----------------+-------------+--------------------+---------+--------+---------+------------+---------+---------------+------------+-----------+-------+--------------------+---------------+------------------+------------------+----------------+-------------------+--------------------+--------+
|transaction_id|customer_id|     card_number|           timestamp|merchant_category|merchant_type|            merchant|   amount|currency|  country|        city|city_size|      card_type|card_present|     device|channel|  device_fingerprint|     ip_address|distance_from_home|high_risk_merchant|transaction_hour|weekend_transaction|  velocity_last_hour|is_fraud|
+--------------+-----------+----------------+--------------------+-----------------+-------------+--------------------+---------+--------+---------+------------+---------+---------------+------------+-----------+-------+--------------------+---------------+-----------------

In [4]:
dataset_columns = txn_df.columns
print(dataset_columns)

['transaction_id', 'customer_id', 'card_number', 'timestamp', 'merchant_category', 'merchant_type', 'merchant', 'amount', 'currency', 'country', 'city', 'city_size', 'card_type', 'card_present', 'device', 'channel', 'device_fingerprint', 'ip_address', 'distance_from_home', 'high_risk_merchant', 'transaction_hour', 'weekend_transaction', 'velocity_last_hour', 'is_fraud']


In [5]:
# Check for null values
from pyspark.sql.functions import col, sum
null_values = txn_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in dataset_columns])

null_values.show()

#No null values

+--------------+-----------+-----------+---------+-----------------+-------------+--------+------+--------+-------+----+---------+---------+------------+------+-------+------------------+----------+------------------+------------------+----------------+-------------------+------------------+--------+
|transaction_id|customer_id|card_number|timestamp|merchant_category|merchant_type|merchant|amount|currency|country|city|city_size|card_type|card_present|device|channel|device_fingerprint|ip_address|distance_from_home|high_risk_merchant|transaction_hour|weekend_transaction|velocity_last_hour|is_fraud|
+--------------+-----------+-----------+---------+-----------------+-------------+--------+------+--------+-------+----+---------+---------+------------+------+-------+------------------+----------+------------------+------------------+----------------+-------------------+------------------+--------+
|             0|          0|          0|        0|                0|            0|       0|   

In [6]:
# Check Data distribution
total_count = txn_df.count()
fraud_count = txn_df.groupBy("is_fraud").count()
fraud_count.show()

+--------+-------+
|is_fraud|  count|
+--------+-------+
|    true|1494719|
|   false|5989047|
+--------+-------+



In [7]:
print("Number of partitions : ", txn_df.rdd.getNumPartitions())

Number of partitions :  30


In [8]:
# Check the datatypes of the columns
txn_df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- card_number: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- merchant_category: string (nullable = true)
 |-- merchant_type: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- currency: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- city_size: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- card_present: boolean (nullable = true)
 |-- device: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- device_fingerprint: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- distance_from_home: integer (nullable = true)
 |-- high_risk_merchant: boolean (nullable = true)
 |-- transaction_hour: integer (nullable = true)
 |-- weekend_transaction: boolean (nullable = true)
 |-- velocity_last_hour: string (nu

In [9]:
txn_df.select("timestamp").show()

+--------------------+
|           timestamp|
+--------------------+
|2024-10-01 13:52:...|
|2024-09-30 07:22:...|
|2024-10-01 02:38:...|
|2024-09-30 07:12:...|
|2024-10-01 10:11:...|
|2024-09-30 12:29:...|
|2024-09-30 09:00:...|
|2024-09-30 06:25:...|
|2024-09-30 10:45:...|
|2024-09-30 16:48:...|
|2024-09-30 16:03:...|
|2024-10-01 09:14:...|
|2024-09-30 20:46:...|
|2024-09-30 21:01:...|
|2024-09-30 22:51:...|
|2024-09-30 08:40:...|
|2024-09-30 09:35:...|
|2024-10-01 13:02:...|
|2024-09-30 16:43:...|
|2024-10-01 05:39:...|
+--------------------+
only showing top 20 rows


In [10]:
for field in txn_df.schema.fields:
    print(field.name,"-",field.dataType)

transaction_id - StringType()
customer_id - StringType()
card_number - LongType()
timestamp - TimestampType()
merchant_category - StringType()
merchant_type - StringType()
merchant - StringType()
amount - DoubleType()
currency - StringType()
country - StringType()
city - StringType()
city_size - StringType()
card_type - StringType()
card_present - BooleanType()
device - StringType()
channel - StringType()
device_fingerprint - StringType()
ip_address - StringType()
distance_from_home - IntegerType()
high_risk_merchant - BooleanType()
transaction_hour - IntegerType()
weekend_transaction - BooleanType()
velocity_last_hour - StringType()
is_fraud - BooleanType()


In [11]:
for col in txn_df.columns:
    unq_count = txn_df.select(col).distinct().count()
    print(f"{col} : {unq_count} unq values")

transaction_id : 7477306 unq values
customer_id : 4869 unq values
card_number : 5000 unq values
timestamp : 7483754 unq values
merchant_category : 8 unq values
merchant_type : 17 unq values
merchant : 105 unq values
amount : 2831167 unq values
currency : 11 unq values
country : 12 unq values
city : 11 unq values
city_size : 2 unq values
card_type : 5 unq values
card_present : 2 unq values
device : 9 unq values
channel : 3 unq values
device_fingerprint : 785462 unq values
ip_address : 7477187 unq values
distance_from_home : 2 unq values
high_risk_merchant : 2 unq values
transaction_hour : 24 unq values
weekend_transaction : 2 unq values
velocity_last_hour : 7483740 unq values
is_fraud : 2 unq values


# Data Preprocessing

In [12]:
from pyspark.sql.functions import col
cols_to_drop = [
    "transaction_id",
    "customer_id",
    "card_number",
    "timestamp",
    "device_fingerprint",
    "ip_address",
    "velocity_last_hour"
]

txn_df = txn_df.drop(*cols_to_drop)
txn_df = txn_df.withColumn("is_fraud", col("is_fraud").cast("integer"))
txn_df.show()

+-----------------+-------------+-----------------+---------+--------+---------+------------+---------+---------------+------------+---------------+-------+------------------+------------------+----------------+-------------------+--------+
|merchant_category|merchant_type|         merchant|   amount|currency|  country|        city|city_size|      card_type|card_present|         device|channel|distance_from_home|high_risk_merchant|transaction_hour|weekend_transaction|is_fraud|
+-----------------+-------------+-----------------+---------+--------+---------+------------+---------+---------------+------------+---------------+-------+------------------+------------------+----------------+-------------------+--------+
|           Travel|     airlines|         Emirates|  2264.21|     SGD|Singapore|Unknown City|   medium|  Premium Debit|       false|        iOS App| mobile|                 1|              true|               1|              false|       0|
|              Gas|        major|   

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, StandardScaler

# Identifying columns
string_cols = [field.name for field in txn_df.schema.fields if field.dataType.typeName() == "string"]
boolean_cols = [field.name for field in txn_df.schema.fields if field.dataType.typeName() == "boolean"]
numeric_cols = [field.name for field in txn_df.schema.fields if field.dataType.typeName in ["integer", "double", "long"]]
timestamp_cols = [field.name for field in txn_df.schema.fields if field.dataType.typeName == "timestamp"]

In [14]:
indexers = [StringIndexer(inputCol=col, outputCol=col+"_idx", handleInvalid="keep") for col in string_cols]


feature_cols = [col+"_idx" for col in string_cols] + boolean_cols + numeric_cols
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

scaler = StandardScaler(inputCol="features", outputCol="scaled_features")


# Building the Pipeline
pipeline = Pipeline(stages= indexers + [assembler , scaler])

pipeline_model = pipeline.fit(txn_df)
preprocessed_df = pipeline_model.transform(txn_df)

In [15]:
preprocessed_df.show()

+-----------------+-------------+-----------------+---------+--------+---------+------------+---------+---------------+------------+---------------+-------+------------------+------------------+----------------+-------------------+--------+---------------------+-----------------+------------+------------+-----------+--------+-------------+-------------+----------+-----------+--------------------+--------------------+
|merchant_category|merchant_type|         merchant|   amount|currency|  country|        city|city_size|      card_type|card_present|         device|channel|distance_from_home|high_risk_merchant|transaction_hour|weekend_transaction|is_fraud|merchant_category_idx|merchant_type_idx|merchant_idx|currency_idx|country_idx|city_idx|city_size_idx|card_type_idx|device_idx|channel_idx|            features|     scaled_features|
+-----------------+-------------+-----------------+---------+--------+---------+------------+---------+---------------+------------+---------------+-------+--

In [16]:
train_df, test_df = preprocessed_df.randomSplit([0.8, 0.2], seed=42)

In [17]:
from pyspark.ml.classification import LogisticRegression

logistic_regression = LogisticRegression(featuresCol="scaled_features", labelCol="is_fraud")

In [18]:
model = logistic_regression.fit(train_df)

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = model.transform(test_df)
evaluator = BinaryClassificationEvaluator(labelCol="is_fraud")
auc = evaluator.evaluate(predictions)
print(f"Test AUC : {auc}")

Test AUC : 0.8455208169037175


In [20]:
from pyspark.sql.functions import col

conf_df = predictions.select(
    col("is_fraud").alias("actual"),
    col("prediction").cast("integer").alias("predicted")
)

conf_matrix = conf_df.groupBy("actual","predicted").count().orderBy("actual", "predicted")
conf_matrix.show()

+------+---------+-------+
|actual|predicted|  count|
+------+---------+-------+
|     0|        0|1198388|
|     1|        0| 168690|
|     1|        1| 130233|
+------+---------+-------+

