In [1]:
import re

#Candace Edwards Save-Test-Push

# Assignment 1: Data Analysis with Spark.


For this assignment, you will need to make sure you're running from the PySpark Docker environment I introduced in class. You can start the PySpark Docker environment with the following command:

```
docker run --rm -p 4040:4040 -p 8888:8888 -v $(pwd):/home/jovyan/work jupyter/all-spark-notebook
```

Make sure you run the command from the directory containing this Jupyter notebook and your data folder.

> ⚠️ For some reason, my Jupyter Notebook didn't always sync properly when I was pushing to github. As such, please push often and make sure your incremental changes appear on GitHub.


### Part 1

The first part will use Spark to analyze the following books, which I have downloaded for you to use from Project Gutenberg. The files are saved to the data folder.

| File name | Book Title|
|:---------:|:----------|
|43.txt | The Strange Case of Dr. Jekyll and Mr. Hyde by Robert Louis Stevenson|
|84.txt | Frankenstein; Or, The Modern Prometheus by Mary Wollstonecraft Shelley |
|398.txt  | The First Book of Adam and Eve by Rutherford Hayes Platt|
|3296.txt | The Confessions of St. Augustine by Bishop of Hippo Saint Augustine|

The objective is to explore whether we can detect similarity between books within the same topic using word-based similarity. 

The task of identifying similar texts in Natural Language Processing is crucial. A naive method for determining whether two documents are similar is to treat them as collections of words (bag of words) and use the number of words they share as a proxy for their similarity. It makes sense that two books with religion as the topic (e.g., `398.txt` and `3296.txt`) would have more words in common than a book that discusses religion and a book that discusses science fiction (e.g. books `84.txt` and `398.txt`). 
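As a minimal sketch of the bag-of-words idea in plain Python (not Spark), the snippet below counts the distinct words that two short texts share. The two sample sentences and the helper name `shared_words` are made up for illustration and are not part of the assignment.

```python
import re

def shared_words(text_a, text_b):
    """Return the set of distinct lower-cased words found in both texts."""
    words_a = set(re.findall(r"[a-z]+", text_a.lower()))
    words_b = set(re.findall(r"[a-z]+", text_b.lower()))
    return words_a & words_b

# Hypothetical sample texts, not taken from the books.
doc_1 = "The monster fled across the ice"
doc_2 = "The creature fled into the night"

common = shared_words(doc_1, doc_2)
print(sorted(common), len(common))
```

The size of the intersection is the naive similarity score discussed above; the rest of Part 1 computes the same quantity with RDD operations.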

As mentioned above, we will be using Spark to analyze the data. Although Spark is not needed for such a small example, the platform would be ideal for analyzing very large collections of documents, like those often analyzed by large corporations.

This part of the assignment will rely exclusively on RDDs.

### Q1. 
Start by importing Spark and making sure your environment is set up properly for the assignment.

Import the Spark context necessary to load a document as an RDD; ignore any error messages.

In [2]:
from pyspark import SparkContext
#SparkContext?
sc=SparkContext()
#sc.stop()


### Q2 

Read in the file `43.txt` as a Spark RDD and save it to a variable called `book_43`
 * make sure `book_43` is of type MapPartitionsRDD, i.e.,
   * str(type(book_43)) == "<class 'pyspark.rdd.RDD'>" should return True 

In [3]:
book_43 = sc.textFile('data/43.txt')
str(type(book_43)) == "<class 'pyspark.rdd.RDD'>"

True

### Q3

How many lines does `book_43` contain?
* You can only use operations or actions on RDDs to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed

In [4]:
book_43_count = book_43.count()
print(f"book_43 contains {book_43_count} lines")

book_43 contains 2935 lines


### Q4 

Prior to analyzing the words contained in this book, we need to first remove the occurrences of non-alphabetical characters and numbers from the text. You can use the following function, which takes a line as input, removes digits and non-word characters, and splits it into a collection of words. 

```python
def clean_split_line(line):
    a = re.sub(r'\d+', '', line)
    b = re.sub(r'[\W]+', ' ', a)
    return b.upper().split()
```

Use the function above on the variable (test_line) to see what it returns.
```python
test_line = "This is an example of that contains 234 and a dash-containing number"
```

In [5]:
def clean_split_line(line):
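    a = re.sub(r'\d+', '', line)   # remove digits (raw string avoids an invalid-escape warning)
    b = re.sub(r'[\W]+', ' ', a)   # replace runs of non-word characters with a space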
    return b.lower().split()

test_line = "This is an example of that contains 234 and a dash-containing number"


result = clean_split_line(test_line)
print(result)

['this', 'is', 'an', 'example', 'of', 'that', 'contains', 'and', 'a', 'dash', 'containing', 'number']


### Q5

How many words does `book_43` contain? To answer this question, you may find it useful to apply the function in a Spark-like manner. 
* You can only use operations or actions on RDDs to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed


In [6]:
RDD = book_43.flatMap(clean_split_line)
print(f"book_43 contains {RDD.count()} words")

book_43 contains 29116 words


### Q6

How many of the words in book_43 are unique? Given that words can appear in lower, upper or mixed case (ex. The, THE, the), make sure you convert the words into lower case before counting them.


In [7]:
distinct_words = RDD.distinct().count()
print(f"book_43 contains {distinct_words} UNIQUE words")


book_43 contains 4296 UNIQUE words


### Q7

* Generate an `RDD` that contains the frequency of each word in `book_43`. Call the variable `book_43_counts`. Each item in the `RDD` should be a tuple with the word as the first element of the tuple and the count as the second item of the tuple. The collection should look like the following:

[('project', 88), ("the", 1807), ... ]

* Such a collection may contain a large number of words and it would be imprudent to transfer all the words onto the same machine to show them. Instead, to explore the content of such a collection, display only the first element in your list. 

* Given the random nature of this operation, the first element displayed may be different. The first entry for me was:
```
[('project', 88)]
```

* You can only use operations or actions to answer the question. 
* Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
* Code that uses functions such as `some_func(...)` is not allowed
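For intuition only, here is a plain-Python analogue of the "map each word to `(word, 1)`, then sum per key" pattern that this question asks for. It is a sketch of the idea, not the RDD solution; the helper name `count_words` and the sample list are hypothetical.

```python
from functools import reduce

def count_words(words):
    """Map each word to (word, 1), then sum the 1s per word (reduce by key)."""
    pairs = map(lambda w: (w, 1), words)

    def merge(acc, pair):
        word, n = pair
        acc[word] = acc.get(word, 0) + n
        return acc

    return reduce(merge, pairs, {})

# Hypothetical token list standing in for the flattened RDD of words.
sample = ["the", "project", "the", "hyde", "the"]
counts = sorted(count_words(sample).items())
print(counts)
```

In Spark, the per-key summation happens across partitions, which is why the order of the resulting tuples is not guaranteed unless the RDD is sorted.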


In [8]:
rdd_mapped = RDD.map(lambda x:(x,1))
rdd_grouped = rdd_mapped.groupByKey()
book_43_counts = rdd_grouped.mapValues(sum).sortByKey(False)  # (word, count) pairs, sorted by word in descending order
book_43_counts.take(10)

[('zip', 1),
 ('youth', 2),
 ('yourself', 6),
 ('yours', 2),
 ('your', 64),
 ('younger', 3),
 ('young', 5),
 ('you', 312),
 ('yet', 32),
 ('yesterday', 1)]

### Q8

Sort `book_43_counts` and print the 20 most frequent words in book_43. 
  * Hint: the function `sortByKey` sorts a collection of tuples on the first element of each tuple. You can easily swap the order of the items in each tuple and use `sortByKey` to sort on the second item of each element in `book_43_counts`
  * You can only use operations or actions to answer the question. 
  * Code that employs methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed

In [9]:
sorted_rdd_frequency = rdd_grouped.mapValues(sum).map(lambda x:(x[1],x[0])).sortByKey(False)
sorted_rdd_frequency.take(20)

[(1807, 'the'),
 (1068, 'of'),
 (1043, 'and'),
 (726, 'to'),
 (686, 'a'),
 (646, 'i'),
 (485, 'in'),
 (471, 'was'),
 (392, 'that'),
 (384, 'he'),
 (378, 'it'),
 (312, 'you'),
 (308, 'my'),
 (301, 'with'),
 (285, 'his'),
 (244, 'had'),
 (203, 'as'),
 (202, 'for'),
 (195, 'this'),
 (193, 'but')]

### Q9

You must have noted that the most frequent words in `book_43_counts` include stop words such as `of`, `the`, `and`, etc.

Comparing documents based on whether or not they contain stop words is uninformative, since stop words are common to all documents. As such, it's common to remove them. The library `sklearn.feature_extraction` provides access to a collection of English stop words, which can be loaded using the following snippet:

```
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS
ENGLISH_STOP_WORDS
```

* Explore ENGLISH_STOP_WORDS (it's a frozen set data structure, i.e., a set that you cannot modify) by printing any 10 words from it. 
 * Hint: convert the frozen set to something you can subscript


In [10]:
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS  # public import path
stop = list(ENGLISH_STOP_WORDS)  # a list can be sliced; a frozenset cannot
print(stop[:10])

['seemed', 'along', 'please', 'either', 'after', 'below', 'must', 'whom', 'onto', 'ltd']


### Q10

Filter out the words in `book_43_counts` by removing those that appear in the ENGLISH_STOP_WORDS.
Save the results to a new variable called `book_43_counts_filtered`
  * You can only use operations or actions on RDDs to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed


In [11]:
book_43_counts_filtered=book_43_counts.filter(lambda x:x[0] not in stop)
book_43_counts_f_sort = book_43_counts_filtered.map(lambda x:(x[1],x[0])).sortByKey(False)
book_43_counts_f_sort.take(10)

[(156, 's'),
 (131, 'utterson'),
 (130, 'said'),
 (128, 'mr'),
 (105, 'jekyll'),
 (105, 'hyde'),
 (98, 'gutenberg'),
 (88, 'project'),
 (85, 'man'),
 (72, 'lawyer')]
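A side note on the filter above: `stop` is a list, so every `not in stop` test scans the list, whereas the same test against a set or frozenset is a hash lookup. The plain-Python sketch below shows the same predicate against a frozenset; `stop_set` is a hypothetical four-word stand-in for `ENGLISH_STOP_WORDS`, and the counts are copied from the outputs above.

```python
# Hypothetical mini stop-word collection standing in for ENGLISH_STOP_WORDS.
stop_set = frozenset({"the", "of", "and", "a"})

# (word, count) pairs; the counts are copied from the outputs above.
word_counts = [("the", 1807), ("jekyll", 105), ("of", 1068), ("hyde", 105)]

# Same predicate as the RDD filter: keep pairs whose word is not a stop word.
filtered = [pair for pair in word_counts if pair[0] not in stop_set]
print(filtered)
```

The filtered result is the same either way; only the cost of each membership test differs.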

### Q11

* How many words remain in `book_43_counts_filtered` after removing the stop words?

In [12]:
book_43_counts_filtered.count()

4034

### Q12 

* Create a function called *process_RDD* that combines the relevant steps you proposed above to make it convenient to apply them to the remaining three books. Your function should accept an input text file path and:
 * Read in the file as a text RDD
 * Clean and split each line using `clean_split_line`
 * Filter out the stop words
 * Return a word count RDD where each item is a tuple of a word and its count.
 


In [13]:
def process_RDD(path):
    RDD = sc.textFile(path)
    RDD_word_count = (
        RDD.flatMap(clean_split_line)          # lines -> words
        .filter(lambda x: x not in stop)       # drop stop words
        .map(lambda x: (x, 1))                 # word -> (word, 1)
        .groupByKey()
        .mapValues(sum)                        # (word, count)
        .sortByKey(False)                      # descending by word
    )
    return RDD_word_count

#testRDD = process_RDD('data/43.txt')
#testRDD.count()
#testRDD.take(5)
    


### Q13 

Apply the function `process_RDD` to `book_84`, `book_398` and `book_3296` and save the results to variables `book_84_counts_filtered`, `book_398_counts_filtered` and `book_3296_counts_filtered` respectively. 

How many distinct words does each book contain after filtering the stop words?


In [14]:
book_84_counts_filtered = process_RDD('data/84.txt')
book_398_counts_filtered = process_RDD('data/398.txt')
book_3296_counts_filtered = process_RDD('data/3296.txt')

print(f"book 84 contains {book_84_counts_filtered.distinct().count()} unique words")
print(f"book 398 contains {book_398_counts_filtered.distinct().count()} unique words")
print(f"book 3296 contains {book_3296_counts_filtered.distinct().count()} unique words")

book 84 contains 7016 unique words
book 398 contains 2421 unique words
book 3296 contains 7293 unique words


In [15]:
#book_84_counts_filtered.take(1)

### Q14 

The similarity between two texts can be assessed using the number of words the two texts share. For example, two books that discuss Greek history probably have more words in common with each other than either has with a book on computing. Let's hypothesize that books with similar themes will have more words in common than books with different themes. Assuming this hypothesis is correct, book_398 and book_3296, two books that both deal with religion, will have more words in common than, for example, book_84 and book_398.

Test this hypothesis by writing code that compares and prints the number of words shared between `book_398` and `book_3296` and then between `book_84` and `book_398`.


In [16]:
#FUNCTION TEST CODE#
#RDD_1_test =sc.parallelize([('Candace',1000000),('Lee',1000000),('Lydia',2000000),('Lydia',3000000)])
#RDD_2_test =sc.parallelize([('Candace',1000000),('Jane',1000000),('Lydia',1000000),('Lee',1000000)])

#RDD_3= sc.parallelize(['Racks getting larger just look at the alkaline wrist cause I got that water test test'])
#RDD_4= sc.parallelize(['Racks getting thicker just look at the acidic wrist cause I got that fire test test'])


#print(f'rdd3 and rdd4 have {compareRDD(RDD_3,RDD_4)} words in common')

##pseudo
## create RDDs with distinct values
## get intersection of RDDs

#new_rdd_3 = RDD_3.flatMap(clean_split_line).distinct().filter(lambda x: x not in stop)
#new_rdd_4 = RDD_4.flatMap(clean_split_line).distinct().filter(lambda x: x not in stop)
#rdd_comparison = new_rdd_3.intersection(new_rdd_4)
#rdd_comparison.count() #expected 8, observed 8

In [17]:
def compareRDD(RDD_1,RDD_2):
    new_rdd_1 = RDD_1.flatMap(clean_split_line).distinct().filter(lambda x: x not in stop)
    new_rdd_2 = RDD_2.flatMap(clean_split_line).distinct().filter(lambda x: x not in stop)
    rdd_comparison = new_rdd_1.intersection(new_rdd_2)
    return rdd_comparison


In [18]:
book_398 = sc.textFile('data/398.txt')
book_3296 = sc.textFile('data/3296.txt')
book_84 = sc.textFile('data/84.txt')

compare_1=compareRDD(book_398,book_3296)
compare_2=compareRDD(book_398,book_84)

print(f'book_398 and book_3296 have {compare_1.count()} words in common')
print(f'book_398 and book_84 have {compare_2.count()} words in common')


book_398 and book_3296 have 1790 words in common
book_398 and book_84 have 1691 words in common


### Q15

* Based on the above, do you think counting the number of shared words is a good distance metric for evaluating topic similarity? Justify your answer.
* Hint: What do *book_84* and *book_3296* have in common? 

In [19]:
compare_3=compareRDD(book_84,book_3296)
compare_3.count()

3608

#### Conclusion:
Counting shared words is not a reliable indicator of topic similarity. The religion-themed books (book_398 and book_3296) do share more words (1790) than the books with different themes (book_398 and book_84, 1691), but the difference is only about 6%. Moreover, book_84 and book_3296, which have different themes, share 3608 words, more than either of the other pairs. Both are long books with large vocabularies (7016 and 7293 distinct words after filtering, against 2421 for book_398), so the raw count seems to reflect vocabulary size more than topic.
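One way to check whether vocabulary size explains the counts is to divide the shared-word count by the size of the union of the two vocabularies (the Jaccard index). This normalization is not part of the assignment; the sketch below is only an illustration. It reuses the counts printed in Q13, Q14 and Q15 and assumes the Q13 counts describe the same vocabularies that `compareRDD` intersects.

```python
def jaccard(shared, size_a, size_b):
    """Jaccard index |A and B| / |A or B|, from the shared and per-set sizes."""
    return shared / (size_a + size_b - shared)

# Distinct filtered-word counts (Q13) and shared-word counts (Q14, Q15).
vocab = {"84": 7016, "398": 2421, "3296": 7293}
shared = {("398", "3296"): 1790, ("398", "84"): 1691, ("84", "3296"): 3608}

for (a, b), n in shared.items():
    print(f"book_{a} vs book_{b}: {jaccard(n, vocab[a], vocab[b]):.3f}")
```

The three ratios come out at roughly 0.23, 0.22 and 0.34, so even after normalizing, the pair with different themes (book_84 and book_3296) scores highest.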


## Part II 

Another approach to estimating similarity consists of computing the Euclidean distance across a set of words. For example, suppose we have 3 documents A, B and C with the following counts for the words `evolution`, `DNA`, `biology` and `finance`. 

```python 
A = [4, 9, 6, 8]
B = [3, 7, 7, 10]
C = [15, 10, 1, 1]
```
Although all documents contain all four words, the number of times these words appear in each document may be indicative of its topic. For example, documents `A` and `B` are more likely to be business related since they contain the word `finance` more frequently (8 and 10 times respectively). Document `C` may be a technical document since it focuses more on technical words (`evolution` and `DNA`) and less on the word `finance`.

The Euclidean distance can be computed using the `scipy` snippet below:

```python
from scipy.spatial.distance import euclidean 
print(f"The Euclidean distance between A and B is: {euclidean(A, B)}")

print(f"The Euclidean distance between A and C is: {euclidean(A, C)}")

print(f"The Euclidean distance between B and C is: {euclidean(B, C)}")
```
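The same three distances can be checked by hand. The sketch below uses `math.dist` from the standard library (Python 3.8+) instead of `scipy`, with the arithmetic written out in the comments.

```python
import math

A = [4, 9, 6, 8]
B = [3, 7, 7, 10]
C = [15, 10, 1, 1]

# math.dist computes the Euclidean distance between two equal-length points.
print(math.dist(A, B))  # sqrt(1 + 4 + 1 + 4)     = sqrt(10)
print(math.dist(A, C))  # sqrt(121 + 1 + 25 + 49) = sqrt(196) = 14.0
print(math.dist(B, C))  # sqrt(144 + 9 + 36 + 81) = sqrt(270)
```

`A` and `B` are much closer to each other than either is to `C`, which matches the intuition given above.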


### Q16

In order to calculate the Euclidean distance, we must first identify the set of words on which we will compare the documents. First, we will use the words that appear in all four documents. We will store the data in a matrix called `counts_matrix`, where the books are represented as rows and the words as columns.

Start by finding the words that are common to all four documents after stop-word filtering and store the counts for each word in a column of `counts_matrix`. 

Based on the previous example, the following code will create an empty matrix with three rows (documents A, B, and C) and four columns (words evolution, DNA, biology, and finance).

```python
import numpy as np
counts_matrix = np.zeros([3,4])
```

After generating the counts, you can fill the counts for a document, say `A`, using the following code:

```python
counts_matrix[0, :] = [4, 9, 6, 8] 
```
* Other than for building `counts_matrix` you should exclusively use operations or actions on the `RDD` to answer this question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed


In [20]:
import numpy as np


#book_43_counts_filtered
#book_84_counts_filtered = process_RDD('data/84.txt')
#book_398_counts_filtered = process_RDD('data/398.txt')
#book_3296_counts_filtered = process_RDD('data/3296.txt')

#common words among documents
rdd_join_col = book_43_counts_filtered.join(book_84_counts_filtered).join(book_398_counts_filtered).join(book_3296_counts_filtered).map(lambda x:x[0]) #join and map create list of column words
#rdd_join_col.take(10)
words = rdd_join_col.collect()

m = 4 #books/rdds/rows
n = rdd_join_col.count() #columns count

rdd_b0 = book_43_counts_filtered.filter(lambda x: x[0] in words).map(lambda x:x[1])
rdd_b1 = book_84_counts_filtered.filter(lambda x: x[0] in words).map(lambda x:x[1]) # filter by key in list of col_words create value list
rdd_b2 = book_398_counts_filtered.filter(lambda x: x[0] in words).map(lambda x:x[1]) # filter by key in list of col_words create value list
rdd_b3 = book_3296_counts_filtered.filter(lambda x: x[0] in words).map(lambda x:x[1]) # filter by key in list of col_words create value list

counts_matrix = np.zeros([m,n])


counts_matrix[0,:] = rdd_b0.collect()
counts_matrix[1,:] = rdd_b1.collect()
counts_matrix[2,:] = rdd_b2.collect()
counts_matrix[3,:] = rdd_b3.collect()

#print(f'documents have {rdd_join_col.count()} words in common')
print(counts_matrix[:][:10])
print(f'count_matrix has {m} rows and {n} columns')


[[ 1.  3.  5. ...  2.  3.  1.]
 [ 1.  9. 30. ...  4.  9.  1.]
 [ 1.  2.  1. ... 16.  5.  1.]
 [ 1.  4. 15. ...  3. 36. 16.]]
count_matrix has 4 rows and 1162 columns
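The cell above appears to rely on every `*_counts_filtered` RDD being sorted by word in the same order, so that the collected counts line up column by column. The plain-Python sketch below makes that alignment explicit by looking up each shared word in every book; the helper name `build_counts_matrix` and the miniature dictionaries are hypothetical.

```python
def build_counts_matrix(books):
    """books: list of {word: count} dicts. Returns (shared words, rows)."""
    common = set(books[0])
    for counts in books[1:]:
        common &= set(counts)
    words = sorted(common)  # one fixed column order for every row
    rows = [[counts[w] for w in words] for counts in books]
    return words, rows

# Hypothetical miniature word counts, not taken from the books.
book_a = {"god": 3, "man": 5, "ice": 2}
book_b = {"man": 1, "god": 9, "sea": 4}

words, rows = build_counts_matrix([book_a, book_b])
print(words, rows)
```

Because every row is built from the same `words` list, column `j` always refers to the same word, whatever order the inputs arrive in.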


In [21]:
### TESTCODE ##SANDBOX

#A = sc.parallelize([4, 9, 6, 8])
#B = sc.parallelize([3, 7, 7, 10])
#C = sc.parallelize([15, 10, 1, 1])

#import numpy as np
#counts_matrix = np.zeros([3,4])
#counts_matrix[0,:] = A.collect()
#counts_matrix[1,:] = B.collect()
#counts_matrix[2,:] = C.collect()
#print(counts_matrix[0][0])

#RDD_t1=sc.parallelize([('word',10),('bell',11),('pillow',12),('lamp',13)])
#RDD_t2=sc.parallelize([('word',20),('bell',21),('pillow',22),('lamp',23),('misc',15)])
#RDD_t3=sc.parallelize([('word',30),('bell',31),('pillow',32),('lamp',33),('test',15)])
#RDD_t4=sc.parallelize([('word',40),('bell',41),('lamp',43)])

#rdd_join_two = RDD_t1.join(RDD_t2)
#rdd_join_col = rdd_join_two.join(RDD_t3).join(RDD_t4).map(lambda x:x[0]) #join and map create list of column words
#n = rdd_join_col.count() #columns count
#m = 4 #books/rdds
#words = rdd_join_col.collect()
#rdd_join_count_t1 = RDD_t1.filter(lambda x: x[0] in words).map(lambda x:x[1]) # filter by key in list of col_words create value list
#rdd_join_count_t2 = RDD_t2.filter(lambda x: x[0] in words).map(lambda x:x[1]) # filter by key in list of col_words create value list
#rdd_join_count_t3 = RDD_t3.filter(lambda x: x[0] in words).map(lambda x:x[1]) # filter by key in list of col_words create value list
#rdd_join_count_t4 = RDD_t4.filter(lambda x: x[0] in words).map(lambda x:x[1]) # filter by key in list of col_words create value list

#pseudo:
#on given RDD, key is in list of common words,map to value

#rdd_join_count_t1.take(10)



##counts_matrix = np.zeros([m,n])

#counts_matrix[0,:] = rdd_join_count_t1.collect()
#counts_matrix[1,:] = rdd_join_count_t2.collect()
#counts_matrix[2,:] = rdd_join_count_t3.collect()
#counts_matrix[3,:] = rdd_join_count_t4.collect()

#counts_matrix[1,:] = B.collect()
#counts_matrix[2,:] = C.collect()



#print(counts_matrix)




#discarded code#
#rdd_join_count = rdd_join_two.join(RDD_t3).join(RDD_t4).map(lambda x:x[1]) #join and map create list of counts-NO






### Q17

Compute the Euclidean distance between `book_398` and `book_3296`, which both deal with religion, and between `book_84` and `book_398`. 
What do you conclude about using the Euclidean distance for evaluating topic relatedness across documents?


In [22]:
from scipy.spatial.distance import euclidean 
print(f"The Euclidean distance between book_398 and book_3296 is: {euclidean(counts_matrix[2,:], counts_matrix[3,:])}")
print(f"The Euclidean distance between book_398 and book_84 is: {euclidean(counts_matrix[2,:], counts_matrix[1,:])}")
#print(f"The Euclidean distance between B and C is: {euclidean(B, C)}")




The Euclidean distance between book_398 and book_3296 is: 1156.6628722320086
The Euclidean distance between book_398 and book_84 is: 751.6688100486809


#### Conclusion:

With raw word counts, Euclidean distance is a poor measure of topic similarity between documents. In the results above, the similarly themed book_398 and book_3296 are farther apart (about 1156.7) than book_84 and book_398 (about 751.7), which is the opposite of what the hypothesis predicts.
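One plausible contributor, stated here as an assumption rather than something the results above prove, is that raw counts grow with document length. The sketch below uses hypothetical counts to show that two documents with identical word proportions can still be far apart in Euclidean terms until the counts are scaled to relative frequencies.

```python
import math

def relative_frequencies(counts):
    """Scale raw counts so that they sum to 1."""
    total = sum(counts)
    return [c / total for c in counts]

# Hypothetical counts: `long_doc` uses the same words in the same
# proportions as `short_doc`, but is ten times longer.
short_doc = [2, 1, 1]
long_doc = [20, 10, 10]

raw = math.dist(short_doc, long_doc)
scaled = math.dist(relative_frequencies(short_doc), relative_frequencies(long_doc))
print(raw, scaled)
```

Here the raw distance is large while the distance between the scaled vectors is zero, so length alone can dominate the raw measure.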


### Q18

Bonus question (5 points): Can you think of a few things we could do to improve the similarity estimate between documents that pertain to the same topic? Justify your answer without giving code.

#### Write your answer here

--

## Part III

As part of this section, we will create some basic analytics for a dataset containing flight arrivals and departures within the United States over the course of a month. 
Pandas can be used to manage this dataset (1M records), but a distributed computing framework like Spark will be beneficial when extending the analysis to a longer timeframe.

Here, you should exclusively use Spark `DataFrame`s.

This dataset can be analyzed to improve trip scheduling. For example:
 * Avoid airline carriers that are most often associated with delays.
 * Avoid departure days where delays are most frequent.
 * Avoid airports that are associated with delays or long taxiing times.
 * etc.
 


The information about the fields contained in the data file can be found [here](https://dataverse.harvard.edu/dataset.xhtml;jsessionid=0414e25969eccd0e88ae4d64fa0b?persistentId=doi%3A10.7910%2FDVN%2FHG7NV7&version=&q=&fileTypeGroupFacet=&fileTag=%221.+Documentation%22&fileSortField=date&fileSortOrder=desc)


### Q19

Load the file `flight_info.csv` into a Spark `DataFrame` called `flight_info`.

  * Note that you will need to create a sparkSession prior to loading the data
  
* How many instances (entries) does the file contain?


In [23]:
#from pyspark import SparkContext
#sc = SparkContext()
#sc.stop()

from pyspark.sql import SparkSession
session = SparkSession(sc)


In [24]:
#flight_info = session.read.csv("data/flight_info.csv", header=True)
flight_info= session.read.options(inferSchema = True).csv("data/flight_info.csv", header=True)
print(type(flight_info))
print(f' The file contains {flight_info.count()} instances')
#flight_info.head(1)

<class 'pyspark.sql.dataframe.DataFrame'>
 The file contains 450017 instances


### Q20

Use `pySpark-SQL` or `pandas`-like syntax to list the airlines represented in this dataset.
The airline information is stored in a field called UniqueCarrier, which holds the unique carrier code (e.g., AA = American Airlines). 


In [25]:
flight_info.select('UniqueCarrier').distinct().show()

+-------------+
|UniqueCarrier|
+-------------+
|           NK|
|           AA|
|           EV|
|           B6|
|           DL|
|           F9|
|           HA|
|           UA|
|           OO|
|           WN|
|           AS|
|           VX|
+-------------+



### Q21

The data file contains various other fields, two of which are useful for answering the next question.

* CRSDepTime: Represents the scheduled departure time
* DepTime: Represents the actual departure time

Compute the number of delayed flights for each carrier code represented in this dataset. Sort the data by decreasing number of delays.
  * A delay is observed when `DepTime` > `CRSDepTime`


In [26]:
delay_count = flight_info.filter(flight_info['DepTime'] > flight_info['CRSDepTime']).groupBy(flight_info['UniqueCarrier']).count()
#print(type(delay_count))
#delay_count.show()

delay_count.sort('count', ascending=False).show()

+-------------+-----+
|UniqueCarrier|count|
+-------------+-----+
|           WN|47472|
|           DL|24334|
|           AA|23461|
|           UA|17701|
|           OO|16751|
|           EV|11596|
|           B6| 9396|
|           AS| 4488|
|           NK| 4151|
|           F9| 2988|
|           VX| 2648|
|           HA| 1939|
+-------------+-----+



### Q22

Use the file `airlines.csv` to find the complete name of each airline. Here, you are required to load the file as a PySpark DataFrame; call it `airlines_info`, and repeat the query above while including the `flights.csv` file in your query (requires doing a `join`) so that you can also display the full name of the carrier (second column). 

The result will look (approximately) like:

```
[Row(UniqueCarrier='WN', first(_c1)='Southwest Airlines', count=SOME_count),
 Row(UniqueCarrier='DL', first(_c1)='Delta Air Lines', count=SOME_count),
 Row(UniqueCarrier='AA', first(_c1)='American Airlines', count=SOME_count),
 ...
 ]
```

The carrier code in the `airlines.csv` file is provided in the 4th (1-based) column

Note that the file `airlines.csv` does not have a column header. Hence, you need to print one line of your dataset to see what names Spark gave to the columns. Use the names provided by Spark in your query.

In [27]:
airline_info= session.read.options(inferSchema = True).csv("data/airlines.csv", header=False)
airline_info.head()


Row(_c0=1, _c1='Private flight', _c2='\\N', _c3='-', _c4='N/A', _c5=None, _c6=None, _c7='Y')

In [28]:
joined_data = flight_info.join(airline_info, flight_info['UniqueCarrier'] == airline_info['_c3'])  # join on the carrier code of the DataFrame being joined
joined_data.head()

Row(_c0=0, DayOfWeek=2, UniqueCarrier='AA', FlightNum=494, Origin='CLT', Dest='PHX', CRSDepTime=1619, DepTime=1616.0, TaxiOut=17.0, WheelsOff=1633.0, WheelsOn=1837.0, TaxiIn=5.0, CRSArrTime=1856, ArrTime=1842.0, Cancelled=0.0, CancellationCode=None, Distance=1773.0, CarrierDelay=None, WeatherDelay=None, NASDelay=None, SecurityDelay=None, LateAircraftDelay=None, _c0=24, _c1='American Airlines', _c2='\\N', _c3='AA', _c4='AAL', _c5='AMERICAN', _c6='United States', _c7='Y')

In [29]:
#delay_count = flight_info.filter(flight_info['DepTime'] > flight_info['CRSDepTime']).groupBy(flight_info['UniqueCarrier']).count()
group_cols=['UniqueCarrier','_c1']
new_delay_count = joined_data.filter(joined_data['DepTime'] > joined_data['CRSDepTime']).groupBy(group_cols).count()
new_delay_count.head()




Row(UniqueCarrier='B6', _c1='JetBlue Airways', count=9396)

In [30]:
new_delay_count.sort('count', ascending=False).show()

+-------------+--------------------+-----+
|UniqueCarrier|                 _c1|count|
+-------------+--------------------+-----+
|           WN|  Southwest Airlines|47472|
|           DL|     Delta Air Lines|24334|
|           AA|   American Airlines|23461|
|           UA|     United Airlines|17701|
|           OO|             SkyWest|16751|
|           EV|Atlantic Southeas...|11596|
|           B6|     JetBlue Airways| 9396|
|           AS|     Alaska Airlines| 4488|
|           NK|     Spirit Airlines| 4151|
|           F9|   Frontier Airlines| 2988|
|           VX|      Virgin America| 2648|
|           HA|   Hawaiian Airlines| 1939|
+-------------+--------------------+-----+



### Q23

Compute the number of delays per company per day. The day is encoded as an integer in the column `DayOfWeek` in `flight_info`. You can display the day as an integer or map it to the name of the day of the week.
Sort the data by airline code (UniqueCarrier) and by increasing values of DayOfWeek.


Your results should look like the following:

[Row(UniqueCarrier='WN', DayOfWeek='6', COUNT=5270), Row(UniqueCarrier='WN', DayOfWeek='7', COUNT=8681) ]




In [31]:
group_col = ['UniqueCarrier','DayOfWeek']
delay_by_day = joined_data.filter(joined_data['DepTime'] > joined_data['CRSDepTime']).groupBy(group_col).count()
delay_by_day.head()


Row(UniqueCarrier='F9', DayOfWeek=2, count=469)

In [32]:
#sort by airline code THEN (ascending) day of week
delay_by_day.sort(delay_by_day.UniqueCarrier.asc(),delay_by_day.DayOfWeek.asc()).show()

+-------------+---------+-----+
|UniqueCarrier|DayOfWeek|count|
+-------------+---------+-----+
|           AA|        1| 4639|
|           AA|        2| 3288|
|           AA|        3| 2570|
|           AA|        4| 3142|
|           AA|        5| 3108|
|           AA|        6| 2509|
|           AA|        7| 4205|
|           AS|        1|  786|
|           AS|        2|  580|
|           AS|        3|  573|
|           AS|        4|  656|
|           AS|        5|  554|
|           AS|        6|  470|
|           AS|        7|  869|
|           B6|        1| 1754|
|           B6|        2| 1628|
|           B6|        3| 1109|
|           B6|        4| 1101|
|           B6|        5| 1175|
|           B6|        6|  900|
+-------------+---------+-----+
only showing top 20 rows



### Q24  

Counting the number of delayed flights per airline is misleading, as airlines with more flights are more likely to have delays than companies with substantially fewer flights. 

Repeat the same query above but, for each carrier, normalize the counts of delays by the total number of flights for that carrier. 


In [33]:
%%timeit

total_flights = flight_info.groupBy('UniqueCarrier').count()
#total_flights.show()

#rename count column
total_flights_new = total_flights.withColumnRenamed('count','total_count')
#total_flights_new.show()

new_df= delay_count.join(total_flights_new, delay_count['UniqueCarrier']==total_flights_new['UniqueCarrier'])
new_df.withColumn('normalized_delay', new_df['count']/new_df['total_count']).show()

+-------------+-----+-------------+-----------+-------------------+
|UniqueCarrier|count|UniqueCarrier|total_count|   normalized_delay|
+-------------+-----+-------------+-----------+-------------------+
|           NK| 4151|           NK|      12570|  0.330230708035004|
|           AA|23461|           AA|      73132|0.32080347864136083|
|           EV|11596|           EV|      35037| 0.3309644090532865|
|           B6| 9396|           B6|      24602|0.38192016909194376|
|           DL|24334|           DL|      69813|0.34855972383366995|
|           F9| 2988|           F9|       7760| 0.3850515463917526|
|           HA| 1939|           HA|       6276|0.30895474824729124|
|           UA|17701|           UA|      42403| 0.4174468787585784|
|           OO|16751|           OO|      50146|0.33404458979779045|
|           WN|47472|           WN|     107785| 0.4404323421626386|
|           AS| 4488|           AS|      14711|0.30507783291414586|
|           VX| 2648|           VX|       5782| 

### Q25 

Time the query above. How long did it take to run?
  * Make sure you run the code a few times and compute the average run time.
  * The above should be easy to implement if you use the correct Jupyter Notebook `magic` function
  

### %%timeit Results
1.05 s ± 62.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Note that results may differ if the code is re-run.

### Q26 

Use one of the techniques covered in class to accelerate this query. Time your query to see by how much the run time was improved.

In [45]:
%%timeit

##Caching Memory Approach
# spark.catalog.dataFrame.cache()

delay_count_c = flight_info.filter(flight_info['DepTime'] > flight_info['CRSDepTime']).groupBy(flight_info['UniqueCarrier']).count().cache()
#print(type(delay_count))

total_flights_c = flight_info.groupBy('UniqueCarrier').count().cache()


#rename count column
total_flights_new_c = total_flights_c.withColumnRenamed('count','total_count').cache()

#total_flights_new.show()

new_df_c= delay_count_c.join(total_flights_new_c, delay_count_c['UniqueCarrier']==total_flights_new_c['UniqueCarrier'])  # join the cached delay counts, not the uncached delay_count
new_df_c.withColumn('normalized_delay', new_df_c['count']/new_df_c['total_count']).show()





+-------------+-----+-------------+-----------+-------------------+
|UniqueCarrier|count|UniqueCarrier|total_count|   normalized_delay|
+-------------+-----+-------------+-----------+-------------------+
|           UA|17701|           UA|      42403| 0.4174468787585784|
|           NK| 4151|           NK|      12570|  0.330230708035004|
|           AA|23461|           AA|      73132|0.32080347864136083|
|           EV|11596|           EV|      35037| 0.3309644090532865|
|           B6| 9396|           B6|      24602|0.38192016909194376|
|           DL|24334|           DL|      69813|0.34855972383366995|
|           OO|16751|           OO|      50146|0.33404458979779045|
|           F9| 2988|           F9|       7760| 0.3850515463917526|
|           HA| 1939|           HA|       6276|0.30895474824729124|
|           AS| 4488|           AS|      14711|0.30507783291414586|
|           VX| 2648|           VX|       5782| 0.4579730197163611|
|           WN|47472|           WN|     107785| 

#### %%timeit result post cache optimization:
731 ms ± 60.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

### Q27 

Is the departure delay (i.e., DepTime - CRSDepTime) predictive of the arrival delay (i.e., ArrTime - CRSArrTime)?
Use an approach of your choice (e.g., `sklearn`, which we covered in class, or `Spark`) to model the arrival delay as a linear function of the departure delay. 



In [35]:
from sklearn.linear_model import LinearRegression
import matplotlib.pyplot as plt
import numpy as np


##Approach
#1. Calculate departure_delay --> feature
#2. Calculate arrival_delay --> target
#3. Model Regression
#4. Plot Regression

flight_delays = flight_info.filter(flight_info['DepTime'] > flight_info['CRSDepTime'])
flight_delay_calc =flight_delays.withColumn('dep_delay_calc',flight_delays['DepTime']- flight_delays['CRSDepTime']).withColumn('arr_delay_calc',flight_delays['ArrTime']- flight_delays['CRSArrTime'])
flight_delay_calc.head()


#https://inria.github.io/scikit-learn-mooc/python_scripts/linear_regression_in_sklearn.html

Row(_c0=3, DayOfWeek=5, UniqueCarrier='AA', FlightNum=494, Origin='CLT', Dest='PHX', CRSDepTime=1619, DepTime=1656.0, TaxiOut=18.0, WheelsOff=1714.0, WheelsOn=1926.0, TaxiIn=3.0, CRSArrTime=1856, ArrTime=1929.0, Cancelled=0.0, CancellationCode=None, Distance=1773.0, CarrierDelay=33.0, WeatherDelay=0.0, NASDelay=0.0, SecurityDelay=0.0, LateAircraftDelay=0.0, dep_delay_calc=37.0, arr_delay_calc=73.0)
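The times in this dataset appear to be encoded as HHMM clock values (for example, 1619 for 16:19), so plain subtraction only gives minutes when both times fall in the same hour. In the row above, `arr_delay_calc` is 73.0, whereas 18:56 to 19:29 is 33 minutes, which also matches `CarrierDelay=33.0`. The sketch below converts HHMM values to minutes before subtracting; the helper names are hypothetical, and times that cross midnight are not handled.

```python
def hhmm_to_minutes(hhmm):
    """Convert a clock time encoded as HHMM (e.g. 1856) to minutes after midnight."""
    hhmm = int(hhmm)
    return (hhmm // 100) * 60 + hhmm % 100

def delay_minutes(scheduled, actual):
    """Difference actual - scheduled in minutes (same-day times assumed)."""
    return hhmm_to_minutes(actual) - hhmm_to_minutes(scheduled)

# Values from the row printed above.
dep_delay = delay_minutes(1619, 1656.0)  # CRSDepTime, DepTime
arr_delay = delay_minutes(1856, 1929.0)  # CRSArrTime, ArrTime
print(dep_delay, arr_delay)
```

Applying the same conversion to both columns before computing the delays would keep the regression inputs in consistent units.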

In [48]:
feature_name='dep_delay_calc' #independent var
target_name='arr_delay_calc'  #dependent var
#target = flight_delay_calc[target_name].collect()


#Dataframe to array
data_array=np.array(flight_delay_calc.select(feature_name).collect())
target_array =np.array( flight_delay_calc.select(target_name).collect())
print(type(data_array))
data_array = data_array.reshape(-1,1) #2D array; reshape returns a new array, so the result must be assigned



##FIX Attempts##
#print(type(data))
#print((data.count(), len(data.columns))) #shape
#data_arr = np.array(data.collect()).reshape(-1,1)
#print(type(data_arr))
#data_arr.shape()


linear_regression = LinearRegression()
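linear_regression.fit(data_array,target_array)  #<-- raises the ValueError shown in the output below, most likely because arr_delay_calc is null (NaN) for flights with no recorded ArrTime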

#ValueError: Expected 2D array, got scalar array instead:
#array=DataFrame[dep_delay_calc: double].
#Reshape your data either using array.reshape(-1, 1) if your data has a single feature or array.reshape(1, -1) if it contains a single sample.

#New Error
#ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.


#weighted_dep_delay = linear_regression.coef_[0]
#weighted_dep_delay

<class 'numpy.ndarray'>


ValueError: array must not contain infs or NaNs
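The error above suggests that some rows have a missing value, most likely a null `ArrTime`, so the target array contains NaNs. One way forward is to drop incomplete rows before fitting (in Spark, `DataFrame.dropna(subset=[...])` can do this before `collect()`). The plain-Python sketch below shows the idea on hypothetical data, with a hand-written least-squares fit standing in for `LinearRegression`; the helper name `fit_line` and the sample pairs are assumptions.

```python
def fit_line(xs, ys):
    """Ordinary least squares for y = slope * x + intercept."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x

# Hypothetical (dep_delay, arr_delay) pairs; None marks a missing arrival time.
pairs = [(10.0, 12.0), (20.0, None), (30.0, 32.0), (50.0, 52.0)]

# Drop incomplete pairs before fitting, which is the step the cell above lacks.
complete = [(x, y) for x, y in pairs if x is not None and y is not None]
slope, intercept = fit_line([x for x, _ in complete], [y for _, y in complete])
print(slope, intercept)
```

With the incomplete pair removed, the fit runs without error; the same filtering applied to `flight_delay_calc` should let `LinearRegression.fit` accept the arrays.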