# RDD Review
We are reviewing some of the basic techniques for manipulating a RDD dataset.

## Word Count Revisit
### Read in a text file

In [19]:
import string

text_file = '/user/student/shakespeare/tragedy/macbeth.txt'
text = sc.textFile(text_file)

### Supporting functions

In [20]:
def strip_punc(s):
    return s.translate(str.maketrans('', '', string.punctuation)).split(' ')

def search_word_in_line(word):
    count = 1
    for line in text.collect():
        if word in strip_punc(line):
            print('{}. {}'.format(count, line))
        count += 1

### Split a line into tokens separated by space (' ') after removing punctuations

In [21]:
flatmap = text.flatMap(lambda line: line.translate(str.maketrans('', '', string.punctuation)).split(' '))
map = flatmap.map(lambda word: (word, 1))
reduced = map.reduceByKey(lambda a, b: a + b)

### Making it into a single statement

In [22]:
counts = text.flatMap(lambda line: line.translate(str.maketrans('', '', string.punctuation)).split(' '))\
             .map(lambda word: (word, 1))\
             .reduceByKey(lambda a, b: a + b)    

### Run the search

In [24]:
word = "purpose"
for count in reduced.collect():
    # kv = str(count).translate(str.maketrans('', '', string.punctuation)).split(' ')
    kv = strip_punc(str(count))
    if word == kv[0]:
        print('Found \'{}\' occurs \'{}\' times'.format(kv[0], kv[1])) 
        search_word_in_line(word)
        break

Found 'purpose' occurs '5' times
715.     Shake my fell purpose, nor keep peace between
815.     We coursed him at the heels, and had a purpose
1258.     Infirm of purpose!
3141.     The flighty purpose never is o'ertook
3150.     This deed I'll do before this purpose cool.


## Manipulating airline performance data

### Creating an RDD with one row.

In [25]:
airport = sc.parallelize([Row(iata="00M",airport="Thigpen ",city="Bay Springs",\
                              state="MS",country="USA",lat=31.95376472,long=-89.23450472)])
airport.count()
airport.take(3)
airport.collect()

[Row(airport='Thigpen ', city='Bay Springs', country='USA', iata='00M', lat=31.95376472, long=-89.23450472, state='MS')]

### Converting an RDD to a Dataframe (DF)

In [26]:

from pyspark.sql.types import Row
from datetime import datetime

airport_df = airport.toDF()
airport_df.show()

+--------+-----------+-------+----+-----------+------------+-----+
| airport|       city|country|iata|        lat|        long|state|
+--------+-----------+-------+----+-----------+------------+-----+
|Thigpen |Bay Springs|    USA| 00M|31.95376472|-89.23450472|   MS|
+--------+-----------+-------+----+-----------+------------+-----+



### More complex dataset

In [27]:

complex = sc.parallelize([Row(col_float=3.1415,
                              col_string='da pi',
                              col_boolean=True,
                              col_integer=201,
                              col_list=[1,2,3,4])])
complex.collect()

[Row(col_boolean=True, col_float=3.1415, col_integer=201, col_list=[1, 2, 3, 4], col_string='da pi')]

### Converting to DF

In [28]:
complex_df = complex.toDF()
complex_df.show()

+-----------+---------+-----------+------------+----------+
|col_boolean|col_float|col_integer|    col_list|col_string|
+-----------+---------+-----------+------------+----------+
|       true|   3.1415|        201|[1, 2, 3, 4]|     da pi|
+-----------+---------+-----------+------------+----------+



### More complex data type

In [29]:
real_complex = sc.parallelize([
    Row(col_list=[1,2,3], col_dict = {"pi": 3.1415}, col_row = Row(number=3, fraction=1415), col_time=datetime(2019,7,22,5,51,0)),
    Row(col_list=[3,4,5], col_dict = {"sqrt2": 1.4142}, col_row = Row(number=1, fraction=4142), col_time=datetime(2019,7,22,5,54,0)),
    Row(col_list=[6,7,9,10], col_dict = {"sqrt3": 1.73205}, col_row = Row(number=1, fraction=73205), col_time=datetime(2019,7,22,5,55,0))
])
real_complex.collect() # A little bit hard to see

[Row(col_dict={'pi': 3.1415}, col_list=[1, 2, 3], col_row=Row(fraction=1415, number=3), col_time=datetime.datetime(2019, 7, 22, 5, 51)),
 Row(col_dict={'sqrt2': 1.4142}, col_list=[3, 4, 5], col_row=Row(fraction=4142, number=1), col_time=datetime.datetime(2019, 7, 22, 5, 54)),
 Row(col_dict={'sqrt3': 1.73205}, col_list=[6, 7, 9, 10], col_row=Row(fraction=73205, number=1), col_time=datetime.datetime(2019, 7, 22, 5, 55))]

