# Exercise 3

In this exercise, we are going to implement the KMeans clustering algorithm using Spark RDD.

For parts marked with **[CODE CHANGE REQUIRED]**, you need to modify or complete the code before execution.
For parts without **[CODE CHANGE REQUIRED]**, you can just run the given code.


## KMeans clustering algorithm

The KMeans clustering algorithm is an unsupervised machine learning algorithm that groups data points into *clusters* (or groups) based on their similarity, e.g. people grouped by the products they like, the movies they like, the brands they follow, the TV programs and movies they watched, or the universities and colleges they attended.

Assuming all the attributes of the subjects in the analysis can be represented as scalar values, we can conduct the analysis in the following steps:

1. The user/programmer specifies how many clusters he/she would like to group the data points into. Let's call this number `K`.
2. Randomly generate `K` data points, which we call *centroids*, `c1, c2, ..., cK`.
3. For each data point `p`, compute the distances between `p` and `c1`, `p` and `c2`, ..., `p` and `cK`. If `ci` is the centroid closest to `p`, we conclude that `p` is in cluster `i`.
4. For each cluster `i`, retrieve all the data points falling in this cluster and compute their mean. The mean becomes the new centroid for cluster `i`, say `ci'`.
5. Compare `c1` with `c1'`, `c2` with `c2'`, ..., `cK` with `cK'`. If all of them remain unchanged (or the differences are below a threshold), we are done. Otherwise, update `c1 = c1'`, ..., `cK = cK'` and go back to step 3.
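To make steps 1 to 5 concrete, here is a minimal plain-Python sketch that runs entirely in memory, without Spark. It is an illustration only, not the RDD-based implementation developed in this exercise. The following are all assumptions of this sketch:

* the name `kmeans_local`;
* the `tolerance`, `max_iter` and `seed` parameters;
* drawing the initial centroids uniformly from the data's bounding box;
* dropping clusters that disappear.

```python
import math
import random


def euc_dist(p, q):
    return math.sqrt((p[0] - q[0]) ** 2 + (p[1] - q[1]) ** 2)


def kmeans_local(points, k, tolerance=0.01, max_iter=100, seed=None):
    """Hypothetical plain-Python K-Means over a list of (x, y) tuples."""
    rng = random.Random(seed)
    # Step 2: K random centroids inside the bounding box of the data
    xs = [x for x, _ in points]
    ys = [y for _, y in points]
    centroids = [(rng.uniform(min(xs), max(xs)), rng.uniform(min(ys), max(ys)))
                 for _ in range(k)]
    for _ in range(max_iter):
        # Step 3: put every point into the cluster of its closest centroid
        clusters = {c: [] for c in centroids}
        for p in points:
            closest = min(centroids, key=lambda c: euc_dist(p, c))
            clusters[closest].append(p)
        # Step 4: the mean of each non-empty cluster is its new centroid;
        # an empty cluster is dropped (it has "disappeared")
        new_centroids = [
            (sum(x for x, _ in ps) / len(ps), sum(y for _, y in ps) / len(ps))
            for ps in clusters.values() if ps
        ]
        # Step 5: stop once no cluster vanished and no centroid moved by
        # tolerance or more
        converged = len(new_centroids) == len(centroids) and all(
            euc_dist(c, c_new) < tolerance
            for c, c_new in zip(centroids, new_centroids)
        )
        centroids = new_centroids
        if converged:
            break
    return centroids
```

For instance, `kmeans_local([(0, 0), (2, 0), (0, 2), (2, 2)], 1)` converges to the single centroid `(1.0, 1.0)`, the mean of all four points.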


Note that clusters might disappear, i.e. some centroid may end up with no data points assigned to it.

![](https://i.stack.imgur.com/ibYKU.png)

Instead of using real data, we generate synthetic data with a Python script, which has already been written for you. It can be found in the cloned Git repository at `sutd50043_student/lab13/data/ex3/data/generate.py`:

```python
import sys
import random

def gen(num_of_records, filename):
    # Write num_of_records random 2-D points, one "x<TAB>y" line per point,
    # with both coordinates drawn uniformly between -100 and 100.
    with open(filename, 'w') as f:  # the with-block closes the file for us
        for i in range(int(num_of_records)):
            x = random.uniform(-100, 100)
            y = random.uniform(-100, 100)
            f.write("%.2f\t%.2f\n" % (x, y))

if __name__ == "__main__":
    if len(sys.argv) > 2:
        sys.exit(gen(sys.argv[1], sys.argv[2]))
    else:
        print("USAGE: python3 generate.py <number_of_records> <file_name>")
```

 **[CODE CHANGE REQUIRED]**
Modify the bash command in the next cell accordingly to execute the above script.



## TODO 0

 **[CODE CHANGE REQUIRED]**
 
Modify the following bash command and execute it to upload the generated points to HDFS.

 

Next, we create a Spark session and obtain the Spark context `sc` from it.

 

## TODO 1

 **[CODE CHANGE REQUIRED]**

Complete the definition of the following function `load_points_into_rdd`, which loads the TSV data into an RDD given the namenode (e.g. `127.0.0.1:9000`) and the HDFS path (e.g. `kmeans/input`).

<style>
    div.hidecode + pre {display: none}
</style>
<script>
doclick=function(e) {
    e.nextSibling.nextSibling.style.display="block";
}
</script>

<div class="hidecode" onclick="doclick(this);">[Show Hint]</div>

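```python
namenode = "127.0.0.1:9000"

def load_points_into_rdd(namenode, path):
    # `sc` is the SparkContext created earlier in the notebook
    lines = sc.textFile("hdfs://%s/%s" % (namenode, path))
    # each line looks like "x<TAB>y": split it, then parse both fields as floats
    return (lines.map(lambda ln: ln.strip().split("\t"))
                 .map(lambda l: (float(l[0]), float(l[1]))))
```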

## Test case 1

Run the following cell. You should see output like the following (the actual numeric values may differ, but the structure should be the same):
```text
[(61.4, -33.87), (19.57, -20.85), (22.95, -49.32), (42.81, 29.71), (-65.89, -75.57), (13.48, 71.92), (-17.28, -21.7), (1.79, 43.8), (11.58, -32.18), (1.73, -54.43)]
```
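Outside Spark, the parsing done by the two `map` calls in `load_points_into_rdd` boils down to the per-line function sketched below. `parse_line` is a hypothetical helper, not part of the notebook. It can be handy for checking the parsing on a sample line in `generate.py`'s `"%.2f\t%.2f\n"` format before running it against HDFS.

```python
def parse_line(ln):
    """Turn one "x<TAB>y" line written by generate.py into an (x, y) tuple."""
    fields = ln.strip().split("\t")               # first map: split on the tab
    return (float(fields[0]), float(fields[1]))   # second map: parse floats


sample = ["61.40\t-33.87\n", "19.57\t-20.85\n"]
print([parse_line(ln) for ln in sample])  # [(61.4, -33.87), (19.57, -20.85)]
```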


 

## TODO 2
 **[CODE CHANGE REQUIRED]**
 
Complete the following function `euc_dist`, which computes the Euclidean distance between two points:


$$
\operatorname{eucdist}((x_1,y_1),(x_2,y_2)) = \sqrt{(x_1 - x_2)^2 + (y_1 - y_2)^2}
$$
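A direct transcription of the formula into Python might look like the sketch below. It assumes points are `(x, y)` tuples, as produced by `load_points_into_rdd`. The argument names are illustrative. The standard library's `math.hypot(x1 - x2, y1 - y2)` computes the same quantity and would be an equally valid choice.

```python
import math


def euc_dist(p1, p2):
    """Euclidean distance between two 2-D points given as (x, y) tuples."""
    (x1, y1), (x2, y2) = p1, p2
    return math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)


print(euc_dist((0, 0), (1, 1)))  # 1.4142135623730951
```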


## Test case 2

Run the following cell. You should see:

```text
1.4142135623730951
```


 

## TODO 3 
 **[CODE CHANGE REQUIRED]**
 
Complete the following function, which computes the mean of an iterator/list of points.

The `mean()` function takes a plain Python iterator of points, not an RDD of points.
The mean of a collection of points is their component-wise average. For two points, it is defined as

$$
\operatorname{mean}((x_1,y_1),(x_2,y_2)) = \left(\frac{x_1 + x_2}{2}, \frac{y_1 + y_2}{2}\right)
$$
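Since `mean()` receives a plain Python iterator, it may only be traversable once and may not support `len()`. A sketch that makes a single pass and counts the points as it goes avoids both problems. Raising `ValueError` on empty input is an assumption of this sketch; the exercise does not specify that case.

```python
from typing import Iterable, Tuple

Point = Tuple[float, float]


def mean(points: Iterable[Point]) -> Point:
    """Component-wise average of 2-D points, computed in a single pass."""
    n = 0
    sum_x = 0.0
    sum_y = 0.0
    for x, y in points:  # works for lists and for one-shot iterators alike
        sum_x += x
        sum_y += y
        n += 1
    if n == 0:
        # assumption: the exercise does not say what an empty input should do
        raise ValueError("mean() of an empty collection of points")
    return (sum_x / n, sum_y / n)


print(mean([(3, 4), (5, 6)]))  # (4.0, 5.0)
```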



## Test case 3

Run the following cell. You should see:

```text
(4,5)
```


## TODO 4
 **[CODE CHANGE REQUIRED]**
Complete the following function, which finds the nearest centroid for each point in the RDD.
<style>
    div.hidecode + pre {display: none}
</style>
<script>
doclick=function(e) {
    e.nextSibling.nextSibling.style.display="block";
}
</script>

<div class="hidecode" onclick="doclick(this);">[Show Hint]</div>

```text
Let r1 and r2 be RDDs; r1.cartesian(r2) produces the Cartesian product of r1 and r2.
When finding the centroid nearest to a point from a list of centroids, it is useful to think of it as a reduce operation.
```
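The hint can be mirrored in plain Python to show the shape of the computation. One detail carries over to Spark: the function passed to `reduceByKey` only receives two values, never the key. So each candidate centroid is paired with its distance to the point before reducing. `nearest_centroids_local` and `keep_closer` are hypothetical names, and the local grouping step merely stands in for what `reduceByKey` does across partitions. Because the point itself is the key, duplicate points collapse into a single entry, in this sketch as with `reduceByKey`.

```python
import math
from collections import defaultdict
from functools import reduce
from itertools import product


def euc_dist(p, q):
    return math.sqrt((p[0] - q[0]) ** 2 + (p[1] - q[1]) ** 2)


def keep_closer(a, b):
    """Binary reducer on (centroid, distance) pairs: keep the nearer one."""
    return a if a[1] <= b[1] else b


def nearest_centroids_local(points, centroids):
    """Plain-Python analogue of cartesian + reduceByKey.

    Returns a list of (point, nearest_centroid) pairs.
    """
    # cartesian product, keyed by point: (p, (c, distance from p to c))
    keyed = [(p, (c, euc_dist(p, c))) for p, c in product(points, centroids)]
    # bring together the candidates of each point (reduceByKey handles this)
    by_point = defaultdict(list)
    for p, candidate in keyed:
        by_point[p].append(candidate)
    # reduce each group with the binary reducer, then drop the distance
    return [(p, reduce(keep_closer, cands)[0]) for p, cands in by_point.items()]


pts = [(0.0, 0.0), (9.0, 9.0), (1.0, 2.0)]
cs = [(0.0, 1.0), (10.0, 10.0)]
print(nearest_centroids_local(pts, cs))
# [((0.0, 0.0), (0.0, 1.0)), ((9.0, 9.0), (10.0, 10.0)), ((1.0, 2.0), (0.0, 1.0))]
```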


## Test Case 4

Run the following cell. You should see output like the following (the actual numeric values may differ, but the structure should be the same):


```text

[((31.34, 39.54), (10.94, 64.76)), ((63.18, -54.98), (78.66, -91.58)), ((-96.53, 54.69), (-53.93, 76.38)), ((-89.91, 9.5), (-42.27, 24.16)), ((95.43, 57.26), (10.94, 64.76)), ((74.38, -61.52), (78.66, -91.58)), ((77.21, -69.08), (78.66, -91.58)), ((-84.04, -74.41), (11.81, -27.94)), ((7.44, 45.33), (10.94, 64.76)), ((30.63, 52.08), (10.94, 64.76))]
```

## TODO 5

 **[CODE CHANGE REQUIRED]**

Complete the following function which executes one iteration of the K-Means algorithm.

<style>
    div.hidecode + pre {display: none}
</style>
<script>
doclick=function(e) {
    e.nextSibling.nextSibling.style.display="block";
}
</script>

<div class="hidecode" onclick="doclick(this);">[Show Hint]</div>

```text
Recall the difference between reduceByKey and groupByKey
```
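The hint's distinction matters for performance. `groupByKey` ships every point of a cluster across the network before the mean is taken, whereas `reduceByKey` can combine partial sums within each partition first. In RDD terms, that means mapping each assigned point to `(centroid, (x, y, 1))` and reducing with component-wise addition. The plain-Python sketch below mimics that route by keeping a running `(sum_x, sum_y, count)` per centroid. It is an illustration under assumptions: `iteration_local` is a hypothetical name. Reporting `None` for a centroid that attracts no points is inferred from the description of `no_change`, not taken from the notebook's code.

```python
import math
from collections import defaultdict


def euc_dist(p, q):
    return math.sqrt((p[0] - q[0]) ** 2 + (p[1] - q[1]) ** 2)


def iteration_local(points, centroids):
    """One K-Means step; returns [(old_centroid, new_centroid_or_None)]."""
    # assign each point to its nearest centroid -> (centroid, point)
    assigned = [(min(centroids, key=lambda c: euc_dist(p, c)), p) for p in points]
    # reduceByKey-style: per centroid keep only a running (sum_x, sum_y, count)
    # instead of collecting every member point, as groupByKey would
    sums = defaultdict(lambda: (0.0, 0.0, 0))
    for c, (x, y) in assigned:
        sx, sy, n = sums[c]
        sums[c] = (sx + x, sy + y, n + 1)
    result = []
    for c in centroids:
        if c in sums:
            sx, sy, n = sums[c]
            result.append((c, (sx / n, sy / n)))
        else:
            # no point chose this centroid: its cluster disappeared
            result.append((c, None))
    return result


pts = [(0.0, 0.0), (2.0, 0.0), (10.0, 10.0)]
cs = [(1.0, 0.0), (9.0, 9.0), (-50.0, -50.0)]
print(iteration_local(pts, cs))
# [((1.0, 0.0), (1.0, 0.0)), ((9.0, 9.0), (10.0, 10.0)), ((-50.0, -50.0), None)]
```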

## Test case 5

Run the following cell. You should see output like the following (the actual numeric values may differ, but the structure should be the same):

```text
[((-42.27, 24.16), (-63.50617346938779, -4.715561224489797)), ((34.19, -5.47), (65.24037313432835, 4.253208955223882)), ((73.78, -94.16), (43.22766666666668, -84.53166666666665)), ((78.66, -91.58), (83.09037735849053, -62.727924528301884)), ((11.81, -27.94), (-18.85223809523809, -62.081285714285706)), ((-55.65, 97.37), (-42.820499999999996, 94.566)), ((11.1, 2.86), (8.047450980392156, 10.851764705882351)), ((10.94, 64.76), (38.894331550802164, 69.78598930481279)), ((-68.4, 93.57), (-83.12192307692308, 83.44)), ((-53.93, 76.38), (-56.9511111111111, 63.98825396825399))]
```

 

## KMeans 

To define KMeans, we need just two more actions:

1. `forAll(rdd, p)`, which checks whether all elements in `rdd` satisfy the predicate `p`.
2. `no_change(centroid_and_newcentroids, tolerance)`, which applies a conditional check to all pairs of current and new centroids. It yields `True` if none of the new centroids is `None` and the Euclidean distance between each current centroid and its new centroid is less than the tolerance.
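The condition checked by `no_change` can be sketched locally on a list of `(current, new)` pairs, with Python's built-in `all()` standing in for `forAll`. The name `no_change_local` is hypothetical. Note that the `new is not None` test comes first, so thanks to `and` short-circuiting the distance is never computed against `None`.

```python
import math


def euc_dist(p, q):
    return math.sqrt((p[0] - q[0]) ** 2 + (p[1] - q[1]) ** 2)


def no_change_local(centroid_and_newcentroids, tolerance):
    """Plain-Python stand-in: built-in all() plays the role of forAll."""
    return all(
        new is not None and euc_dist(cur, new) < tolerance
        for cur, new in centroid_and_newcentroids
    )
```

For example, `no_change_local([((0.0, 0.0), (0.0, 0.001))], 0.01)` is `True`, while any pair whose new centroid is `None` makes the result `False`.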




### TODO 6

 **[CODE CHANGE REQUIRED]**

Complete the `forAll` function in the following cell. You don't need to change the `no_change` function.

<style>
    div.hidecode + pre {display: none}
</style>
<script>
doclick=function(e) {
    e.nextSibling.nextSibling.style.display="block";
}
</script>

<div class="hidecode" onclick="doclick(this);">[Show Hint]</div>

```text
You can implement it using 

1. the aggregate function or
2. map and reduce functions.

```
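Both routes in the hint can be sketched in plain Python. `for_all_map_reduce` mirrors `rdd.map(p).reduce(...)`. `for_all_aggregate` imitates `rdd.aggregate(zeroValue, seqOp, combOp)` by treating a list of lists as partitions. Both names are hypothetical. One difference to keep in mind: in PySpark, `RDD.reduce` takes no initial value and raises a `ValueError` on an empty RDD, while `aggregate` returns its zero value. So the `aggregate` route handles an empty RDD without special-casing. The zero value `True` is safe to reuse once per partition because it is neutral for logical AND.

```python
from functools import reduce


def for_all_map_reduce(items, p):
    """map each element to a bool, then AND the bools together."""
    # the initial value True makes an empty collection return True
    return reduce(lambda a, b: a and b, map(p, items), True)


def for_all_aggregate(partitions, p):
    """Mimic RDD.aggregate(zeroValue, seqOp, combOp) over a list of partitions."""
    zero = True  # neutral element for AND

    def seq_op(acc, x):  # folds the elements inside one partition
        return acc and p(x)

    def comb_op(a, b):  # merges the per-partition results
        return a and b

    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, per_partition, zero)
```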


### Test Case 6

Run the following cell. You should see `True`.


Lastly, the `kmeans` function is defined by a for-loop in which

1. we call `iteration` to compute the new centroids;
2. we then check, via `no_change`, whether there is any change between the current and new centroids:
    1. if there are changes, the loop continues with the new centroids as the current centroids;
    2. otherwise, it exits the loop and computes the membership between the points and the latest centroids.
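The control flow of this loop can be sketched independently of Spark as a higher-order function that receives the three steps as parameters. All names here (`kmeans_driver`, `membership`, `max_iter`) are hypothetical. Dropping vanished (`None`) clusters is an assumption; the notebook's `kmeans` may handle them differently. Passing the steps in keeps the loop testable with toy stand-ins, such as the `halve` stub below, which halves each centroid until it stops moving by more than the tolerance.

```python
import math


def kmeans_driver(iteration, no_change, membership, centroids, tolerance,
                  max_iter=100):
    """Hypothetical skeleton of the loop described above.

    iteration(centroids)        -> list of (current, new_or_None) pairs
    no_change(pairs, tolerance) -> True when the centroids have settled
    membership(centroids)       -> final assignment of points to centroids
    """
    for _ in range(max_iter):
        pairs = iteration(centroids)
        converged = no_change(pairs, tolerance)
        # the new centroids become the current ones; dropping vanished
        # (None) clusters is an assumption of this sketch
        centroids = [new for _, new in pairs if new is not None]
        if converged:
            break
    return membership(centroids)


# Toy stand-ins: each "iteration" halves every centroid's coordinates
def halve(cs):
    return [(c, (c[0] / 2, c[1] / 2)) for c in cs]


def settled(pairs, tol):
    return all(new is not None and
               math.sqrt((c[0] - new[0]) ** 2 + (c[1] - new[1]) ** 2) < tol
               for c, new in pairs)


print(kmeans_driver(halve, settled, lambda cs: cs, [(8.0, 8.0)], 1.0))
# [(0.5, 0.5)]
```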

Note the use of `.persist()`. Try re-running the cell with the `.persist()` statements commented out. The algorithm will then take longer to converge, because Spark recomputes the RDDs from their lineage instead of reusing the cached copies.

The code is written for you; you don't need to change anything unless you want to experiment with and without `.persist()`.

## Clean up HDFS

 **[CODE CHANGE REQUIRED]**

Modify and run the following cell to clean up HDFS.