# Market segmentation insurance

## 1. Introduction

## 2. Data information

## 3. ETL process

In [1]:
import os
import pandas as pd

In [2]:
# Root folder of the project's data, resolved from the PATH_DATA_PROJECTS
# environment variable. Fail early with an explicit message when the variable
# is missing: os.getenv would return None and os.path.join would then raise an
# opaque TypeError.
_data_projects_root = os.getenv("PATH_DATA_PROJECTS")
if not _data_projects_root:
    raise RuntimeError(
        "Environment variable PATH_DATA_PROJECTS is not set; "
        "it must point to the root folder that contains the project data."
    )
PATH_DATA = os.path.join(_data_projects_root, "Tabular", "market_segmentation_insurance")

In [19]:
# Columns that are cast to a numeric type during cleaning.
NUMERIC_FEATURES = [
    "balance",
    "balance_frequency",
    "purchases",
    "oneoff_purchases",
    "installments_purchases",
    "cash_advance",
    "purchases_frequency",
    "oneoff_purchases_frequency",
    "purchases_installments_frequency",
    "cash_advance_frequency",
    "cash_advance_trx",
    "purchases_trx",
    "credit_limit",
    "payments",
    "minimum_payments",
    "prc_full_payment",
]

In [20]:
# Columns kept as strings (not cast to a numeric type).
CATEGORICAL_FEATURES = [
    "tenure",
]

### 3.1. Extract

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

In [4]:
# Local Spark session that uses every available core ("local[*]").
spark = (
    SparkSession.builder
    .appName("Read CSV Market segmentation insurance")
    .master("local[*]")
    .getOrCreate()
)

In [5]:
# Every column is declared as a nullable string; numeric columns are cast
# later, in the cleaning step.
_schema_columns = [
    "cust_id",
    "balance",
    "balance_frequency",
    "purchases",
    "oneoff_purchases",
    "installments_purchases",
    "cash_advance",
    "purchases_frequency",
    "oneoff_purchases_frequency",
    "purchases_installments_frequency",
    "cash_advance_frequency",
    "cash_advance_trx",
    "purchases_trx",
    "credit_limit",
    "payments",
    "minimum_payments",
    "prc_full_payment",
    "tenure",
]
schema = StructType(
    [StructField(column_name, StringType(), True) for column_name in _schema_columns]
)

In [6]:
# Read the customer CSV (first line is the header) using the explicit schema.
customer_csv_path = os.path.join(PATH_DATA, "CustomerData.csv")
df = spark.read.csv(customer_csv_path, header=True, schema=schema)

In [7]:
# Print the schema of the freshly loaded DataFrame to confirm the column types.
df.printSchema()

root
 |-- cust_id: string (nullable = true)
 |-- balance: string (nullable = true)
 |-- balance_frequency: string (nullable = true)
 |-- purchases: string (nullable = true)
 |-- oneoff_purchases: string (nullable = true)
 |-- installments_purchases: string (nullable = true)
 |-- cash_advance: string (nullable = true)
 |-- purchases_frequency: string (nullable = true)
 |-- oneoff_purchases_frequency: string (nullable = true)
 |-- purchases_installments_frequency: string (nullable = true)
 |-- cash_advance_frequency: string (nullable = true)
 |-- cash_advance_trx: string (nullable = true)
 |-- purchases_trx: string (nullable = true)
 |-- credit_limit: string (nullable = true)
 |-- payments: string (nullable = true)
 |-- minimum_payments: string (nullable = true)
 |-- prc_full_payment: string (nullable = true)
 |-- tenure: string (nullable = true)



In [8]:
# Preview the first five rows of the raw data.
df.show(5)

