# RDD Review
We are reviewing some of the basic techniques for manipulating a RDD dataset.

## Word Count Revisit
### Read in a text file

In [1]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession, Row


In [2]:
spark = SparkSession.builder.appName("rdd-app").config("spark.config.option", "value").getOrCreate()
scfg = SparkConf().setAppName('rdd-app')
sc = spark.sparkContext

In [3]:
import string

text_file = '/user/student/shakespeare/tragedy/macbeth.txt'
text = sc.textFile(text_file)

In [4]:
text.collect()

['',
 'The Tragedy of Macbeth',
 'Shakespeare homepage | Macbeth | Entire play',
 'ACT I',
 'SCENE I. A desert place.',
 '',
 '    Thunder and lightning. Enter three Witches ',
 '',
 'First Witch',
 '',
 '    When shall we three meet again',
 '    In thunder, lightning, or in rain?',
 '',
 'Second Witch',
 '',
 "    When the hurlyburly's done,",
 "    When the battle's lost and won.",
 '',
 'Third Witch',
 '',
 '    That will be ere the set of sun.',
 '',
 'First Witch',
 '',
 '    Where the place?',
 '',
 'Second Witch',
 '',
 '    Upon the heath.',
 '',
 'Third Witch',
 '',
 '    There to meet with Macbeth.',
 '',
 'First Witch',
 '',
 '    I come, Graymalkin!',
 '',
 'Second Witch',
 '',
 '    Paddock calls.',
 '',
 'Third Witch',
 '',
 '    Anon.',
 '',
 'ALL',
 '',
 '    Fair is foul, and foul is fair:',
 '    Hover through the fog and filthy air.',
 '',
 '    Exeunt',
 '',
 'SCENE II. A camp near Forres.',
 '',
 '    Alarum within. Enter DUNCAN, MALCOLM, DONALBAIN, LENNOX, with A

### Supporting functions

In [5]:
def strip_punc(s):
    return s.translate(str.maketrans('', '', string.punctuation)).split(' ')

def search_word_in_line(word):
    count = 1
    for line in text.collect():
        if word in strip_punc(line):
            print('{}. {}'.format(count, line))
        count += 1

### Split a line into tokens separated by space (' ') after removing punctuations

In [6]:
flatmap = text.flatMap(lambda line: line.translate(str.maketrans('', '', string.punctuation)).split(' '))
map = flatmap.map(lambda word: (word, 1))
reduced = map.reduceByKey(lambda a, b: a + b)

In [7]:
reduced.collect()

[('', 11739),
 ('The', 127),
 ('Tragedy', 1),
 ('of', 316),
 ('Macbeth', 43),
 ('Shakespeare', 1),
 ('Entire', 1),
 ('ACT', 5),
 ('SCENE', 28),
 ('Thunder', 7),
 ('three', 13),
 ('Witch', 51),
 ('When', 22),
 ('we', 59),
 ('meet', 6),
 ('again', 19),
 ('in', 175),
 ('rain', 2),
 ('Second', 24),
 ('battles', 1),
 ('lost', 6),
 ('That', 90),
 ('set', 8),
 ('sun', 4),
 ('Upon', 11),
 ('heath', 3),
 ('calls', 3),
 ('Fair', 2),
 ('is', 164),
 ('Hover', 1),
 ('fog', 1),
 ('II', 6),
 ('near', 10),
 ('Alarum', 1),
 ('MALCOLM', 50),
 ('meeting', 4),
 ('bleeding', 2),
 ('Sergeant', 5),
 ('What', 58),
 ('bloody', 14),
 ('seemeth', 1),
 ('his', 130),
 ('revolt', 2),
 ('newest', 3),
 ('state', 7),
 ('sergeant', 1),
 ('like', 34),
 ('good', 43),
 ('hardy', 1),
 ('soldier', 5),
 ('Gainst', 2),
 ('captivity', 1),
 ('brave', 2),
 ('Say', 5),
 ('king', 30),
 ('knowledge', 3),
 ('broil', 1),
 ('thou', 67),
 ('leave', 11),
 ('stood', 4),
 ('two', 10),
 ('do', 70),
 ('And', 151),
 ('choke', 1),
 ('merciles

### Making it into a single statement

In [10]:
counts = text.flatMap(lambda line: line.translate(str.maketrans('', '', string.punctuation)).split(' '))\
             .map(lambda word: (word, 1))\
             .reduceByKey(lambda a, b: a + b)    

### Run the search

In [11]:
word = "purpose"
for count in reduced.collect():
    # kv = str(count).translate(str.maketrans('', '', string.punctuation)).split(' ')
    kv = strip_punc(str(count))
    if word == kv[0]:
        print('Found \'{}\' occurs \'{}\' times'.format(kv[0], kv[1])) 
        search_word_in_line(word)
        break

Found 'purpose' occurs '5' times
715.     Shake my fell purpose, nor keep peace between
815.     We coursed him at the heels, and had a purpose
1258.     Infirm of purpose!
3141.     The flighty purpose never is o'ertook
3150.     This deed I'll do before this purpose cool.


## Manipulating airline performance data

### Creating an RDD with one row.

In [14]:
airport = sc.parallelize([Row(iata="00M",airport="Thigpen ",city="Bay Springs",\
                              state="MS",country="USA",lat=31.95376472,long=-89.23450472)])
print(airport.count())
print(airport.take(3))
print(airport.collect())

1
[Row(airport='Thigpen ', city='Bay Springs', country='USA', iata='00M', lat=31.95376472, long=-89.23450472, state='MS')]
[Row(airport='Thigpen ', city='Bay Springs', country='USA', iata='00M', lat=31.95376472, long=-89.23450472, state='MS')]


### Converting an RDD to a Dataframe (DF)

In [15]:

from pyspark.sql.types import Row
from datetime import datetime

airport_df = airport.toDF()
airport_df.show()
airport_df

+--------+-----------+-------+----+-----------+------------+-----+
| airport|       city|country|iata|        lat|        long|state|
+--------+-----------+-------+----+-----------+------------+-----+
|Thigpen |Bay Springs|    USA| 00M|31.95376472|-89.23450472|   MS|
+--------+-----------+-------+----+-----------+------------+-----+



DataFrame[airport: string, city: string, country: string, iata: string, lat: double, long: double, state: string]

### More complex dataset

In [16]:

complex = sc.parallelize([Row(col_float=3.1415,
                              col_string='da pi',
                              col_boolean=True,
                              col_integer=201,
                              col_list=[1,2,3,4])])
complex.collect()

[Row(col_boolean=True, col_float=3.1415, col_integer=201, col_list=[1, 2, 3, 4], col_string='da pi')]

### Converting to DF

In [17]:
complex_df = complex.toDF()
complex_df.show()

+-----------+---------+-----------+------------+----------+
|col_boolean|col_float|col_integer|    col_list|col_string|
+-----------+---------+-----------+------------+----------+
|       true|   3.1415|        201|[1, 2, 3, 4]|     da pi|
+-----------+---------+-----------+------------+----------+



### More complex data type

In [18]:
real_complex = sc.parallelize([
    Row(col_list=[1,2,3], col_dict = {"pi": 3.1415}, col_row = Row(number=3, fraction=1415), col_time=datetime(2019,7,22,5,51,0)),
    Row(col_list=[3,4,5], col_dict = {"sqrt2": 1.4142}, col_row = Row(number=1, fraction=4142), col_time=datetime(2019,7,22,5,54,0)),
    Row(col_list=[6,7,9,10], col_dict = {"sqrt3": 1.73205}, col_row = Row(number=1, fraction=73205), col_time=datetime(2019,7,22,5,55,0))
])
real_complex.collect() # A little bit hard to see

[Row(col_dict={'pi': 3.1415}, col_list=[1, 2, 3], col_row=Row(fraction=1415, number=3), col_time=datetime.datetime(2019, 7, 22, 5, 51)),
 Row(col_dict={'sqrt2': 1.4142}, col_list=[3, 4, 5], col_row=Row(fraction=4142, number=1), col_time=datetime.datetime(2019, 7, 22, 5, 54)),
 Row(col_dict={'sqrt3': 1.73205}, col_list=[6, 7, 9, 10], col_row=Row(fraction=73205, number=1), col_time=datetime.datetime(2019, 7, 22, 5, 55))]

In [19]:
real_complex_df = real_complex.toDF()
real_complex_df.show();

+------------------+-------------+----------+-------------------+
|          col_dict|     col_list|   col_row|           col_time|
+------------------+-------------+----------+-------------------+
|    [pi -> 3.1415]|    [1, 2, 3]| [1415, 3]|2019-07-22 05:51:00|
| [sqrt2 -> 1.4142]|    [3, 4, 5]| [4142, 1]|2019-07-22 05:54:00|
|[sqrt3 -> 1.73205]|[6, 7, 9, 10]|[73205, 1]|2019-07-22 05:55:00|
+------------------+-------------+----------+-------------------+



**It is much easier to view the data structure now**

## Airline Performance data
Loading data from HDFS

In [20]:
data_by_year = '/user/student/airline/1987.csv'
airline_performance = spark.read.option("header", "true").csv(data_by_year)

In [21]:
airline_performance.show()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|1987|   10|        14|        3|    741|       730|    912|       849|           PS|     1451

### Loading airport table

In [23]:

plane_data_file = '/user/student/airline/plane-data.csv'
plane_data = spark.read.option("header", "true").csv(plane_data_file)
plane_data.show()

+-------+----+------------+----------+-----+------+-------------+-----------+----+
|tailnum|type|manufacturer|issue_date|model|status|aircraft_type|engine_type|year|
+-------+----+------------+----------+-----+------+-------------+-----------+----+
| N050AA|null|        null|      null| null|  null|         null|       null|null|
| N051AA|null|        null|      null| null|  null|         null|       null|null|
| N052AA|null|        null|      null| null|  null|         null|       null|null|
| N054AA|null|        null|      null| null|  null|         null|       null|null|
| N055AA|null|        null|      null| null|  null|         null|       null|null|
| N056AA|null|        null|      null| null|  null|         null|       null|null|
| N057AA|null|        null|      null| null|  null|         null|       null|null|
| N058AA|null|        null|      null| null|  null|         null|       null|null|
| N059AA|null|        null|      null| null|  null|         null|       null|null|
| N0

### Airports is a DF data type

In [24]:
plane_data

DataFrame[tailnum: string, type: string, manufacturer: string, issue_date: string, model: string, status: string, aircraft_type: string, engine_type: string, year: string]

In [25]:
plane_data.count()

5029

In [26]:
plane_data.collect()

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N056AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N057AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=Non

In [30]:
plane_data.take(5)

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)]

In [31]:
plane_data.first()

Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)

