In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("project").getOrCreate()

# Load the dataset
df = spark.read.csv("Base.csv", header=True, inferSchema=True)

# Show initial rows
#df.show(20)

# Handling missing values by filling with a default value or dropping
#df_clean = df.fillna({'amount': 0, 'fraud_label': 0})

# Show schema and cleaned data
df.printSchema()
df.show(5)


root
 |-- fraud_bool: integer (nullable = true)
 |-- income: double (nullable = true)
 |-- name_email_similarity: double (nullable = true)
 |-- prev_address_months_count: integer (nullable = true)
 |-- current_address_months_count: integer (nullable = true)
 |-- customer_age: integer (nullable = true)
 |-- days_since_request: double (nullable = true)
 |-- intended_balcon_amount: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- zip_count_4w: integer (nullable = true)
 |-- velocity_6h: double (nullable = true)
 |-- velocity_24h: double (nullable = true)
 |-- velocity_4w: double (nullable = true)
 |-- bank_branch_count_8w: integer (nullable = true)
 |-- date_of_birth_distinct_emails_4w: integer (nullable = true)
 |-- employment_status: string (nullable = true)
 |-- credit_risk_score: integer (nullable = true)
 |-- email_is_free: integer (nullable = true)
 |-- housing_status: string (nullable = true)
 |-- phone_home_valid: integer (nullable = true)
 |-- phone_mobil

In [3]:
from pyspark.sql.functions import col

# Group by fraud label and count the number of transactions
fraud_counts = df.groupBy("fraud_bool").count()

fraud_counts.show()


+----------+------+
|fraud_bool| count|
+----------+------+
|         1| 11029|
|         0|988971|
+----------+------+



In [9]:
from pyspark.sql.functions import col

# Group by payment_type and fraud_bool to count fraudulent transactions for each payment type
fraud_by_payment_type = df.groupBy("payment_type", "fraud_bool").count()

# Show the results
fraud_by_payment_type.show()



+------------+----------+------+
|payment_type|fraud_bool| count|
+------------+----------+------+
|          AB|         0|366385|
|          AA|         0|256885|
|          AB|         1|  4169|
|          AC|         1|  4209|
|          AA|         1|  1364|
|          AD|         1|  1286|
|          AE|         0|   288|
|          AC|         0|247862|
|          AD|         0|117551|
|          AE|         1|     1|
+------------+----------+------+



In [6]:
# Group by employment status and calculate average income
avg_income_by_status = df.groupBy("employment_status").avg("income")

avg_income_by_status.show()


+-----------------+-------------------+
|employment_status|        avg(income)|
+-----------------+-------------------+
|               CG| 0.5719646799116999|
|               CA| 0.5688723892574412|
|               CC| 0.5257481858149258|
|               CE|0.41986515665624063|
|               CB| 0.6163369200508979|
|               CF|0.47423127583230884|
|               CD| 0.4344619561119097|
+-----------------+-------------------+



In [7]:
from pyspark.sql.functions import corr

# Calculate correlation between fraud_bool and velocity_6h
fraud_velocity_corr = df.stat.corr("fraud_bool", "velocity_6h")

print(f"Correlation between fraud and velocity_6h: {fraud_velocity_corr}")


Correlation between fraud and velocity_6h: -0.016892357909054904


In [16]:
from pyspark.sql.functions import col

# Group by 'income' and 'fraud_bool' to count fraudulent transactions for each income group
fraud_by_income = df.groupBy("income", "fraud_bool").count()

# Show the results
fraud_by_income.show()



+------------------+----------+------+
|            income|fraud_bool| count|
+------------------+----------+------+
|0.6000000000000001|         0|110990|
|               0.3|         0| 50495|
|               0.5|         0| 55414|
|               0.1|         0|156540|
|               0.4|         0| 80767|
|0.7000000000000001|         0|104182|
|               0.4|         1|   597|
|               0.2|         1|   438|
|               0.1|         1|   909|
|               0.2|         0| 68907|
|               0.9|         1|  4791|
|0.7000000000000001|         1|   927|
|               0.8|         1|  1602|
|0.6000000000000001|         1|   983|
|               0.3|         1|   338|
|               0.5|         1|   444|
|               0.8|         0|145048|
|               0.9|         0|216628|
+------------------+----------+------+



In [18]:
from pyspark.sql.functions import when, col

# Create age groups (e.g., 18-30, 31-40, etc.)
df_age_grouped = df.withColumn("age_group", 
                              when(col("customer_age").between(18, 30), "18-30")
                              .when(col("customer_age").between(31, 40), "31-40")
                              .when(col("customer_age").between(41, 50), "41-50")
                              .when(col("customer_age").between(51, 60), "51-60")
                              .otherwise("60+"))

# Group by age_group and fraud_bool to count fraudulent transactions for each age group
fraud_by_age_group = df_age_grouped.groupBy("age_group", "fraud_bool").count()

# Show the results
fraud_by_age_group.show()


+---------+----------+------+
|age_group|fraud_bool| count|
+---------+----------+------+
|    18-30|         0|553494|
|      60+|         0| 28472|
|    31-40|         0|235836|
|    18-30|         1|  3794|
|    51-60|         0| 33621|
|    31-40|         1|  2876|
|      60+|         1|   405|
|    41-50|         1|  2805|
|    41-50|         0|137548|
|    51-60|         1|  1149|
+---------+----------+------+



In [22]:
import os
# Create a directory for storing the output files
output_dir = os.path.expanduser("~/Documents/brainstormers")
os.makedirs(output_dir, exist_ok=True)
fraud_counts.write.csv(os.path.join(output_dir, "fraud_counts.csv"), header=True)
fraud_by_payment_type.write.csv(os.path.join(output_dir, "fraud_by_payment_type.csv"), header=True)
avg_income_by_status.write.csv(os.path.join(output_dir, "avg_income_by_status.csv"), header=True)
fraud_by_income.write.csv(os.path.join(output_dir, "fraud_by_income.csv"), header=True)
fraud_by_age_group.write.csv(os.path.join(output_dir, "fraud_by_age_group.csv"), header=True)
with open(os.path.join(output_dir, "fraud_velocity_corr.txt"), "w") as f:
    f.write(f"Correlation between fraud and velocity_6h: {fraud_velocity_corr}\n")


