## CS431/631 Data Intensive Distributed Analytics
### Winter 2019 - Assignment 2
---

**Please edit this (text) cell to provide your name and UW student ID number!**
* **Name:** _replace this with your name_
* **ID:** _replace this with your UW student ID number_

---
#### Overview
For this assignment, you will be using Python and Spark to analyze the [pointwise mutual information (PMI)](http://en.wikipedia.org/wiki/Pointwise_mutual_information) of tokens in the text of Shakespeare's plays.    For this assignment, you will need the same text file (`Shakespeare.txt`) and Python tokenizer module, `simple_tokenize.py`, that you used for the first two assignments.    You will also use the same definition of PMI that was used for [Assignment 1](https://roegiest.com/bigdata-2019w/assignment1-431.html).

To use Spark from within a Python program, you first need to tell the Python interpreter where the Spark installation is located. You will be using the Spark installation in the CS451 course account. The code in the following cell tells Python how to find this Spark installation. Before going on, run that code by selecting the cell and pressing Shift+Enter. It will take a few seconds to run and will produce no output.

In [2]:
import os
import sys
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

In [3]:
import sys
!{sys.executable} -m pip install findspark



In [4]:
import findspark
findspark.init("/home/sagar/Downloads/spark-2.4.4-bin-hadoop2.7")

from pyspark import SparkContext, SparkConf

Once Python knows where Spark is located, you can create a `SparkContext`.   All Spark commands must run within an active `SparkContext`.   The code below will create a `SparkContext`, and store a reference to the context in the variable `sc`. 
The `appName` parameter assigns a name of your choosing to the Spark jobs that are created in this context - this is useful mostly for debugging.   The `master` parameter indicates that Spark jobs will run in local mode, using two threads.   This means that your Spark jobs are not really running on a cluster (since we do not have a Spark cluster in the CS student computing environment), and are instead running in a single process on the local machine.   You program Spark jobs the same way whether they run in local mode or on a cluster - the main difference between local and cluster modes is, of course, performance.

Run the code in the cell below to create a Spark context.   Creating the `SparkContext` causes your Python program (running in this notebook) to prepare to run Spark jobs, and will take a few seconds to complete.  Be sure that you run this code only one time, because a single Python program may only have one active SparkContext.

In [5]:
sc = SparkContext(appName="YourTest", master="local[2]")

Next, let's test that your `SparkContext` has been set up properly by running some simple test code (adapted from the [Spark examples page](https://spark.apache.org/examples.html)). This code uses a single Spark job to estimate the value of $\pi$. `parallelize()` creates an RDD from a Python collection, `filter()` is a Spark *transformation*, and `count()` is a Spark *action*. Study the code in the cell below, then go ahead and run it. It should take a few seconds, since a Spark job is being created and executed, and should print an estimate of $\pi$ when it finishes.

---
#### Question 1  (4/30 marks):

In the following cell, briefly explain how the $\pi$-estimation example works.   What is the Spark job doing, and how is it used to estimate the value of $\pi$?

In [8]:
import random

num_samples = 100000000

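def inside(p):
    # p (the RDD element) is ignored: each call draws a fresh random point in the unit square
    x, y = random.random(), random.random()
    # True if the point lies inside the quarter circle of radius 1
    return x*x + y*y < 1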

count = sc.parallelize(range(0, num_samples)).filter(inside).count()

pi_estimate = 4 * count / num_samples
print(pi_estimate)

3.14167568


#### Your answer to Question 1:

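- `sc` - the `SparkContext` object, which represents the connection to a Spark cluster (here, Spark running in local mode with two threads).
- `inside` - a function that ignores its argument, draws a random point $(x, y)$ uniformly from the unit square, and returns `True` if the point lies inside the quarter circle $x^2 + y^2 < 1$. This is the Monte Carlo method of estimating $\pi$.
- `parallelize` - a `SparkContext` method that turns a Python iterable (here, `range(0, num_samples)`) into a Resilient Distributed Dataset (RDD). RDDs are the partitioned collections that Spark operates on in parallel; in this notebook the partitions are processed by two local threads rather than by the machines of a cluster.
- `filter` - a transformation that returns a new RDD containing only the elements that satisfy a predicate, here `inside`. Because `inside` ignores the element it receives, each of the `num_samples` elements simply stands for one random point, and the new RDD keeps the points that fall inside the quarter circle.
- `count` - an action that counts the elements of the RDD, i.e. the number of random points that fell inside the quarter circle. Because transformations are lazy, calling `count()` is what actually launches the Spark job.

- Finally, the quarter circle has area $\pi/4$ and the unit square has area 1, so the fraction of points inside the circle, `count / num_samples`, approximates $\pi/4$. Multiplying it by 4 gives the estimate of $\pi$.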
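For comparison, the same estimate can be computed sequentially in plain Python. The sketch below replaces `parallelize`/`filter`/`count` with a generator expression and `sum`; the function name `estimate_pi` and its `seed` parameter are illustrative choices, not part of the assignment.

```python
import random


def estimate_pi(num_samples, seed=0):
    """Sequential Monte Carlo estimate of pi, mirroring the Spark filter/count job."""
    rng = random.Random(seed)  # seeded so that repeated runs give the same estimate
    # Count random points of the unit square that fall inside the quarter circle x^2 + y^2 < 1
    inside = sum(
        1 for _ in range(num_samples)
        if rng.random() ** 2 + rng.random() ** 2 < 1
    )
    # The quarter circle covers pi/4 of the unit square, hence the factor of 4
    return 4 * inside / num_samples


print(estimate_pi(100_000))
```

The sequential loop gives the same kind of estimate as the Spark job; Spark only changes how the `num_samples` draws are split across workers.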




---
### Important

###### The questions that follow ask you to implement functions whose prototypes are given to you. Do **NOT** change the prototypes of the functions. Do **NOT** write code outside of the functions. All necessary code should be included in the function body (except for import statements). You may declare functions inside of the function body. When marking, we will execute your code by calling the functions from an external program, which is why your code cannot rely on statements running outside functions. Please remove any call to the functions that you may have introduced for test purposes before submitting your notebook.

---
#### Question 2  (4/30 marks):

Now it is your turn to write some Spark programs. Start with the simple task of counting the number of *distinct* tokens which appear in `Shakespeare.txt`.   You have already written Python code to do this in Assignment 1, but for this assignment we want you to use Spark to solve the same problem.   You should compare the answer you get using Spark with the answer you got from your pure-Python solution from Assignment 1.   Both answers should, of course, be the same.

Your code should use Spark, not the Python driver code, to read `Shakespeare.txt` and do the counting.   The idea is to use Spark to give you a data-parallel alternative to the sequential Python solution you wrote for Assignment 1.

Write your solution for question 2 in the code cell below, by implementing the `count_distinct_tokens` function. It should use the `SparkContext` which was created previously (referenced by the variable `sc`). The function `count_distinct_tokens` must return the number of distinct tokens.
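Because the Spark answer should match the sequential one from Assignment 1, a small pure-Python reference is handy for spot checks on short inputs. The sketch below is one way to write it, assuming the same tokenizer regex used in this notebook; `count_distinct_tokens_local` is a hypothetical helper name. The union into a set plays the role of `flatMap(...).distinct()`, and `len` plays the role of `count()`.

```python
import re


def simple_tokenize(s):
    # Same regex as the notebook's tokenizer
    return re.findall(r"[a-z]+(?:'[a-z]+)?", s.lower())


def count_distinct_tokens_local(lines):
    """Sequential reference: one set of tokens shared by all lines."""
    distinct = set()
    for line in lines:
        distinct.update(simple_tokenize(line))
    return len(distinct)


sample = ["To be, or not to be:", "that is the question."]
print(count_distinct_tokens_local(sample))  # 8
```

Any iterable of lines works, so an open file can be passed directly, e.g. `with open("../Shakespeare.txt") as f: print(count_distinct_tokens_local(f))`.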

In [9]:
import re

# this captures sequences of alphabetic characters
# possibly followed by an apostrophe and more characters
# n.b. (?:...) denotes a non-capturing group


def simple_tokenize(s):
    return re.findall(r"[a-z]+(?:'[a-z]+)?", s.lower())

In [17]:
#from simple_tokenize import simple_tokenize
import re

# this captures sequences of alphabetic characters
# possibly followed by an apostrophe and more characters
# n.b. (?:...) denotes a non-capturing group


def simple_tokenize(s):
    return re.findall(r"[a-z]+(?:'[a-z]+)?", s.lower())

# Returns the count of distinct tokens in the `Shakespeare.txt` dataset
def count_distinct_tokens():
    # Spark reads the file and tokenizes every line in parallel
    lines = sc.textFile("../Shakespeare.txt")
    tokens = lines.flatMap(simple_tokenize)
    # distinct() removes duplicates across all partitions; count() is the action that runs the job
    return tokens.distinct().count()


# Test call: remove before submitting
cnt = count_distinct_tokens()
print(cnt)

25975


---
#### Question 3  (4/30 marks):

Next, write a Spark program that will count the number of distinct token pairs in `Shakespeare.txt`, as you did in Assignment 1. Again, ensure that the answer that you get using Spark matches the answer you got in the first assignment.

Write your solution for question 3 by implementing the `count_distinct_pairs` function in the code cell below.   It should use the `SparkContext` which was created previously (referenced by the variable `sc`). The function `count_distinct_pairs` must return the number of distinct token pairs.
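Assuming, as for PMI, that a token pair means two different tokens occurring on the same line, the number of distinct pairs depends on the text and not only on the number of distinct tokens $n$: $n(n-1)$ ordered pairs are possible, but most of them never co-occur. The small pure-Python sketch below (with the hypothetical helper `distinct_pairs_local`, counting ordered pairs) shows the difference on a two-line input.

```python
import re
from itertools import permutations


def simple_tokenize(s):
    return re.findall(r"[a-z]+(?:'[a-z]+)?", s.lower())


def distinct_pairs_local(lines):
    """Distinct ordered pairs (x, y), x != y, that appear together on at least one line."""
    pairs = set()
    for line in lines:
        # set() removes repeated tokens, so a token is never paired with itself
        pairs.update(permutations(set(simple_tokenize(line)), 2))
    return pairs


sample = ["to be or", "not to be"]
n_tokens = 4  # {to, be, or, not}
print(len(distinct_pairs_local(sample)), n_tokens * (n_tokens - 1))  # 10 12
```

The two missing pairs are `("or", "not")` and `("not", "or")`, which are possible combinations of tokens but never share a line.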

In [15]:
#from simple_tokenize import simple_tokenize
import re

# this captures sequences of alphabetic characters
# possibly followed by an apostrophe and more characters
# n.b. (?:...) denotes a non-capturing group


def simple_tokenize(s):
    return re.findall(r"[a-z]+(?:'[a-z]+)?", s.lower())

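from itertools import permutations

# Returns the count of distinct pairs in the `Shakespeare.txt` dataset
def count_distinct_pairs():
    # Assumes a pair is two different tokens that occur on the same line (the co-occurrence
    # used for PMI). distinct_tokens * (distinct_tokens - 1) counts every possible pair
    # instead, including pairs that never co-occur, so it overstates the answer.
    lines = sc.textFile("../Shakespeare.txt")
    # Ordered pairs (x, y), x != y, from each line's distinct tokens; if Assignment 1
    # counted unordered pairs, use itertools.combinations(sorted(...), 2) instead
    pairs = lines.flatMap(lambda line: permutations(set(simple_tokenize(line)), 2))
    return pairs.distinct().count()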


---
#### Question 4  (6/30 marks):

Next, write Spark code that will calculate the probability $p(x)$ (as defined in Assignment 1) for every distinct token $x$ in `Shakespeare.txt`.   Your function should return the 50 highest-probability tokens, and their counts and probabilities.

Make sure that your solution calculates probabilities and identifies the 50 highest-probability tokens in a data-parallel fashion, using Spark transformations and actions.   Only the 50 highest-probability tokens (and their counts and probabilities) should be returned by Spark to your driver code.

Write your solution for question 4 by implementing the `top_50_tokens_probabilities` function in the code cell below. It should use the `SparkContext` which was created previously (referenced by the variable `sc`). The function `top_50_tokens_probabilities` must return a list of 50 (probability, count, token) tuples, ordered by probability, that is, the list returned by the function should be of the form: `[(proba1, count1, token1), (proba2, count2, token2), ..., (proba50, count50, token50)]`
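The requirement that only 50 tuples reach the driver is a top-k problem. In Spark this is what `takeOrdered(50, key=...)` on a `(token, count)` RDD provides; the plain-Python sketch below shows the same idea with `heapq.nlargest`, which keeps only the k largest items instead of sorting every count. It assumes $p(x)$ is the count of $x$ divided by the total number of tokens, as in the solution cell below; the helper name `top_k_probabilities` is hypothetical.

```python
import heapq
from collections import Counter


def top_k_probabilities(tokens, k):
    """Return the k most frequent tokens as (probability, count, token) tuples.

    Assumes p(x) = count(x) / total number of tokens.
    """
    counts = Counter(tokens)
    total = sum(counts.values())
    # Keep only the k largest (token, count) items, like takeOrdered in Spark
    top = heapq.nlargest(k, counts.items(), key=lambda kv: kv[1])
    return [(count / total, count, token) for token, count in top]


print(top_k_probabilities("the cat and the hat and the bat".split(), 2))
# [(0.375, 3, 'the'), (0.25, 2, 'and')]
```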

In [21]:
import re

# this captures sequences of alphabetic characters
# possibly followed by an apostrophe and more characters
# n.b. (?:...) denotes a non-capturing group


def simple_tokenize(s):
    return re.findall(r"[a-z]+(?:'[a-z]+)?", s.lower())

# Returns a list of the top 50 (probability, count, token) tuples, ordered by probability
def top_50_tokens_probabilities():
    # The file is read inside the function, since the grader calls it from another program
    tokens = sc.textFile("../Shakespeare.txt").flatMap(simple_tokenize)
    total_tokens = tokens.count()
    # Count tokens on the cluster; takeOrdered returns only the 50 most frequent to the driver
    counts = tokens.map(lambda token: (token, 1)).reduceByKey(lambda a, b: a + b)
    top_50 = counts.takeOrdered(50, key=lambda kv: -kv[1])
    return [(count / total_tokens, count, token) for token, count in top_50]

print(top_50_tokens_probabilities())

[(0.030855264937383355, 27378, 'the'), (0.029394660679992426, 26082, 'and'), (0.023348254938555444, 20717, 'i'), (0.022158132951051724, 19661, 'to'), (0.01969223625724667, 17473, 'of'), (0.0165929602481224, 14723, 'a'), (0.015361138910677738, 13630, 'you'), (0.014076348128713495, 12490, 'my'), (0.01239259599866562, 10996, 'in'), (0.012301308232578688, 10915, 'that'), (0.010297485416497614, 9137, 'is'), (0.009593104505333008, 8512, 'not'), (0.008765879563261294, 7778, 'with'), (0.00876475255380343, 7777, 'me'), (0.008668956749885045, 7692, 'it'), (0.00854047767168862, 7578, 'for'), (0.007739173947147764, 6867, 'be'), (0.007730157871484858, 6859, 'his'), (0.007502501960996457, 6657, 'your'), (0.007445024478645425, 6606, 'this'), (0.007074238367008376, 6277, 'but'), (0.0070550792062246985, 6260, 'he'), (0.0066324506595259345, 5885, 'have'), (0.0064735423259672, 5744, 'as'), (0.006188408933127767, 5491, 'thou'), (0.005866084228178843, 5205, 'him'), (0.005698159818957201, 5056, 'so'), (0.00

---
#### Question 5  (6/30 marks):

Next, write the code for the function `PMI` that will take a positive integer threshold $T$ as an argument, and then return all token pairs that co-occur at least $T$ times in `Shakespeare.txt`.   For each such pair $(x,y)$, the function should also return PMI$(x,y)$, the co-occurrence count of the pair, the number of times $x$ appears, and the number of times $y$ appears. You can compare the results produced by this code with the results of Two-Token queries (from Assignment 1) for consistency.

Write your solution for question 5 by implementing the function `PMI` in the code cell below. It should use the `SparkContext` which was created previously (referenced by the variable `sc`). The function `PMI` should return a list of ((token1, token2), pmi, co-occurrence_count, token1_count, token2_count) tuples, that is, the list returned by the function should be of the form: `[((token1, token2), pmi, cooc_count, token1_count, token2_count), (...), ((other_token1, other_token2), other_pmi, other_cooc_count, other_token1_count, other_token2_count)]`.


As always, calculations should be done in data-parallel fashion by using Spark.
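As a reference point for checking the Spark output on small inputs, the sketch below computes the same tuples sequentially. It assumes the Assignment 1 definition counts lines (a token or pair is counted at most once per line), treats pairs as ordered, and uses a base-10 logarithm, so that $\mathrm{PMI}(x,y) = \log_{10}\frac{c(x,y)\,N}{c(x)\,c(y)}$ with $N$ the number of lines; check these assumptions against Assignment 1. `pmi_local` is a hypothetical helper name.

```python
import re
from collections import Counter
from itertools import permutations
from math import log10


def simple_tokenize(s):
    return re.findall(r"[a-z]+(?:'[a-z]+)?", s.lower())


def pmi_local(lines, threshold):
    """Sequential PMI: ((x, y), pmi, cooc, count_x, count_y) for pairs seen >= threshold times."""
    token_counts, pair_counts, n_lines = Counter(), Counter(), 0
    for line in lines:
        n_lines += 1
        tokens = set(simple_tokenize(line))  # count each token at most once per line
        token_counts.update(tokens)
        pair_counts.update(permutations(tokens, 2))
    results = []
    for (x, y), cooc in pair_counts.items():
        if cooc >= threshold:
            # log10(p(x,y) / (p(x) p(y))) with every p = count / n_lines
            pmi = log10(cooc * n_lines / (token_counts[x] * token_counts[y]))
            results.append(((x, y), pmi, cooc, token_counts[x], token_counts[y]))
    return results


lines = ["a b", "a c", "b c", "a b c"]
for record in pmi_local(lines, threshold=2):
    print(record)
```

On this input each token appears on 3 of the 4 lines and each pair on 2, so all six ordered pairs have PMI $\log_{10}(8/9) \approx -0.0512$.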

In [26]:
#from simple_tokenize import simple_tokenize
import re
from itertools import permutations
from math import log10


def simple_tokenize(s):
    return re.findall(r"[a-z]+(?:'[a-z]+)?", s.lower())

# Returns a list of tuples with the following format:
# ((token1, token2), pmi, co-occurrence_count, token1_count, token2_count)
# All counts are numbers of lines and the logarithm is base 10 (both assumed to follow Assignment 1)
def PMI(threshold):
    # Each line becomes the set of its distinct tokens
    line_sets = sc.textFile("../Shakespeare.txt").map(lambda line: set(simple_tokenize(line))).cache()
    n_lines = line_sets.count()
    # Number of lines that contain each token
    token_counts = line_sets.flatMap(lambda s: [(t, 1) for t in s]).reduceByKey(lambda a, b: a + b)
    # Number of lines that contain each ordered pair; keep pairs seen at least `threshold` times
    pair_counts = (line_sets.flatMap(lambda s: [(p, 1) for p in permutations(s, 2)])
                   .reduceByKey(lambda a, b: a + b)
                   .filter(lambda kv: kv[1] >= threshold))
    # Attach count(token1), then count(token2), with two joins
    with_x = pair_counts.map(lambda kv: (kv[0][0], (kv[0][1], kv[1]))).join(token_counts)
    with_xy = with_x.map(lambda kv: (kv[1][0][0], (kv[0], kv[1][0][1], kv[1][1]))).join(token_counts)

    def to_record(kv):
        y, ((x, cooc, x_count), y_count) = kv
        # PMI(x, y) = log10(p(x, y) / (p(x) p(y))), where each p is a count divided by n_lines
        return ((x, y), log10(cooc * n_lines / (x_count * y_count)), cooc, x_count, y_count)

    return with_xy.map(to_record).collect()


---
#### Question 6  (6/30 marks):

Finally, write Spark code for the function `PMI_one_token`, that will take a positive integer threshold $T$ and a sample size $N$ as arguments. For every token $x$ in `Shakespeare.txt`, your code should find all tokens $y$ that co-occur with $x$ at least $T$ times, as well as PMI$(x,y)$ for each such pair, the co-occurrence count of the pair, the number of times $x$ appears, and the number of times $y$ appears.

For each $x$, the output of your program should be equivalent to the output that would be produced by a One-Token query on $x$ (see Assignment 1), with threshold $T$. Rather than producing output for all possible tokens $x$, the function should produce output only for $N$ different $x$'s, chosen at random from among all distinct tokens in the input file.

Write your solution for question 6 by implementing the function `PMI_one_token` in the code cell below. It should use the `SparkContext` which was created previously (referenced by the variable `sc`). The function `PMI_one_token` should return a list of $N$ tuples of the form `(token, [ list_of_cooccurring_tokens ])`, where each entry in `list_of_cooccurring_tokens` is of the form `((token1, token2), pmi, cooc_count, token1_count, token2_count)`.

For instance, if $N$ = 2 and the randomly selected tokens are "if" and "you", a valid format for the value returned by `PMI_one_token` would be:
```
[('if', [(('if', 'you'), -0.09813466615111513, 1975, 16624, 18237), (('if', 'a'), 0.03856379243802408, 1568, 16624, 10569)]), ('you', [(('you', 'if'), -0.09813466615111513, 1975, 18237, 16624)])]
```
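Once the pair records of Question 5 are available, a One-Token query amounts to choosing $N$ distinct tokens at random and grouping the records by their first token. In Spark this might use `takeSample(False, N)` on the RDD of distinct tokens followed by a `filter` or `groupByKey` on the records; the plain-Python sketch below only illustrates the grouping and the expected output shape, reusing the records from the example above. The helper name `one_token_queries` and its `seed` parameter are hypothetical.

```python
import random
from collections import defaultdict


def one_token_queries(pmi_records, distinct_tokens, samp_size, seed=None):
    """Group ((x, y), pmi, cooc, cx, cy) records by x for samp_size randomly chosen tokens x."""
    rng = random.Random(seed)
    # Sample without replacement; sorting first makes a seeded sample reproducible
    chosen = rng.sample(sorted(distinct_tokens), samp_size)
    by_first = defaultdict(list)
    for record in pmi_records:
        by_first[record[0][0]].append(record)
    # A sampled token with no partner above the threshold still gets an entry, with an empty list
    return [(x, by_first.get(x, [])) for x in chosen]


records = [
    (("if", "you"), -0.09813466615111513, 1975, 16624, 18237),
    (("if", "a"), 0.03856379243802408, 1568, 16624, 10569),
    (("you", "if"), -0.09813466615111513, 1975, 18237, 16624),
]
print(one_token_queries(records, {"if", "you", "a"}, 2, seed=0))
```

Returning an entry for every sampled token, even when its list is empty, keeps the result at exactly $N$ tuples as the question requires.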


In [None]:
from simple_tokenize import simple_tokenize
from math import log

# Returns a list of samp_size tuples with the following format:
# (token, [ list_of_cooccurring_tokens ])
# where list_of_cooccurring_tokens is of the form
# [((token1, token2), pmi, cooc_count, token1_count, token2_count), ...]
def PMI_one_token(threshold, samp_size):
    # your solution to Question 6 here
    raise NotImplementedError("PMI_one_token has not been implemented yet")


---
Don't forget to save your workbook!   When you are finished and you are ready to submit your assignment, download your notebook file (.ipynb) from the hub to your machine, and then follow the submission instructions in the assignment.