Configure the environment and set up spark environment

In [1]:
from pyspark.sql import SparkSession
import getpass
username=getpass.getuser()
spark=SparkSession. \
builder.\
config('spark.ui.port','0').\
config("spark.sql.wareshouse.dir",f"/user/itv010790/warehouse").\
enableHiveSupport().\
master('yarn').\
getOrCreate()

In [2]:
spark

ingest data

In [3]:
mydata = spark.read.format("csv").option("header","true").load("original.csv")
mydata.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

second approach to insert data 

In [4]:
mydata1=spark.read.csv("original.csv", header="true")
mydata1.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

The dataset is loaded and we can see issues with data. There are null values present in city column
JobTitle has null values and we have to remove nulls from Job Title
so we really need to do some data cleaning before deriving any value from this data set.

In [5]:
#first we import libraries
from pyspark.sql.functions import *

create new dataframe and If city is Null then replace it with Unknown. since dataframe is immutable we cannot modify existing dataframe.
we have to create new dataframe with new column cleanCity

In [6]:
mydata2= mydata.withColumn("cleanCity",when(mydata.City.isNull(),'Unknown').otherwise(mydata.City))

In [7]:
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|      cleanCity|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|        Unknown|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Femal

#Filter
#overwrite the values of original dataframe mydata2,  and filter records with JobTitle as NULL.
Now mydata2 will store of result of data where job title is not null

In [8]:
mydata2=mydata2.filter(mydata2.JobTitle.isNotNull())

In [9]:
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|      cleanCity|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|Kinsealy-Drinan|
|  8|   Goddart|     Flear|  Mal

Next things we have to do
When salary is null then replace it with average salary value.
Before we calculate average of salary column, we have to remove the "$" symbol
since the datatype is string we cant take average of string and hence we cast it to float

In [10]:
#to clean up remove the dollar
mydata2=mydata2.withColumn('cleanSalary',mydata2.Salary.substr(2,100).cast('float'))

In [11]:
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|      cleanCity|cleanSalary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|   57438.18|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|    62846.6|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|   61489.23|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|   63863.09|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Enginee

In [12]:
mean=mydata2.groupBy().avg('cleanSalary')
mean.show()

+-----------------+
| avg(cleanSalary)|
+-----------------+
|55516.32088199837|
+-----------------+



In [13]:
mean=mydata2.groupBy().avg('cleanSalary').take(1)[0][0]
print(mean)

55516.32088199837


In [14]:
from pyspark.sql.functions import lit

In [15]:
mydata2=mydata2.withColumn('new_Salary', when(mydata2.cleanSalary.isNull(), lit(mean)).otherwise(mydata2.cleanSalary))

In [16]:
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+-----------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|      cleanCity|cleanSalary|      new_Salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+-----------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|   57438.18|   57438.1796875|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|    62846.6|   62846.6015625|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|   61489.23|  61489.23046875|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489

to populate longitue and latidue

In [17]:
import numpy as np


In [18]:
latitudes =mydata2.select('Latitude')

In [19]:
#we cannot calculate avg values if we have nulls , so we will filter out the records with null latitudes
latitudes=latitudes.filter(latitudes.Latitude.isNotNull())

In [20]:
latitudes=latitudes.withColumn('latitude2', latitudes.Latitude.cast('float')).select('latitude2')
latitudes.show()

+----------+
| latitude2|
+----------+
| 50.577408|
|  48.82316|
| 44.504723|
| 53.426613|
| 45.190517|
| 32.027935|
|  4.272793|
|     -5.85|
|  39.17238|
|  49.81518|
|  42.10148|
|  49.79233|
| 43.494576|
| 52.744167|
| 38.696247|
|-7.7232566|
| 40.717205|
|  49.16291|
| 40.757683|
|  48.49028|
+----------+
only showing top 20 rows



In [21]:
#Now there is nothing stoping us from calculating mean values as we have removed null values
median =np.median(latitudes.collect())
print(median)


31.93397331237793


when original latitude is null then display average values for latitude column

In [22]:
mydata2=mydata2.withColumn('lat', when(mydata2.Latitude.isNull(),lit(median)).otherwise(mydata2.Latitude))
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+-----------+----------------+-----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|      cleanCity|cleanSalary|      new_Salary|              lat|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+-----------+----------------+-----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|   57438.18|   57438.1796875|       50.5774075|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|    62846.6|   62846.6015625|       48.8231572|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|   61489.23|  61489.23046875|       44.5

Do men or Women get more salary

In [26]:
import pyspark.sql.functions as sqlfunc

In [27]:
genders =mydata2.groupBy("gender").agg(sqlfunc.avg('new_salary').alias('AvgSalary'))

