In [1]:
import pyspark
from pyspark.sql import SparkSession

## Creating a Spark Session

In [2]:
spark = SparkSession.builder.getOrCreate()
spark

#### Reading a csv file with spark

In [3]:
df = spark.read.format("csv").option("header","true").load(r"C:\Users\maheshbabu.devathi\OneDrive - Advance Auto Parts\Desktop\AAP\practice\pyspark\data\original.csv")

In [4]:
df.show(5)

+---+----------+----------+------+-------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|         City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+-------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|    Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|       Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|         NULL|                NULL|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|    Mytishchi|            VP Sales|$63863.09|      NULL| 37.6489954|
+---+----------+----------+------+-------------+--------------------+---------+----------+-----------+
only showing top 5 rows



There are null present in some columns of the dataframe before moving to the next analysis need to handle them.

In [5]:
from pyspark.sql.functions import *

In [6]:
## Created a new column named clean_city in which replacing null values which are present in city column with "UnKnown"
df1 = df.withColumn("Clean_City",when(df.City.isNull(), 'UnKnown').otherwise(df.City))

In [7]:
df1.show(5)

+---+----------+----------+------+-------------+--------------------+---------+----------+-----------+-------------+
| id|first_name| last_name|gender|         City|            JobTitle|   Salary|  Latitude|  Longitude|   Clean_City|
+---+----------+----------+------+-------------+--------------------+---------+----------+-----------+-------------+
|  1|   Melinde| Shilburne|Female|    Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|    Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|       Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|       Bulgan|
|  3|    Alvera|  Di Boldi|Female|         NULL|                NULL|$57576.52|39.9947462|116.3397725|      UnKnown|
|  4|   Shannon| O'Griffin|  Male|Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|    Mytishchi|            VP Sales|$63863.09|      NULL| 37.6489954|    Mytishchi|
+---+----------+----------+------+-------------+----------------

In [8]:
## Filtering the data frame with the records where there are no null values present in the Jobtitle
df1 = df1.filter(df1.JobTitle.isNotNull())

In [9]:
df1.show(5)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     Clean_City|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.6489954|      Mytishchi|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.4266145| -6.1644997|Kinsealy-Drinan|
+---+----------+----------+-----

In [10]:
## Replacing the null value of salary column with mean of salary column
df1 = df1.withColumn('Clean_Salary',df1.Salary.substr(2,100).cast('float'))

In [11]:
df1.show(5)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     Clean_City|Clean_Salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.6489954|      Mytishchi|    63863.09|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil 

In [12]:
mean = df1.groupBy().avg('Clean_Salary').take(1)[0][0]

In [13]:
mean

55516.32088199837

In [14]:
from pyspark.sql.functions import lit

In [15]:
df1 = df1.withColumn('new_salary', when(df1.Clean_Salary.isNull(), lit(mean)).otherwise(df1.Clean_Salary))

In [16]:
df1.show(5)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+--------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     Clean_City|Clean_Salary|    new_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+--------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18| 57438.1796875|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6| 62846.6015625|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|61489.23046875|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.6489954|  

In [17]:
## populate the median value of latitude and longitude
import numpy as np

In [18]:
lat = df1.select("Latitude")

In [19]:
lat.show(5)

+----------+
|  Latitude|
+----------+
|50.5774075|
|48.8231572|
|44.5047212|
|      NULL|
|53.4266145|
+----------+
only showing top 5 rows



In [20]:
lat = lat.filter(lat.Latitude.isNotNull())

In [21]:
lat.show()

+----------+
|  Latitude|
+----------+
|50.5774075|
|48.8231572|
|44.5047212|
|53.4266145|
|45.1905186|
| 32.027934|
|  4.272793|
|     -5.85|
| 39.172378|
|49.8151822|
|42.1014803|
|49.7923299|
|43.4945737|
|52.7441662|
| 38.696249|
|-7.7232567|
|40.7172049|
|  49.16291|
|40.7576842|
|48.4902808|
+----------+
only showing top 20 rows



In [22]:
lat = lat.withColumn('latitude2',lat.Latitude.cast('float')).select('latitude2')

In [23]:
lat.show(5)

+---------+
|latitude2|
+---------+
|50.577408|
| 48.82316|
|44.504723|
|53.426613|
|45.190517|
+---------+
only showing top 5 rows



In [24]:
medain = np.median(lat.collect())

In [25]:
medain

31.93397331237793

In [26]:
df1 = df1.withColumn('lat_new', when(df1.Latitude.isNull(), lit(medain)).otherwise(df1.Latitude))

In [27]:
df1.show(5)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+--------------+-----------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     Clean_City|Clean_Salary|    new_salary|          lat_new|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+--------------+-----------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18| 57438.1796875|       50.5774075|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6| 62846.6015625|       48.8231572|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|  Divnomorskoye|    61489.23|61489.23046875|       44.5047212

In [28]:
#Q1 - Overall the mean or women get paid more on average?
#Q2 - By Job Title do men or women get paid more on average?
#Q3 - By Which City has the highest average Salary?

#### Answering Q1


