# CSE 255 Programming Assignment 5

##  Problem Statement

In this programming assignment, you will estimate the intrinsic dimension of a dataset by calculating the Mean Squared Distance (MSD) from every point in the dataset to its representative center. You will use the K-Means API in Spark to find the representative centers.
All of the necessary helper code is included in this notebook. However, we advise you to go over the lecture material, the edX videos, and the corresponding notebooks before you attempt this programming assignment.

## Reviewing the Theory 

### Computing the intrinsic dimension of a data set
Recall from class that given any $d$ dimensional dataset, we can divide it into $n$ cells of diameter $\epsilon$ each. The relationship between $n, d, \epsilon$ is then given by:
$$
n = \frac{C}{\epsilon^d}
$$
where $C \in \mathbb{R}$ is a constant.

Alternately, we may write this as:
$$
\log{n} = \log{C} + d \times \log{\frac{1}{\epsilon}}
$$

Given this expression, we can then compute the dimensionality of a dataset using:
$$
d = \frac{\log{n_2} - \log{n_1}}{\log{\epsilon_1} - \log{\epsilon_2}}
$$



where $(n_1,\epsilon_1)$ and $(n_2, \epsilon_2)$ are the number of cells and the cell diameter at two different scales.
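
As a quick sanity check of this formula, the sketch below counts the cells needed to cover a unit square at two scales and recovers $d = 2$. The grid-covering setup and the helper name `estimate_dimension` are illustrative assumptions, not part of the assignment code.

```python
from math import log

def estimate_dimension(n1, eps1, n2, eps2):
    """Dimension estimate from cell counts n1, n2 at cell sizes eps1, eps2."""
    return (log(n2) - log(n1)) / (log(eps1) - log(eps2))

# A unit square covered by a grid of square cells of side eps needs (1/eps)^2 cells.
# The side length stands in for the diameter here; the constant factor between
# them cancels in the difference of logarithms.
n1, eps1 = 10 ** 2, 0.1
n2, eps2 = 100 ** 2, 0.01

d = estimate_dimension(n1, eps1, n2, eps2)
print(round(d, 6))
```

Shrinking the cell size by a factor of 10 multiplies the number of cells by 100, which is what a 2-dimensional set should do.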

### Using K-Means to estimate intrinsic dimension
We can use K-Means to approximate the intrinsic dimension of a dataset. In this case, the K centers represent the cells, and the diameter of each cell is approximated by the square root of the Mean Squared Distance (MSD) of the entire dataset to the closest centers. Writing $\epsilon_i$ for the MSD obtained with $n_i$ centers, the estimate for intrinsic dimension then becomes:
$$
d = \frac{\log{n_2} - \log{n_1}}{\log{\sqrt{\epsilon_1}} - \log{\sqrt{\epsilon_2}}} = 2 \times \frac{\log{n_2} - \log{n_1}}{\log{\epsilon_1} - \log{\epsilon_2}}
$$
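
The factor of 2 comes from taking the logarithm of a square root. Spelled out for the denominator, with $\epsilon_1, \epsilon_2$ as above:
$$
\log{\sqrt{\epsilon_1}} - \log{\sqrt{\epsilon_2}} = \frac{1}{2}\log{\epsilon_1} - \frac{1}{2}\log{\epsilon_2} = \frac{1}{2}\left(\log{\epsilon_1} - \log{\epsilon_2}\right)
$$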

## Notebook Setup 

In [1]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *
from math import log
import pickle

import os

os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"

In [2]:
spark = SparkSession \
    .builder \
    .getOrCreate()
sc = spark.sparkContext

## Exercises

### Exercise 1: runKmeans

#### Example
The function <font color="blue">runKmeans</font> takes as input the complete dataset RDD, the sample of the RDD on which k-means is run, the value of k, and the count of elements in the complete dataset. It outputs the Mean Squared Distance (MSD) of all the points in the dataset from their closest centers, where the centers are calculated using k-means.

**<font color="magenta" size=2>Example Code</font>**
``` python
rdd = sc.parallelize([(1,1),(2,2),(3,3),(4,4),(5,5)])
runKmeans(rdd, sc.parallelize([(1,1),(2,2),(3,3)]), 3, rdd.count())
```

**<font color="blue" size=2>Example Output</font>**
``` python
2.0
```
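
To see where the value 2.0 comes from, here is a plain-Python sketch of the MSD computation that does not use Spark. The helper name `mean_squared_distance` is hypothetical, and the centers are assumed to coincide with the three sample points, which is what one would expect when k equals the number of distinct sample points.

```python
def mean_squared_distance(points, centers):
    """Mean over all points of the squared Euclidean distance to the nearest center."""
    total = 0.0
    for p in points:
        # squared distance from p to each center; keep the smallest
        total += min(sum((a - b) ** 2 for a, b in zip(p, c)) for c in centers)
    return total / len(points)

points = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]
centers = [(1, 1), (2, 2), (3, 3)]  # assumption: one center per sample point

msd = mean_squared_distance(points, centers)
print(msd)  # 2.0
```

The first three points sit on a center and contribute 0, while (4,4) and (5,5) are closest to (3,3) and contribute 2 and 8, giving (0 + 0 + 0 + 2 + 8) / 5 = 2.0.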

<font color="red">**Hint : **</font> You might find the [K-Means API](https://spark.apache.org/docs/2.2.0/mllib-clustering.html#k-means) useful. Ensure that the initializationMode parameter is set to `k-means||`, Spark's parallel variant of k-means++. The computeCost function gives you the sum of squared distances, so divide it by the number of points to obtain the mean. Set maxIterations to 1, since we only need approximate values for the K centers to estimate intrinsic dimension. Higher values lead to long execution times while barely affecting the calculation of the intrinsic dimension.

In [3]:
def runKmeans(data, sample_dataset, k, count):
    ###
    ### YOUR CODE HERE
    ###
    clusters = KMeans.train(sample_dataset, k, maxIterations=1, initializationMode="k-means||")
    return clusters.computeCost(data)/count

In [12]:
rdd = sc.parallelize([(1,1),(2,2),(3,3),(4,4),(5,5)])
runKmeans(rdd, sc.parallelize([(1,1),(2,2),(3,3)]), 3, rdd.count())

2.0


### Exercise 2: computeIntrinsicDimension

#### Example
The function <font color="blue">computeIntrinsicDimension</font> takes as input two pairs of values $(n1, e1)$, $(n2, e2)$ and computes the intrinsic dimension as output. $e1, e2$ are the mean squared distances of data points from their closest centers when $n1, n2$ centers are used.

**<font color="magenta" size=2>Example Code</font>**
``` python
n1 = 10
n2 = 100
e1 = 10000
e2 = 100
computeIntrinsicDimension(n1, e1, n2, e2)
```

**<font color="blue" size=2>Example Output</font>**
``` python
1.0
```
<font color="red">**Hint : **</font> Use the last formula in the theory section 
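
Substituting the example values into that formula reproduces the expected output:
$$
d = 2 \times \frac{\log{100} - \log{10}}{\log{10000} - \log{100}} = 2 \times \frac{\log{10}}{2\log{10}} = 1
$$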

In [4]:
def computeIntrinsicDimension(n1, e1, n2, e2):
    ###
    ### YOUR CODE HERE
    ###
    return 2*(log(n2)-log(n1))/(log(e1)-log(e2))

In [20]:
n1 = 10
n2 = 100
e1 = 10000
e2 = 100
computeIntrinsicDimension(n1, e1, n2, e2)

1.0

### Exercise 3: Putting it all together

#### Example
Now we run K-means for various values of k and use the results to estimate the intrinsic dimension of the dataset. Since the dataset might be very large, running K-means on the entire dataset to find k representative centers may take a very long time. To overcome this, we take a subset of points from the complete dataset and run K-means only on this subset. We will run K-means on 2 different subset sizes: 10000 and 20000 points. We will estimate the MSD for K values of 10, 200, 700, and 2000.


The function <font color="blue">run</font> takes a dataframe containing the complete dataset as input and needs to do the following:
* For each sample size S
  * Take the first S elements from the dataframe as the sample
  * For each value of K (number of centroids)
    * Call runKmeans(data, sample, K, data_count), where data is the complete dataset and data_count is its number of elements
* Use the MSD values generated to calculate the intrinsic dimension where $(n_1, n_2) \in \{(10,200),(200,700),(700,2000)\}$.

**NOTE: Ensure that the format of your output is identical to the one given below, i.e. the keys have to be of the format:**
```python
ID_<Subset_size>_<K-Value-1>_<K-Value-2>
```
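
As a minimal sketch of how these keys can be generated, the snippet below pairs each K value with the next one and formats one key per subset size and pair. The variable names `sample_sizes`, `k_values`, and `k_pairs` are illustrative and not required by the assignment.

```python
sample_sizes = [10000, 20000]
k_values = [10, 200, 700, 2000]

# Pair each K value with the next one: (10, 200), (200, 700), (700, 2000)
k_pairs = list(zip(k_values, k_values[1:]))

# One key per (subset size, K pair) combination
keys = ["ID_{}_{}_{}".format(s, k1, k2) for s in sample_sizes for k1, k2 in k_pairs]
print(keys)
```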

**<font color="magenta" size=2>Example Code</font>**
``` python

df = spark.read.parquet(file_path)
run(df)
```

**<font color="blue" size=2>Example Output</font>**
``` python
{'ID_10000_10_200': 1.5574966096390015, 'ID_10000_200_700': 1.3064513902343675, 'ID_10000_700_2000': 1.091310378488035, 'ID_20000_10_200': 1.518279780870003, 'ID_20000_200_700': 1.2660237819996782, 'ID_20000_700_2000': 1.0151131917703071}
```
**Note: The output shown here is the value returned by the 'run' function below, i.e., the value stored in the variable to which the result of calling 'run' is assigned.**

In [31]:
def run(df):
    ###
    ### YOUR CODE HERE
    ###
    rdd = df.rdd.map(list)
    data_count = rdd.count()  # count the complete dataset once
    k_values = [10, 200, 700, 2000]
    result = {}  # named result to avoid shadowing the built-in dict
    for sample_size in [10000, 20000]:
        # k-means is trained only on the first sample_size points
        sample = sc.parallelize(rdd.take(sample_size))
        # MSD of the complete dataset for each value of k
        msd = {k: runKmeans(rdd, sample, k, data_count) for k in k_values}
        # intrinsic dimension for each consecutive pair of k values
        for k1, k2 in zip(k_values, k_values[1:]):
            key = "ID_{}_{}_{}".format(sample_size, k1, k2)
            result[key] = computeIntrinsicDimension(k1, msd[k1], k2, msd[k2])
    return result

In [7]:
file_path_small = "./hw5-small.parquet"
df = spark.read.parquet(file_path_small)

In [32]:
run(df)

{'ID_10000_10_200': 1.5420010301427032,
 'ID_10000_200_700': 1.2647693703846206,
 'ID_10000_700_2000': 1.1337084266097024,
 'ID_20000_10_200': 1.5401239609820971,
 'ID_20000_200_700': 1.2403512517265063,
 'ID_20000_700_2000': 1.0382740534490769}

In [34]:
# this function allows a +/-15% tolerance margin
# while comparing student's answer against the solution
def within_tolerance(required_answer, student_answer, tolerance=0.15):
    tolerance_value = required_answer * tolerance
    return required_answer - tolerance_value <= student_answer <= required_answer + tolerance_value
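
To illustrate the tolerance band, the following sketch repeats the function so that it is self-contained and checks two values against the first reference answer used in the tests below. The value 1.0 is a made-up example of an answer that falls outside the band.

```python
def within_tolerance(required_answer, student_answer, tolerance=0.15):
    tolerance_value = required_answer * tolerance
    return required_answer - tolerance_value <= student_answer <= required_answer + tolerance_value

required = 1.5528485676876376  # reference value for ID_10000_10_200

# Accepted range is roughly [1.32, 1.79]
print(within_tolerance(required, 1.5420010301427032))  # True
print(within_tolerance(required, 1.0))                  # False
```

Because K-Means initialization is randomized, repeated runs give slightly different estimates, which is why the comparison uses a relative margin rather than exact equality.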

In [35]:
file_path_small = "./hw5-small.parquet"
df = spark.read.parquet(file_path_small)
res = run(df)

In [36]:
assert within_tolerance(1.5528485676876376, res['ID_10000_10_200'])

In [37]:
assert within_tolerance(1.2563867109654632, res['ID_10000_200_700'])

In [38]:
assert within_tolerance(1.2041956732161911, res['ID_10000_700_2000'])

In [39]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [40]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [41]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [42]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [43]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [44]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [45]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [46]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [47]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###


In [48]:
###
### AUTOGRADER TEST - DO NOT REMOVE
###
