#### Start a simple Spark Session

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

#### Read the CSV

In [2]:
df = spark.read.csv('../data/original.csv', header=True, inferSchema=True)

#### If city is null, then set city to 'Unknown'

In [3]:
from pyspark.sql.functions import *

df_city = df.withColumn('City_N',when(df.City.isNull(),'Unknown').otherwise(df.City))
df_city.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|         City_N|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|
|  3|    Alvera|  Di Boldi|Female|           NULL|                NULL|$57576.52|39.9947462|116.3397725|        Unknown|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Femal

### If the Job title is null, then delete the row

In [4]:
df_jt = df.filter(df.JobTitle.isNotNull())
df_jt.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  8|   Goddart|     Flear|  Male|      Trélissac|Desktop Support T...|$46116.36|45.1905186|  0.7423124|
|  9|      Roth|O'Cannavan|  Male|         Heitan|VP Pr

### If Salary is null then replace it with Average salary

In [5]:
df_sal = df.withColumn('Salary_fix', df.Salary.substr(2,100).cast('float'))
mean_salary = df_sal.groupBy().avg('Salary_fix').take(1)[0][0]
print(mean_salary)
df_sal = df_sal.withColumn('New_salary', when(df_sal.Salary_fix.isNull(),lit(mean_salary)).otherwise(df_sal.Salary_fix))

df_sal.show()

55487.95562890625
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+----------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|Salary_fix|      New_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+----------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|  57438.18|   57438.1796875|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|   62846.6|   62846.6015625|
|  3|    Alvera|  Di Boldi|Female|           NULL|                NULL|$57576.52|39.9947462|116.3397725|  57576.52|  57576.51953125|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  61489.23|  61489.23046875|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi| 

### Update the Latitude with Median where latitude value is missing/NULL

In [6]:
latitude = df_sal.select('Latitude')

latitude = latitude.filter(latitude.Latitude.isNotNull())

latitude = latitude.withColumn('Latitude_N', latitude.Latitude.cast('float')).select('Latitude_N')

latitude.show()

import numpy as np

median = np.median(latitude.collect())
print(median)

df_sal =df_sal.withColumn('lat', when(df_sal.Latitude.isNull(), lit(median)).otherwise(df_sal.Latitude))
df_sal.show()

+----------+
|Latitude_N|
+----------+
| 50.577408|
|  48.82316|
| 39.994747|
| 44.504723|
| 53.426613|
| 24.879416|
| 45.190517|
| 32.027935|
|  4.272793|
|     -5.85|
|  39.17238|
|  49.81518|
|  42.10148|
|  49.79233|
| 43.494576|
| 52.744167|
| 38.696247|
|-7.7232566|
| 40.717205|
|  49.16291|
+----------+
only showing top 20 rows

31.93397331237793
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+----------+----------------+-----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|Salary_fix|      New_salary|              lat|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+----------+----------------+-----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|  57438.18|   57438.1796875|       50.5774075|
|  2|  Kimberly|Von Welden|Female|      

#### By Job Title , Men & women get paid on average

In [7]:

genders = df_sal.groupBy(df_sal.gender).agg(avg(df_sal.New_salary).alias('avg_salary'))

genders.show()

+------+-----------------+
|gender|       avg_salary|
+------+-----------------+
|Female|55618.94298820185|
|  Male|55361.09385573019|
+------+-----------------+



#### Per Job-role who get paid more gender wise

In [27]:
df_new = df_sal.withColumn('female_sal', when(df_sal.gender == 'Female', df_sal.New_salary).otherwise(lit(0)))
df_final = df_new.withColumn('male_sal', when(df_new.gender == 'Male', df_new.New_salary).otherwise(lit(0)))

df_agg = df_final.groupBy(df_final.JobTitle).agg(avg("female_sal").alias('avg_female_sal'), avg(df_final.male_sal).alias('avg_male_sal'))
df_agg = df_agg.withColumn('delta', df_agg.avg_female_sal - df_agg.avg_male_sal)
df_agg.show(truncate=False)

+-----------------------------+------------------+------------------+-------------------+
|JobTitle                     |avg_female_sal    |avg_male_sal      |delta              |
+-----------------------------+------------------+------------------+-------------------+
|Systems Administrator II     |50590.474609375   |15540.9501953125  |35049.5244140625   |
|Media Manager III            |29586.436197916668|17381.920572916668|12204.515625       |
|Recruiting Manager           |34848.452473958336|26383.4951171875  |8464.957356770836  |
|Geologist III                |31749.046875      |12830.75390625    |18918.29296875     |
|Geologist II                 |0.0               |43293.865234375   |-43293.865234375   |
|Database Administrator IV    |0.0               |52018.4609375     |-52018.4609375     |
|Financial Analyst            |23353.776953125   |39606.05625       |-16252.279296875   |
|Analyst Programmer           |16406.1287109375  |21042.9634765625  |-4636.834765625001 |
|Software 

#### Which city have highest average salary

In [37]:
city_avg = df_sal.groupBy('City').agg(avg('New_salary').alias('avg_city_salary')).orderBy('avg_city_salary', ascending=False)
city_avg = city_avg.sort(col('avg_city_salary').desc())
city_avg.show()

+-----------------+---------------+
|             City|avg_city_salary|
+-----------------+---------------+
|        Mesopotam|    99948.28125|
|       Zhongcheng|   99942.921875|
|           Caxias|  99786.3984375|
|      Karangtawar|  99638.9921875|
|        Itabaiana|    99502.15625|
|           Pasian|    99421.34375|
|           Webuye|   99368.546875|
|      Yuktae-dong|   99250.828125|
|           Zinder|    99222.84375|
|   Timiryazevskiy|     99142.9375|
|        Sawahbaru|  99013.7109375|
|          Madimba|  98737.8671875|
|         Huangshi|    98690.34375|
|          Gharyan|     98679.3125|
|         Yŏnan-ŭp|   98628.609375|
|     Wringinputih|  98603.8203125|
|Monte da Boavista|    98586.71875|
|          Klukeng|  98439.4921875|
|         Murmashi|    98226.15625|
|        Fox Creek|        98138.0|
+-----------------+---------------+
only showing top 20 rows