+-------+-----------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+--------------------------------+----------------------+----------------+-------------+------------+-----------+----------------+----------------+------+
|cust_id|    balance|balance_frequency|purchases|oneoff_purchases|installments_purchases|cash_advance|purchases_frequency|oneoff_purchases_frequency|purchases_installments_frequency|cash_advance_frequency|cash_advance_trx|purchases_trx|credit_limit|   payments|minimum_payments|prc_full_payment|tenure|
+-------+-----------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+--------------------------------+----------------------+----------------+-------------+------------+-----------+----------------+----------------+------+
| C10001|  40.900749|         0.818182|     95.4|               0|                  95.4|  

### 3.2. Transform

#### 3.2.1. Data profiling

In [9]:
from pyspark.sql.functions import col, count, when, lit, trim

##### 3.2.1.1. Identify missing values

In [10]:
# A value counts as missing when it is NULL or blank after trimming.
# count() ignores the NULLs that when() yields for non-matching rows, so each
# aggregate is the number of missing values in that column.
missing_values_count = df.select(
    *(
        count(
            when(col(name).isNull() | (trim(col(name)) == ""), name)
        ).alias(name)
        for name in df.columns
    )
)
print("Total missing values by columns:")
missing_values_count.show(vertical=True)

Total missing values by columns:
-RECORD 0-------------------------------
 cust_id                          | 0   
 balance                          | 0   
 balance_frequency                | 0   
 purchases                        | 0   
 oneoff_purchases                 | 0   
 installments_purchases           | 0   
 cash_advance                     | 0   
 purchases_frequency              | 0   
 oneoff_purchases_frequency       | 0   
 purchases_installments_frequency | 0   
 cash_advance_frequency           | 0   
 cash_advance_trx                 | 0   
 purchases_trx                    | 0   
 credit_limit                     | 1   
 payments                         | 0   
 minimum_payments                 | 313 
 prc_full_payment                 | 0   
 tenure                           | 0   



##### 3.2.1.2. Identify duplicate values

In [11]:
# Group on every column; any group with more than one row is an exact duplicate.
(
    df.groupBy(df.columns)
    .count()
    .where(col("count") > 1)
    .show()
)

+-------+-------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+--------------------------------+----------------------+----------------+-------------+------------+--------+----------------+----------------+------+-----+
|cust_id|balance|balance_frequency|purchases|oneoff_purchases|installments_purchases|cash_advance|purchases_frequency|oneoff_purchases_frequency|purchases_installments_frequency|cash_advance_frequency|cash_advance_trx|purchases_trx|credit_limit|payments|minimum_payments|prc_full_payment|tenure|count|
+-------+-------+-----------------+---------+----------------+----------------------+------------+-------------------+--------------------------+--------------------------------+----------------------+----------------+-------------+------------+--------+----------------+----------------+------+-----+
+-------+-------+-----------------+---------+----------------+----------------------+---------

##### 3.2.1.3. Validate data consistency 

In [12]:
from pyspark.sql.functions import udf

In [13]:
def validate_number(number_string):
    """Return True when ``number_string`` can be parsed by ``float()``.

    Args:
        number_string: Value to check; normally a string, but may be ``None``
            (for example a NULL cell).

    Returns:
        bool: True if ``float(number_string)`` succeeds, False otherwise.
    """
    try:
        float(number_string)
    except (TypeError, ValueError):
        # ValueError: text that is not a number (e.g. "abc" or "").
        # TypeError: non-string input such as None, which float() rejects.
        return False
    return True

In [14]:
# Declare the return type explicitly: without it, udf() defaults to StringType
# and the validity flag would be a string column instead of a boolean one.
is_numeric = udf(validate_number, returnType="boolean")

In [None]:
# Flag each row by whether "balance" parses as a number, then list the rows
# that fail the check.
df_with_validity = df.withColumn(
    "balance_is_number",
    is_numeric(col("balance")),
)
(
    df_with_validity
    .where(col("balance_is_number") == False)  # Column comparison builds a Spark expression
    .show()
)

#### 3.2.2. Cleaning

##### 3.2.2.1. Update datatype

