In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!pip install -q pyspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] += ":/usr/lib/jvm/java-11-openjdk-amd64/bin"

In [3]:
import pyspark
sc = pyspark.SparkContext(appName="FraudDetection")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/28 09:09:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("FraudDetection") \
    .getOrCreate()

spark.sparkContext.getConf().getAll()
spark

In [13]:
import numpy as np 
import pandas as pd 
import json
import matplotlib.pyplot as plt
import warnings
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum
from pyspark.sql import functions as F

warnings.filterwarnings("ignore")
data_path='/kaggle/input/transactions-fraud-datasets'

In [14]:
with open(f"{data_path}/mcc_codes.json", "r") as f:
    mcc_dict = json.load(f)

# Convert to list of dicts
mcc_list = [{"mcc": int(k), "description": v} for k, v in mcc_dict.items()]
mcc_df = spark.createDataFrame(mcc_list)
mcc_df.printSchema()

root
 |-- description: string (nullable = true)
 |-- mcc: long (nullable = true)



In [15]:
with open(f'{data_path}/train_fraud_labels.json', 'r') as file:
    train_fraud_labels = json.load(file)
target_dict = {int(k):v for k,v in train_fraud_labels['target'].items()}

# Convert target_dict to PySpark DataFrame
target_df = spark.createDataFrame(target_dict.items(), ["id", "target"])

In [16]:
target_df.count() 

25/04/28 09:22:07 WARN TaskSetManager: Stage 29 contains a task of very large size (26183 KiB). The maximum recommended task size is 1000 KiB.
                                                                                                    

8914963

In [17]:
# Read CSV as PySpark DataFrame
transactions_df = spark.read.csv(f'{data_path}/transactions_data.csv', header=True, inferSchema=True)
print(transactions_df.count())
transactions_df.select([_sum(col(c).isNull().cast("int")).alias(c) for c in transactions_df.columns]).show()

                                                                                                    

13305915




+---+----+---------+-------+------+--------+-----------+-------------+--------------+-------+---+--------+
| id|date|client_id|card_id|amount|use_chip|merchant_id|merchant_city|merchant_state|    zip|mcc|  errors|
+---+----+---------+-------+------+--------+-----------+-------------+--------------+-------+---+--------+
|  0|   0|        0|      0|     0|       0|          0|            0|       1563700|1652706|  0|13094522|
+---+----+---------+-------+------+--------+-----------+-------------+--------------+-------+---+--------+



                                                                                                    

In [18]:
transactions_df = transactions_df.fillna({'errors': 'No Error'})

# Drop unnecessary columns
transactions_df = transactions_df.drop('merchant_state', 'zip')

# Join transactions_df with target_df on 'id' column
transactions_df = transactions_df.join(target_df, on="id", how="inner")

In [19]:
print(transactions_df.count())
transactions_df.select([_sum(col(c).isNull().cast("int")).alias(c) for c in transactions_df.columns]).show()

25/04/28 09:23:21 WARN TaskSetManager: Stage 41 contains a task of very large size (26183 KiB). The maximum recommended task size is 1000 KiB.
                                                                                                    

8914963


25/04/28 09:24:05 WARN TaskSetManager: Stage 50 contains a task of very large size (26183 KiB). The maximum recommended task size is 1000 KiB.

+---+----+---------+-------+------+--------+-----------+-------------+---+------+------+
| id|date|client_id|card_id|amount|use_chip|merchant_id|merchant_city|mcc|errors|target|
+---+----+---------+-------+------+--------+-----------+-------------+---+------+------+
|  0|   0|        0|      0|     0|       0|          0|            0|  0|     0|     0|
+---+----+---------+-------+------+--------+-----------+-------------+---+------+------+



                                                                                                    

In [20]:
# Remove '$' and ',' and cast the 'amount' column to double type
transactions_df = transactions_df.withColumn("amount", F.regexp_replace("amount", "[\$,]", "").cast("double"))

# Ensure 'date' is in timestamp format
transactions_df = transactions_df.withColumn("date", F.to_date("date"))

transactions_df = transactions_df.join(mcc_df, on="mcc", how="left")

In [21]:
transactions_df.limit(100).toPandas()

25/04/28 09:24:56 WARN TaskSetManager: Stage 59 contains a task of very large size (26183 KiB). The maximum recommended task size is 1000 KiB.
                                                                                                    

