In [None]:
# PySpark revolves around the concept of DataFrames, which are similar to tables in relational databases or spreadsheets in Excel
# DataFrames consist of rows and columns
# When data is loaded into PySpark, it is stored in a DataFrame

# DataFrames are IMMUTABLE

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession

# recall that we need a spark session to work with spark
# SparkSession.builder creates that session and master("local[*]") runs it localy using whaterver cores are avaiable
# finlay, the session is created, or an existing session is returned
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [9]:
data = spark.read.format("csv").option("header", "true").load("original.csv")
data.show()

# could of also done this
# data = spark.read.csv("original.csv", header=True)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           NULL|                NULL|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

In [13]:
from pyspark.sql.functions import *

# check all columns that have null - sum() is done column-wise, meaning we sum up all rows in the col
# in brief, we are going over all the columns in df (by column name c)
# fetching the column content by name using col(c)
# returning a boolean mask using isNull and casting all T/F values to int
# this produces a new col, which we alias with the name c so its easier to see
# we are producing a list of columns with new data, using list coersion

# note: select() is DEFINING A TRANSFORMATION -> select() is not only used to select columns, but also to define a transoformation on the columns like here
data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show()

# withColumn(...) -> create a new column, and do the when(), if when() is False, then do otherwise
# when(...) -> if "City" is null, replace it with "Unknown"
# pretty much, the when() says "when city is null, add valu unknown into the column"
# "otherwise, keep the original value at data.City for that row"
data_clean = data.withColumn("clean_city", when(data.City.isNull(), "Unknown").otherwise(data.City))
data_clean.select([sum(col(c).isNull().cast("int")).alias(c) for c in data_clean.columns]).show()
# can also overwritte:
# data = data.withColumn("City", when(data.City.isNull(), "Unknown").otherwise(data.City))
data_clean.show()

+---+----------+---------+------+----+--------+------+--------+---------+
| id|first_name|last_name|gender|City|JobTitle|Salary|Latitude|Longitude|
+---+----------+---------+------+----+--------+------+--------+---------+
|  0|         0|        0|     0|   1|       2|     0|       1|        0|
+---+----------+---------+------+----+--------+------+--------+---------+

+---+----------+---------+------+----+--------+------+--------+---------+----------+
| id|first_name|last_name|gender|City|JobTitle|Salary|Latitude|Longitude|clean_city|
+---+----------+---------+------+----+--------+------+--------+---------+----------+
|  0|         0|        0|     0|   1|       2|     0|       1|        0|         0|
+---+----------+---------+------+----+--------+------+--------+---------+----------+

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary| 

In [15]:
# remove rows where job title is null
data_clean = data_clean.filter(data_clean.JobTitle.isNotNull())
data_clean.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|Kinsealy-Drinan|
|  8|   Goddart|     Flear|  Mal

In [17]:
from pyspark.sql.functions import col
# Note: we can access cols using the dot notation or col() (col has to be inside a withColumn or similar method that has referance to the df you are using)
# ex: data_clean.Salary -> same as -> col("Salary")
data_clean = data_clean.withColumn("clean_salary", data_clean.Salary.substr(2, 100).cast("float"))
data_clean.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.6489954|      Mytishchi|    63863.09|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil 

In [19]:
from pyspark.sql.functions import lit

# selectExpr() projects an SQL querry onto the datafram
mean_salary = data_clean.selectExpr("avg(clean_salary) as mean").collect()[0]["mean"]

# lit creates a literal column -> a column containing the value mean_salary for all entries
# this line create a new column New_Salary if clean_salary has a null value, it uses the mean_salary otherwise we use whatever value was at that row for Clean_Salary
data_clean = data_clean.withColumn("New_Salary", when(data_clean.clean_salary.isNull(), lit(mean_salary)).otherwise(data_clean.clean_salary))
data_clean.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      New_Salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|  61489.23046875|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 3

In [24]:
import numpy as np

# note: no errors here, Spark does not fail when parsing the NULL —the NULLs remain as NULL, but we filter them out
latitudes = data_clean.select("Latitude").filter(data_clean.Latitude.isNotNull()).withColumn("Latitude", col("Latitude").cast("float"))

# important for all items here to be float -> unlike spark, numpy will throw ERROR if it encounters a null when agregating
median_lat = np.median([row["Latitude"] for row in latitudes.collect()])

# same as before, if null for latitude, replace it by the mean lattitude
data_clean = data_clean.withColumn("Updated_Latitude", when(data_clean.Latitude.isNull(),lit(median_lat)).otherwise(data_clean.Latitude))
data_clean.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      New_Salary| Updated_Latitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|  61489.23046875|     

Salary Analysis Part

In [36]:
# the sql.function library in pyspark makes pysprak "SQL" Like.
from pyspark.sql.functions import col, avg, when, lit

