# Exercises with Spark

## ZHANG Xin

## First steps with Spark

### 2.1 First RDD

In [None]:
# 2.1 -- first RDD: the integers 0, 1, ..., 2999
import numpy as np

# Driver-side NumPy array of the 3000 consecutive integers starting at 0.
rddr = np.arange(3000)
# Hand that array to Spark so it becomes a distributed collection.
rdd21 = sc.parallelize(rddr)

### 2.2 Computing the Sum of Cubes

In [2]:
# 2.2 -- C = {x**3 for x in 0..2999}, then the sum of its elements
# Compute the cubes in 64-bit integers explicitly: 2999**3 (about 2.7e10) does
# not fit in 32 bits, and NumPy integer arithmetic wraps silently on overflow
# on platforms where its default integer type is 32-bit.
rddr22 = np.power(rddr, 3, dtype=np.int64)
rdd22 = sc.parallelize(rddr22)
print("The sum of elements in C is " + str(rdd22.sum()))

### 2.3 Last digits of elements in $C$

In [3]:
# 2.3 -- number of elements of C ending with each decimal digit
rdd23 = (
    rdd22
    .map(lambda cube: (cube % 10, 1))                # key each cube by its last digit
    .reduceByKey(lambda left, right: left + right)   # add up the 1s per digit
)
# Show the (digit, count) pairs ordered by digit.
rdd23.sortByKey().collect()

### 2.4 Digits of $C$ 

In [4]:
#2.4
def digits(i):
    """Return the characters of ``str(i)`` as a list, one entry per character."""
    return list(str(i))
# Count how often each character occurs across the string forms of all
# elements of C.
rdd24 = (
    rdd22
    .flatMap(digits)                                 # one record per character
    .map(lambda ch: (ch, 1))
    .reduceByKey(lambda left, right: left + right)   # occurrences per character
)
# Show the (character, count) pairs ordered by character.
rdd24.sortByKey().collect()

## Approximating $\pi$

### 3.1 Step 1: computing the set of pairs

In [5]:
# 3.1 -- every ordered pair (x, y) with x and y taken from rdd21
rdd31 = rdd21.cartesian(rdd21)
# A notebook cell only echoes its final expression, so the maximum must be
# printed explicitly; as a bare expression the job would run and its result
# would be thrown away.
print(rdd31.max())
# Number of pairs in the cartesian product (echoed as the cell result).
rdd31.count()

### 3.2 Step 2 : computing the pairs

In [6]:
# 3.2 -- keep the pairs (x, y) whose point (2x+1, 2y+1) lies strictly inside
# the circle of radius 6000 centred on the origin
rdd32 = rdd31.filter(
    lambda pair: (2 * pair[0] + 1) ** 2 + (2 * pair[1] + 1) ** 2 < 6000 ** 2
)
# Number of pairs that passed the test.
rdd32.count()

### 3.3 Computing the approximation

In [7]:
# 3.3 -- approximate pi as 4 * (pairs kept by the filter) / (all pairs)
# The backslash is written as "\\" because "\p" is not a valid escape sequence
# in a normal string literal (Python warns about it); the printed text is
# unchanged.
print("$\\pi$: " + str(4*(rdd32.count()/rdd31.count())))

## Using the Movie Lens dataset

### 4.1 Getting the data

In [8]:
#4.1
#Upload the data

### 4.2 Getting the dataset into an RDD

In [9]:
#4.2
import csv
import re

# Pattern used by the earlier hand-rolled splitter. parseCSV no longer relies
# on it; the name stays defined so any other code referring to it still works.
future_pattern = re.compile("""([^,"]+|"[^"]+")(?=,|$)""")


def parseCSV(line):
    """Split one line of CSV text into a list of string fields.

    Parsing is delegated to the standard ``csv`` module so that:
    - empty fields are kept as ``""`` instead of being dropped (which would
      shift every following column),
    - quoted fields may contain commas and are returned without their
      surrounding quotes,
    - doubled quotes inside a quoted field are read as one literal quote.

    Args:
        line: a single record of CSV text, without embedded line breaks.

    Returns:
        The fields of the record, in order. An empty line gives ``[]``.
    """
    # csv.reader consumes an iterable of lines; wrap the single line in a list
    # and take the one row it produces (falling back to [] if there is none).
    return next(csv.reader([line]), [])
path_data = "/FileStore/tables/"
# path_data already ends with "/", so appending "/<file>" directly produced a
# doubled separator. Strip trailing slashes first so the joined path has one
# separator whether or not the base directory is written with a trailing "/".
ratingsFile = sc.textFile(path_data.rstrip("/") + "/ratings.csv").map(parseCSV)
moviesFile = sc.textFile(path_data.rstrip("/") + "/movies.csv").map(parseCSV)
# Peek at the first two parsed rows of the movies file.
moviesFile.take(2)

