# RDD Review
We review some of the basic techniques for manipulating an RDD (Resilient Distributed Dataset).

## Word Count Revisit
### Read in a text file

In [1]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession, Row


In [2]:
# Build the SparkSession for this notebook; getOrCreate() hands back an
# already-running session when one exists instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("rdd-app")
    .config("spark.config.option", "value")
    .getOrCreate()
)
# Stand-alone SparkConf with the same app name.
# NOTE(review): no cell shown in this notebook passes `scfg` to anything.
scfg = SparkConf().setAppName('rdd-app')
# The RDD API (textFile, parallelize) is reached through the session's context.
sc = spark.sparkContext

In [5]:
import string

# Path of the Macbeth text used for the word-count examples.
text_file = '/user/student/shakespeare/tragedy/macbeth.txt'
# RDD of strings: one element per line of the file.
text = sc.textFile(text_file)

In [7]:
# collect() returns every line of the RDD as one Python list on the driver.
text.collect()

['ACT I.',
 '',
 'SCENE I. An open place.',
 '[An open place. Thunder and lightning. Enter three Witches.]',
 '',
 '  FIRST WITCH.',
 '    When shall we three meet again',
 '    In thunder, lightning, or in rain?',
 '',
 '  SECOND WITCH.',
 "    When the hurlyburly's done,",
 "    When the battle's lost and won.",
 '',
 '  THIRD WITCH.',
 '    That will be ere the set of sun.',
 '',
 '  FIRST WITCH.',
 '    Where the place?',
 '',
 '  SECOND WITCH.',
 '    Upon the heath.',
 '',
 '  THIRD WITCH.',
 '    There to meet with Macbeth.',
 '',
 '  FIRST WITCH.',
 '    I come, Graymalkin!',
 '',
 '  SECOND WITCH.',
 '    Paddock calls.',
 '',
 '  THIRD WITCH.',
 '    Anon.',
 '',
 '  ALL.',
 '    Fair is foul, and foul is fair:',
 '    Hover through the fog and filthy air.',
 '[Witches vanish.]',
 '',
 'SCENE II. A camp near Forres.',
 '[Alarum within. Enter King Duncan, Malcolm, Donalbain, Lennox, with Attendants, meeting a bleeding Soldier.]',
 '',
 '  DUNCAN.',
 '    What bloody man is tha

### Supporting functions

In [8]:
def strip_punc(s):
    """Remove ASCII punctuation from ``s`` and split the result on single spaces.

    Splitting uses ``' '`` as an explicit separator, so leading spaces and runs
    of consecutive spaces yield empty-string tokens in the returned list.
    """
    deletion_table = str.maketrans('', '', string.punctuation)
    without_punctuation = s.translate(deletion_table)
    return without_punctuation.split(' ')

def search_word_in_line(word):
    """Print every line of ``text`` that contains ``word`` as a whole token.

    Each match is printed as ``<line number>. <original line>``; line numbers
    start at 1. Tokens are produced by ``strip_punc``, so the comparison is
    case-sensitive and ignores punctuation.
    """
    for line_number, line in enumerate(text.collect(), start=1):
        if word in strip_punc(line):
            print('{}. {}'.format(line_number, line))

### Split each line into tokens separated by a space (' ') after removing punctuation

In [9]:
# Build the punctuation-deletion table once rather than once per line.
punct_table = str.maketrans('', '', string.punctuation)

# One token per space-separated piece of every line. Leading spaces and runs of
# spaces produce '' tokens, which is why '' shows up as a key in the counts.
flatmap = text.flatMap(lambda line: line.translate(punct_table).split(' '))
# Pair every token with 1. Named `word_pairs` (not `map`) so the builtin `map`
# stays usable in the rest of the notebook.
word_pairs = flatmap.map(lambda word: (word, 1))
# Sum the 1s per token to get (token, occurrences) pairs.
reduced = word_pairs.reduceByKey(lambda a, b: a + b)

In [10]:
# Bring all (token, occurrences) pairs back to the driver as a list of tuples.
reduced.collect()

[('ACT', 5),
 ('', 11441),
 ('SCENE', 28),
 ('open', 4),
 ('Thunder', 7),
 ('three', 12),
 ('WITCH', 51),
 ('When', 22),
 ('we', 60),
 ('meet', 6),
 ('again', 21),
 ('in', 175),
 ('rain', 2),
 ('battles', 1),
 ('lost', 6),
 ('That', 91),
 ('set', 8),
 ('of', 319),
 ('sun', 4),
 ('Upon', 11),
 ('heath', 3),
 ('Macbeth', 73),
 ('calls', 3),
 ('Fair', 2),
 ('is', 162),
 ('Hover', 1),
 ('fog', 1),
 ('vanish', 3),
 ('II', 6),
 ('near', 10),
 ('Alarum', 1),
 ('Donalbain', 10),
 ('meeting', 4),
 ('bleeding', 2),
 ('What', 58),
 ('bloody', 14),
 ('seemeth', 1),
 ('his', 131),
 ('revolt', 2),
 ('The', 129),
 ('newest', 3),
 ('state', 7),
 ('MALCOLM', 40),
 ('sergeant', 1),
 ('like', 34),
 ('good', 41),
 ('hardy', 1),
 ('soldier', 5),
 ('Gainst', 2),
 ('captivity', 1),
 ('brave', 2),
 ('Say', 5),
 ('king', 27),
 ('knowledge', 3),
 ('broil', 1),
 ('thou', 66),
 ('leave', 11),
 ('stood', 4),
 ('two', 10),
 ('do', 70),
 ('And', 151),
 ('choke', 1),
 ('merciless', 1),
 ('rebel', 2),
 ('multiplying',

### Making it into a single statement

In [11]:
# Same three steps as above (tokenise, pair with 1, sum per token) written as
# one chained expression.
counts = (
    text
    .flatMap(lambda line: line.translate(str.maketrans('', '', string.punctuation)).split(' '))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)

### Run the search

In [47]:
word = "sweet"
# Every element of `reduced` is already a (token, occurrences) tuple, so unpack
# it directly. Converting the tuple to a string and stripping punctuation again
# would mangle tokens whose repr contains escapes (e.g. a tab) and could report
# a match for the wrong word.
for token, occurrences in reduced.collect():
    if token == word:
        print('Found \'{}\' occurs \'{}\' times'.format(token, occurrences))
        # List the numbered lines of the play that contain the word.
        search_word_in_line(word)
        # Tokens are unique keys after reduceByKey, so the first hit is the only one.
        break

Found 'sweet' occurs '2' times
2901.     Pour the sweet milk of concord into hell,
3457.     And with some sweet oblivious antidote


## Manipulating airline performance data

### Creating an RDD with one row.

In [48]:
# An RDD holding a single hand-built Row describing one airport.
airport = sc.parallelize([
    Row(
        iata="00M",
        airport="Thigpen ",
        city="Bay Springs",
        state="MS",
        country="USA",
        lat=31.95376472,
        long=-89.23450472,
    )
])
print(airport.count())
# take(3) asks for up to three rows; only one exists, so it returns the same
# single-element list as collect().
print(airport.take(3))
print(airport.collect())

1
[Row(airport='Thigpen ', city='Bay Springs', country='USA', iata='00M', lat=31.95376472, long=-89.23450472, state='MS')]
[Row(airport='Thigpen ', city='Bay Springs', country='USA', iata='00M', lat=31.95376472, long=-89.23450472, state='MS')]


### Converting an RDD to a Dataframe (DF)

In [49]:

from pyspark.sql.types import Row
from datetime import datetime

# Convert the RDD of Row objects into a DataFrame; column names and types come from the Row.
airport_df = airport.toDF()
# show() prints the rows as a text table ...
airport_df.show()
# ... while the bare expression displays only the schema.
airport_df

+--------+-----------+-------+----+-----------+------------+-----+
| airport|       city|country|iata|        lat|        long|state|
+--------+-----------+-------+----+-----------+------------+-----+
|Thigpen |Bay Springs|    USA| 00M|31.95376472|-89.23450472|   MS|
+--------+-----------+-------+----+-----------+------------+-----+



DataFrame[airport: string, city: string, country: string, iata: string, lat: double, long: double, state: string]

### More complex dataset

In [50]:

# A single Row mixing float, string, boolean, integer and list columns.
# NOTE(review): the name `complex` shadows Python's builtin `complex` type for
# the rest of the session. It is also used by the cell that calls
# complex.toDF(), so a rename has to touch both cells together.
complex = sc.parallelize([Row(col_float=3.1415,
                              col_string='da pi',
                              col_boolean=True,
                              col_integer=201,
                              col_list=[1,2,3,4])])
complex.collect()

[Row(col_boolean=True, col_float=3.1415, col_integer=201, col_list=[1, 2, 3, 4], col_string='da pi')]

### Converting to DF

In [51]:
# Same conversion as for the airport RDD: toDF() turns the RDD of Rows into a DataFrame.
complex_df = complex.toDF()
complex_df.show()

+-----------+---------+-----------+------------+----------+
|col_boolean|col_float|col_integer|    col_list|col_string|
+-----------+---------+-----------+------------+----------+
|       true|   3.1415|        201|[1, 2, 3, 4]|     da pi|
+-----------+---------+-----------+------------+----------+



### More complex data type

In [52]:
# Three rows whose columns hold a list, a dict, a nested Row and a timestamp.
real_complex = sc.parallelize([
    Row(
        col_list=[1, 2, 3],
        col_dict={"pi": 3.1415},
        col_row=Row(number=3, fraction=1415),
        col_time=datetime(2019, 7, 22, 5, 51, 0),
    ),
    Row(
        col_list=[3, 4, 5],
        col_dict={"sqrt2": 1.4142},
        col_row=Row(number=1, fraction=4142),
        col_time=datetime(2019, 7, 22, 5, 54, 0),
    ),
    Row(
        col_list=[6, 7, 9, 10],
        col_dict={"sqrt3": 1.73205},
        col_row=Row(number=1, fraction=73205),
        col_time=datetime(2019, 7, 22, 5, 55, 0),
    ),
])
real_complex.collect()  # the raw repr of nested rows is a little hard to read

[Row(col_dict={'pi': 3.1415}, col_list=[1, 2, 3], col_row=Row(fraction=1415, number=3), col_time=datetime.datetime(2019, 7, 22, 5, 51)),
 Row(col_dict={'sqrt2': 1.4142}, col_list=[3, 4, 5], col_row=Row(fraction=4142, number=1), col_time=datetime.datetime(2019, 7, 22, 5, 54)),
 Row(col_dict={'sqrt3': 1.73205}, col_list=[6, 7, 9, 10], col_row=Row(fraction=73205, number=1), col_time=datetime.datetime(2019, 7, 22, 5, 55))]

In [54]:
# Convert the nested rows to a DataFrame and print them as a table.
real_complex_df = real_complex.toDF()
real_complex_df.show()

+------------------+-------------+----------+-------------------+
|          col_dict|     col_list|   col_row|           col_time|
+------------------+-------------+----------+-------------------+
|    [pi -> 3.1415]|    [1, 2, 3]| [1415, 3]|2019-07-22 05:51:00|
| [sqrt2 -> 1.4142]|    [3, 4, 5]| [4142, 1]|2019-07-22 05:54:00|
|[sqrt3 -> 1.73205]|[6, 7, 9, 10]|[73205, 1]|2019-07-22 05:55:00|
+------------------+-------------+----------+-------------------+



**It is much easier to view the data structure now**

## Airline Performance data
Loading data from HDFS

In [56]:
# NOTE(review): despite the names `data_by_year` and `airline_performance`,
# this path points at plane-data.csv, the same file the next section loads
# into `plane_data` — confirm whether a different file was intended.
data_by_year = '/user/student/airline/plane-data.csv'
# Read the CSV, treating its first line as the column header.
airline_performance = spark.read.option("header", "true").csv(data_by_year)


In [57]:
# Print the leading rows of the DataFrame as a text table.
airline_performance.show()

+-------+----+------------+----------+-----+------+-------------+-----------+----+
|tailnum|type|manufacturer|issue_date|model|status|aircraft_type|engine_type|year|
+-------+----+------------+----------+-----+------+-------------+-----------+----+
| N050AA|null|        null|      null| null|  null|         null|       null|null|
| N051AA|null|        null|      null| null|  null|         null|       null|null|
| N052AA|null|        null|      null| null|  null|         null|       null|null|
| N054AA|null|        null|      null| null|  null|         null|       null|null|
| N055AA|null|        null|      null| null|  null|         null|       null|null|
| N056AA|null|        null|      null| null|  null|         null|       null|null|
| N057AA|null|        null|      null| null|  null|         null|       null|null|
| N058AA|null|        null|      null| null|  null|         null|       null|null|
| N059AA|null|        null|      null| null|  null|         null|       null|null|
| N0

### Loading the plane-data table

In [74]:
# Keep the file path and the DataFrame under separate names so that
# `plane_data` always refers to the DataFrame and never to a path string.
plane_data_path = '/user/student/airline/plane-data.csv'
# Read the CSV, treating its first line as the column header.
plane_data = spark.read.option("header", "true").csv(plane_data_path)
# Preview the first five rows as a list of Row objects.
plane_data.head(5)

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)]

### `plane_data` is a DataFrame (DF)

In [59]:
# A bare DataFrame expression displays only the schema (column names and types), not the data.
plane_data

DataFrame[tailnum: string, type: string, manufacturer: string, issue_date: string, model: string, status: string, aircraft_type: string, engine_type: string, year: string]

In [60]:
# Number of rows in the DataFrame.
plane_data.count()

5029

In [61]:
# collect() returns every row of the DataFrame as a list of Row objects.
plane_data.collect()

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N056AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N057AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=Non

In [62]:
# take(5) returns a list holding only the first five Row objects.
plane_data.take(5)

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)]

