In [15]:
import re


For this assignment, you will need to make sure you're running from a PySpark docker environment I introduced in class. You can start the docker pySpark docker environment using the following command:

```
docker run --rm -p 4040:4040 -p 8888:8888 -v $(pwd):/home/jovyan/work jupyter/all-spark-notebook
```

Make sure you run the command from the directory containing this jupyter notebook and your data folder.


</b>
# WARNING: For some reason, ipynbb document didn't always sync properly when I was pushing to github. As such, please push often and make sure your incremental changes appear on GitHub.
</b>

### Part 1

The first part will use Spark to analyze the following books, which I have downloaded for you to use from Project Gutenberg. The files are saved to the data folder.

| File name | Book Title|
|:---------:|:----------|
|43.txt | The Strange Case of Dr. Jekyll and Mr. Hyde by Robert Louis Stevenson|
|84.txt | Frankenstein; Or, The Modern Prometheus by Mary Wollstonecraft Shelley |
|398.txt  | The First Book of Adam and Eve by Rutherford Hayes Platt|
|3296.txt | The Confessions of St. Augustine by Bishop of Hippo Saint Augustine|

The objective is to explore whether we can detect similarity between books within the same topic using word-based similarity. 

The task of identifying similar texts in Natural Language Processing is crucial. A naive method for determining whether two documents are similar is to treat them as collections of words (bag of words) and use the number of words they share as a proxy for their similarity. It makes sense that two books with religion as the topic (e.g.  `398.txt` and `3296.txt`) would have more words in common than a book that discusses religion and a book that discusses science fiction (e.g. books `84.txt` and `398.txt`). 

As mentioned above, we will be using Spark to analyze the data. Although Spark is not needed for such a small example, the platform would be ideal for analyzing very large collections of documents, like those often analyzed by large corporations

This part of the assignment will rely exclusively on RDDs.

### Q1. 
Start by importing Spark and making sure your environment is set up properly for the assignment.

Import the spark context necessary to load a document as an RDD; ignore any error messages

In [3]:
from pyspark import SparkContext
sc = SparkContext()
sc.version

'3.1.2'

### Q2 

Read in the file `43.txt` as a spark RDD and save it to a variable called `book_43`
 * make sure `book_43` is of type MapPartitionsRDD, i.e.,
   * str(type(book_43)) == "<class 'pyspark.rdd.RDD'>" should return True 

In [11]:
book_43 = sc.textFile('data/43.txt')
str(type(book_43)) == "<class 'pyspark.rdd.RDD'>"

True

### Q3

How many lines does `book_43` contain?
* You can only use operations or actions on RDDs to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed

In [14]:
book_43.count()

2935

### Q4 

Prior to analyzing the words contained in this book, we need to first remove the occurrences of non-alphabetical characters and numbers from the text. You can use the following function, which takes a line as input, removes digits and non-word characters, and splits it into a collection of words. 

```python
def clean_split_line(line):
    a = re.sub('\d+', '', line)
    b = re.sub('[\W]+', ' ', a)
    return b.upper().split()
```

Use the fucntion above on the variable (test_line) to see what it returns.
```python
test_line = "This is an example of that contains 234 and a dash-containing number"
```

In [19]:
def clean_split_line(line):
    line = re.sub('\d+', '', line)
    line = re.sub('[\W]+', ' ', line)
    return line.split()
test_line = "This is an example of that contains 234 and a dash-containing number"

clean_split_line(test_line)

['This',
 'is',
 'an',
 'example',
 'of',
 'that',
 'contains',
 'and',
 'a',
 'dash',
 'containing',
 'number']

### Q5

How many words does `book_43` contain? To answer this question, you may find it useful to apply the function in a spark-fashion. 
* You can only use operations or actions on RDDs to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed


In [23]:
book_43.flatMap(clean_split_line).count()

29116

### Q6

