# CSE 255 Programming Assignment 5

##  Problem Statement

In this programming assignment, you will estimate intrinsic dimension by calculating the Mean Squared Distance of the entire dataset to their representative centers. You will use the K-Means API in spark to find representative centers.
All of the necessary helper code is included in this notebook. However, we advise you to go over the lecture material, the EdX videos and the corresponding notebooks before you attempt this Programming Assignment.

## Reviewing the Theory 

### Computing the intrinsic dimension of a data set
Recall from class that given any $d$ dimensional dataset, we can divide it into $n$ cells of diameter $\epsilon$ each. The relationship between $n, d, \epsilon$ is then given by:
$$
n = \frac{C}{\epsilon^d}
$$
Where $C \in I\!R$

Alternately, we may write this as:
$$
\log{n} = \log{C} + d \times \log{\frac{1}{\epsilon}}
$$

Given this expression, we can then compute the dimensionality of a dataset using:
$$
d = \frac{\log{n_2} - \log{n_1}}{\log{\epsilon_1} - \log{\epsilon_2}}
$$



Where $(n_1,\epsilon_1)$, $(n_2, \epsilon_2)$ are the number of cells and diameter of each cell at 2 different scales.

### Using K-Means to estimate intrinsic dimension
We can use K-Means to approximate the value of intrinsic dimension of a data-set. In this case, the K centers represent the cells, each with a diameter equal to the Mean Squared Distance of the entire dataset to their representative centers. The estimate for intrinsic dimension then becomes:
$$
d = \frac{\log{n_2} - \log{n_1}}{\log{\sqrt{\epsilon_1}} - \log{\sqrt{\epsilon_2}}} = 2 \times \frac{\log{n_2} - \log{n_1}}{\log{\epsilon_1} - \log{\epsilon_2}}
$$

## Notebook Setup 

In [2]:
from pyspark.mllib.clustering import KMeans, KMeansModel
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as fn
from math import log
import pickle

In [27]:
""" example"""

# from numpy import array
# from math import sqrt

# from pyspark.mllib.clustering import KMeans, KMeansModel


# data = sc.parallelize([(1,1),(2,2),(3,3),(4,4),(5,5)])
# # parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# sdata=sc.parallelize([(1,1),(2,2),(3,3)])
# # Build the model (cluster the data)
# clusters = KMeans.train(data, 3, maxIterations=1, initializationMode="kmeans++")

# # Evaluate clustering by computing Within Set Sum of Squared Errors
# def error(point):
#     center = clusters.centers[clusters.predict(point)]
#     return sqrt(sum([x**2 for x in (point - center)]))

# WSSSE = sdata.map(lambda point: error(point)).reduce(lambda x, y: x + y)
# print("Within Set Sum of Squared Error = " + str(WSSSE))

# Save and load model
# clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
# sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")

' example'

In [3]:
spark = SparkSession \
    .builder \
    .getOrCreate()
sc = spark.sparkContext

## Exercises

### Exercise 1: runKmeans

#### Example
The function <font color="blue">runKmeans</font> takes as input the complete dataset rdd, the sample of the rdd on which k-means needs to be run on, the k value and the count of elements in the complete data-set. It outputs the Mean Squared Distance(MSD) of all the points in the dataset from their closest centers, where the centers are calculated using k-means.

**<font color="magenta" size=2>Example Code</font>**
``` python
rdd = sc.parallelize([(1,1),(2,2),(3,3),(4,4),(5,5)])
runKmeans(rdd, sc.parallelize([(1,1),(2,2),(3,3)]), 3, rdd.count())
```

**<font color="blue" size=2>Example Output</font>**
``` python
2.0
```

