# Exploratory Data Analysis for Scalable-Credit-Card-Fraud-Detection-Behavior-Analysis


This notebook focuses on understanding the structure and content of the datasets used in the project. The objective is to explore the transactions, cards, and users data to identify the key variables, data quality issues, and other initial patterns that may be useful for fraud detection.

The exploration steps inspect the raw data as loaded; the cleaning and feature engineering sections later in the notebook build on these findings.

The notebook follows the steps laid out in the EDA report.


In [2]:
#Start Spark Session
from pyspark.sql import SparkSession

session = (
    SparkSession.builder
    .appName("Credit Card Fraud Detection")
    .master("local[*]")
    .config("spark.driver.memory", "6g")
    .config("spark.sql.shuffle.partitions", "8")
    .getOrCreate()
)

#### Load the Raw Datasets

In [3]:
# Load Data
transactions_df = session.read.csv("data/raw/transactions_data.csv", header=True, inferSchema=True)
cards_df = session.read.csv("data/raw/cards_data.csv", header=True, inferSchema=True) 
users_df = session.read.csv("data/raw/users_data.csv", header=True, inferSchema=True)

## Transactions Data

The transactions dataset is by far the largest, with roughly 13.3 million records (the `use_chip` counts below sum to 13,305,915). This confirms that it is the main dataset, while cards and users provide contextual information.

In [5]:
transactions_df.columns

['id',
 'date',
 'client_id',
 'card_id',
 'amount',
 'use_chip',
 'merchant_id',
 'merchant_city',
 'merchant_state',
 'zip',
 'mcc',
 'errors']

The transactions dataset includes:
- Identifiers (`id`, `client_id`, `card_id`)
- Temporal information (`date`)
- Transaction amount (`amount`)
- Transaction method (`use_chip`)
- Merchant information (`merchant_id`, `merchant_city`, `merchant_state`, `zip`)
- Merchant Category Code (`mcc`), a four-digit code that classifies the merchant's type of business
- An `errors` field that may indicate abnormal or failed transactions

This dataset is the core of the fraud detection task.


### 1. Data Inspection

In [6]:
#Preview Sample Rows
transactions_df.show(10)

+-------+-------------------+---------+-------+-------+------------------+-----------+-------------+--------------+-------+----+------+
|     id|               date|client_id|card_id| amount|          use_chip|merchant_id|merchant_city|merchant_state|    zip| mcc|errors|
+-------+-------------------+---------+-------+-------+------------------+-----------+-------------+--------------+-------+----+------+
|7475327|2010-01-01 00:01:00|     1556|   2972|$-77.00| Swipe Transaction|      59935|       Beulah|            ND|58523.0|5499|  NULL|
|7475328|2010-01-01 00:02:00|      561|   4575| $14.57| Swipe Transaction|      67570|   Bettendorf|            IA|52722.0|5311|  NULL|
|7475329|2010-01-01 00:02:00|     1129|    102| $80.00| Swipe Transaction|      27092|        Vista|            CA|92084.0|4829|  NULL|
|7475331|2010-01-01 00:05:00|      430|   2860|$200.00| Swipe Transaction|      27092|  Crown Point|            IN|46307.0|4829|  NULL|
|7475332|2010-01-01 00:06:00|      848|   3915| 

#### Check Data Types

In [7]:
transactions_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_id: integer (nullable = true)
 |-- amount: string (nullable = true)
 |-- use_chip: string (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- merchant_state: string (nullable = true)
 |-- zip: double (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- errors: string (nullable = true)



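The inferred data types match the column contents except for `amount`, which is read as a string. Note also that `zip` is inferred as a double (for example `58523.0`), although ZIP codes are identifiers rather than quantities.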

In [8]:
#Transaction Amount Format Check
transactions_df.select("amount").show(10, truncate=False)

+-------+
|amount |
+-------+
|$-77.00|
|$14.57 |
|$80.00 |
|$200.00|
|$46.41 |
|$4.81  |
|$77.00 |
|$26.46 |
|$261.58|
|$10.74 |
+-------+
only showing top 10 rows


Because this column contains a currency sign, the sign must be removed and the values converted to a numeric type.
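The conversion can be checked on a few of the values printed above before it is applied to the full column. The following is a minimal plain-Python sketch, not the notebook's Spark code; the helper name `parse_amount` is hypothetical, and the `[$,]` character class is the same one the cleaning step later in this notebook passes to `regexp_replace`.

```python
import re


def parse_amount(raw):
    """Strip '$' and ',' from a currency string and return a float.

    None is passed through unchanged so missing values stay missing.
    """
    if raw is None:
        return None
    return float(re.sub(r"[$,]", "", raw))


# Values copied from the amount preview above
samples = ["$-77.00", "$14.57", "$200.00"]
parsed = [parse_amount(s) for s in samples]
print(parsed)
```

The negative sign sits after the dollar sign in the raw data (`$-77.00`), so removing only the symbol leaves a string that converts cleanly to a negative number.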

#### Check Duplicate Transactions

In [9]:
from pyspark.sql.functions import count

duplicate_id_count = (
    transactions_df
    .groupBy("id")
    .agg(count("*").alias("cnt"))
    .filter("cnt > 1")
    .count()
)

duplicate_id_count
# Number of duplicate ids

0

There are no duplicate transaction ids in the dataset.

#### Categories and Counts

In [10]:
transactions_df.groupBy("use_chip").count().show()

+------------------+-------+
|          use_chip|  count|
+------------------+-------+
|Online Transaction|1557912|
| Swipe Transaction|6967185|
|  Chip Transaction|4780818|
+------------------+-------+



The majority of transactions are **Swipe Transactions**, followed by **Chip Transactions**, while **Online Transactions** represent a smaller portion of the total volume.

This distribution is expected in real-world card usage, where most everyday purchases are made using physical cards.  
However, even though online transactions are fewer in number, they are often associated with a higher fraud risk due to the absence of physical card verification.

For this reason, the transaction type (`use_chip`) is considered an important variable for later analysis and modeling.
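To put the counts above in proportion, the shares can be computed directly from the printed values. This is a small arithmetic sketch in plain Python; the dictionary simply restates the `use_chip` output.

```python
# Counts copied from the use_chip output above
counts = {
    "Swipe Transaction": 6_967_185,
    "Chip Transaction": 4_780_818,
    "Online Transaction": 1_557_912,
}

total = sum(counts.values())
shares = {name: 100 * n / total for name, n in counts.items()}

print(f"Total transactions: {total:,}")
for name, pct in shares.items():
    print(f"{name}: {pct:.2f}%")
```

Swipe transactions account for a little over half of all rows, while online transactions make up roughly one row in nine.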


In [11]:
transactions_df.groupBy("merchant_state").count().show()

+--------------------+-------+
|      merchant_state|  count|
+--------------------+-------+
|                  NJ| 322227|
|                  IL| 467930|
|United Arab Emirates|    300|
|        South Africa|    339|
|           Indonesia|    194|
|         South Korea|   1153|
|              Israel|    941|
|           Australia|    569|
|              Brunei|      3|
|                  CA|1427087|
|                  IN| 312468|
|                  OK| 159902|
|                  KY| 170013|
|                  LA| 159719|
|                  KS|  99442|
|                  WY|   8747|
|              Tuvalu|      5|
|            Thailand|    461|
|             Romania|     50|
|    Marshall Islands|     11|
+--------------------+-------+
only showing top 20 rows


In [12]:
transactions_df.groupBy("errors").count().show()

+--------------------+--------+
|              errors|   count|
+--------------------+--------+
|    Technical Glitch|   26271|
|Bad PIN,Insuffici...|     293|
|Bad Card Number,B...|      38|
|             Bad CVV|    6106|
|Bad Card Number,B...|      33|
|Bad PIN,Technical...|      70|
|                NULL|13094522|
|         Bad Zipcode|    1126|
|Insufficient Bala...|     243|
|Bad Expiration,In...|      47|
|Bad CVV,Insuffici...|      57|
|Bad Card Number,T...|      15|
|Insufficient Balance|  130902|
|Bad Expiration,Ba...|      32|
|      Bad Expiration|    6161|
|     Bad Card Number|    7767|
|             Bad PIN|   32119|
|Bad Expiration,Te...|      21|
|Bad Card Number,I...|      71|
|Bad CVV,Technical...|       8|
+--------------------+--------+
only showing top 20 rows


The `errors` column contains information about transactions that encountered issues during processing.
Most transactions have a `NULL` value, meaning they were completed successfully without any reported problem.

However, a small subset of transactions contains specific error types, such as:
- Insufficient balance
- Bad PIN
- Bad card number
- Bad expiration date
- Technical glitches

Among these, errors related to **insufficient balance**, **bad PIN**, and **technical issues** appear more frequently than others.
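Several rows in the output above hold more than one error separated by commas, so counting the raw strings understates how often each individual error type occurs. The sketch below shows one way to split and tally them in plain Python. The helper name `split_errors` is hypothetical, and the sample list is illustrative only: the combined values are truncated in the output, so the combination used here is an assumed example rather than a value confirmed from the data.

```python
from collections import Counter


def split_errors(value):
    """Split a comma-separated errors string into individual error types."""
    if value is None:
        return []
    return [part.strip() for part in value.split(",") if part.strip()]


# Illustrative values only (the combined entry is an assumption)
sample_errors = [
    None,
    "Bad PIN",
    "Insufficient Balance",
    "Bad PIN,Insufficient Balance",
    "Technical Glitch",
]

error_counts = Counter(
    error for value in sample_errors for error in split_errors(value)
)
print(error_counts.most_common())
```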


### 2. Handling Missing Values

In [13]:
# Count missing (NULL) values per column
from pyspark.sql.functions import col, sum as spark_sum


def count_missing_values(df):
    """Return a one-row DataFrame with the number of NULLs in each column."""
    return df.select(
        [spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
    )


print("Missing values in transactions dataset:")
count_missing_values(transactions_df).show()

Missing values in transactions dataset:
+---+----+---------+-------+------+--------+-----------+-------------+--------------+-------+---+--------+
| id|date|client_id|card_id|amount|use_chip|merchant_id|merchant_city|merchant_state|    zip|mcc|  errors|
+---+----+---------+-------+------+--------+-----------+-------------+--------------+-------+---+--------+
|  0|   0|        0|      0|     0|       0|          0|            0|       1563726|1652706|  0|13094522|
+---+----+---------+-------+------+--------+-----------+-------------+--------------+-------+---+--------+



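Missing values occur in `merchant_state`, `zip`, and `errors`. Most of the missing `merchant_state` and `zip` values come from online transactions, which have no physical location. These fields can therefore be filled with a label that explicitly marks the transaction as online, which preserves the semantic meaning of the gap. The missing `errors` values correspond to transactions with no reported error.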
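The fill rule described above can be sketched on single rows in plain Python. The function name `fill_online_location` and the placeholder label `ONLINE` are assumptions for illustration; in Spark, `zip` would first need to be cast to a string for a text label to fit, since it is currently inferred as a double.

```python
ONLINE_LABEL = "ONLINE"  # assumed placeholder, not taken from the notebook


def fill_online_location(row):
    """Fill missing location fields only when the row is an online transaction."""
    filled = dict(row)
    if filled["use_chip"] == "Online Transaction":
        for field in ("merchant_state", "zip"):
            if filled.get(field) is None:
                filled[field] = ONLINE_LABEL
    return filled


online_row = {"use_chip": "Online Transaction", "merchant_state": None, "zip": None}
swipe_row = {"use_chip": "Swipe Transaction", "merchant_state": None, "zip": None}

print(fill_online_location(online_row))
print(fill_online_location(swipe_row))
```

Restricting the fill to online rows matters here: the missing `merchant_state` count (1,563,726) exceeds the number of online transactions (1,557,912), so at least 5,814 non-online rows are also missing a state and would stay null under this rule.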

### 3. Standardizing Categorical Variables

As the category counts above show, the categorical columns of the transactions data are already consistently formatted, so no further standardization is required.

### 4. Data Integrity Check

#### Transactions Consistency

We check whether each `merchant_id` maps consistently to the same merchant location, excluding online transactions.

In [14]:
from pyspark.sql import functions as F

# Filter non-online transactions
physical_txns = transactions_df.filter(
    F.col("use_chip") != "Online Transaction"
)

# Count distinct city/state combinations per merchant
merchant_location_check = (
    physical_txns
    .groupBy("merchant_id")
    .agg(
        F.countDistinct(
            F.struct("merchant_city", "merchant_state")
        ).alias("num_locations")
    )
)

# Merchants with inconsistent locations
inconsistent_merchants = merchant_location_check.filter(
    F.col("num_locations") > 1
)

inconsistent_merchants.show(truncate=False)


+-----------+-------------+
|merchant_id|num_locations|
+-----------+-------------+
|54850      |588          |
|27092      |1049         |
|11468      |368          |
|9041       |7            |
|44919      |744          |
|28395      |398          |
|86438      |1305         |
|31258      |2            |
|36392      |2            |
|18014      |5            |
|7131       |2            |
|58897      |5            |
|95855      |112          |
|83240      |3            |
|57386      |85           |
|78680      |2            |
|24891      |9            |
|11901      |29           |
|94625      |256          |
|30928      |217          |
+-----------+-------------+
only showing top 20 rows


These merchant IDs map to more than one city/state combination, which may indicate either:
- A data quality issue
- Chain merchants operating in multiple locations
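The transactions preview earlier in this notebook already contains one such case: `merchant_id` 27092 appears in both Vista, CA and Crown Point, IN. The sketch below reproduces the distinct-location count on those preview rows in plain Python; it is an illustration of the grouping logic, not a replacement for the Spark aggregation.

```python
from collections import defaultdict

# Rows copied from the transactions preview (In [6])
rows = [
    {"merchant_id": 59935, "merchant_city": "Beulah", "merchant_state": "ND"},
    {"merchant_id": 27092, "merchant_city": "Vista", "merchant_state": "CA"},
    {"merchant_id": 27092, "merchant_city": "Crown Point", "merchant_state": "IN"},
]

# Collect the distinct (city, state) pairs seen for each merchant
locations = defaultdict(set)
for row in rows:
    locations[row["merchant_id"]].add((row["merchant_city"], row["merchant_state"]))

# Keep only merchants seen in more than one location
multi_location = {
    merchant: sorted(pairs)
    for merchant, pairs in locations.items()
    if len(pairs) > 1
}
print(multi_location)
```

A merchant ID that spans hundreds of locations across different states is more plausibly a brand-level or chain-level identifier than a single outlet, which would favor the second explanation for the largest counts.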

We also check whether any online transaction carries a physical location, which would contradict the transaction type.

To do so, we filter on `use_chip == 'Online Transaction'` and check whether `merchant_state` or `zip` is populated, or whether `merchant_city` holds anything other than `ONLINE`.

Any row returned by this filter is an integrity error.

In [15]:
online_location_violations = transactions_df.filter(
    (F.col("use_chip") == "Online Transaction") &
    (
        (F.col("merchant_city").isNotNull() & (F.col("merchant_city") != "ONLINE")) |
        (F.col("merchant_state").isNotNull()) |
        (F.col("zip").isNotNull())
    )
)

online_location_violations.show(truncate=False)


+---+----+---------+-------+------+--------+-----------+-------------+--------------+---+---+------+
|id |date|client_id|card_id|amount|use_chip|merchant_id|merchant_city|merchant_state|zip|mcc|errors|
+---+----+---------+-------+------+--------+-----------+-------------+--------------+---+---+------+
+---+----+---------+-------+------+--------+-----------+-------------+--------------+---+---+------+



The filter returns no rows, so no online transaction carries a physical location.

## Data Cleaning

Based on the analysis above, the dataset is cleaned and exported for further use.

In [18]:
from pyspark.sql.functions import (
    col, when, regexp_replace, countDistinct, upper, trim, collect_set, concat_ws, concat, lit
)


# 1. Clean amount
transactions_clean = transactions_df.withColumn(
    "amount",
    regexp_replace(col("amount"), "[$,]", "")
)

transactions_clean = transactions_clean.withColumn(
    "amount",
    col("amount").cast("double")
)

# 2. Handling missing errors
transactions_clean = transactions_clean.withColumn(
    "errors",
    when(col("errors").isNull(), "No Error").otherwise(col("errors"))
)

# 3. Normalize merchant location
transactions_clean = transactions_clean.withColumn(
    "merchant_city",
    upper(trim(col("merchant_city")))
).withColumn(
    "merchant_state",
    upper(trim(col("merchant_state")))
)



# 4. Merchant location consistency check

merchant_location_check = (
    transactions_clean
    .filter(col("use_chip") != "Online Transaction")
    .groupBy("merchant_id")
    .agg(
        countDistinct("merchant_city").alias("city_count"),
        countDistinct("merchant_state").alias("state_count"),
        concat_ws(
            " | ",
            collect_set(
                concat(col("merchant_city"), lit(" "), col("merchant_state"))
            )
        ).alias("all_locations")
    )
    .filter((col("city_count") > 1) | (col("state_count") > 1))
)


# Final verification
print("Transactions data after cleaning")

transactions_clean.printSchema()

transactions_clean.show(10)


print("Merchant data location")
merchant_location_check.show(5)

Transactions data after cleaning
root
 |-- id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- use_chip: string (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- merchant_state: string (nullable = true)
 |-- zip: double (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- errors: string (nullable = true)

+-------+-------------------+---------+-------+------+------------------+-----------+-------------+--------------+-------+----+--------+
|     id|               date|client_id|card_id|amount|          use_chip|merchant_id|merchant_city|merchant_state|    zip| mcc|  errors|
+-------+-------------------+---------+-------+------+------------------+-----------+-------------+--------------+-------+----+--------+
|7475327|2010-01-01 00:01:00|     1556|   2972| -77.0| Swipe Transaction

## Feature Engineering

Feature engineering transforms cleaned raw transaction data into meaningful variables that help capture spending behavior, transaction patterns, and potential fraud indicators.

#### Time-based Features

Transaction timing is a useful behavioral indicator, since fraudulent transactions often occur at unusual hours (late night or early morning).

Steps:
- Extract the hour from the `date` timestamp.
- Categorize transactions into time-of-day buckets.
- Use four buckets: Morning (05:00 to 11:59), Afternoon (12:00 to 16:59), Evening (17:00 to 20:59), and Night (21:00 to 04:59).

Engineered feature `day_timing`: a categorical feature representing the time of day.
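The bucket boundaries are easy to get wrong by one hour, so it can help to check them on literal values first. The plain-Python function below mirrors the thresholds used in the Spark cell that follows; the function name `day_timing` is chosen here to match the feature name and is not part of the notebook's code.

```python
def day_timing(hour):
    """Map an hour of day (0-23) to a time-of-day bucket."""
    if 5 <= hour < 12:
        return "Morning"
    if 12 <= hour < 17:
        return "Afternoon"
    if 17 <= hour < 21:
        return "Evening"
    return "Night"


# Print the bucket for every hour to inspect the boundaries
for h in range(24):
    print(h, day_timing(h))
```

Note that Night is the fallback bucket, so it covers both the late hours (21 to 23) and the early hours (0 to 4).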



In [19]:
from pyspark.sql.functions import (
    col, hour, when
)

transactions_fe = transactions_clean.withColumn(
    "transaction_hour",
    hour(col("date"))
)

transactions_fe = transactions_fe.withColumn(
    "day_timing",
    when((col("transaction_hour") >= 5) & (col("transaction_hour") < 12), "Morning")
    .when((col("transaction_hour") >= 12) & (col("transaction_hour") < 17), "Afternoon")
    .when((col("transaction_hour") >= 17) & (col("transaction_hour") < 21), "Evening")
    .otherwise("Night")
)


#### Behavioral Data Flags

Binary flags simplify model learning and improve interpretability.

##### Online Transaction Flag

Online transactions differ from card-present (swipe or chip) transactions. This flag helps isolate e-commerce fraud patterns.

Feature `is_online_transaction` : 
    
        1 → Online Transaction
        0 → Other transaction


In [22]:
# Online transaction flag
transactions_fe = transactions_fe.withColumn(
    "is_online_transaction",
    when(col("use_chip") == "Online Transaction", 1).otherwise(0)
)


##### Refund Detection Flag

Negative transaction amounts are treated as refunds, which have different risk characteristics from purchases.

Feature `is_refund` :

        1 → amount < 0
        0 → amount ≥ 0


In [26]:
# Refund flag
transactions_fe = transactions_fe.withColumn(
    "is_refund",
    when(col("amount") < 0, 1).otherwise(0)
)


##### Error Flag

The original `errors` column contains many categories. A binary flag simplifies the analysis.

Feature `has_error` :

        1 → error present
        0 → no error

In [28]:
# Error presence flag
transactions_fe = transactions_fe.withColumn(
    "has_error",
    when(col("errors") != "No Error", 1).otherwise(0)
)

In [29]:
transactions_fe.printSchema()
transactions_fe.show(10, truncate=False)

root
 |-- id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_id: integer (nullable = true)
 |-- amount: double (nullable = true)
 |-- use_chip: string (nullable = true)
 |-- merchant_id: integer (nullable = true)
 |-- merchant_city: string (nullable = true)
 |-- merchant_state: string (nullable = true)
 |-- zip: double (nullable = true)
 |-- mcc: integer (nullable = true)
 |-- errors: string (nullable = true)
 |-- transaction_hour: integer (nullable = true)
 |-- day_timing: string (nullable = false)
 |-- is_online_transaction: integer (nullable = false)
 |-- is_refund: integer (nullable = false)
 |-- has_error: integer (nullable = false)

+-------+-------------------+---------+-------+------+------------------+-----------+-------------+--------------+-------+----+--------+----------------+----------+---------------------+---------+---------+
|id     |date               |client_id|card_id|amount|use_chip          |mer

## Cards Data


Financial institutions issue multiple payment cards to users over time.
This dataset captures card-level attributes, including card brand, type, issuance history, security properties, and risk indicators such as dark web exposure.

In [5]:
cards_df.columns

['id',
 'client_id',
 'card_brand',
 'card_type',
 'card_number',
 'expires',
 'cvv',
 'has_chip',
 'num_cards_issued',
 'credit_limit',
 'acct_open_date',
 'year_pin_last_changed',
 'card_on_dark_web']

### 1. Data Inspection



In [6]:
cards_df.show(10)

+----+---------+----------+---------------+----------------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+
|  id|client_id|card_brand|      card_type|     card_number|expires|cvv|has_chip|num_cards_issued|credit_limit|acct_open_date|year_pin_last_changed|card_on_dark_web|
+----+---------+----------+---------------+----------------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+
|4524|      825|      NULL|          Debit|4344676511950444|12/2022|623|     YES|               2|      $24295|       09/2002|                 2008|              No|
|2731|      825|      NULL|          Debit|4956965974959986|12/2020|393|     YES|               2|      $21968|       04/2014|                 2014|              No|
|3701|      825|      Visa|          Debit|4582313478255491|02/2024|719|     YES|               2|      $46414|       07/2003|                 2004|              No|
|  4

#### Check Data Types

In [12]:
cards_df.printSchema()

print("Total number of rows: ", cards_df.count())


root
 |-- id: integer (nullable = true)
 |-- client_id: integer (nullable = true)
 |-- card_brand: string (nullable = true)
 |-- card_type: string (nullable = true)
 |-- card_number: long (nullable = true)
 |-- expires: string (nullable = true)
 |-- cvv: integer (nullable = true)
 |-- has_chip: string (nullable = true)
 |-- num_cards_issued: integer (nullable = true)
 |-- credit_limit: string (nullable = true)
 |-- acct_open_date: string (nullable = true)
 |-- year_pin_last_changed: integer (nullable = true)
 |-- card_on_dark_web: string (nullable = true)

Total number of rows:  6146


- Date-related fields (`expires`, `acct_open_date`) are stored as strings (for example `12/2022`) and need conversion to date types during cleaning.
- Binary flags such as `has_chip` and `card_on_dark_web` are stored as YES/NO strings rather than booleans.
- The `credit_limit` column is stored as a string because it contains currency symbols; it must be converted to a numeric type.
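The flag and currency conversions listed above can be sketched on the values shown in the preview. This is a plain-Python illustration with hypothetical helper names (`parse_yes_no`, `parse_credit_limit`); date parsing is left to the validation section later in this notebook.

```python
def parse_yes_no(value):
    """Convert a YES/NO string (any casing) to a boolean; anything else becomes None."""
    if value is None:
        return None
    return {"YES": True, "NO": False}.get(value.strip().upper())


def parse_credit_limit(value):
    """Remove the currency symbol and thousands separators, then convert to float."""
    return float(value.replace("$", "").replace(",", ""))


# Values copied from the cards preview above
print(parse_yes_no("YES"), parse_yes_no("No"))
print(parse_credit_limit("$24295"))
```

Returning `None` for unexpected flag values keeps malformed entries visible instead of silently coercing them to `False`.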

#### Check Duplicate Values

In [14]:
duplicate_cards = cards_df.groupBy("id").count().filter(col("count") > 1)
duplicate_cards.show()


+---+-----+
| id|count|
+---+-----+
+---+-----+



No duplicate rows were detected in the Cards dataset based on the id column.

#### Category and Counts

In [15]:
from pyspark.sql.functions import col, count

# List of categorical columns
categorical_cols = ["card_brand", "card_type", "has_chip", "card_on_dark_web"]

# Show value counts for each categorical column
for col_name in categorical_cols:
    print(f"Value counts for column: {col_name}")
    cards_df.groupBy(col_name).count().orderBy(col("count").desc()).show()


Value counts for column: card_brand
+----------+-----+
|card_brand|count|
+----------+-----+
|Mastercard| 3150|
|      Visa| 2292|
|      Amex|  402|
|  Discover|  209|
|      NULL|   93|
+----------+-----+

Value counts for column: card_type
+---------------+-----+
|      card_type|count|
+---------------+-----+
|          Debit| 3485|
|         Credit| 2044|
|Debit (Prepaid)|  574|
|            Det|    4|
|           Dbit|    4|
|            Dit|    4|
|           Cret|    3|
|           Cdit|    3|
| Dbit (Prepaid)|    2|
|           Deit|    2|
|            bit|    2|
|        Credoit|    1|
|       Crenodit|    1|
|         De=bit|    1|
|         Deblit|    1|
|           Crit|    1|
|           Debt|    1|
|          Crdit|    1|
|         Debbit|    1|
|       Cr iedit|    1|
+---------------+-----+
only showing top 20 rows
Value counts for column: has_chip
+--------+-----+
|has_chip|count|
+--------+-----+
|     YES| 5500|
|      NO|  646|
+--------+-----+

Value counts for co

Observations:

    1. card_brand:

        Major brands: Mastercard (3150), Visa (2292), Amex (402), Discover (209)
        Missing values: 93
        card_brand missing values should be filled as "Unknown".

    2. card_type:

        Debit (3485), Credit (2044), and Debit (Prepaid) (574)
        Several inconsistent or misspelled entries: Det, Dbit, Dit, Cret, Cdit, etc.
        card_type contains multiple misspellings and variations; standardization is required

    3. has_chip:

        YES: 5500, NO: 646
        No missing values

    4. card_on_dark_web:
    
        All values are "No", no variation

### 2. Checking Missing Values

In [19]:
from pyspark.sql.functions import col, sum as spark_sum

missing_summary = cards_df.select(
    *[
        spark_sum(col(c).isNull().cast("int")).alias(c)
        for c in cards_df.columns
    ]
)

missing_summary.show(truncate=False)


+---+---------+----------+---------+-----------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+------------------+
|id |client_id|card_brand|card_type|card_number|expires|cvv|has_chip|num_cards_issued|credit_limit|acct_open_date|year_pin_last_changed|card_on_dark_web|credit_limit_clean|
+---+---------+----------+---------+-----------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+------------------+
|0  |0        |93        |0        |0          |0      |0  |0       |0               |0           |0             |0                    |0               |0                 |
+---+---------+----------+---------+-----------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+------------------+



The `card_brand` column has 93 missing values out of 6146 rows. All other columns are complete.

- A missing `card_brand` may indicate legacy or unbranded cards.
- Dropping these rows would result in data loss; instead, the missing values should be handled during cleaning.

Decision: impute missing `card_brand` values with "Unknown".

In [18]:
from pyspark.sql.functions import col, regexp_replace, count, when

# Clean credit_limit temporarily to check numeric conversion
cards_df = cards_df.withColumn(
    "credit_limit_clean",
    regexp_replace(col("credit_limit"), "[$,]", "")
)

# Count missing or null values
cards_df.select(
    count(when(col("credit_limit_clean").isNull() | (col("credit_limit_clean") == ""), True)).alias("missing_credit_limit"),
    count(when(col("num_cards_issued").isNull(), True)).alias("missing_num_cards_issued")
).show()

# Show min and max for numeric check
cards_df.select(
    col("credit_limit_clean").cast("double").alias("credit_limit"),
    col("num_cards_issued")
).describe().show()


+--------------------+------------------------+
|missing_credit_limit|missing_num_cards_issued|
+--------------------+------------------------+
|                   0|                       0|
+--------------------+------------------------+

+-------+------------------+------------------+
|summary|      credit_limit|  num_cards_issued|
+-------+------------------+------------------+
|  count|              6146|              6146|
|   mean|14347.493979824276|1.5030914415880248|
| stddev|12014.463884038893|0.5191909056590772|
|    min|               0.0|                 1|
|    max|          151223.0|                 3|
+-------+------------------+------------------+



- credit_limit:
        No missing values.
        Values range from 0 to 151,223, with a mean of ~14,347 and standard deviation of ~12,014.
        A few cards have a credit limit of 0, likely corresponding to debit or prepaid cards.

- num_cards_issued:
        No missing values.
        Values range from 1 to 3, with a mean of ~1.5, indicating most clients hold 1–2 cards.

### 3. Data Integrity Check

#### Cards with zero or negative credit limit

In [23]:
from pyspark.sql.functions import col, regexp_replace

cards_clean = cards_df.withColumn(
    "credit_limit_clean",
    regexp_replace(col("credit_limit"), "[$,]", "").cast("double")
)
cards_clean.filter(
    (col("card_type") == "Credit") & (col("credit_limit_clean") <= 0)
).select(
    "id", "client_id", "card_type", "credit_limit_clean"
).show()


print("Credit cards with limit 0: ")
# Count Credit cards with zero or negative limits
cards_clean.filter(
    (col("card_type") == "Credit") & (col("credit_limit_clean") <= 0)
).count()


+----+---------+---------+------------------+
|  id|client_id|card_type|credit_limit_clean|
+----+---------+---------+------------------+
|4318|      668|   Credit|               0.0|
|3626|      870|   Credit|               0.0|
|5957|     1975|   Credit|               0.0|
|1799|      934|   Credit|               0.0|
| 782|     1658|   Credit|               0.0|
|  96|       81|   Credit|               0.0|
|5680|      214|   Credit|               0.0|
|3443|      846|   Credit|               0.0|
|5264|     1770|   Credit|               0.0|
|3182|       15|   Credit|               0.0|
|3183|      318|   Credit|               0.0|
|6081|     1260|   Credit|               0.0|
|2105|      649|   Credit|               0.0|
|2411|     1364|   Credit|               0.0|
|2707|      132|   Credit|               0.0|
|4610|     1306|   Credit|               0.0|
| 265|       37|   Credit|               0.0|
|2885|     1202|   Credit|               0.0|
| 905|     1224|   Credit|        

26

Count of Credit cards with zero or negative credit limits: 26.

A small number of Credit cards have zero limits, which could indicate inactive cards or placeholders. The 26 cards with zero limits should be flagged for review or treated carefully during modeling.

#### Card Expiry vs Account Open Date validation

In [30]:
from pyspark.sql.functions import col, to_timestamp, when

cards_clean = cards_clean.withColumn(
    "expiry_dt",
    when(
        col("expires").rlike(r"^\d{2}/\d{2}$"),  # format MM/yy
        to_timestamp(col("expires"), "MM/yy")
    ).when(
        col("expires").rlike(r"^[A-Za-z]{3}-\d{2}$"),  # format MMM-yy
        to_timestamp(col("expires"), "MMM-yy")
    ).otherwise(None)
)

# Account open date parsing
cards_clean = cards_clean.withColumn(
    "acct_open_date_dt",
    when(
        col("acct_open_date").rlike(r"^[A-Za-z]{3}-\d{2}$"),  # format MMM-yy
        to_timestamp(col("acct_open_date"), "MMM-yy")
    ).otherwise(None)
)

# Filter cards where expiry < account open date
cards_clean.filter(col("expiry_dt") < col("acct_open_date_dt")).select(
    "id", "client_id", "acct_open_date", "expires", "expiry_dt", "acct_open_date_dt"
).show(10, truncate=False)


+---+---------+--------------+-------+---------+-----------------+
|id |client_id|acct_open_date|expires|expiry_dt|acct_open_date_dt|
+---+---------+--------------+-------+---------+-----------------+
+---+---------+--------------+-------+---------+-----------------+



We checked that each card's expiry date falls after the account open date:
- Parsed the `expires` and `acct_open_date` columns into timestamps (`expiry_dt`, `acct_open_date_dt`).
- Filtered for cards where `expiry_dt < acct_open_date_dt`.

The filter returns no rows, so this check flags no card whose expiry date precedes its account open date.
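One caveat is worth verifying before relying on this result. The cards preview earlier in this notebook shows `expires` and `acct_open_date` values such as `12/2022` and `09/2002`, which is a `MM/yyyy` layout. If the whole column follows that layout, neither pattern in the cell above (`MM/yy`, `MMM-yy`) would match, both parsed columns would be null, and an empty result would not by itself confirm the rule. The plain-Python sketch below tests the patterns against the preview values and shows the comparison with a `MM/yyyy` parse; the helper names are hypothetical, and the Spark equivalent would presumably be `to_date(col(...), "MM/yyyy")`.

```python
import re
from datetime import datetime

# The two patterns from the cell above, plus the layout seen in the preview
PATTERNS = {
    "MM/yy": r"^\d{2}/\d{2}$",
    "MMM-yy": r"^[A-Za-z]{3}-\d{2}$",
    "MM/yyyy": r"^\d{2}/\d{4}$",
}


def matching_formats(value):
    """Return the names of all patterns that the value matches."""
    return [name for name, pattern in PATTERNS.items() if re.match(pattern, value)]


def expiry_before_open(expires, acct_open_date):
    """True when the expiry month precedes the account open month (MM/yyyy inputs)."""
    expiry = datetime.strptime(expires, "%m/%Y")
    opened = datetime.strptime(acct_open_date, "%m/%Y")
    return expiry < opened


# Values copied from the cards preview
print(matching_formats("12/2022"), matching_formats("09/2002"))
print(expiry_before_open("12/2022", "09/2002"))
```

Counting the nulls in `expiry_dt` and `acct_open_date_dt` after parsing is a quick way to confirm whether the check actually compared any dates.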

#### Pin Change Year Validation

In [31]:
from pyspark.sql.functions import col, year, to_date

# Extract account open year
cards_clean = cards_clean.withColumn(
    "acct_open_year",
    year(to_date(col("acct_open_date_dt")))
)

# Check for PIN change year validity
invalid_pin_years = cards_clean.filter(
    (col("year_pin_last_changed") < col("acct_open_year")) |
    (col("year_pin_last_changed") > 2025)  # Replace with current year if needed
).select(
    "id", "client_id", "acct_open_date", "year_pin_last_changed"
)

# Show invalid PIN change year records
invalid_pin_years.show()


+---+---------+--------------+---------------------+
| id|client_id|acct_open_date|year_pin_last_changed|
+---+---------+--------------+---------------------+
+---+---------+--------------+---------------------+



The filter returns no rows: no `year_pin_last_changed` value is flagged as earlier than the account open year or later than 2025. On this check, the PIN history is consistent with the account lifecycle.
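Two details of the check above deserve attention. The upper bound is hardcoded to 2025, and the lower bound depends on `acct_open_year`: in Spark, a comparison against a null value evaluates to null and the row is dropped by `filter`, so if the account open date failed to parse, only the upper bound is effectively tested. The plain-Python sketch below expresses the same rule with the current year computed at run time; the function name `pin_year_is_valid` and the `MM/yyyy` input layout (taken from the cards preview) are assumptions.

```python
from datetime import date, datetime


def pin_year_is_valid(acct_open_date, year_pin_last_changed, today=None):
    """Check that the PIN change year lies between the account open year and the current year."""
    today = today or date.today()
    open_year = datetime.strptime(acct_open_date, "%m/%Y").year
    return open_year <= year_pin_last_changed <= today.year


# Values copied from the cards preview: opened 09/2002, PIN last changed in 2008
print(pin_year_is_valid("09/2002", 2008))
# A deliberately inconsistent example: PIN changed before the account was opened
print(pin_year_is_valid("07/2003", 2002))
```

Passing `today` explicitly makes the rule reproducible in tests, while the default keeps the notebook from going stale as the calendar year changes.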

## Data Cleaning

In [33]:
from pyspark.sql.functions import col, regexp_replace, when, upper, trim

# 1. Clean credit limit column
cards_clean = cards_df.withColumn(
    "credit_limit_clean",
    regexp_replace(col("credit_limit"), "[$,]", "").cast("double")
)

# 2. Fill missing card_brand
cards_clean = cards_clean.withColumn(
    "card_brand",
    when(col("card_brand").isNull(), "Unknown").otherwise(col("card_brand"))
)

# 3. Standardize categorical columns
cards_clean = cards_clean.withColumn("card_brand", upper(trim(col("card_brand"))))
cards_clean = cards_clean.withColumn("card_type", upper(trim(col("card_type"))))
cards_clean = cards_clean.withColumn("has_chip", upper(trim(col("has_chip"))))
cards_clean = cards_clean.withColumn("card_on_dark_web", upper(trim(col("card_on_dark_web"))))

# 4. Ensure numeric columns are proper type
cards_clean = cards_clean.withColumn("num_cards_issued", col("num_cards_issued").cast("integer"))

# 5. Fill missing has_chip values as 'UNKNOWN'
cards_clean = cards_clean.withColumn(
    "has_chip",
    when(col("has_chip").isNull(), "UNKNOWN").otherwise(col("has_chip"))
)

# 6. Preview cleaned data
cards_clean.show(10, truncate=False)
cards_clean.printSchema()


+----+---------+----------+---------------+----------------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+------------------+
|id  |client_id|card_brand|card_type      |card_number     |expires|cvv|has_chip|num_cards_issued|credit_limit|acct_open_date|year_pin_last_changed|card_on_dark_web|credit_limit_clean|
+----+---------+----------+---------------+----------------+-------+---+--------+----------------+------------+--------------+---------------------+----------------+------------------+
|4524|825      |UNKNOWN   |DEBIT          |4344676511950444|12/2022|623|YES     |2               |$24295      |09/2002       |2008                 |NO              |24295.0           |
|2731|825      |UNKNOWN   |DEBIT          |4956965974959986|12/2020|393|YES     |2               |$21968      |04/2014       |2014                 |NO              |21968.0           |
|3701|825      |VISA      |DEBIT          |4582313478255491|02/2024|719|YES

#### Standardizing Card type

In [44]:
from pyspark.sql.functions import when, col

# Standardize card_type properly
cards_clean = cards_clean.withColumn(
    "card_type_clean",
    when(col("card_type").rlike("(?i)^DEBIT\\s*\\(PREPAID\\)$"), "DEBIT (PREPAID)")  # Match DEBIT (PREPAID)
    .when(col("card_type").rlike("(?i)^DEBIT$"), "DEBIT")  # Match exact DEBIT
    .when(col("card_type").rlike("(?i)^CREDIT$"), "CREDIT")  # Match exact CREDIT
    .otherwise("UNKNOWN")  # Any other unknown/misspelled values
)

# Verify the correction
cards_clean.groupBy("card_type_clean").count().show()


+---------------+-----+
|card_type_clean|count|
+---------------+-----+
|          DEBIT| 3485|
|         CREDIT| 2044|
|DEBIT (PREPAID)|  574|
|        UNKNOWN|   43|
+---------------+-----+



Summary of the cleaning steps:

1. The credit_limit column, originally stored as a string with a $ prefix, was converted to a numeric column (credit_limit_clean).
2. Missing values were checked, and no nulls were found in credit_limit or num_cards_issued.
3. The card_type column had inconsistent spellings and formats such as DEBIT, DE=BIt, CREJDIT, etc. Exact matches were standardized to DEBIT, CREDIT, or DEBIT (PREPAID); the 43 values that matched none of these were labeled UNKNOWN.
4. Missing card_brand values were replaced with UNKNOWN.
5. All categorical columns were trimmed and converted to uppercase to prevent case-sensitive inconsistencies.
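
The currency parsing and card type matching described above can be sketched in plain Python. This is only an illustration of the rules, not the Spark code used in the notebook; the helper names `parse_currency` and `standardize_card_type` are hypothetical.

```python
import re

# Hypothetical helpers; the notebook applies the same rules with Spark column expressions.
CARD_TYPE_PATTERNS = [
    (re.compile(r"^DEBIT\s*\(PREPAID\)$", re.IGNORECASE), "DEBIT (PREPAID)"),
    (re.compile(r"^DEBIT$", re.IGNORECASE), "DEBIT"),
    (re.compile(r"^CREDIT$", re.IGNORECASE), "CREDIT"),
]


def parse_currency(value):
    """Strip '$' and ',' and convert to float; None stays None."""
    if value is None:
        return None
    return float(re.sub(r"[$,]", "", value))


def standardize_card_type(value):
    """Return the canonical card type, or 'UNKNOWN' when nothing matches exactly."""
    cleaned = (value or "").strip()
    for pattern, label in CARD_TYPE_PATTERNS:
        if pattern.match(cleaned):
            return label
    return "UNKNOWN"


print(parse_currency("$24295"))          # 24295.0
print(standardize_card_type("debit"))    # DEBIT
print(standardize_card_type("DE=BIt"))   # UNKNOWN
```

One difference to keep in mind: Python's `float()` raises on a non-numeric string, while a Spark `cast("double")` yields NULL instead.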

## Users Data

This dataset contains user-level demographic, financial, and credit-related attributes.

In [48]:
users_df.columns

['id',
 'current_age',
 'retirement_age',
 'birth_year',
 'birth_month',
 'gender',
 'address',
 'latitude',
 'longitude',
 'per_capita_income',
 'yearly_income',
 'total_debt',
 'credit_score',
 'num_credit_cards']

In [49]:
users_df.show(10, truncate=False)

+----+-----------+--------------+----------+-----------+------+------------------------+--------+---------+-----------------+-------------+----------+------------+----------------+
|id  |current_age|retirement_age|birth_year|birth_month|gender|address                 |latitude|longitude|per_capita_income|yearly_income|total_debt|credit_score|num_credit_cards|
+----+-----------+--------------+----------+-----------+------+------------------------+--------+---------+-----------------+-------------+----------+------------+----------------+
|825 |53         |66            |1966      |11         |Female|462 Rose Lane           |34.15   |-117.76  |$29278           |$59696       |$127613   |787         |5               |
|1746|NULL       |68            |1966      |12         |Female|3606 Federal Boulevard  |40.76   |-73.74   |$37891           |$77254       |$191349   |701         |5               |
|1718|81         |67            |1938      |11         |Female|766 Third Drive         |34.02  

### 1. Data Inspection

#### Check Datatype

In [47]:
users_df.printSchema()

total_rows = users_df.count()
print("Total number of rows: ", total_rows)


root
 |-- id: integer (nullable = true)
 |-- current_age: integer (nullable = true)
 |-- retirement_age: integer (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- birth_month: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- per_capita_income: string (nullable = true)
 |-- yearly_income: string (nullable = true)
 |-- total_debt: string (nullable = true)
 |-- credit_score: integer (nullable = true)
 |-- num_credit_cards: integer (nullable = true)

Total number of rows:  2000

Observations:

- Columns related to monetary amounts (per_capita_income, yearly_income, total_debt) are stored as strings with currency symbols and need conversion to numeric.
- The dataset contains a mix of categorical (gender) and numerical variables, which will require type standardization.
- id is a unique identifier for each user.
- Location information (latitude, longitude, address) is available and can be used for spatial analysis or feature engineering.

#### Checking Duplicate Data

In [51]:
duplicate_users = users_df.groupBy("id").count().filter(col("count") > 1)
duplicate_users.show()

+---+-----+
| id|count|
+---+-----+
+---+-----+



Observation:

- No duplicate user IDs were found.
- This ensures that each row represents a unique user, maintaining data integrity for subsequent analysis and feature engineering.

#### Categories and Counts

In [53]:
# Category counts for gender
users_df.groupBy("gender").count().show()


+------+-----+
|gender|count|
+------+-----+
|Female| 1016|
|  Male|  984|
+------+-----+



This shows that there is no significant imbalance in gender representation in the dataset.
There is also no need for standardization.

### 2. Handling Missing Data

In [57]:
# Alias Spark's sum so it does not shadow Python's built-in sum
from pyspark.sql.functions import col, sum as spark_sum
# Count missing (NULL) values per column
missing_df = users_df.select([
    spark_sum(col(c).isNull().cast("int")).alias(c)
    for c in users_df.columns
])

missing_df.show(truncate=False)

+---+-----------+--------------+----------+-----------+------+-------+--------+---------+-----------------+-------------+----------+------------+----------------+
|id |current_age|retirement_age|birth_year|birth_month|gender|address|latitude|longitude|per_capita_income|yearly_income|total_debt|credit_score|num_credit_cards|
+---+-----------+--------------+----------+-----------+------+-------+--------+---------+-----------------+-------------+----------+------------+----------------+
|0  |21         |3             |30        |0          |0     |0      |0       |0        |0                |0            |0         |0           |0               |
+---+-----------+--------------+----------+-----------+------+-------+--------+---------+-----------------+-------------+----------+------------+----------------+



Observations:

- Missing values are limited and concentrated in three columns: current_age (21), retirement_age (3), and birth_year (30).
- These can be filled with simple rules: a missing current_age or birth_year can be calculated from the other column when it is present.


### 3. Data Integrity Check

#### Age consistency

In [60]:
from pyspark.sql.functions import col

# Derive implied report year
users_df.withColumn(
    "derived_report_year",
    col("birth_year") + col("current_age")
).groupBy("derived_report_year") \
 .count() \
 .orderBy("derived_report_year") \
 .show()


+-------------------+-----+
|derived_report_year|count|
+-------------------+-----+
|               NULL|   47|
|               2019| 1573|
|               2020|  380|
+-------------------+-----+



This check verifies whether all user records belong to the same reporting year by deriving that year from the demographic attributes:

`Derived Report Year = birth_year + current_age`

- The majority of records (≈79%) correspond to report year 2019.
- A sizeable subset (380 records, 19%) corresponds to report year 2020.
- 47 records could not produce a derived report year because current_age or birth_year is missing.

The one-year spread is most likely due to the month of birth: a user's age depends on whether their birthday had already passed when the data was recorded. We take 2019 as the report year because it matches the majority of records.

With this year, we can also calculate the missing birth years or ages.
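
A small plain-Python sketch of why `birth_year + current_age` can differ by one year between users recorded on the same day. The snapshot date below is purely hypothetical and is not taken from the dataset, and the helper names are illustrative. It assumes age is counted in completed years and ignores the day of the month.

```python
from datetime import date


def age_on(birth_year, birth_month, snapshot):
    """Age in completed years on the snapshot date (day of month ignored)."""
    had_birthday = snapshot.month >= birth_month
    return snapshot.year - birth_year - (0 if had_birthday else 1)


def derived_report_year(birth_year, current_age):
    """Same formula as the notebook: birth_year + current_age."""
    return birth_year + current_age


snapshot = date(2020, 3, 1)  # hypothetical snapshot date, an assumption for illustration
for birth_month in (1, 11):
    age = age_on(1966, birth_month, snapshot)
    print(birth_month, age, derived_report_year(1966, age))
```

Under this convention, two users born in the same year land in adjacent derived years depending only on their birth month, so a two-value spread does not by itself indicate inconsistent records.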

#### Current age vs Retirement age

In [62]:
from pyspark.sql.functions import col

# Check age vs retirement age consistency
users_df.filter(
    col("current_age").isNotNull() &
    col("retirement_age").isNotNull() &
    (col("current_age") >= col("retirement_age"))
).select(
    "id", "current_age", "retirement_age"
).show()


+----+-----------+--------------+
|  id|current_age|retirement_age|
+----+-----------+--------------+
|1718|         81|            67|
| 708|         63|            63|
| 153|         76|            71|
|1946|         76|            66|
|1674|         70|            64|
|1492|         63|            58|
|1198|         82|            67|
| 898|         90|            66|
|  16|         75|            67|
|1269|         56|            56|
|1913|         60|            57|
| 871|         86|            71|
|1520|         67|            65|
|1372|         76|            66|
|1155|         83|            65|
| 663|         68|            63|
|1987|         63|            62|
|1484|         92|            72|
|1769|         75|            65|
| 906|         75|            69|
+----+-----------+--------------+
only showing top 20 rows


In [65]:
from pyspark.sql.functions import col

# Show summary statistics for retirement_age
users_df.select("retirement_age").describe().show()


+-------+------------------+
|summary|    retirement_age|
+-------+------------------+
|  count|              1997|
|   mean| 66.24186279419129|
| stddev|3.6152773885824585|
|    min|                50|
|    max|                79|
+-------+------------------+



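Observations:

- All retirement ages fall within a reasonable range (50–79), with an average of ~66 years.
- The rows where current_age is greater than or equal to retirement_age presumably belong to users who have already retired, so they are not necessarily errors.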



#### Income vs Debt Consistency

In [70]:
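# Alias Spark's round so it does not shadow Python's built-in round
from pyspark.sql.functions import col, round as spark_round

# Requires users_clean with the *_clean columns, which is built in the
# "Sanity Check for Income Columns" cell; run that cell first.
users_clean = users_clean.withColumn(
    "debt_to_income_ratio",
    spark_round(col("total_debt_clean") / col("yearly_income_clean"), 2)
)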

users_clean.select("id", "total_debt_clean", "yearly_income_clean", "debt_to_income_ratio")\
    .filter(col("debt_to_income_ratio") > 1)\
    .show()


+----+----------------+-------------------+--------------------+
|  id|total_debt_clean|yearly_income_clean|debt_to_income_ratio|
+----+----------------+-------------------+--------------------+
| 825|        127613.0|            59696.0|                2.14|
|1746|        191349.0|            77254.0|                2.48|
|1164|        183855.0|           109687.0|                1.68|
|1075|        102286.0|            51500.0|                1.99|
|1711|        114711.0|            54623.0|                 2.1|
|1752|         81262.0|            38190.0|                2.13|
| 640|         94016.0|            45727.0|                2.06|
|1679|         89214.0|            69149.0|                1.29|
|1094|         78833.0|            41442.0|                 1.9|
|1590|         32509.0|            20513.0|                1.58|
|1747|         38333.0|            36497.0|                1.05|
| 429|         89056.0|            53995.0|                1.65|
| 511|         55369.0|  

We computed the debt-to-income ratio for each user as:

`Debt-to-Income Ratio = total_debt_clean / yearly_income_clean`

Users with a ratio above 1, meaning their total debt exceeds one year of income, are flagged as potential inconsistencies.
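
As a worked example, the ratio for the first two users in the table above can be reproduced in plain Python. The function name `debt_to_income` is hypothetical, and the guard for a missing or zero income is an added assumption; the notebook's Spark expression has no such guard.

```python
def debt_to_income(total_debt, yearly_income):
    """Return total_debt / yearly_income rounded to 2 decimals, or None if undefined."""
    if total_debt is None or not yearly_income:
        # Assumption: treat missing or zero income as an undefined ratio
        return None
    return round(total_debt / yearly_income, 2)


print(debt_to_income(127613.0, 59696.0))  # 2.14 (user 825)
print(debt_to_income(191349.0, 77254.0))  # 2.48 (user 1746)
```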

#### Sanity Check for Income Columns

In [71]:
from pyspark.sql.functions import regexp_replace, col

# Remove $ and commas
users_clean = users_df.withColumn(
    "per_capita_income_clean",
    regexp_replace(col("per_capita_income"), "[$,]", "").cast("double")
).withColumn(
    "yearly_income_clean",
    regexp_replace(col("yearly_income"), "[$,]", "").cast("double")
).withColumn(
    "total_debt_clean",
    regexp_replace(col("total_debt"), "[$,]", "").cast("double")
)

# Check for negative or extreme values
users_clean.select(
    "id", "per_capita_income_clean", "yearly_income_clean", "total_debt_clean"
).filter(
    (col("per_capita_income_clean") < 0) |
    (col("yearly_income_clean") < 0) |
    (col("total_debt_clean") < 0)
).show()


+---+-----------------------+-------------------+----------------+
| id|per_capita_income_clean|yearly_income_clean|total_debt_clean|
+---+-----------------------+-------------------+----------------+
+---+-----------------------+-------------------+----------------+



All income and debt columns were cleaned to remove $ and , symbols. The filter returned no rows, so none of the three cleaned columns contains a negative value.

#### Summary of columns to check for outliers

In [69]:
users_clean.select(
    "per_capita_income_clean", "yearly_income_clean", "total_debt_clean"
).describe().show()


+-------+-----------------------+-------------------+-----------------+
|summary|per_capita_income_clean|yearly_income_clean| total_debt_clean|
+-------+-----------------------+-------------------+-----------------+
|  count|                   2000|               2000|             2000|
|   mean|              23141.928|          45715.882|        63709.694|
| stddev|      11324.13735766499|  22992.61545631198|52254.45342050286|
|    min|                    0.0|                1.0|              0.0|
|    max|               163145.0|           307018.0|         516263.0|
+-------+-----------------------+-------------------+-----------------+



#### Credit Score Consistency

In [75]:
users_clean.select("id", "credit_score").filter(
    (col("credit_score") < 450) | (col("credit_score") > 850)
).show()


+---+------------+
| id|credit_score|
+---+------------+
+---+------------+



All values in credit_score fall between 450 and 850, which is inside the typical 300–850 scoring range. No missing or out-of-range values were detected.

## Data Cleaning

In [80]:
from pyspark.sql.functions import col, when

# Report year derived in the age consistency check
report_year = 2019

# approxQuantile returns floats; cast to int so the age columns stay integers
median_retirement_age = int(users_df.approxQuantile("retirement_age", [0.5], 0.01)[0])
median_current_age = int(users_df.approxQuantile("current_age", [0.5], 0.01)[0])

# Impute missing values
users_df = users_df.withColumn(
    "current_age",
    when(col("current_age").isNull() & col("birth_year").isNotNull(), report_year - col("birth_year"))
    .when(col("current_age").isNull() & col("birth_year").isNull(), median_current_age)
    .otherwise(col("current_age"))
).withColumn(
    # current_age is already filled at this point, so one rule covers both cases
    "birth_year",
    when(col("birth_year").isNull(), report_year - col("current_age"))
    .otherwise(col("birth_year"))
).withColumn(
    "retirement_age",
    when(col("retirement_age").isNull(), median_retirement_age)
    .otherwise(col("retirement_age"))
)


Imputing `current_age` and `birth_year`:

- If current_age is missing but birth_year is available: `current_age = report_year − birth_year`
- If birth_year is missing but current_age is available: `birth_year = report_year − current_age`
- If both current_age and birth_year are missing, current_age is imputed with the median current age and birth_year is derived from it.

Imputing `retirement_age`:

- Missing values are filled with the median retirement age of the dataset.
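
The age rules above can be written as a single plain-Python function, which makes the three cases easy to check by hand. This is a sketch only: the function name `impute_age_fields` is hypothetical, and the median of 45 used in the calls is a placeholder, not the value computed from the dataset.

```python
REPORT_YEAR = 2019  # report year used in the notebook


def impute_age_fields(current_age, birth_year, median_age):
    """Fill a missing current_age and/or birth_year following the notebook's rules."""
    if current_age is None and birth_year is not None:
        current_age = REPORT_YEAR - birth_year
    elif current_age is None:
        # Both values missing: fall back to the median age
        current_age = median_age
    if birth_year is None:
        birth_year = REPORT_YEAR - current_age
    return current_age, birth_year


# median_age=45 is a placeholder value for illustration only
print(impute_age_fields(None, 1966, 45))  # (53, 1966)
print(impute_age_fields(81, None, 45))    # (81, 1938)
print(impute_age_fields(None, None, 45))  # (45, 1974)
```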



In [81]:
# Alias Spark's sum so it does not shadow Python's built-in sum
from pyspark.sql.functions import col, sum as spark_sum
# Count missing (NULL) values per column
missing_df = users_df.select([
    spark_sum(col(c).isNull().cast("int")).alias(c)
    for c in users_df.columns
])

missing_df.show(truncate=False)

+---+-----------+--------------+----------+-----------+------+-------+--------+---------+-----------------+-------------+----------+------------+----------------+
|id |current_age|retirement_age|birth_year|birth_month|gender|address|latitude|longitude|per_capita_income|yearly_income|total_debt|credit_score|num_credit_cards|
+---+-----------+--------------+----------+-----------+------+-------+--------+---------+-----------------+-------------+----------+------------+----------------+
|0  |0          |0             |0         |0          |0     |0      |0       |0        |0                |0            |0         |0           |0               |
+---+-----------+--------------+----------+-----------+------+-------+--------+---------+-----------------+-------------+----------+------------+----------------+



All users now have non-null values for current_age, birth_year, and retirement_age.
This ensures consistent age-related calculations and supports further analysis like age vs. retirement age consistency checks.



#### Type casting columns

In [82]:
from pyspark.sql.functions import regexp_replace, col

# Clean financial columns by removing $ and commas, then cast to double
financial_cols = ["per_capita_income", "yearly_income", "total_debt"]

for col_name in financial_cols:
    users_df = users_df.withColumn(
        f"{col_name}_clean",
        regexp_replace(col(col_name), "[$,]", "").cast("double")
    )

# Verify cleaning
users_df.select([f"{c}_clean" for c in financial_cols]).show(10)


+-----------------------+-------------------+----------------+
|per_capita_income_clean|yearly_income_clean|total_debt_clean|
+-----------------------+-------------------+----------------+
|                29278.0|            59696.0|        127613.0|
|                37891.0|            77254.0|        191349.0|
|                22681.0|            33483.0|           196.0|
|               163145.0|           249925.0|        202328.0|
|                53797.0|           109687.0|        183855.0|
|                20599.0|            41997.0|             0.0|
|                25258.0|            51500.0|        102286.0|
|                26790.0|            54623.0|        114711.0|
|                26273.0|            42509.0|          2895.0|
|                18730.0|            38190.0|         81262.0|
+-----------------------+-------------------+----------------+
only showing top 10 rows


- Removing non-numeric characters: $ signs and commas are removed from all financial columns.
- Converting to numeric type: columns are cast to double for proper numeric calculations.

All users now have clean numeric values in per_capita_income_clean, yearly_income_clean, and total_debt_clean.

In [88]:
from pyspark.sql.functions import col, when

# Check for invalid latitude/longitude
invalid_lat_lon = users_df.filter(
    (col("latitude") < -90) | (col("latitude") > 90) |
    (col("longitude") < -180) | (col("longitude") > 180)
)

# show() returns None, so display the rows first and print the count separately
invalid_lat_lon.show()
print("Invalid geographic locations:", invalid_lat_lon.count())

users_clean = users_df.withColumn(
    "latitude_clean",
    when((col("latitude") >= -90) & (col("latitude") <= 90), col("latitude"))
    .otherwise(None)
).withColumn(
    "longitude_clean",
    when((col("longitude") >= -180) & (col("longitude") <= 180), col("longitude"))
    .otherwise(None)
)

# Verify cleaned columns
users_clean.select("latitude_clean", "longitude_clean").show(10)


+---+-----------+--------------+----------+-----------+------+-------+--------+---------+-----------------+-------------+----------+------------+----------------+-----------------------+-------------------+----------------+
| id|current_age|retirement_age|birth_year|birth_month|gender|address|latitude|longitude|per_capita_income|yearly_income|total_debt|credit_score|num_credit_cards|per_capita_income_clean|yearly_income_clean|total_debt_clean|
+---+-----------+--------------+----------+-----------+------+-------+--------+---------+-----------------+-------------+----------+------------+----------------+-----------------------+-------------------+----------------+
+---+-----------+--------------+----------+-----------+------+-------+--------+---------+-----------------+-------------+----------+------------+----------------+-----------------------+-------------------+----------------+

Invalid geographic locations: 0
+--------------+---------------+
|latitude_clean|longitude_clean|
+

## Feature Engineering

#### Debt-to-Income Ratio
Calculated as total_debt / yearly_income for each user.

Provides a measure of financial risk: higher ratios indicate that a larger portion of income is consumed by debt obligations.

The column is added to the cleaned dataset as debt_to_income_ratio.
It indicates whether a user is at financial risk or in a stable situation.

In [92]:
# Alias Spark's round so it does not shadow Python's built-in round
from pyspark.sql.functions import col, round as spark_round

users_clean = users_clean.withColumn(
    "debt_to_income_ratio",
    spark_round(col("total_debt_clean") / col("yearly_income_clean"), 2)
)

users_clean.select("id", "total_debt_clean", "yearly_income_clean", "debt_to_income_ratio").show(10)


+----+----------------+-------------------+--------------------+
|  id|total_debt_clean|yearly_income_clean|debt_to_income_ratio|
+----+----------------+-------------------+--------------------+
| 825|        127613.0|            59696.0|                2.14|
|1746|        191349.0|            77254.0|                2.48|
|1718|           196.0|            33483.0|                0.01|
| 708|        202328.0|           249925.0|                0.81|
|1164|        183855.0|           109687.0|                1.68|
|  68|             0.0|            41997.0|                 0.0|
|1075|        102286.0|            51500.0|                1.99|
|1711|        114711.0|            54623.0|                 2.1|
|1116|          2895.0|            42509.0|                0.07|
|1752|         81262.0|            38190.0|                2.13|
+----+----------------+-------------------+--------------------+
only showing top 10 rows


#### Credit Score Segments

Users were grouped into categories based on their numeric credit score:

- Poor: <580
- Fair: 580–669
- Good: 670–739
- Very Good: 740–799
- Excellent: 800+

This segmentation helps identify different credit risk profiles among users.

It is added as a new column, credit_score_segment.

In [94]:
from pyspark.sql.functions import when

# Create credit score categories
users_clean = users_clean.withColumn(
    "credit_score_segment",
    when(col("credit_score") < 580, "Poor")
    .when((col("credit_score") >= 580) & (col("credit_score") < 670), "Fair")
    .when((col("credit_score") >= 670) & (col("credit_score") < 740), "Good")
    .when((col("credit_score") >= 740) & (col("credit_score") < 800), "Very Good")
    .otherwise("Excellent")
)

users_clean.groupBy("credit_score_segment").count().show()


+--------------------+-----+
|credit_score_segment|count|
+--------------------+-----+
|                Fair|  348|
|           Very Good|  474|
|           Excellent|  166|
|                Good|  931|
|                Poor|   81|
+--------------------+-----+
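
The segment boundaries used above can be checked with a short plain-Python sketch. It uses `bisect` on the lower bounds instead of chained conditions; the function name `credit_score_segment` and the constants are hypothetical and not part of the notebook.

```python
from bisect import bisect_right

# Lower bound of each segment after "Poor", taken from the ranges listed above
_LOWER_BOUNDS = [580, 670, 740, 800]
_LABELS = ["Poor", "Fair", "Good", "Very Good", "Excellent"]


def credit_score_segment(score):
    """Map a numeric credit score to its segment label."""
    return _LABELS[bisect_right(_LOWER_BOUNDS, score)]


for score in (579, 580, 669, 670, 739, 740, 799, 800):
    print(score, credit_score_segment(score))
```

Testing the values on either side of each boundary is a quick way to confirm that every score falls into exactly one segment.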



#### Financial Behavior Flags

- High Debt Flag: users with a debt-to-income ratio > 2.0 are marked as high debt (high_debt_flag = 1). This indicates potentially risky financial behavior.
- High Income, Low Debt Flag: users with yearly income > 75,000 and a debt-to-income ratio < 0.5 are marked as financially stable (high_income_low_debt_flag = 1).

These flags can help segment users for risk assessment and targeted financial interventions.

In [95]:
# High debt flag: debt-to-income > 2.0
users_clean = users_clean.withColumn(
    "high_debt_flag",
    when(col("debt_to_income_ratio") > 2.0, 1).otherwise(0)
)

# High income, low debt flag: yearly_income > 75000 & debt-to-income < 0.5
users_clean = users_clean.withColumn(
    "high_income_low_debt_flag",
    when((col("yearly_income_clean") > 75000) & (col("debt_to_income_ratio") < 0.5), 1).otherwise(0)
)

# Show counts for flags
users_clean.groupBy("high_debt_flag").count().show()
users_clean.groupBy("high_income_low_debt_flag").count().show()


+--------------+-----+
|high_debt_flag|count|
+--------------+-----+
|             1|  470|
|             0| 1530|
+--------------+-----+

+-------------------------+-----+
|high_income_low_debt_flag|count|
+-------------------------+-----+
|                        1|   33|
|                        0| 1967|
+-------------------------+-----+
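
For a single user, the two flags reduce to two comparisons. The sketch below restates them in plain Python with the same thresholds as the cell above; the function name `financial_flags` is hypothetical.

```python
def financial_flags(yearly_income, debt_to_income_ratio):
    """Return both behavior flags as 0/1 values for one user."""
    high_debt = int(debt_to_income_ratio > 2.0)
    stable = int(yearly_income > 75000 and debt_to_income_ratio < 0.5)
    return {"high_debt_flag": high_debt, "high_income_low_debt_flag": stable}


# User 825 from the table above: income 59696.0, ratio 2.14
print(financial_flags(59696.0, 2.14))
# {'high_debt_flag': 1, 'high_income_low_debt_flag': 0}
```

Both comparisons are strict, so a ratio of exactly 2.0 or an income of exactly 75,000 does not set the corresponding flag.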



Further cleaning and feature engineering required for the data pipeline continue in a separate Jupyter notebook.