
# **PROBLEM STATEMENT**

A bank wants to identify prospects to target in its marketing campaign. It has received prospect data from various internal and third-party sources. The data has issues such as missing or unknown values in certain fields, so it must be cleansed before analysis.

Use Hadoop and Spark to cleanse, transform, and analyze this data.



In [None]:
# Install the JDK and Spark, then set the environment variables needed to create a Spark session
!apt-get install -y -qq openjdk-11-jdk-headless > /dev/null
# If this mirror no longer hosts 3.1.3, older releases are kept under archive.apache.org/dist/spark/
!wget -q https://dlcdn.apache.org/spark/spark-3.1.3/spark-3.1.3-bin-hadoop3.2.tgz
!tar -xf spark-3.1.3-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.3-bin-hadoop3.2"

import findspark
findspark.init()  # makes the PySpark bundled under SPARK_HOME importable

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
# Copy bank prospects data
!wget https://raw.githubusercontent.com/amretapandey/BankMarketing-BigDataProject/main/bank_prospects.csv

In [62]:
# Create a dataframe from the CSV file; without inferSchema every column is read as a string
bankProspectsDF = spark.read.csv("bank_prospects.csv", header=True)

In [63]:
bankProspectsDF.show()

+----+------+------+-------+---------+
| Age|Salary|Gender|Country|Purchased|
+----+------+------+-------+---------+
|  18| 20000|  Male|Germany|        N|
|  19| 22000|Female| France|        N|
|  20| 24000|Female|England|        N|
|  21|  null|  Male|England|        N|
|  22| 50000|  Male| France|        Y|
|  23| 35000|Female|England|        N|
|  24|  null|  Male|Germany|        N|
|  25| 32000|Female| France|        Y|
|null| 35000|  Male|Germany|        N|
|  27| 37000|Female| France|        N|
|  27| 37000|Female|unknown|        N|
+----+------+------+-------+---------+



In [64]:
bankProspectsDF.printSchema()

root
 |-- Age: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Purchased: string (nullable = true)
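
Every column arrives as a string, so sorting on Age or Salary in this state would compare text rather than numbers. A minimal plain-Python sketch of that pitfall follows; the values are illustrative only and are not taken from the dataset.

```python
# Illustrative values only: digits stored as strings are compared character by character.
ages_as_strings = ["18", "9", "100"]
ages_as_ints = [int(a) for a in ages_as_strings]

print(sorted(ages_as_strings))  # text order
print(sorted(ages_as_ints))     # numeric order
```

Casting the numeric columns up front avoids this kind of surprise in later steps.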



<br>

**1. Change the schema of the dataframe**

In [65]:
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import col

# Cast Age to integer and Salary to float; the remaining columns stay as strings
bankProspectsDF1 = (
    bankProspectsDF
    .withColumn("Age", col("Age").cast(IntegerType()))
    .withColumn("Salary", col("Salary").cast(FloatType()))
)
bankProspectsDF1.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Salary: float (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Purchased: string (nullable = true)



**2. Remove rows where Country is unknown**

In [66]:
bankProspectsDF2 = bankProspectsDF1.filter('Country != "unknown"')
bankProspectsDF2.show()

+----+-------+------+-------+---------+
| Age| Salary|Gender|Country|Purchased|
+----+-------+------+-------+---------+
|  18|20000.0|  Male|Germany|        N|
|  19|22000.0|Female| France|        N|
|  20|24000.0|Female|England|        N|
|  21|   null|  Male|England|        N|
|  22|50000.0|  Male| France|        Y|
|  23|35000.0|Female|England|        N|
|  24|   null|  Male|Germany|        N|
|  25|32000.0|Female| France|        Y|
|null|35000.0|  Male|Germany|        N|
|  27|37000.0|Female| France|        N|
+----+-------+------+-------+---------+
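
One caveat about this filter: under SQL comparison rules, a NULL Country compared with `"unknown"` yields NULL rather than true, so such a row would be dropped as well. The dataset here has no NULL countries, so the sketch below uses a hypothetical sample and models the rule in plain Python, with `None` standing in for NULL. The helper `sql_not_equal` is an illustrative name, not a Spark function.

```python
# Plain-Python model of SQL three-valued logic; None stands in for NULL.
def sql_not_equal(value, literal):
    """Return True or False, or None when the comparison involves NULL."""
    if value is None:
        return None
    return value != literal

# Hypothetical sample: the last entry is a NULL Country, which the dataset above lacks.
countries = ["Germany", "unknown", None]

# A filter keeps a row only when the predicate is exactly True.
kept = [c for c in countries if sql_not_equal(c, "unknown") is True]
print(kept)
```

If NULL countries should survive, the predicate would need an explicit null check alongside the comparison.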



**3. Replace missing Age values with average age**

In [67]:
from pyspark.sql.functions import mean

# mean() skips nulls, so the average covers only the rows with a known Age
avgAge = bankProspectsDF2.select(mean("Age")).collect()[0][0]
print(avgAge)

22.11111111111111


In [68]:
# Age is an integer column, so the fill value 22.11... is cast to 22
bankProspectsDF3 = bankProspectsDF2.na.fill(avgAge, "Age")
bankProspectsDF3.show()

+---+-------+------+-------+---------+
|Age| Salary|Gender|Country|Purchased|
+---+-------+------+-------+---------+
| 18|20000.0|  Male|Germany|        N|
| 19|22000.0|Female| France|        N|
| 20|24000.0|Female|England|        N|
| 21|   null|  Male|England|        N|
| 22|50000.0|  Male| France|        Y|
| 23|35000.0|Female|England|        N|
| 24|   null|  Male|Germany|        N|
| 25|32000.0|Female| France|        Y|
| 22|35000.0|  Male|Germany|        N|
| 27|37000.0|Female| France|        N|
+---+-------+------+-------+---------+
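
The imputed Age shows as 22 rather than 22.11 because the column is an integer. As a cross-check, the same arithmetic can be sketched in plain Python using the Age values from the step 2 table, with `None` marking the missing entry. The use of `int()` to model the cast is an assumption of this sketch, not Spark code.

```python
# Age values after step 2, with None marking the missing entry.
ages = [18, 19, 20, 21, 22, 23, 24, 25, None, 27]

# Average over the known values only, mirroring how the mean skips nulls.
known = [a for a in ages if a is not None]
avg_age = sum(known) / len(known)  # 199 / 9

# The column holds integers, so the fractional part of the fill value is dropped.
fill_value = int(avg_age)
filled = [fill_value if a is None else a for a in ages]

print(avg_age)
print(filled)
```

If the fractional average matters for later analysis, casting Age to a floating-point type before filling would preserve it.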



**4. Replace missing Salary values with average salary**

In [69]:
# The two missing salaries are skipped, so the mean is taken over the remaining eight rows
avgSalary = bankProspectsDF3.select(mean("Salary")).collect()[0][0]
print(avgSalary)

31875.0


In [70]:
bankProspectsDF4 = bankProspectsDF3.na.fill(avgSalary, "Salary")
bankProspectsDF4.show()

+---+-------+------+-------+---------+
|Age| Salary|Gender|Country|Purchased|
+---+-------+------+-------+---------+
| 18|20000.0|  Male|Germany|        N|
| 19|22000.0|Female| France|        N|
| 20|24000.0|Female|England|        N|
| 21|31875.0|  Male|England|        N|
| 22|50000.0|  Male| France|        Y|
| 23|35000.0|Female|England|        N|
| 24|31875.0|  Male|Germany|        N|
| 25|32000.0|Female| France|        Y|
| 22|35000.0|  Male|Germany|        N|
| 27|37000.0|Female| France|        N|
+---+-------+------+-------+---------+
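
The Salary figure can be cross-checked the same way. This plain-Python sketch uses the Salary values from the step 3 table, with `None` for the two missing entries; because the column is a float, no truncation occurs.

```python
# Salary values from the step 3 table, with None marking the missing entries.
salaries = [20000.0, 22000.0, 24000.0, None, 50000.0,
            35000.0, None, 32000.0, 35000.0, 37000.0]

# Average over the eight known salaries.
known = [s for s in salaries if s is not None]
avg_salary = sum(known) / len(known)  # 255000.0 / 8

# Replace each missing salary with that average.
filled = [avg_salary if s is None else s for s in salaries]

print(avg_salary)
print(filled)
```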



# Save the transformed dataframe to CSV

In [72]:
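# Remove output from any previous run; -f avoids an error when the directory does not exist yet
!rm -rf bank_prospects-transformed
# Spark writes a directory of part files, with no header row by default
bankProspectsDF4.write.format("csv").save("bank_prospects-transformed")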

In [76]:
!cat bank_prospects-transformed/*

18,20000.0,Male,Germany,N
19,22000.0,Female,France,N
20,24000.0,Female,England,N
21,31875.0,Male,England,N
22,50000.0,Male,France,Y
23,35000.0,Female,England,N
24,31875.0,Male,Germany,N
25,32000.0,Female,France,Y
22,35000.0,Male,Germany,N
27,37000.0,Female,France,N
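
The written files carry no header row, so anything that reads them back has to supply the column names itself. A minimal sketch with Python's standard `csv` module is shown below; it inlines the first three output lines instead of opening the part files, to stay self-contained.

```python
import csv
import io

# First three lines of the output above, inlined so the sketch is self-contained.
sample = """18,20000.0,Male,Germany,N
19,22000.0,Female,France,N
20,24000.0,Female,England,N
"""

# No header row exists in the files, so the column names are supplied here.
columns = ["Age", "Salary", "Gender", "Country", "Purchased"]
reader = csv.DictReader(io.StringIO(sample), fieldnames=columns)

# Convert the numeric fields back from text.
rows = [
    {**row, "Age": int(row["Age"]), "Salary": float(row["Salary"])}
    for row in reader
]
print(rows[0])
```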
