<h1 align="center">Introduction to Spark</h1>

<b>How to use the notebook</b><br>
- Start JupyterLab in the environment (see the introduction)
- Run a cell with Shift + Enter

# Let's start

### Prepare data
To practice, we need data. Download the archive into the `data` folder under the file name `introduction.zip`.

In [5]:
!wget --quiet https://ibm.box.com/shared/static/j8skrriqeqw66f51iyz911zyqai64j2g.zip -O ./data/introduction.zip
print("Data Downloaded!")

Data Downloaded!


Unzip `introduction.zip` into the `data` folder. This produces a `LabData` folder.

In [6]:
!unzip -q -o -d ./data/ ./data/introduction.zip
print("Data Extracted!")

Data Extracted!
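If `unzip` is not installed, the extraction step can also be done with Python's standard library. The following is a minimal sketch, not part of the original lab: the helper name `extract_archive` is made up, and the demo builds a throwaway archive in a temporary folder so that it runs without the real download.

```python
import os
import tempfile
import zipfile

def extract_archive(zip_path, target_dir):
    """Extract zip_path into target_dir, overwriting existing files (like `unzip -o -d`)."""
    os.makedirs(target_dir, exist_ok=True)
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(target_dir)
        return archive.namelist()

# Demo on a throwaway archive (hypothetical content, not the real lab data).
with tempfile.TemporaryDirectory() as tmp:
    demo_zip = os.path.join(tmp, "demo.zip")
    with zipfile.ZipFile(demo_zip, "w") as archive:
        archive.writestr("LabData/README.md", "# Apache Spark\n")
    names = extract_archive(demo_zip, os.path.join(tmp, "data"))
    extracted_ok = os.path.exists(os.path.join(tmp, "data", "LabData", "README.md"))

print(names, extracted_ok)
```

In this notebook the equivalent call would be `extract_archive("./data/introduction.zip", "./data/")`.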


List the contents of `LabData`

In [7]:
!ls -l ./data/LabData

total 910572
-rwxr-xr-x 1 helios helios        32 Th04  3  2015 followers.txt
-rw-r----- 1 helios helios   2903115 Th10 22  2015 notebook.log
-rwxr-xr-x 1 helios helios     31798 Th03 28  2015 nyctaxi100.csv
-rwxr-xr-x 1 helios helios 849939371 Th03 28  2015 nyctaxi.csv
-rwxr-xr-x 1 helios helios  79500408 Th03 28  2015 nyctaxisub.csv
-rwxr-xr-x 1 helios helios      6677 Th03 28  2015 nycweather.csv
-rw-r----- 1 helios helios     14876 Th10 22  2015 pom.xml
-rw-r----- 1 helios helios      3568 Th10 22  2015 README.md
-rw-r--r-- 1 helios helios       341 Th11 29  2017 taxistreams.py
-rwxr-xr-x 1 helios helios       169 Th04  2  2015 users.txt


### Start with Spark

Import the `pyspark` library

In [10]:
from pyspark import SparkConf, SparkContext

Create a SparkContext in local mode

In [15]:
conf = SparkConf().setMaster("local").setAppName("myApp")
sc = SparkContext(conf=conf)
print("Spark version: {0}".format(sc.version))

Spark version: 2.4.4


Create an RDD by loading a file

In [16]:
readme = sc.textFile("./data/LabData/README.md")

Action: count the number of items in the RDD

In [17]:
readme.count()

98

Action: view the first item in the RDD

In [18]:
readme.first()

'# Apache Spark'

Transformation: use `filter` to build a new RDD that contains only the lines with the word `Spark`. Remember that a transformation does not compute a result. It returns a new RDD and only extends the DAG, which runs when an action is called.

In [19]:
linesWithSpark = readme.filter(lambda line: "Spark" in line)
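Lazy evaluation can be illustrated without a cluster. The following is a plain-Python analogy, not Spark code: Python's built-in `filter` is also lazy, so the predicate runs only when the result is consumed, much as a Spark transformation runs only when an action is called. The sample `lines` list and the `contains_spark` helper are made up for illustration.

```python
# Plain-Python analogy of lazy transformations (not the Spark API).
lines = ["# Apache Spark", "Spark is fast", "hello world"]  # hypothetical sample data

calls = []  # records every line the predicate actually inspects

def contains_spark(line):
    calls.append(line)
    return "Spark" in line

lazy_result = filter(contains_spark, lines)  # builds the pipeline, runs nothing
calls_before = len(calls)                    # no line has been inspected yet

matching = list(lazy_result)                 # consuming the result plays the role of an action
calls_after = len(calls)                     # every line has now been inspected

print(calls_before, calls_after, matching)
```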

Action: count the number of items in `linesWithSpark`

In [20]:
linesWithSpark.count()

18

Count the number of words in each line with `map`.<br>
Then find the largest word count of any line with `reduce`.

In [21]:
readme.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)

14
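`reduce` combines items pairwise until a single value remains, so the function passed to it should be commutative and associative for the result to be independent of how the data is partitioned. As a rough local sketch, the same computation can be written in plain Python with `functools.reduce`. The `lines` list is a made-up stand-in for the README, not the real file.

```python
from functools import reduce

# Hypothetical sample standing in for the lines of README.md.
lines = ["# Apache Spark", "", "Spark is a fast and general cluster computing system"]

# Same per-line word count as the map step above.
word_counts = [len(line.split()) for line in lines]

# Same pairwise "keep the larger value" function as the reduce step above.
longest = reduce(lambda a, b: a if a > b else b, word_counts)

print(word_counts, longest)
```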

Instead of a `lambda`, you can pass a named function

In [22]:
def larger(a, b):
    # Return the larger of two word counts (named `larger` to avoid shadowing the built-in max).
    if a > b:
        return a
    else:
        return b

readme.map(lambda line: len(line.split())).reduce(larger)

14

Split each line into words and flatten them into a single RDD with `flatMap`.<br>
Pair every word with the count 1 using `map`.<br>
Sum the counts for each word with `reduceByKey`.

In [23]:
wordCounts = readme.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

`collect` returns the whole RDD, but it brings all the data to the driver node, so use it only on small datasets.<br>
A safer method is `take(n)`, which returns only the first `n` items.

In [26]:
# wordCounts.collect()
wordCounts.take(5)

[('#', 1), ('Apache', 1), ('Spark', 14), ('is', 6), ('a', 10)]
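To make the three word-count steps concrete, here is a plain-Python sketch of the same pipeline on a tiny made-up input. It mimics `flatMap`, `map`, and `reduceByKey` with ordinary lists and a dictionary. It is an analogy for what each step produces, not a description of how Spark executes it.

```python
from collections import defaultdict

lines = ["to be or", "not to be"]  # hypothetical sample data

# flatMap: split every line and flatten the pieces into one list of words.
words = [word for line in lines for word in line.split()]

# map: pair every word with the count 1.
pairs = [(word, 1) for word in words]

# reduceByKey: add up the counts of all pairs that share the same word.
totals = defaultdict(int)
for word, count in pairs:
    totals[word] += count

word_counts = dict(totals)
print(word_counts)
```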

Get the most frequent word

In [27]:
wordCounts.reduce(lambda w1, w2: w1 if w1[1] > w2[1] else w2)

('the', 21)

### Spark caching

Spark caching can be used to pull datasets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small “hot” dataset or running an iterative algorithm.

In [42]:
linesWithSpark = readme.filter(lambda line: "Spark" in line)

from timeit import Timer
def count():
    return linesWithSpark.count()
t = Timer(lambda: count())

Run with no caching

In [46]:
print(t.timeit(number=50))

3.8033461139984865


Cache RDD linesWithSpark

In [44]:
linesWithSpark.cache()

PythonRDD[667] at RDD at PythonRDD.scala:53

Run with caching

In [47]:
print(t.timeit(number=50))

3.749730470999566


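It may seem silly to cache such a small file, and the two timings above are indeed almost identical (about 3.80 s versus 3.75 s for 50 runs). The same technique still works for larger datasets spread across tens or hundreds of nodes. Note that `cache()` is lazy: the first `linesWithSpark.count()` after it computes the RDD and stores it, and later actions run against the cache, which performs significantly better for large datasets.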
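The bookkeeping behind caching can be sketched in plain Python. The `LazyLines` class below is a hypothetical stand-in for an RDD, not the Spark API: it recomputes its filter on every action until `cache()` is called, after which the next action computes the result once more and stores it for reuse.

```python
# Plain-Python analogy of RDD caching (not the Spark API).
class LazyLines:
    """Hypothetical stand-in for an RDD: recomputes on every action unless cached."""

    def __init__(self, source, predicate):
        self.source = source
        self.predicate = predicate
        self.use_cache = False
        self._cached = None
        self.computations = 0  # how many times the filter was actually run

    def cache(self):
        # Only marks the data for caching; nothing is computed here.
        self.use_cache = True
        return self

    def _compute(self):
        if self.use_cache and self._cached is not None:
            return self._cached
        self.computations += 1
        result = [line for line in self.source if self.predicate(line)]
        if self.use_cache:
            self._cached = result  # the first action after cache() fills the cache
        return result

    def count(self):
        return len(self._compute())


lines = ["# Apache Spark", "Spark is fast", "hello world"]  # made-up sample data
lines_with_spark = LazyLines(lines, lambda line: "Spark" in line)

lines_with_spark.count()
lines_with_spark.count()
uncached_runs = lines_with_spark.computations  # recomputed for each action

lines_with_spark.cache()
lines_with_spark.count()  # computes once more and stores the result
lines_with_spark.count()  # served from the cache
cached_runs = lines_with_spark.computations - uncached_runs

print(uncached_runs, cached_runs)
```

With a three-line input the saved work is negligible, which mirrors the near-identical timings measured on the small README file.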

# End