In [None]:
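# Download the input CSV from https://drive.google.com/file/d/1_9rigZEx2jwPd3Dq93b3xOCN2uJY1ypF/view?usp=sharing and place it in the working directory (presumably as original.csv, the file loaded below).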

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os

import findspark

# Tell findspark/pyspark where the JDK and the Spark distribution live.
# NOTE(review): these are absolute paths; SPARK_HOME presumably matches the directory
# the archive was unpacked into by the setup cell - confirm when running elsewhere.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
    }
)

# findspark.init() is called before pyspark is imported, matching the original order.
findspark.init()

from pyspark.sql import SparkSession

# Local session using every available core; reuses an existing session if one is running.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
spark

# Load the CSV file, treating its first row as the header

In [None]:
# Read the CSV with its first line used as column names. No schema is inferred, so
# every column is loaded as a string.
mydata = spark.read.csv("original.csv", header=True)

In [None]:
# Preview the loaded rows to check that the header line was parsed into column names.
mydata.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

#Deal With Null Values

In [None]:
# NOTE(review): the wildcard import also brings in names such as sum/min/max that shadow
# Python builtins; it is kept because later cells rely on the names it provides.
from pyspark.sql.functions import *

# Add clean_city next to the raw City column: coalesce() yields the first non-null
# argument, so rows with a missing City receive the placeholder 'Unknown'.
mydata2 = mydata.withColumn(
    'clean_city',
    coalesce(mydata.City, lit('Unknown')),
)

In [None]:
# Sanity check: rows whose original City is null should now carry 'Unknown' in clean_city.
mydata2.filter(mydata2.City.isNull()).show()

+---+----------+---------+------+----+--------+---------+----------+-----------+----------+
| id|first_name|last_name|gender|City|JobTitle|   Salary|  Latitude|  Longitude|clean_city|
+---+----------+---------+------+----+--------+---------+----------+-----------+----------+
|  3|    Alvera| Di Boldi|Female|null|    null|$57576.52|39.9947462|116.3397725|   Unknown|
+---+----------+---------+------+----+--------+---------+----------+-----------+----------+



In [None]:
# Discard rows that have no job title.
mydata2 = mydata2.dropna(subset=['JobTitle'])

In [None]:
# Sanity check: once the null-JobTitle rows have been dropped this should print an empty table.
mydata2.filter(mydata2.JobTitle.isNull()).show()

+---+----------+---------+------+----+--------+------+--------+---------+----------+
| id|first_name|last_name|gender|City|JobTitle|Salary|Latitude|Longitude|clean_city|
+---+----------+---------+------+----+--------+------+--------+---------+----------+
+---+----------+---------+------+----+--------+------+--------+---------+----------+



In [None]:
# The raw Salary column is a string such as "$57438.18". Remove the currency symbol (and
# any thousands separators) by pattern instead of by character position, so values
# without a leading "$" are not truncated. Cast to double rather than float: a 32-bit
# float cannot hold cent-level amounts exactly (57438.18 becomes 57438.1796875), and
# that error propagates into every later aggregate. Unparseable values become null.
mydata2 = mydata2.withColumn(
    'clean_salary',
    regexp_replace(mydata2.Salary, r'[$,]', '').cast('double'),
)

In [None]:
# Inspect the column types: clean_salary should be numeric, while the columns read
# from the CSV remain strings.
mydata2.dtypes

[('id', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string'),
 ('clean_city', 'string'),
 ('clean_salary', 'float')]

In [None]:
# Preview the data with the added clean_city and clean_salary columns.
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|    63863.09|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil 

# If salary is null, populate it with the mean; if latitude is null, populate it with the median (longitude is not imputed in the cells below)

In [None]:
# Overall (ungrouped) average of clean_salary. The aggregate produces a single row with
# a single value, which is extracted as a plain Python number.
mean = mydata2.agg(avg('clean_salary')).first()[0]
mean

55516.32088199837

In [None]:
# new_salary is clean_salary with missing values replaced by the overall average:
# coalesce() falls through to the literal mean only when clean_salary is null.
mydata2 = mydata2.withColumn(
    'new_salary',
    coalesce(mydata2.clean_salary, lit(mean)),
)

In [None]:
import numpy as np

# Pull the Latitude column out into its own single-column DataFrame.
latitudes = mydata2.select(mydata2.Latitude)

In [None]:
# Preview the raw Latitude values; null entries are still present at this point.
latitudes.show()

+----------+
|  Latitude|
+----------+
|50.5774075|
|48.8231572|
|44.5047212|
|      null|
|53.4266145|
|45.1905186|
| 32.027934|
|  4.272793|
|     -5.85|
| 39.172378|
|49.8151822|
|42.1014803|
|49.7923299|
|43.4945737|
|52.7441662|
| 38.696249|
|-7.7232567|
|40.7172049|
|  49.16291|
|40.7576842|
+----------+
only showing top 20 rows



# Get the median of the non-null latitude values

In [None]:
# Keep only the rows where a latitude is present.
latitudes = latitudes.where(col('Latitude').isNotNull())

In [None]:
# Rename the column to lower-case 'latitude', the name the following cast cell accesses
# as an attribute. The original also built a float column 'latitude2' here and discarded
# it straight away by selecting 'latitude'; that dead cast is removed and the rename is
# made explicit. The values are still strings after this step.
latitudes = latitudes.select(col('Latitude').alias('latitude'))

In [None]:
# Preview the latitudes after the null rows were removed.
latitudes.show()

+----------+
|  latitude|
+----------+
|50.5774075|
|48.8231572|
|44.5047212|
|53.4266145|
|45.1905186|
| 32.027934|
|  4.272793|
|     -5.85|
| 39.172378|
|49.8151822|
|42.1014803|
|49.7923299|
|43.4945737|
|52.7441662|
| 38.696249|
|-7.7232567|
|40.7172049|
|  49.16291|
|40.7576842|
|48.4902808|
+----------+
only showing top 20 rows



In [None]:
import numpy as np

# Convert the string latitudes to numbers for the median computation. double is used
# instead of float because values such as 50.5774075 carry more significant digits than
# a 32-bit float preserves, which would skew the computed median slightly.
latitudes = latitudes.withColumn('latitude2', latitudes.latitude.cast('double')).select('latitude2')

In [None]:
# Bring the latitude values to the driver as plain numbers (one per collected Row) and
# take their median with NumPy.
median = np.median([row[0] for row in latitudes.collect()])

In [None]:
# Display the median latitude that is used below to fill in missing values.
print(median)

31.93397331237793


In [None]:
# Fill missing latitudes with the median. Latitude is still a string column here, so it
# is cast to double; without the cast, Spark's type coercion resolves the mixed
# string/double branches to string and 'lat' would not be usable as a number.
# float(median) converts the NumPy scalar to a plain Python float for lit().
mydata2 = mydata2.withColumn(
    'lat',
    when(mydata2.Latitude.isNull(), lit(float(median)))
    .otherwise(mydata2.Latitude.cast('double')),
)

#Average salary by gender

In [None]:
# Average (mean-imputed) salary per gender. The alias has to be applied to the aggregate
# column: calling .alias() on the DataFrame only names the DataFrame and leaves the
# column called "avg(new_salary)".
genders = mydata2.groupBy('gender').agg(avg('new_salary').alias('AvgSalary'))

In [None]:
# Show the average salary for each gender.
genders.show()

+------+------------------+
|gender|   avg(new_salary)|
+------+------------------+
|Female|55677.250125558036|
|  Male| 55361.09385573019|
+------+------------------+



In [None]:
# Preview the data with the imputed new_salary and lat columns.
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      new_salary|              lat|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|  61489.23046875|     

#Get average salary by gender and job title

In [None]:
# Average salary for every (job title, gender) pair. The alias is attached to the
# aggregate column; calling .alias() on the resulting DataFrame would leave the column
# named "avg(clean_salary)".
group_cols = ["JobTitle", "Gender"]
salary_gender_job = mydata2.groupBy(group_cols).agg(avg('clean_salary').alias('average_salary'))

In [None]:
# Show the average salary for each job title / gender combination.
salary_gender_job.show()

+--------------------+------+------------------+
|            JobTitle|Gender| avg(clean_salary)|
+--------------------+------+------------------+
| Staff Accountant IV|  Male|      73921.328125|
| Clinical Specialist|Female|47270.971923828125|
|       Accountant IV|Female|   82732.248046875|
|Automation Specia...|  Male|     90030.3515625|
| Marketing Assistant|  Male|  47679.7607421875|
|Compensation Analyst|  Male| 39083.09814453125|
|Research Assistan...|Female|   35185.919921875|
|    Graphic Designer|  Male|57231.927408854164|
|        Developer IV|Female|    42080.21171875|
|Human Resources A...|Female|     76556.5703125|
|       Social Worker|  Male|   67558.255859375|
| Biostatistician III|  Male|         49262.125|
|           Professor|Female|     34906.6484375|
| Office Assistant IV|Female|     71279.6015625|
|Software Engineer II|  Male|      74782.640625|
|Quality Control S...|Female| 59387.18505859375|
|             Teacher|  Male|64773.052083333336|
|     Web Designer I

# Cities ranked by highest average salary

In [None]:
# Average clean_salary per raw City value, exposed as the column 'average_salary'.
salary_city = (
    mydata2
    .groupBy(col('City'))
    .agg(avg(col('clean_salary')).alias('average_salary'))
)

In [None]:
# Order the cities from highest to lowest average salary.
salary_city = salary_city.orderBy('average_salary', ascending=False)

In [None]:
# Show the cities with the highest average salary (show() prints the top 20 rows).
salary_city.show()

+-----------------+--------------+
|             City|average_salary|
+-----------------+--------------+
|        Mesopotam|   99948.28125|
|       Zhongcheng|  99942.921875|
|           Caxias| 99786.3984375|
|      Karangtawar| 99638.9921875|
|        Itabaiana|   99502.15625|
|           Pasian|   99421.34375|
|           Webuye|  99368.546875|
|      Yuktae-dong|  99250.828125|
|           Zinder|   99222.84375|
|   Timiryazevskiy|    99142.9375|
|        Sawahbaru| 99013.7109375|
|          Madimba| 98737.8671875|
|         Huangshi|   98690.34375|
|          Gharyan|    98679.3125|
|         Yŏnan-ŭp|  98628.609375|
|     Wringinputih| 98603.8203125|
|Monte da Boavista|   98586.71875|
|          Klukeng| 98439.4921875|
|         Murmashi|   98226.15625|
|        Fox Creek|       98138.0|
+-----------------+--------------+
only showing top 20 rows

