In [16]:
from pyspark.sql.functions import when

df_risk = df_cleaned.withColumn(
    "fraud_score",
    when((col("amount") > 20000) & (col("oldbalanceOrg") < 1000), 3)
    .when(col("amount") > 15000, 2)
    .when(col("isFraud") == 1, 4)
    .otherwise(1)
)

df_risk.select("amount", "oldbalanceOrg", "isFraud", "fraud_score", "risk_level").show(10)


StatementMeta(mysparkpool, 0, 16, Finished, Available, Finished)

+--------+-------------+-------+-----------+----------+
|  amount|oldbalanceOrg|isFraud|fraud_score|risk_level|
+--------+-------------+-------+-----------+----------+
| 9839.64|     170136.0|      0|          1|    NORMAL|
| 1864.28|      21249.0|      0|          1|    NORMAL|
|   181.0|        181.0|      1|          4|    NORMAL|
|   181.0|        181.0|      1|          4|    NORMAL|
|11668.14|      41554.0|      0|          1|      HIGH|
| 7817.71|      53860.0|      0|          1|    NORMAL|
| 7107.77|     183195.0|      0|          1|    NORMAL|
| 7861.64|    176087.23|      0|          1|    NORMAL|
| 4024.36|       2671.0|      0|          1|    NORMAL|
| 5337.77|      41720.0|      0|          1|    NORMAL|
+--------+-------------+-------+-----------+----------+
only showing top 10 rows



In [17]:
df_risk.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("financial.transactions_with_risk_score")


StatementMeta(mysparkpool, 0, 17, Finished, Available, Finished)

In [18]:
#Analytics with Spark SQL

#1.Total Fraud Transactions
spark.sql("""
SELECT COUNT(*) AS total_frauds
FROM financial.transactions_with_risk_score
WHERE isFraud = 1
""").show()


StatementMeta(mysparkpool, 0, 18, Finished, Available, Finished)

+------------+
|total_frauds|
+------------+
|        8213|
+------------+



In [19]:
#Top 10 Originators by Fraud Amount

spark.sql("""
SELECT nameOrig, SUM(amount) AS total_lost
FROM financial.transactions_with_risk_score
WHERE isFraud = 1
GROUP BY nameOrig
ORDER BY total_lost DESC
LIMIT 10
""").show()


StatementMeta(mysparkpool, 0, 19, Finished, Available, Finished)

+-----------+----------+
|   nameOrig|total_lost|
+-----------+----------+
|C1057439889|     1.0E7|
| C576718894|     1.0E7|
|C1751546135|     1.0E7|
|C1208192074|     1.0E7|
|C1879989290|     1.0E7|
|C1421664654|     1.0E7|
| C884104189|     1.0E7|
| C618976547|     1.0E7|
|C1036572575|     1.0E7|
|C1577275521|     1.0E7|
+-----------+----------+



In [20]:
# Fraud Score Distribution

spark.sql("""
SELECT fraud_score, COUNT(*) AS transactions
FROM financial.transactions_with_risk_score
GROUP BY fraud_score
ORDER BY fraud_score DESC
""").show()


StatementMeta(mysparkpool, 0, 20, Finished, Available, Finished)

+-----------+------------+
|fraud_score|transactions|
+-----------+------------+
|          4|         372|
|          3|     1629199|
|          2|     3021429|
|          1|     1711620|
+-----------+------------+



In [21]:
#High-Risk Transactions Over Time

spark.sql("""
SELECT step, COUNT(*) AS high_risk_count
FROM financial.transactions_with_risk_score
WHERE risk_level = 'HIGH'
GROUP BY step
ORDER BY step
""").show()


StatementMeta(mysparkpool, 0, 21, Finished, Available, Finished)

+----+---------------+
|step|high_risk_count|
+----+---------------+
|   1|           1367|
|   2|            518|
|   3|            324|
|   4|            269|
|   5|            286|
|   6|           1109|
|   7|           3907|
|   8|          14980|
|   9|          30523|
|  10|          28615|
|  11|          30260|
|  12|          29702|
|  13|          30932|
|  14|          33212|
|  15|          36329|
|  16|          35089|
|  17|          35937|
|  18|          39980|
|  19|          40496|
|  20|          30537|
+----+---------------+
only showing top 20 rows

