## Install JDK
## Install Spark
## Set Environment Variables
## Create a Spark Session

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.6.tgz
!tar -xvf spark-2.4.3-bin-hadoop2.6.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.6"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession \
          .builder \
          .master("local[*]") \
          .enableHiveSupport() \
          .getOrCreate() \

In [7]:
!wget https://raw.githubusercontent.com/futurexskill/bigdata/master/bank_prospects.csv

--2020-06-14 01:51:58--  https://raw.githubusercontent.com/futurexskill/bigdata/master/bank_prospects.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 306 [text/plain]
Saving to: ‘bank_prospects.csv’


2020-06-14 01:51:58 (12.4 MB/s) - ‘bank_prospects.csv’ saved [306/306]



In [8]:
# List the working directory to confirm bank_prospects.csv was downloaded.
!ls

bank_prospects.csv  sample_data		       spark-2.4.3-bin-hadoop2.6.tgz
retailstore.csv     spark-2.4.3-bin-hadoop2.6


In [0]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import mean

## Extracting Bank Prospects Data Frame from CSV File


In [26]:
# Load the prospects CSV: the first row supplies the column names and the
# column types are inferred from the data.
bankProspectsDf = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('bank_prospects.csv')
)
bankProspectsDf.show()

+----+------+------+-------+---------+
| Age|Salary|Gender|Country|Purchased|
+----+------+------+-------+---------+
|  18| 20000|  Male|Germany|        N|
|  19| 22000|Female| France|        N|
|  20| 24000|Female|England|        N|
|  21|  null|  Male|England|        N|
|  22| 50000|  Male| France|        Y|
|  23| 35000|Female|England|        N|
|  24|  null|  Male|Germany|        N|
|  25| 32000|Female| France|        Y|
|null| 35000|  Male|Germany|        N|
|  27| 37000|Female| France|        N|
|  27| 37000|Female|unknown|        N|
+----+------+------+-------+---------+



In [27]:
# Inspect the inferred column types before any casting.
bankProspectsDf.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Purchased: string (nullable = true)



## Changing Salary Column Datatype to Float

In [28]:
# Replace the integer Salary column with a float version of itself,
# then print the schema to confirm the new type.
salary_as_float = bankProspectsDf["Salary"].cast(FloatType())
bankProspectsDf2 = bankProspectsDf.withColumn("Salary", salary_as_float)
bankProspectsDf2.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Salary: float (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Purchased: string (nullable = true)



## Cleaning Rows with Unknown Countries

In [29]:
# Keep only the rows whose Country is something other than the literal 'unknown'.
country_is_known = bankProspectsDf2["Country"] != 'unknown'
no_unknown_countryDf = bankProspectsDf2.where(country_is_known)
no_unknown_countryDf.show()

+----+-------+------+-------+---------+
| Age| Salary|Gender|Country|Purchased|
+----+-------+------+-------+---------+
|  18|20000.0|  Male|Germany|        N|
|  19|22000.0|Female| France|        N|
|  20|24000.0|Female|England|        N|
|  21|   null|  Male|England|        N|
|  22|50000.0|  Male| France|        Y|
|  23|35000.0|Female|England|        N|
|  24|   null|  Male|Germany|        N|
|  25|32000.0|Female| France|        Y|
|null|35000.0|  Male|Germany|        N|
|  27|37000.0|Female| France|        N|
+----+-------+------+-------+---------+



## Calculating Avg Age and Avg Salary

In [30]:
# Aggregate Age down to a single-row result, then pull the scalar mean
# out of that row's only column.
meanAge = no_unknown_countryDf.agg(mean(no_unknown_countryDf["Age"])).collect()
(meanAgeVal,) = meanAge[0]
print('Mean Age Value', meanAgeVal)

Mean Age Value 22.11111111111111


In [31]:
# Aggregate Salary down to a single-row result, then pull the scalar mean
# out of that row's only column.
meanSalary = no_unknown_countryDf.agg(mean(no_unknown_countryDf["Salary"])).collect()
(meanSalaryVal,) = meanSalary[0]
print('Mean Salary Value', meanSalaryVal)

Mean Salary Value 31875.0


## Replacing Null Values with Avg Values

In [33]:
# Fill missing Age and Salary values with the column means computed earlier.
null_replacements = {"Age": meanAgeVal, "Salary": meanSalaryVal}
no_nullDf = no_unknown_countryDf.na.fill(null_replacements)
no_nullDf.show()

+---+-------+------+-------+---------+
|Age| Salary|Gender|Country|Purchased|
+---+-------+------+-------+---------+
| 18|20000.0|  Male|Germany|        N|
| 19|22000.0|Female| France|        N|
| 20|24000.0|Female|England|        N|
| 21|31875.0|  Male|England|        N|
| 22|50000.0|  Male| France|        Y|
| 23|35000.0|Female|England|        N|
| 24|31875.0|  Male|Germany|        N|
| 25|32000.0|Female| France|        Y|
| 22|35000.0|  Male|Germany|        N|
| 27|37000.0|Female| France|        N|
+---+-------+------+-------+---------+



## Writing the Transformed Data Frame to a New CSV File

In [0]:
# Write the cleaned frame as CSV into the BankProspects_Transformed directory.
# Overwrite mode lets this cell be re-run; with the default save mode a second
# run fails because the output path already exists.
no_nullDf.write.format('csv').mode('overwrite').save('BankProspects_Transformed')

In [35]:
# List the working directory to confirm the BankProspects_Transformed output directory exists.
!ls

bank_prospects.csv	   retailstore.csv  spark-2.4.3-bin-hadoop2.6
BankProspects_Transformed  sample_data	    spark-2.4.3-bin-hadoop2.6.tgz


In [36]:
# Show the files Spark wrote into the output directory.
!ls BankProspects_Transformed/

part-00000-de2ab7ed-8eed-48c9-99f2-97f9940631bf-c000.csv  _SUCCESS


In [0]:
!mv BankProspects_Transformed/part-00000-de2ab7ed-8eed-48c9-99f2-97f9940631bf-c000.csv BankProspects_Transformed/bank_prospects_clean.csv

In [38]:
# Print the renamed CSV to check the cleaned rows.
!cat BankProspects_Transformed/bank_prospects_clean.csv

18,20000.0,Male,Germany,N
19,22000.0,Female,France,N
20,24000.0,Female,England,N
21,31875.0,Male,England,N
22,50000.0,Male,France,Y
23,35000.0,Female,England,N
24,31875.0,Male,Germany,N
25,32000.0,Female,France,Y
22,35000.0,Male,Germany,N
27,37000.0,Female,France,N


## Loading Transformed Data into Hive Table


In [0]:
# saveAsTable does not create the target database, so make sure "bank" exists
# first; otherwise the write fails with a database-not-found error.
spark.sql("CREATE DATABASE IF NOT EXISTS bank")

# Persist the cleaned frame as a table (replacing any previous contents),
# then read it back through SQL to verify the load.
no_nullDf.write.mode("overwrite").saveAsTable("bank.prospectclean")
spark.sql("select * from bank.prospectclean").show()