Unnamed: 0,mcc,id,date,client_id,card_id,amount,use_chip,merchant_id,merchant_city,errors,target,description
0,4121,7475341,2010-01-01,1797,1127,43.33,Swipe Transaction,33326,Kahului,No Error,No,Taxicabs and Limousines
1,5541,7475347,2010-01-01,114,3398,-64.00,Swipe Transaction,61195,North Hollywood,No Error,No,Service Stations
2,4121,7475378,2010-01-01,1575,2112,17.14,Swipe Transaction,29232,Osprey,No Error,No,Taxicabs and Limousines
3,7538,7475398,2010-01-01,63,60,30.69,Swipe Transaction,32175,Newark,No Error,No,Automotive Service Shops
4,5812,7475424,2010-01-01,1142,4674,3.68,Swipe Transaction,59397,Cohasset,No Error,No,Eating Places and Restaurants
...,...,...,...,...,...,...,...,...,...,...,...,...
95,5812,7477442,2010-01-01,921,4646,2.48,Swipe Transaction,88646,Scottsville,No Error,No,Eating Places and Restaurants
96,5411,7477479,2010-01-01,639,129,16.82,Swipe Transaction,69034,Republic,No Error,No,"Grocery Stores, Supermarkets"
97,5541,7477498,2010-01-01,89,2998,6.98,Swipe Transaction,61195,San Miguel,No Error,No,Service Stations
98,5541,7477499,2010-01-01,90,1100,73.81,Swipe Transaction,61195,Cocoa,No Error,No,Service Stations


In [22]:
# Load CSV as Spark DataFrame
users_df = spark.read.csv(f"{data_path}/users_data.csv", header=True, inferSchema=True)

# Rename column 'id' to 'client_id'
users_df = users_df.withColumnRenamed("id", "client_id")

# Show DataFrame info
print("Number of rows:", users_df.count())
users_df.select([_sum(col(c).isNull().cast("int")).alias(c) for c in users_df.columns]).show()

Number of rows: 2000
+---------+-----------+--------------+----------+-----------+------+-------+--------+---------+-----------------+-------------+----------+------------+----------------+
|client_id|current_age|retirement_age|birth_year|birth_month|gender|address|latitude|longitude|per_capita_income|yearly_income|total_debt|credit_score|num_credit_cards|
+---------+-----------+--------------+----------+-----------+------+-------+--------+---------+-----------------+-------------+----------+------------+----------------+
|        0|          0|             0|         0|          0|     0|      0|       0|        0|                0|            0|         0|           0|               0|
+---------+-----------+--------------+----------+-----------+------+-------+--------+---------+-----------------+-------------+----------+------------+----------------+



In [23]:
# Remove '$' and ',' and cast the columns to double type
users_df = users_df.withColumn("per_capita_income", F.regexp_replace("per_capita_income", "[\$,]", "").cast("double"))
users_df = users_df.withColumn("yearly_income", F.regexp_replace("yearly_income", "[\$,]", "").cast("double"))
users_df = users_df.withColumn("total_debt", F.regexp_replace("total_debt", "[\$,]", "").cast("double"))

users_df.show(3)

+---------+-----------+--------------+----------+-----------+------+--------------------+--------+---------+-----------------+-------------+----------+------------+----------------+
|client_id|current_age|retirement_age|birth_year|birth_month|gender|             address|latitude|longitude|per_capita_income|yearly_income|total_debt|credit_score|num_credit_cards|
+---------+-----------+--------------+----------+-----------+------+--------------------+--------+---------+-----------------+-------------+----------+------------+----------------+
|      825|         53|            66|      1966|         11|Female|       462 Rose Lane|   34.15|  -117.76|          29278.0|      59696.0|  127613.0|         787|               5|
|     1746|         53|            68|      1966|         12|Female|3606 Federal Boul...|   40.76|   -73.74|          37891.0|      77254.0|  191349.0|         701|               5|
|     1718|         81|            67|      1938|         11|Female|     766 Third Drive| 

In [24]:
users_df = users_df.join(transactions_df.select("client_id").distinct(), on="client_id", how="left_semi")
# Show results
print("Number of rows:", users_df.count())
users_df.printSchema()

25/04/28 09:25:28 WARN TaskSetManager: Stage 77 contains a task of very large size (26183 KiB). The maximum recommended task size is 1000 KiB.
25/04/28 09:25:38 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:25:38 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:25:38 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:25:38 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:25:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:25:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:25:40 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:25:40 WARN RowBasedKeyValu

Number of rows: 1219
root
 |-- client_id: integer (nullable = true)
 |-- current_age: integer (nullable = true)
 |-- retirement_age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- birth_month: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- per_capita_income: double (nullable = true)
 |-- yearly_income: double (nullable = true)
 |-- total_debt: double (nullable = true)
 |-- credit_score: integer (nullable = true)
 |-- num_credit_cards: integer (nullable = true)