In [126]:
import pyspark.sql.functions as sqlfunc

In [127]:
genders = df1.groupBy('gender').agg(sqlfunc.avg('new_salary').alias('AvgSalary'))

In [128]:
genders.show()

Py4JJavaError: An error occurred while calling o642.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 89.0 failed 1 times, most recent failure: Lost task 0.0 in stage 89.0 (TID 79) (PG030EXS.corp.advancestores.com executor driver): java.io.FileNotFoundException: C:\Users\maheshbabu.devathi\AppData\Local\Temp\blockmgr-24202675-d42c-4632-b5c9-73128b645a90\0e\temp_shuffle_998418ba-15c1-49d4-b72b-7ca096f582aa (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:141)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:161)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:308)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: C:\Users\maheshbabu.devathi\AppData\Local\Temp\blockmgr-24202675-d42c-4632-b5c9-73128b645a90\0e\temp_shuffle_998418ba-15c1-49d4-b72b-7ca096f582aa (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:141)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:161)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:308)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)


### Answering Q2

In [32]:
df2 = df1.withColumn('female_salary', when(df1.gender == 'Female', df1.new_salary).otherwise(lit(0)))

In [33]:
df2 = df2.withColumn('male_salary', when(df2.gender == 'Male', df2.new_salary).otherwise(0))

In [34]:
df2.show(5)

+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+--------------+-----------------+--------------+--------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude|  Longitude|     Clean_City|Clean_Salary|    new_salary|          lat_new| female_salary|   male_salary|
+---+----------+----------+------+---------------+--------------------+---------+----------+-----------+---------------+------------+--------------+-----------------+--------------+--------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|      Nowa Ruda|    57438.18| 57438.1796875|       50.5774075| 57438.1796875|           0.0|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|         Bulgan|     62846.6| 62846.6015625|       48.8231572| 62846.6015625|           0.0|
|  4|   Shannon

In [35]:
df2 = df2.groupBy("JobTitle").agg(sqlfunc.avg('female_salary').alias('final_female_salary'), sqlfunc.avg('male_salary').alias('final_male_salary'))

In [36]:
df2.show()

+--------------------+-------------------+------------------+
|            JobTitle|final_female_salary| final_male_salary|
+--------------------+-------------------+------------------+
|Systems Administr...|    50590.474609375|  15540.9501953125|
|   Media Manager III| 29586.436197916668|17381.920572916668|
|  Recruiting Manager| 34848.452473958336|  26383.4951171875|
|       Geologist III|       31749.046875|    12830.75390625|
|        Geologist II|                0.0|   43293.865234375|
|Database Administ...|                0.0|     52018.4609375|
|   Financial Analyst|    23353.776953125|       39606.05625|
|  Analyst Programmer|   16406.1287109375|  21042.9634765625|
|Software Engineer II|                0.0|      74782.640625|
|       Accountant IV|    82732.248046875|               0.0|
|    Product Engineer|     41825.48359375|       20464.94375|
|Software Test Eng...|   32218.6083984375|   27122.462890625|
|Safety Technician...|                0.0|   29421.529296875|
|    Jun

In [37]:
df2 = df2.withColumn('Delta', df2.final_female_salary - df2.final_male_salary)

In [38]:
df2.show()

+--------------------+-------------------+------------------+-------------------+
|            JobTitle|final_female_salary| final_male_salary|              Delta|
+--------------------+-------------------+------------------+-------------------+
|Systems Administr...|    50590.474609375|  15540.9501953125|   35049.5244140625|
|   Media Manager III| 29586.436197916668|17381.920572916668|       12204.515625|
|  Recruiting Manager| 34848.452473958336|  26383.4951171875|  8464.957356770836|
|       Geologist III|       31749.046875|    12830.75390625|     18918.29296875|
|        Geologist II|                0.0|   43293.865234375|   -43293.865234375|
|Database Administ...|                0.0|     52018.4609375|     -52018.4609375|
|   Financial Analyst|    23353.776953125|       39606.05625|   -16252.279296875|
|  Analyst Programmer|   16406.1287109375|  21042.9634765625| -4636.834765625001|
|Software Engineer II|                0.0|      74782.640625|      -74782.640625|
|       Accounta

### Answering Q3

In [39]:
cityavg = df1.groupBy('City').agg(sqlfunc.avg('new_salary').alias('avgsalary'))

In [40]:
cityavg = cityavg.sort(col('avgsalary').desc())

In [41]:
cityavg.show()

+-----------------+-------------+
|             City|    avgsalary|
+-----------------+-------------+
|        Mesopotam|  99948.28125|
|       Zhongcheng| 99942.921875|
|           Caxias|99786.3984375|
|      Karangtawar|99638.9921875|
|        Itabaiana|  99502.15625|
|           Pasian|  99421.34375|
|           Webuye| 99368.546875|
|      Yuktae-dong| 99250.828125|
|           Zinder|  99222.84375|
|   Timiryazevskiy|   99142.9375|
|        Sawahbaru|99013.7109375|
|          Madimba|98737.8671875|
|         Huangshi|  98690.34375|
|          Gharyan|   98679.3125|
|         Yŏnan-ŭp| 98628.609375|
|     Wringinputih|98603.8203125|
|Monte da Boavista|  98586.71875|
|          Klukeng|98439.4921875|
|         Murmashi|  98226.15625|
|        Fox Creek|      98138.0|
+-----------------+-------------+
only showing top 20 rows



