### Ronnie Kauanoe (rkauanoe@hawaii.edu)
### ICS 438 (CRN 87660)

In [396]:
import re

# Assignment 2: Analyzing large datasets with Spark.


For this assignment, make sure you are running in the PySpark Docker environment I introduced in class. You can start that environment using the following command:

```
docker run --rm -p 4040:4040 -p 8888:8888 -v "%cd%":/home/jovyan/work jupyter/all-spark-notebook
```

Make sure you run the command from the directory containing this jupyter notebook and your data folder.


<b>
# WARNING: For some reason, the ipynb document didn't always sync properly when I was pushing to GitHub. As such, please push often and make sure your incremental changes appear on GitHub.
</b>

### Part 1

The first part will use Spark to analyze the following books, which I have downloaded for you to use from Project Gutenberg. The files are saved to the data folder.

| File name | Book Title|
|:---------:|:----------|
|43.txt | The Strange Case of Dr. Jekyll and Mr. Hyde by Robert Louis Stevenson|
|84.txt | Frankenstein; Or, The Modern Prometheus by Mary Wollstonecraft Shelley |
|398.txt  | The First Book of Adam and Eve by Rutherford Hayes Platt|
|3296.txt | The Confessions of St. Augustine by Bishop of Hippo Saint Augustine|

The objective is to explore whether we can detect similarity between books within the same topic using word-based similarity. 

The task of identifying similar texts in Natural Language Processing is crucial. A naive method for determining whether two documents are similar is to treat them as collections of words (bag of words) and use the number of words they share as a proxy for their similarity. It makes sense that two books with religion as the topic (e.g.  `398.txt` and `3296.txt`) would have more words in common than a book that discusses religion and a book that discusses science fiction (e.g. books `84.txt` and `398.txt`). 
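
As a minimal sketch of this bag-of-words comparison in plain Python (the three sentences and the helper name `shared_word_count` are made up for illustration and are not taken from the books):

```python
import re

def shared_word_count(text_a, text_b):
    """Count the distinct words that appear in both texts (bag-of-words overlap)."""
    # Lowercase, keep alphabetic runs only, and deduplicate with a set
    words_a = set(re.findall(r"[a-z]+", text_a.lower()))
    words_b = set(re.findall(r"[a-z]+", text_b.lower()))
    return len(words_a & words_b)

# Hypothetical example sentences
doc_1 = "God created the heaven and the earth"
doc_2 = "The Lord God formed man of the dust of the earth"
doc_3 = "The monster fled across the ice"

print(shared_word_count(doc_1, doc_2))  # shares 'god', 'the', 'earth'
print(shared_word_count(doc_1, doc_3))  # shares only 'the'
```

Note that the only word shared by the unrelated pair is a stop word, which is why stop words are filtered out later in this part.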

As mentioned above, we will be using Spark to analyze the data. Although Spark is not needed for such a small example, the platform would be ideal for analyzing very large collections of documents, like those often analyzed by large corporations.

This part of the assignment will rely exclusively on RDDs.

### Q1. 
Start by importing Spark and making sure your environment is set up properly for the assignment.

Import the spark context necessary to load a document as an RDD; ignore any error messages

In [397]:
import pyspark
from pyspark import SparkContext 

### Q2 

Read in the file `43.txt` as a spark RDD and save it to a variable called `book_43`
 * make sure `book_43` is of type MapPartitionsRDD, i.e.,
   * str(type(book_43)) == "<class 'pyspark.rdd.RDD'>" should return True 

In [398]:
# Create or reuse a context
sc = SparkContext.getOrCreate()
# Read in 43.txt as a spark RDD
book_43 = sc.textFile("data/43.txt")
# Verify that book_43 is the correct type
print(str(type(book_43)) == "<class 'pyspark.rdd.RDD'>")

True


### Q3

How many lines does `book_43` contain?
* You can only use operations or actions on RDDs to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed

In [399]:
print(book_43.count())

2935


### Q4 

Prior to analyzing the words contained in this book, we need to first remove the occurrences of non-alphabetical characters and numbers from the text. You can use the following function, which takes a line as input, removes digits and non-word characters, and splits it into a collection of words. 

```python
def clean_split_line(line):
    a = re.sub(r'\d+', '', line)
    b = re.sub(r'[\W]+', ' ', a)
    return b.upper().split()
```

Use the function above on the variable `test_line` to see what it returns.
```python
test_line = "This is an example of that contains 234 and a dash-containing number"
```

In [400]:
# Provided code
def clean_split_line(line):
    # Raw strings keep the regex escapes (\d, \W) from being treated as string escapes
    a = re.sub(r'\d+', '', line)
    b = re.sub(r'[\W]+', ' ', a)
    return b.upper().split()
# Provided sample
test_line = "This is an example of that contains 234 and a dash-containing number"

print(clean_split_line(test_line))

['THIS', 'IS', 'AN', 'EXAMPLE', 'OF', 'THAT', 'CONTAINS', 'AND', 'A', 'DASH', 'CONTAINING', 'NUMBER']


### Q5

How many words does `book_43` contain? To answer this question, you may find it useful to apply the function in a spark-fashion. 
* You can only use operations or actions on RDDs to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed


In [401]:
# Apply the function to every line, create a flat list, and count the elements
book_43.flatMap(lambda line: clean_split_line(line)).count()

29116

### Q6

How many of the words in `book_43` are unique? Given that words can appear in lower, upper, or mixed case (e.g., The, THE, the), make sure you convert the words to lower case before counting them.


In [402]:
# Similar to above, apply the function to every line, ensure every word is lowercase,
# create a flat list, remove any duplicate words using distinct(), and count the elements
book_43.flatMap(lambda line: [s.lower() for s in clean_split_line(line)]).distinct().count()

4296

### Q7

* Generate an `RDD` that contains the frequency of each word in `book_43`. Call the variable `book_43_counts`. Each item in the `RDD` should be a tuple with the word as the first element of the tuple and the count as the second item of the tuple. The collection should look like the following:

[('project', 88), ("the", 1807), ... ]

* Such a collection may contain a large number of words and it would be imprudent to transfer all the words onto the same machine to display it. Instead, to explore the content of such a collection, display only the first element in your list. 

* Given the random nature of this operation, the first element displayed may be different. The first entry for me was:
```
[('project', 88)]
```

* You can only use operations or actions to answer the question. 
* Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
* Code that uses functions such as `some_func(...)` is not allowed


In [403]:
book_43_counts = book_43.flatMap(
    # Take all words in the document and put them in a flat list
    lambda line: clean_split_line(line)).map(
    # Then, make all words a key/value pair where (word, 1)
    lambda s: (s, 1)).reduceByKey(
    # Finally, reduce by adding pairs with equal keys and add their values together
    # ex. ("united", 1), ("united", 1) -> ("united", 2) to represent two instances of the words "united"
    lambda x, y: x + y)

### Q8

Sort `book_43_counts` and print the 20 most frequent words in book_43. 
  * Hint: the function `sortByKey` sorts a collection of tuples on the first element of each tuple. You can easily swap the order of the items in each tuple and use `sortByKey` to sort on the second item of each element in `book_43_counts`
  * You can only use operations or actions to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed

In [404]:
book_43_counts.sortBy(
    # Sort using the value of the tuple
    lambda kvp: kvp[1],
    # Descending values, greatest to least, and take the first 20
    ascending=False).take(20)

[('THE', 1807),
 ('OF', 1068),
 ('AND', 1043),
 ('TO', 726),
 ('A', 686),
 ('I', 646),
 ('IN', 485),
 ('WAS', 471),
 ('THAT', 392),
 ('HE', 384),
 ('IT', 378),
 ('YOU', 312),
 ('MY', 308),
 ('WITH', 301),
 ('HIS', 285),
 ('HAD', 244),
 ('AS', 203),
 ('FOR', 202),
 ('THIS', 195),
 ('BUT', 193)]

### Q9

You must have noted that the most frequent words in `book_43_counts` include stop words such as `of`, `the`, `and`, etc.

It would be inefficient to compare documents based on whether or not they contain stop words; those are common to all documents. As such, it's common to remove such stop words. The library `sklearn.feature_extraction` provides access to a collection of English stop words, which can be loaded using the following snippet:

```
from sklearn.feature_extraction import stop_words
stop_words.ENGLISH_STOP_WORDS
```

* Explore ENGLISH_STOP_WORDS (it's a frozen set data structure, i.e., a set that you cannot modify) by printing any 10 words from it. 
 * Hint: convert the frozen set to something you can subscript


In [405]:
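# Slightly different import than given: newer scikit-learn releases no longer provide
# `sklearn.feature_extraction.stop_words`, so the set is imported from `sklearn.feature_extraction.text`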
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS as en_stop_words
# Convert from frozen data to list and convert to upper
stop_words = [word.upper() for word in list(en_stop_words)]

# Print the first 10 words
for i in range(10):
    print(stop_words[i])

TOO
TAKE
ANY
UN
NOT
NEVER
BECAME
ETC
WILL
ABOVE


### Q10

Filter out the words in `book_43_counts` by removing those that appear in the ENGLISH_STOP_WORDS.
Save the results to a new variable called `book_43_counts_filtered`
  * You can only use operations or actions on RDDs to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed


In [406]:
book_43_counts_filtered = book_43_counts.filter(
    # Does the word exist in the list of stop words? If not, keep it. Otherwise, remove.
    lambda kvp: kvp[0] not in stop_words)

### Q11

* How many words are left in `book_43_counts_filtered` after removing the stop words?

In [407]:
book_43_counts_filtered.count()

4034

### Q12 

* Create a function called *process_RDD* that combines the relevant steps you proposed above to make it convenient to apply them to the remaining three books. Your function should accept an input text file path and:
 * Read in the file as a textRDD
 * Clean and split each line using `clean_split_line`
 * Filter out the stop words
 * Return a word count RDD where each item is a tuple of a word and its count.
 


In [408]:
def process_RDD(path):
    # Create or reuse a context
    sc = SparkContext.getOrCreate()
    # Read in the file at `path` as a Spark RDD
    file = sc.textFile(path)
    
    counts = file.flatMap(
        # Take all words in the document and put them in a flat list
        lambda line: clean_split_line(line)).map(
        # Then, make all words a key/value pair where (word, 1)
        lambda s: (s, 1)).reduceByKey(
        # Finally, reduce by adding pairs with equal keys and add their values together
        # ex. ("united", 1), ("united", 1) -> ("united", 2) to represent two instances of the words "united"
        lambda x, y: x + y)
    
    # No sorting is done here: sortBy returns a new RDD rather than sorting in place,
    # so callers can sort the returned RDD when they need an ordering
    
    counts_filtered = counts.filter(
        # Does the word exist in the list of stop words? If not, keep it. Otherwise, remove.
        lambda kvp: kvp[0] not in stop_words)
    
    return counts_filtered

# Check if the content of book_43's RDD is the same as the one
# produced by process_RDD("data/43.txt")
print(process_RDD("data/43.txt").collect() == book_43_counts_filtered.collect())

True


### Q13 

Apply the function `process_RDD` to `book_84`, `book_398` and `book_3296` and save the results to variables `book_84_counts_filtered`, `book_398_counts_filtered` and `book_3296_counts_filtered` respectively. How many distinct words does each book contain after filtering the stop words?


In [409]:
book_84_counts_filtered = process_RDD("data/84.txt")
book_398_counts_filtered = process_RDD("data/398.txt")
book_3296_counts_filtered = process_RDD("data/3296.txt")

print("book_84 has %d distinct words" % (book_84_counts_filtered.count()))
print("book_398 has %d distinct words" % (book_398_counts_filtered.count()))
print("book_3296 has %d distinct words" % (book_3296_counts_filtered.count()))

book_84 has 7016 distinct words
book_398 has 2421 distinct words
book_3296 has 7293 distinct words


### Q14 

We discussed how to evaluate similarity between two texts using the number of words they share. We hypothesized that books that are similar should have more words in common than books that are dissimilar. If that holds, `book_398` and `book_3296`, which both pertain to religion, will have more words in common than, say, `book_84` and `book_398`. Test this hypothesis by writing code that compares and prints the number of words shared between `book_398` and `book_3296` and then between `book_84` and `book_398`.


In [410]:
# Function only used to reduce lines of written code, function only uses RDD actions/transformations
def compare_books(book1_counts_filtered, book2_counts_filtered):
    # Flatten book1 to get all words and ignore counts
    flat_book1 = book1_counts_filtered.flatMap(lambda kvp: [kvp[0]]).collect()
    
    shared_words = book2_counts_filtered.filter(
        # Only keep the words that are in both books, remove otherwise
        lambda kvp: kvp[0] in flat_book1).flatMap(
        # Only keep the words, not the counts of the words
        lambda kvp: [kvp[0]]).collect()
    
    return shared_words

shared_398_3296 = compare_books(book_398_counts_filtered, book_3296_counts_filtered)
shared_84_398 = compare_books(book_84_counts_filtered, book_398_counts_filtered)

print("Books 398 and 3296 share %d words" % (len(shared_398_3296)))
print("Books 84 and 398 share %d words" % (len(shared_84_398)))

Books 398 and 3296 share 1790 words
Books 84 and 398 share 1691 words


### Q15

* Based on the above, do you think counting the number of shared words is a good idea as a distance metric for evaluating topic similarity? Justify your answer.
* Hint: What do *book_84* and *book_3296* have in common? 

In [411]:
# Get list of shared terms between books 84 and 3296
shared_84_3296 = compare_books(book_84_counts_filtered, book_3296_counts_filtered)
# Get top 20 used words in book 84
high_use_shared_84 = book_84_counts_filtered.filter(
    lambda kvp: kvp[0] in shared_84_3296).sortBy(
    # Sort using the value of the tuple
    lambda kvp: kvp[1],
    # Descending values, greatest to least
    ascending=False).take(20)


#Get top 20 used words in book 3296
high_use_shared_3296 = book_3296_counts_filtered.filter(
    lambda kvp: kvp[0] in shared_84_3296).sortBy(
    # Sort using the value of the tuple
    lambda kvp: kvp[1],
    # Descending values, greatest to least
    ascending=False).take(20)


flat_high_use_shared_84 = [kvp[0] for kvp in high_use_shared_84]

# Get book 3296's most used terms relative to book 84's most used terms
similar_counts_3296 = book_3296_counts_filtered.filter(
    lambda kvp: kvp[0] in flat_high_use_shared_84).sortBy(
    # Sort using the value of the tuple
    lambda kvp: kvp[1],
    # Descending values, greatest to least
    ascending=False).collect()


flat_high_use_shared_3296 = [kvp[0] for kvp in high_use_shared_3296]

# Get book 84's most used terms relative to book 3296's most used terms
similar_counts_84 = book_84_counts_filtered.filter(
    lambda kvp: kvp[0] in flat_high_use_shared_3296).sortBy(
    # Sort using the value of the tuple
    lambda kvp: kvp[1],
    # Descending values, greatest to least
    ascending=False).collect()



print("Most used words and counts in book 84")
print(high_use_shared_84)
print("\n")

print("Book 3296's most used words relative to book 84's most used words")
print(similar_counts_3296)
print("\n")

print("Most used words and counts in book 3296")
print(high_use_shared_3296)
print("\n")

print("Book 84's most used words relative to book 3296's most used words")
print(similar_counts_84)

Most used words and counts in book 84
[('MAN', 137), ('FATHER', 133), ('DID', 119), ('LIFE', 116), ('SHALL', 106), ('EYES', 104), ('SAID', 102), ('S', 100), ('GUTENBERG', 98), ('TIME', 98), ('SAW', 94), ('NIGHT', 91), ('PROJECT', 88), ('MIND', 85), ('DAY', 82), ('HEART', 81), ('FELT', 80), ('DEATH', 79), ('WORK', 78), ('FEELINGS', 76)]


Book 3296's most used words relative to book 84's most used words
[('MAN', 268), ('DID', 248), ('TIME', 228), ('LIFE', 202), ('SHALL', 192), ('MIND', 185), ('HEART', 180), ('S', 136), ('SAID', 131), ('DAY', 107), ('GUTENBERG', 97), ('PROJECT', 88), ('EYES', 82), ('WORK', 62), ('FATHER', 58), ('DEATH', 57), ('SAW', 50), ('NIGHT', 30), ('FELT', 13), ('FEELINGS', 9)]


Most used words and counts in book 3296
[('THOU', 1092), ('THEE', 924), ('THY', 901), ('THINGS', 553), ('GOD', 510), ('LORD', 316), ('O', 302), ('GOOD', 279), ('MAN', 268), ('DID', 248), ('EARTH', 244), ('TIME', 228), ('SOUL', 215), ('HEAVEN', 202), ('LIFE', 202), ('TRUTH', 199), ('SHALL', 

<b>RESPONSE</b>: The above code compares the most common terms in books 84 and 3296. Let's compare the occurrences of certain words between the two texts.

"Father" is the second most used word in book 84 with 133 uses, but it is used only 58 times in book 3296, far fewer than that book's second most used word, "thee", with 924 uses.

"Things" is the fourth most used word in book 3296 with a whopping 553 uses, but it is used only 11 times in book 84, far fewer than that book's fourth most used word, "life", with 116 uses.

These are just two examples of how the texts vary. Using only the number of shared words to compare the books' topics completely ignores how often those words are used in each book. <u>Therefore, I believe that counting the number of shared words is not a good distance metric for evaluating topic similarity.</u>
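
One possible refinement, which is not part of the assignment, is to divide the shared-word count by the size of the combined vocabulary (the Jaccard index), so that a book with a large vocabulary is not favored simply for its size. A rough sketch using the counts printed in Q13 and Q14; the helper name `jaccard` is an assumption made for this example:

```python
def jaccard(shared, distinct_a, distinct_b):
    """Shared words divided by the size of the union of both vocabularies."""
    return shared / (distinct_a + distinct_b - shared)

# Distinct-word counts from Q13 and shared-word counts from Q14
j_398_3296 = jaccard(1790, 2421, 7293)
j_84_398 = jaccard(1691, 7016, 2421)

print(f"398 vs 3296: {j_398_3296:.3f}")
print(f"84 vs 398:   {j_84_398:.3f}")
```

Even after this normalization the two scores stay close to each other, and the measure still ignores how often each word is used.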

## Part II 

Another approach to estimating similarity consists of computing the Euclidean distance across a set of words. For example, suppose we have 3 documents A, B and C with the following counts for the words `evolution`, `DNA`, `biology` and `finance`. 

```python 
A = [4, 9, 6, 8]
B = [3, 7, 7, 10]
C = [15, 10, 1, 1]
```
Although all documents contain exactly the four words, the number of times these words appear in each document may be indicative of their topic. For example, documents `A` and `B` are more likely to be business related since they contain the word `finance` more frequently (8 and 10 times respectively). Document `C` may be a technical document since it focuses more on technical words (`evolution` and `DNA`) and less on the word `finance`.

The Euclidean distance, which can be computed using the `scipy` snippet below, is more indicative of topic-relatedness between documents.

```python
from scipy.spatial.distance import euclidean 
print(f"The Euclidean distance between A and B is: {euclidean(A, B)}")

print(f"The Euclidean distance between A and C is: {euclidean(A, C)}")

print(f"The Euclidean distance between B and C is: {euclidean(B, C)}")
```
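
For reference, the Euclidean distance is the square root of the sum of squared differences between the counts. A minimal sketch using only the standard library, which should agree with the `scipy` call for these small vectors (the helper name `euclidean_distance` is made up for this example):

```python
import math

A = [4, 9, 6, 8]
B = [3, 7, 7, 10]
C = [15, 10, 1, 1]

def euclidean_distance(u, v):
    """Square root of the sum of squared element-wise differences."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(u, v)))

print(euclidean_distance(A, B))  # sqrt(10), about 3.16
print(euclidean_distance(A, C))  # sqrt(196) = 14.0
print(euclidean_distance(B, C))  # sqrt(270), about 16.43
```

`A` and `B` come out closest, which matches the intuition that they share a topic.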


### Q16

To calculate the Euclidean distance, we must first identify the set of words by which we will compare the documents. Here, we will explore the words that are common to all 4 documents. We will store the data in a matrix called `counts_matrix`.

Start by finding the words that are common to all four documents after stop-word filtering and store the counts for each word in a column of `counts_matrix`. 

To take the previous example, you can generate an empty matrix with 3 rows (documents `A`, `B` and `C`) and 4 columns (words `evolution`, `DNA`, `biology` and `finance`) using the following code.

```python
import numpy as np
counts_matrix = np.zeros([3,4])
```

After generating the counts, you can fill the counts for a document, say `A`, using the following code:

```python
counts_matrix[0, :] = [4, 9, 6, 8] 
```
* Other than for building `counts_matrix` you should exclusively use operations or actions on the `RDD` to answer this question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed
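
The key point when filling the matrix is that every row must list its counts in the same word order. A small plain-Python sketch of that alignment, using hypothetical word counts rather than the books' (the RDD-only restriction above still applies to the actual answer):

```python
# Hypothetical per-document word counts (not taken from the books)
doc_counts = {
    "A": {"evolution": 4, "dna": 9, "biology": 6, "finance": 8, "market": 2},
    "B": {"evolution": 3, "dna": 7, "biology": 7, "finance": 10},
    "C": {"evolution": 15, "dna": 10, "biology": 1, "finance": 1, "cell": 5},
}

# Words present in every document, sorted so each column has a fixed meaning
shared = sorted(set.intersection(*(set(c) for c in doc_counts.values())))

# One row per document, one column per shared word
rows = [[doc_counts[name][word] for word in shared] for name in sorted(doc_counts)]

print(shared)
for row in rows:
    print(row)
```

Each list in `rows` could then be assigned to the matching row of `counts_matrix`, e.g. `counts_matrix[0, :] = rows[0]`.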


In [412]:
import numpy as np

# Get all distinct words from book_84
flat_book_84 = [kvp[0] for kvp in book_84_counts_filtered.collect()]
# Get all distinct words from book_398
flat_book_398 = [kvp[0] for kvp in book_398_counts_filtered.collect()]
# Get all distinct words from book_3296
flat_book_3296 = [kvp[0] for kvp in book_3296_counts_filtered.collect()]

# List of all words (alphabetized) shared between all books
shared_words = book_43_counts_filtered.filter(
    # Check that the word exists in ALL books
    lambda kvp: (kvp[0] in flat_book_84)
    and (kvp[0] in flat_book_398)
    and (kvp[0] in flat_book_3296)).sortBy(
    # Then, alphabetize
    lambda kvp: kvp[0]).flatMap(
    # Finally, keep only the words
    lambda kvp: [kvp[0]])

print("All books share %d words" % (shared_words.count()))

# 4 documents = 4 lines
counts_matrix = np.zeros([4, shared_words.count()])

# Retrieve list from RDD
list_shared_words = shared_words.collect()

# Add book_43's shared word counts to matrix
counts_matrix[0, :] = book_43_counts_filtered.filter(
    # Keep only shared words
    lambda kvp: kvp[0] in list_shared_words).sortBy(
    # Alphabetize the words
    lambda kvp: kvp[0]).flatMap(
    # Keep only the counts
    lambda kvp: [kvp[1]]).collect()

# Add book_84's shared word counts to matrix
counts_matrix[1, :] = book_84_counts_filtered.filter(
    # Keep only shared words
    lambda kvp: kvp[0] in list_shared_words).sortBy(
    # Alphabetize the words
    lambda kvp: kvp[0]).flatMap(
    # Keep only the counts
    lambda kvp: [kvp[1]]).collect()

# Add book_398's shared word counts to matrix
counts_matrix[2, :] = book_398_counts_filtered.filter(
    # Keep only shared words
    lambda kvp: kvp[0] in list_shared_words).sortBy(
    # Alphabetize the words
    lambda kvp: kvp[0]).flatMap(
    # Keep only the counts
    lambda kvp: [kvp[1]]).collect()

# Add book_3296's shared word counts to matrix
counts_matrix[3, :] = book_3296_counts_filtered.filter(
    # Keep only shared words
    lambda kvp: kvp[0] in list_shared_words).sortBy(
    # Alphabetize the words
    lambda kvp: kvp[0]).flatMap(
    # Keep only the counts
    lambda kvp: [kvp[1]]).collect()

# Let's check that the matrix has been populated correctly
books = [book_43_counts_filtered,
         book_84_counts_filtered,
         book_398_counts_filtered,
         book_3296_counts_filtered]
# Get the first five words and counts
for i in range(4):
    print(books[i].filter(
        lambda kvp: kvp[0] in list_shared_words).sortBy(
        lambda kvp: kvp[0]).take(5))

# As we can see in the output, the counts match with the matrix values
print(counts_matrix[:, 0])
print(counts_matrix[:, 1])
print(counts_matrix[:, 2])
print(counts_matrix[:, 3])
print(counts_matrix[:, 4])

All books share 1162 words
[('ABIDE', 1), ('ABLE', 3), ('ACCEPT', 2), ('ACCEPTED', 2), ('ACCEPTING', 1)]
[('ABIDE', 1), ('ABLE', 9), ('ACCEPT', 4), ('ACCEPTED', 2), ('ACCEPTING', 1)]
[('ABIDE', 1), ('ABLE', 5), ('ACCEPT', 16), ('ACCEPTED', 14), ('ACCEPTING', 1)]
[('ABIDE', 16), ('ABLE', 36), ('ACCEPT', 3), ('ACCEPTED', 3), ('ACCEPTING', 1)]
[ 1.  1.  1. 16.]
[ 3.  9.  5. 36.]
[ 2.  4. 16.  3.]
[ 2.  2. 14.  3.]
[1. 1. 1. 1.]


### Q17

Compute the Euclidean distance between `book_398` and `book_3296`, which both discuss religion, and between `book_84` and `book_398`. What do you conclude about using the Euclidean distance for evaluating topic relatedness across documents?


In [413]:
from scipy.spatial.distance import euclidean

print(euclidean(counts_matrix[2, :], counts_matrix[3, :]))
print(euclidean(counts_matrix[1, :], counts_matrix[2, :]))

1156.6628722320086
751.6688100486809


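The distance between `book_398` and `book_3296` (about 1156.7) is larger than the distance between `book_84` and `book_398` (about 751.7), so the two religious texts come out as less similar than the religious and science-fiction pair. Unlike the shared-word count, the Euclidean distance does take word frequencies into account, but on raw counts it appears to be dominated by how often each book uses its most frequent words, so I cannot conclude from these numbers alone that it reliably captures topic relatedness without first normalizing the counts.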

### Q18

Bonus question (5 points): Can you think of a few things we could do to improve similarity between documents that pertain to the same topic? Justify your answer without giving code.

We can improve the similarity between the documents by excluding words that are used only once or twice in each document. A large portion of the shared words occur only once or twice in both documents, yet keeping them still shifts the Euclidean distance by a few points and makes closely related documents appear farther apart than they actually are.

Additionally, we can use the optional parameter `w` of the Euclidean distance function. This parameter is an array of float values, one weight per word. We can use it to weigh certain words more heavily than others and so show similarity or dissimilarity more accurately. 
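
To illustrate the weighting idea, here is a minimal sketch of a weighted Euclidean distance, assuming each weight multiplies the corresponding squared difference (which, as far as I understand, is how the `w` argument of `scipy.spatial.distance.euclidean` is defined). The helper name, vectors, and weights are made up for this example:

```python
import math

def weighted_euclidean(u, v, w):
    """sqrt(sum(w_i * (u_i - v_i)^2)); a weight of 0 ignores that word entirely."""
    return math.sqrt(sum(wi * (x - y) ** 2 for x, y, wi in zip(u, v, w)))

# Hypothetical counts for three words in two documents
doc_u = [10, 2, 7]
doc_v = [4, 2, 3]

# Equal weights reproduce the ordinary Euclidean distance: sqrt(36 + 0 + 16)
unweighted = weighted_euclidean(doc_u, doc_v, [1, 1, 1])
# Down-weighting the first word shrinks its contribution: sqrt(9 + 0 + 16)
downweighted = weighted_euclidean(doc_u, doc_v, [0.25, 1, 1])

print(unweighted, downweighted)
```

Choosing the weights is the hard part; one option would be to give less weight to words that are frequent in every document.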

## Part III

In this part we will build some basic analytics for a dataset consisting of flight arrival and departure details for all commercial flights within the USA in one month. While this dataset can be managed using Pandas (<1M records), scaling to a yearly or longer timeframe will greatly benefit from using a distributed computing framework such as `Spark`.

Here, you should use exclusively Spark `DataFrames`. 

We want to analyze this dataset to better schedule trips.  For example:
 * Avoid airline carriers that are most often associated with delays.
 * Avoid departure days where delays are most frequent.
 * Avoid airports that are associated with delays or long taxiing times.
 * etc.
 

The information about the fields contained in the data file can be found [here](https://dataverse.harvard.edu/dataset.xhtml;jsessionid=0414e25969eccd0e88ae4d64fa0b?persistentId=doi%3A10.7910%2FDVN%2FHG7NV7&version=&q=&fileTypeGroupFacet=&fileTag=%221.+Documentation%22&fileSortField=date&fileSortOrder=desc)


### Q19

Load the file `flight_info.csv` into a spark `DataFrame` called `flight_info`.

  * Note that you will need to create a sparkSession prior to loading the data
  
* How many entries does the file contain?



In [414]:
from pyspark.sql import SparkSession

# Create or reuse a SparkSession, the entry point for working with DataFrames
spark = SparkSession.builder.getOrCreate()

# Read csv file (first line is the header) and put data into dataframe
flight_info = spark.read.format("csv").option("header", "true").load("data/flight_info.csv")

print("There are %d entries in the file" % (flight_info.count()))

There are 450017 entries in the file


### Q20

Use `pySpark-SQL` or `pandas`-like syntax to compute the airlines represented in this dataset.
The airline information is stored in a field called UniqueCarrier.
* UniqueCarrier: Represents the unique carrier code (e.g., AA = American Airlines) 


In [415]:
# Get all unique airline codes and put them into a list
airlines = [row['UniqueCarrier'] for row in flight_info.select("UniqueCarrier").distinct().collect()]

print(airlines)



['UA', 'NK', 'AA', 'EV', 'B6', 'DL', 'OO', 'F9', 'HA', 'AS', 'VX', 'WN']


                                                                                

### Q21

The data file contains various other fields, two of which are useful for answering the next question.

* CRSDepTime: Represents the scheduled departure time
* DepTime: Represents the actual departure time

Compute the number of delayed flights for each carrier code represented in this dataset. Sort the data in decreasing order of delays.
  * A delay is observed when `DepTime` > `CRSDepTime`


In [416]:
# Holds tuples of schema ([AIRLINE], [DELAY_COUNT])
delays = []

for airline in airlines:
    # Select the two columns for the specific airline
    times = flight_info.select("DepTime", "CRSDepTime").where(flight_info["UniqueCarrier"] == airline)
        
    # Creates a list of 0, 1 where 1 is DepTime > CRSDepTime and 0 otherwise
    delay_buffer = [int((float(row["DepTime"]) if (row["DepTime"] is not None) else 0) > (float(row["CRSDepTime"]) if (row["CRSDepTime"] is not None) else 0)) for row in times.collect()]
    
    delays.append((airline, sum(delay_buffer)))

# Sort the list of tuples by number of delays
delays.sort(key = lambda x: x[1], reverse = True)
    
print(delays)

                                                                                

[('WN', 47472), ('DL', 24334), ('AA', 23461), ('UA', 17701), ('OO', 16751), ('EV', 11596), ('B6', 9396), ('AS', 4488), ('NK', 4151), ('F9', 2988), ('VX', 2648), ('HA', 1939)]


### Q22

 Use the file `airlines.csv` to find the complete name of each airline. Here, you are required to load the file as a pyspark DataFrame; call it `airlines_info`, and repeat the query above while including the `flights.csv` file in your query (requires doing a `join`) so that you can also display the full name of the carrier (second column). 

The result will look (approximately) like:

```
[Row(UniqueCarrier='WN', first(_c1)='Southwest Airlines', count=SOME_count),
 Row(UniqueCarrier='DL', first(_c1)='Delta Air Lines', count=SOME_count),
 Row(UniqueCarrier='AA', first(_c1)='American Airlines', count=SOME_count),
 ...
 ]
```

The carrier code in the `airlines.csv` file is provided in the 4th (1-based) column

Note that the file `airlines.csv` does not have a column header. Hence, you need to print one line of your dataset to see what names Spark gave to the columns. Use the names provided by Spark in your query.

In [417]:
from pyspark.sql import SparkSession

# Create or reuse a SparkSession
spark = SparkSession.builder.getOrCreate()

# Read csv file (no header row) and put data into dataframe
airlines_info = spark.read.format("csv").option("header", "false").load("data/airlines.csv")

combined_info = airlines_info.drop(airlines_info["_c0"]).join(flight_info, flight_info["UniqueCarrier"] == airlines_info["_c3"])

# Holds tuples of schema ([UNIQUE_CARRIER], [AIRLINE_NAME], [DELAY_COUNT])
delays = []

for uc in airlines:
    
    # Select the two columns for the specific airline
    times = combined_info.select("DepTime", "CRSDepTime", "_c1").where(combined_info["UniqueCarrier"] == uc)
    # Creates a list of 0, 1 where 1 is DepTime > CRSDepTime and 0 otherwise
    delay_buffer = [int((float(row["DepTime"]) if (row["DepTime"] is not None) else 0) > (float(row["CRSDepTime"]) if (row["CRSDepTime"] is not None) else 0)) for row in times.collect()]
    
    # Get the airline name from the row
    airline_name = times.take(1)[0]["_c1"]
        
    delays.append((uc, airline_name, sum(delay_buffer)))

# Sort the list of tuples by number of delays
delays.sort(key = lambda x: x[2], reverse = True)
    
print(delays)

                                                                                

[('WN', 'Southwest Airlines', 47472), ('DL', 'Delta Air Lines', 24334), ('AA', 'American Airlines', 23461), ('UA', 'United Airlines', 17701), ('OO', 'SkyWest', 16751), ('EV', 'Atlantic Southeast Airlines', 11596), ('B6', 'JetBlue Airways', 9396), ('AS', 'Alaska Airlines', 4488), ('NK', 'Spirit Airlines', 4151), ('F9', 'Frontier Airlines', 2988), ('VX', 'Virgin America', 2648), ('HA', 'Hawaiian Airlines', 1939)]


### Q23

Compute the number of delays per company per day. The day is encoded as an integer in the column `DayOfWeek` in `flight_info`. You can display the day as an integer or map it to the name of the day of the week.
Sort the data by airline code (UniqueCarrier) and by increasing values of DayOfWeek.


Your results should look like the following



In [None]:
# Holds tuples of schema ([UNIQUE_CARRIER], [AIRLINE_NAME], (([DAY], [DELAY_COUNT]), ...))
delays = []

week = {1: "MON", 2: "TUES", 3: "WED", 4: "THURS", 5: "FRI", 6: "SAT", 7: "SUN"}

for uc in airlines:
    # Select only the following four columns
    times = combined_info.select("DepTime", "CRSDepTime", "_c1", "DayOfWeek").where(combined_info["UniqueCarrier"] == uc)

    # Get the airline name from the row
    airline_name = times.take(1)[0]["_c1"]
    
    # Dictionary to save delays per day
    day_dict = {}
    
    # Cycle over all seven days
    for day in range(1, 8):
        # Only take entries that fall on this day
        times_this_day = times.where(combined_info["DayOfWeek"] == str(day))
        # Same as before, create a binary buffer
        delay_buffer = [int((float(row["DepTime"]) if (row["DepTime"] is not None) else 0) > (float(row["CRSDepTime"]) if (row["CRSDepTime"] is not None) else 0)) for row in times_this_day.collect()]
        # Sum up the buffer for number of delays on that day
        delays_this_day = sum(delay_buffer)
        # Put this sum into the dictionary
        day_dict[week[day]] = delays_this_day
    
    # Flatten the dictionary into a list of tuples
    flat_days = [(k, v) for k, v in day_dict.items()]
    # Create a tuple that follows the schema: (UC, AIRLINE_NAME, (("MON", ___), ("TUES", ___), etc.))
    airline_tuple = tuple([uc, airline_name, tuple(tuple(x) for x in flat_days)])
    # Add the tuple to the delay list
    delays.append(airline_tuple)
    
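# Sort on UniqueCarrier (the first element of each tuple)
delays.sort(key = lambda x: x[0])

# Days are already in increasing order because they were inserted Monday through Sunday
for single_airline_delay in delays:
    print(single_airline_delay)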

                                                                                

### Q24  

Counting the number of delayed flights per airline is misleading, as airlines with more flights are more likely to have delays than companies with substantially fewer flights. 

Repeat the same query above but, for each carrier, normalize the counts of delays by the total number of flights for that carrier. 


In [None]:
### Write your code here

### Q25 

Time the query above. How long did it take to run?
  * Make sure you run the code a few times and compute the average run time.
  * The above should be easy to implement if you use the correct Jupyter Notebook `magic` function
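
In a notebook this is typically done with the `%timeit` magic. Outside a notebook, a comparable average can be sketched with the standard-library `timeit` module; `dummy_query` below is a placeholder standing in for the real query, and the repetition counts are arbitrary:

```python
import timeit

RUNS_PER_REPEAT = 5
REPEATS = 3

def dummy_query():
    # Placeholder workload standing in for the Spark query being timed
    return sum(i * i for i in range(10_000))

# Each entry is the total time for RUNS_PER_REPEAT consecutive runs
timings = timeit.repeat(dummy_query, number=RUNS_PER_REPEAT, repeat=REPEATS)

# Average time of a single run across all repetitions
avg_seconds = sum(timings) / (REPEATS * RUNS_PER_REPEAT)
print(f"average run time: {avg_seconds:.6f} s")
```

When timing a Spark query this way, the timed function should end in an action (such as `count()` or `collect()`), since transformations alone are lazy and return almost immediately.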
  

In [None]:
### Write your code here

### Q26 

Use one of the techniques covered in class to accelerate this query. Time your query to see by how much the run time improved.

In [None]:
### Write your code here

### Q27 

Is the departure delay (i.e., DepTime - CRSDepTime) predictive of the arrival delay (i.e., ArrTime - CRSArrTime)?
Use an approach of your choice (e.g., `sklearn`, which we covered in class, or `Spark`) to model the arrival delay as a linear function of the departure delay. 
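
As a rough sketch of the modeling step only, the slope and intercept of a simple least-squares line can be computed directly. The delay values below are invented for illustration and are expressed in minutes; the real columns would first need to be converted into comparable delay values:

```python
# Hypothetical departure and arrival delays in minutes (not from flight_info.csv)
dep_delay = [0, 5, 10, 20, 40]
arr_delay = [-2, 4, 8, 19, 41]

n = len(dep_delay)
mean_x = sum(dep_delay) / n
mean_y = sum(arr_delay) / n

# Ordinary least squares for one predictor: slope = S_xy / S_xx
s_xy = sum((x - mean_x) * (y - mean_y) for x, y in zip(dep_delay, arr_delay))
s_xx = sum((x - mean_x) ** 2 for x in dep_delay)
slope = s_xy / s_xx
intercept = mean_y - slope * mean_x

print(f"arr_delay ~ {slope:.3f} * dep_delay + {intercept:.3f}")
```

A slope close to 1 in such a fit would suggest that departure delays carry over almost entirely to arrival delays; a library such as `sklearn` would additionally report how well the line fits.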



In [None]:
### Write your code here