### A Crash Course In PySpark

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

!ls

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connecting to security.u0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connecting to security.u0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Get:4 https://developer.download.nvidia.com/comp

### Getting started with examples

In [None]:
import numpy as np
from pyspark.sql import functions as f

#### Ingesting & Cleaning Data

In [None]:
# Read original.csv, using its first row as the column names.
data = spark.read.csv("original.csv", header=True)

In [None]:
# Preview the first 10 rows of the raw table.
data.show(n=10)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [None]:
# Keep `data` untouched: derive a new frame with a clean_city column that
# falls back to "Unknown" wherever City is null.
data2 = data.withColumn(
    "clean_city",
    f.coalesce(data.City, f.lit("Unknown")),
)

In [None]:
# Preview the first 10 rows, now including clean_city.
data2.show(n=10)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|        Unknown|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Femal

In [None]:
# Drop every row whose JobTitle is null.
data2 = data2.where(f.col("JobTitle").isNotNull())

In [None]:
# Preview the first 10 rows after the JobTitle filter.
data2.show(n=10)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|Kinsealy-Drinan|
|  8|   Goddart|     Flear|  Mal

In [None]:
# Parse the textual Salary (e.g. "$57438.18") into a numeric clean_salary.
# - The "$" sign (and any thousands separators) are stripped by pattern, so a
#   value that happens to lack the leading "$" does not lose its first digit,
#   which a fixed substr(2, ...) would do.
# - Cast to double rather than 32-bit float: float cannot represent these
#   amounts exactly and shows up downstream as 57438.1796875 instead of 57438.18.
# Values that still fail to parse become null after the cast.
data2 = data2.withColumn("clean_salary",
                         f.regexp_replace(data2.Salary, r"[$,]", "").cast("double"))

In [None]:
# Preview the first 10 rows, now including clean_salary.
data2.show(n=10)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|    63863.09|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil 

In [None]:
# Average clean_salary over the whole table (a one-row DataFrame).
mean = data2.agg(f.avg("clean_salary"))
mean.show()

+-----------------+
|avg(clean_salary)|
+-----------------+
|55516.32088199837|
+-----------------+



In [None]:
# Fetch the single aggregate row as a list of Row objects.
mean.head(1)

[Row(avg(clean_salary)=55516.32088199837)]

In [None]:
# Extract the average clean_salary as a plain Python number.
# The value is recomputed from data2 instead of from the previous binding of
# `mean`: the old form (`mean = mean.take(1)[0][0]`) rebound a DataFrame name
# to a float, so running this cell a second time raised an AttributeError.
mean = data2.agg(f.avg("clean_salary")).first()[0]
mean

55516.32088199837

In [None]:
# Impute missing salaries: new_salary is clean_salary, or the overall mean
# wherever clean_salary is null.
data2 = data2.withColumn(
    "new_salary",
    f.coalesce(data2.clean_salary, f.lit(mean)),
)


In [None]:
# Preview the table with the imputed new_salary column (default 20 rows).
data2.show(n=20)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      new_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|  61489.23046875|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 3

In [None]:
# Project the table down to the Latitude column only and peek at 5 rows.
latitudes = data2.select(f.col("Latitude"))
latitudes.show(n=5)

+----------+
|  Latitude|
+----------+
|50.5774075|
|48.8231572|
|44.5047212|
|      null|
|53.4266145|
+----------+
only showing top 5 rows



In [None]:
# Keep only the rows that actually have a Latitude value.
latitudes = latitudes.where(f.col("Latitude").isNotNull())

In [None]:
# Replace the frame with a single column, Latitude2, holding Latitude cast to float.
latitudes = latitudes.select(
    latitudes.Latitude.cast("float").alias("Latitude2")
)
latitudes.show(n=5)

+---------+
|Latitude2|
+---------+
|50.577408|
| 48.82316|
|44.504723|
|53.426613|
|45.190517|
+---------+
only showing top 5 rows



In [None]:
# Collect every Latitude2 row to the driver, flatten the values into a
# 1-D float array, and take the median with NumPy.
median = np.median(
    np.ravel(np.asarray(latitudes.collect()).astype(float))
)
median

31.93397331237793

In [None]:
# Impute missing latitudes: lat is Latitude, or the median where it is null.
# - The null test uses data2.Latitude (the frame being extended) rather than
#   the column of the original, unfiltered `data` frame.
# - Latitude was loaded as text, so it is cast to double; otherwise mixing it
#   with the numeric median leaves lat as a non-numeric column.
# - float(median) hands Spark a plain Python float instead of a NumPy scalar.
data2 = data2.withColumn(
    "lat",
    f.when(data2.Latitude.isNull(), f.lit(float(median)))
     .otherwise(data2.Latitude.cast("double")),
)
data2.show(10)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      new_salary|              lat|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|  61489.23046875|     

