## Problem Statement:  <br>
XYZ credit union in Latin America is performing very well in selling the Banking products (eg: Credit card, deposit account, retirement account, safe deposit box etc) but their existing customer is not not buying more than 1 product which means bank is not performing good in cross selling (Bank is not able to sell their other offerings to existing customer). XYZ Credit Union decided to approach ABC analytics to solve their problem.

In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Cross_Selling_Project").getOrCreate()
spark

In [3]:
# reading the csv
path = "FinalProjectDataset/"
cs_data = spark.read.csv(path+"Train.csv",inferSchema=True,header=True)

### Column Name and its Description:

**fecha_dato**	The table is partitioned for this column<br>
**ncodpers**	Customer code<br>
**ind_empleado**	Employee index: **A** active, **B** ex employed, **F** filial, **N** not employee, **P** pasive<br>
**pais_residencia**	Customer's Country residence<br>
**sexo**	Customer's sex<br>
**age**	Age<br>
**fecha_alta**	The date in which the customer became as the first holder of a contract in the bank<br>
**ind_nuevo**	New customer Index. 1 if the customer registered in the last 6 months.<br>
**antiguedad**	Customer seniority (in months)<br>
**indrel**	1 (First/Primary), 99 (Primary customer during the month but not at the end of the month)<br>
**ult_fec_cli_1t**	Last date as primary customer (if he isn't at the end of the month)<br>
**indrel_1mes**	Customer type at the beginning of the month ,1 (First/Primary customer), 2 (co-owner ),P (Potential),3 (former primary), 4(former co-owner)<br>
**tiprel_1mes**	Customer relation type at the beginning of the month, A (active), I (inactive), P (former customer),R (Potential)<br>
**indresi**	Residence index (S (Yes) or N (No) if the residence country is the same than the bank country)<br>
**indext**	Foreigner index (S (Yes) or N (No) if the customer's birth country is different than the bank country)<br>
**conyuemp**	Spouse index. 1 if the customer is spouse of an employee<br>
**canal_entrada**	channel used by the customer to join<br>
**indfall**	Deceased index. N/S<br>
**tipodom**	Addres type. 1, primary address<br>
**cod_prov**	Province code (customer's address)<br>
**nomprov**	Province name<br>
**ind_actividad_cliente**	Activity index (1, active customer; 0, inactive customer)<br>
**renta**	Gross income of the household<br>
**segmento**	segmentation: 01 - VIP, 02 - Individuals 03 - college graduated<br>
**ind_ahor_fin_ult1**	Saving Account<br>
**ind_aval_fin_ult1**	Guarantees<br>
**ind_cco_fin_ult1**	Current Accounts<br>
**ind_cder_fin_ult1**	Derivada Account<br>
**ind_cno_fin_ult1**	Payroll Account<br>
**ind_ctju_fin_ult1**	Junior Account<br>
**ind_ctma_fin_ult1**	Más particular Account<br>
**ind_ctop_fin_ult1**	particular Account<br>
**ind_ctpp_fin_ult1**	particular Plus Account<br>
**ind_deco_fin_ult1**	Short-term deposits<br>
**ind_deme_fin_ult1**	Medium-term deposits<br>
**ind_dela_fin_ult1**	Long-term deposits<br>
**ind_ecue_fin_ult1**	e-account<br>
**ind_fond_fin_ult1**	Funds<br>
**ind_hip_fin_ult1**	Mortgage<br>
**ind_plan_fin_ult1**	Pensions<br>
**ind_pres_fin_ult1**	Loans<br>
**ind_reca_fin_ult1**	Taxes<br>
**ind_tjcr_fin_ult1**	Credit Card<br>
**ind_valo_fin_ult1**	Securities<br>
**ind_viv_fin_ult1**	Home Account<br>
**ind_nomina_ult1**	Payroll<br>
**ind_nom_pens_ult1**	Pensions<br>
**ind_recibo_ult1**	Direct Debit<br>

In [4]:
cs_data.limit(4).toPandas()

Unnamed: 0,fecha_dato,ncodpers,ind_empleado,pais_residencia,sexo,age,fecha_alta,ind_nuevo,antiguedad,indrel,...,ind_hip_fin_ult1,ind_plan_fin_ult1,ind_pres_fin_ult1,ind_reca_fin_ult1,ind_tjcr_fin_ult1,ind_valo_fin_ult1,ind_viv_fin_ult1,ind_nomina_ult1,ind_nom_pens_ult1,ind_recibo_ult1
0,2015-01-28,1375586.0,N,ES,H,35,2015-01-12,0,6,1,...,0,0,0,0,0,0,0,0,0,0
1,2015-01-28,1050611.0,N,ES,V,23,2012-08-10,0,35,1,...,0,0,0,0,0,0,0,0,0,0
2,2015-01-28,1050612.0,N,ES,V,23,2012-08-10,0,35,1,...,0,0,0,0,0,0,0,0,0,0
3,2015-01-28,1050613.0,N,ES,H,22,2012-08-10,0,35,1,...,0,0,0,0,0,0,0,0,0,0


**Finding the number of rows and columns in the dataset:**

In [5]:
# Number of rows:
cs_data.count()

13647309

In [6]:
# Number of Columns:
len(cs_data.columns)

48

**Verifing if pyspark has infered the datatypes correctly:**

In [7]:
# Dataset structure
cs_data.printSchema()

root
 |-- fecha_dato: string (nullable = true)
 |-- ncodpers: double (nullable = true)
 |-- ind_empleado: string (nullable = true)
 |-- pais_residencia: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- age: string (nullable = true)
 |-- fecha_alta: string (nullable = true)
 |-- ind_nuevo: string (nullable = true)
 |-- antiguedad: string (nullable = true)
 |-- indrel: string (nullable = true)
 |-- ult_fec_cli_1t: string (nullable = true)
 |-- indrel_1mes: string (nullable = true)
 |-- tiprel_1mes: string (nullable = true)
 |-- indresi: string (nullable = true)
 |-- indext: string (nullable = true)
 |-- conyuemp: string (nullable = true)
 |-- canal_entrada: string (nullable = true)
 |-- indfall: string (nullable = true)
 |-- tipodom: string (nullable = true)
 |-- cod_prov: string (nullable = true)
 |-- nomprov: string (nullable = true)
 |-- ind_actividad_cliente: string (nullable = true)
 |-- renta: double (nullable = true)
 |-- segmento: string (nullable = true)
 |-- in

**Correcting the Datatypes** of columns using **pyspark.sql.functions** library: <br>

In [8]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [9]:
cs_data.select('age').show(5)

+---+
|age|
+---+
| 35|
| 23|
| 23|
| 22|
| 23|
+---+
only showing top 5 rows



In [10]:
# Changing age to integer
cs_data = cs_data.withColumn("age", cs_data["age"].cast(IntegerType()))

In [12]:
cs_data.schema["age"].dataType

IntegerType

In [14]:
cs_data.select('age').distinct().count()

121

## Handling missing data in the dataframe

Creating a **function to count the number of missing data:**

In [15]:
def count_null_values(df):
    null_columns = []
    totalRows = df.count()
    for k in df.columns:        
        nullRows = df.where(col(k).isNull()).count()
        if(nullRows > 0):
            temp = k,nullRows,((nullRows/totalRows)*100)
            null_columns.append(temp)
    return(null_columns)

In [16]:
null_columns_list = count_null_values(cs_data)

In [17]:
missing_data_df = spark.createDataFrame(null_columns_list, ['Column_Name', 'Null_Values_Count','Null_Value_%'])

In [18]:
missing_data_df = missing_data_df.withColumn('Null_Value_%', \
                                               round(missing_data_df['Null_Value_%'],3))

In [19]:
missing_data_df.orderBy(missing_data_df['Null_Value_%'].desc()).show(truncate=False)

+---------------+-----------------+------------+
|Column_Name    |Null_Values_Count|Null_Value_%|
+---------------+-----------------+------------+
|conyuemp       |13645501         |99.987      |
|ult_fec_cli_1t |13622516         |99.818      |
|renta          |2794375          |20.476      |
|segmento       |189368           |1.388       |
|canal_entrada  |186126           |1.364       |
|indrel_1mes    |149781           |1.098       |
|tiprel_1mes    |149781           |1.098       |
|nomprov        |93591            |0.686       |
|sexo           |27804            |0.204       |
|indresi        |27734            |0.203       |
|indext         |27734            |0.203       |
|indfall        |27734            |0.203       |
|ind_empleado   |27734            |0.203       |
|fecha_alta     |27734            |0.203       |
|age            |27734            |0.203       |
|pais_residencia|27734            |0.203       |
+---------------+-----------------+------------+