How many of the words in book_43 are unique? Given that words can appear in lower, upper or mixed case (ex. The, THE, the), make sure you convert the words into lower case before counting them.


In [2]:
# book_43.flatMap(clean_split_line).map(lambda x: x.lower()).distinct().count()

### Q7

* Generate an `RDD` that contains the frequency of each word in `book_43`. Call the variable `book_43_counts`. Each item in the `RDD` should be a tuple with the word as the first element of the tuple and the count as the second item of the tuple. The collection should look like the following:

[('project', 88), ("the", 1807), ... ]

* Such a collection may contain a large number of words and it would be imprudent to transfer all the words onto the same machine to display it. Instead, to explore the content of such a collection, display only the first element in your list. 

* Given the random nature of this operation, the first element element displayed may be different. The first entry for me was:
```
[('project', 88)]
```

* You can only use operations or actions to answer the question. 
* Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
* Code that uses functions such as `some_func(...)` is not allowed


In [31]:
book_43_counts = book_43.flatMap(clean_split_line).map(lambda x: (x.lower(),1)).reduceByKey(lambda x,y: x+y)
book_43_counts.take(1)

[('project', 88)]

### Q8

Sort `book_43_counts` and print the 20 most frequent words in book_43. 
  * Hint: function `sortByKey` sorts a collection of tuples on the first element element of the list. You can easily change the order of the items in each element and use `sortByKey` to sort on the second item of each element in `book_43_counts`
  * You can only use operations or actions to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed

In [4]:
book_43_counts.map(lambda x:(x[1], x[0])).sortByKey(ascending=False).take(10)

### Q9

You must have noted that the most frequent words in `book_43_counts` include stop words such as `of`, `the`, `and`, etc.

It would be inefficient to compare documents based on whether or not they contain stop words; those are common to all documents. As such, it's common to remove such stop words. The librarary `sklearn.feature_extraction` provides access to a collection of English stop words, which can be loaded using the following snippet:

```
from sklearn.feature_extraction import stop_words
stop_words.ENGLISH_STOP_WORDS
```

