In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Jupyter Notebook").getOrCreate()
spark

In [5]:
from pyspark.sql.functions import regexp_replace, col # we use this later

In [49]:
odf = spark.read.format("csv").option("header", "true").load("original.csv") # imports original csv and says first row contains column names

In [6]:
odf = spark.read.csv("original.csv", header=True) #same as above but shorter and preferred

In [50]:
original_ds.show()

+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|     Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57,438.18 |50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62,846.60 |48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57,576.52 |39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61,489.23 |44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63,863.09 |      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30,101.16 |53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|   

In [51]:
from pyspark.sql.functions import * # gives us all of the sql functions so we did not to import regexp_replace above

In [52]:
odf2 = odf.withColumn("clean_city", when(odf.City.isNull(),'Unknown').otherwise(odf.City)) # replace null with unknown

In [53]:
odf2.show()

+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|     Salary|  Latitude|  Longitude|     clean_city|
+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57,438.18 |50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62,846.60 |48.8231572|103.5218199|         Bulgan|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57,576.52 |39.9947462|116.3397725|        Unknown|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61,489.23 |44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63,863.09 |      null| 37.6489954|      Mytishchi|
|  6|     Maris|

In [54]:
odf2 = odf2.filter(odf2.JobTitle.isNotNull()) #select those rows where JobTitle is not null

In [55]:
odf2.show()

+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|     Salary|  Latitude|  Longitude|     clean_city|
+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57,438.18 |50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62,846.60 |48.8231572|103.5218199|         Bulgan|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61,489.23 |44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63,863.09 |      null| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30,101.16 |53.4266145| -6.1644997|Kinsealy-Drinan|
|  8|   Goddart|

In [56]:
odf3 = odf2.withColumn('Clean_Salary', regexp_replace(odf2['Salary'], '[$,]', '').cast('float')) # this removes the $ and , before converting

In [57]:
odf3.show()

+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+------------+
| id|first_name| last_name|gender|           City|            JobTitle|     Salary|  Latitude|  Longitude|     clean_city|Clean_Salary|
+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57,438.18 |50.5774075| 16.4967184|      Nowa Ruda|    57438.18|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62,846.60 |48.8231572|103.5218199|         Bulgan|     62846.6|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61,489.23 |44.5047212| 38.1300171|  Divnomorskoye|    61489.23|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63,863.09 |      null| 37.6489954|      Mytishchi|    63863.09|
|  6|     Maris|      Folk|Female|Kinsealy-Drina

In [60]:
mean = odf3.groupBy().avg('Clean_Salary').take(1)[0][0] # the code output is a dataframe but not when adding .take(1)[0][0].  This outputs just the one row
print(mean)

55516.32088199837


In [61]:
from pyspark.sql.functions import lit

In [64]:
odf3 = odf3.withColumn('New_Salary', when(odf3.Clean_Salary.isNull(), lit(mean)).otherwise(odf3.Clean_Salary)) # this creates a new salary column which replaces null in clean salary with mean

In [65]:
odf3.show()

+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|     Salary|  Latitude|  Longitude|     clean_city|Clean_Salary|      New_Salary|
+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57,438.18 |50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62,846.60 |48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61,489.23 |44.5047212| 38.1300171|  Divnomorskoye|    61489.23|  61489.23046875|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63,863.09 

In [66]:
import numpy as np

In [68]:
lat = odf3.select('Latitude') # now we are going to replace Latitude nulls with the median

In [69]:
lat.show()

+----------+
|  Latitude|
+----------+
|50.5774075|
|48.8231572|
|44.5047212|
|      null|
|53.4266145|
|45.1905186|
| 32.027934|
|  4.272793|
|     -5.85|
| 39.172378|
|49.8151822|
|42.1014803|
|49.7923299|
|43.4945737|
|52.7441662|
| 38.696249|
|-7.7232567|
|40.7172049|
|  49.16291|
|40.7576842|
+----------+
only showing top 20 rows



In [71]:
lat = lat.filter(lat.Latitude.isNotNull()) # remove the nulls so we can convert to number

In [72]:
lat.show()

+----------+
|  Latitude|
+----------+
|50.5774075|
|48.8231572|
|44.5047212|
|53.4266145|
|45.1905186|
| 32.027934|
|  4.272793|
|     -5.85|
| 39.172378|
|49.8151822|
|42.1014803|
|49.7923299|
|43.4945737|
|52.7441662|
| 38.696249|
|-7.7232567|
|40.7172049|
|  49.16291|
|40.7576842|
|48.4902808|
+----------+
only showing top 20 rows



In [74]:
lat = lat.withColumn("Latitude2", lat.Latitude.cast("float")).select("Latitude2")  #convert Lat into float so we can calc median

