# CMU 10405/10605 Machine Learning with Large Datasets
## **Running Your First Notebook**
This notebook will show you how to install the course libraries, create your first Spark cluster, and test basic notebook functionality.  To move through the notebook just run each of the cells.  You will not need to solve any problems to complete this lab.  You can run a cell by pressing "shift-enter", which will compute the current cell and advance to the next cell, or by clicking in a cell and pressing "control-enter", which will compute the current cell and remain in that cell.

#### ** Part 1: Attach and Test Class helper Library **

#### (1a) Install class helper library into your Databricks CE workspace
- The class helper library is called "nose"
- You can install the library into your workspace following the following instructions:
 - Step 1: Click on "Workspace", then on the dropdown and select "Create" and "Library"

<img src="http://spark-mooc.github.io/web-assets/images/Lab0_Library1.png" alt="Drawing" />
 - Step 2 Enter the name of the library by selecting "Upload Python Egg or PyPI" and entering "nose" in the "PyPI Name" field


 - Step 3 Make sure the checkbox for auto-attaching the library to your cluster is selected

#### ** Part 2: Test Spark functionality **

** (2a) Create a DataFrame and filter it **

When you run the next cell (with control-enter or shift-enter), you will see the following popup.

<img src="http://spark-mooc.github.io/web-assets/images/Lab0_Cluster.png" alt="Drawing" />

Select the click box and then "Launch and Run". The display at the top of your notebook will change to "Pending"

<img src="http://spark-mooc.github.io/web-assets/images/Lab0_Cluster_Pending.png" alt="Drawing" />

Note that it may take a few seconds to a few minutes to start your cluster. Once your cluster is running the display will changed to "Attached"

<img src="http://spark-mooc.github.io/web-assets/images/Lab0_Cluster_Attached.png" alt="Drawing" />

Congratulations! You just launched your Spark cluster in the cloud!

In [8]:
# Check that Spark is working
import pyspark
from pyspark.sql import Row
from pyspark.sql import SQLContext

#When using pyspark as a library, the sqlContext is not created automatically, 
#you have to create it yourself:
sqlContext = pyspark.SQLContext(pyspark.SparkContext())

data = [('Alice', 1), ('Bob', 2), ('Bill', 4)]
df = sqlContext.createDataFrame(data, ['name', 'age'])
fil = df.filter(df.age > 3).collect()
print(fil)

# If the Spark job doesn't work properly this will raise an AssertionError
assert(fil == [Row(u'Bill', 4)])



[Row(name='Bill', age=4)]


In [13]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate(); #this helps avoid issue of multiple sparkcontext

peopleRDD = sc.parallelize(["bob","alice","bill"])
words_filter = peopleRDD.filter(lambda x: "i" in x)
words_filter

PythonRDD[8] at RDD at PythonRDD.scala:53

In [14]:
filtered = words_filter.collect()
filtered

['alice', 'bill']

In [15]:
peopleRDD.foreach(print)

In [16]:
peopleRDD.take(2)

['bob', 'alice']

In [17]:
peopleRDD.collect()

['bob', 'alice', 'bill']

** (2b) Loading a text file **

Let's load a text file.

In [18]:
import os.path
from pyspark import SparkFiles

url = "https://raw.githubusercontent.com/10605/data/master/hw1/shakespeare.txt"
sc.addFile(url)

# verifying that the file got downloaded
print(SparkFiles.get("shakespeare.txt"))

/private/var/folders/22/2sr4vq913y9_4v5xpn7h9fqm0000gn/T/spark-2c01587e-64ef-495c-a5d1-ef5f1290eba9/userFiles-1946d2a4-8238-47bf-8086-a7f671c7c24d/shakespeare.txt


In [19]:
file_lines_count = len(open(SparkFiles.get("shakespeare.txt")).readlines())
print(f'Number of lines in the file are: {file_lines_count}')

Number of lines in the file are: 122395


In [20]:
shakespeare_rdd = sc.textFile("file://" + SparkFiles.get("shakespeare.txt"))
shakespeare_rdd_lines = shakespeare_rdd.count()

print(shakespeare_rdd_lines)

assert(shakespeare_rdd_lines == file_lines_count)

122395


In [21]:
# Check loading data with sqlContext.read.text
dataDF = sqlContext.read.text("file://" + SparkFiles.get("shakespeare.txt"))
shakespeareCount = dataDF.count()  # number of rows in dataDF

print(shakespeareCount)

# If the text file didn't load properly an AssertionError will be raised
assert(shakespeareCount == file_lines_count)

122395


#### ** Part 3: Test class testing library **

** (3a) Compare with numbers **

In [22]:
# TEST Compare with numbers
# Check the nose library
# This should not print out anything if the test is passed
from nose.tools import assert_equal, assert_true

twelve = 12
assert_equal(twelve, 12, 'twelve should equal 12')

ModuleNotFoundError: No module named 'nose'

** (3b) Compare lists **

In [0]:
# TEST Compare lists (2b)
# This should not print out anything if the test is passed
unsortedList = [(5, 'b'), (5, 'a'), (4, 'c'), (3, 'a')]
assert_equal(sorted(unsortedList), [(3, 'a'), (4, 'c'), (5, 'a'), (5, 'b')])

#### ** Part 4: Check MathJax formulas **

** (4a) Gradient descent formula **

