<a href="https://cognitiveclass.ai"><img src = "https://s3-api.us-geo.objectstorage.softlayer.net/cf-courses-data/CognitiveClass/Logos/organization_logo/organization_logo.png" width = 400> </a>


<h1 align = "center"> Spark Fundamentals I - Introduction to Spark</h1>
<h2 align = "center"> Getting Started</h2>
<br align = "left">

**Related free online courses:**

Related courses can be found in the following learning paths:

-   [Spark Fundamentals path](http://cocl.us/Spark_Fundamentals_Path)
-   [Big Data Fundamentals path](http://cocl.us/Big_Data_Fundamentals_Path) 

<img src="http://spark.apache.org/images/spark-logo.png" height=100>


## Spark is built around speed and ease of use. In these labs, you will see for yourself how easy it is to get started using Spark.

Spark’s primary abstraction is a distributed collection of items called a Resilient Distributed Dataset, or RDD. In a subsequent lab exercise, you will learn more about the details of RDDs. RDDs have actions, which return values, and transformations, which return pointers to new RDDs.
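
The difference between transformations and actions can be sketched in plain Python, without Spark. The `MiniRDD` class below is a hypothetical stand-in invented for illustration, not Spark's implementation: its `filter` method returns a new `MiniRDD` (a transformation), while `count` and `first` return ordinary values (actions). Unlike real Spark, this sketch holds its data in a local list and evaluates the filter immediately.

```python
# Hypothetical, simplified stand-in for an RDD; this is NOT how Spark is implemented.
class MiniRDD:
    def __init__(self, items):
        self._items = list(items)

    def filter(self, predicate):
        # Transformation: returns a new MiniRDD and leaves this one unchanged.
        return MiniRDD(item for item in self._items if predicate(item))

    def count(self):
        # Action: returns a plain Python integer.
        return len(self._items)

    def first(self):
        # Action: returns the first item.
        return self._items[0]


# Illustrative sample data (an assumption, not the lab's README.md).
lines = MiniRDD(["Apache Spark", "is a fast engine", "Spark runs on clusters"])
spark_lines = lines.filter(lambda line: "Spark" in line)

print(type(spark_lines).__name__)  # MiniRDD
print(spark_lines.count())         # 2
print(lines.count())               # 3
```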

This set of labs uses Skills Network (SN) Labs to provide an interactive environment to develop applications and analyze data. Spark can be used from either a Scala or a Python shell. Scala runs on the Java VM and is thus a good way to use existing Java libraries. In this lab exercise, we will set up our environment in preparation for the later labs.

After completing this set of hands-on labs, you should be able to:

1.  Perform basic RDD actions and transformations
2.  Use caching to speed up repeated operations

### Using this notebook

This is an interactive environment where you can show your code through cells, and documentation through markdown.

Look at the top right corner. Do you see "Python 3"? This indicates that you are running Python in this notebook.

**To run a cell:** Shift + Enter

### Try creating a new cell below.

**To create a new cell:** In the menu, go to _"Insert" > "Insert Cell Below"_. Or, click outside of a cell, and press "a" (insert cell above) or "b" (insert cell below).


# Lab Setup

Run the following cells to get the lab data.


In [None]:
# download the data from the IBM server
# this may take ~30 seconds depending on your internet speed
!wget --quiet https://cocl.us/BD0211EN_Data
print("Data Downloaded!")

Let's unzip the data that we just downloaded into a directory dedicated to this course. Let's choose the directory **/resources/jupyterlab/labs/BD0211EN/**.


In [None]:
# this may take ~30 seconds depending on your internet speed
!unzip -q -o -d /resources/jupyterlab/labs/BD0211EN/ BD0211EN_Data
print("Data Extracted!")

The data is in a folder called **LabData**. Let's list all the files in the data that we just downloaded and extracted.


In [None]:
# list the extracted files
!ls -1 /resources/jupyterlab/labs/BD0211EN/LabData

You should see the following files:

-   followers.txt
-   notebook.log
-   nyctaxi100.csv
-   nyctaxi.csv
-   nyctaxisub.csv
-   nycweather.csv
-   pom.xml
-   README.md
-   taxistreams.py
-   users.txt


### Starting with Spark


Let's first import the tools that we need to use Spark in SN Labs.


In [None]:
!pip install findspark
!pip install pyspark
import findspark
import pyspark
findspark.init()
sc = pyspark.SparkContext.getOrCreate()

The notebooks provide code assist. For example, type in "sc." followed by the Tab key to get the list of options associated with the spark context:


In [None]:
sc.

To run a command as code, simply select the cell you want to run and either:

-   Click the play button in the toolbar above
-   Press "_Shift+Enter_"

Let's run a basic command and check the version of Spark running:


In [None]:
sc.version

Load the _README.md_ file from **LabData** into an RDD by passing its path to `sc.textFile`.


In [None]:
readme = sc.textFile("/resources/jupyterlab/labs/BD0211EN/LabData/README.md")

Let’s perform some RDD actions on this text file. Count the number of items in the RDD using this command:


In [None]:
readme.count()

You should see that this RDD action returned a value of 103.

Let’s run another action. Run this command to find the first item in the RDD:


In [None]:
readme.first()

Now let’s try a transformation. Use the filter transformation to return a new RDD with a subset of the items in the file. Type in this command:


In [None]:
linesWithSpark = readme.filter(lambda line: "Spark" in line)

You can even chain together transformations and actions. To find out how many lines contain the word “Spark”, type in:


In [None]:
linesWithSpark = readme.filter(lambda line: "Spark" in line)
readme.filter(lambda line: "Spark" in line).count()

# More on RDD Operations

This section builds upon the previous section. In this section, you will see that RDDs can be used for more complex computations. You will find the largest number of words on any single line of the "README.md" file.

Run the following cell.


In [None]:
readme.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)

There are two parts to this. The first maps each line to an integer value: the number of words in that line. In the second part, reduce is called to find the largest of those values, that is, the word count of the line with the most words. The arguments to map and reduce are Python anonymous functions (lambdas), but you can also use any top-level Python function. In the next step, you’ll define a max function to illustrate this feature.
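
To see the two steps in isolation, here is a minimal plain-Python sketch of the same logic that runs without Spark. It uses the built-in `map` and `functools.reduce` on a small in-memory list; the sample lines are assumptions made up for illustration, not the contents of the lab's README.md.

```python
from functools import reduce

# Illustrative sample lines (assumed data, not the lab's README.md).
sample_lines = [
    "Apache Spark",
    "Spark is a fast and general cluster computing system",
    "",
    "It provides high-level APIs",
]

# Step 1 (map): turn each line into its word count.
word_counts_per_line = list(map(lambda line: len(line.split()), sample_lines))

# Step 2 (reduce): keep the larger of each pair until a single value remains.
most_words = reduce(lambda a, b: a if a > b else b, word_counts_per_line)

print(word_counts_per_line)  # [2, 9, 0, 4]
print(most_words)            # 9
```

Note that the result is a word count, not the line itself; the line text is discarded by the map step.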

Define the max function. You will need to type this in:


In [None]:
# Note: this definition shadows Python's built-in max() in this notebook
def max(a, b):
    if a > b:
        return a
    else:
        return b

Now run the following with the max function:


In [None]:
readme.map(lambda line: len(line.split())).reduce(max)

Spark supports the MapReduce data flow pattern. We can use this to do a word count on the readme file.


In [None]:
wordCounts = readme.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

Here we combined the flatMap, map, and the reduceByKey functions to do a word count of each word in the readme file.
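
The three steps can also be mimicked in plain Python to show what each one contributes. This is only a local sketch on made-up sample lines (an assumption, not the lab data): `itertools.chain` plays the role of flatMap, a list comprehension plays the role of map, and a dictionary accumulates counts the way reduceByKey merges values that share a key.

```python
from itertools import chain

# Illustrative sample lines (assumed data, not the lab's README.md).
sample_lines = ["to be or not", "to be"]

# flatMap: split every line and flatten the results into one list of words.
words = list(chain.from_iterable(line.split() for line in sample_lines))

# map: pair each word with the count 1.
pairs = [(word, 1) for word in words]

# reduceByKey: add up the counts of all pairs that share the same word.
totals = {}
for word, count in pairs:
    totals[word] = totals.get(word, 0) + count

print(words)                   # ['to', 'be', 'or', 'not', 'to', 'be']
print(sorted(totals.items()))  # [('be', 2), ('not', 1), ('or', 1), ('to', 2)]
```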

To collect the word counts, use the _collect_ action.

#### Note that the collect action brings all of the data into the driver node. This is acceptable for a small dataset, but for a large dataset it can cause an out-of-memory error. It is recommended to use collect() for testing only. A safer approach is to use the take() action, e.g. print(wordCounts.take(n))


In [None]:
wordCounts.collect()

### <span style="color: red">YOUR TURN:</span>

#### In the cell below, determine which word is the most frequent in the README and how many times it is used.


In [None]:
# WRITE YOUR CODE BELOW




Double-click **here** for the solution.

<!-- The correct answer is:
wordCounts.reduce(lambda a, b: a if (a[1] > b[1]) else b)
-->


## Using Spark caching

In this short section, you’ll see how Spark caching can be used to pull data sets into a cluster-wide in-memory cache. This is very useful for accessing repeated data, such as querying a small “hot” dataset or when running an iterative algorithm. Both Python and Scala use the same commands.

As a simple example, let’s mark our linesWithSpark dataset to be cached and then invoke a count operation to make Spark cache it. Remember that lazy operations such as cache() are not processed until an action like count() is called. Once you run count() again after the cache has been populated, you should notice a small increase in speed.


In [None]:
print(linesWithSpark.count())

In [None]:
from timeit import Timer
def count():
    return linesWithSpark.count()
t = Timer(lambda: count())

In [None]:
print(t.timeit(number=50))

In [None]:
linesWithSpark.cache()
print(t.timeit(number=50))

It may seem silly to cache such a small file, but the same approach works for larger datasets spread across tens or hundreds of nodes. Once the cache has been populated, later linesWithSpark.count() actions run against the cache and would perform significantly better for large datasets.
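
The effect of caching can be modelled in plain Python without Spark. The `LazyDataset` class below is a hypothetical, simplified model written for illustration, not Spark's implementation: it rebuilds its data on every action unless `cache()` has been called, in which case the first action stores the result and later actions reuse it. The `compute_calls` counter is an assumption added to make the recomputation visible.

```python
# Hypothetical, simplified model of lazy evaluation and caching; NOT Spark's implementation.
class LazyDataset:
    def __init__(self, compute):
        self._compute = compute   # function that rebuilds the data from scratch
        self._use_cache = False
        self._cached = None
        self.compute_calls = 0    # how many times the data was rebuilt

    def cache(self):
        # Only marks the dataset for caching; nothing is computed here.
        self._use_cache = True
        return self

    def _materialize(self):
        if self._use_cache and self._cached is not None:
            return self._cached
        self.compute_calls += 1
        data = list(self._compute())
        if self._use_cache:
            self._cached = data
        return data

    def count(self):
        # Action: triggers computation or reads from the cache.
        return len(self._materialize())


# Illustrative sample data (an assumption, not the lab's README.md).
lines = ["Apache Spark", "fast engine", "Spark on clusters"]
lines_with_spark = LazyDataset(lambda: [l for l in lines if "Spark" in l])

lines_with_spark.count()
lines_with_spark.count()
uncached_calls = lines_with_spark.compute_calls  # rebuilt once per action

lines_with_spark.cache()                         # lazy: no computation yet
calls_after_cache = lines_with_spark.compute_calls
lines_with_spark.count()                         # computes once and fills the cache
lines_with_spark.count()                         # served from the cache
cached_calls = lines_with_spark.compute_calls - uncached_calls

print(uncached_calls, calls_after_cache, cached_calls)  # 2 2 1
```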


<div class="alert alert-success alertsuccess" style="margin-top: 20px">
    <strong>Tip</strong>: Enjoyed using Jupyter notebooks with Spark? Get yourself a free 
    <a href="http://cocl.us/DSX_on_Cloud">IBM Cloud</a> account where you can use Data Science Experience notebooks
    and have <em>two</em> Spark executors for free!
</div>


### Summary

Having completed this exercise, you should now be able to log in to your environment and use the Spark shell to run simple actions and transformations in Scala and/or Python. You understand that Spark caching can be used to cache large datasets, and that subsequent operations on them will use the data in the cache rather than re-fetching it from HDFS.


This notebook is part of the free course on **cognitiveclass.ai** called _Spark Fundamentals I_. If you accessed this notebook outside the course, you can take this free self-paced course online by going to: <http://cocl.us/Spark_Fundamentals_I>


### About the Authors:

Hi! It's Alex Aklson, one of the authors of this notebook. I hope you found this lab educational! There is much more to learn about Spark but you are well on your way. Feel free to connect with me if you have any questions.

<hr>