<font color="red">**Hint : **</font> You might find [K-Means API](https://spark.apache.org/docs/2.2.0/mllib-clustering.html#k-means) useful. Ensure that the initializationMode parameter is set to kmeans++. The computeCost function gives you the sum of squared distances. Ensure you set maxIterations to 1 since we only need approximate values for the K centers to estimate intrinsic dimension. Setting this to higher values will lead to large execution times while barely affecting the calculation of the intrinsic dimension.

In [28]:
def runKmeans(data, sample_dataset, k, count):
    clusters = KMeans.train(sample_dataset, k, maxIterations=1, initializationMode="kmeans++")

    def error(point):
        center = clusters.centers[clusters.predict(point)]
        return sum([x**2 for x in (point - center)])

    WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    # mean square distance 
    MSD=WSSSE/count
    #     print(" Square distance = " + str(WSSSE))
    #     print ( " MSD = "+ str (MSD))
    return MSD



In [None]:
'test runKmeans'
# rdd = sc.parallelize([(1,1),(2,2),(3,3),(4,4),(5,5)])
# runKmeans(rdd, sc.parallelize([(1,1),(2,2),(3,3)]), 3, rdd.count())

### Exercise 2: computeIntrinsicDimension

#### Example
The function <font color="blue">computeIntrinsicDimension</font> takes as input a pair of values $(n1, e1)$, $(n2, e2)$ and computes the intrinsic dimension as output. $e1, e2$ are the mean squared distances of data-points from their closest center

**<font color="magenta" size=2>Example Code</font>**
``` python
n1 = 10
n2 = 100
e1 = 10000
e2 = 100
computeIntrinsicDimension(n1, e1, n2, e2)
```

**<font color="blue" size=2>Example Output</font>**
``` python
1.0
```
<font color="red">**Hint : **</font> Use the last formula in the theory section 

In [6]:
def computeIntrinsicDimension(n1, e1, n2, e2):
    logn=log(n2) - log(n1)
    loge=log(e1)-log(e2)
    d=2*(logn/loge)
    return d

### Exercise 3: Putting it all together

#### Example
Now we run K-means for various values of k and use these to estimate the intrinsic dimension of the dataset. Since the dataset might be very large, running kmeans on the entire dataset to find k representative centers may take a very long time. To overcome this, we sample a subset of points from the complete dataset and run Kmeans only on these subsets. We will run Kmeans on 2 different subset sizes: 10000, 20000 points. We will be estimating the MSD for K values of 10, 200, 700, 2000.


The function <font color="blue">run</font> takes a dataframe containing the complete data-set as input and needs to do the following:
* For each sample size S
 * Take the first S elements from the dataframe
 * For each value of K (number of centroids)
  * Call runKmeans(data,S,K,data_count)
* Use the MSD values generated to calculate the intrinsic dimension where $(n_1, n_2) \in \{(10,200),(200,700),(700,2000)\}$.  

**NOTE: Ensure you the format of your output is identical to the one given below, i.e. the keys have to be of the format:**
```python
ID_<Subset_size>_<K-Value-1>_<K-Value-2>
```

**<font color="magenta" size=2>Example Code</font>**
``` python

df = spark.read.parquet(file_path)
run(df)
```

**<font color="blue" size=2>Example Output</font>**
``` python
{'ID_10000_10_200': 1.5574966096390015, 'ID_10000_200_700': 1.3064513902343675, 'ID_10000_700_2000': 1.091310378488035, 'ID_20000_10_200': 1.518279780870003, 'ID_20000_200_700': 1.2660237819996782, 'ID_20000_700_2000': 1.0151131917703071}
```
**Note: The output here is the output of the below function, i.e., the value stored in the variable where the 'run' function is called**

In [11]:
"""test function"""
# # import data
# df=spark.read.parquet('hw5-small.parquet')
# #change schema 
# res=dict()
# num_clust=((10,200),(200,700),(700,2000))
# for s in (10000,20000):
#     df=df.select(*[fn.col(x).cast('float') for x in df.columns])
# #     s=10000
#     data=df.rdd.map(lambda x: (x[0],x[1],x[3],x[4],x[5],x[6],x[7]))
#     sample_dataset=df.limit(s).rdd.map(lambda x: (x[0],x[1],x[3],x[4],x[5],x[6],x[7]))
#     for clusters in num_clust:
#         print (clusters[1])
#         e1=runKmeans(data, sample_dataset, clusters[0], data.count())
#         e2=runKmeans(data, sample_dataset, clusters[1], data.count())
#         n1=clusters[0]
#         n2=clusters[1]
#         ID='ID_{}_{}_{}'.format(s,n1,n2)
#         res[ID]=computeIntrinsicDimension(n1, e1, n2, e2)









200
 Square distance = 1032.425566240172
 MSD = 0.0516212783120086
 Square distance = 23.01411188646486
 MSD = 0.001150705594323243
700
 Square distance = 23.534784484715402
 MSD = 0.0011767392242357701
 Square distance = 3.4428284953079316
 MSD = 0.00017214142476539658
2000
 Square distance = 3.844474183047626
 MSD = 0.0001922237091523813
 Square distance = 0.5411195620887326
 MSD = 2.7055978104436628e-05
200
 Square distance = 1071.6793280239476
 MSD = 0.05358396640119738
 Square distance = 23.06706536858016
 MSD = 0.001153353268429008
700
 Square distance = 22.261651851426397
 MSD = 0.0011130825925713198
 Square distance = 3.5852832456590837
 MSD = 0.0001792641622829542
2000
 Square distance = 3.2101376985496106
 MSD = 0.00016050688492748053
 Square distance = 0.4375735883594407
 MSD = 2.1878679417972037e-05


In [22]:

def run(df):
    """ This calculates the intrinsic dimension using K means clustering and returns a dictionary
        Had to cast DF to float then Map named tuples to regular tuples in an RDD """
    res=dict()
    num_clust=((10,200),(200,700),(700,2000))
    for s in (10000,20000):
        df=df.select(*[fn.col(x).cast('float') for x in df.columns])
        #     s=10000
        data=df.rdd.map(lambda x: (x[0],x[1],x[3],x[4],x[5],x[6],x[7])).cache()
        sample_dataset=df.limit(s).rdd.map(lambda x: (x[0],x[1],x[3],x[4],x[5],x[6],x[7])).cache()
        for clusters in num_clust:
            print (clusters[1])
            e1=runKmeans(data, sample_dataset, clusters[0], data.count())
            e2=runKmeans(data, sample_dataset, clusters[1], data.count())
            n1=clusters[0]
            n2=clusters[1]
            ID='ID_{}_{}_{}'.format(s,n1,n2)
            res[ID]=computeIntrinsicDimension(n1, e1, n2, e2)
    return res
    
    


In [14]:
# this function allows a +/-15% tolerance margin 
# while comparing student's answer against the solution
def within_tolerance(required_answer, student_answer, tolerance = 0.15):
            tolerance_value = required_answer * tolerance
            return required_answer - tolerance_value <= student_answer <= required_answer + tolerance_value

In [23]:
file_path_small = "./hw5-small.parquet"
df = spark.read.parquet(file_path_small)
res = run(df)

200
 Square distance = 1073.3720238814485
 MSD = 0.05366860119407243
 Square distance = 21.799342385739713
 MSD = 0.0010899671192869857
700
 Square distance = 22.83083535585914
 MSD = 0.001141541767792957
 Square distance = 3.335536932588065
 MSD = 0.00016677684662940324
2000
 Square distance = 3.268587808887315
 MSD = 0.00016342939044436576
 Square distance = 0.5567397688810111
 MSD = 2.7836988444050553e-05
200
 Square distance = 1117.228268732562
 MSD = 0.05586141343662809
 Square distance = 22.122737137754353
 MSD = 0.0011061368568877177
700
 Square distance = 22.08924452480702
 MSD = 0.0011044622262403512
 Square distance = 3.071034829320173
 MSD = 0.00015355174146600866
2000
 Square distance = 3.031279915230011
 MSD = 0.00015156399576150056
 Square distance = 0.4014238094729393
 MSD = 2.0071190473646964e-05


In [24]:
assert within_tolerance(1.5528485676876376, res['ID_10000_10_200'])

In [25]:
assert within_tolerance(1.2563867109654632, res['ID_10000_200_700'])

In [26]:
assert within_tolerance(1.2041956732161911, res['ID_10000_700_2000'])

In [None]:
#
# AUTOGRADER TEST - DO NOT REMOVE
#


In [None]:
#
# AUTOGRADER TEST - DO NOT REMOVE
#


In [None]:
#
# AUTOGRADER TEST - DO NOT REMOVE
#


In [None]:
#
# AUTOGRADER TEST - DO NOT REMOVE
#


In [None]:
#
# AUTOGRADER TEST - DO NOT REMOVE
#


In [None]:
#
# AUTOGRADER TEST - DO NOT REMOVE
#


In [None]:
#
# AUTOGRADER TEST - DO NOT REMOVE
#


In [None]:
#
# AUTOGRADER TEST - DO NOT REMOVE
#


In [None]:
#
# AUTOGRADER TEST - DO NOT REMOVE
#


In [None]:
#
# AUTOGRADER TEST - DO NOT REMOVE
#