In [28]:
genders.show()

+------+------------------+
|gender|         AvgSalary|
+------+------------------+
|Female|55677.250125558036|
|  Male| 55361.09385573019|
+------+------------------+



In [39]:
df=mydata2.withColumn('female_Salary', when(mydata2.gender=='Female', mydata2.new_Salary).otherwise(lit(0)))
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+-----------+----------------+-----------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|      cleanCity|cleanSalary|      new_Salary|              lat|   female_Salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+-----------+----------------+-----------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|   57438.18|   57438.1796875|       50.5774075|   57438.1796875|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|    62846.6|   62846.6015625|       48.8231572|   62846.6015625|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$614

In [40]:
df=df.withColumn('Male_Salary', when(df.gender=='Male', df.new_Salary).otherwise(lit(0)))
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+-----------+----------------+-----------------+----------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|      cleanCity|cleanSalary|      new_Salary|              lat|   female_Salary|     Male_Salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+-----------+----------------+-----------------+----------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|   57438.18|   57438.1796875|       50.5774075|   57438.1796875|             0.0|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|    62846.6|   62846.6015625|       48.8231572|   62846.6015625|        

In [41]:
df=df.groupBy('jobTitle').agg(sqlfunc.avg('female_Salary').alias('final_female_Salary'),sqlfunc.avg('male_Salary').alias('final_male_salary'))
df.show()

+--------------------+-------------------+------------------+
|            jobTitle|final_female_Salary| final_male_salary|
+--------------------+-------------------+------------------+
|Systems Administr...|    50590.474609375|  15540.9501953125|
|   Media Manager III| 29586.436197916668|17381.920572916668|
|  Recruiting Manager| 34848.452473958336|  26383.4951171875|
|       Geologist III|       31749.046875|    12830.75390625|
|        Geologist II|                0.0|   43293.865234375|
|Database Administ...|                0.0|     52018.4609375|
|   Financial Analyst|    23353.776953125|       39606.05625|
|  Analyst Programmer|   16406.1287109375|  21042.9634765625|
|Software Engineer II|                0.0|      74782.640625|
|       Accountant IV|    82732.248046875|               0.0|
|    Product Engineer|     41825.48359375|       20464.94375|
|Software Test Eng...|   32218.6083984375|   27122.462890625|
|Safety Technician...|                0.0|   29421.529296875|
|    Jun

In [42]:
df = df.withColumn('delta',df.final_female_Salary - df.final_male_salary)

df.show()

+--------------------+-------------------+------------------+-------------------+
|            jobTitle|final_female_Salary| final_male_salary|              delta|
+--------------------+-------------------+------------------+-------------------+
|Systems Administr...|    50590.474609375|  15540.9501953125|   35049.5244140625|
|   Media Manager III| 29586.436197916668|17381.920572916668|       12204.515625|
|  Recruiting Manager| 34848.452473958336|  26383.4951171875|  8464.957356770836|
|       Geologist III|       31749.046875|    12830.75390625|     18918.29296875|
|        Geologist II|                0.0|   43293.865234375|   -43293.865234375|
|Database Administ...|                0.0|     52018.4609375|     -52018.4609375|
|   Financial Analyst|    23353.776953125|       39606.05625|   -16252.279296875|
|  Analyst Programmer|   16406.1287109375|  21042.9634765625| -4636.834765625001|
|Software Engineer II|                0.0|      74782.640625|      -74782.640625|
|       Accounta

In [43]:
cityavg = mydata2.groupBy('city').agg(sqlfunc.avg('new_Salary').alias('avgsalary'))

In [44]:
cityavg=cityavg.sort(col('avgsalary').desc())

In [45]:
cityavg.show()

+-----------------+-------------+
|             city|    avgsalary|
+-----------------+-------------+
|        Mesopotam|  99948.28125|
|       Zhongcheng| 99942.921875|
|           Caxias|99786.3984375|
|      Karangtawar|99638.9921875|
|        Itabaiana|  99502.15625|
|           Pasian|  99421.34375|
|           Webuye| 99368.546875|
|      Yuktae-dong| 99250.828125|
|           Zinder|  99222.84375|
|   Timiryazevskiy|   99142.9375|
|        Sawahbaru|99013.7109375|
|          Madimba|98737.8671875|
|         Huangshi|  98690.34375|
|          Gharyan|   98679.3125|
|         Yŏnan-ŭp| 98628.609375|
|     Wringinputih|98603.8203125|
|Monte da Boavista|  98586.71875|
|          Klukeng|98439.4921875|
|         Murmashi|  98226.15625|
|        Fox Creek|      98138.0|
+-----------------+-------------+
only showing top 20 rows