In [16]:
from pyspark.sql.types import FloatType

In [21]:
# Cast every numeric feature from string to float in a single projection.
df = df.withColumns(
    {feature: col(feature).cast(FloatType()) for feature in NUMERIC_FEATURES}
)

In [22]:
# Print the schema again to verify the numeric columns are now floats.
df.printSchema()

root
 |-- cust_id: string (nullable = true)
 |-- balance: float (nullable = true)
 |-- balance_frequency: float (nullable = true)
 |-- purchases: float (nullable = true)
 |-- oneoff_purchases: float (nullable = true)
 |-- installments_purchases: float (nullable = true)
 |-- cash_advance: float (nullable = true)
 |-- purchases_frequency: float (nullable = true)
 |-- oneoff_purchases_frequency: float (nullable = true)
 |-- purchases_installments_frequency: float (nullable = true)
 |-- cash_advance_frequency: float (nullable = true)
 |-- cash_advance_trx: float (nullable = true)
 |-- purchases_trx: float (nullable = true)
 |-- credit_limit: float (nullable = true)
 |-- payments: float (nullable = true)
 |-- minimum_payments: float (nullable = true)
 |-- prc_full_payment: float (nullable = true)
 |-- tenure: string (nullable = true)



##### 3.2.2.2. Fill missing values

In [23]:
import pyspark.sql.functions as f

In [29]:
# Display the medians of the two columns that contain missing values.
(
    df.select(
        f.median(col("minimum_payments")),
        f.median(col("credit_limit")),
    )
    .show()
)

+------------------------+--------------------+
|median(minimum_payments)|median(credit_limit)|
+------------------------+--------------------+
|      312.34393310546875|              3000.0|
+------------------------+--------------------+



In [27]:
# Compute both medians in one aggregation so Spark runs a single job over the
# data instead of one job per column. The aggregation returns exactly one row,
# which unpacks into the two values in the order they were requested.
minimum_payments_median, credit_limit_median = df.agg(
    f.median("minimum_payments"),
    f.median("credit_limit"),
).first()

In [30]:
# Replace NULL minimum payments with the column median; keep other values.
minimum_payments_filled = when(
    col("minimum_payments").isNull(), minimum_payments_median
).otherwise(col("minimum_payments"))
df = df.withColumn("minimum_payments", minimum_payments_filled)

In [31]:
# Replace NULL credit limits with the column median; keep other values.
credit_limit_filled = when(
    col("credit_limit").isNull(), credit_limit_median
).otherwise(col("credit_limit"))
df = df.withColumn("credit_limit", credit_limit_filled)

In [32]:
# Re-count missing values (NULL or blank after trimming) per column to verify
# the imputation.
missing_values_count = df.select(
    *(
        count(
            when(col(name).isNull() | (trim(col(name)) == ""), name)
        ).alias(name)
        for name in df.columns
    )
)
print("Total missing values by columns:")
missing_values_count.show(vertical=True)

Total missing values by columns:
-RECORD 0-------------------------------
 cust_id                          | 0   
 balance                          | 0   
 balance_frequency                | 0   
 purchases                        | 0   
 oneoff_purchases                 | 0   
 installments_purchases           | 0   
 cash_advance                     | 0   
 purchases_frequency              | 0   
 oneoff_purchases_frequency       | 0   
 purchases_installments_frequency | 0   
 cash_advance_frequency           | 0   
 cash_advance_trx                 | 0   
 purchases_trx                    | 0   
 credit_limit                     | 0   
 payments                         | 0   
 minimum_payments                 | 0   
 prc_full_payment                 | 0   
 tenure                           | 0   



##### 3.2.2.3. Remove whitespace

In [33]:
# Strip leading and trailing spaces from the categorical "tenure" column.
tenure_trimmed = trim(col("tenure"))
df = df.withColumn("tenure", tenure_trimmed)

### 3.3. Load