In [2]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

In [3]:
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings("ignore")
from pyspark.sql.functions import count, desc, max,col, countDistinct
import os

In [4]:
conf = SparkConf().setAppName("Assignment-4")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

In [5]:
# import the data into spark df
people = spark.read.csv('/content/Fake_data.csv',header=True)

In [6]:
# Cast the income to double
people = people.withColumn("Income",col("Income").cast('double')).withColumn("Loan_Approved",col("Loan_Approved").cast('boolean'))

In [31]:
people.show()

+---+--------------------+--------------------+----------+--------+--------------------+---------+-------------+-----------+
|_c0|       Birth_Country|               Email|First_Name|  Income|                 Job|Last_name|Loan_Approved|        SSN|
+---+--------------------+--------------------+----------+--------+--------------------+---------+-------------+-----------+
|  0|Bosnia and Herzeg...|emily15@whitehead...|   Melissa|109957.0|Telecommunication...|  Miranda|        false|129-41-7773|
|  1|             Belgium|  ronald87@yahoo.com|    Curtis|301884.0| Animal nutritionist|  Garrett|         true|212-74-3976|
|  2|      United Kingdom|  hannah29@gmail.com|    Connor|341584.0|English as a fore...|   Steele|        false|024-35-3834|
|  3|            Kiribati|derrick59@hotmail...|      Adam|448293.0|Surveyor, commerc...|   Newman|        false|157-82-4486|
|  4|            Malaysia|wendycarpenter@wa...|     Jared| 53621.0|   Drilling engineer|     Mann|         true|199-56-2824|


In [8]:
# Creating spark temp view
people.createOrReplaceTempView("people_tbl")

In [9]:
people.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- Birth_Country: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- First_Name: string (nullable = true)
 |-- Income: double (nullable = true)
 |-- Job: string (nullable = true)
 |-- Last_name: string (nullable = true)
 |-- Loan_Approved: boolean (nullable = true)
 |-- SSN: string (nullable = true)



### Part 1

In [10]:
people.groupBy('Birth_Country').agg(count('*').alias('people_Count')).orderBy(desc('people_Count')).head()

Row(Birth_Country='Korea', people_Count=91)

In [11]:
spark.sql("SELECT Birth_Country, count(*) as people_Count FROM people_tbl group by 1 order by 2 desc limit 1").show()

+-------------+------------+
|Birth_Country|people_Count|
+-------------+------------+
|        Korea|          91|
+-------------+------------+



**Korea is the birth country which has maximum number of people.**

### Part 2

In [12]:
people.groupBy('Birth_Country').agg(max('Income').alias('Max_Income')).orderBy(desc('Max_Income')).show(1000,truncate=False)

+---------------------------------------------------+----------+
|Birth_Country                                      |Max_Income|
+---------------------------------------------------+----------+
|Cook Islands                                       |499913.0  |
|Uzbekistan                                         |499720.0  |
|Greenland                                          |499690.0  |
|Cuba                                               |499636.0  |
|Albania                                            |499562.0  |
|Mauritania                                         |499558.0  |
|Oman                                               |499521.0  |
|Togo                                               |499370.0  |
|Somalia                                            |499294.0  |
|San Marino                                         |499238.0  |
|Finland                                            |499207.0  |
|Belgium                                            |499095.0  |
|Peru                    

In [13]:
spark.sql("SELECT Birth_Country,Max(Income) as Max_Income FROM people_tbl group by 1 order by 2 desc").show(1000,truncate=False)

+---------------------------------------------------+----------+
|Birth_Country                                      |Max_Income|
+---------------------------------------------------+----------+
|Cook Islands                                       |499913.0  |
|Uzbekistan                                         |499720.0  |
|Greenland                                          |499690.0  |
|Cuba                                               |499636.0  |
|Albania                                            |499562.0  |
|Mauritania                                         |499558.0  |
|Oman                                               |499521.0  |
|Togo                                               |499370.0  |
|Somalia                                            |499294.0  |
|San Marino                                         |499238.0  |
|Finland                                            |499207.0  |
|Belgium                                            |499095.0  |
|Peru                    

### Part 3

In [14]:
people.filter((people.Income>100000) & (people.Loan_Approved)).count()

3958

In [15]:
spark.sql("SELECT count(*) as count FROM people_tbl where Income>100000 and Loan_Approved=True").show()

+-----+
|count|
+-----+
| 3958|
+-----+



**3958 people have income > 100,000 with an approved loan.**

### Part 4

In [16]:
people.filter(people.Birth_Country=='United States of America').orderBy(desc('Income')).select('First_Name','Income','Job').show(10)

+----------+--------+--------------------+
|First_Name|  Income|                 Job|
+----------+--------+--------------------+
|    Alyssa|482588.0|Amenity horticult...|
|    Hunter|468946.0|Psychologist, pri...|
|      Rose|426115.0|Adult guidance wo...|
|  Danielle|389810.0|Furniture conserv...|
|     Terry|380410.0|       Meteorologist|
|     Cindy|370322.0|Research scientis...|
|     Scott|368913.0|       Art therapist|
|   Christy|355150.0|      Engineer, land|
|     Kelly|341448.0|           Press sub|
|  Kristina|338804.0|           Herbalist|
+----------+--------+--------------------+
only showing top 10 rows