### Bringing Data into Dataframes

In [42]:
## Reading the csv in another way in spark
df_n = spark.read.csv(r'C:\Users\maheshbabu.devathi\OneDrive - Advance Auto Parts\Desktop\AAP\practice\pyspark\data\original.csv', header = True)

In [43]:
df_n.show(5)

+---+----------+----------+------+-------------+--------------------+---------+----------+-----------+
| id|first_name| last_name|gender|         City|            JobTitle|   Salary|  Latitude|  Longitude|
+---+----------+----------+------+-------------+--------------------+---------+----------+-----------+
|  1|   Melinde| Shilburne|Female|    Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.4967184|
|  2|  Kimberly|Von Welden|Female|       Bulgan|       Programmer II|$62846.60|48.8231572|103.5218199|
|  3|    Alvera|  Di Boldi|Female|         NULL|                NULL|$57576.52|39.9947462|116.3397725|
|  4|   Shannon| O'Griffin|  Male|Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.1300171|
|  5|  Sherwood|   Macieja|  Male|    Mytishchi|            VP Sales|$63863.09|      NULL| 37.6489954|
+---+----------+----------+------+-------------+--------------------+---------+----------+-----------+
only showing top 5 rows



In [44]:
# data types of the attributes present in the dataframe
df_n.dtypes