In [37]:
plane_data.head(5)

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None)]

In [38]:
# Accessing rows
plane_data.collect()[100]

Row(tailnum='N12114', type='Corporation', manufacturer='BOEING', issue_date='07/05/1995', model='757-224', status='Valid', aircraft_type='Fixed Wing Multi-Engine', engine_type='Turbo-Jet', year='1995')

In [39]:
plane_data.collect()[100]['model'] # use column name.

'757-224'

In [40]:
plane_data.collect()[101][3] # use column index

'03/29/1996'

In [42]:
plane_data_rdd = plane_data.rdd.map(lambda x: (x.tailnum, x.type, x.manufacturer, x.issue_date, x.model, x.status, x.aircraft_type))
plane_data_rdd.collect()

[('N050AA', None, None, None, None, None, None),
 ('N051AA', None, None, None, None, None, None),
 ('N052AA', None, None, None, None, None, None),
 ('N054AA', None, None, None, None, None, None),
 ('N055AA', None, None, None, None, None, None),
 ('N056AA', None, None, None, None, None, None),
 ('N057AA', None, None, None, None, None, None),
 ('N058AA', None, None, None, None, None, None),
 ('N059AA', None, None, None, None, None, None),
 ('N060AA', None, None, None, None, None, None),
 ('N061AA', None, None, None, None, None, None),
 ('N062AA', None, None, None, None, None, None),
 ('N063AA', None, None, None, None, None, None),
 ('N064AA', None, None, None, None, None, None),
 ('N065AA', None, None, None, None, None, None),
 ('N066AA', None, None, None, None, None, None),
 ('N067AA', None, None, None, None, None, None),
 ('N068AA', None, None, None, None, None, None),
 ('N069AA', None, None, None, None, None, None),
 ('N070AA', None, None, None, None, None, None),
 ('N071AA', None, No

In [44]:
# More selective
plane_data_rdd = plane_data.rdd.map(lambda x: (x.tailnum, x.manufacturer))
plane_data_rdd.collect()

[('N050AA', None),
 ('N051AA', None),
 ('N052AA', None),
 ('N054AA', None),
 ('N055AA', None),
 ('N056AA', None),
 ('N057AA', None),
 ('N058AA', None),
 ('N059AA', None),
 ('N060AA', None),
 ('N061AA', None),
 ('N062AA', None),
 ('N063AA', None),
 ('N064AA', None),
 ('N065AA', None),
 ('N066AA', None),
 ('N067AA', None),
 ('N068AA', None),
 ('N069AA', None),
 ('N070AA', None),
 ('N071AA', None),
 ('N072AA', None),
 ('N073AA', None),
 ('N074AA', None),
 ('N075AA', None),
 ('N076AA', None),
 ('N077AA', None),
 ('N078AA', None),
 ('N079AA', None),
 ('N080AA', None),
 ('N081AA', None),
 ('N082AA', None),
 ('N083AA', None),
 ('N084AA', None),
 ('N10156', 'EMBRAER'),
 ('N102UW', 'AIRBUS INDUSTRIE'),
 ('N10323', 'BOEING'),
 ('N103US', 'AIRBUS INDUSTRIE'),
 ('N104UA', 'BOEING'),
 ('N104UW', 'AIRBUS INDUSTRIE'),
 ('N10575', 'EMBRAER'),
 ('N105UA', 'BOEING'),
 ('N105UW', 'AIRBUS INDUSTRIE'),
 ('N106US', 'AIRBUS INDUSTRIE'),
 ('N107UA', 'BOEING'),
 ('N107US', 'AIRBUS INDUSTRIE'),
 ('N108UW', 'AIR

In [45]:
# default with col names.
plane_data_rdd = plane_data.rdd
plane_data_rdd.collect()

[Row(tailnum='N050AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N051AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N052AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N054AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N055AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N056AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=None, year=None),
 Row(tailnum='N057AA', type=None, manufacturer=None, issue_date=None, model=None, status=None, aircraft_type=None, engine_type=Non

In [52]:
plane_data.describe(['year']).show()

+-------+------------------+
|summary|              year|
+-------+------------------+
|  count|              4480|
|   mean| 1995.523414071511|
| stddev|53.062381928953315|
|    min|              0000|
|    max|              None|
+-------+------------------+

