# H-1B Visa Approvals (2011-2017): Part 3


## PySpark / spark.sql exploration of a Spark data frame object running on an AWS EC2 instance
  
  
**AMOD-5410H: Big Data**   
**Winter 2018**  
**Nicholas Hopewell - 0496633**

In this notebook, I cover the steps necessary to conduct a big data analysis with PySpark. I include steps and comments so that the notebook can serve as a learning tool in the future and as a refresher when I come back to it. Note that it does not include any of the set-up work involved in connecting to an EC2 instance from my local machine. That set-up is quite involved and lengthy the first time, until one gets comfortable with it.

I am going to discuss some important Spark objects and their associated methods. There are many of them, and more details can be found in the PySpark SQL module documentation: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark-sql-module

**Note:** For file transfer, I used FileZilla (https://filezilla-project.org/) to move the data file I wrote to my local machine after preprocessing (in my first notebook) to the Ubuntu server. I will be using the pre-geo-encoding data (`pregeo_encoding.csv`) because time constraints mean geo-encoding has to wait until later. I plan to do spatial visualizations of the data once I encode its longitude and latitude.

If the data were larger, I would use a cluster instead of a single instance. This EC2 instance is the single free-tier version.


In [None]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark

To work with Spark data frames, I need to start a Spark session:

In [None]:
# import the entry point used to start a Spark session
from pyspark.sql import SparkSession

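Start the Spark session with the builder pattern. Name the application with `.appName()`, then call `.getOrCreate()`. If a session is already running, `.getOrCreate()` returns it; otherwise it creates a new one.

Note: with Amazon Elastic MapReduce (EMR), this is faster.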

In [24]:
# create session
spark = SparkSession.builder.appName('BigData_proj').getOrCreate()

First I will read in the data that I cleaned and preprocessed with python/pandas into a spark dataframe.  

Note: if I start the notebook from the same directory as the data set, I don't need to specify the full path.

In [25]:
# read the csv file
spark_h1b_data = spark.read.csv("/home/ubuntu/csv/pregeo_encoding.csv",inferSchema=True,header=True)

I will begin by doing basic manipulations and operations on the spark data frame and then move on to more sophisticated operations such as aggregations. 

First, I will print out the structure of the data / data types:

In [26]:
spark_h1b_data.printSchema()

root
 |-- CASE_NUMBER: string (nullable = true)
 |-- CASE_STATUS: string (nullable = true)
 |-- EMPLOYER_NAME: string (nullable = true)
 |-- SOC_NAME: string (nullable = true)
 |-- SOC_CODE: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- FULL_TIME_POSITION: string (nullable = true)
 |-- PREVAILING_WAGE: string (nullable = true)
 |-- WORKSITE_STATE: string (nullable = true)
 |-- YEAR: string (nullable = true)
 |-- WORKSITE: string (nullable = true)



The structure of the data file is correct, but some of the data types are not, because the schema was not inferred correctly. For example, `PREVAILING_WAGE` was read as a string even though it should be numeric. This should be fixed first.

In [27]:
from pyspark.sql.types import (StructField, StringType,
                               IntegerType, StructType,
                               DoubleType)

Now I can give the data an explicit schema. As a best practice, I think specifying a schema for large data ought to be one of the first steps.

In [38]:
# layout data schema
data_schema = [StructField('CASE_NUMBER', StringType(), True), # not doing math with these (plus '-' included)
               StructField('CASE_STATUS', StringType(), True),  
               StructField('EMPLOYER_NAME', StringType(), True),
               StructField('SOC_NAME', StringType(), True),
               StructField('SOC_CODE', StringType(), True),
               StructField('JOB_TITLE', StringType(), True),
               StructField('FULL_TIME_POSITION', StringType(), True),
               StructField('PREVAILING_WAGE', DoubleType(), True),
               StructField('WORKSITE_STATE', StringType(), True),
               StructField('YEAR', StringType(), True),
               StructField('WORKSITE', StringType(), True)]

Now that I have set up the list of fields I expect, I can pass it into a `StructType`, which is the final schema object for the data frame.

In [39]:
# explicit schema
f_struc = StructType(fields=data_schema)

Now, if I read in the data again but this time specifying the schema I expect, the issue should be solved. 

In [40]:
# pass the explicit schema (inferSchema is not needed when a schema is given)
spark_h1b_data = spark.read.csv("/home/ubuntu/csv/pregeo_encoding.csv", header=True, schema=f_struc)

In [10]:
# make sure the issue is fixed:
spark_h1b_data.printSchema()

root
 |-- CASE_NUMBER: string (nullable = true)
 |-- CASE_STATUS: string (nullable = true)
 |-- EMPLOYER_NAME: string (nullable = true)
 |-- SOC_NAME: string (nullable = true)
 |-- SOC_CODE: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- FULL_TIME_POSITION: string (nullable = true)
 |-- PREVAILING_WAGE: double (nullable = true)
 |-- WORKSITE_STATE: string (nullable = true)
 |-- YEAR: string (nullable = true)
 |-- WORKSITE: string (nullable = true)



Now the data types are what I would expect them to be. 

The output below shows the top 5 rows and how the columns are displayed. It looks messy because the columns do not fit on the screen. `.show()` prints plain text, so there is no scrolling table like the one a pandas data frame gets in a notebook.

In [11]:
spark_h1b_data.show(n=5)

+------------------+-------------------+--------------------+--------------------+----------+--------------------+------------------+---------------+--------------+----+--------------------+
|       CASE_NUMBER|        CASE_STATUS|       EMPLOYER_NAME|            SOC_NAME|  SOC_CODE|           JOB_TITLE|FULL_TIME_POSITION|PREVAILING_WAGE|WORKSITE_STATE|YEAR|            WORKSITE|
+------------------+-------------------+--------------------+--------------------+----------+--------------------+------------------+---------------+--------------+----+--------------------+
|I-200-09124-371007|          CERTIFIED|WILLISTON NORTHAM...|Teachers and Inst...|   25-3999|     CHINESE TEACHER|                 Y|        23350.0| massachusetts|2011|EASTHAMPTON,  MAS...|
|I-200-09125-717448|          CERTIFIED|         NYFIX, INC.|Computer Software...|15-1031.00|     PROJECT MANAGER|                 Y|       101088.0|      new york|2011| NEW YORK,  NEW YORK|
|I-200-09126-530125|CERTIFIED-WITHDRAWN|TGS-N

Zooming into the first three rows:

In [12]:
# first three rows
spark_h1b_data.head(3)

[Row(CASE_NUMBER='I-200-09124-371007', CASE_STATUS='CERTIFIED', EMPLOYER_NAME='WILLISTON NORTHAMPTON SCHOOL', SOC_NAME='Teachers and Instructors, All Other*', SOC_CODE='25-3999', JOB_TITLE='CHINESE TEACHER', FULL_TIME_POSITION='Y', PREVAILING_WAGE=23350.0, WORKSITE_STATE='massachusetts', YEAR='2011', WORKSITE='EASTHAMPTON,  MASSACHUSETTS'),
 Row(CASE_NUMBER='I-200-09125-717448', CASE_STATUS='CERTIFIED', EMPLOYER_NAME='NYFIX, INC.', SOC_NAME='Computer Software Engineers, Applications', SOC_CODE='15-1031.00', JOB_TITLE='PROJECT MANAGER', FULL_TIME_POSITION='Y', PREVAILING_WAGE=101088.0, WORKSITE_STATE='new york', YEAR='2011', WORKSITE='NEW YORK,  NEW YORK'),
 Row(CASE_NUMBER='I-200-09126-530125', CASE_STATUS='CERTIFIED-WITHDRAWN', EMPLOYER_NAME='TGS-NOPEC GEOPHYSICAL COMPANY', SOC_NAME='Computer Software Engineers, Applications', SOC_CODE='15-1031.00', JOB_TITLE='PRINCIPAL TRAINER / DEVELOPMENT ANALYST', FULL_TIME_POSITION='Y', PREVAILING_WAGE=77480.0, WORKSITE_STATE='texas', YEAR='2011'

Notice how this returns a list object which I can index in a conventional way to see only the first row if I want. 

In [13]:
type(spark_h1b_data.head(3))

list

In [14]:
spark_h1b_data.head(3)[0]

Row(CASE_NUMBER='I-200-09124-371007', CASE_STATUS='CERTIFIED', EMPLOYER_NAME='WILLISTON NORTHAMPTON SCHOOL', SOC_NAME='Teachers and Instructors, All Other*', SOC_CODE='25-3999', JOB_TITLE='CHINESE TEACHER', FULL_TIME_POSITION='Y', PREVAILING_WAGE=23350.0, WORKSITE_STATE='massachusetts', YEAR='2011', WORKSITE='EASTHAMPTON,  MASSACHUSETTS')

This individual is a full-time Chinese teacher in Easthampton, Massachusetts, whose visa was certified in 2011.

If I call `type` on this, it shows a Row object. It is useful to realize that a data frame is made up of Row objects. After looking at some columns, I will briefly explain why I think this is important to understand.

In [15]:
type(spark_h1b_data.head(3)[0])

pyspark.sql.types.Row

In pandas, a column can be subset with square brackets. Doing the same with a single column name on a Spark data frame does not return a data frame. Instead, it returns a Column object:

In [16]:
type(spark_h1b_data['WORKSITE'])

pyspark.sql.column.Column

A data frame holding the column is more flexible than a Spark SQL Column object, and `.select()` returns one. Remove `.show()` if you do not want to display the result.

In [17]:
spark_h1b_data.select(['JOB_TITLE', 'WORKSITE', 'PREVAILING_WAGE']).show()

+--------------------+--------------------+---------------+
|           JOB_TITLE|            WORKSITE|PREVAILING_WAGE|
+--------------------+--------------------+---------------+
|     CHINESE TEACHER|EASTHAMPTON,  MAS...|        23350.0|
|     PROJECT MANAGER| NEW YORK,  NEW YORK|       101088.0|
|PRINCIPAL TRAINER...|     HOUSTON,  TEXAS|        77480.0|
|    DRILLING MANAGER|THE WOODLANDS,  T...|       165506.0|
|TECHNICAL SUPPORT...|BOSTON,  MASSACHU...|        62358.0|
|SENIOR GLOBAL CAT...|  BEAVERTON,  OREGON|       111405.0|
|     ENDOCRINOLOGIST|SALISBURY,  MARYLAND|        99632.0|
|SENIOR PROJECT MA...|MERRIMACK,  NEW H...|        78728.0|
|COMPUTER SOFTWARE...|FORT MYERS,  FLORIDA|        58802.0|
| SECURITY RESEARCHER|SAN DIEGO,  CALIF...|        61963.0|
|EMBEDDED SOFTWARE...| NEWARK,  CALIFORNIA|        95867.0|
|SENIOR BUSINESS A...|  VISTA,  CALIFORNIA|       102123.0|
|SOFTWARE INTEGRAT...|FREDERICK,  MARYLAND|        73749.0|
|SENIOR PROJECT MA...|SEATTLE,  WASHINGT

Notice, the type returned is a data frame (with many more associated methods to play with).

In [18]:
type(spark_h1b_data.select('JOB_TITLE', 'WORKSITE', 'PREVAILING_WAGE'))

pyspark.sql.dataframe.DataFrame

Why does Spark include so many special and embedded objects, such as Row and Column objects? These objects let Spark read from a distributed data source and map tasks onto a distributed computing system. This is very powerful when data becomes too large to manage on a local computer.

Using select, I can take a subset of my original data frame since some of the columns are not very important to me. 

In [19]:
# only desired cols
spark_h1b_data = spark_h1b_data.select(['CASE_STATUS', 'EMPLOYER_NAME', 'SOC_NAME', 'JOB_TITLE', 
                                       'FULL_TIME_POSITION','PREVAILING_WAGE', 'YEAR', 'WORKSITE'])

To add a column to a data frame, I can use the `.withColumn()` method. The new column can be derived from the values of one column or from an operation on several columns. `.withColumn()` takes the new column's name and a Column object. The example below makes a monthly wage column from the existing prevailing wage column, which is in yearly units. I will not add this new column to the main data frame because I do not need it.

In [20]:
# select one column, then derive a second column from it
wage_data = spark_h1b_data.select('PREVAILING_WAGE')
wage_data = wage_data.withColumn('monthly_wage', wage_data['PREVAILING_WAGE'] / 12)
wage_data.show(3)

+---------------+------------------+
|PREVAILING_WAGE|      monthly_wage|
+---------------+------------------+
|        23350.0|1945.8333333333333|
|       101088.0|            8424.0|
|        77480.0| 6456.666666666667|
+---------------+------------------+
only showing top 3 rows



I accidentally broke the naming convention of this data frame by not capitalizing the new column name. I can fix that like so:

In [21]:
# rename col
wage_data = wage_data.withColumnRenamed('monthly_wage', 'MONTHLY_WAGE')
wage_data.show(3)

+---------------+------------------+
|PREVAILING_WAGE|      MONTHLY_WAGE|
+---------------+------------------+
|        23350.0|1945.8333333333333|
|       101088.0|            8424.0|
|        77480.0| 6456.666666666667|
+---------------+------------------+
only showing top 3 rows



Getting the descriptive statistics of the numeric attributes (below) is quite a lengthy task because I am running just a single free-tier EC2 instance. On a cluster this would be much faster, at least until the data got very large.

In [22]:
from pyspark.sql.functions import mean, min, max

*Note: the summary statistics call (`.describe()`) stopped working for an unknown reason, so it is commented out below.*

In [1]:
#spark_h1b_data.describe().show()

spark.sql is named as such because it can run SQL queries directly. First, the data frame must be registered as a temporary SQL view with `.createOrReplaceTempView()`. If a view with the same name already exists, this call replaces it.

In [None]:
# create SQL temporary view
spark_h1b_data.createOrReplaceTempView('h1b')

Now, direct SQL commands can be passed to spark.sql in quotations. Here are some simple examples:

In [None]:
# read as 'Select all from temporary view where wage is greater than 100k'
high_earners = spark.sql("SELECT * FROM h1b WHERE PREVAILING_WAGE > 100000")
# full time position
full_timers = spark.sql("SELECT * FROM h1b WHERE FULL_TIME_POSITION = 'Y'")
# full time and high earner
full_high_earners = spark.sql("SELECT * FROM h1b WHERE PREVAILING_WAGE > 100000 AND FULL_TIME_POSITION = 'Y'")

In [None]:
full_high_earners.show(3)

These queries could get a lot more complex, but the potential power of spark.sql should be clear now.

An important concept is that Spark data frames are built on top of the Spark SQL engine I have been talking about. Because of this, much of the work with Spark data frames is filtering and selecting the relevant information. These filtering commands are based on SQL syntax.

To begin, I will look at some individuals who have full-time positions and show only their job title and worksite.

In [None]:
spark_h1b_data.filter("FULL_TIME_POSITION = 'Y'").select(['JOB_TITLE', 'WORKSITE']).show(n=10)

PySpark is flexible: the same command can be written with SQL-like syntax or with Python syntax. Here is the same command using Python column expressions:

In [None]:
spark_h1b_data['JOB_TITLE', 'WORKSITE'].filter(spark_h1b_data['FULL_TIME_POSITION'] =='Y').show(n=10)

A similar idea, using `.select()`:

In [None]:
spark_h1b_data.filter(spark_h1b_data['PREVAILING_WAGE'] >= 200000).select(
                     'JOB_TITLE', 'FULL_TIME_POSITION', 'WORKSITE', 'EMPLOYER_NAME').show()

I am more comfortable passing in indexed columns with Python syntax, and I will likely stick to that style. Either way works.

However, filtering on multiple conditions with column expressions differs from ordinary Python in a few ways:

- Conditions cannot be combined with the keywords `and`, `or` and `not`. Use their symbol forms instead: `&`, `|` and `~`.
- Each condition must be enclosed in parentheses. In Python, `&`, `|` and `~` bind more tightly than comparisons such as `==` and `>=`.

Also notice that I can select columns in two ways. I can index them directly on the data frame (used above) or use `.select()` (used below).

Here is an example of this syntax below:

In [None]:
spark_h1b_data.filter( (spark_h1b_data['FULL_TIME_POSITION'] =='Y') & 
                      ~(spark_h1b_data['PREVAILING_WAGE'] >= 100000) 
                     ).select(['JOB_TITLE', 'WORKSITE']).show(n=10)

Notice that I added `~` ("not") above. This selects records with full-time positions whose prevailing annual wage is less than 100,000.
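You can check why the parentheses matter with Python's own parser, without Spark. The sketch below uses the standard `ast` module to show how Python groups the operators. The names `ft` and `wage` are placeholders standing in for the two column expressions. Without parentheses, Python reads the condition as a chained comparison. With Spark Column objects this typically fails, because the chained comparison asks for the truth value of a Column and PySpark refuses to convert a Column to a bool.

```python
import ast


def top_node(expr: str) -> ast.expr:
    """Return the outermost AST node Python builds for an expression string."""
    return ast.parse(expr, mode="eval").body


# No parentheses: '&' binds tighter than '==' and '>=', so Python reads this
# as the chained comparison  ft == ('Y' & wage) >= 100000
bad = top_node("ft == 'Y' & wage >= 100000")
print(type(bad).__name__)                     # Compare
print(type(bad.comparators[0].op).__name__)   # BitAnd  (the 'Y' & wage part)

# Parentheses make '&' the outermost operation, as intended
good = top_node("(ft == 'Y') & (wage >= 100000)")
print(type(good.op).__name__)                 # BitAnd

# '~' also binds tighter than '>=', so the negated condition needs its own parentheses
print(type(top_node("~wage >= 100000")).__name__)    # Compare  ((~wage) >= 100000)
print(type(top_node("~(wage >= 100000)")).__name__)  # UnaryOp
```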

Another typical scenario is selecting a specific record based on some condition, such as a particularly low or high value. For a single record, `.show()` is not very useful when the data has many attributes. As mentioned before, the output will not fit on the screen and is hard to read. Instead, `.collect()` returns the matching data as a list of Row objects rather than displaying it across the screen in columns. This is more practical and gives me the data I want to work with.


* Note: on the micro EC2 instance, every attempt at `.collect()` fails with a memory error. Apparently the call is quite memory-intensive, since it brings every matching row back to the driver. This is unfortunate, because `.collect()` is really useful. A collected Row object can be converted to a dictionary with `.asDict()`, which gives a clean key-value view of the record. This dictionary can also be easily indexed.

I include the code below to show how it would work on an instance larger than micro tier. This part is actually very helpful.

In [None]:
#record = spark_h1b_data.filter(spark_h1b_data['JOB_TITLE'] == 'ANESTHESIOLOGIST').collect()
#record_row = record[0]  -> returns the first Row object that matches the filter
#record_row.asDict()  -> converts the Row to a dictionary with column names as keys and the row's values as values
#record_row.asDict()['PREVAILING_WAGE']  -> indexes the dictionary by column name, returning that value

A more sophisticated approach to analyzing data involves grouping and aggregating data to produce more interesting insights. I will go over some of these aggregations now. 

Grouping by full-time position ('Y' and 'N'):

In [32]:
spark_h1b_data.groupBy('FULL_TIME_POSITION')

<pyspark.sql.group.GroupedData at 0x7efc0feedd30>

This returns a `GroupedData` object, which has its own set of useful methods.

In [44]:
#spark_h1b_data.groupBy('FULL_TIME_POSITION').mean()

In [52]:
# read the csv file
kaggle_h1b_data = spark.read.csv("/home/ubuntu/csv/h1b_kaggle.csv",inferSchema=True,header=True)

In [53]:
kaggle_h1b_data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- CASE_STATUS: string (nullable = true)
 |-- EMPLOYER_NAME: string (nullable = true)
 |-- SOC_NAME: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- FULL_TIME_POSITION: string (nullable = true)
 |-- PREVAILING_WAGE: string (nullable = true)
 |-- YEAR: string (nullable = true)
 |-- WORKSITE: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- lat: string (nullable = true)



In [57]:
# layout data schema
kaggle_schema = [StructField('CASE_STATUS', StringType(), True), # not doing math with these (plus '-' included)
                   StructField('EMPLOYER_NAME', StringType(), True),
                   StructField('SOC_NAME', StringType(), True),
                   StructField('JOB_TITLE', StringType(), True),
                   StructField('FULL_TIME_POSITION', StringType(), True),
                   StructField('PREVAILING_WAGE', DoubleType(), True),
                   StructField('WORKSITE_STATE', StringType(), True),
                   StructField('YEAR', StringType(), True),
                   StructField('WORKSITE', StringType(), True),
                   StructField('lon', DoubleType(), True),
                   StructField('lat', DoubleType(), True)]

In [60]:
k_struc = StructType(fields=kaggle_schema)

In [61]:
# final parameter schema=
kaggle_h1b_data = spark.read.csv("/home/ubuntu/csv/pregeo_encoding.csv",inferSchema=True,header=True, schema=k_struc)

In [67]:
kaggle2_h1b_data = kaggle_h1b_data.select(['lon', 'lat'])

In [68]:
kaggle2_h1b_data.describe()

Py4JJavaError: An error occurred while calling o641.describe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 67.0 failed 1 times, most recent failure: Lost task 0.0 in stage 67.0 (TID 707, localhost, executor driver): java.text.ParseException: Unparseable number: "EASTHAMPTON,  MASSACHUSETTS"
	at java.text.NumberFormat.parse(NumberFormat.java:385)
	...
Caused by: java.text.ParseException: Unparseable number: "EASTHAMPTON,  MASSACHUSETTS"
	at java.text.NumberFormat.parse(NumberFormat.java:385)
	...


In [69]:
kaggle2_h1b_data.show(n=5)

Py4JJavaError: An error occurred while calling o641.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 1 times, most recent failure: Lost task 0.0 in stage 69.0 (TID 709, localhost, executor driver): java.text.ParseException: Unparseable number: "EASTHAMPTON,  MASSACHUSETTS"
	at java.text.NumberFormat.parse(NumberFormat.java:385)
	...
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2392)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2128)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2127)
	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2818)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.text.ParseException: Unparseable number: "EASTHAMPTON,  MASSACHUSETTS"
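The root cause is in the `Caused by` line. While applying the schema, Spark's CSV reader tried to parse the location string `"EASTHAMPTON,  MASSACHUSETTS"` as a number (the `NumberFormat.parse` call inside `CSVTypeCast`). The cell that produced this trace is not shown here, so the exact trigger cannot be confirmed. One common cause is a schema whose column order does not match the file. For example, a file written by pandas' `DataFrame.to_csv()` keeps the row index as an unnamed first column by default, which shifts every column one place to the right. The stdlib-only sketch below reproduces that failure mode. The column names (`WORKSITE`, `PREVAILING_WAGE`) and the values are hypothetical; this is an illustration, not the notebook's actual schema.

```python
import csv
import io

# Illustrative CSV in the layout pandas' DataFrame.to_csv() produces by
# default: the row index is written as an extra, unnamed first column.
# Column names and values are hypothetical.
CSV_TEXT = ',WORKSITE,PREVAILING_WAGE\n0,"EASTHAMPTON,  MASSACHUSETTS",52000.0\n'

# A hypothetical schema that does not account for the index column.
SCHEMA = [("WORKSITE", str), ("PREVAILING_WAGE", float)]


def read_rows(text):
    """Parse CSV text (quoted fields keep their commas) and skip the header."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # discard the header row
    return list(reader)


def apply_schema(fields, schema):
    """Cast fields by position, the way a CSV reader applies a schema."""
    return [typ(value) for (_, typ), value in zip(schema, fields)]


row = read_rows(CSV_TEXT)[0]

error_message = None
try:
    # Every column is shifted by one, so the location lands in the float slot.
    apply_schema(row, SCHEMA)
except ValueError as err:
    error_message = str(err)

# Dropping the index column realigns the fields with the schema.
fixed = apply_schema(row[1:], SCHEMA)
print(fixed)  # ['EASTHAMPTON,  MASSACHUSETTS', 52000.0]
```

If this is the cause, writing the file with `to_csv(..., index=False)` or adding the index column to the schema would realign the columns. Another option, again an assumption about this data, is to read every column as a string and convert afterwards with `Column.cast("double")`. That returns null for values that cannot be parsed instead of failing the whole job.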


In [79]:
only_wage = spark.read.csv("/home/ubuntu/csv/one_col.csv")

TypeError: __init__() missing 1 required positional argument: 'name'

In [86]:
only_wage['_c0'].mean()

TypeError: 'Column' object is not callable