In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("FraudAnalytics") \
    .enableHiveSupport() \
    .getOrCreate()


df_raw_sample = spark.read.csv("C:\\Users\\pradh\\Desktop\\archive (1)\\Synthetic_Financial_datasets_log.csv", header=True, inferSchema=True)
df_raw_sample.printSchema()
df_raw_sample.show(5, truncate=False)


root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|type    |amount  |nameOrig   |oldbalanceOrg|newbalanceOrig|nameDest   |oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|1   |PAYMENT |9839.64 |C1231006815|170136.0     |160296.36     |M1979787155|0.0           |0.0           |0      |0             |
|1   |PAY

In [3]:
from pyspark.sql.types import *

schema = StructType([
    StructField("step", IntegerType(), True),
    StructField("type", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("nameOrig", StringType(), True),
    StructField("oldbalanceOrg", DoubleType(), True),
    StructField("newbalanceOrg", DoubleType(), True),
    StructField("nameDest", StringType(), True),
    StructField("oldbalanceDest", DoubleType(), True),
    StructField("newbalanceDest", DoubleType(), True),
    StructField("isFraud", IntegerType(), True),
    StructField("isFlaggedFraud", IntegerType(), True),
])

df = spark.read.csv("C:\\Users\\pradh\\Desktop\\archive (1)\\Synthetic_Financial_datasets_log.csv", header=True, schema=schema)


In [4]:
# Row count
total_rows = df.count(); print("rows:", total_rows)

# Distinct values & sanity of key columns
df.select("type").distinct().show(50)
df.groupBy("isFraud").count().orderBy("isFraud").show()
df.groupBy("isFlaggedFraud").count().orderBy("isFlaggedFraud").show()

# Null counts column 
from pyspark.sql.functions import sum as _sum
df.select([_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

# Numeric summaries
df.select("amount","oldbalanceOrg","newbalanceOrg","oldbalanceDest","newbalanceDest").summary("count","min","max","mean","stddev").show()


rows: 6362620
+--------+
|    type|
+--------+
|TRANSFER|
| CASH_IN|
|CASH_OUT|
| PAYMENT|
|   DEBIT|
+--------+

+-------+-------+
|isFraud|  count|
+-------+-------+
|      0|6354407|
|      1|   8213|
+-------+-------+

+--------------+-------+
|isFlaggedFraud|  count|
+--------------+-------+
|             0|6362604|
|             1|     16|
+--------------+-------+

+----+----+------+--------+-------------+-------------+--------+--------------+--------------+-------+--------------+
|step|type|amount|nameOrig|oldbalanceOrg|newbalanceOrg|nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+----+------+--------+-------------+-------------+--------+--------------+--------------+-------+--------------+
|   0|   0|     0|       0|            0|            0|       0|             0|             0|      0|             0|
+----+----+------+--------+-------------+-------------+--------+--------------+--------------+-------+--------------+

+-------+------------------+-------

In [5]:
from pyspark.sql.functions import when

# Negative checks
neg_amount = df.filter(col("amount") < 0).count()
neg_org = df.filter((col("oldbalanceOrg") < 0) | (col("newbalanceOrg") < 0)).count()
neg_dest = df.filter((col("oldbalanceDest") < 0) | (col("newbalanceDest") < 0)).count()
print("neg_amount:", neg_amount, "neg_org:", neg_org, "neg_dest:", neg_dest)

# Duplicates
dups = df.count() - df.dropDuplicates().count()
print("exact_duplicate_rows:", dups)

# Weird flags
df.groupBy("isFraud","isFlaggedFraud").count().orderBy("isFraud","isFlaggedFraud").show()


neg_amount: 0 neg_org: 0 neg_dest: 0
exact_duplicate_rows: 0
+-------+--------------+-------+
|isFraud|isFlaggedFraud|  count|
+-------+--------------+-------+
|      0|             0|6354407|
|      1|             0|   8197|
|      1|             1|     16|
+-------+--------------+-------+



In [6]:

df_clean = df.dropDuplicates()

print("After cleaning:", df_clean.count())


After cleaning: 6362620


In [7]:
# Fraud vs non-fraud 
df_clean.groupBy("isFraud").count().show()

# Amount summary
df_clean.select("amount").summary("count","min","max","mean","stddev").show()


+-------+-------+
|isFraud|  count|
+-------+-------+
|      1|   8213|
|      0|6354407|
+-------+-------+

+-------+------------------+
|summary|            amount|
+-------+------------------+
|  count|           6362620|
|    min|               0.0|
|    max|     9.244551664E7|
|   mean|179861.90354913048|
| stddev| 603858.2314629363|
+-------+------------------+



In [8]:
spark.sql("CREATE DATABASE IF NOT EXISTS fraud_analytics")
spark.catalog.setCurrentDatabase("fraud_analytics")

df_clean.write.mode("overwrite").format("parquet").saveAsTable("transactions_clean")

# sanity check
spark.sql("SHOW TABLES").show()
spark.sql("SELECT COUNT(*) FROM transactions_clean").show()


SparkRuntimeException: [LOCATION_ALREADY_EXISTS] Cannot name the managed table as `spark_catalog`.`fraud_analytics`.`transactions_clean`, as its associated location 'file:/C:/Users/pradh/Desktop/Financial_Fraud_Analysis/spark-warehouse/fraud_analytics.db/transactions_clean' already exists. Please pick a different table name, or remove the existing location first. SQLSTATE: 42710