* Explore ENGLISH_STOP_WORDS (it's a frozen set data structure, i.e., a set that you cannot modify) by printing any 10 words from it. 
 * Hint: convert the frozen set to something you can subscript


In [44]:
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS
list(ENGLISH_STOP_WORDS)[:10]

['once',
 'thru',
 'sometimes',
 'seem',
 'give',
 'everything',
 'upon',
 'hereby',
 'never',
 'formerly']

### Q10

Filter out the words in `book_43_counts` by removing those that appear in the ENGLISH_STOP_WORDS.
Save the results to a new variable called `book_43_counts_filtered`
  * You can only use operarations or actions on RDDs to answer the question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses function such as `some_func(...)` is not allowed


In [52]:
book_43_counts_filtered = book_43_counts.filter(lambda x: x[0] not in ENGLISH_STOP_WORDS)


### Q11

* How many words are left in `book_43_counts_filtered` after removing the stop words

In [51]:
book_43_counts_filtered.count()

4296

### Q12 

* Create a function called *process_RDD* that combines the relevant steps you proposed above to make it convenient to apply them to the remaining four books. Your function should accept an input text file path and:
 * Reads in the file as a textRDD
 * Cleans and splits the line using `clean_split_line`
 * Filters out the stop words
 * Returns a word count RDD where each item is a tuple of words and its count.
 


In [56]:
def process_RDD(file_path):
    book_rdd = sc.textFile(file_path)
    book_rdd_counts = book_rdd.flatMap(clean_split_line).map(lambda x: (x.lower(),1)).reduceByKey(lambda x,y: x+y)
    book_rdd_counts_filtered = book_rdd_counts.filter(lambda x: x[0] not in ENGLISH_STOP_WORDS)
    return book_rdd_counts_filtered

### Q13 

Apply the function `process_RDD` to `book_84`, `book_398` and `book_3296` and save the results to variables `book_84_counts_filtered`, `book_398_counts_filtered` and `book_3296_counts_filtered` respectively. How many distinct words does each book contain after filtering the stop words.


In [62]:
book_84_counts_filtered = process_RDD("data/84.txt")
book_398_counts_filtered = process_RDD("data/398.txt")
book_3296_counts_filtered = process_RDD("data/3296.txt")

print("Book 84 count is: ", book_84_counts_filtered.count())
print("Book 398 count is: ", book_398_counts_filtered.count())
print("Book 3296 count is: ", book_3296_counts_filtered.count())

Book 84 count is:  7016
Book 398 count is:  2421
Book 3296 count is:  7293


### Q14 

We discussed how to evaluate similarity between two texts using the number of words they share. We hypothesized that books that are similar should have more words in common than books that are dissimilar. If that holds, `book_398` and `book_3296`, which both pertain to religion, will have more words in common than, say, `book_84` and `book_398`. Test this hypothesis by writing code that compares and prints the number of words shared between `book_398` and `book_3296` and then between `book_84` and `book_398`.


In [73]:
print("number of words shared between book_398 and book_3296 is:")
print(book_398_counts_filtered.map(lambda x: x[0]).intersection(book_3296_counts_filtered.map(lambda x: x[0])).count())
print("number of words shared between book_84 and book_3296 is:")
print(book_84_counts_filtered.map(lambda x: x[0]).intersection(book_3296_counts_filtered.map(lambda x: x[0])).count())

number of words shared between book_398 and book_3296 is:
1790
number of words shared between book_84 and book_3296 is:
3608


### Q15

* Based on the above, do you think counting the number of shared words is a good idea as a distance metric for evaluating topic similarity? Justify your answer?
* Hint: What do *book_84* and *book_3296* have in common? 

######  ANSWER
Not a good idea. Both book_84 and book_3296 are moch larger than the other books are most likely to have more words in common just by virtue of their length?

## Part II 

Another approach to estimating similarity consists of computing the Euclidean distance across a set of words. For example, suppose we have 3 documents A, B and C with the following counts for the words `evolution`, `DNA`, `biology` and `finance`. 

```python 
A = [4, 9, 6, 8]
B = [3, 7, 7, 10]
C = [15, 10, 1, 1]
```
Although all documents contain exactly the four words, the number of times these words appear in each book may be indicative of thier topic. For example, documents `A` and `B` are more likely to be business related since they contain the word `finance` more frequently (8 and 10 times respectively). Document `C` may be a technical document since it focuses on more technical words (`evolution` and `DNA`) and less on the words `finance`.

The Euclidean distance, which can be computed using the `scikit` snippet below, is more indicative of topic-relatedness between the two documents.

```python
from scipy.spatial.distance import euclidean 
print(f"The Euclidean distance between A and B is: {euclidean(A, B)}")

print(f"The Euclidean distance between A and C is: {euclidean(A, C)}")

print(f"The Euclidean distance between B and C is: {euclidean(B, C)}")
```


In [77]:
from scipy.spatial.distance import euclidean 

A = [4, 9, 6, 8]
B = [3, 7, 7, 10]
C = [15, 10, 1, 1]

print(f"The Euclidean distance between A and B is: {euclidean(A, B)}")
print(f"The Euclidean distance between A and C is: {euclidean(A, C)}")
print(f"The Euclidean distance between B and C is: {euclidean(B, C)}")

The Euclidean distance between A and B is: 3.1622776601683795
The Euclidean distance between A and C is: 14.0
The Euclidean distance between B and C is: 16.431676725154983


### Q16

To calculate the Euclidean distance, we must first identify the set of words by which we will compare the documents. Here, we will explore the words that are common to all 4 documents. We will store the data in a matrix called `counts_matrix`.

Start by finding the words that are common to all four documents after stop-word filtering and store the counts for each word in a column of `counts_matrix`. 

To take the previous example, you can generate an emtpy matrix with 3 lines (documents `A`, `B` and `C`) and 4 columns (words `evolution`, `DNA`, `biology` and `finance`) using the following code.

```python
import numpy as np
counts_matrix = np.zeros([3,4])
```

After generting the counts, you can fill the counts for a document, say `A`, using the following code:

```python
counts_matrix[0, :] = [4, 9, 6, 8] 
```
* Other than for building `counts_matrix` you should exclusively use operations or actions on the `RDD` to answer this question. 
  * Code that uses methods such as `some_rdd.X().Y().Z()...` is allowed
  * Code that uses functions such as `some_func(...)` is not allowed


In [96]:
common_words = (
book_43_counts_filtered.
    map(lambda x: x[0]).
    intersection(book_84_counts_filtered.map(lambda x: x[0])).
    intersection(book_398_counts_filtered.map(lambda x: x[0])).
    intersection(book_3296_counts_filtered.map(lambda x: x[0]))
).collect()
counts_matrix = np.zeros([4,len(common_words)])

x = book_43_counts_filtered.filter(lambda x: x[0] in common_words).collect()
counts_matrix[0,:]= [x[1] for x in x]

x = book_84_counts_filtered.filter(lambda x: x[0] in common_words).collect()
counts_matrix[1,:]= [x[1] for x in x]

x = book_398_counts_filtered.filter(lambda x: x[0] in common_words).collect()
counts_matrix[2,:]= [x[1] for x in x]

x = book_3296_counts_filtered.filter(lambda x: x[0] in common_words).collect()
counts_matrix[3,:]= [x[1] for x in x]

### Q17

Compute the Euclidean distance between `book_398` and `book_3296`, which both talk about religion and `book_84` and `book_398`. What do you conclude about using the Euclidean distance for evaluating topic relatedness across documents?


In [99]:
print(f"The Euclidean distance between  \
      book_398 and book_3296 is: {euclidean(counts_matrix[2], counts_matrix[3])}")
print(f"The Euclidean distance between \
      book_84 and book_398 is: {euclidean(counts_matrix[1], counts_matrix[2])}")


The Euclidean distance between        book_398 and book_3296 is: 1470.8415958219293
The Euclidean distance between       book_84 and book_398 is: 867.2508287687017


Again, the counts are biased simply due to the fact that both have more words, 

### Q18

Bonus question (5 points): Can you think of a few things we could do to improve similarity between documents that pertain to the same topic. Jutify your answer without given codem

#### Answer

* Normalize the data by the total number of words in the bool
* Select words that are common to all four biases words that are common to one topic (ex. Religion). All those words, which are highly specific are missing.
* Ultimately, words alone don't mean much. If a book uses the words Religion and Dogma and the second book uses the words faith and belief, the match would be nil. If, however, we could use synonyms, we would know that all four 4 words are synonymous. 

Part III

In this part we will build some basic analytics for a dataset consisting of flight arrival and departure details for all commercial flights within the USA in a single month. While this dataset can be managed uisng Pandas (<1M records), scaling to a yearly or longer reports requires using spark.

Here, you should use exclusively `SparkDatFrames. 

Here we are interested in analyzing this dataset to allow you to better schedule trips. For example:
 * avoid carriers that are most often associated with delays
 * avoid departure days where delays are most frequent
 * avoid airport wich are associate with delays or long taxxying time
 

The information about the fields can be found [here](https://dataverse.harvard.edu/dataset.xhtml;jsessionid=0414e25969eccd0e88ae4d64fa0b?persistentId=doi%3A10.7910%2FDVN%2FHG7NV7&version=&q=&fileTypeGroupFacet=&fileTag=%221.+Documentation%22&fileSortField=date&fileSortOrder=desc)


In [None]:
QX

Load the file `flight_info.csv` into a spark DataFrame called `fight_info`.

  * note that you will have to create sparkSession prior to loading the data
  
* How many entries does the file contains.




In [None]:
from pyspark.sql import SparkSession
session = SparkSession(sc)
fight_info  = session.read.csv('data/flight_info.csv', header=True)
fight_info.count()

In [None]:
QX Use pySpark-SQL or pandas like syntax to compute howmany airlines are represented in this dataset?
The airline info is stored in a field called
* UniqueCarrier: Represents the unique carrier code (ex.AA = American Airlines) 


In [None]:
session.catalog.dropTempView("flights")
fight_info.createTempView("flights")

In [None]:
fight_info.head(1)

In [None]:

session.sql("SELECT DISTINCT(UniqueCarrier) FROM flights").collect()

In [None]:
QX 
This files contains various field which we will explore the file stored in this file throughtout this third part. Three of the fields we are interested in are for this question are:

* CRSDepTime:  Represents the scheduled departure time
* DepTime: Represents the actual departure time

Compute and plot the number of flight delayed per each carried code represented in this dataset. Sort the data by descreasing order of delays.
  * A delay is observed when `DepTime` > `CRSDepTime`

In [None]:
query = """
SELECT UniqueCarrier, count(*) as COUNT
FROM flights 
WHERE DepTime > CRSDepTime 
GROUP BY UniqueCarrier
ORDER BY COUNT DESC
"""
session.sql(query).collect()

In [None]:
QX Use the file "airlines.csv" to find the name of the airline. Here, you are required to load the the file as a pyspark DataFrame; call it airlines_info, Repeat the query above by including the flights.csv file in your query (do a join) so that you can also display the full name of the carrier (second column). the carrier code in the airlines.csv file is provided in the 4th (1-based) column

Note that the file airlines.csv does not have column headers. Print a single line of your dataset to see which names were given to the column by Spark. Use that name in your query.

The result will look (approximately) like (only three entries are provided):

```
[Row(UniqueCarrier='WN', first(_c1)='Southwest Airlines', count=SOME_count),
 Row(UniqueCarrier='DL', first(_c1)='Delta Air Lines', count=SOME_count),
 Row(UniqueCarrier='AA', first(_c1)='American Airlines', count=SOME_count),
 ...
 ]
```

In [None]:
airlines_info  = session.read.csv('data/airlines.csv', header=False)
session.catalog.dropTempView("airlines")
airlines_info.createTempView("airlines")
airlines_info.head(1)

In [None]:
query = """
SELECT UniqueCarrier, first(_c1), count(*) as COUNT
FROM flights, airlines 
WHERE DepTime > CRSDepTime and UniqueCarrier == _c3
GROUP BY UniqueCarrier
ORDER BY COUNT DESC
"""
session.sql(query).collect()

Qx Compute the number of delays percompany per day. The day is encoded as an integer in the column `DayOfWeek` in `fight_info`. 
Sort the data by Airline (UniqueCarrier) and bu increasing values of DayOfWeek


You results should like the following


XXX


In [None]:
query = """
SELECT UniqueCarrier, DayOfWeek, count(*) as COUNT
FROM flights
WHERE DepTime > CRSDepTime
GROUP BY UniqueCarrier, DayOfWeek
ORDER BY UniqueCarrier DESC, DayOfWeek ASC
"""
session.sql(query).collect()

In [None]:
QX Counting the number of delayed flights per airline is misleading, as airlines with more flights are more likley to have delays that companies withsubstantially fiewer flights. 

Repeat the same query above but, for each carried, devide the number of delays by the total number of flights for that carrier



In [None]:
CODE GOES HERE

In [None]:
Time the query above. HOw long did it take


In [None]:
CODE GOES HERE

In [None]:
Use one of the techniques covered in class to acceelarate this query. Time you query to see by how much the run time was improved

In [None]:
QX. Is the departure delay in minutes (i.e., DepTime - CRSDepTime) predictive of the arrival delay (ArrTime > CRSArrTime)?
Use any approach desired (ex. skelearn which we covered in class or Spark which we did not cover) to answer the question 

CODE GOES HERE