[('id', 'string'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string')]

As the spark interpretted every column as string eventhough there are some interger or float attribute present need to change them from string to thier respective data type

In [45]:
# changing the data types of the attribues.
from pyspark.sql.types import *
# As one can define their schema in sql here also we can  define the schema prior itself of the data which we will be 
# loading in the spark session. So the schema goes likes this

schema = StructType([StructField('id', IntegerType()),
                     StructField('first_name', StringType()),
                     StructField('last_name', StringType()),
                     StructField('gender',StringType()),
                     StructField('City', StringType()),
                     StructField('JobTitle', StringType()),
                     StructField('Salary', StringType()),
                     StructField('Latitude', StringType()),
                     StructField('Longitude', FloatType())])

df4 = spark.read.csv(r'C:\Users\maheshbabu.devathi\OneDrive - Advance Auto Parts\Desktop\AAP\practice\pyspark\data\original.csv', header = True, schema = schema)

In [46]:
df4.show(5)

+---+----------+----------+------+-------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|         City|            JobTitle|   Salary|  Latitude| Longitude|
+---+----------+----------+------+-------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|    Nowa Ruda| Assistant Professor|$57438.18|50.5774075| 16.496717|
|  2|  Kimberly|Von Welden|Female|       Bulgan|       Programmer II|$62846.60|48.8231572| 103.52182|
|  3|    Alvera|  Di Boldi|Female|         NULL|                NULL|$57576.52|39.9947462|116.339775|
|  4|   Shannon| O'Griffin|  Male|Divnomorskoye|Budget/Accounting...|$61489.23|44.5047212| 38.130016|
|  5|  Sherwood|   Macieja|  Male|    Mytishchi|            VP Sales|$63863.09|      NULL| 37.648994|
+---+----------+----------+------+-------------+--------------------+---------+----------+----------+
only showing top 5 rows



In [47]:
df4.dtypes

[('id', 'int'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'float')]

In [48]:
df4_n = df4.withColumn('Latitude', df4.Longitude.cast('float'))

In [49]:
df4_n.dtypes

[('id', 'int'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'float'),
 ('Longitude', 'float')]

### Inspecting a DataFrame

In [50]:
schema = StructType([StructField('id', IntegerType()),
                     StructField('first_name', StringType()),
                     StructField('last_name', StringType()),
                     StructField('gender',StringType()),
                     StructField('City', StringType()),
                     StructField('JobTitle', StringType()),
                     StructField('Salary', StringType()),
                     StructField('Latitude', FloatType()),
                     StructField('Longitude', FloatType())])

df_nn = spark.read.csv(r'C:\Users\maheshbabu.devathi\OneDrive - Advance Auto Parts\Desktop\AAP\practice\pyspark\data\original.csv', header = True, schema = schema)

In [51]:
df_nn.dtypes

[('id', 'int'),
 ('first_name', 'string'),
 ('last_name', 'string'),
 ('gender', 'string'),
 ('City', 'string'),
 ('JobTitle', 'string'),
 ('Salary', 'string'),
 ('Latitude', 'float'),
 ('Longitude', 'float')]

In [52]:
df_nn.head(5)

[Row(id=1, first_name='Melinde', last_name='Shilburne', gender='Female', City='Nowa Ruda', JobTitle='Assistant Professor', Salary='$57438.18', Latitude=50.57740783691406, Longitude=16.49671745300293),
 Row(id=2, first_name='Kimberly', last_name='Von Welden', gender='Female', City='Bulgan', JobTitle='Programmer II', Salary='$62846.60', Latitude=48.823158264160156, Longitude=103.52182006835938),
 Row(id=3, first_name='Alvera', last_name='Di Boldi', gender='Female', City=None, JobTitle=None, Salary='$57576.52', Latitude=39.994747161865234, Longitude=116.33977508544922),
 Row(id=4, first_name='Shannon', last_name="O'Griffin", gender='Male', City='Divnomorskoye', JobTitle='Budget/Accounting Analyst II', Salary='$61489.23', Latitude=44.504722595214844, Longitude=38.1300163269043),
 Row(id=5, first_name='Sherwood', last_name='Macieja', gender='Male', City='Mytishchi', JobTitle='VP Sales', Salary='$63863.09', Latitude=None, Longitude=37.64899444580078)]

In [53]:
df_nn.first()

Row(id=1, first_name='Melinde', last_name='Shilburne', gender='Female', City='Nowa Ruda', JobTitle='Assistant Professor', Salary='$57438.18', Latitude=50.57740783691406, Longitude=16.49671745300293)

In [54]:
df_nn.describe().show()

+-------+-----------------+----------+---------+------+-------------------+-------------------+---------+------------------+-----------------+
|summary|               id|first_name|last_name|gender|               City|           JobTitle|   Salary|          Latitude|        Longitude|
+-------+-----------------+----------+---------+------+-------------------+-------------------+---------+------------------+-----------------+
|  count|             1000|      1000|     1000|  1000|                999|                998|     1000|               999|             1000|
|   mean|            500.5|      NULL|     NULL|  NULL|               NULL|               NULL|     NULL| 25.43151724702484|43.33756460386515|
| stddev|288.8194360957494|      NULL|     NULL|  NULL|               NULL|               NULL|     NULL|24.579082550156635| 69.4206453674681|
|    min|                1|   Abagail|    Abbay|Female|             Abéché|Account Coordinator|$10101.92|         -54.28115|       -123.04196|

In [55]:
df_nn.columns

['id',
 'first_name',
 'last_name',
 'gender',
 'City',
 'JobTitle',
 'Salary',
 'Latitude',
 'Longitude']

In [56]:
df_nn.distinct().count()

1000

In [57]:
df_nn.count()

1000

### Handling Nulls and Duplicate Values

In [58]:
## dropping all the records from the data where ever nulls are present in any of the column
df_dropped = df_nn.na.drop()

In [59]:
df_dropped.show(5)

+---+----------+----------+------+---------------+--------------------+---------+---------+----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary| Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+---------+---------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18|50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60| 48.82316| 103.52182|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23|44.504723| 38.130016|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16|53.426613|-6.1644998|
|  8|   Goddart|     Flear|  Male|      Trélissac|Desktop Support T...|$46116.36|45.190517| 0.7423124|
+---+----------+----------+------+---------------+--------------------+---------+---------+----------+
only showing top 5 rows



In [62]:
## Dropping the records from the dataset where nulls are present in the jobtitle column
df_not_null_jobs = df_nn.filter(df_nn.JobTitle.isNotNull())

In [63]:
df_not_null_jobs.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude| Longitude|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.648994|
|  6|     Maris|      Folk|Female|Kinsealy-Drinan|      Civil Engineer|$30101.16| 53.426613|-6.1644998|
|  8|   Goddart|     Flear|  Male|      Trélissac|Desktop Support T...|$46116.36| 45.190517| 0.7423124|
|  9|      Roth|O'Cannavan|  Male|         Heitan|VP Product Man

In [65]:
## Creating a new column if there is a null present in a column replace null with unknown or keep what is was there.
df_handled = df_nn.withColumn("Clean_City", when(df_nn.City.isNull(),"Unknown").otherwise(df_nn.City))

In [66]:
df_handled.show()

+---+----------+----------+------+---------------+--------------------+---------+----------+----------+---------------+
| id|first_name| last_name|gender|           City|            JobTitle|   Salary|  Latitude| Longitude|     Clean_City|
+---+----------+----------+------+---------------+--------------------+---------+----------+----------+---------------+
|  1|   Melinde| Shilburne|Female|      Nowa Ruda| Assistant Professor|$57438.18| 50.577408| 16.496717|      Nowa Ruda|
|  2|  Kimberly|Von Welden|Female|         Bulgan|       Programmer II|$62846.60|  48.82316| 103.52182|         Bulgan|
|  3|    Alvera|  Di Boldi|Female|           NULL|                NULL|$57576.52| 39.994747|116.339775|        Unknown|
|  4|   Shannon| O'Griffin|  Male|  Divnomorskoye|Budget/Accounting...|$61489.23| 44.504723| 38.130016|  Divnomorskoye|
|  5|  Sherwood|   Macieja|  Male|      Mytishchi|            VP Sales|$63863.09|      NULL| 37.648994|      Mytishchi|
|  6|     Maris|      Folk|Female|Kinsea

In [67]:
## Dropping the duplicates from the dataset
df_no_duplicates = df_nn.dropDuplicates()

In [68]:
df_no_duplicates.show()

Py4JJavaError: An error occurred while calling o229.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 48.0 failed 1 times, most recent failure: Lost task 0.0 in stage 48.0 (TID 38) (PG030EXS.corp.advancestores.com executor driver): java.io.FileNotFoundException: C:\Users\maheshbabu.devathi\AppData\Local\Temp\blockmgr-24202675-d42c-4632-b5c9-73128b645a90\3d\temp_shuffle_883309af-6fa2-4334-a55a-37c02de4d1bb (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:141)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:161)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:308)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: C:\Users\maheshbabu.devathi\AppData\Local\Temp\blockmgr-24202675-d42c-4632-b5c9-73128b645a90\3d\temp_shuffle_883309af-6fa2-4334-a55a-37c02de4d1bb (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:141)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:161)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:308)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)