#### Answering scenario questions

Salary by gender and job title

In [None]:
# Average new_salary per gender, reported as AvgSalary.
avg_salary = f.avg(f.col("new_salary")).alias("AvgSalary")
genders = data2.groupBy("gender").agg(avg_salary)

In [None]:
# Display the per-gender averages (default 20 rows).
genders.show(n=20)

+------+------------------+
|gender|         AvgSalary|
+------+------------------+
|Female|55677.250125558036|
|  Male| 55361.09385573019|
+------+------------------+



In [None]:
# Split new_salary into one column per gender.
# Rows of the other gender are left as null (when() without otherwise())
# instead of 0: avg() skips nulls, so a later per-JobTitle average covers only
# the matching gender. A 0 placeholder would be counted as a real salary and
# drag both averages down (and report 0.0 for titles with no such employees).
df = data2.withColumn("female_salary",
                      f.when(data2.gender == "Female", data2.new_salary)
                      )
df = df.withColumn("male_salary",
                   f.when(df.gender == "Male", df.new_salary)
                   )

In [None]:
# Preview the first 10 rows with the per-gender salary columns.
df.show(n=10)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+----------------+--------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      new_salary|              lat|   female_salary|   male_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+----------------+--------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|   57438.1796875|           0.0|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|   62846.6015625|           

In [None]:
# Collapse to one row per JobTitle with the average of each gender column.
female_avg = f.avg(f.col("female_salary")).alias("fin_female_salary")
male_avg = f.avg(f.col("male_salary")).alias("fin_male_salary")
df = df.groupBy("JobTitle").agg(female_avg, male_avg)



In [None]:
# Preview the first 10 job titles with their per-gender averages.
df.show(n=10)

+--------------------+------------------+------------------+
|            JobTitle| fin_female_salary|   fin_male_salary|
+--------------------+------------------+------------------+
|Systems Administr...|   50590.474609375|  15540.9501953125|
|   Media Manager III|29586.436197916668|17381.920572916668|
|  Recruiting Manager|34848.452473958336|  26383.4951171875|
|       Geologist III|      31749.046875|    12830.75390625|
|        Geologist II|               0.0|   43293.865234375|
|Database Administ...|               0.0|     52018.4609375|
|   Financial Analyst|   23353.776953125|       39606.05625|
|  Analyst Programmer|  16406.1287109375|  21042.9634765625|
|Software Engineer II|               0.0|      74782.640625|
|       Accountant IV|   82732.248046875|               0.0|
+--------------------+------------------+------------------+
only showing top 10 rows



In [None]:
# delta = female average minus male average, per job title.
df = df.withColumn(
    "delta",
    f.col("fin_female_salary") - f.col("fin_male_salary"),
)
df.show(n=10)

+--------------------+------------------+------------------+------------------+
|            JobTitle| fin_female_salary|   fin_male_salary|             delta|
+--------------------+------------------+------------------+------------------+
|Systems Administr...|   50590.474609375|  15540.9501953125|  35049.5244140625|
|   Media Manager III|29586.436197916668|17381.920572916668|      12204.515625|
|  Recruiting Manager|34848.452473958336|  26383.4951171875| 8464.957356770836|
|       Geologist III|      31749.046875|    12830.75390625|    18918.29296875|
|        Geologist II|               0.0|   43293.865234375|  -43293.865234375|
|Database Administ...|               0.0|     52018.4609375|    -52018.4609375|
|   Financial Analyst|   23353.776953125|       39606.05625|  -16252.279296875|
|  Analyst Programmer|  16406.1287109375|  21042.9634765625|-4636.834765625001|
|Software Engineer II|               0.0|      74782.640625|     -74782.640625|
|       Accountant IV|   82732.248046875

Salary by city

In [None]:
# Average new_salary per City, highest average first.
cityavg = (
    data2
    .groupBy("City")
    .agg(f.avg(f.col("new_salary")).alias("avgsalary"))
)
cityavg = cityavg.orderBy(f.desc("avgsalary"))

In [None]:
# Show the 10 cities with the highest average salary.
cityavg.show(n=10)

+--------------+-------------+
|          City|    avgsalary|
+--------------+-------------+
|     Mesopotam|  99948.28125|
|    Zhongcheng| 99942.921875|
|        Caxias|99786.3984375|
|   Karangtawar|99638.9921875|
|     Itabaiana|  99502.15625|
|        Pasian|  99421.34375|
|        Webuye| 99368.546875|
|   Yuktae-dong| 99250.828125|
|        Zinder|  99222.84375|
|Timiryazevskiy|   99142.9375|
+--------------+-------------+
only showing top 10 rows