In [27]:
spark.sql("SELECT First_name,Income,Job FROM people_tbl where Birth_Country='United States of America' order by 2 desc limit 10").show()

+----------+--------+--------------------+
|First_name|  Income|                 Job|
+----------+--------+--------------------+
|    Alyssa|482588.0|Amenity horticult...|
|    Hunter|468946.0|Psychologist, pri...|
|      Rose|426115.0|Adult guidance wo...|
|  Danielle|389810.0|Furniture conserv...|
|     Terry|380410.0|       Meteorologist|
|     Cindy|370322.0|Research scientis...|
|     Scott|368913.0|       Art therapist|
|   Christy|355150.0|      Engineer, land|
|     Kelly|341448.0|           Press sub|
|  Kristina|338804.0|           Herbalist|
+----------+--------+--------------------+



### Part 5

In [18]:
people.groupBy('Job').count().count()

639

In [19]:
spark.sql("SELECT count (distinct Job) as count_distinct_jobs FROM people_tbl").show()

+-------------------+
|count_distinct_jobs|
+-------------------+
|                639|
+-------------------+



### Part 6

In [20]:
people.filter((people.Job=='Writer') & (people.Income<100000) ).count()

5

In [21]:
spark.sql("SELECT count(*) as count_writer_lt_100k FROM people_tbl where job='Writer' and Income<100000").show()

+--------------------+
|count_writer_lt_100k|
+--------------------+
|                   5|
+--------------------+



### Part 7

In [22]:
people.filter(((people.First_Name == 'Stephen') | (people.First_Name == 'Alexandra')) & (people.Income>200000)).groupBy('Birth_Country').agg(count('Birth_Country').alias('count')).orderBy(desc('count')).show(5)

+--------------------+-----+
|       Birth_Country|count|
+--------------------+-----+
|             Algeria|    2|
|       Faroe Islands|    2|
|United States Min...|    2|
|              Malawi|    1|
|                Fiji|    1|
+--------------------+-----+
only showing top 5 rows



In [23]:
spark.sql("SELECT Birth_Country,count(*) as count FROM people_tbl where First_Name in ('Alexandra','Stephen') and Income>200000 group by 1 order by 2 desc limit 5").show()

+--------------------+-----+
|       Birth_Country|count|
+--------------------+-----+
|             Algeria|    2|
|       Faroe Islands|    2|
|United States Min...|    2|
|             Tokelau|    1|
|         Philippines|    1|
+--------------------+-----+



### Part 8

In [24]:
people.groupBy("SSN").count().filter("count > 1").select("SSN").show()

+-----------+
|        SSN|
+-----------+
|879-74-5521|
+-----------+



In [25]:
people.filter((people.SSN=="879-74-5521")).show()

+----+---------------+--------------------+----------+--------+--------------------+---------+-------------+-----------+
| _c0|  Birth_Country|               Email|First_Name|  Income|                 Job|Last_name|Loan_Approved|        SSN|
+----+---------------+--------------------+----------+--------+--------------------+---------+-------------+-----------+
|3249|          Qatar|christine35@shaw.com|     Kathy|319962.0|    Textile designer|    Hicks|         true|879-74-5521|
|3378|Kyrgyz Republic|stantontina@mcgee...|    Carrie|277542.0|Therapist, hortic...|  Lambert|        false|879-74-5521|
+----+---------------+--------------------+----------+--------+--------------------+---------+-------------+-----------+



In [28]:
people.groupBy("SSN").count().filter("count > 1").select("SSN").collect()[0][0]

'879-74-5521'

In [26]:
spark.sql("SELECT SSN,count(*) as count FROM people_tbl group by 1 having count(*)>1").show()

+-----------+-----+
|        SSN|count|
+-----------+-----+
|879-74-5521|    2|
+-----------+-----+



In [29]:
spark.sql("SELECT * from people_tbl where SSN = '879-74-5521'").show()

+----+---------------+--------------------+----------+--------+--------------------+---------+-------------+-----------+
| _c0|  Birth_Country|               Email|First_Name|  Income|                 Job|Last_name|Loan_Approved|        SSN|
+----+---------------+--------------------+----------+--------+--------------------+---------+-------------+-----------+
|3249|          Qatar|christine35@shaw.com|     Kathy|319962.0|    Textile designer|    Hicks|         true|879-74-5521|
|3378|Kyrgyz Republic|stantontina@mcgee...|    Carrie|277542.0|Therapist, hortic...|  Lambert|        false|879-74-5521|
+----+---------------+--------------------+----------+--------+--------------------+---------+-------------+-----------+



In [30]:
spark.sql("SELECT SSN from (SELECT SSN,count(*) as count FROM people_tbl group by 1 having count(*)>1)").collect()[0][0]

'879-74-5521'