# agg = aggregate opperation
# note: like SQL, pyspark is NOT case sensative when using SQL opperations
genders = data_clean.groupBy("GenDER").agg(avg("New_Salary").alias("Average_Salary"))
genders.show()

# we can also make multuple agg opps in one go:
# df1 = df.groupBy("Gender").agg(
#  sum("Clean_Salary").alias("Total_Salary"),
#  avg("Clean_Salary").alias("Average_Salary"),
#  min("Clean_Salary").alias("Min_Salary"),
#  max("Clean_Salary").alias("Max_Salary")
# )
# df1.show()

# to finish and write the data we can do this:
# df1.write.csv("df1.csv")
# df1.write.json("df1.json")
# df1.write.parquet("df1.parquet") # Parquet is optimized for big data processing and analytics.

+------+------------------+
|GenDER|    Average_Salary|
+------+------------------+
|Female|55677.250125558036|
|  Male| 55361.09385573019|
+------+------------------+



In [29]:
df = data_clean.withColumn("Female_Salary", when(col("Gender") == "Female", col("New_Salary")).otherwise(lit(0)))
df = df.withColumn("Male_Salary", when(col("Gender") == "Male", col("New_Salary")).otherwise(lit(0)))
df.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+----------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|clean_salary|      New_Salary| Updated_Latitude|   Female_Salary|     Male_Salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+----------------+-----------------+----------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|   57438.1796875|             0.0|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|   62846.6015625|   

In [30]:
df = df.groupBy("JobTitle").agg(
 avg("Female_Salary").alias("Final_Female_Salary"),
 avg("Male_Salary").alias("Final_Male_Salary")
)
df.show()

+--------------------+-------------------+------------------+
|            JobTitle|Final_Female_Salary| Final_Male_Salary|
+--------------------+-------------------+------------------+
|Systems Administr...|    50590.474609375|  15540.9501953125|
|   Media Manager III| 29586.436197916668|17381.920572916668|
|  Recruiting Manager| 34848.452473958336|  26383.4951171875|
|       Geologist III|       31749.046875|    12830.75390625|
|        Geologist II|                0.0|   43293.865234375|
|Database Administ...|                0.0|     52018.4609375|
|   Financial Analyst|    23353.776953125|       39606.05625|
|  Analyst Programmer|   16406.1287109375|  21042.9634765625|
|Software Engineer II|                0.0|      74782.640625|
|       Accountant IV|    82732.248046875|               0.0|
|    Product Engineer|     41825.48359375|       20464.94375|
|Software Test Eng...|   32218.6083984375|   27122.462890625|
|Safety Technician...|                0.0|   29421.529296875|
|    Jun

In [31]:
df = df.withColumn("Delta", col("Final_Female_Salary") - col("Final_Male_Salary"))
df.show()

+--------------------+-------------------+------------------+-------------------+
|            JobTitle|Final_Female_Salary| Final_Male_Salary|              Delta|
+--------------------+-------------------+------------------+-------------------+
|Systems Administr...|    50590.474609375|  15540.9501953125|   35049.5244140625|
|   Media Manager III| 29586.436197916668|17381.920572916668|       12204.515625|
|  Recruiting Manager| 34848.452473958336|  26383.4951171875|  8464.957356770836|
|       Geologist III|       31749.046875|    12830.75390625|     18918.29296875|
|        Geologist II|                0.0|   43293.865234375|   -43293.865234375|
|Database Administ...|                0.0|     52018.4609375|     -52018.4609375|
|   Financial Analyst|    23353.776953125|       39606.05625|   -16252.279296875|
|  Analyst Programmer|   16406.1287109375|  21042.9634765625| -4636.834765625001|
|Software Engineer II|                0.0|      74782.640625|      -74782.640625|
|       Accounta

In [33]:
city_avg = data_clean.groupBy("City").agg(avg("New_Salary").alias("Average_Salary"))
city_avg = city_avg.sort(col("Average_Salary").desc())
city_avg.show()

+-----------------+--------------+
|             City|Average_Salary|
+-----------------+--------------+
|        Mesopotam|   99948.28125|
|       Zhongcheng|  99942.921875|
|           Caxias| 99786.3984375|
|      Karangtawar| 99638.9921875|
|        Itabaiana|   99502.15625|
|           Pasian|   99421.34375|
|           Webuye|  99368.546875|
|      Yuktae-dong|  99250.828125|
|           Zinder|   99222.84375|
|   Timiryazevskiy|    99142.9375|
|        Sawahbaru| 99013.7109375|
|          Madimba| 98737.8671875|
|         Huangshi|   98690.34375|
|          Gharyan|    98679.3125|
|         Yŏnan-ŭp|  98628.609375|
|     Wringinputih| 98603.8203125|
|Monte da Boavista|   98586.71875|
|          Klukeng| 98439.4921875|
|         Murmashi|   98226.15625|
|        Fox Creek|       98138.0|
+-----------------+--------------+
only showing top 20 rows