### Selecting and filtering Data

In [69]:
## Select particular columns from the dataframe
df_nn.show(2)

+---+----------+----------+------+---------+-------------------+---------+---------+---------+
| id|first_name| last_name|gender|     City|           JobTitle|   Salary| Latitude|Longitude|
+---+----------+----------+------+---------+-------------------+---------+---------+---------+
|  1|   Melinde| Shilburne|Female|Nowa Ruda|Assistant Professor|$57438.18|50.577408|16.496717|
|  2|  Kimberly|Von Welden|Female|   Bulgan|      Programmer II|$62846.60| 48.82316|103.52182|
+---+----------+----------+------+---------+-------------------+---------+---------+---------+
only showing top 2 rows



Let's Select first_name and Last_name from the above dataframe

In [70]:
df_select = df_nn.select("first_name","last_name")
df_select.show()

+----------+----------+
|first_name| last_name|
+----------+----------+
|   Melinde| Shilburne|
|  Kimberly|Von Welden|
|    Alvera|  Di Boldi|
|   Shannon| O'Griffin|
|  Sherwood|   Macieja|
|     Maris|      Folk|
|     Masha|    Divers|
|   Goddart|     Flear|
|      Roth|O'Cannavan|
|      Bran|   Trahear|
|    Kylynn|   Lockart|
|       Rey|    Meharg|
|      Kerr|    Braden|
|    Mickie| Whanstall|
|    Kaspar|     Pally|
|    Norbie|    Gwyllt|
|    Claude|    Briant|
|     Thain|    Habbon|
|  Tiffanie|  Pattison|
|    Ettore|  Gerriets|
+----------+----------+
only showing top 20 rows



In [72]:
##Renaming a column
df_rename_firstname = df_nn.withColumnRenamed('first_name','fn')
df_rename_firstname.show(3)

+---+--------+----------+------+---------+-------------------+---------+---------+----------+
| id|      fn| last_name|gender|     City|           JobTitle|   Salary| Latitude| Longitude|
+---+--------+----------+------+---------+-------------------+---------+---------+----------+
|  1| Melinde| Shilburne|Female|Nowa Ruda|Assistant Professor|$57438.18|50.577408| 16.496717|
|  2|Kimberly|Von Welden|Female|   Bulgan|      Programmer II|$62846.60| 48.82316| 103.52182|
|  3|  Alvera|  Di Boldi|Female|     NULL|               NULL|$57576.52|39.994747|116.339775|
+---+--------+----------+------+---------+-------------------+---------+---------+----------+
only showing top 3 rows



In [76]:
## filtering the dataframe for a particular attribute from a particular column
df_filter = df_nn.filter(df_nn.first_name == 'Alvera')
df_filter.show()

+---+----------+---------+------+----+--------+---------+---------+----------+
| id|first_name|last_name|gender|City|JobTitle|   Salary| Latitude| Longitude|
+---+----------+---------+------+----+--------+---------+---------+----------+
|  3|    Alvera| Di Boldi|Female|NULL|    NULL|$57576.52|39.994747|116.339775|
+---+----------+---------+------+----+--------+---------+---------+----------+



In [83]:
## filtering the dataframe with a wildcard expression from a particular column
df_filter_we = df_nn.filter(df_nn.first_name.like('%lver%'))
df_filter_we.show()

+---+----------+---------+------+----------+-------------------+---------+---------+----------+
| id|first_name|last_name|gender|      City|           JobTitle|   Salary| Latitude| Longitude|
+---+----------+---------+------+----------+-------------------+---------+---------+----------+
|  3|    Alvera| Di Boldi|Female|      NULL|               NULL|$57576.52|39.994747|116.339775|
|775|   Alverta| MacNulty|Female|Megalópoli|Geological Engineer|$17299.62|37.401245| 22.136488|
+---+----------+---------+------+----------+-------------------+---------+---------+----------+



In [80]:
## filtering a dataframe based on  a particular column where the values endwith a particular pattern
df_filter_ew = df_nn.filter(df_nn.first_name.endswith('din'))
df_filter_ew.show()