In [75]:
lat.show()

+----------+
| Latitude2|
+----------+
| 50.577408|
|  48.82316|
| 44.504723|
| 53.426613|
| 45.190517|
| 32.027935|
|  4.272793|
|     -5.85|
|  39.17238|
|  49.81518|
|  42.10148|
|  49.79233|
| 43.494576|
| 52.744167|
| 38.696247|
|-7.7232566|
| 40.717205|
|  49.16291|
| 40.757683|
|  48.49028|
+----------+
only showing top 20 rows



In [77]:
median = np.median(lat.collect()) #calc the median and store as var

In [78]:
print(median)

31.93397331237793


In [79]:
odf3 = odf3.withColumn("Lat", when(odf3.Latitude.isNull(), lit(median)).otherwise(odf3.Latitude)) #Create Lat column and when null, insert median, otherwise Latitude value
odf3.show() #lit stands for literal and assigns the same value across multiple rows

+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+------------+----------------+-----------------+
| id|first_name| last_name|gender|           City|            JobTitle|     Salary|  Latitude|  Longitude|     clean_city|Clean_Salary|      New_Salary|              Lat|
+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+------------+----------------+-----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57,438.18 |50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62,846.60 |48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61,489.23 |44.5047212| 38.1300171|  Divnomorskoye|    61489.23|  61489.23

In [82]:
import pyspark.sql.functions as sf
gender = odf3.groupBy("gender").agg(sf.avg("New_Salary").alias("Avg_Salary")) #finding the overall avg salary for both genders

In [83]:
gender.show()

+------+------------------+
|gender|        Avg_Salary|
+------+------------------+
|Female|55677.250125558036|
|  Male| 55361.09385573019|
+------+------------------+



In [102]:
gendf = odf3.withColumn("Female_Salary", when(odf3.gender == "Female", odf3.New_Salary).otherwise(lit(0))) # Where gender==Female, capture in Female Salary column otherwise insert 0

In [103]:
gendf.show()

+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+------------+----------------+-----------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|     Salary|  Latitude|  Longitude|     clean_city|Clean_Salary|      New_Salary|              Lat|   Female_Salary|
+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+------------+----------------+-----------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57,438.18 |50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|   57438.1796875|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62,846.60 |48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|   62846.6015625|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Acc

In [104]:
gendf = gendf.withColumn("Male_Salary", when(odf3.gender == "Male", odf3.New_Salary).otherwise(lit(0))) #merge Male salary column for gendf

In [105]:
gendf.show()

+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+------------+----------------+-----------------+----------------+----------------+
| id|first_name| last_name|gender|           City|            JobTitle|     Salary|  Latitude|  Longitude|     clean_city|Clean_Salary|      New_Salary|              Lat|   Female_Salary|     Male_Salary|
+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+---------------+------------+----------------+-----------------+----------------+----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57,438.18 |50.5774075| 16.4967184|      Nowa Ruda|    57438.18|   57438.1796875|       50.5774075|   57438.1796875|             0.0|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62,846.60 |48.8231572|103.5218199|         Bulgan|     62846.6|   62846.6015625|       48.8231572|   62846.6

In [106]:
gendf = gendf.groupBy("JobTitle").agg(sf.avg("Female_Salary").alias("Fin_Fem_Salary"), sf.avg("Male_Salary").alias("Fin_Male_Salary")) #find salary by gender for each job title

In [107]:
gendf.show()

+--------------------+------------------+------------------+
|            JobTitle|    Fin_Fem_Salary|   Fin_Male_Salary|
+--------------------+------------------+------------------+
|Systems Administr...|   50590.474609375|  15540.9501953125|
|   Media Manager III|29586.436197916668|17381.920572916668|
|  Recruiting Manager|34848.452473958336|  26383.4951171875|
|       Geologist III|      31749.046875|    12830.75390625|
|        Geologist II|               0.0|   43293.865234375|
|Database Administ...|               0.0|     52018.4609375|
|   Financial Analyst|   23353.776953125|       39606.05625|
|  Analyst Programmer|  16406.1287109375|  21042.9634765625|
|Software Engineer II|               0.0|      74782.640625|
|       Accountant IV|   82732.248046875|               0.0|
|    Product Engineer|    41825.48359375|       20464.94375|
|Software Test Eng...|  32218.6083984375|   27122.462890625|
|Safety Technician...|               0.0|   29421.529296875|
|    Junior Executive|15

In [108]:
gendf = gendf.withColumn("Difference", gendf.Fin_Fem_Salary - gendf.Fin_Male_Salary) # find the difference in male and female salaries

In [109]:
gendf.show()

