In [1]:
##Taryn Takebayashi


In [1]:
import re

# Assignment 2: Analyzing large datasets with Spark.


For this assignment, make sure you are running from the PySpark Docker environment I introduced in class. You can start it using the following command:

```
docker run --rm -p 4040:4040 -p 8888:8888 -v $(pwd):/home/jovyan/work jupyter/all-spark-notebook
```

Make sure you run the command from the directory containing this jupyter notebook and your data folder.


<b>
# WARNING: For some reason, the ipynb document didn't always sync properly when I was pushing to GitHub. As such, please push often and make sure your incremental changes appear on GitHub.
</b>

### Part 1

The first part will use Spark to analyze the following books, which I have downloaded for you to use from Project Gutenberg. The files are saved to the data folder.

| File name | Book Title|
|:---------:|:----------|
|43.txt | The Strange Case of Dr. Jekyll and Mr. Hyde by Robert Louis Stevenson|
|84.txt | Frankenstein; Or, The Modern Prometheus by Mary Wollstonecraft Shelley |
|398.txt  | The First Book of Adam and Eve by Rutherford Hayes Platt|
|3296.txt | The Confessions of St. Augustine by Bishop of Hippo Saint Augustine|

The objective is to explore whether we can detect similarity between books within the same topic using word-based similarity. 

The task of identifying similar texts in Natural Language Processing is crucial. A naive method for determining whether two documents are similar is to treat them as collections of words (bag of words) and use the number of words they share as a proxy for their similarity. It makes sense that two books with religion as the topic (e.g.  `398.txt` and `3296.txt`) would have more words in common than a book that discusses religion and a book that discusses science fiction (e.g. books `84.txt` and `398.txt`). 
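As a minimal sketch of this idea, the snippet below treats short, made-up sentences as sets of lowercase words and counts the words they share. The sentences and the helper name `shared_word_count` are illustrative assumptions, not part of the assignment data.

```python
def shared_word_count(text_a, text_b):
    """Count the distinct words that appear in both texts (bag-of-words overlap)."""
    words_a = set(text_a.lower().split())
    words_b = set(text_b.lower().split())
    return len(words_a & words_b)


# Made-up sentences, used only for illustration.
doc_1 = "god created the garden and the river"
doc_2 = "the garden was planted by god"
doc_3 = "the monster fled across the ice"

print(shared_word_count(doc_1, doc_2))
print(shared_word_count(doc_1, doc_3))
```

The first pair shares more words than the second, which is the signal the naive method relies on.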

As mentioned above, we will be using Spark to analyze the data. Although Spark is not needed for such a small example, the platform would be ideal for analyzing very large collections of documents, like those often analyzed by large corporations.

This part of the assignment will rely exclusively on RDDs.

### Q1. 
Start by importing Spark and making sure your environment is set up properly for the assignment.

Import the spark context necessary to load a document as an RDD; ignore any error messages

In [89]:
### Write your code here
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

### Q2 

Read in the file `43.txt` as a spark RDD and save it to a variable called `book_43`
 * make sure `book_43` is of type MapPartitionsRDD, i.e.,
   * str(type(book_43)) == "<class 'pyspark.rdd.RDD'>" should return True 

In [90]:
### Write your code here
# The sparkContext.textFile() method is used to read a text file into RDD
book_43 = sc.textFile("data/43.txt", minPartitions = 96)
str(type(book_43)) == "<class 'pyspark.rdd.RDD'>"

True

### Q3

How many lines does `book_43` contain?
* You can only use operations or actions on RDDs to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed

In [127]:
### Write your code here
print("Book 43 contains " + str(book_43.count()) + " lines.")
lines = open("data/43.txt").readlines()
print(len(lines))



Book 43 contains 2935 lines.
2935


                                                                                

### Q4 

Prior to analyzing the words contained in this book, we need to first remove the occurrences of non-alphabetical characters and numbers from the text. You can use the following function, which takes a line as input, removes digits and non-word characters, and splits it into a collection of words. 

```python
def clean_split_line(line):
    a = re.sub(r'\d+', '', line)
    b = re.sub(r'[\W]+', ' ', a)
    return b.upper().split()
```

Use the function above on the variable `test_line` to see what it returns.
```python
test_line = "This is an example of that contains 234 and a dash-containing number"
```

In [92]:
### Write your code here
def clean_split_line(line):
    a = re.sub(r'\d+', '', line)
    b = re.sub(r'[\W]+', ' ', a)
    return b.upper().split()

In [93]:
test_line = "This is an example of that contains 234 and a dash-containing number"
clean_split_line(test_line)

['THIS',
 'IS',
 'AN',
 'EXAMPLE',
 'OF',
 'THAT',
 'CONTAINS',
 'AND',
 'A',
 'DASH',
 'CONTAINING',
 'NUMBER']

### Q5

How many words does `book_43` contain? To answer this question, you may find it useful to apply the function in a spark-fashion. 
* You can only use operations or actions on RDDs to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed


In [128]:
### Write your code here
words = book_43.flatMap(clean_split_line)
print("Book 43 contains " + str(words.count()) + " words")



Book 43 contains 29116 words


                                                                                

### Q6

How many of the words in book_43 are unique? Given that words can appear in lower, upper or mixed case (ex. The, THE, the), make sure you convert the words into lower case before counting them.


In [129]:
def clean_split_line2(line):
    a = re.sub(r'\d+', '', line)
    b = re.sub(r'[\W]+', ' ', a)
    return b.lower().split()
words2 = book_43.flatMap(clean_split_line2)
uniqueWords = words2.distinct()
countDistinct = uniqueWords.count()
print("The number of unique words in Book 43 is " + str(countDistinct))



The number of unique words in Book 43 is 4296




### Q7

* Generate an `RDD` that contains the frequency of each word in `book_43`. Call the variable `book_43_counts`. Each item in the `RDD` should be a tuple with the word as the first element of the tuple and the count as the second item of the tuple. The collection should look like the following:

[('project', 88), ("the", 1807), ... ]

* Such a collection may contain a large number of words and it would be imprudent to transfer all the words onto the same machine to display it. Instead, to explore the content of such a collection, display only the first element in your list. 

* Given the random nature of this operation, the first element displayed may be different. The first entry for me was:
```
[('project', 88)]
```

* You can only use operations or actions to answer the question. 
* Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
* Code that uses functions such as `some_func(...)` is not allowed
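For intuition only, the pair-then-sum pattern behind a word-frequency collection can be sketched in plain Python, without Spark. The word list is made up, and this is not a valid answer to the question, since the assignment requires RDD methods.

```python
from collections import defaultdict

# A made-up list of already-cleaned words.
words = ["the", "project", "the", "case", "project", "the"]

# Step 1 (the "map" step): pair every word with a count of 1.
pairs = [(word, 1) for word in words]

# Step 2 (the "reduce by key" step): add up the counts that share a key.
counts = defaultdict(int)
for word, one in pairs:
    counts[word] += one

word_counts = sorted(counts.items())
print(word_counts)
```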


In [96]:
### Write your code here
words_mapped = words2.map(lambda x: (x,1))
book_43_counts = words_mapped.reduceByKey(lambda x,y: x+y)
book_43_counts.take(1)

                                                                                

[('start', 6)]

### Q8

Sort `book_43_counts` and print the 20 most frequent words in book_43. 
  * Hint: function `sortByKey` sorts a collection of tuples on the first element of each tuple. You can easily swap the order of the items in each tuple and use `sortByKey` to sort on the second item of each element in `book_43_counts`
  * You can only use operations or actions to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed

In [97]:
### Write your code here
book_43_counts_rev = book_43_counts.map(lambda x: (x[1],x[0]))
book_43_counts_rev.sortByKey(False).take(20)

                                                                                

[(1807, 'the'),
 (1068, 'of'),
 (1043, 'and'),
 (726, 'to'),
 (686, 'a'),
 (646, 'i'),
 (485, 'in'),
 (471, 'was'),
 (392, 'that'),
 (384, 'he'),
 (378, 'it'),
 (312, 'you'),
 (308, 'my'),
 (301, 'with'),
 (285, 'his'),
 (244, 'had'),
 (203, 'as'),
 (202, 'for'),
 (195, 'this'),
 (193, 'but')]

### Q9

You must have noted that the most frequent words in `book_43_counts` include stop words such as `of`, `the`, `and`, etc.

It would be inefficient to compare documents based on whether or not they contain stop words; those are common to all documents. As such, it's common to remove such stop words. The library `sklearn.feature_extraction` provides access to a collection of English stop words, which can be loaded using the following snippet:

```
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS
ENGLISH_STOP_WORDS
```

* Explore ENGLISH_STOP_WORDS (it's a frozen set data structure, i.e., a set that you cannot modify) by printing any 10 words from it. 
 * Hint: convert the frozen set to something you can subscript
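The filtering step that stop words are used for can be sketched on a tiny example. The stop-word set below is a hand-picked stand-in (an assumption for illustration), not the actual `ENGLISH_STOP_WORDS` collection, and the last two counts are made up. It is kept as a `frozenset` because membership tests on a set do not require scanning every element, unlike a list.

```python
# Hand-picked stand-in for a stop-word collection (illustration only).
toy_stop_words = frozenset({"the", "of", "and", "to", "a"})

# (word, count) pairs in the same shape as a word-count collection;
# the counts for "hyde" and "lawyer" are made up.
toy_counts = [("the", 1807), ("of", 1068), ("hyde", 105), ("lawyer", 67)]

# Keep only the pairs whose word is not a stop word.
filtered = [(word, n) for word, n in toy_counts if word not in toy_stop_words]
print(filtered)

# A frozenset cannot be subscripted; convert it to a list (here, sorted) first.
print(sorted(toy_stop_words)[:3])
```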


In [98]:
### Write your code here
from sklearn.feature_extraction import _stop_words
stopWords = _stop_words.ENGLISH_STOP_WORDS
stopWordList = list(stopWords)
stopWordList[:10]

['alone',
 'four',
 'itself',
 'once',
 'nowhere',
 'for',
 'mill',
 'which',
 'per',
 'many']

### Q10

Filter out the words in `book_43_counts` by removing those that appear in the ENGLISH_STOP_WORDS.
Save the results to a new variable called `book_43_counts_filtered`
  * You can only use operations or actions on RDDs to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed


In [101]:
### Write your code here
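# Items in book_43_counts_rev are (count, word) tuples, so the word is x[1].
# Testing membership against the frozenset avoids scanning a list for every word.
book_43_counts_filtered = book_43_counts_rev.filter(lambda x: x[1] not in stopWords)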

### Q11

* How many words are left in `book_43_counts_filtered` after removing the stop words?

In [130]:
### Write your code here
print("The number of words left in Book 43 after removing the stop words is " + str(book_43_counts_filtered.count()))



The number of words left in Book 43 after removing the stop words is 4034


                                                                                

### Q12 

* Create a function called *process_RDD* that combines the relevant steps you proposed above to make it convenient to apply them to the remaining books. Your function should accept an input text file path and:
 * Read in the file as a text RDD
 * Clean and split each line using `clean_split_line`
 * Filter out the stop words
 * Return a word count RDD where each item is a tuple of a word and its count.
 


In [114]:
### Write your code here
def process_RDD(bookFilePath):
    # Read the file, split each line into lowercase words, and count each word.
    bookRDD = sc.textFile(bookFilePath, minPartitions=96)
    bookWords = bookRDD.flatMap(clean_split_line2)
    bookWordCount = bookWords.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    # Drop stop words; membership tests on the frozenset avoid scanning a list.
    EngStopWords = _stop_words.ENGLISH_STOP_WORDS
    bookCountFiltered = bookWordCount.filter(lambda x: x[0] not in EngStopWords)
    # Note: items are returned as (count, word), sorted by decreasing count.
    # The cells below rely on this order and read the word from x[1].
    return bookCountFiltered.map(lambda x: (x[1], x[0])).sortByKey(False)

### Q13 

Apply the function `process_RDD` to `book_84`, `book_398` and `book_3296` and save the results to variables `book_84_counts_filtered`, `book_398_counts_filtered` and `book_3296_counts_filtered` respectively. How many distinct words does each book contain after filtering the stop words?


In [131]:
### Write your code here
book_43_counts_filtered = process_RDD("./data/43.txt")
print("Distinct words for Book 43 is " + str(book_43_counts_filtered.count()))



Distinct words for Book 43 is 4034




In [120]:
book_84_counts_filtered = process_RDD("./data/84.txt")
print("Distinct words for Book 84 is " + str(book_84_counts_filtered.count()))



Distinct words for Book 84 is 7016




In [121]:
book_398_counts_filtered = process_RDD("./data/398.txt")
print("Distinct words for Book 398 is " + str(book_398_counts_filtered.count()))



Distinct words for Book 398 is 2421


                                                                                

In [122]:
book_3296_counts_filtered = process_RDD("./data/3296.txt")
print("Distinct words for Book 3296 is " + str(book_3296_counts_filtered.count()))



Distinct words for Book 3296 is 7293


                                                                                

### Q14 

We discussed how to evaluate similarity between two texts using the number of words they share. We hypothesized that books that are similar should have more words in common than books that are dissimilar. If that holds, `book_398` and `book_3296`, which both pertain to religion, will have more words in common than, say, `book_84` and `book_398`. Test this hypothesis by writing code that compares and prints the number of words shared between `book_398` and `book_3296` and then between `book_84` and `book_398`.


In [124]:
cross_398_3296 = book_398_counts_filtered.map(lambda x: x[1]).intersection(book_3296_counts_filtered.map(lambda x: x[1]))
cross_84_398 = book_84_counts_filtered.map(lambda x: x[1]).intersection(book_398_counts_filtered.map(lambda x: x[1]))
print("The count of words shared between book_398 and book_3296 is ", cross_398_3296.count())
print("The count of words shared between book_84 and book_398 is ", cross_84_398.count())

                                                                                

The count of words shared between book_398 and book_3296 is  1790




The count of words shared between book_84 and book_398 is  1691


                                                                                

### Q15

* Based on the above, do you think counting the number of shared words is a good idea as a distance metric for evaluating topic similarity? Justify your answer.
* Hint: What do *book_84* and *book_3296* have in common? 

In [125]:
cross_84_3296 = book_84_counts_filtered.map(lambda x: x[1]).intersection(book_3296_counts_filtered.map(lambda x: x[1]))
print("The count of words shared between book_84 and book_3296 is ", cross_84_3296.count())




The count of words shared between book_84 and book_3296 is  3608




#### Answer
* Book 43 is "The Strange Case of Dr. Jekyll and Mr. Hyde" with 4034 distinct words.
* Book 84 is "Frankenstein" with 7016 distinct words.
* Book 398 is "The First Book of Adam and Eve" with 2421 distinct words.
* Book 3296 is "The Confessions of St. Augustine" with 7293 distinct words.

70% of the distinct words in Book of Adam and Eve are also found in Frankenstein. But only 24% of the distinct words in Frankenstein are found in Adam and Eve. So, if we only look at the intersection for Adam and Eve, it seems like it has a very similar topic to Frankenstein. However, looking at the intersection for Frankenstein, shows that Frankenstein has very little to do with Adam and Eve.

If we compare book 84 (Frankenstein) with book 3296 (Confessions of Saint Augustine), we see that 51.4% of the distinct words in Frankenstein are found in Confessions of Saint Augustine. 49.5% of the distinct words in Saint Augustine are found in Frankenstein. We expect that there is some similarity because both books discuss creation and religion. However, the religious context of Saint Augustine's confessions is much greater than Frankenstein. So, the match is still low at ~50%.

Lastly, if we compare Adam and Eve vs. Confessions of Saint Augustine, we see that 74% of the distinct words in Adam and Eve are found in Saint Augustine (1790 of 2421), while only about 25% of the distinct words in Saint Augustine are found in Adam and Eve (1790 of 7293). Both of these books are very religious and should share similar percentages of distinct words just based on their topic. However, they are of very different sizes (Adam and Eve has only about one-third as many distinct words as Saint Augustine).

In summary, this method will not be good for books that are very different in size, even if their topics are similar (Adam and Eve vs. Confessions of Saint Augustine). It can work better for books of similar length (Frankenstein vs. Confessions of St. Augustine have a low match, at about 50% in both directions, and have quite different topics).
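The percentages discussed in this answer follow directly from the distinct-word counts (Q13) and the shared-word counts (Q14, Q15) printed above. The snippet below is a small sketch that recomputes them; the dictionary layout and the helper name `overlap_pct` are my own choices for illustration.

```python
# Distinct words per book after stop-word filtering (from the Q13 output).
distinct = {"84": 7016, "398": 2421, "3296": 7293}

# Shared distinct words per pair of books (from the Q14 and Q15 outputs).
shared = {("398", "3296"): 1790, ("84", "398"): 1691, ("84", "3296"): 3608}


def overlap_pct(book, pair):
    """Percentage of `book`'s distinct words that also occur in the other book of `pair`."""
    return round(100 * shared[pair] / distinct[book], 1)


for pair in shared:
    for book in pair:
        other = pair[1] if book == pair[0] else pair[0]
        print(f"{overlap_pct(book, pair)}% of book {book}'s words are in book {other}")
```

The asymmetry within each pair comes only from the different denominators, which is why the smaller book always looks more "similar" to the larger one than the reverse.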

## Part II 

Another approach to estimating similarity consists of computing the Euclidean distance across a set of words. For example, suppose we have 3 documents A, B and C with the following counts for the words `evolution`, `DNA`, `biology` and `finance`. 

```python 
A = [4, 9, 6, 8]
B = [3, 7, 7, 10]
C = [15, 10, 1, 1]
```
Although all documents contain exactly the same four words, the number of times these words appear in each document may be indicative of their topic. For example, documents `A` and `B` are more likely to be business related since they contain the word `finance` more frequently (8 and 10 times respectively). Document `C` may be a technical document since it focuses on more technical words (`evolution` and `DNA`) and less on the word `finance`.

The Euclidean distance, which can be computed using the `scipy` snippet below, is more indicative of topic-relatedness between two documents.

```python
from scipy.spatial.distance import euclidean 
print(f"The Euclidean distance between A and B is: {euclidean(A, B)}")

print(f"The Euclidean distance between A and C is: {euclidean(A, C)}")

print(f"The Euclidean distance between B and C is: {euclidean(B, C)}")
```
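To make the computation explicit, the same three distances can be worked out by hand as the square root of the summed squared differences. The sketch below uses only the standard library (`math.dist`, available from Python 3.8) instead of SciPy; the helper name `euclidean_by_hand` is illustrative.

```python
import math

A = [4, 9, 6, 8]
B = [3, 7, 7, 10]
C = [15, 10, 1, 1]


def euclidean_by_hand(u, v):
    """Square root of the sum of squared coordinate differences."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(u, v)))


# A vs B: sqrt(1 + 4 + 1 + 4) = sqrt(10)
print(euclidean_by_hand(A, B))
# A vs C: sqrt(121 + 1 + 25 + 49) = sqrt(196)
print(euclidean_by_hand(A, C))
# B vs C: sqrt(144 + 9 + 36 + 81) = sqrt(270)
print(euclidean_by_hand(B, C))

# math.dist computes the same quantity.
print(math.isclose(euclidean_by_hand(A, C), math.dist(A, C)))
```

`A` and `B` are much closer to each other than either is to `C`, matching the intuition that `A` and `B` share a topic.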


### Q16

To calculate the Euclidean distance, we must first identify the set of words by which we will compare the documents. Here, we will explore the words that are common to all 4 documents. We will store the data in a matrix called `counts_matrix`.

Start by finding the words that are common to all four documents after stop-word filtering and store the counts for each word in a column of `counts_matrix`. 

To take the previous example, you can generate an empty matrix with 3 rows (documents `A`, `B` and `C`) and 4 columns (words `evolution`, `DNA`, `biology` and `finance`) using the following code.

```python
import numpy as np
counts_matrix = np.zeros([3,4])
```

After generating the counts, you can fill the counts for a document, say `A`, using the following code:

```python
counts_matrix[0, :] = [4, 9, 6, 8] 
```
* Other than for building `counts_matrix` you should exclusively use operations or actions on the `RDD` to answer this question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed
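One detail worth making explicit is that every row of the matrix must list the words in the same column order; otherwise the distance would compare counts of different words. The sketch below illustrates this on made-up counts held in plain dictionaries (an assumption for illustration; in the assignment the counts live in RDDs) by fixing a sorted list of the common words as the column order.

```python
# Made-up word counts for three documents (illustration only).
doc_counts = {
    "A": {"dna": 9, "evolution": 4, "finance": 8, "cell": 2},
    "B": {"finance": 10, "dna": 7, "evolution": 3},
    "C": {"evolution": 15, "market": 5, "dna": 10, "finance": 1},
}

# Words present in every document, in one fixed (sorted) column order.
common_words = sorted(set.intersection(*(set(c) for c in doc_counts.values())))

# One row per document; column j always refers to common_words[j].
rows = [[counts[word] for word in common_words] for counts in doc_counts.values()]

print(common_words)
for name, row in zip(doc_counts, rows):
    print(name, row)
```

Each row can then be assigned into the matrix in the same way as above, for example `counts_matrix[0, :] = rows[0]`.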


In [135]:
### Write your code here
import numpy as np
cross_43_84_398_3296 = book_43_counts_filtered.map(lambda x: x[1]).intersection(book_84_counts_filtered.map(lambda x: x[1])).intersection(book_398_counts_filtered.map(lambda x: x[1])).intersection(book_3296_counts_filtered.map(lambda x: x[1]))
numColumns = cross_43_84_398_3296.count()
#print(numColumns)

                                                                                

In [137]:
counts_matrix = np.zeros([4, numColumns])
print(counts_matrix)

[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]


In [138]:
crossList = list(cross_43_84_398_3296.collect())

                                                                                

In [142]:
# Keep only the words common to all four books; items are (count, word) tuples.
words_filtered_43 = book_43_counts_filtered.filter(lambda x: x[1] in crossList)
words_filtered_84 = book_84_counts_filtered.filter(lambda x: x[1] in crossList)
words_filtered_398 = book_398_counts_filtered.filter(lambda x: x[1] in crossList)
words_filtered_3296 = book_3296_counts_filtered.filter(lambda x: x[1] in crossList)
# Swap to (word, count) so that sortByKey orders each book by word.
words_filtered_43_rev = words_filtered_43.map(lambda x: (x[1],x[0]))
words_filtered_84_rev = words_filtered_84.map(lambda x: (x[1],x[0]))
words_filtered_398_rev = words_filtered_398.map(lambda x: (x[1],x[0]))
words_filtered_3296_rev = words_filtered_3296.map(lambda x: (x[1],x[0]))
# Sorting every book by word in the same direction aligns the matrix columns across books.
sorted_word_43 = words_filtered_43_rev.sortByKey(False)
sorted_word_84 = words_filtered_84_rev.sortByKey(False)
sorted_word_398 = words_filtered_398_rev.sortByKey(False)
sorted_word_3296 = words_filtered_3296_rev.sortByKey(False)
# Fill one row per book with its counts, in the shared word order.
counts_matrix[0, :] = list(sorted_word_43.map(lambda x: x[1]).collect())
counts_matrix[1, :] = list(sorted_word_84.map(lambda x: x[1]).collect())
counts_matrix[2, :] = list(sorted_word_398.map(lambda x: x[1]).collect())
counts_matrix[3, :] = list(sorted_word_3296.map(lambda x: x[1]).collect())
print(counts_matrix)



[[ 1.  3.  5. ...  2.  3.  1.]
 [ 1.  9. 30. ...  4.  9.  1.]
 [ 1.  2.  1. ... 16.  5.  1.]
 [ 1.  4. 15. ...  3. 36. 16.]]




### Q17

Compute the Euclidean distance between `book_398` and `book_3296`, which both talk about religion, and between `book_84` and `book_398`. What do you conclude about using the Euclidean distance for evaluating topic relatedness across documents?


In [146]:
### Write your code here
from scipy.spatial.distance import euclidean

dist_398_3296 = euclidean(counts_matrix[2],counts_matrix[3])
dist_84_398= euclidean(counts_matrix[1],counts_matrix[2])
dist_84_3296= euclidean(counts_matrix[1],counts_matrix[3])

print("The Euclidean distance between Books 398 and 3296 is",dist_398_3296)
print("The Euclidean distance between Books 84 and 398 is",dist_84_398)
print("The Euclidean distance between Books 84 and 3296 is",dist_84_3296)

The Euclidean distance between Books 398 and 3296 is 1156.6628722320086
The Euclidean distance between Books 84 and 398 is 751.6688100486809
The Euclidean distance between Books 84 and 3296 is 1171.2493329773981


#### Answer
The Euclidean distance between Frankenstein (Book 84) and Adam and Eve (Book 398) is smaller than the distance between Adam and Eve and Confessions of St. Augustine (Book 3296). This does not make sense, because Adam and Eve should be more similar to Confessions of St. Augustine. A third comparison, between books of similar length (Frankenstein and Confessions), shows an even larger Euclidean distance. A large distance is expected for these two books, however, as they have very different topics. So Euclidean distance is also not a good measure for books of very different lengths: if one book is much shorter, it will be reported as far from the longer one even when the two share the same or a similar topic.

The raw distance coefficient also does not have an obvious upper bound. We only know that the lower bound of 0 = absolute identity. So, we can rank dissimilarities, but we don't really know how similar the books are.

### Q18

Bonus question (5 points): Can you think of a few things we could do to improve similarity between documents that pertain to the same topic? Justify your answer without giving code.

#### Write your answer here
We can apply length normalization methods (as described in Week 12 - Feature_engineering_1) so that documents of different lengths have comparable weights. This makes the model less sensitive to the scale of the features. We could improve our previous analysis by computing a normalized Euclidean distance instead of the raw Euclidean distance.

Alternatively, we could calculate a cosine similarity instead. Cosine similarity is preferred over Euclidean distance when comparing documents of varying sizes because it is a judgement of orientation. Cosine similarity does not care about magnitude.

Another option is to calculate the Pearson coefficient as a measure of how "in-sync" two sets of data are. The Pearson coefficient looks at the shape of the data distribution. So, for two data sets that are very similar, the shape of the distribution should also be similar.
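To illustrate the first two suggestions on a toy vector from Part II, the sketch below compares a count vector with a copy of itself scaled by 3. The scaled copy is an assumption for illustration, standing in for a book three times as long with the same word proportions. Raw Euclidean distance treats the two as far apart, whereas L2-normalized Euclidean distance and cosine similarity treat them as identical.

```python
import math

A = [4, 9, 6, 8]
A_long = [3 * x for x in A]  # same word proportions, three times the length


def l2_normalize(v):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def cosine_similarity(u, v):
    """Cosine of the angle between two vectors."""
    dot = sum(x * y for x, y in zip(u, v))
    return dot / (math.sqrt(sum(x * x for x in u)) * math.sqrt(sum(y * y for y in v)))


raw = math.dist(A, A_long)
normalized = math.dist(l2_normalize(A), l2_normalize(A_long))
cosine = cosine_similarity(A, A_long)

print(f"raw Euclidean distance:        {raw:.3f}")
print(f"normalized Euclidean distance: {normalized:.3f}")
print(f"cosine similarity:             {cosine:.3f}")
```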

## Part III

In this part we will build some basic analytics for a dataset consisting of flight arrival and departure details for all commercial flights within the USA in one month. While this dataset can be managed using Pandas (<1M records), scaling to a yearly or longer timeframe will greatly benefit from using a distributed computing framework such as `Spark`.

Here, you should use Spark `DataFrames` exclusively.

We want to analyze this dataset to better schedule trips.  For example:
 * Avoid airline carriers that are most often associated with delays.
 * Avoid departure days where delays are most frequent.
 * Avoid airports that are associated with delays or long taxiing times.
 * etc.
 

The information about the fields contained in the data file can be found [here](https://dataverse.harvard.edu/dataset.xhtml;jsessionid=0414e25969eccd0e88ae4d64fa0b?persistentId=doi%3A10.7910%2FDVN%2FHG7NV7&version=&q=&fileTypeGroupFacet=&fileTag=%221.+Documentation%22&fileSortField=date&fileSortOrder=desc)


### Q19

Load the file `flight_info.csv` into a Spark `DataFrame` called `flight_info`.

  * Note that you will need to create a sparkSession prior to loading the data
  
* How many entries does the file contain?



In [2]:
### Write your code here
from pyspark.sql import SparkSession
session = SparkSession.builder.getOrCreate()
flight_info = session.read.csv("./data/flight_info.csv",header=True, inferSchema=True)
numEntries = flight_info.count()
print("The number of entries in Flight Info is",numEntries)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/08 16:59:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

The number of entries in Flight Info is 450017


### Q20

Use `pySpark-SQL` or pandas-like syntax to compute the airlines represented in this dataset.
The airline information is stored in a field called `UniqueCarrier`.
* UniqueCarrier: Represents the unique carrier code (e.g., AA = American Airlines) 


In [3]:
### Write your code here
flight_info.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- TaxiOut: double (nullable = true)
 |-- WheelsOff: double (nullable = true)
 |-- WheelsOn: double (nullable = true)
 |-- TaxiIn: double (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- ArrTime: double (nullable = true)
 |-- Cancelled: double (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Distance: double (nullable = true)
 |-- CarrierDelay: double (nullable = true)
 |-- WeatherDelay: double (nullable = true)
 |-- NASDelay: double (nullable = true)
 |-- SecurityDelay: double (nullable = true)
 |-- LateAircraftDelay: double (nullable = true)



In [12]:
uniqueCarrier = flight_info.select('UniqueCarrier').distinct()
uniqueCarrier.head(uniqueCarrier.count())

[Row(UniqueCarrier='UA'),
 Row(UniqueCarrier='NK'),
 Row(UniqueCarrier='AA'),
 Row(UniqueCarrier='EV'),
 Row(UniqueCarrier='B6'),
 Row(UniqueCarrier='DL'),
 Row(UniqueCarrier='OO'),
 Row(UniqueCarrier='F9'),
 Row(UniqueCarrier='HA'),
 Row(UniqueCarrier='AS'),
 Row(UniqueCarrier='WN'),
 Row(UniqueCarrier='VX')]

### Q21

The data file contains various other fields, two of which are useful for answering the next question.

* CRSDepTime: Represents the scheduled departure time
* DepTime: Represents the actual departure time

Compute the number of delayed flights for each carrier code represented in this dataset. Sort the data by decreasing order of delays.
  * A delay is observed when `DepTime` > `CRSDepTime`


In [5]:
### Write your code here
flight_info.createOrReplaceTempView("Flights")
flight_delays = session.sql("""
SELECT UniqueCarrier, COUNT(*)
FROM Flights
WHERE DepTime > CRSDepTime
GROUP BY UniqueCarrier;
""")

In [6]:
flight_delays.head(uniqueCarrier.count())

                                                                                

[Row(UniqueCarrier='UA', count(1)=17701),
 Row(UniqueCarrier='NK', count(1)=4151),
 Row(UniqueCarrier='AA', count(1)=23461),
 Row(UniqueCarrier='EV', count(1)=11596),
 Row(UniqueCarrier='B6', count(1)=9396),
 Row(UniqueCarrier='DL', count(1)=24334),
 Row(UniqueCarrier='OO', count(1)=16751),
 Row(UniqueCarrier='F9', count(1)=2988),
 Row(UniqueCarrier='HA', count(1)=1939),
 Row(UniqueCarrier='AS', count(1)=4488),
 Row(UniqueCarrier='WN', count(1)=47472),
 Row(UniqueCarrier='VX', count(1)=2648)]
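The question also asks for the result sorted by decreasing number of delays, and a `GROUP BY` on its own does not guarantee any row order. As a sketch of the missing step, the snippet below runs the same kind of query with an added `ORDER BY ... DESC` clause. It uses Python's built-in `sqlite3` on a tiny made-up table rather than Spark, so the table contents and the alias `NumDelays` are assumptions for illustration only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Flights (UniqueCarrier TEXT, CRSDepTime INTEGER, DepTime REAL)")

# Made-up rows: (carrier, scheduled departure, actual departure).
conn.executemany(
    "INSERT INTO Flights VALUES (?, ?, ?)",
    [
        ("AA", 900, 915.0), ("AA", 1200, 1155.0),
        ("WN", 700, 730.0), ("WN", 1000, 1020.0), ("WN", 1500, 1501.0),
        ("DL", 800, 800.0),
    ],
)

# Same filter and grouping as above, plus an explicit sort on the delay count.
delays = conn.execute("""
    SELECT UniqueCarrier, COUNT(*) AS NumDelays
    FROM Flights
    WHERE DepTime > CRSDepTime
    GROUP BY UniqueCarrier
    ORDER BY NumDelays DESC
""").fetchall()

print(delays)
conn.close()
```

In the notebook, the analogous change would be to add an alias and an `ORDER BY` clause to the query string passed to `session.sql`.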

### Q22

 Use the file `airlines.csv` to find the complete name of the airline. Here, you are required to load the file as a pyspark DataFrame; call it `airlines_info`, and repeat the query above while including the `flights.csv` file in your query (requires doing a `join`) so that you can also display the full name of the carrier (second column). 

The result will look (approximately) like:

```
[Row(UniqueCarrier='WN', first(_c1)='Southwest Airlines', count=SOME_count),
 Row(UniqueCarrier='DL', first(_c1)='Delta Air Lines', count=SOME_count),
 Row(UniqueCarrier='AA', first(_c1)='American Airlines', count=SOME_count),
 ...
 ]
```

The carrier code in the `airlines.csv` file is provided in the 4th (1-based) column

Note that the file `airlines.csv` does not have a column header. Hence, you need to print one line of your dataset to see what names Spark gave to the columns. Use the name provided by Spark in your query.

In [7]:
### Write your code here
airlines_info = session.read.csv("./data/airlines.csv",header=False,inferSchema=True)
for airline in airlines_info.head(5):
    print(airline)

Row(_c0=1, _c1='Private flight', _c2='\\N', _c3='-', _c4='N/A', _c5=None, _c6=None, _c7='Y')
Row(_c0=2, _c1='135 Airways', _c2='\\N', _c3=None, _c4='GNL', _c5='GENERAL', _c6='United States', _c7='N')
Row(_c0=3, _c1='1Time Airline', _c2='\\N', _c3='1T', _c4='RNX', _c5='NEXTIME', _c6='South Africa', _c7='Y')
Row(_c0=4, _c1='2 Sqn No 1 Elementary Flying Training School', _c2='\\N', _c3=None, _c4='WYT', _c5=None, _c6='United Kingdom', _c7='N')
Row(_c0=5, _c1='213 Flight Unit', _c2='\\N', _c3=None, _c4='TFU', _c5=None, _c6='Russia', _c7='N')


In [8]:
airlines_info.createOrReplaceTempView("airlines")
delaysFull = session.sql("""
SELECT UniqueCarrier, first(_c1), COUNT(*)
FROM Flights Fl
INNER JOIN airlines Ai
on Fl.UniqueCarrier = Ai._c3
WHERE DepTime > CRSDepTime
GROUP BY UniqueCarrier;
""")

In [9]:
delaysFull.head(uniqueCarrier.count())

                                                                                

[Row(UniqueCarrier='AA', first(_c1)='American Airlines', count(1)=23461),
 Row(UniqueCarrier='AS', first(_c1)='Alaska Airlines', count(1)=4488),
 Row(UniqueCarrier='B6', first(_c1)='JetBlue Airways', count(1)=9396),
 Row(UniqueCarrier='DL', first(_c1)='Delta Air Lines', count(1)=24334),
 Row(UniqueCarrier='EV', first(_c1)='Atlantic Southeast Airlines', count(1)=11596),
 Row(UniqueCarrier='F9', first(_c1)='Frontier Airlines', count(1)=2988),
 Row(UniqueCarrier='HA', first(_c1)='Hawaiian Airlines', count(1)=1939),
 Row(UniqueCarrier='NK', first(_c1)='Spirit Airlines', count(1)=4151),
 Row(UniqueCarrier='OO', first(_c1)='SkyWest', count(1)=16751),
 Row(UniqueCarrier='UA', first(_c1)='United Airlines', count(1)=17701),
 Row(UniqueCarrier='VX', first(_c1)='Virgin America', count(1)=2648),
 Row(UniqueCarrier='WN', first(_c1)='Southwest Airlines', count(1)=47472)]

### Q23

Compute the number of delays per company per day. The day is encoded as an integer in the column `DayOfWeek` in `flight_info`. You can display the day as an integer or map it into a string name of the week.
Sort the data by airline code (UniqueCarrier) and by increasing values of DayOfWeek


Your results should look like the following



In [19]:
### Write your code here
dailyDelays = session.sql("""
SELECT UniqueCarrier, DayOfWeek, COUNT(*) AS NumDelays
FROM Flights
WHERE DepTime > CRSDepTime
GROUP BY UniqueCarrier, DayOfWeek
ORDER BY UniqueCarrier, DayOfWeek ASC;
""")
dailyDelays.show(truncate=False)

+-------------+---------+---------+
|UniqueCarrier|DayOfWeek|NumDelays|
+-------------+---------+---------+
|AA           |1        |4639     |
|AA           |2        |3288     |
|AA           |3        |2570     |
|AA           |4        |3142     |
|AA           |5        |3108     |
|AA           |6        |2509     |
|AA           |7        |4205     |
|AS           |1        |786      |
|AS           |2        |580      |
|AS           |3        |573      |
|AS           |4        |656      |
|AS           |5        |554      |
|AS           |6        |470      |
|AS           |7        |869      |
|B6           |1        |1754     |
|B6           |2        |1628     |
|B6           |3        |1109     |
|B6           |4        |1101     |
|B6           |5        |1175     |
|B6           |6        |900      |
+-------------+---------+---------+
only showing top 20 rows



In [13]:
dailyDelays.head(uniqueCarrier.count()*7)

[Row(UniqueCarrier='AA', DayOfWeek=1, NumDelays=4639),
 Row(UniqueCarrier='AA', DayOfWeek=2, NumDelays=3288),
 Row(UniqueCarrier='AA', DayOfWeek=3, NumDelays=2570),
 Row(UniqueCarrier='AA', DayOfWeek=4, NumDelays=3142),
 Row(UniqueCarrier='AA', DayOfWeek=5, NumDelays=3108),
 Row(UniqueCarrier='AA', DayOfWeek=6, NumDelays=2509),
 Row(UniqueCarrier='AA', DayOfWeek=7, NumDelays=4205),
 Row(UniqueCarrier='AS', DayOfWeek=1, NumDelays=786),
 Row(UniqueCarrier='AS', DayOfWeek=2, NumDelays=580),
 Row(UniqueCarrier='AS', DayOfWeek=3, NumDelays=573),
 Row(UniqueCarrier='AS', DayOfWeek=4, NumDelays=656),
 Row(UniqueCarrier='AS', DayOfWeek=5, NumDelays=554),
 Row(UniqueCarrier='AS', DayOfWeek=6, NumDelays=470),
 Row(UniqueCarrier='AS', DayOfWeek=7, NumDelays=869),
 Row(UniqueCarrier='B6', DayOfWeek=1, NumDelays=1754),
 Row(UniqueCarrier='B6', DayOfWeek=2, NumDelays=1628),
 Row(UniqueCarrier='B6', DayOfWeek=3, NumDelays=1109),
 Row(UniqueCarrier='B6', DayOfWeek=4, NumDelays=1101),
 Row(UniqueCarrie

### Q24  

Counting the number of delayed flights per airline is misleading, as airlines with more flights are more likely to have delays than companies with substantially fewer flights.

Repeat the same query above but, for each carrier, normalize the counts of delays by the total number of flights for that carrier. 


In [20]:
### Write your code here
delaysPerCarrier = session.sql("""
SELECT UniqueCarrier, first(_c1), COUNT(*)
FROM Flights Fl
INNER JOIN airlines Ai
ON Fl.UniqueCarrier = Ai._c3
WHERE DepTime > CRSDepTime
GROUP BY UniqueCarrier
""")

In [21]:
totalFlightsPerCarrier = session.sql("""
SELECT UniqueCarrier, COUNT(*)
FROM Flights
GROUP BY UniqueCarrier
""")

In [22]:
delaysPerCarrier.createOrReplaceTempView("delays")
totalFlightsPerCarrier.createOrReplaceTempView("totals")
normalizedCounts = session.sql("""
SELECT d.UniqueCarrier, ROUND(d.`count(1)` / t.`count(1)`,2) AS norm
FROM delays d
INNER JOIN totals t
ON d.UniqueCarrier = t.UniqueCarrier
""")
normalizedCounts.head(uniqueCarrier.count())

                                                                                

[Row(UniqueCarrier='UA', norm=0.42),
 Row(UniqueCarrier='NK', norm=0.33),
 Row(UniqueCarrier='AA', norm=0.32),
 Row(UniqueCarrier='EV', norm=0.33),
 Row(UniqueCarrier='B6', norm=0.38),
 Row(UniqueCarrier='DL', norm=0.35),
 Row(UniqueCarrier='OO', norm=0.33),
 Row(UniqueCarrier='F9', norm=0.39),
 Row(UniqueCarrier='HA', norm=0.31),
 Row(UniqueCarrier='AS', norm=0.31),
 Row(UniqueCarrier='WN', norm=0.44),
 Row(UniqueCarrier='VX', norm=0.46)]
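
The same normalization can also be written as one query with conditional aggregation, which avoids the two temporary views. The sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL so that it runs without a cluster. The rows are made up, only the column names mirror the `Flights` view, and it is an assumption that the query text would be passed to `session.sql` unchanged.

```python
import sqlite3

# Made-up rows for illustration: (UniqueCarrier, CRSDepTime, DepTime).
toy_flights = [
    ("AA", 900, 915),    # delayed
    ("AA", 1000, 1000),  # on time
    ("AA", 1100, 1055),  # early
    ("AA", 1200, 1230),  # delayed
    ("WN", 800, 805),    # delayed
    ("WN", 930, 930),    # on time
    ("WN", 1015, 1010),  # early
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Flights (UniqueCarrier TEXT, CRSDepTime INTEGER, DepTime INTEGER)"
)
conn.executemany("INSERT INTO Flights VALUES (?, ?, ?)", toy_flights)

# Count delayed flights and all flights in the same pass, then divide.
query = """
SELECT UniqueCarrier,
       SUM(CASE WHEN DepTime > CRSDepTime THEN 1 ELSE 0 END) AS NumDelays,
       COUNT(*) AS NumFlights,
       ROUND(1.0 * SUM(CASE WHEN DepTime > CRSDepTime THEN 1 ELSE 0 END) / COUNT(*), 2) AS norm
FROM Flights
GROUP BY UniqueCarrier
ORDER BY UniqueCarrier
"""
result = conn.execute(query).fetchall()
for row in result:
    print(row)
```

Adding `DayOfWeek` to both the `SELECT` list and the `GROUP BY` clause would give the per-day version of the same ratio.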

In [40]:
countsFullName = session.sql("""
SELECT UniqueCarrier, first(_c1), COUNT(*)
FROM Flights Fl
INNER JOIN airlines Ai
on Fl.UniqueCarrier = Ai._c3
WHERE DepTime > CRSDepTime
GROUP BY UniqueCarrier;
""")
countsFullName.show()

+-------------+--------------------+--------+
|UniqueCarrier|          first(_c1)|count(1)|
+-------------+--------------------+--------+
|           AA|   American Airlines|   23461|
|           AS|     Alaska Airlines|    4488|
|           B6|     JetBlue Airways|    9396|
|           DL|     Delta Air Lines|   24334|
|           EV|Atlantic Southeas...|   11596|
|           F9|   Frontier Airlines|    2988|
|           HA|   Hawaiian Airlines|    1939|
|           NK|     Spirit Airlines|    4151|
|           OO|             SkyWest|   16751|
|           UA|     United Airlines|   17701|
|           VX|      Virgin America|    2648|
|           WN|  Southwest Airlines|   47472|
+-------------+--------------------+--------+



### Q25 

Time the query above. How long did it take to run?
  * Make sure you run the code a few times and compute the average run time.
  * The above should be easy to implement if you use the correct Jupyter Notebook `magic` function
  

In [51]:
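### Write your code here
# NOTE: %timeit is a line magic, so it times only the expression on its own line
# (sum(range(numEntries))), not the Spark query below.
%timeit -n10 -r10 -o sum(range(numEntries))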
countsFullName = session.sql("""
SELECT UniqueCarrier, first(_c1), COUNT(*)
FROM Flights Fl
INNER JOIN airlines Ai
on Fl.UniqueCarrier = Ai._c3
WHERE DepTime > CRSDepTime
GROUP BY UniqueCarrier;
""")
countsFullName.show()

4.94 ms ± 634 µs per loop (mean ± std. dev. of 10 runs, 10 loops each)
+-------------+--------------------+--------+
|UniqueCarrier|          first(_c1)|count(1)|
+-------------+--------------------+--------+
|           UA|     United Airlines|   17701|
|           NK|     Spirit Airlines|    4151|
|           AA|   American Airlines|   23461|
|           EV|Atlantic Southeas...|   11596|
|           B6|     JetBlue Airways|    9396|
|           DL|     Delta Air Lines|   24334|
|           OO|             SkyWest|   16751|
|           F9|   Frontier Airlines|    2988|
|           HA|   Hawaiian Airlines|    1939|
|           AS|     Alaska Airlines|    4488|
|           VX|      Virgin America|    2648|
|           WN|  Southwest Airlines|   47472|
+-------------+--------------------+--------+
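
Since Spark evaluates lazily, `session.sql(...)` only builds a plan; the work happens when an action such as `show()` or `collect()` runs, so a timing has to wrap the action itself. In a notebook, the `%%timeit` cell magic on the first line of a cell times the whole cell. The sketch below shows the same idea in plain Python with a hypothetical helper, `average_runtime`, and a stand-in workload so that it runs without Spark.

```python
import statistics
import time

def average_runtime(action, repeats=5):
    """Call `action` `repeats` times; return (mean, stdev) of wall-clock seconds."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        samples.append(time.perf_counter() - start)
    return statistics.mean(samples), statistics.stdev(samples)

# Stand-in workload so the sketch is runnable on its own. In the notebook the
# callable would wrap the query plus an action, for example:
#   lambda: session.sql(query).collect()
def stand_in_workload():
    return sum(range(100_000))

mean_s, std_s = average_runtime(stand_in_workload, repeats=5)
print(f"{mean_s * 1e3:.2f} ms ± {std_s * 1e3:.2f} ms (mean ± std. dev. of 5 runs)")
```

Running the same helper before and after `CACHE TABLE` gives two comparable averages for the query itself.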



### Q26 

Use one of the techniques covered in class to accelerate this query. Time your query to see how much the run time improved.

In [52]:
### Write your code here - using Caching (Week 5_SparkIntroduction_2)
# First drop any cached copies of the tables so the baseline timing is uncached.
session.sql("""
UNCACHE TABLE Flights
""").collect()
session.sql("""
UNCACHE TABLE airlines
""").collect()
session.sql("""
UNCACHE TABLE delays
""").collect()
session.sql("""
UNCACHE TABLE totals
""").collect()

[]

In [53]:
### Uncached time
# NOTE: this line magic times only sum(range(numEntries)), not the query below.
%timeit -n10 -r10 -o sum(range(numEntries))
countsFullName = session.sql("""
SELECT UniqueCarrier, first(_c1), COUNT(*)
FROM Flights Fl
INNER JOIN airlines Ai
on Fl.UniqueCarrier = Ai._c3
WHERE DepTime > CRSDepTime
GROUP BY UniqueCarrier;
""")
countsFullName.show()

5.22 ms ± 778 µs per loop (mean ± std. dev. of 10 runs, 10 loops each)




+-------------+--------------------+--------+
|UniqueCarrier|          first(_c1)|count(1)|
+-------------+--------------------+--------+
|           AA|   American Airlines|   23461|
|           AS|     Alaska Airlines|    4488|
|           B6|     JetBlue Airways|    9396|
|           DL|     Delta Air Lines|   24334|
|           EV|Atlantic Southeas...|   11596|
|           F9|   Frontier Airlines|    2988|
|           HA|   Hawaiian Airlines|    1939|
|           NK|     Spirit Airlines|    4151|
|           OO|             SkyWest|   16751|
|           UA|     United Airlines|   17701|
|           VX|      Virgin America|    2648|
|           WN|  Southwest Airlines|   47472|
+-------------+--------------------+--------+



                                                                                

In [54]:
### Write your code here - using Caching (Week 5_SparkIntroduction_2)
session.sql("""
CACHE TABLE Flights
""").collect()
session.sql("""
CACHE TABLE airlines
""").collect()
session.sql("""
CACHE TABLE delays
""").collect()
session.sql("""
CACHE TABLE totals
""").collect()

21/11/08 19:44:13 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , DayOfWeek, UniqueCarrier, FlightNum, Origin, Dest, CRSDepTime, DepTime, TaxiOut, WheelsOff, WheelsOn, TaxiIn, CRSArrTime, ArrTime, Cancelled, CancellationCode, Distance, CarrierDelay, WeatherDelay, NASDelay, SecurityDelay, LateAircraftDelay
 Schema: _c0, DayOfWeek, UniqueCarrier, FlightNum, Origin, Dest, CRSDepTime, DepTime, TaxiOut, WheelsOff, WheelsOn, TaxiIn, CRSArrTime, ArrTime, Cancelled, CancellationCode, Distance, CarrierDelay, WeatherDelay, NASDelay, SecurityDelay, LateAircraftDelay
Expected: _c0 but found: 
CSV file: file:///home/jovyan/work/data/flight_info.csv
                                                                                

[]

In [56]:
### Cached time
# NOTE: this line magic times only sum(range(numEntries)), not the query below.
%timeit -n10 -r10 -o sum(range(numEntries))
countsFullName = session.sql("""
SELECT UniqueCarrier, first(_c1), COUNT(*)
FROM Flights Fl
INNER JOIN airlines Ai
on Fl.UniqueCarrier = Ai._c3
WHERE DepTime > CRSDepTime
GROUP BY UniqueCarrier;
""")
countsFullName.show()

4.45 ms ± 346 µs per loop (mean ± std. dev. of 10 runs, 10 loops each)
+-------------+--------------------+--------+
|UniqueCarrier|          first(_c1)|count(1)|
+-------------+--------------------+--------+
|           UA|     United Airlines|   17701|
|           NK|     Spirit Airlines|    4151|
|           AA|   American Airlines|   23461|
|           EV|Atlantic Southeas...|   11596|
|           B6|     JetBlue Airways|    9396|
|           DL|     Delta Air Lines|   24334|
|           OO|             SkyWest|   16751|
|           F9|   Frontier Airlines|    2988|
|           HA|   Hawaiian Airlines|    1939|
|           AS|     Alaska Airlines|    4488|
|           VX|      Virgin America|    2648|
|           WN|  Southwest Airlines|   47472|
+-------------+--------------------+--------+



### Q27 

Is the departure delay (i.e., DepTime - CRSDepTime) predictive of the arrival delay (ArrTime > CRSArrTime)?
Use an approach of your choice (e.g. `sklearn`, which we covered in class, or `Spark`) to model the arrival delay as a linear function of the departure delay.



In [60]:
### Write your code here
delayInfo = session.sql("""
SELECT (DepTime - CRSDepTime) AS startDelay, (ArrTime - CRSArrTime) AS endDelay
FROM Flights;
""")
delayInfo.head(10)

[Row(startDelay=-3.0, endDelay=-14.0),
 Row(startDelay=-5.0, endDelay=-35.0),
 Row(startDelay=-8.0, endDelay=-30.0),
 Row(startDelay=37.0, endDelay=73.0),
 Row(startDelay=13.0, endDelay=2.0),
 Row(startDelay=17.0, endDelay=65.0),
 Row(startDelay=-3.0, endDelay=51.0),
 Row(startDelay=0.0, endDelay=48.0),
 Row(startDelay=-3.0, endDelay=50.0),
 Row(startDelay=-1.0, endDelay=46.0)]
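
One caveat about these differences: if the time columns are stored as clock times in HHMM form (an assumption; this notebook does not state the encoding), plain subtraction does not give minutes whenever the two times fall in different hours. A minimal sketch of a conversion, using hypothetical helpers `hhmm_to_minutes` and `delay_minutes` and a made-up flight:

```python
def hhmm_to_minutes(hhmm):
    """Convert a clock time written as HHMM (e.g. 1305) to minutes after midnight."""
    hhmm = int(hhmm)
    return (hhmm // 100) * 60 + hhmm % 100

def delay_minutes(scheduled_hhmm, actual_hhmm):
    """Delay in minutes between two HHMM times on the same day."""
    return hhmm_to_minutes(actual_hhmm) - hhmm_to_minutes(scheduled_hhmm)

# Hypothetical flight: scheduled for 12:55, actually left at 13:05.
raw_difference = 1305 - 1255
true_delay = delay_minutes(1255, 1305)
print(raw_difference)  # 50
print(true_delay)      # 10
```

This sketch does not handle flights that cross midnight, which would need a date or a wrap-around rule.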

In [99]:
from pyspark.sql.functions import col
# Collect both columns in one pass so each startDelay stays paired with its endDelay.
rows = delayInfo.select(col("startDelay"), col("endDelay")).collect()
myx = [row["startDelay"] for row in rows]
myy = [row["endDelay"] for row in rows]

In [108]:
newx = []
newy = []

# Keep only the rows where both delays are present.
for xi, yi in zip(myx, myy):
    if xi is not None and yi is not None:
        newx.append(xi)
        newy.append(yi)

In [109]:
from scipy.stats import linregress
import numpy as np
from sklearn import datasets, linear_model

linreg = linregress(newx,newy)
linreg

LinregressResult(slope=0.22335475547344275, intercept=-20.14816658160143, rvalue=0.11616972584526296, pvalue=0.0, stderr=0.002876461667436919, intercept_stderr=0.4374235372787865)

#### Explanation
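The rvalue is small (about 0.116), so r² is about 0.013: departure delay, as computed here, explains only about 1.3% of the variance in arrival delay. The slope is statistically distinguishable from zero (the reported pvalue is 0.0), but the linear relationship is weak. So, departure delay alone is a poor predictor of arrival delay in this fit.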
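
To make the quantities reported by `linregress` concrete, the sketch below computes the slope, intercept, and correlation coefficient by hand from the usual least-squares sums. It uses only the ten rows printed by `delayInfo.head(10)` above, so its numbers will differ from the full-data fit; `simple_linear_fit` is a hypothetical helper name.

```python
import math

# The ten (startDelay, endDelay) pairs printed by delayInfo.head(10) above.
x = [-3.0, -5.0, -8.0, 37.0, 13.0, 17.0, -3.0, 0.0, -3.0, -1.0]
y = [-14.0, -35.0, -30.0, 73.0, 2.0, 65.0, 51.0, 48.0, 50.0, 46.0]

def simple_linear_fit(xs, ys):
    """Ordinary least squares for y = intercept + slope * x, plus Pearson r."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((a - mean_x) ** 2 for a in xs)
    syy = sum((b - mean_y) ** 2 for b in ys)
    sxy = sum((a - mean_x) * (b - mean_y) for a, b in zip(xs, ys))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    r = sxy / math.sqrt(sxx * syy)
    return slope, intercept, r

slope, intercept, r = simple_linear_fit(x, y)
# r * r is the share of the variance in y that the fitted line accounts for.
print(f"slope={slope:.3f} intercept={intercept:.3f} r={r:.3f} r^2={r * r:.3f}")
```

The squared correlation is the number to read when asking how predictive one variable is of the other; the p-value only says whether the slope differs from zero.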

In [110]:
booleanNewY = []
for i in range (0,len(newy)):
    if newy[i]>0:
        booleanNewY.append(1)
    else:
        booleanNewY.append(0)
linreg = linregress(newx,booleanNewY)
linreg

LinregressResult(slope=0.000481323476691118, intercept=0.37658137702559685, rvalue=0.15063741216078022, pvalue=0.0, stderr=4.758025919492034e-06, intercept_stderr=0.0007235530206188639)

#### Explanation
Here the arrival delay is recoded as a binary outcome: 1 if the difference between ArrTime and CRSArrTime is positive (the flight arrived late) and 0 otherwise. The correlation is still weak (rvalue of about 0.151, so r² is about 0.023). So, the difference between DepTime and CRSDepTime is not strongly correlated with whether or not the arrival is delayed.