In [30]:
real_complex_df = real_complex.toDF()
real_complex_df.show();

+------------------+-------------+----------+-------------------+
|          col_dict|     col_list|   col_row|           col_time|
+------------------+-------------+----------+-------------------+
|    [pi -> 3.1415]|    [1, 2, 3]| [1415, 3]|2019-07-22 05:51:00|
| [sqrt2 -> 1.4142]|    [3, 4, 5]| [4142, 1]|2019-07-22 05:54:00|
|[sqrt3 -> 1.73205]|[6, 7, 9, 10]|[73205, 1]|2019-07-22 05:55:00|
+------------------+-------------+----------+-------------------+



**It is much easier to view the data structure now**

## Airline Performance data
Loading data from HDFS

In [31]:
data_by_year = '/user/student/airline/1987.csv'
airline_performance = spark.read.option("header", "true").csv(data_by_year)

AnalysisException: 'Path does not exist: hdfs://localhost:9000/user/student/airline/1987.csv;'

In [32]:
airline_performance.show()

NameError: name 'airline_performance' is not defined

### Loading airport table

In [33]:

airports_file = '/user/student/airline/plane-data.csv'
airports = spark.read.option("header", "true").csv(airports_file)
airports.show()

+-------+----+------------+----------+-----+------+-------------+-----------+----+
|tailnum|type|manufacturer|issue_date|model|status|aircraft_type|engine_type|year|
+-------+----+------------+----------+-----+------+-------------+-----------+----+
| N050AA|null|        null|      null| null|  null|         null|       null|null|
| N051AA|null|        null|      null| null|  null|         null|       null|null|
| N052AA|null|        null|      null| null|  null|         null|       null|null|
| N054AA|null|        null|      null| null|  null|         null|       null|null|
| N055AA|null|        null|      null| null|  null|         null|       null|null|
| N056AA|null|        null|      null| null|  null|         null|       null|null|
| N057AA|null|        null|      null| null|  null|         null|       null|null|
| N058AA|null|        null|      null| null|  null|         null|       null|null|
| N059AA|null|        null|      null| null|  null|         null|       null|null|
| N0

### Airports is a DF data type

In [34]:
airports

DataFrame[tailnum: string, type: string, manufacturer: string, issue_date: string, model: string, status: string, aircraft_type: string, engine_type: string, year: string]

In [35]:
airports.count()

5029

In [36]:
airports.collect()

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N056AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N057AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=Non

In [37]:
airports.take(5)

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)]

In [38]:
airports.first()

Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)

In [39]:
airports.head(5)

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)]

In [40]:
# Accessing rows
airports.collect()[2]

Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)

In [41]:
airports.collect()[0]['iata'] # use column name.

ValueError: iata

In [42]:
airports.collect()[0][0] # use column index

'N050AA'

In [43]:
airport_rdd = airports.rdd.map(lambda x: (x.iata, x.airport, x.city, x.state, x.country, x.lat, x.long))
airport_rdd.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 51.0 failed 1 times, most recent failure: Lost task 0.0 in stage 51.0 (TID 94, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'iata' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 390, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-43-ed64cde9de81>", line 1, in <lambda>
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
    raise AttributeError(item)
AttributeError: iata

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'iata' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 390, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-43-ed64cde9de81>", line 1, in <lambda>
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
    raise AttributeError(item)
AttributeError: iata

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [44]:
# More selective
airport_rdd = airports.rdd.map(lambda x: (x.iata, x.airport))
airport_rdd.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 52.0 failed 1 times, most recent failure: Lost task 0.0 in stage 52.0 (TID 95, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'iata' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 390, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-44-726f7a88b443>", line 2, in <lambda>
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
    raise AttributeError(item)
AttributeError: iata

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1527, in __getattr__
    idx = self.__fields__.index(item)
ValueError: 'iata' is not in list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 390, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-44-726f7a88b443>", line 2, in <lambda>
  File "/home/student/dev/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1532, in __getattr__
    raise AttributeError(item)
AttributeError: iata

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [45]:
# default with col names.
airport_rdd = airports.rdd
airport_rdd.collect()

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N056AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N057AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=Non

In [46]:
airports.describe(['lat']).show()

AnalysisException: "cannot resolve '`lat`' given input columns: [tailnum, issue_date, model, engine_type, manufacturer, aircraft_type, status, year, type];;\n'Project ['lat]\n+- Relation[tailnum#167,type#168,manufacturer#169,issue_date#170,model#171,status#172,aircraft_type#173,engine_type#174,year#175] csv\n"