+---+----------+-------------+------+-----------+--------+---------+----------+---------+
| id|first_name|    last_name|gender|       City|JobTitle|   Salary|  Latitude|Longitude|
+---+----------+-------------+------+-----------+--------+---------+----------+---------+
|901|     Aldin|Matuszkiewicz|  Male|East London|Operator|$41468.83|-32.954933|27.931913|
+---+----------+-------------+------+-----------+--------+---------+----------+---------+



In [84]:
## filtering a dataframe based on  a particular column where the values endwith a particular pattern
df_filter_sw = df_nn.filter(df_nn.first_name.startswith('Alv'))
df_filter_sw.show()

+---+----------+---------+------+----------+--------------------+---------+---------+----------+
| id|first_name|last_name|gender|      City|            JobTitle|   Salary| Latitude| Longitude|
+---+----------+---------+------+----------+--------------------+---------+---------+----------+
|  3|    Alvera| Di Boldi|Female|      NULL|                NULL|$57576.52|39.994747|116.339775|
| 81|     Alvin|    Doman|  Male|      Niny|Research Assistant I|$53258.86|44.486843| 43.940807|
|775|   Alverta| MacNulty|Female|Megalópoli| Geological Engineer|$17299.62|37.401245| 22.136488|
+---+----------+---------+------+----------+--------------------+---------+---------+----------+



In [85]:
### filtering a numerical column between certain range of values
df_filter_btw = df_nn.filter(df_nn.id.between(1,5))
df_filter_btw.show()

+---+----------+----------+------+-------------+--------------------+---------+---------+----------+
| id|first_name| last_name|gender|         City|            JobTitle|   Salary| Latitude| Longitude|
+---+----------+----------+------+-------------+--------------------+---------+---------+----------+
|  1|   Melinde| Shilburne|Female|    Nowa Ruda| Assistant Professor|$57438.18|50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|       Bulgan|       Programmer II|$62846.60| 48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|         NULL|                NULL|$57576.52|39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|Divnomorskoye|Budget/Accounting...|$61489.23|44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|    Mytishchi|            VP Sales|$63863.09|     NULL| 37.648994|
+---+----------+----------+------+-------------+--------------------+---------+---------+----------+



In [86]:
### filtering the dataframe based on more than one attribute from a particular column.
df_filter_fnm = df_nn.filter((df_nn.first_name.isin('Aldin','Valma')))
df_filter_fnm.show()

+---+----------+-------------+------+-----------+----------------+---------+----------+---------+
| id|first_name|    last_name|gender|       City|        JobTitle|   Salary|  Latitude|Longitude|
+---+----------+-------------+------+-----------+----------------+---------+----------+---------+
|569|     Valma|      Bratton|Female|  Kurayoshi|Web Developer II|$32665.89| 35.449905|133.76134|
|901|     Aldin|Matuszkiewicz|  Male|East London|        Operator|$41468.83|-32.954933|27.931913|
+---+----------+-------------+------+-----------+----------------+---------+----------+---------+



In [89]:
## breaking the values of a particular column to certain characters and creating a new column with it.(Substring)
df_subs = df_nn.select(df_nn.first_name, df_nn.first_name.substr(1,5).alias('New_Name'))
df_subs.show(5)

+----------+--------+
|first_name|New_Name|
+----------+--------+
|   Melinde|   Melin|
|  Kimberly|   Kimbe|
|    Alvera|   Alver|
|   Shannon|   Shann|
|  Sherwood|   Sherw|
+----------+--------+
only showing top 5 rows



### Applying multiple Filter

In [93]:
## scenario - 1
df_filt1 = df_nn.filter((df_nn.first_name.isin('Aldin','Valma')) | (df_nn.City.like("%ondon")))
df_filt1.show()

+---+----------+-------------+------+-----------+----------------+---------+----------+---------+
| id|first_name|    last_name|gender|       City|        JobTitle|   Salary|  Latitude|Longitude|
+---+----------+-------------+------+-----------+----------------+---------+----------+---------+
|569|     Valma|      Bratton|Female|  Kurayoshi|Web Developer II|$32665.89| 35.449905|133.76134|
|901|     Aldin|Matuszkiewicz|  Male|East London|        Operator|$41468.83|-32.954933|27.931913|
+---+----------+-------------+------+-----------+----------------+---------+----------+---------+



In [95]:
# Scenario - 2
df_filt2 = df_nn.filter((df_nn.id > 10) & (df_nn.id < 100))
df_filt2.show(5)

+---+----------+---------+------+-----------+--------------------+---------+--------+----------+
| id|first_name|last_name|gender|       City|            JobTitle|   Salary|Latitude| Longitude|
+---+----------+---------+------+-----------+--------------------+---------+--------+----------+
| 11|    Kylynn|  Lockart|Female|   El Cardo|Nuclear Power Eng...|$13604.63|   -5.85| -79.88333|
| 12|       Rey|   Meharg|Female|Wangqingtuo|Systems Administr...|$73423.70|39.17238| 116.93161|
| 13|      Kerr|   Braden|  Male|  Sułkowice|Compensation Analyst|$33432.99|49.81518| 19.377174|
| 14|    Mickie|Whanstall|  Male|Springfield|Assistant Media P...|$50838.53|42.10148|-72.576675|
| 15|    Kaspar|    Pally|  Male|     Chrást|  Analyst Programmer|$40163.03|49.79233| 13.491532|
+---+----------+---------+------+-----------+--------------------+---------+--------+----------+
only showing top 5 rows