You should see a formula on the line below this one: \\[ \scriptsize \mathbf{w}_{i+1} = \mathbf{w}_i - \alpha_i \sum_j (\mathbf{w}_i^\top\mathbf{x}_j  - y_j) \mathbf{x}_j \,.\\]

This formula is included inline with the text and is \\( \scriptsize (\mathbf{w}^\top \mathbf{x} - y) \mathbf{x} \\).

** (4b) Log loss formula **

This formula shows log loss for single point. Log loss is defined as: \\[  \scriptsize \ell_{log}(p, y) = \begin{cases} -\log (p) & \text{if } y = 1 \\\ -\log(1-p) & \text{if } y = 0 \end{cases} \\]

## **Spark Basics**

#### ** Part 5: Basic Spark Transformations and Actions **

The remaining notebook will walk you through a basic Spark transformations and actions.

Refer to the Spark guide: https://spark.apache.org/docs/2.4.5/rdd-programming-guide.html

Refer to last week's recitation slides and video: https://10605.github.io/

In [23]:
# Create a sample list
my_list = [i for i in range(0, 10000000)]

# Parallelize the data
rdd_0 = sc.parallelize(my_list, 4)
"""
One important parameter for parallel collections is the number of partitions to cut 
the dataset into. Spark will run one task for each partition of the cluster. 
Typically you want 2-4 partitions for each CPU in your cluster. 
Normally, Spark tries to set the number of partitions automatically based on your 
cluster. However, you can also set it manually by passing it as a second parameter
to parallelize (e.g. sc.parallelize(data, 10)).
"""

# Add 4 to each element
rdd_1 = rdd_0.map(lambda x: x + 4)

##### 1.1. What will the following line print?

In [24]:
rdd_0

ParallelCollectionRDD[23] at readRDDFromFile at PythonRDD.scala:274

In [25]:
rdd_1

PythonRDD[24] at RDD at PythonRDD.scala:53

In [0]:
#rdd_1.collect()
#rdd_1.take(10)

##### Remember that Spark does lazy evaluation, and only exectues things once actions are triggered.
Execute an *action* and actually print values on the console

##### 1.2. Another example

In [26]:
students = [('Alice', [10405, 15213, 10301]), ('Bob', [10405, 10701]), ('Chad', [15513, 15445, 10405, 15213])]

rdd_students = sc.parallelize(students)

rdd_students.take(3)

[('Alice', [10405, 15213, 10301]),
 ('Bob', [10405, 10701]),
 ('Chad', [15513, 15445, 10405, 15213])]

In [0]:
# How do we find the distinct courses students are taking?
# The equivalent of doing: set([course for student in students for course in student[1]]) in Python
# It should be: {10301, 10405, 10701, 15213, 15445, 15513}

In [27]:
rdd_students.flatMap(lambda x:x[1])
#rdd_students.flatMap(lambda x:x[1]).distinct().collect()

#PySpark flatMap() is a transformation operation that flattens the RDD/DataFrame 
#(array/map DataFrame columns) after applying the function on every element and 
#returns a new PySpark RDD/DataFrame.

PythonRDD[29] at RDD at PythonRDD.scala:53

##### 1.3. Lets now work with a PairRDD, that is, an RDD of (key, value)

+--+------+-----------+  
|id|league|height (cm)|  
+--+------+-----------+

In [28]:
players = [(1, 'CMU', 192), (2, 'CMU', 218), (3, 'CMU', 195), (4, 'NBA', 192), (5,'NBA', 198)]
players += [(6, 'CMU', 166), (7, 'NBA', 195), (8, 'NBA', 182), (9, 'CMU', 189), (10,'NBA', 180)]
players += [(11, 'NBA', 190), (12, 'CMU', 195), (13, 'CMU', 195), (14, 'NBA', 197), (15,'NBA', 195)]
rdd_players = sc.parallelize(players)

rdd_players.take(10)

[(1, 'CMU', 192),
 (2, 'CMU', 218),
 (3, 'CMU', 195),
 (4, 'NBA', 192),
 (5, 'NBA', 198),
 (6, 'CMU', 166),
 (7, 'NBA', 195),
 (8, 'NBA', 182),
 (9, 'CMU', 189),
 (10, 'NBA', 180)]

In [29]:
# Sum of all heights

def sum_fun(x,y):
  return x + y

#example: sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
#15 

rdd_players.map(lambda x:x[2]).reduce(lambda x,y: x+y)
#OR
#rdd_players.map(lambda x:x[2]).reduce(sum_fun)
#OR
#rdd_players.map(lambda x:x[2]).sum()

2879

In [30]:
# Sum of heights per league

sum_heights = rdd_players.map(lambda x:(x[1],x[2])).reduceByKey(lambda x,y: x+y)
sum_heights.collect()

[('CMU', 1350), ('NBA', 1529)]

In [31]:
# Number of player per league

count_players_rdd = rdd_players.map(lambda x:(x[1],1)).reduceByKey(lambda x,y : x+y)
#count_players_rdd = rdd_players.map(lambda x:(x[1],x[2])).countByKey()
count_players_rdd.collect()


[('CMU', 7), ('NBA', 8)]

In [32]:
# Average height per league

#sum_heights.join(count_players_rdd).collect()
sum_heights.join(count_players_rdd).mapValues(lambda x : x[0]/x[1]).collect()


[('CMU', 192.85714285714286), ('NBA', 191.125)]