In [63]:
# first() returns a single Row rather than a list.
plane_data.first()

Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)

In [64]:
# head(5) gives the same five-row list that take(5) produced.
plane_data.head(5)

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)]

In [65]:
# Accessing rows: collect() yields a Python list, so one row can be picked by
# its position (index 51 here).
plane_data.collect()[51]

Row(tailnum='N11109', type='Corporation', manufacturer='EMBRAER', issue_date='06/25/2003', model='EMB-145XR', status='Valid', aircraft_type='Fixed Wing Multi-Engine', engine_type='Turbo-Fan', year='2002')

In [66]:
plane_data.collect()[51]['type']  # index the Row by column name

'Corporation'

In [67]:
plane_data.collect()[51][4]  # index the Row by position; position 4 is the `model` column

'EMB-145XR'

In [68]:
# Drop to the DataFrame's underlying RDD and keep three fields of each Row as a plain tuple.
plane_data_rdd = plane_data.rdd.map(lambda x: (x.tailnum, x.type, x.manufacturer))
plane_data_rdd.collect()

[('N050AA', None, None),
 ('N051AA', None, None),
 ('N052AA', None, None),
 ('N054AA', None, None),
 ('N055AA', None, None),
 ('N056AA', None, None),
 ('N057AA', None, None),
 ('N058AA', None, None),
 ('N059AA', None, None),
 ('N060AA', None, None),
 ('N061AA', None, None),
 ('N062AA', None, None),
 ('N063AA', None, None),
 ('N064AA', None, None),
 ('N065AA', None, None),
 ('N066AA', None, None),
 ('N067AA', None, None),
 ('N068AA', None, None),
 ('N069AA', None, None),
 ('N070AA', None, None),
 ('N071AA', None, None),
 ('N072AA', None, None),
 ('N073AA', None, None),
 ('N074AA', None, None),
 ('N075AA', None, None),
 ('N076AA', None, None),
 ('N077AA', None, None),
 ('N078AA', None, None),
 ('N079AA', None, None),
 ('N080AA', None, None),
 ('N081AA', None, None),
 ('N082AA', None, None),
 ('N083AA', None, None),
 ('N084AA', None, None),
 ('N10156', 'Corporation', 'EMBRAER'),
 ('N102UW', 'Corporation', 'AIRBUS INDUSTRIE'),
 ('N10323', 'Corporation', 'BOEING'),
 ('N103US', 'Corporation'

In [69]:
# Without a map step, .rdd yields Row objects that keep their column names.
# NOTE(review): this rebinds `plane_data_rdd`, replacing the tuple RDD built in
# the previous cell.
plane_data_rdd = plane_data.rdd
plane_data_rdd.collect()

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N056AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N057AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=Non

In [70]:
# Summary statistics for one column; tailnum is a string column, so mean and
# stddev come back null and min/max are the smallest/largest strings.
plane_data.describe(['tailnum']).show()

+-------+-------+
|summary|tailnum|
+-------+-------+
|  count|   5029|
|   mean|   null|
| stddev|   null|
|    min| N050AA|
|    max| N999DN|
+-------+-------+



In [71]:
# describe() itself returns a DataFrame with a `summary` column plus the described column.
plane_data.describe(['tailnum'])

DataFrame[summary: string, tailnum: string]

In [72]:
# The summary rows come back in the order count, mean, stddev, min, max, so
# index 3 is the 'min' row.
plane_data.describe(['tailnum']).collect()[3]

Row(summary='min', tailnum='N050AA')

In [73]:
# The count for `status` is lower than the DataFrame's row count; the rows
# shown earlier with status=None are presumably the ones left out.
plane_data.describe(['status']).show()

+-------+-----------------+
|summary|           status|
+-------+-----------------+
|  count|             4480|
|   mean|             null|
| stddev|             null|
|    min|Registered to Mfr|
|    max|            Valid|
+-------+-----------------+

