# CSE 255 Programming Assignment 5

##  Problem Statement

In this programming assignment, you will estimate the intrinsic dimension of a dataset by calculating the Mean Squared Distance (MSD) from every point in the dataset to its representative center. You will use the K-Means API in Spark to find the representative centers.
All of the necessary helper code is included in this notebook. However, we advise you to go over the lecture material, the EdX videos, and the corresponding notebooks before you attempt this programming assignment.

## Reviewing the Theory 

### Computing the intrinsic dimension of a data set
Recall from class that given any $d$ dimensional dataset, we can divide it into $n$ cells of diameter $\epsilon$ each. The relationship between $n, d, \epsilon$ is then given by:
$$
n = \frac{C}{\epsilon^d}
$$
where $C \in \mathbb{R}$ is a constant.

Taking logarithms of both sides, we may write this as:
$$
\log{n} = \log{C} + d \times \log{\frac{1}{\epsilon}}
$$

Evaluating this expression at two different scales and subtracting eliminates $\log{C}$, so we can compute the dimensionality of a dataset using:
$$
d = \frac{\log{n_2} - \log{n_1}}{\log{\epsilon_1} - \log{\epsilon_2}}
$$



where $(n_1,\epsilon_1)$ and $(n_2, \epsilon_2)$ are the number of cells and the diameter of each cell at two different scales.
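
As a quick sanity check of this formula, the sketch below tiles the unit square with square cells at two scales. The cell counts and the helper name `dimension_from_scales` are illustrative assumptions, not part of the assignment code; the side length is used as the cell size, since any constant factor is absorbed into $C$.

```python
from math import log

def dimension_from_scales(n1, eps1, n2, eps2):
    """Estimate d from cell counts n1, n2 at cell sizes eps1, eps2."""
    return (log(n2) - log(n1)) / (log(eps1) - log(eps2))

# Assumed toy setup: a unit square tiled by cells of side 0.1, then 0.01.
# Shrinking the cell size by 10x multiplies the number of cells by 100.
n1, eps1 = 100, 0.1
n2, eps2 = 10000, 0.01

d = dimension_from_scales(n1, eps1, n2, eps2)
print(round(d, 6))  # 2.0, as expected for a two-dimensional set
```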

### Using K-Means to estimate intrinsic dimension
We can use K-Means to approximate the intrinsic dimension of a dataset. In this case, the $K$ centers represent the cells, and the diameter of each cell is approximated by the square root of the Mean Squared Distance (MSD) of the entire dataset to the representative centers. Writing $\epsilon_1, \epsilon_2$ for the MSD values obtained with $n_1, n_2$ centers, the estimate for the intrinsic dimension becomes:
$$
d = \frac{\log{n_2} - \log{n_1}}{\log{\sqrt{\epsilon_1}} - \log{\sqrt{\epsilon_2}}} = 2 \times \frac{\log{n_2} - \log{n_1}}{\log{\epsilon_1} - \log{\epsilon_2}}
$$
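
The plain-Python sketch below illustrates this estimate on data whose intrinsic dimension is known: points on a straight segment embedded in 3-D. It is a toy under stated assumptions: evenly spaced centers stand in for the centers K-Means would find, and the helper names `mean_squared_distance` and `points_on_line` are hypothetical.

```python
from math import log

def mean_squared_distance(points, centers):
    """Average squared Euclidean distance from each point to its closest center."""
    total = 0.0
    for p in points:
        total += min(sum((a - b) ** 2 for a, b in zip(p, c)) for c in centers)
    return total / len(points)

def points_on_line(count):
    """`count` evenly spaced points on a straight segment embedded in 3-D."""
    return [(t, 2 * t, 2 * t) for t in ((i + 0.5) / count for i in range(count))]

# Assumption: evenly spaced centers stand in for the centers K-Means would find.
data = points_on_line(2000)
n1, n2 = 10, 100
eps1 = mean_squared_distance(data, points_on_line(n1))  # MSD with 10 centers
eps2 = mean_squared_distance(data, points_on_line(n2))  # MSD with 100 centers

d = 2 * (log(n2) - log(n1)) / (log(eps1) - log(eps2))
print(round(d, 2))  # close to 1: the data lie on a one-dimensional line
```

Although each point has three coordinates, increasing the number of centers tenfold shrinks the MSD roughly a hundredfold, which is the signature of a one-dimensional set.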

## Notebook Setup 

In [1]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *
from math import log
import pickle

In [2]:
spark = SparkSession \
    .builder \
    .getOrCreate()
sc = spark.sparkContext

## Exercises

### Exercise 1: runKmeans

#### Example
The function <font color="blue">runKmeans</font> takes as input the complete dataset RDD, the sample of the RDD on which K-Means is to be run, the value of k, and the count of elements in the complete dataset. It outputs the Mean Squared Distance (MSD) of all the points in the dataset from their closest centers, where the centers are calculated using K-Means.

**<font color="magenta" size=2>Example Code</font>**
``` python
rdd = sc.parallelize([(1,1),(2,2),(3,3),(4,4),(5,5)])
runKmeans(rdd, sc.parallelize([(1,1),(2,2),(3,3)]), 3, rdd.count())
```

**<font color="blue" size=2>Example Output</font>**
``` python
2.0
```

<font color="red">**Hint : **</font> You might find the [K-Means API](https://spark.apache.org/docs/2.2.0/mllib-clustering.html#k-means) useful. Ensure that the `initializationMode` parameter is set to `k-means||`, Spark's parallel variant of k-means++ (this is the default). The `computeCost` function gives you the sum of squared distances. Set `maxIterations` to 1, since we only need approximate values for the K centers to estimate the intrinsic dimension. Higher values lead to long execution times while barely affecting the calculated intrinsic dimension.
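
To see where the `2.0` in the example output comes from, the plain-Python sketch below recomputes the MSD by hand. It assumes that K-Means with `k=3` on the three sample points returns exactly those points as centers; the helper name `msd_to_closest_center` is hypothetical and is not part of the Spark API.

```python
def msd_to_closest_center(points, centers):
    """Mean of the squared Euclidean distances from each point to its closest center."""
    total = 0
    for p in points:
        total += min(sum((a - b) ** 2 for a, b in zip(p, c)) for c in centers)
    return total / len(points)

data = [(1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]
centers = [(1, 1), (2, 2), (3, 3)]  # assumed result of K-Means on the 3-point sample

# (1,1), (2,2), (3,3) sit on a center (distance 0); (4,4) and (5,5) are
# closest to (3,3), contributing 2 and 8. Sum = 10, divided by 5 points.
print(msd_to_closest_center(data, centers))  # 2.0
```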

In [3]:
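def runKmeans(data, sample_dataset, k, count):
    # Fit k centers on the sample only; one iteration is enough for a rough estimate.
    clusters = KMeans.train(sample_dataset, k, maxIterations=1,
                            initializationMode="k-means||")
    # Sum of squared distances from every point in the full dataset to its closest center.
    sum_of_squared_distances = clusters.computeCost(data)
    # Divide by the size of the full dataset to obtain the mean squared distance.
    mean_squared_distances = sum_of_squared_distances / count
    return mean_squared_distances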



In [4]:
KMeans

pyspark.mllib.clustering.KMeans

In [5]:
rdd = sc.parallelize([(1,1),(2,2),(3,3),(4,4),(5,5)])

In [6]:
runKmeans(rdd, sc.parallelize([(1,1),(2,2),(3,3)]), 3, rdd.count())

2.0

### Exercise 2: computeIntrinsicDimension

#### Example
The function <font color="blue">computeIntrinsicDimension</font> takes as input two pairs of values $(n1, e1)$ and $(n2, e2)$ and computes the intrinsic dimension as output. Here $n1, n2$ are the numbers of centers and $e1, e2$ are the corresponding mean squared distances of the data points from their closest centers.

**<font color="magenta" size=2>Example Code</font>**
``` python
n1 = 10
n2 = 100
e1 = 10000
e2 = 100
computeIntrinsicDimension(n1, e1, n2, e2)
```

**<font color="blue" size=2>Example Output</font>**
``` python
1.0
```
<font color="red">**Hint : **</font> Use the last formula in the theory section 
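
Plugging the example values into that formula shows why the result is 1 (a worked check; the base of the logarithm cancels out):
$$
d = 2 \times \frac{\log{100} - \log{10}}{\log{10000} - \log{100}} = 2 \times \frac{\log{10}}{\log{100}} = 2 \times \frac{\log{10}}{2\log{10}} = 1
$$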

In [7]:
def computeIntrinsicDimension(n1, e1, n2, e2):
    return 2*(log(n2)-log(n1))/(log(e1)-log(e2))


In [8]:
n1 = 10
n2 = 100
e1 = 10000
e2 = 100
computeIntrinsicDimension(n1, e1, n2, e2)

1.0

### Exercise 3: Putting it all together

#### Example
Now we run K-Means for various values of k and use the results to estimate the intrinsic dimension of the dataset. Since the dataset might be very large, running K-Means on the entire dataset to find k representative centers may take a very long time. To overcome this, we take a subset of points from the complete dataset and run K-Means only on that subset. We will use two subset sizes: 10000 and 20000 points. For each, we will estimate the MSD for K values of 10, 200, 700, and 2000.


The function <font color="blue">run</font> takes a dataframe containing the complete dataset as input and needs to do the following:
* For each sample size S
 * Take the first S elements from the dataframe
 * For each value of K (number of centroids)
  * Call runKmeans(data, sample, K, data_count), where sample holds the first S elements and data_count is the size of the complete dataset
* Use the MSD values generated to calculate the intrinsic dimension where $(n_1, n_2) \in \{(10,200),(200,700),(700,2000)\}$.

**NOTE: Ensure the format of your output is identical to the one given below, i.e., the keys have to be of the format:**
```python
ID_<Subset_size>_<K-Value-1>_<K-Value-2>
```
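
As a small illustration of this key format, the sketch below builds all six keys from the sample sizes and consecutive K values listed above. The variable names are illustrative assumptions; only the key pattern comes from the assignment.

```python
sample_sizes = [10000, 20000]
k_values = [10, 200, 700, 2000]

# Pair each K with the next one: (10, 200), (200, 700), (700, 2000).
k_pairs = list(zip(k_values, k_values[1:]))

keys = ['ID_%d_%d_%d' % (size, k1, k2)
        for size in sample_sizes
        for k1, k2 in k_pairs]
print(keys)
```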

**<font color="magenta" size=2>Example Code</font>**
``` python

df = spark.read.parquet(file_path)
run(df)
```

**<font color="blue" size=2>Example Output</font>**
``` python
{'ID_10000_10_200': 1.5574966096390015, 'ID_10000_200_700': 1.3064513902343675, 'ID_10000_700_2000': 1.091310378488035, 'ID_20000_10_200': 1.518279780870003, 'ID_20000_200_700': 1.2660237819996782, 'ID_20000_700_2000': 1.0151131917703071}
```
**Note: The output shown here is the return value of the `run` function below, i.e., the value stored in the variable to which the result of `run` is assigned.**

In [9]:
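def run(df):
    # Pass an RDD of numeric vectors; a DataFrame of Rows makes KMeans.train fail.
    output = {}
    data_count = df.count()  # size of the complete dataset, used to average the cost
    for sample_size in [10000, 20000]:
        # Build the sample once per sample size instead of once per k.
        sample = sc.parallelize(df.take(sample_size))
        last_k, last_msd = None, None
        for k in [10, 200, 700, 2000]:
            msd = runKmeans(df, sample, k, data_count)
            if last_k is not None:
                key = 'ID_%d_%d_%d' % (sample_size, last_k, k)
                output[key] = computeIntrinsicDimension(last_k, last_msd, k, msd)
            last_k, last_msd = k, msd
    return output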


In [10]:
# this function allows a +/-15% tolerance margin 
# while comparing student's answer against the solution
def within_tolerance(required_answer, student_answer, tolerance = 0.15):
    tolerance_value = required_answer * tolerance
    return required_answer - tolerance_value <= student_answer <= required_answer + tolerance_value
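
The sketch below shows what the tolerance check accepts in practice. The helper `tolerance_bounds` is hypothetical (it is not part of the notebook); the reference value is the one used in the first assertion later in this notebook, and the candidate value is the `ID_10000_10_200` entry from the example output above.

```python
def tolerance_bounds(required_answer, tolerance=0.15):
    """Return the (low, high) interval accepted for a given reference answer."""
    margin = required_answer * tolerance
    return required_answer - margin, required_answer + margin

low, high = tolerance_bounds(1.5528485676876376)
print(low <= 1.5574966096390015 <= high)  # True: the example output is accepted
print(low <= 1.9 <= high)                 # False: more than 15% above the reference
```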

In [12]:
file_path_small = "./hw5-small.parquet"
df = spark.read.parquet(file_path_small)
res = run(df)

Py4JJavaError: An error occurred while calling o114.trainKMeansModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 82, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\clain\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "C:\Users\clain\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "C:\Users\clain\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "C:\Users\clain\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "C:\Users\clain\Anaconda3\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\mllib\linalg\__init__.py", line 83, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.sql.types.Row'> into Vector

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
	at org.apache.spark.rdd.RDD$$anonfun$takeSample$1.apply(RDD.scala:572)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.takeSample(RDD.scala:561)
	at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:386)
	at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:282)
	at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:251)
	at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:233)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainKMeansModel(PythonMLLibAPI.scala:367)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)


In [13]:
df.head()

Row(col0='0.45152887379472995', col1='0.2260514832989532', col2='0.34592042250000005', col3='0.20345309649337506', col4='0.542974354273149', col5='0.2260514832989532', col6='0.34592042250000005', col7='0.20345309649337506')

In [14]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

## Spark 2.3
Convert the DataFrame to an RDD, then train with KMeans.  
**Method 1**  
```python
cols = df.columns

expr = [col(c).cast("Double").alias(c) 
        for c in df.columns]

df2 = df.select(*expr)
vectorAss = VectorAssembler(inputCols=cols, outputCol="features")
vdf = vectorAss.transform(df2)

data = vdf.rdd.map(lambda row: [x for x in row['features']])
```

**Method 2**  
```python
data = df.rdd.map(lambda line: [float(x) for x in line])
```
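
Method 2 works because each row can be iterated like a tuple of its field values, and those values are numeric strings that `float` can parse. The sketch below mimics this without Spark; the `namedtuple` is only a stand-in for `pyspark.sql.Row` (an assumption for illustration), and the values are the first three columns of the `df.head()` output above.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row (assumption): a tuple with named string fields.
Row = namedtuple("Row", ["col0", "col1", "col2"])
rows = [Row("0.45152887379472995", "0.2260514832989532", "0.34592042250000005")]

# Same conversion as Method 2, applied to a plain list instead of an RDD.
data = [[float(x) for x in line] for line in rows]
print(data)
```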

## Spark 2.4
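`KMeans.fit` accepts a DataFrame as input, as long as the frame contains a `features` column. Once `vdf` has been obtained as in Method 1 above, it can therefore be used for training directly.  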
<img src="./Spark2.4.0-Estimator-input.jpg">

In [15]:
cols = df.columns

expr = [col(c).cast("Double").alias(c) 
        for c in df.columns]

df2 = df.select(*expr)
vectorAss = VectorAssembler(inputCols=cols, outputCol="features")
vdf = vectorAss.transform(df2)

data = vdf.rdd.map(lambda row: [x for x in row['features']])


In [16]:
data = df.rdd.map(lambda line: [float(x) for x in line])

In [None]:
print(data.getNumPartitions())

 WARN TaskSetManager: Stage 917 contains a task of very large size (325 KB). The maximum recommended task size is 100 KB.

In [None]:
%%timeit
res = run(data)

In [None]:
data5 = data.repartition(5)
print(data5.getNumPartitions())

 Stage 44 contains a task of very large size (147 KB). The maximum recommended task size is 100 KB.

In [None]:
%%timeit
res = run(data5)

In [None]:
data10 = data.repartition(10)
print(data10.getNumPartitions())

Stage 1054 contains a task of very large size (325 KB). The maximum recommended task size is 100 KB.

In [None]:
%%timeit
res = run(data10)

In [None]:
res

In [None]:
assert within_tolerance(1.5528485676876376, res['ID_10000_10_200'])

In [None]:
assert within_tolerance(1.2563867109654632, res['ID_10000_200_700'])

In [None]:
assert within_tolerance(1.2041956732161911, res['ID_10000_700_2000'])

In [None]:
#
# AUTOGRADER TEST - DO NOT REMOVE
#


## Quiz based on PA 5 

**computeIntrinsicDimension**  
0.0/25.0 points (graded)  
What would computeIntrinsicDimension(700, 700, 200, 20000) return? (Round to 3 decimal places)    
**Answer: 0.7473811426956143**
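
As a worked check of this answer, substitute $n_1 = 700$, $\epsilon_1 = 700$, $n_2 = 200$, $\epsilon_2 = 20000$ into the K-Means estimate from the theory section (natural logarithms; both differences are negative, so the ratio is positive):
$$
d = 2 \times \frac{\log{200} - \log{700}}{\log{700} - \log{20000}} \approx 2 \times \frac{-1.2528}{-3.3524} \approx 0.747
$$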

In [None]:
computeIntrinsicDimension(700, 700, 200, 20000)

**ID_20000_10_200**   
0.0/25.0 points (graded)  
Following the PA5 notebook definitions, what value does ID_20000_10_200 have?  
**Answer: 1.564420312148605**  
**+/- 15% range: [1.3297572653263143, 1.7990833589708959]**



In [None]:
res['ID_20000_10_200']

**ID_20000_200_700**  
0.0/25.0 points (graded)  
Following the PA5 notebook definitions, what value does ID_20000_200_700 have?  
**Answer: 1.2150732884765727**  
**+/- 15% range: [1.0328122952050869, 1.3973342817480585]**


In [None]:
res['ID_20000_200_700']

**ID_20000_700_2000**  
0.0/25.0 points (graded)  
Following the PA5 notebook definitions, what value does ID_20000_700_2000 have?  
**Answer: 1.1196299391601852**  
**+/- 15% range: [0.9516854482861574, 1.287574430034213]**


In [None]:
res['ID_20000_700_2000']