+--------------------+------------------+------------------+-------------------+
|            JobTitle|    Fin_Fem_Salary|   Fin_Male_Salary|         Difference|
+--------------------+------------------+------------------+-------------------+
|Systems Administr...|   50590.474609375|  15540.9501953125|   35049.5244140625|
|   Media Manager III|29586.436197916668|17381.920572916668|       12204.515625|
|  Recruiting Manager|34848.452473958336|  26383.4951171875|  8464.957356770836|
|       Geologist III|      31749.046875|    12830.75390625|     18918.29296875|
|        Geologist II|               0.0|   43293.865234375|   -43293.865234375|
|Database Administ...|               0.0|     52018.4609375|     -52018.4609375|
|   Financial Analyst|   23353.776953125|       39606.05625|   -16252.279296875|
|  Analyst Programmer|  16406.1287109375|  21042.9634765625| -4636.834765625001|
|Software Engineer II|               0.0|      74782.640625|      -74782.640625|
|       Accountant IV|   827

In [111]:
cityavg = odf3.groupBy("City").agg(sf.avg("New_Salary").alias("Avg_Salary")) #find the highest avg salary by city so calc avg per city

In [112]:
cityavg.show()

+-----------------+----------------+
|             City|      Avg_Salary|
+-----------------+----------------+
|        Sułkowice|  33432.98828125|
|          Klippan|     77039.46875|
|      Trollhättan|53311.6845703125|
|        Shinaihai|    39544.640625|
|         Hongzhou|  35707.30859375|
|         Cipinang| 11617.509765625|
| Viejo Daan Banua|         43927.5|
|         Tsiatsan| 18795.439453125|
|       San Andres|  52426.80078125|
|           Krasna|   72022.7890625|
|      Springfield|40697.3251953125|
|            Město|  27797.98046875|
|Chaloem Phra Kiat|  54840.19921875|
|          Tadotsu|  55595.30078125|
|   Hénin-Beaumont|        55082.75|
|          Kajaani|  20224.83984375|
|           Duozhu|    71416.859375|
|           Abéché|   93375.1796875|
|     Habingkloang|     56892.96875|
|         Malishka|   76783.4765625|
+-----------------+----------------+
only showing top 20 rows



In [114]:
cityavg = cityavg.sort(col("Avg_Salary").desc()) # sort avg salary to see who is on top

In [115]:
cityavg.show()

+-----------------+-------------+
|             City|   Avg_Salary|
+-----------------+-------------+
|        Mesopotam|  99948.28125|
|       Zhongcheng| 99942.921875|
|           Caxias|99786.3984375|
|      Karangtawar|99638.9921875|
|        Itabaiana|  99502.15625|
|           Pasian|  99421.34375|
|           Webuye| 99368.546875|
|      Yuktae-dong| 99250.828125|
|           Zinder|  99222.84375|
|   Timiryazevskiy|   99142.9375|
|        Sawahbaru|99013.7109375|
|          Madimba|98737.8671875|
|         Huangshi|  98690.34375|
|          Gharyan|   98679.3125|
|         Yŏnan-ŭp| 98628.609375|
|     Wringinputih|98603.8203125|
|Monte da Boavista|  98586.71875|
|          Klukeng|98439.4921875|
|         Murmashi|  98226.15625|
|        Fox Creek|      98138.0|
+-----------------+-------------+
only showing top 20 rows



In [None]:
#########################################  Loading data into PySpark ##########################################################

In [118]:
odf.show()

+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|     Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+-----------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57,438.18 |50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62,846.60 |48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57,576.52 |39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61,489.23 |44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63,863.09 |      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30,101.16 |53.4266145| -6.1644997|
|  7|     Masha|    Divers|Female|   

In [119]:
odf.dtypes

