In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [2]:
# Load the raw CSV file; its first line supplies the column names.
mydata = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("original.csv")
)
mydata.show()


+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|         Dachun|     

## **spark.read.format("csv")**

Tells Spark's DataFrame reader to parse the input file as CSV.

## **option("header","true")**

`header = true` makes Spark use the first line of the CSV file as the column names instead of treating it as a data row.

## **load("original.csv")**

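`original.csv` is the file that contains the data. Because `inferSchema` is not set, every column is read as a string. This is why `Salary` and `Latitude` are cast to numbers later on.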

## **mydata**
The loaded data is stored in `mydata`, which is a DataFrame.

## **What is a DataFrame?**
A DataFrame is a data structure that organizes data into a 2-dimensional table of rows and columns, much like a spreadsheet.
Every DataFrame contains a blueprint, known as a schema, that defines the name and data type of each column. Missing or incomplete values are stored as **null** values in the DataFrame.


In [3]:
# Import only the column functions this notebook uses. A star import from
# pyspark.sql.functions would shadow Python builtins such as sum, max, min, round and abs.
from pyspark.sql.functions import col, lit, when

# Put missing cities under the placeholder 'Unknown' in a new column; City itself is unchanged.
mydata2 = mydata.withColumn("clean_city", when(mydata.City.isNull(), 'Unknown').otherwise(mydata.City))
mydata2.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     clean_city|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57576.52|39.9947462|116.3397725|        Unknown|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      null| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Femal

## **Importing from `pyspark.sql.functions`**
Brings in the PySpark column functions used below, such as `when`, `col`, and `lit`.

mydata.withColumn("clean_city", when(mydata.City.isNull(), 'Unknown').otherwise(mydata.City))

**when(mydata.City.isNull(), 'Unknown')**

When `City` is null, use the string `'Unknown'` instead.

**.otherwise(mydata.City)**

Otherwise, keep the existing `City` value.

**mydata.withColumn("clean_city",**

Store the result in a new column, `clean_city`. The original `City` column is left unchanged.

In [None]:
# Drop every row whose JobTitle is missing.
has_job_title = mydata2.JobTitle.isNotNull()
mydata2 = mydata2.filter(has_job_title)
mydata2.show()

## **mydata2.filter(mydata2.JobTitle.isNotNull())**

Keep only the rows where `JobTitle` has a value. Rows with a missing (null) job title are dropped.

In [None]:
# Salary strings look like "$57438.18": skip the leading "$" (substr positions are 1-based)
# and parse what remains as a float.
salary_digits = mydata2.Salary.substr(2, 100)
mydata2 = mydata2.withColumn("clean_salary", salary_digits.cast('float'))
mydata2.show()

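## **mydata2.Salary.substr(2,100).cast('float')**

`substr(2, 100)` skips the first character, the `$` sign (Spark's `substr` positions start at 1). `cast('float')` then converts the remaining text from a string to a float. The result is stored in the new column `clean_salary`. Values that cannot be parsed become null.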

In [None]:
from pyspark.sql.functions import lit

# Overall average of clean_salary: groupBy() with no columns treats the whole DataFrame as one group.
everyone = mydata2.groupBy()
mean = everyone.avg('clean_salary')
mean.show()


`groupBy()` works like a SQL `GROUP BY`. With no columns it puts the whole table into a single group, so `avg()` returns the average of `clean_salary` across all rows.

In [None]:
# Extract the overall average salary as a plain Python number; it fills missing salaries below.
mean = mydata2.groupBy().avg('clean_salary').first()[0]
# Show the value just computed (previously this cell re-displayed the unchanged mydata2).
mean

In [None]:
# Fill missing salaries with the overall average computed above; keep the rest as-is.
salary_missing = mydata2.clean_salary.isNull()
mydata2 = mydata2.withColumn('new_salary', when(salary_missing, lit(mean)).otherwise(mydata2.clean_salary))
mydata2.show()

In [None]:
import numpy as np

# Pull out just the Latitude column for computing its median.
latitudes = mydata2.select(mydata2.Latitude)
latitudes.show()

In [None]:
# Keep only rows that actually have a latitude value.
latitudes = latitudes.where(latitudes.Latitude.isNotNull())
latitudes.show()

In [None]:
# Parse Latitude (read from CSV as a string) into a double-precision number.
# A 32-bit 'float' keeps only ~7 significant digits, which would round coordinates
# such as 50.5774075 and shift the median computed next.
latitudes = latitudes.select(latitudes.Latitude.cast('double').alias('latitude2'))
latitudes.show()

In [None]:
# Exact median: bring the latitude values to the driver and let NumPy compute it.
latitude_values = [row.latitude2 for row in latitudes.collect()]
median = np.median(latitude_values)
print(median)

In [None]:
# Impute missing latitudes with the median. Latitude is still a string column (read from CSV
# without inferSchema), so cast it here: otherwise Spark coerces both when/otherwise branches
# to a common type -- string -- and 'lat' would end up as text instead of a number.
mydata2 = mydata2.withColumn(
    'lat',
    when(mydata2.Latitude.isNull(), lit(median)).otherwise(mydata2.Latitude.cast('double')),
)
mydata2.show()

In [None]:
import pyspark.sql.functions as sqlfunc

# Average (imputed) salary per gender.
genders = (
    mydata2
    .groupBy('gender')
    .agg(sqlfunc.avg('new_salary').alias('AvgSalary'))
)
genders.show()

In [None]:
# Split new_salary into per-gender columns. Rows of the other gender get null rather than 0:
# avg() skips nulls, but it would count zeros and pull each per-gender average down.
df = mydata2.withColumn('female_salary', when(mydata2.gender == 'Female', mydata2.new_salary))
df = df.withColumn('male_salary', when(df.gender == 'Male', df.new_salary))
df.show()

In [None]:
# Per job title, average the female and male salary columns separately.
female_avg = sqlfunc.avg('female_salary').alias('final_female_salary')
male_avg = sqlfunc.avg('male_salary').alias('final_male_salary')
df = df.groupBy('JobTitle').agg(female_avg, male_avg)
df.show()

In [None]:
# delta = female average minus male average: positive means women in that job title earn more.
df = df.withColumn('delta', df['final_female_salary'] - df['final_male_salary'])
df.show()

In [None]:
# Average (imputed) salary for each city.
cityavg = (
    mydata2
    .groupBy('City')
    .agg(sqlfunc.avg('new_salary').alias('avg_salary'))
)
cityavg.show()

In [None]:
# Highest-paying cities first.
cityavg = cityavg.sort('avg_salary', ascending=False)
cityavg.show()