In [10]:
#4.2
# Peek at the first two parsed rows of the ratings file.
ratingsFile.take(2)

### 4.3 Cleaning data

In [11]:
# 4.3 -- remove the header row from the ratings
# The first parsed row is taken to be the header.
header = ratingsFile.first()
# Keep every row that is not equal to that header row.
ratings = ratingsFile.filter(lambda fields: fields != header)
ratings.take(2)

In [12]:
# Same clean-up for the movies: the first parsed row is taken to be the header.
mHeader = moviesFile.first()
# Keep every row that is not equal to that header row.
movies = moviesFile.filter(lambda fields: fields != mHeader)
movies.take(2)

In [13]:
# Convert the third column to float; the other three columns are kept as is.
ratings = ratings.map(
    lambda fields: [fields[0], fields[1], float(fields[2]), fields[3]]
)
ratings.take(10)

### 4.4 10 best movies of all time

In [14]:
# 4.4 -- average of the third column per value of the second column,
# highest averages first
rankRating = (
    ratings
    # key = second column, value = [third column, 1]
    .map(lambda row: (row[1], [row[2], 1]))
    # element-wise sum per key -> (total, count)
    .reduceByKey(lambda acc, cur: (acc[0] + cur[0], acc[1] + cur[1]))
    # [key, total / count]
    .map(lambda kv: [kv[0], kv[1][0] / kv[1][1]])
)
# Sorting on the negated score puts the largest averages first.
rankRating.sortBy(lambda entry: -entry[1]).take(10)

### 4.5 Ordered list of movies with names

In [15]:
# 4.5 -- same ranking, labelled with the second movies column instead of the key
# Turn each [key, score] list into a (key, score) tuple for the join.
rankj = rankRating.map(lambda entry: (entry[0], entry[1]))
# (first column, second column) of every movies row.
moviesj = movies.map(lambda fields: (fields[0], fields[1]))
joined = (
    rankj
    .join(moviesj)                           # (key, (score, movies value))
    .map(lambda kv: (kv[1][1], kv[1][0]))    # (movies value, score)
)
# Highest scores first.
joined.sortBy(lambda entry: -entry[1]).take(10)

### 4.6 Better ordered list

In [16]:
# 4.6 -- first formula: score = total / (count + 1)
newRating1 = (
    ratings
    # key = second column, value = [third column, 1]
    .map(lambda row: (row[1], [row[2], 1]))
    # element-wise sum per key -> (total, count)
    .reduceByKey(lambda acc, cur: (acc[0] + cur[0], acc[1] + cur[1]))
    # [key, total / (count + 1)]
    .map(lambda kv: [kv[0], kv[1][0] / (kv[1][1] + 1)])
)
# Highest scores first.
newRating1.sortBy(lambda entry: -entry[1]).take(10)

In [17]:
# First-formula ranking, labelled with the value from moviesj instead of the key.
# Turn each [key, score] list into a (key, score) tuple for the join.
rankj1 = newRating1.map(lambda entry: (entry[0], entry[1]))
newJoined1 = (
    rankj1
    .join(moviesj)                           # (key, (score, moviesj value))
    .map(lambda kv: (kv[1][1], kv[1][0]))    # (moviesj value, score)
)
# Highest scores first.
newJoined1.sortBy(lambda entry: -entry[1]).take(10)

In [18]:
# Second formula: score = (total / count) * ln(count)
import math
# Per key (second column): sum the third column and count the rows, then
# multiply the average by the natural log of the count.
newRating2 = ratings.map(lambda x : (x[1],[x[2],1])).reduceByKey( lambda x, y : (x[0] + y[0], x[1] + y[1])).map(lambda x : [x[0],x[1][0]/(x[1][1])*math.log(x[1][1])])
# A notebook cell only echoes its final expression, so this intermediate
# top 10 (by key) is printed explicitly; as a bare expression the job would
# run and its result would be thrown away.
print(newRating2.sortBy(lambda x : -x[1]).take(10))
# Turn each [key, score] list into a (key, score) tuple for the join.
rankj2 = newRating2.map(lambda x : (x[0],x[1]))
# Join with moviesj and reorder to (moviesj value, score).
newJoined2 = rankj2.join(moviesj).map(lambda x : (x[1][1],x[1][0]))
# Highest scores first (echoed as the cell result).
newJoined2.sortBy(lambda x : -x[1]).take(10)