[('id', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string')]

In [130]:
from pyspark.sql.types import *
oschema = StructType([
    StructField("Id", IntegerType()),
    StructField("First_Name", StringType()),
    StructField("Last_Name", StringType()),
    StructField("Gender", StringType()),
    StructField("City", StringType()),
    StructField("Job_Title", StringType()),
    StructField("Salary", StringType()),
    StructField("Latitude", StringType()),
    StructField("Longitude", FloatType())
])

odf5 = spark.read.csv("original.csv", header=True, schema=oschema)
odf5.show()

+---+----------+----------+------+---------------+--------------------+-----------+----------+----------+
| Id|First_Name| Last_Name|Gender|           City|           Job_Title|     Salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+-----------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57,438.18 |50.5774075| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62,846.60 |48.8231572| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|$57,576.52 |39.9947462|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61,489.23 |44.5047212| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63,863.09 |      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30,101.16 |53.4266145|-6.1644998|
|  7|     Masha|    Divers|Female|         Dac

In [131]:
odf5.dtypes

[('Id', 'int'),
 ('First_Name', 'string'),
 ('Last_Name', 'string'),
 ('Gender', 'string'),
 ('City', 'string'),
 ('Job_Title', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'float')]

In [None]:
##############################################  Inspecting Video ############################################

In [132]:
from pyspark.sql.types import *
oschema = StructType([
    StructField("Id", IntegerType()),
    StructField("First_Name", StringType()),
    StructField("Last_Name", StringType()),
    StructField("Gender", StringType()),
    StructField("City", StringType()),
    StructField("Job_Title", StringType()),
    StructField("Salary", FloatType()),
    StructField("Latitude", FloatType()),
    StructField("Longitude", FloatType())
])

odf5 = spark.read.csv("original.csv", header=True, schema=oschema)
odf5.show()

+---+----------+----------+------+---------------+--------------------+--------+----------+----------+
| Id|First_Name| Last_Name|Gender|           City|           Job_Title|  Salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+--------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II| 62846.6|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|57576.52| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|30101.16| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|                null|25

In [133]:
odf5.dtypes

[('Id', 'int'),
 ('First_Name', 'string'),
 ('Last_Name', 'string'),
 ('Gender', 'string'),
 ('City', 'string'),
 ('Job_Title', 'string'),
 ('Salary', 'float'),
 ('Latitude', 'float'),
 ('Longitude', 'float')]

In [134]:
odf5.head(5)

[Row(Id=1, First_Name='Melinde', Last_Name='Shilburne', Gender='Female', City='Nowa Ruda', Job_Title='Assistant Professor', Salary=57438.1796875, Latitude=50.57740783691406, Longitude=16.49671745300293),
 Row(Id=2, First_Name='Kimberly', Last_Name='Von Welden', Gender='Female', City='Bulgan', Job_Title='Programmer II', Salary=62846.6015625, Latitude=48.823158264160156, Longitude=103.52182006835938),
 Row(Id=3, First_Name='Alvera', Last_Name='Di Boldi', Gender='Female', City=None, Job_Title=None, Salary=57576.51953125, Latitude=39.994747161865234, Longitude=116.33977508544922),
 Row(Id=4, First_Name='Shannon', Last_Name="O'Griffin", Gender='Male', City='Divnomorskoye', Job_Title='Budget/Accounting Analyst II', Salary=61489.23046875, Latitude=44.504722595214844, Longitude=38.1300163269043),
 Row(Id=5, First_Name='Sherwood', Last_Name='Macieja', Gender='Male', City='Mytishchi', Job_Title='VP Sales', Salary=63863.08984375, Latitude=None, Longitude=37.64899444580078)]

In [136]:
odf5.head(1)

[Row(Id=1, First_Name='Melinde', Last_Name='Shilburne', Gender='Female', City='Nowa Ruda', Job_Title='Assistant Professor', Salary=57438.1796875, Latitude=50.57740783691406, Longitude=16.49671745300293)]

In [138]:
odf5.describe().show()

+-------+-----------------+----------+---------+------+-------------------+-------------------+-----------------+------------------+-----------------+
|summary|               Id|First_Name|Last_Name|Gender|               City|          Job_Title|           Salary|          Latitude|        Longitude|
+-------+-----------------+----------+---------+------+-------------------+-------------------+-----------------+------------------+-----------------+
|  count|             1000|      1000|     1000|  1000|                999|                998|             1000|               999|             1000|
|   mean|            500.5|      null|     null|  null|               null|               null|55487.95562890625| 25.43151724702484|43.33756460386515|
| stddev|288.8194360957494|      null|     null|  null|               null|               null|25855.22985831031|24.579082550156635| 69.4206453674681|
|    min|                1|   Abagail|    Abbay|Female|             Abéché|Account Coordinator

In [139]:
odf5.columns

['Id',
 'First_Name',
 'Last_Name',
 'Gender',
 'City',
 'Job_Title',
 'Salary',
 'Latitude',
 'Longitude']

In [140]:
odf5.count()

1000

In [141]:
odf5.distinct().count()

1000

In [None]:
##############################################  Null and Duplicate Values ####################################

In [142]:
odf6_dropped = odf5.na.drop() #drops entire row if one column has a null

In [145]:
odf_nulljobs = odf.filter(odf.JobTitle.isNotNull()) #drops a row where job title is null
odf_nulljobs.show()

+---+----------+----------+------+---------------+--------------------+--------+----------+-----------+
| id|first_name| last_name|gender|           City|            JobTitle|  Salary|  Latitude|  Longitude|
+---+----------+----------+------+---------------+--------------------+--------+----------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II| 62846.6|48.8231572|103.5218199|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|63863.09|      null| 37.6489954|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|30101.16|53.4266145| -6.1644997|
|  8|   Goddart|     Flear|  Male|      Trélissac|Desktop Support T...|46116.36|45.1905186|  0.7423124|
|  9|      Roth|O'Cannavan|  Male|         Heitan|VP Product Man

In [148]:
odf_nullcity = odf.withColumn("Clean City", when(odf.City.isNull(), "Unknown").otherwise(odf.City)) #create the unknown value in new column
odf_nullcity.show()

+---+----------+----------+------+---------------+--------------------+--------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|  Salary|  Latitude|  Longitude|     Clean City|
+---+----------+----------+------+---------------+--------------------+--------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II| 62846.6|48.8231572|103.5218199|         Bulgan|
|  3|    Alvera|  Di Boldi|Female|           null|                null|57576.52|39.9947462|116.3397725|        Unknown|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|63863.09|      null| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Female|Kinsea

In [149]:
odf_nodups = odf.dropDuplicates() #drop duplicates
odf_nodups.show()

+---+----------+----------+------+--------------------+--------------------+--------+----------+-----------+
| id|first_name| last_name|gender|                City|            JobTitle|  Salary|  Latitude|  Longitude|
+---+----------+----------+------+--------------------+--------------------+--------+----------+-----------+
| 26|   Leandra|    Anfrey|Female|             Isfahan|VP Product Manage...|30201.32|32.6546275| 51.6679826|
| 33|     Odell|   Morritt|  Male|Chalan Pago-Ordot...|     Design Engineer|72777.48|13.4441374|144.7820914|
|502|      Tova|    Klauer|Female|          Kafr Zaytā| Staff Accountant II|58421.33|35.3740182| 36.6012942|
|553|     Lauri|Denisovich|Female|     Mineralnye Vody| Geological Engineer|73810.37|44.2452219| 43.0880662|
|677|   Candida| Halversen|Female|              Guzhen|Mechanical System...|94642.39| 22.613406| 113.190869|
|787|  Lynnelle|Challender|Female|          Silikatnyy|    Graphic Designer|37492.58| 54.036802| 48.3304944|
|821|   Lizette|   

In [None]:
###################################  Filtering Data ########################################

In [150]:
from pyspark.sql.types import *
oschema = StructType([
    StructField("Id", IntegerType()),
    StructField("First_Name", StringType()),
    StructField("Last_Name", StringType()),
    StructField("Gender", StringType()),
    StructField("City", StringType()),
    StructField("Job_Title", StringType()),
    StructField("Salary", FloatType()),
    StructField("Latitude", FloatType()),
    StructField("Longitude", FloatType())
])

odf = spark.read.csv("original.csv", header=True, schema=oschema)
odf.show()

+---+----------+----------+------+---------------+--------------------+--------+----------+----------+
| Id|First_Name| Last_Name|Gender|           City|           Job_Title|  Salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+--------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II| 62846.6|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|57576.52| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|30101.16| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|                null|25

In [151]:
odf_select = odf.select("First_Name", "Last_Name")
odf_select.show()

+----------+----------+
|First_Name| Last_Name|
+----------+----------+
|   Melinde| Shilburne|
|  Kimberly|Von Welden|
|    Alvera|  Di Boldi|
|   Shannon| O'Griffin|
|  Sherwood|   Macieja|
|     Maris|      Folk|
|     Masha|    Divers|
|   Goddart|     Flear|
|      Roth|O'Cannavan|
|      Bran|   Trahear|
|    Kylynn|   Lockart|
|       Rey|    Meharg|
|      Kerr|    Braden|
|    Mickie| Whanstall|
|    Kaspar|     Pally|
|    Norbie|    Gwyllt|
|    Claude|    Briant|
|     Thain|    Habbon|
|  Tiffanie|  Pattison|
|    Ettore|  Gerriets|
+----------+----------+
only showing top 20 rows



In [156]:
odf_rename = odf.withColumnRenamed("First_Name", "First").withColumnRenamed("Last_Name", "Last")
odf_rename.show()

+---+--------+----------+------+---------------+--------------------+--------+----------+----------+
| Id|   First|      Last|Gender|           City|           Job_Title|  Salary|  Latitude| Longitude|
+---+--------+----------+------+---------------+--------------------+--------+----------+----------+
|  1| Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|57438.18| 50.577408| 16.496717|
|  2|Kimberly|Von Welden|Female|         Bulgan|       Programmer II| 62846.6|  48.82316| 103.52182|
|  3|  Alvera|  Di Boldi|Female|           null|                null|57576.52| 39.994747|116.339775|
|  4| Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|61489.23| 44.504723| 38.130016|
|  5|Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|63863.09|      null| 37.648994|
|  6|   Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|30101.16| 53.426613|-6.1644998|
|  7|   Masha|    Divers|Female|         Dachun|                null|25090.87| 24.879416|11

In [157]:
odf_filter = odf.filter((odf.First_Name == "Kimberly"))
odf_filter.show()

+---+----------+----------+------+------+-------------+-------+--------+---------+
| Id|First_Name| Last_Name|Gender|  City|    Job_Title| Salary|Latitude|Longitude|
+---+----------+----------+------+------+-------------+-------+--------+---------+
|  2|  Kimberly|Von Welden|Female|Bulgan|Programmer II|62846.6|48.82316|103.52182|
+---+----------+----------+------+------+-------------+-------+--------+---------+



In [163]:
odf_filter = odf.filter((odf.First_Name.like("%wood%")))
odf_filter.show()

+---+----------+---------+------+---------+--------------------+--------+---------+---------+
| Id|First_Name|Last_Name|Gender|     City|           Job_Title|  Salary| Latitude|Longitude|
+---+----------+---------+------+---------+--------------------+--------+---------+---------+
|  5|  Sherwood|  Macieja|  Male|Mytishchi|            VP Sales|63863.09|     null|37.648994|
|729|  Sherwood|   Misson|  Male| Guanyang|Payment Adjustmen...|56567.31|25.489384|111.16085|
+---+----------+---------+------+---------+--------------------+--------+---------+---------+



In [161]:
odf_filter = odf.filter((odf.First_Name.endswith("wood")))
odf_filter.show()

+---+----------+---------+------+---------+--------------------+--------+---------+---------+
| Id|First_Name|Last_Name|Gender|     City|           Job_Title|  Salary| Latitude|Longitude|
+---+----------+---------+------+---------+--------------------+--------+---------+---------+
|  5|  Sherwood|  Macieja|  Male|Mytishchi|            VP Sales|63863.09|     null|37.648994|
|729|  Sherwood|   Misson|  Male| Guanyang|Payment Adjustmen...|56567.31|25.489384|111.16085|
+---+----------+---------+------+---------+--------------------+--------+---------+---------+



In [162]:
odf_filter = odf.filter((odf.First_Name.startswith("Mick")))
odf_filter.show()

+---+----------+---------+------+-----------+--------------------+--------+--------+----------+
| Id|First_Name|Last_Name|Gender|       City|           Job_Title|  Salary|Latitude| Longitude|
+---+----------+---------+------+-----------+--------------------+--------+--------+----------+
| 14|    Mickie|Whanstall|  Male|Springfield|Assistant Media P...|50838.53|42.10148|-72.576675|
+---+----------+---------+------+-----------+--------------------+--------+--------+----------+



In [164]:
odf_filter = odf.filter((odf.Salary.between(95000, 100000)))
odf_filter.show()

+---+----------+------------+------+---------------+--------------------+--------+----------+---------+
| Id|First_Name|   Last_Name|Gender|           City|           Job_Title|  Salary|  Latitude|Longitude|
+---+----------+------------+------+---------------+--------------------+--------+----------+---------+
| 37|     Nicko|       Frays|  Male|         Caxias|      Health Coach I| 99786.4|-4.8654137|  -43.362|
| 45| Gabrielle|    Tippings|Female|       Huangshi|              Editor|98690.34|  30.20003|115.03883|
| 64|     Dougy|      Browse|  Male|     Neftegorsk|        Engineer III|97531.47| 44.366554|39.709908|
| 78|     Perri|       Brett|Female|        Klukeng|Software Test Eng...|98439.49|   -8.3965| 123.0856|
| 85|   Gothart|      Olivey|  Male|     Zhongcheng|Assistant Media P...|99942.92| 22.356083|112.56237|
|122|       Rem|   Dumberell|  Male|  Katima Mulilo|           Librarian|96665.24|-17.506157|24.280655|
|179|   Laverna|   Yuryichev|Female|     Lyaskovets| Marketing A

In [166]:
odf_filter = odf.filter((odf.First_Name.isin("Nicko", "Cathleen")))
odf_filter.show()

+---+----------+---------+------+-------+--------------------+--------+----------+---------+
| Id|First_Name|Last_Name|Gender|   City|           Job_Title|  Salary|  Latitude|Longitude|
+---+----------+---------+------+-------+--------------------+--------+----------+---------+
| 37|     Nicko|    Frays|  Male| Caxias|      Health Coach I| 99786.4|-4.8654137|  -43.362|
|202|  Cathleen|    Moens|Female|Manolás|Payment Adjustmen...|97232.78| 36.434372|25.344727|
+---+----------+---------+------+-------+--------------------+--------+----------+---------+



In [170]:
odf_substr = odf.select(odf.First_Name, odf.First_Name.substr(1,5).alias("Name"))
odf_substr.show()                       

+----------+-----+
|First_Name| Name|
+----------+-----+
|   Melinde|Melin|
|  Kimberly|Kimbe|
|    Alvera|Alver|
|   Shannon|Shann|
|  Sherwood|Sherw|
|     Maris|Maris|
|     Masha|Masha|
|   Goddart|Godda|
|      Roth| Roth|
|      Bran| Bran|
|    Kylynn|Kylyn|
|       Rey|  Rey|
|      Kerr| Kerr|
|    Mickie|Micki|
|    Kaspar|Kaspa|
|    Norbie|Norbi|
|    Claude|Claud|
|     Thain|Thain|
|  Tiffanie|Tiffa|
|    Ettore|Ettor|
+----------+-----+
only showing top 20 rows



In [None]:
###################################  Multiple Filters #######################################

In [175]:
odf_filter = odf.filter((odf.First_Name.isin("Nicko", "Cathleen")) & (odf.City=="Caxias"))
odf_filter.show()

+---+----------+---------+------+------+--------------+-------+----------+---------+
| Id|First_Name|Last_Name|Gender|  City|     Job_Title| Salary|  Latitude|Longitude|
+---+----------+---------+------+------+--------------+-------+----------+---------+
| 37|     Nicko|    Frays|  Male|Caxias|Health Coach I|99786.4|-4.8654137|  -43.362|
+---+----------+---------+------+------+--------------+-------+----------+---------+



In [177]:
odf_filter = odf.filter((odf.Salary>90000) & (odf.City=="Caxias"))
odf_filter.show()

+---+----------+---------+------+------+--------------+-------+----------+---------+
| Id|First_Name|Last_Name|Gender|  City|     Job_Title| Salary|  Latitude|Longitude|
+---+----------+---------+------+------+--------------+-------+----------+---------+
| 37|     Nicko|    Frays|  Male|Caxias|Health Coach I|99786.4|-4.8654137|  -43.362|
+---+----------+---------+------+------+--------------+-------+----------+---------+



In [None]:
####################################  Running SQL #################################

In [178]:
from pyspark.sql.types import *
oschema = StructType([
    StructField("Id", IntegerType()),
    StructField("First_Name", StringType()),
    StructField("Last_Name", StringType()),
    StructField("Gender", StringType()),
    StructField("City", StringType()),
    StructField("Job_Title", StringType()),
    StructField("Salary", FloatType()),
    StructField("Latitude", FloatType()),
    StructField("Longitude", FloatType())
])

odf = spark.read.csv("original.csv", header=True, schema=oschema)
odf.show()

+---+----------+----------+------+---------------+--------------------+--------+----------+----------+
| Id|First_Name| Last_Name|Gender|           City|           Job_Title|  Salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+--------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II| 62846.6|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|57576.52| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|30101.16| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|                null|25

In [179]:
odf.registerTempTable("original")



In [187]:
query1 = spark.sql("select First_Name, Last_Name from original where Gender='Female'")
query1.show()

+----------+----------+
|First_Name| Last_Name|
+----------+----------+
|   Melinde| Shilburne|
|  Kimberly|Von Welden|
|    Alvera|  Di Boldi|
|     Maris|      Folk|
|     Masha|    Divers|
|    Kylynn|   Lockart|
|       Rey|    Meharg|
|    Claude|    Briant|
|  Tiffanie|  Pattison|
|   Lurleen|   Janczak|
|    Nichol|    Holtum|
|     Shaun|    Bridle|
|   Leandra|    Anfrey|
|  Jaquelyn|    Hazard|
|  Prudence|  Honacker|
|    Cherey|     Liger|
|      Neda|      Krop|
|     Barbi| Fattorini|
|    Lonnie| Townshend|
|    Valida|  Salzberg|
+----------+----------+
only showing top 20 rows



In [None]:
############################### Calculating columns ###############################

In [188]:
from pyspark.sql.types import *
oschema = StructType([
    StructField("Id", IntegerType()),
    StructField("First_Name", StringType()),
    StructField("Last_Name", StringType()),
    StructField("Gender", StringType()),
    StructField("City", StringType()),
    StructField("Job_Title", StringType()),
    StructField("Salary", FloatType()),
    StructField("Latitude", FloatType()),
    StructField("Longitude", FloatType())
])

odf = spark.read.csv("original.csv", header=True, schema=oschema)
odf.show()

+---+----------+----------+------+---------------+--------------------+--------+----------+----------+
| Id|First_Name| Last_Name|Gender|           City|           Job_Title|  Salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+--------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II| 62846.6|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|57576.52| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|63863.09|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|30101.16| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|                null|25

In [190]:
odf = odf.withColumn('Monthly_Salary', odf.Salary/12)
odf.show()

+---+----------+----------+------+---------------+--------------------+--------+----------+----------+------------------+
| Id|First_Name| Last_Name|Gender|           City|           Job_Title|  Salary|  Latitude| Longitude|    Monthly_Salary|
+---+----------+----------+------+---------------+--------------------+--------+----------+----------+------------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|57438.18| 50.577408| 16.496717| 4786.514973958333|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II| 62846.6|  48.82316| 103.52182|    5237.216796875|
|  3|    Alvera|  Di Boldi|Female|           null|                null|57576.52| 39.994747|116.339775| 4798.043294270833|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|61489.23| 44.504723| 38.130016|   5124.1025390625|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|63863.09|      null| 37.648994| 5321.924153645833|
|  6|     Maris|      Fo

In [194]:
odf = odf.withColumn('High_Salary', when(odf.Salary>60000,'Yes').otherwise('No'))
odf.show()                     

+---+----------+----------+------+---------------+--------------------+--------+----------+----------+------------------+-----------+
| Id|First_Name| Last_Name|Gender|           City|           Job_Title|  Salary|  Latitude| Longitude|    Monthly_Salary|High_Salary|
+---+----------+----------+------+---------------+--------------------+--------+----------+----------+------------------+-----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|57438.18| 50.577408| 16.496717| 4786.514973958333|         No|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II| 62846.6|  48.82316| 103.52182|    5237.216796875|        Yes|
|  3|    Alvera|  Di Boldi|Female|           null|                null|57576.52| 39.994747|116.339775| 4798.043294270833|         No|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|61489.23| 44.504723| 38.130016|   5124.1025390625|        Yes|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            

In [None]:
##################################### Group By ####################################

In [31]:
import pyspark.sql.functions as sf
from pyspark.sql.types import *
oschema = StructType([
    StructField("Id", IntegerType()),
    StructField("First_Name", StringType()),
    StructField("Last_Name", StringType()),
    StructField("Gender", StringType()),
    StructField("City", StringType()),
    StructField("Job_Title", StringType()),
    StructField("Salary", FloatType()),
    StructField("Latitude", FloatType()),
    StructField("Longitude", FloatType())
])

odf = spark.read.csv("original.csv", header=True, schema=oschema)
odf.show()

+---+----------+----------+------+---------------+--------------------+------+----------+----------+
| Id|First_Name| Last_Name|Gender|           City|           Job_Title|Salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|  null| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|  null|  48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|           null|                null|  null| 39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|  null| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|  null|      null| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|  null| 53.426613|-6.1644998|
|  7|     Masha|    Divers|Female|         Dachun|                null|  null| 24.879416|11

In [25]:
odf1 = odf.groupBy('gender','job_title').agg(sf.sum('Salary').alias('Total'),
                                 sf.avg('Salary').alias('Avg'),
                                  sf.min('Salary').alias('Min'),
                                   sf.max('Salary').alias('Max'),
                                    sf.stddev('Salary').alias('Stdev'))
                                  
odf1.show()

+------+--------------------+-----+----+----+----+-----+
|gender|           job_title|Total| Avg| Min| Max|Stdev|
+------+--------------------+-----+----+----+----+-----+
|Female|    Statistician III| null|null|null|null| null|
|  Male|     Cost Accountant| null|null|null|null| null|
|Female|         Engineer IV| null|null|null|null| null|
|Female| Clinical Specialist| null|null|null|null| null|
|Female|    Dental Hygienist| null|null|null|null| null|
|Female|Research Assistan...| null|null|null|null| null|
|Female|  Nurse Practicioner| null|null|null|null| null|
|  Male| Geological Engineer| null|null|null|null| null|
|  Male|            VP Sales| null|null|null|null| null|
|Female|Systems Administr...| null|null|null|null| null|
|Female|Automation Specia...| null|null|null|null| null|
|  Male|      Civil Engineer| null|null|null|null| null|
|  Male|     Design Engineer| null|null|null|null| null|
|  Male|               Nurse| null|null|null|null| null|
|  Male|Nuclear Power Eng...| n

In [32]:
output_path = r"C:\Users\erick\OneDrive\Desktop\python_work\PySpark\test"
odf.write.csv(output_path)

Py4JJavaError: An error occurred while calling o254.csv.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:847)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:341)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:331)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:370)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 23 more
