<a href="https://colab.research.google.com/github/nazbeh/I_C_M_E_2020/blob/master/Workshop4/Spark_Tutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Spark Tutorial

## Setup of the VM 
First, execute this cell to set up the virtual machine with Spark. These steps may differ if you are running locally or on another cluster. After this, the remaining sections can be run independently of each other.

In [None]:
#Install dependencies needed for pySpark
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Older Spark releases are served from archive.apache.org
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar -xvf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

#Set paths for Java and Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

## Initialize Spark in Python

In [None]:
#Easy way to add PySpark to sys.path 
import findspark
findspark.init()

In [None]:
#Create a spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()



## Hello World

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

def main():
  #Create a spark session
  spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
  sc = spark.sparkContext

  nums = sc.parallelize([0,1,2,3,4])
  print(nums.map(lambda x: x+x).collect())
  spark.stop()


if __name__ == '__main__':
  main()

## Example: Compute Pi

In [None]:
import findspark
findspark.init()
from operator import add
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Pi").getOrCreate()

n = 100000

s = spark.sparkContext.parallelize(range(n))\
    .map(lambda i:4./(1.+(i+0.5)**2./n**2.)*1./n)\
    .reduce(add)

print("Pi is %f\n"%s)
spark.stop()
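
The `map` above evaluates the midpoint rule for $\int_0^1 \frac{4}{1+x^2}\,dx = \pi$ at the points $x_i = (i+0.5)/n$, and `reduce(add)` sums the terms. As a sanity check, the same sum can be sketched in plain Python without Spark. The function name `midpoint_pi` is only illustrative and is not part of the tutorial.

```python
import math

def midpoint_pi(n):
    """Approximate pi with the midpoint rule for the integral of 4/(1+x^2) on [0, 1]."""
    h = 1.0 / n  # width of each subinterval
    return sum(4.0 / (1.0 + ((i + 0.5) * h) ** 2) * h for i in range(n))

approx = midpoint_pi(100000)
print("Pi is %f" % approx)
print("Absolute error: %.2e" % abs(approx - math.pi))
```

Each term depends only on its own index `i`, which is why the computation splits cleanly into a `map` followed by a `reduce`.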

## Example: Page Rank
This and more examples are available at https://github.com/apache/spark/tree/master/examples/src/main/python

In [None]:
import findspark
findspark.init()
import numpy as np
from pyspark.sql import SparkSession
from operator import add

# Creates a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()

def compute_contribution(nodes,weight):
  tot_node = len(nodes)
  for node in nodes:
    yield (node, weight / tot_node)

#Create an RDD of directed edges [source, destination]
data = spark.sparkContext.parallelize([[0,1],[1,2],[2,3],[5,2],[5,3],[1,3],[4,5],[3,2],[3,4],[3,5],[0,2]])

#Group the edges by source node to build an adjacency list
#cache() keeps it in memory because it is reused in every iteration
S = data.groupByKey().cache()

#Initialize the vector r with all ones, one entry per node in the adjacency list
r = S.map(lambda node: (node[0], 1.0))

#Use _ as the loop variable to avoid shadowing the built-in iter
for _ in range(10):
  ## Join S and r, then apply compute_contribution to each (neighbors, weight) pair
  new_S = S.join(r).flatMap(lambda node: compute_contribution(node[1][0],node[1][1]))
  ## Sum the contributions per node to get the next iterate of r
  r = new_S.reduceByKey(add)

#Collects in driver
print(r.collect())
spark.stop()

## Exercise: K-means 

Complete the code below by following the ```To do``` comments.

In [None]:
import findspark
findspark.init()
import numpy as np
# To do here: Import SparkSession from pyspark

from operator import add

#To do here: Create a local Spark session
spark = 


#Function to compute clusters
def get_cluster(x,clusters):
    dist = np.array(x)-clusters
    dist = np.sqrt(np.sum(dist**2.,1))
    i = np.argmin(dist)
    return i

#To do here: Create an RDD from the list
#[[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12],[5,2,5],[3,4,5]]
data = 

#Number of clusters 
K = 2

#Initial clusters taken at random
centroids = data.takeSample(False,K)
tol = 1E-4
error = 1.

print("Centroids",centroids)
while error > tol:
  #To do here: transform data using a *map* to create a key,value map 
  #applying the lambda function
  #lambda x: (get_cluster(x,centroids), np.array(x))
  assign_cluster = 

  #To do here: compute the sum of data per cluster using *reduceByKey*
  sum_cluster = 

  #To do here: compute the number of points per cluster using *countByKey*
  size_cluster = 

  #To do here: collect the sum_cluster in the driver using *collect*
  sum_clusters = 

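  #Computes the new centroids
  #Sort by cluster index so that row i of new_centroids matches centroid i
  #(collect does not guarantee the order of the keys)
  new_centroids = []
  for key,coord in sorted(sum_clusters, key=lambda kv: kv[0]):
    new_centroids.append(coord/size_cluster[key])
  new_centroids = np.array(new_centroids)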

  error = np.sqrt(np.sum((new_centroids-centroids)**2.))
  centroids = new_centroids

  print("Centroids",new_centroids)
  print("Convergence error",error )
spark.stop()

## Exercise: Correct Page Rank
Note that the Page Rank example above does not compute the full Page Rank iteration (as you may have noticed, neither do the C++ implementations used in the previous notebooks). So far we have only computed the power method, i.e.
$$ r^{k+1} = S r^{k} $$
whereas the Page Rank algorithm is
$$ r^{k+1} = 0.85 \, S r^{k} + 0.15 \, \mathbf{1} $$
where $\mathbf{1}$ is the vector of all ones. Modify the Spark code in the Page Rank example to implement this update.
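
One way to read the damped update is per node: each node keeps a constant 0.15 and adds 0.85 times the contributions it receives along its incoming links. The sketch below applies that update in plain Python (no Spark) to the edge list from the Page Rank example, so the arithmetic can be checked by hand before porting it to RDDs. The names `edges`, `out_links`, `damping`, and `pagerank_step` are illustrative assumptions, not part of the tutorial code.

```python
from collections import defaultdict

# Edge list copied from the Page Rank example: [source, destination]
edges = [[0,1],[1,2],[2,3],[5,2],[5,3],[1,3],[4,5],[3,2],[3,4],[3,5],[0,2]]

# Adjacency list: source -> list of destinations
out_links = defaultdict(list)
for src, dst in edges:
    out_links[src].append(dst)

nodes = sorted({n for edge in edges for n in edge})
damping = 0.85  # assumed name for the 0.85 factor in the formula

def pagerank_step(r):
    """One update r <- 0.85 * S r + 0.15 * 1."""
    contrib = {n: 0.0 for n in nodes}
    for src, dsts in out_links.items():
        share = r[src] / len(dsts)  # each source splits its rank evenly
        for dst in dsts:
            contrib[dst] += share
    return {n: damping * contrib[n] + (1.0 - damping) for n in nodes}

# Start from all ones, as in the Spark example, and iterate 10 times
r = {n: 1.0 for n in nodes}
for _ in range(10):
    r = pagerank_step(r)
print(sorted(r.items()))
```

In this graph node 0 has no incoming links, so its rank settles at the constant 0.15. In a Spark version, such a node would disappear from `r` after `reduceByKey` unless it is explicitly kept, which is worth checking when adapting the example.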