### Running SQL on DataFrames

In [98]:
df_nn.registerTempTable('original')

In [100]:
query1 = spark.sql('select * from original')
query1.show(5)

+---+----------+----------+------+-------------+--------------------+---------+---------+----------+
| id|first_name| last_name|gender|         City|            JobTitle|   Salary| Latitude| Longitude|
+---+----------+----------+------+-------------+--------------------+---------+---------+----------+
|  1|   Melinde| Shilburne|Female|    Nowa Ruda| Assistant Professor|$57438.18|50.577408| 16.496717|
|  2|  Kimberly|Von Welden|Female|       Bulgan|       Programmer II|$62846.60| 48.82316| 103.52182|
|  3|    Alvera|  Di Boldi|Female|         NULL|                NULL|$57576.52|39.994747|116.339775|
|  4|   Shannon| O'Griffin|  Male|Divnomorskoye|Budget/Accounting...|$61489.23|44.504723| 38.130016|
|  5|  Sherwood|   Macieja|  Male|    Mytishchi|            VP Sales|$63863.09|     NULL| 37.648994|
+---+----------+----------+------+-------------+--------------------+---------+---------+----------+
only showing top 5 rows



In [102]:
query2 = spark.sql('select concat(first_name," ",last_name) as full_name from original')
query2.show(5)

+-------------------+
|          full_name|
+-------------------+
|  Melinde Shilburne|
|Kimberly Von Welden|
|    Alvera Di Boldi|
|  Shannon O'Griffin|
|   Sherwood Macieja|
+-------------------+
only showing top 5 rows



In [103]:
df_nn.show(1)

+---+----------+---------+------+---------+-------------------+---------+---------+---------+
| id|first_name|last_name|gender|     City|           JobTitle|   Salary| Latitude|Longitude|
+---+----------+---------+------+---------+-------------------+---------+---------+---------+
|  1|   Melinde|Shilburne|Female|Nowa Ruda|Assistant Professor|$57438.18|50.577408|16.496717|
+---+----------+---------+------+---------+-------------------+---------+---------+---------+
only showing top 1 row



### Adding Calculated Columns

In [104]:
df_cc = df_nn.withColumn('Clean_Salary', df_nn.Salary.substr(2,100).cast('float'))
df_cc.show(3)

+---+----------+----------+------+---------+-------------------+---------+---------+----------+------------+
| id|first_name| last_name|gender|     City|           JobTitle|   Salary| Latitude| Longitude|Clean_Salary|
+---+----------+----------+------+---------+-------------------+---------+---------+----------+------------+
|  1|   Melinde| Shilburne|Female|Nowa Ruda|Assistant Professor|$57438.18|50.577408| 16.496717|    57438.18|
|  2|  Kimberly|Von Welden|Female|   Bulgan|      Programmer II|$62846.60| 48.82316| 103.52182|     62846.6|
|  3|    Alvera|  Di Boldi|Female|     NULL|               NULL|$57576.52|39.994747|116.339775|    57576.52|
+---+----------+----------+------+---------+-------------------+---------+---------+----------+------------+
only showing top 3 rows



In [107]:
df_mn_slr = df_cc.withColumn('Monthly_Salary', df_cc.Clean_Salary/12)
df_mn_slr.show(5)

+---+----------+----------+------+-------------+--------------------+---------+---------+----------+------------+-----------------+
| id|first_name| last_name|gender|         City|            JobTitle|   Salary| Latitude| Longitude|Clean_Salary|   Monthly_Salary|
+---+----------+----------+------+-------------+--------------------+---------+---------+----------+------------+-----------------+
|  1|   Melinde| Shilburne|Female|    Nowa Ruda| Assistant Professor|$57438.18|50.577408| 16.496717|    57438.18|4786.514973958333|
|  2|  Kimberly|Von Welden|Female|       Bulgan|       Programmer II|$62846.60| 48.82316| 103.52182|     62846.6|   5237.216796875|
|  3|    Alvera|  Di Boldi|Female|         NULL|                NULL|$57576.52|39.994747|116.339775|    57576.52|4798.043294270833|
|  4|   Shannon| O'Griffin|  Male|Divnomorskoye|Budget/Accounting...|$61489.23|44.504723| 38.130016|    61489.23|  5124.1025390625|
|  5|  Sherwood|   Macieja|  Male|    Mytishchi|            VP Sales|$63863.

In [111]:
## Adding a new column to identify whether the customer is a female or not
df_fc = df_nn.withColumn("Are_they_Female", when(df_nn.gender == 'Female', 'Yes').otherwise('No'))
df_fc.show(5)

