## CS431/631 Data Intensive Distributed Analytics
### Winter 2023 - Assignment 2
---

**Please edit this (text) cell to provide your name and UW student ID number!**
* **Name:** Chris Binoi Verghese
* **ID:** 21092999

---
#### Overview
For this assignment, you will be using Python and Spark to analyze the [pointwise mutual information (PMI)](http://en.wikipedia.org/wiki/Pointwise_mutual_information) of tokens in the text of Shakespeare's plays.    For this assignment, you will need the same text file (`Shakespeare.txt`) and Python tokenizer module, `simple_tokenize.py`, that you used for the first two assignments.    You will also use the same definition of PMI that was used for [Assignment 1](https://www.student.cs.uwaterloo.ca/~cs451/assignment1-431.html).

We first need to install Apache Spark. Run the next block to download and install Spark. It will take about a minute to finish the installation.

In [None]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.4/spark-3.3.4-bin-hadoop3.tgz
!tar xzf spark-3.3.4-bin-hadoop3.tgz
!pip install -q findspark

To use Spark from within a Python program, it is first necessary to tell the Python interpreter where the Spark installation is located. The code in the following cell tells Python how to find this Spark installation. This code also creates a `SparkContext`, stored in `sc`, for you. Do not change this block.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.4-bin-hadoop3"

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf

spark_conf = SparkConf()\
  .setAppName("YourTest")\
  .setMaster("local[*]")

sc = SparkContext.getOrCreate(spark_conf)

Once Python knows where Spark is located, you can create a `SparkContext`. All Spark commands must run within an active `SparkContext`. The code above creates a `SparkContext` and stores a reference to the context in the variable `sc`.
The application name set with `setAppName()` is a label of your choosing for the Spark jobs that are created in this context; this is useful mostly for debugging. The master URL set with `setMaster("local[*]")` indicates that Spark jobs will run in local mode, using one worker thread per logical core on the machine. This means that your Spark jobs are not really running on a cluster; instead, they run in a single process on the local machine. You program Spark jobs the same way whether they run in local mode or on a cluster; the main difference between local and cluster modes is, of course, performance.

Next, let's test that your `SparkContext` has been set up properly by running some simple test code. This code uses a single Spark job to estimate the value of Euler's number $e$. One way to calculate $e$ is to first compute $1/e$ using the following series by Jacob Bernoulli:

$p_n = 1 - \frac{1}{1!} + \frac{1}{2!} - \frac{1}{3!} + \cdots + \frac{(-1)^n}{n!} = \sum_{k = 0}^n \frac{(-1)^k}{k!}$

As $n$ tends to infinity, $p_n$ approaches $1/e$, so $1/p_n$ approaches $e$.
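Because the terms alternate in sign and never increase in magnitude, the standard alternating-series remainder estimate bounds how far $p_n$ can be from $1/e$ (a supplementary worked bound, not part of the original handout):

$$\left| p_n - \frac{1}{e} \right| \le \frac{1}{(n+1)!}$$

For example, at $n = 17$ the bound is $1/18! \approx 1.6 \times 10^{-16}$, which is already comparable to the spacing of double-precision numbers near $1/e$.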

In the following code, `parallelize()` creates an RDD from a local Python range, `map()` is a Spark *transformation*, and `reduce()` is a Spark *action*. Study the code in the cell below, then go ahead and run it. It should take several seconds, since a Spark job is being created and executed, and it should print an estimate of $e$ when it finishes.


In [None]:
import math

n= 10000
inverse_e = sc.parallelize(range(0, n)).map(lambda x: ((-1)**x) * (1 / math.factorial(x))).reduce(lambda x,y:x+y)
e = 1 / inverse_e
print("e = ", e)

e =  2.718281828459044


---
#### Question 1  (4/30 marks):

In the following cell, briefly explain how the example works.   What is the Spark job doing, and how is it used to estimate the value of $e$? How accurate is our estimate?

#### Your answer to Question 1:

First, `sc.parallelize(range(0, n))` distributes the integers 0 to 9999 (that is, [0, 1, 2, ..., 9999]) across the partitions of a single RDD. Next, `map()` transforms each element $x$ of that RDD into the corresponding term of the series, producing a new RDD of values given by: $$f(x)=\frac{(-1)^x}{x!}$$ Because transformations are lazy, no work is done until the action `reduce()` is called. `reduce()` then sums the terms, first within each partition and then across partitions, and returns a single floating-point value to the driver: $$inverse\_e=\sum_{x=0}^{9999} \frac{(-1)^x}{x!}$$ Finally, the driver takes the reciprocal of this value and assigns it to `e`.


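This method evaluates the Taylor series of $e^{x}$ at $x = -1$, which converges to $1/e$. In exact arithmetic, the accuracy depends on $n$, the number of terms summed: because the series alternates and its terms shrink in magnitude, the error after summing the terms for $x = 0, \dots, n-1$ is at most $1/n!$. The series therefore converges very quickly; about 20 terms are enough to reach double-precision accuracy, and the remaining terms of the 10000 used here are too small to change the floating-point sum. The estimate printed by the code is 2.718281828459044, while the double-precision value of $e$ (`math.e`) is 2.718281828459045. The two differ only in the last printed digit, a discrepancy caused by floating-point rounding rather than by truncating the series, so the estimate is as accurate as double precision allows.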
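To check the accuracy claim numerically without a Spark cluster, the sketch below replays the same map/reduce computation sequentially in plain Python, with `functools.reduce` standing in for `RDD.reduce()`. It is an illustration rather than part of the required answer, and `inverse_e_partial_sum` is a hypothetical helper name.

```python
import math
from functools import reduce


def inverse_e_partial_sum(n):
    """Sequential stand-in for the Spark job: map each k in range(n) to
    (-1)**k / k!, then reduce the mapped terms with addition."""
    terms = map(lambda k: ((-1) ** k) / math.factorial(k), range(n))
    return reduce(lambda a, b: a + b, terms)


for n in (5, 10, 15, 20, 100):
    approx_e = 1 / inverse_e_partial_sum(n)
    # The truncation error of the sum over k = 0..n-1 is at most 1/n!
    print(f"n={n:3d}  e~{approx_e!r}  |error|={abs(approx_e - math.e):.2e}")
```

The error shrinks rapidly up to about 20 terms and then stops improving, because later terms are smaller than the rounding granularity of a double.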

---
### Important

###### The questions that follow ask you to implement functions whose prototypes are given to you. Do **NOT** change the prototypes of the functions. Do **NOT** write code outside of the functions. All necessary code should be included in the function body (except for import statements). You may declare functions inside of the function body. When marking, we will execute your code by calling the functions from an external program, which is why your code cannot rely on statements running outside functions. Please remove any call to the functions that you may have introduced for test purposes before submitting your notebook.

Note: On this and the following assignment you will typically be told to use Spark, not the Python driver program.
That doesn't mean "do not use Python"! It means you should do as much as possible using RDD transformations and actions, and little-to-nothing in the driver itself.

Example:

`someRDD.collect()[:10]` - this is bad! It's collecting *all* of the data onto the driver when we were only looking for 10  
`someRDD.take(10)` - much better!

Extra examples of what not to do:

`sc.parallelize(myFile.readlines())` - no! Have the cluster load the file using `sc.textFile()` instead!

`newRDD = sc.parallelize(<some computation>.collect())` - no!  
`newRDD = <some computation>` - yes!

---
#### Question 2  (4/30 marks):

Now it is your turn to write some Spark programs. Start with the simple task of counting the number of *distinct* tokens which appear in `Shakespeare.txt`.   You have already written Python code to do this in Assignment 1, but for this assignment we want you to use Spark to solve the same problem.   You should compare the answer you get using Spark with the answer you got from your pure-Python solution from Assignment 1.   Both answers should, of course, be the same.

Run the next block to download the text file (Shakespeare.txt) and the Python tokenizer module (simple_tokenize.py).

In [None]:
!wget -q https://student.cs.uwaterloo.ca/~cs451/content/cs431/Shakespeare.txt
!wget -q https://student.cs.uwaterloo.ca/~cs451/content/cs431/simple_tokenize.py

Your code should use Spark, not the Python driver code, to read `Shakespeare.txt` and do the counting.   The idea is to use Spark to give you a data-parallel alternative to the sequential Python solution you wrote for Assignment 1.

Write your solution for question 2 in the code cell below, by implementing the `count_distinct_tokens` function. It should use the `SparkContext` which was created previously (referenced by the variable `sc`). The function `count_distinct_tokens` must return the number of distinct tokens.

In [None]:
from simple_tokenize import simple_tokenize

# Returns the count of distinct tokens in the `Shakespeare.txt` dataset
def count_distinct_tokens():
    #creates an RDD of list [(x,n)]
    #x is a token and n is its count
    tokens = sc.textFile('Shakespeare.txt')\
    .flatMap(simple_tokenize)\
    .map(lambda x: (x,1))\
    .reduceByKey(lambda x,y: x+y)
    return tokens.count()

count_distinct_tokens()

25975

Since the input file is the same as in A1, you should get the same ~26K as you got then.
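For intuition about the key/value flow in `count_distinct_tokens`, here is a minimal pure-Python sketch that mimics `flatMap` → `map` → `reduceByKey` → `count` on a few in-memory lines. `toy_tokenize` is a hypothetical stand-in for `simple_tokenize`, whose exact rules may differ. In Spark itself, `tokens.distinct().count()` is an equivalent, shorter way to get the same number.

```python
import re
from collections import defaultdict
from itertools import chain


def toy_tokenize(line):
    # Hypothetical stand-in for simple_tokenize: lowercase runs of letters/apostrophes
    return re.findall(r"[a-z']+", line.lower())


def count_distinct_tokens_local(lines):
    # flatMap: one record per token occurrence across all lines
    tokens = chain.from_iterable(toy_tokenize(line) for line in lines)
    # map to (token, 1) + reduceByKey: accumulate a per-key sum
    counts = defaultdict(int)
    for token in tokens:
        counts[token] += 1
    # count(): one record per distinct key
    return len(counts)


sample = ["To be, or not to be", "that is the question"]
print(count_distinct_tokens_local(sample))
```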

---
#### Question 3  (4/30 marks):

Next, write a Spark program that will count the number of distinct token pairs in `Shakespeare.txt`, as you did in Assignment 1. Again, ensure that the answer that you get using Spark matches the answer you got in the first assignment.

Write your solution for question 3 by implementing the `count_distinct_pairs` function in the code cell below.   It should use the `SparkContext` which was created previously (referenced by the variable `sc`). The function `count_distinct_pairs` must return the number of distinct token pairs.



In [11]:
from simple_tokenize import simple_tokenize

# Returns the count of distinct pairs in the `Shakespeare.txt` dataset
def count_distinct_pairs():
    #creates an RDD of list [((x1,x2),n)]
    #(x1,x2) is a token pair from line and n is its count
    token_pairs = sc.textFile('Shakespeare.txt')\
    .map(simple_tokenize)\
    .flatMap(lambda x: [(x[i], x[j]) for i in range(len(x)) for j in range(len(x)) if x[i] != x[j]])
    distinct = token_pairs.map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
    return distinct.count()

count_distinct_pairs()

1969760


Again, you should be getting just shy of 2M here, the same value you got on A1.
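The comprehension in `count_distinct_pairs` emits one record per ordered pair of token positions, so a token repeated on a line produces duplicate pairs that Spark must shuffle and then collapse. The plain-Python sketch below (hypothetical helper names) suggests that deduplicating each line first, for example with `set()` and `itertools.permutations`, yields the same set of distinct pairs with fewer records.

```python
from itertools import permutations


def line_pairs(tokens):
    """Ordered pairs (x, y) of distinct tokens that co-occur on one line."""
    unique = set(tokens)
    # permutations(..., 2) yields every ordered pair with x != y exactly once
    return list(permutations(unique, 2))


def nested_loop_pairs(tokens):
    # Mirrors the list comprehension in count_distinct_pairs (keeps duplicates)
    return [(tokens[i], tokens[j]) for i in range(len(tokens))
            for j in range(len(tokens)) if tokens[i] != tokens[j]]


tokens = ["to", "be", "or", "not", "to", "be"]
print(len(nested_loop_pairs(tokens)),
      len(set(nested_loop_pairs(tokens))),
      len(line_pairs(tokens)))
```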

---
#### Question 4  (6/30 marks):

Next, write Spark code that will calculate the probability $p(x)$ (as defined in Assignment 1) for every distinct token $x$ in `Shakespeare.txt`.   Your function should return the 50 highest-probability tokens, and their counts and probabilities.

Make sure that your solution calculates probabilities and identifies the 50 highest-probability tokens in a data-parallel fashion, using Spark transformations and actions.   Only the 50 highest-probability tokens (and their counts and probabilities) should be returned by Spark to your driver code.

Write your solution for question 4 by implementing the `top_50_tokens_probabilities` function in the code cell below. It should use the `SparkContext` which was created previously (referenced by the variable `sc`). The function `top_50_tokens_probabilities` must return a list of 50 (probability, count, token) tuples, ordered by probability, that is, the list returned by the function should be of the form: `[(proba1, count1, token1), (proba2, count2, token2), ..., (proba50, count50, token50)]`

In [12]:
from simple_tokenize import simple_tokenize

# Returns a list of the top 50 (probability, count, token) tuples, ordered by probability
def top_50_tokens_probabilities():
  #RDD for all lines in Shakespeare.txt
  lines = sc.textFile('Shakespeare.txt')
  #Number of lines in txt file
  total_lines = lines.count()
  #RDD of the distinct tokens on each line (p(x) counts lines containing x)
  tokens = lines.flatMap(lambda x: set(simple_tokenize(x)))
  # Creates RDD of (token, count) tuples
  distinct = tokens.map(lambda x: (x,1)).reduceByKey(lambda x,y: (x+y))
  #Swap to (count, token) and sort by number of occurrences, descending
  by_count = distinct.map(lambda x: (x[1], x[0])).sortByKey(False)
  #Top 50 (probability, count, token) tuples
  prob_token = by_count.map(lambda x: (x[0]/total_lines, x[0], x[1])).take(50)
  return prob_token


top_50_tokens_probabilities()

[(0.2009178657172255, 24604, 'and'),
 (0.19843538192686472, 24300, 'the'),
 (0.1523542765682928, 18657, 'i'),
 (0.148924529226347, 18237, 'to'),
 (0.1357526662202551, 16624, 'of'),
 (0.10844534452628657, 13280, 'a'),
 (0.09959332995802643, 12196, 'you'),
 (0.09430988583840991, 11549, 'my'),
 (0.08667461497003054, 10614, 'in'),
 (0.08630714204053634, 10569, 'that'),
 (0.07150206601447026, 8756, 'is'),
 (0.06720671577193814, 8230, 'not'),
 (0.06167012363422561, 7552, 'with'),
 (0.06039621747864574, 7396, 'me'),
 (0.05982459292165477, 7326, 'for'),
 (0.05836286726877787, 7147, 'it'),
 (0.054402325695340446, 6662, 'be'),
 (0.05246696826667102, 6425, 'this'),
 (0.05228731483447386, 6403, 'his'),
 (0.050899083767495794, 6233, 'your'),
 (0.05067043394469941, 6205, 'but'),
 (0.0474938346208496, 5816, 'he'),
 (0.046889545803459144, 5742, 'have'),
 (0.04132028940534714, 5060, 'thou'),
 (0.04015254209606559, 4917, 'as'),
 (0.03952375508337552, 4840, 'him'),
 (0.03949109082297604, 4836, 'so'),
 ...]

Hint: "and" is the most frequent token, and "the" is close.
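Sorting every token with `sortByKey` is more work than needed when only 50 are wanted. A possible alternative is a bounded top-k selection; in PySpark, `RDD.takeOrdered(50, key=lambda t: -t[1])` on the `(token, count)` RDD keeps only the best candidates from each partition. The plain-Python sketch below shows the same idea with `heapq.nlargest`; `top_k_probabilities` is a hypothetical name and the counts are made-up toy values, not Shakespeare data.

```python
import heapq


def top_k_probabilities(token_counts, total_lines, k):
    """Return the k highest (probability, count, token) tuples, largest first."""
    # nlargest keeps only k candidates instead of sorting every token
    best = heapq.nlargest(k, token_counts.items(), key=lambda item: item[1])
    return [(count / total_lines, count, token) for token, count in best]


counts = {"and": 6, "the": 5, "i": 3, "to": 3, "of": 1}  # made-up toy counts
print(top_k_probabilities(counts, total_lines=10, k=2))
```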

---
#### Question 5  (6/30 marks):

Next, write the code for the function `PMI` that will take a positive integer threshold $T$ as an argument, and then return all token pairs that co-occur at least $T$ times in `Shakespeare.txt`.   For each such pair $(x,y)$, the function should also return PMI$(x,y)$, the co-occurrence count of the pair, the number of times $x$ appears, and the number of times $y$ appears. You can compare the results produced by this code with the results of Two-Token queries (from Assignment 1) for consistency.

Write your solution for question 5 by implementing the function `PMI` in the code cell below. It should use the `SparkContext` which was created previously (referenced by the variable `sc`). The function `PMI` should return a list of ((token1, token2), pmi, co-occurrence_count, token1_count, token2_count) tuples, that is, the list returned by the function should be of the form: `[((token1, token2), pmi, cooc_count, token1_count, token2_count), (...), ((other_token1, other_token2), other_pmi, other_cooc_count, other_token1_count, other_token2_count)]`.


As always, calculations should be done in data-parallel fashion by using Spark.

Note: You're using a "pairs" approach here, since the key is a pair of tokens! Also, if converted to a dictionary, this list is able to answer two-token queries from A1!
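The PMI computed by `PMI_value` in the solution simplifies algebraically, since the $1/N$ factors cancel: $\log_{10}\frac{p(x,y)}{p(x)\,p(y)} = \log_{10}\frac{c(x,y)\,N}{c(x)\,c(y)}$, where $N$ is the number of lines and $c(\cdot)$ counts lines. The sketch below is a single-machine version of the pairs pipeline, with Python dictionaries playing the role of the joins; the function names and toy input are hypothetical.

```python
from collections import Counter
from itertools import permutations
from math import log10


def pmi(cooc_count, count_x, count_y, n_lines):
    # log10(p(x,y) / (p(x) p(y))) with every p(.) = line count / n_lines
    return log10(cooc_count * n_lines / (count_x * count_y))


def pmi_pairs_local(lines_of_tokens, threshold):
    n_lines = len(lines_of_tokens)
    token_counts = Counter()
    pair_counts = Counter()
    for tokens in lines_of_tokens:
        unique = set(tokens)
        token_counts.update(unique)                 # lines containing each token
        pair_counts.update(permutations(unique, 2))  # lines containing each pair
    # Keep pairs at or above the threshold, then look up ("join") both token counts
    return [((x, y), pmi(c, token_counts[x], token_counts[y], n_lines),
             c, token_counts[x], token_counts[y])
            for (x, y), c in pair_counts.items() if c >= threshold]


lines = [["a", "b"], ["a", "b"], ["a", "c"], ["d"]]
for row in pmi_pairs_local(lines, threshold=2):
    print(row)
```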

In [16]:
from simple_tokenize import simple_tokenize
from math import log

# Returns a list of tuples with the following format:
# ((token1, token2), pmi, co-occurrence_count, token1_count, token2_count)
def PMI(threshold):
    #Function to calculate the PMI value of a token pair
    def PMI_value(token1_count, token2_count, line_num, pair_count):
        p_token0 = token1_count/line_num
        p_token1 = token2_count/line_num
        #Probability of occurrence of the token pair
        p_token_pair = pair_count/ line_num
        return log(p_token_pair/(p_token0 * p_token1),10)
    #RDD for all lines in Shakespeare.txt
    lines = sc.textFile('Shakespeare.txt')
    lines.cache()
    #Number of lines in txt file
    line_num = lines.count()
    #List of all tokens in the file
    tokens = lines.flatMap(lambda x: set(simple_tokenize(x)))
    # Creates RDD of tuples with token count
    distinct = tokens.map(lambda x: (x,1)).reduceByKey(lambda x,y: (x+y))
    distinct.cache()
    #creates an RDD of list [((x1,x2),n)]
    #(x1,x2) is a token pair from line and n is its count at or above threshold
    token_pairs = lines.map(lambda x: list(set(simple_tokenize(x))))\
    .flatMap(lambda x: [(x[i], x[j]) for i in range(len(x)) for j in range(len(x)) if x[i] != x[j]])
    thresholded = token_pairs.map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)\
    .filter(lambda x: x[1]>=threshold)
    #RDD where key is the first token of the pair to add that token's appearance count
    token1 = thresholded.map(lambda x: (x[0][0], ((x[0][0], x[0][1]), x[1])))\
    .join(distinct).map(lambda x: (x[1][0][0],(x[1][0][1], x[1][1])))
    #RDD where key is the second token of the pair to add that token's appearance count
    token2 = thresholded.map(lambda x: (x[0][1], ((x[0][0], x[0][1]), x[1])))\
    .join(distinct).map(lambda x: (x[1][0][0],(x[1][0][1], x[1][1])))
    #Join the two RDDs so that each pair carries both token counts
    combined = token1.join(token2)
    #Create the final RDD in required format and calculate PMI
    complete = combined.map(lambda x: (x[0], PMI_value(x[1][0][1], x[1][1][1], line_num, x[1][1][0]), x[1][1][0], x[1][0][1], x[1][1][1]))
    return complete.collect()



PMI(4000)

[(('to', 'the'), 0.05123525982989819, 4072, 18237, 24300),
 (('the', 'to'), 0.05123525982989819, 4072, 24300, 18237),
 (('the', 'and'), 0.0459349918330654, 5427, 24300, 24604),
 (('and', 'the'), 0.0459349918330654, 5427, 24604, 24300),
 (('of', 'the'), 0.34294075191889295, 7266, 16624, 24300),
 (('the', 'of'), 0.34294075191889295, 7266, 24300, 16624)]

---
#### Question 6  (6/30 marks):

Finally, write Spark code for the function `PMI_one_token`, that will take a positive integer threshold $T$ and a sample size $N$ as arguments. For every token $x$ in `Shakespeare.txt`, your code should find all tokens $y$ that co-occur with $x$ at least $T$ times, as well as PMI$(x,y)$ for each such pair, the co-occurrence count of the pair, the number of times $x$ appears, and the number of times $y$ appears.

For each $x$, the output of your program should be equivalent to the output that would be produced by a One-Token query on $x$ (see Assignment 1), with threshold $T$. Rather than producing output for all possible tokens $x$, the function should produce output only for $N$ different $x$'s, chosen uniformly at random from among the $x$'s with a non-empty list of co-occurrences.

If there are fewer than $N$ different $x$'s, then the function should output all of them. In particular, if $T$ is very large, the function would output nothing.

Write your solution for question 6 by implementing the function `PMI_one_token` in the code cell below. It should use the `SparkContext` which was created previously (referenced by the variable `sc`). The function `PMI_one_token` should return a list of $N$ tuples of the form `(token, [ list_of_cooccurring_tokens ])`, where each entry in `list_of_cooccurring_tokens` is of the form `((token1, token2), pmi, cooc_count, token1_count, token2_count)`.

For instance, if $N$ = 2 and the randomly selected tokens are "if" and "you", a valid format for the value returned by `PMI_one_token` would be:
```
[('if', [(('if', 'you'), -0.09813466615111513, 1975, 16624, 18237), (('if', 'a'), 0.03856379243802408, 1568, 16624, 10569)]), ('you', [(('you', 'if'), -0.09813466615111513, 1975, 18237, 16624)])]
```

Hint: Sampling must be done at the very last step.
Hint: there is an action that returns a sample subset from an RDD. (`takeSample`)
Note: You should be taking a "stripes" approach here. While you *can* use a pairs approach and then convert to stripes at the end, it's not as efficient.
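With stripes, each token $x$ maps to one dictionary of co-occurrence counts rather than one record per pair. A minimal single-machine sketch, assuming the same per-line co-occurrence definition as the solutions in this notebook, builds the stripes, applies the threshold, drops empty stripes, and samples only at the end. In PySpark one might emit `(x, Counter(...))` per line and merge with `reduceByKey(lambda a, b: a + b)` before calling `takeSample(False, samp_size)`; that mapping is a suggestion, not the assignment's reference solution. The PMI of each surviving entry can then be computed from the token counts, as `PMI_value` does. The helper names are hypothetical.

```python
import random
from collections import Counter, defaultdict


def stripes_local(lines_of_tokens):
    """Stripes: for each token x, one Counter {y: co-occurrence count}."""
    stripes = defaultdict(Counter)
    for tokens in lines_of_tokens:
        unique = set(tokens)
        for x in unique:
            # One stripe update per (line, x) instead of one record per pair
            stripes[x].update(y for y in unique if y != x)
    return stripes


def sample_stripes(stripes, threshold, samp_size, seed=None):
    # Apply the threshold inside each stripe, then drop empty stripes
    kept = {x: {y: c for y, c in s.items() if c >= threshold}
            for x, s in stripes.items()}
    kept = {x: s for x, s in kept.items() if s}
    # Sample last; if fewer than samp_size tokens remain, return all of them
    rng = random.Random(seed)
    chosen = rng.sample(sorted(kept), min(samp_size, len(kept)))
    return [(x, kept[x]) for x in chosen]


stripes = stripes_local([["a", "b"], ["a", "b"], ["a", "c"], ["d"]])
print(sample_stripes(stripes, threshold=2, samp_size=5, seed=0))
```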

In [17]:
from simple_tokenize import simple_tokenize
from math import log

# Returns a list of samp_size tuples with the following format:
# (token, [ list_of_cooccurring_tokens ])
# where list_of_cooccurring_tokens is of the form
# [((token1, token2), pmi, cooc_count, token1_count, token2_count), ...]
def PMI_one_token(threshold, samp_size):
    #Function to calculate the PMI value of a token pair
    def PMI_value(token1_count, token2_count, line_num, pair_count):
        p_token0 = token1_count/line_num
        p_token1 = token2_count/line_num
        #Probability of occurrence of the token pair
        p_token_pair = pair_count/ line_num
        return log(p_token_pair/(p_token0 * p_token1),10)
    #RDD for all lines in Shakespeare.txt
    lines = sc.textFile('Shakespeare.txt')
    lines.cache()
    #Number of lines in txt file
    line_num = lines.count()
    #List of all tokens in the file
    tokens = lines.flatMap(lambda x: set(simple_tokenize(x)))
    # Creates RDD of tuples with token count
    distinct = tokens.map(lambda x: (x,1)).reduceByKey(lambda x,y: (x+y))
    distinct.cache()
    #creates an RDD of list [((x1,x2),n)]
    #(x1,x2) is a token pair from line and n is its count at or above threshold
    token_pairs = lines.map(lambda x: list(set(simple_tokenize(x))))\
    .flatMap(lambda x: [(x[i], x[j]) for i in range(len(x)) for j in range(len(x)) if x[i] != x[j]])
    thresholded = token_pairs.map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)\
    .filter(lambda x: x[1]>=threshold)
    #RDD where key is the first token of the pair to add that token's appearance count
    token1 = thresholded.map(lambda x: (x[0][0], ((x[0][0], x[0][1]), x[1])))\
    .join(distinct).map(lambda x: (x[1][0][0],(x[1][0][1], x[1][1])))
    #RDD where key is the second token of the pair to add that token's appearance count
    token2 = thresholded.map(lambda x: (x[0][1], ((x[0][0], x[0][1]), x[1])))\
    .join(distinct).map(lambda x: (x[1][0][0],(x[1][0][1], x[1][1])))
    #Join the two RDDs so that each pair carries both token counts
    combined = token1.join(token2)
    #Create the final RDD in required format and calculate PMI
    complete = combined.map(lambda x: (x[0], PMI_value(x[1][0][1], x[1][1][1], line_num, x[1][1][0]), x[1][1][0], x[1][0][1], x[1][1][1]))

    #Group and map this RDD into stripes
    stripe = complete.map(lambda x: (x[0][0] , x)).groupByKey().mapValues(list)
    #Return a stripe of n random samples
    return stripe.takeSample(withReplacement=False,num=samp_size,seed=800)

PMI_one_token(4000,5)

[('to', [(('to', 'the'), 0.05123525982989819, 4072, 18237, 24300)]),
 ('of', [(('of', 'the'), 0.34294075191889295, 7266, 16624, 24300)]),
 ('the',
  [(('the', 'to'), 0.05123525982989819, 4072, 24300, 18237),
   (('the', 'and'), 0.0459349918330654, 5427, 24300, 24604),
   (('the', 'of'), 0.34294075191889295, 7266, 24300, 16624)]),
 ('and', [(('and', 'the'), 0.0459349918330654, 5427, 24604, 24300)])]

---
Don't forget to save your workbook!   When you are finished and you are ready to submit your assignment, download your notebook file (.ipynb) from the hub to your machine, and then follow the submission instructions in the assignment.