In [25]:
# Join transactions and users on client_id
users_transactions_df = transactions_df.join(users_df, on='client_id', how='inner')


# Show schema and data types (like info in pandas)
users_transactions_df.printSchema()

# Optionally show row count
print("Number of rows:", users_transactions_df.count())

root
 |-- client_id: integer (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- card_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- use_chip: string (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- errors: string (nullable = false)
 |-- target: string (nullable = true)
 |-- description: string (nullable = true)
 |-- current_age: integer (nullable = true)
 |-- retirement_age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- birth_month: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- per_capita_income: double (nullable = true)
 |-- yearly_income: double (nullable = true)
 |-- total_debt: double (nullable = true)
 |-- credit_score: integer (nullable = true)
 |--

25/04/28 09:26:04 WARN TaskSetManager: Stage 91 contains a task of very large size (26183 KiB). The maximum recommended task size is 1000 KiB.
25/04/28 09:26:32 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:26:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:26:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:26:33 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:26:34 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:26:34 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:26:34 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:26:34 WARN RowBasedKeyValu

Number of rows: 8914963


                                                                                                    

In [26]:
# Load CSV as Spark DataFrame
cards_df = spark.read.csv(f"{data_path}/cards_data.csv", header=True, inferSchema=True)

# Rename column 'id' to 'card_id'
cards_df = cards_df.withColumnRenamed("id", "card_id")

# Remove '$' and ',' and cast the 'credit_limit' column to double type
cards_df = cards_df.withColumn("credit_limit", F.regexp_replace("credit_limit", "[\$,]", "").cast("double"))

# Show DataFrame info
print("Number of rows:", cards_df.count())
cards_df.select([_sum(col(c).isNull().cast("int")).alias(c) for c in cards_df.columns]).show()

Number of rows: 6146
+-------+---------+----------+---------+-----------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+
|card_id|client_id|card_brand|card_type|card_number|expires|cvv|has_chip|num_cards_issued|credit_limit|acct_open_date|year_pin_last_changed|card_on_dark_web|
+-------+---------+----------+---------+-----------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+
|      0|        0|         0|        0|          0|      0|  0|       0|               0|           0|             0|                    0|               0|
+-------+---------+----------+---------+-----------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+



In [27]:
cards_df.show(3)

+-------+---------+----------+---------+----------------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+
|card_id|client_id|card_brand|card_type|     card_number|expires|cvv|has_chip|num_cards_issued|credit_limit|acct_open_date|year_pin_last_changed|card_on_dark_web|
+-------+---------+----------+---------+----------------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+
|   4524|      825|      Visa|    Debit|4344676511950444|12/2022|623|     YES|               2|     24295.0|       09/2002|                 2008|              No|
|   2731|      825|      Visa|    Debit|4956965974959986|12/2020|393|     YES|               2|     21968.0|       04/2014|                 2014|              No|
|   3701|      825|      Visa|    Debit|4582313478255491|02/2024|719|     YES|               2|     46414.0|       07/2003|                 2004|              No|
+-------+---------+---

In [28]:
cards_df = cards_df.join(users_df.select("client_id").distinct(), on="client_id", how="left_semi")
# Show results
print("Number of rows:", cards_df.count())
users_df.printSchema()

25/04/28 09:27:10 WARN TaskSetManager: Stage 124 contains a task of very large size (26183 KiB). The maximum recommended task size is 1000 KiB.
25/04/28 09:27:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:21 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:22 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:22 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:22 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:22 WARN RowBasedKeyVal

Number of rows: 4514
root
 |-- client_id: integer (nullable = true)
 |-- current_age: integer (nullable = true)
 |-- retirement_age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- birth_month: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- per_capita_income: double (nullable = true)
 |-- yearly_income: double (nullable = true)
 |-- total_debt: double (nullable = true)
 |-- credit_score: integer (nullable = true)
 |-- num_credit_cards: integer (nullable = true)



In [29]:
output_path = "/kaggle/working"

# Save the Data to HDFS
users_df.write.csv(f"{output_path}/users.csv", header=True, mode="overwrite")
cards_df.write.csv(f"{output_path}/cards.csv", header=True, mode="overwrite")
transactions_df.write.csv(f"{output_path}/transactions.csv", header=True, mode="overwrite")

25/04/28 09:27:46 WARN TaskSetManager: Stage 139 contains a task of very large size (26183 KiB). The maximum recommended task size is 1000 KiB.
25/04/28 09:27:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:57 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
25/04/28 09:27:58 WARN RowBasedKeyVal