+---+----------+----------+------+-------------+--------------------+---------+---------+----------+---------------+
| id|first_name| last_name|gender|         City|            JobTitle|   Salary| Latitude| Longitude|Are_they_Female|
+---+----------+----------+------+-------------+--------------------+---------+---------+----------+---------------+
|  1|   Melinde| Shilburne|Female|    Nowa Ruda| Assistant Professor|$57438.18|50.577408| 16.496717|            Yes|
|  2|  Kimberly|Von Welden|Female|       Bulgan|       Programmer II|$62846.60| 48.82316| 103.52182|            Yes|
|  3|    Alvera|  Di Boldi|Female|         NULL|                NULL|$57576.52|39.994747|116.339775|            Yes|
|  4|   Shannon| O'Griffin|  Male|Divnomorskoye|Budget/Accounting...|$61489.23|44.504723| 38.130016|             No|
|  5|  Sherwood|   Macieja|  Male|    Mytishchi|            VP Sales|$63863.09|     NULL| 37.648994|             No|
+---+----------+----------+------+-------------+----------------

### Groupby and Aggregation

In [120]:
## Groupby gender and get the sum of total salary
df_g_s = df_cc.groupBy('gender').agg(sqlfunc.sum('Clean_Salary'))

In [121]:
df_g_s.show()

Py4JJavaError: An error occurred while calling o570.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 85.0 failed 1 times, most recent failure: Lost task 0.0 in stage 85.0 (TID 75) (PG030EXS.corp.advancestores.com executor driver): java.io.FileNotFoundException: C:\Users\maheshbabu.devathi\AppData\Local\Temp\blockmgr-24202675-d42c-4632-b5c9-73128b645a90\19\temp_shuffle_e851c21d-b8a1-416f-a165-2b3cb021e10d (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:141)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:161)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:308)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: C:\Users\maheshbabu.devathi\AppData\Local\Temp\blockmgr-24202675-d42c-4632-b5c9-73128b645a90\19\temp_shuffle_e851c21d-b8a1-416f-a165-2b3cb021e10d (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:141)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:161)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:308)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)


In [123]:
## Groupby gender and get the sum, avg, min and max of clean_salary
df_gp_cs = df_cc.groupBy('gender').agg(sqlfunc.sum('Clean_Salary').alias("Total"),
                                       sqlfunc.avg('Clean_Salary').alias("Average"),
                                       sqlfunc.max('Clean_Salary').alias("Maximum"),
                                       sqlfunc.min('Clean_Salary').alias("Minimum"))

df_gp_cs.show()

Py4JJavaError: An error occurred while calling o608.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 87.0 failed 1 times, most recent failure: Lost task 0.0 in stage 87.0 (TID 77) (PG030EXS.corp.advancestores.com executor driver): java.io.FileNotFoundException: C:\Users\maheshbabu.devathi\AppData\Local\Temp\blockmgr-24202675-d42c-4632-b5c9-73128b645a90\1a\temp_shuffle_fd5a2765-2076-47b3-823b-23573caddfa6 (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:141)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:161)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:308)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: C:\Users\maheshbabu.devathi\AppData\Local\Temp\blockmgr-24202675-d42c-4632-b5c9-73128b645a90\1a\temp_shuffle_fd5a2765-2076-47b3-823b-23573caddfa6 (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:141)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:161)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:308)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)


In [124]:
## Groupby gender, city and get the sum, avg, min and max of clean_salary

df_g_c_cs = df_cc.groupBy('gender','city').agg(sqlfunc.sum("Clean_Salary").alias("Total"),
                                               sqlfunc.avg("Clean_Salary").alias("Average"),
                                               sqlfunc.max("Clean_Salary").alias("Maximum"),
                                               sqlfunc.min("Clean_Salary").alias("Minimum"))

In [125]:
df_g_c_cs.show()

Py4JJavaError: An error occurred while calling o630.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 88.0 failed 1 times, most recent failure: Lost task 0.0 in stage 88.0 (TID 78) (PG030EXS.corp.advancestores.com executor driver): java.io.FileNotFoundException: C:\Users\maheshbabu.devathi\AppData\Local\Temp\blockmgr-24202675-d42c-4632-b5c9-73128b645a90\39\temp_shuffle_878116bb-f0f3-4f72-94e7-fa7956acd0c9 (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:141)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:161)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:308)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.io.FileNotFoundException: C:\Users\maheshbabu.devathi\AppData\Local\Temp\blockmgr-24202675-d42c-4632-b5c9-73128b645a90\39\temp_shuffle_878116bb-f0f3-4f72-94e7-fa7956acd0c9 (The system cannot find the path specified)
	at java.io.FileOutputStream.open0(Native Method)
	at java.io.FileOutputStream.open(Unknown Source)
	at java.io.FileOutputStream.<init>(Unknown Source)
	at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:141)
	at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:161)
	at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:308)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:171)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)


In [129]:
## Wrting output to a file

df_nn.write.csv('df_nn.csv')

Py4JJavaError: An error occurred while calling o649.csv.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:850)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:372)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:964)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1129)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more
