<a href="https://cognitiveclass.ai"><img src = "https://s3-api.us-geo.objectstorage.softlayer.net/cf-courses-data/CognitiveClass/Logos/organization_logo/organization_logo.png" width = 400> </a>

<h1 align = "center"> Spark Fundamentals I - Introduction to Spark</h1>
<h2 align = "center"> Scala - Working with Scala Libraries</h2>
<br align = "left">

**Related free online courses:**

Related courses can be found in the following learning paths:

- [Spark Fundamentals path](http://cocl.us/Spark_Fundamentals_Path)
- [Big Data Fundamentals path](http://cocl.us/Big_Data_Fundamentals_Path)

<img src="http://spark.apache.org/images/spark-logo.png" height=100>

# Note: This notebook uses Python (PySpark) instead of Scala


## Creating a Spark application using Spark SQL

Spark SQL lets you write relational queries that run on Spark. Its original abstraction, the SchemaRDD, is an RDD with a schema that you can query with SQL or HiveQL from your program. In this lab section, you will use SQL to query the temperature and precipitation for a given time period in New York. The purpose is to demonstrate how to use the Spark SQL libraries on Spark.

### Please note: in Spark 1.3, DataFrames replaced SchemaRDDs. It is still possible to switch between the two to support legacy systems, but DataFrames are the recommended method going forward.

## Download steps disabled because the required file is already committed

In [None]:
# download module to run shell commands within this notebook
# import sys.process._

In [None]:
# download data from IBM server
# this may take ~30 seconds depending on your internet speed
# "wget --quiet https://ibm.box.com/shared/static/j8skrriqeqw66f51iyz911zyqai64j2g.zip" !

# println("Data Downloaded!")

Let's unzip the data that we just downloaded into a directory dedicated for this course. Let's choose the directory **/resources/jupyter/labs/BD0211EN/**.

In [None]:
# unzip the folder's content into "resources" directory
# "unzip -q -o -d /resources/jupyterlab/labs/BD0211EN/ j8skrriqeqw66f51iyz911zyqai64j2g.zip" !

# println("Data Extracted!")

The data is in a folder called **LabData**. Let's list all the files in the data that we just downloaded and extracted.

In [4]:
# list the extracted files
!ls -1 .

BD0211EN-Exercise-Dataframes-py.ipynb
BD0211EN-Exercise-Getting-Started-py-v2.0.ipynb
BD0211EN-Exercise-RDD-py-v2.0.ipynb
BD0211EN-Exercise-Scala-Libraries-sc-v2.0.ipynb
README.md
data
j8skrriqeqw66f51iyz911zyqai64j2g.zip
mtcars.csv
notebook.log
pom.xml


# Set up python environment

In [1]:
!pip install findspark



In [2]:
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext.getOrCreate()
sc.version

'2.4.4'

# SQL Context

Let's take a look at the nycweather data. So run the following code:

In [38]:
weatherRdd = sc.textFile("nycweather.csv").map(lambda l: l.split(","))
weatherRdd.take(10)


[['"2013-01-01"', '1', '0'],
 ['"2013-01-02"', '-2', '0'],
 ['"2013-01-03"', '-2', '0'],
 ['"2013-01-04"', '1', '0'],
 ['"2013-01-05"', '3', '0'],
 ['"2013-01-06"', '4', '0'],
 ['"2013-01-07"', '5', '0'],
 ['"2013-01-08"', '6', '0'],
 ['"2013-01-09"', '7', '0'],
 ['"2013-01-10"', '7', '0']]

There are three columns in the dataset: the date, the mean temperature in Celsius, and the precipitation for the day. Since we already know the schema, we will let Spark infer it using reflection.

You will first need to define the SparkSQL context. Do so by creating it from an existing SparkContext. Type in:

In [39]:
sqlContext = pyspark.sql.SQLContext(sc)

Convert each record in the RDD to a `Row` object. First, preview the three fields of each record:

In [40]:
weatherRdd.map(lambda r: f"{r[0]} {r[1]} {r[2]}").take(10)

['"2013-01-01" 1 0',
 '"2013-01-02" -2 0',
 '"2013-01-03" -2 0',
 '"2013-01-04" 1 0',
 '"2013-01-05" 3 0',
 '"2013-01-06" 4 0',
 '"2013-01-07" 5 0',
 '"2013-01-08" 6 0',
 '"2013-01-09" 7 0',
 '"2013-01-10" 7 0']

In [41]:
weatherRows = weatherRdd.map(lambda r: pyspark.sql.Row(date=r[0], temp=r[1], precipitation=r[2]))
weatherRows.take(10)

[Row(date='"2013-01-01"', precipitation='0', temp='1'),
 Row(date='"2013-01-02"', precipitation='0', temp='-2'),
 Row(date='"2013-01-03"', precipitation='0', temp='-2'),
 Row(date='"2013-01-04"', precipitation='0', temp='1'),
 Row(date='"2013-01-05"', precipitation='0', temp='3'),
 Row(date='"2013-01-06"', precipitation='0', temp='4'),
 Row(date='"2013-01-07"', precipitation='0', temp='5'),
 Row(date='"2013-01-08"', precipitation='0', temp='6'),
 Row(date='"2013-01-09"', precipitation='0', temp='7'),
 Row(date='"2013-01-10"', precipitation='0', temp='7')]
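The rows above keep every field as a string, and the date still carries its surrounding double quotes. A minimal sketch of a parser that strips the quotes and converts the numeric fields might look like the following. The helper name `parse_weather_line` is hypothetical and not part of the lab, and the sketch assumes every line has exactly three comma-separated fields, as in the sample shown.

```python
def parse_weather_line(line):
    """Split one CSV line into (date, temp, precipitation) with numeric types.

    Hypothetical helper for illustration only; assumes exactly three
    comma-separated fields, as in the sample rows shown above.
    """
    date, temp, precipitation = line.split(",")
    # Remove the literal double quotes around the date and cast the numbers.
    return date.strip('"'), float(temp), float(precipitation)


print(parse_weather_line('"2013-01-02",-2,0'))
```

A function like this could be applied to the raw text lines with `map` before building the `Row` objects, so that later comparisons and sorts operate on numbers.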

Turn the RDD into a DataFrame and register it as a table:

In [42]:
weatherDf = sqlContext.createDataFrame(weatherRows)
sqlContext.registerDataFrameAsTable(weatherDf, "weather")

At this point, you are ready to create and run some queries on the DataFrame. You want to get a list of the hottest dates with some precipitation. Type in:

In [43]:
df = sqlContext.sql("SELECT * FROM weather WHERE precipitation > 0.0 ORDER BY temp DESC")
df.collect()

[Row(date='"2013-01-30"', precipitation='1.02', temp='9'),
 Row(date='"2013-03-31"', precipitation='2.03', temp='9'),
 Row(date='"2013-11-27"', precipitation='50.29', temp='9'),
 Row(date='"2013-01-14"', precipitation='2.29', temp='8'),
 Row(date='"2013-01-31"', precipitation='22.86', temp='8'),
 Row(date='"2013-01-12"', precipitation='0.51', temp='7'),
 Row(date='"2013-04-12"', precipitation='16', temp='7'),
 Row(date='"2013-01-11"', precipitation='13.97', temp='6'),
 Row(date='"2013-01-29"', precipitation='1.52', temp='6'),
 Row(date='"2013-02-27"', precipitation='39.62', temp='6'),
 Row(date='"2013-11-12"', precipitation='0.76', temp='6'),
 Row(date='"2013-02-19"', precipitation='3.81', temp='5'),
 Row(date='"2013-02-24"', precipitation='0.25', temp='5'),
 Row(date='"2013-02-11"', precipitation='12.45', temp='4'),
 Row(date='"2013-02-13"', precipitation='0.76', temp='4'),
 Row(date='"2013-02-23"', precipitation='6.6', temp='4'),
 Row(date='"2013-02-26"', precipitation='3.56', temp='
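
Because `temp` was loaded as a string, `ORDER BY temp DESC` appears to compare text rather than numbers, which would explain why single-digit temperatures lead the result. The plain-Python sketch below illustrates the difference between the two orderings. It does not use Spark, and the sample values are made up for illustration rather than taken from the dataset.

```python
# Illustrative values only, not taken from nycweather.csv.
temps = ["9", "25", "-2", "10", "8"]

# Text ordering compares character by character, so "9" sorts above "25".
as_text = sorted(temps, reverse=True)

# Numeric ordering converts each value before comparing.
as_numbers = sorted(temps, key=float, reverse=True)

print(as_text)
print(as_numbers)
```

In Spark SQL, one way to get numeric ordering is to cast inside the query, for example `ORDER BY CAST(temp AS DOUBLE) DESC`.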

## Creating a Spark application using MLlib

In this section, Spark will be used to compute a K-Means clustering of taxi drop-off latitudes and longitudes with 3 clusters. The sample data contains a subset of taxi trips with hack license, medallion, pickup date/time, drop-off date/time, pickup/drop-off latitude/longitude, passenger count, trip distance, trip time and other information. As such, this may give a good indication of where best to hail a cab.

Remember, this is only a subset of the file that you used in a previous exercise. If you ran this exercise on the full dataset, it would take a long time as we are only running on a test environment with limited resources.

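The Scala version of this lab imports the K-Means and Vectors packages at this point. Those imports are left commented out below, and the Python equivalents are imported in later cells: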

In [None]:
#import org.apache.spark.mllib.clustering.KMeans
#import org.apache.spark.mllib.linalg.Vectors

Create an RDD

In [5]:
taxiFile = sc.textFile("nyctaxisub10000.csv.gz")

Determine the number of rows in taxiFile.

In [6]:
taxiFile.count()

10000

Cleanse the data.

In [7]:
taxiData = taxiFile.filter(lambda l: "2013" in l) \
    .filter(lambda l: l.split(",")[3]) \
    .filter(lambda l: l.split(",")[4])

The first filter limits the rows to those that occurred in the year 2013. This will also remove any header in the file. The fourth and fifth columns (indices 3 and 4) contain the drop-off latitude and longitude. The second and third filters remove rows where these values are empty, because the later conversion to `float` would throw an exception on an empty string.
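
The three chained filters can also be read as a single predicate. The sketch below is a plain-Python restatement, not part of the lab: `is_clean` is a hypothetical name, the length check is an added safeguard that the lab code does not perform, and the sample lines are made up.

```python
def is_clean(line):
    """Return True for 2013 rows whose fields at indices 3 and 4 are non-empty.

    Hypothetical helper mirroring the three filters above. The length
    check is an extra guard against short lines, not part of the lab code.
    """
    fields = line.split(",")
    return "2013" in line and len(fields) > 4 and bool(fields[3]) and bool(fields[4])


# Made-up lines for illustration; real rows have many more columns.
sample = [
    "a,b,c,lat,lon",                    # header: no "2013"
    "x,2013-01-01,y,40.75,-73.98",      # complete row
    "x,2013-01-01,y,,-73.98",           # empty latitude
]
print([is_clean(line) for line in sample])
```

With an RDD, the same predicate could be passed to a single `filter` call instead of chaining three.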

Do another count to see what was removed.

In [8]:
taxiData.count()

9999

In this case, only one line was removed, presumably the header. With the full dataset, the filters would have removed a great many more lines.

To restrict the data to a rough bounding box around New York City, use this command:

In [9]:
taxiFence = taxiData \
    .filter(lambda l: float(l.split(",")[3]) > 40.70) \
    .filter(lambda l: float(l.split(",")[3]) < 40.86) \
    .filter(lambda l: float(l.split(",")[4]) > -74.02) \
    .filter(lambda l: float(l.split(",")[4]) < -73.93)

Determine how many are left in taxiFence:

In [10]:
taxiFence.count()

8208

In this subset, 1,791 rows (9,999 - 8,208) were dropped because their drop-off points are outside the fenced area of New York City.
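
The four range filters amount to a bounding-box test on the drop-off coordinates. A minimal plain-Python sketch of that test follows. The function name `in_fence` and the constant names are hypothetical, and the sample lines are made up around coordinates of the kind shown in this notebook.

```python
# Bounds copied from the filters above.
LAT_MIN, LAT_MAX = 40.70, 40.86
LON_MIN, LON_MAX = -74.02, -73.93


def in_fence(line):
    """Return True if the drop-off point (fields 3 and 4) lies inside the box."""
    fields = line.split(",")
    lat, lon = float(fields[3]), float(fields[4])
    return LAT_MIN < lat < LAT_MAX and LON_MIN < lon < LON_MAX


# Made-up lines; only the fields at indices 3 and 4 matter here.
inside = "a,b,c,40.703346,-74.01442"
outside = "a,b,c,40.50,-74.00"
print(in_fence(inside), in_fence(outside))
```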

Create Vectors with the latitudes and longitudes that will be used as input to the K-Means algorithm.

In [19]:
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

taxi = taxiFence \
    .map(lambda l: [float(f) for f in l.split(",")[3:5]]) \
    .map(lambda l: np.array(l))
print(taxi.take(10))

[array([ 40.703346, -74.01442 ]), array([ 40.819096, -73.946747]), array([ 40.727718, -73.994286]), array([ 40.764305, -73.95446 ]), array([ 40.729446, -73.977798]), array([ 40.800243, -73.965378]), array([ 40.82354 , -73.949409]), array([ 40.746716, -73.989937]), array([ 40.788681, -73.946732]), array([ 40.764877, -73.968323])]


Train the K-Means model on the vectors, using 3 clusters and at most 10 iterations, then inspect the cluster centers.

See: https://spark.apache.org/docs/1.6.1/mllib-clustering.html

In [25]:
from pyspark.mllib.clustering import KMeans, KMeansModel

iterationCount = 10
clusterCount = 3

clusters = KMeans.train(taxi, clusterCount, iterationCount)
clusters.clusterCenters[:10]

[array([ 40.75727459, -73.98054457]),
 array([ 40.72513583, -73.99494704]),
 array([ 40.78803497, -73.95648675])]

Now we know the map coordinates. Not surprisingly, the first point is between the Theater District and Grand Central. The second point is in The Village, NYU, Soho and Little Italy area. The third point is the Upper East Side, presumably where people are more likely to take cabs than subways. Note that the order of the centers can differ between runs.
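
To make the clustering step less of a black box, here is a plain-Python sketch of the basic K-Means (Lloyd) iteration: assign each point to its nearest center, then move each center to the mean of its points. This is a simplified stand-in and not MLlib's implementation, which differs in details such as how the initial centers are chosen. The function names and the sample points are made up for illustration.

```python
def closest(point, centers):
    """Index of the center nearest to point (squared Euclidean distance)."""
    distances = [sum((p - c) ** 2 for p, c in zip(point, center)) for center in centers]
    return distances.index(min(distances))


def kmeans(points, centers, iterations=10):
    """Run a fixed number of Lloyd iterations starting from the given centers."""
    centers = [tuple(c) for c in centers]
    for _ in range(iterations):
        # Assignment step: group points by their nearest center.
        groups = [[] for _ in centers]
        for point in points:
            groups[closest(point, centers)].append(point)
        # Update step: move each center to the mean of its group
        # (a center with an empty group stays where it is).
        centers = [
            tuple(sum(axis) / len(group) for axis in zip(*group)) if group else center
            for group, center in zip(groups, centers)
        ]
    return centers


# Made-up 2-D points forming two obvious groups (not taxi data).
points = [(0.0, 0.0), (0.0, 2.0), (10.0, 10.0), (10.0, 12.0)]
result = kmeans(points, centers=[(0.0, 0.0), (10.0, 10.0)], iterations=5)
print(result)
```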



## Creating a Spark application using Spark Streaming

This section focuses on Spark Streaming, an easy-to-build, scalable, stateful (e.g. sliding windows) stream processing library. Streaming jobs are written the same way Spark batch jobs are coded and support Java, Scala and Python. In this exercise, taxi trip data will be streamed using a socket connection and then analyzed to provide a summary of the number of passengers by taxi vendor. This will be implemented in this notebook using Python (PySpark).

There are two relevant files for this section. The first one is the `nyctaxi100.csv` which will serve as the source of the stream. The other file is a python file, `taxistreams.py`, which will feed the csv file through a socket connection to simulate a stream.

- Run on the command line: `python taxistreams.py`
- Verify data is being sent: `nc 127.0.0.1 7777`

Once started, the program binds to and listens on localhost port 7777. When a connection is made, it reads `nyctaxi100.csv` and sends it across the socket. The sleep is set so that one line is sent every 0.5 seconds, or 2 rows a second. This delay was intentionally set to a high value to make it easier to view the data during execution.
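
The contents of `taxistreams.py` are not shown in this notebook. The sketch below is only a guess at a feeder with the behavior described above: listen on a port, wait for one connection, then send the lines one at a time with a pause between them. The function names `open_listener` and `send_lines` are hypothetical.

```python
import socket
import time


def open_listener(host="127.0.0.1", port=7777):
    """Create a TCP socket that is bound to host:port and listening."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    return server


def send_lines(server, lines, delay=0.5):
    """Wait for one client, then send each line followed by a pause."""
    conn, _addr = server.accept()
    with conn:
        for line in lines:
            # Normalize the line ending so the receiver sees one record per line.
            conn.sendall((line.rstrip("\n") + "\n").encode("utf-8"))
            time.sleep(delay)
```

Under these assumptions, a script could call `send_lines(open_listener(), open("nyctaxi100.csv"))` to stream the file at 2 rows a second.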

Create the StreamingContext by using the existing SparkContext (`sc`). It uses a 1-second batch interval, which means the stream is divided into 1-second batches and each batch becomes an RDD. The interval is intentionally long to make it easier to read the data during execution.

In [3]:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)

Create the socket stream that connects to the localhost socket 7777. This matches the port that the Python script is listening on. Each batch from the stream will be an RDD of lines.

In [4]:
taxiLines = ssc.socketTextStream("127.0.0.1", 7777)

Next, add the business logic: split each line on commas, then map each record to a pair of `p[15]`, which is the vendor, and `p[7]`, which is the passenger count. Reducing by key then yields the total number of passengers per vendor.

In [5]:
passCount = taxiLines \
    .map(lambda l: l.split(",")) \
    .map(lambda p: (p[15], int(p[7]))) \
    .reduceByKey(lambda a, b: a + b)
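
For a single batch, the map and `reduceByKey` steps are equivalent to summing into a dictionary keyed by vendor. The plain-Python sketch below shows that equivalence without Spark. The function names, the vendor labels `A` and `B`, and the generated lines are all made up for illustration; only the field positions 7 and 15 come from the lab.

```python
from collections import defaultdict


def passengers_by_vendor(lines):
    """Sum field 7 (passenger count) per field 15 (vendor) for one batch."""
    totals = defaultdict(int)
    for line in lines:
        fields = line.split(",")
        totals[fields[15]] += int(fields[7])
    return dict(totals)


def make_line(vendor, passengers):
    """Build a made-up 16-field CSV line; only indices 7 and 15 are meaningful."""
    fields = ["x"] * 16
    fields[7] = str(passengers)
    fields[15] = vendor
    return ",".join(fields)


batch = [make_line("A", 1), make_line("B", 2), make_line("A", 3)]
print(passengers_by_vendor(batch))
```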

Print out to the console:

In [6]:
passCount.pprint()

The next two lines start the stream.

In [7]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2019-12-29 11:02:01
-------------------------------------------

-------------------------------------------
Time: 2019-12-29 11:02:02
-------------------------------------------

-------------------------------------------
Time: 2019-12-29 11:02:03
-------------------------------------------

-------------------------------------------
Time: 2019-12-29 11:02:04
-------------------------------------------

-------------------------------------------
Time: 2019-12-29 11:02:05
-------------------------------------------

-------------------------------------------
Time: 2019-12-29 11:02:06
-------------------------------------------

-------------------------------------------
Time: 2019-12-29 11:02:07
-------------------------------------------

-------------------------------------------
Time: 2019-12-29 11:02:08
-------------------------------------------

-------------------------------------------
Time: 2019-12-29 11:02:09
----------

KeyboardInterrupt: 

It takes a few cycles for the connection to be recognized before data is sent. In this case, 2 rows of taxi trip data per second are received in each 1-second batch interval.
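
The relationship between the send rate and the batch interval can be checked with a little arithmetic. The sketch below assumes the first line arrives at time 0 and ignores network delay. It only illustrates how arrival times fall into batches and is not how Spark Streaming assigns records internally.

```python
from collections import Counter

send_interval = 0.5   # seconds between lines, as described for the feeder script
batch_interval = 1.0  # seconds per batch, as passed to StreamingContext

# Arrival times of the first six lines, assuming the first is sent at t = 0.
arrivals = [i * send_interval for i in range(6)]

# Each line falls into the batch whose index is floor(arrival / batch_interval).
lines_per_batch = Counter(int(t // batch_interval) for t in arrivals)
print(dict(lines_per_batch))
```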

In the Python terminal, the contents of the file are printed as they are streamed.

**Note: TO STOP THE STREAM PLEASE INTERRUPT THE KERNEL IN BOTH THE OTHER PYTHON NOTEBOOK AND THIS NOTEBOOK. THEN RESTART THIS NOTEBOOK'S KERNEL TO CONTINUE ONTO THE GRAPHX APPLICATION**

This is just a simple example showing how you can take streaming data into Spark and do some type of processing on it. In this case, the vendor and the number of passengers were extracted from the data stream.

## Creating a Spark application using GraphX

GraphX has no Python API, so the recommendation is to use GraphFrames instead. GraphX is to RDDs as GraphFrames is to DataFrames.

See: 
1. https://issues.apache.org/jira/browse/SPARK-3789?focusedCommentId=15172538&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15172538
2. https://graphframes.github.io/graphframes


`users.txt` is a set of users, and `followers.txt` describes the follower relationships between them. Take a look at the contents of these two files.

In [3]:
users = sc.textFile("users.txt")
print(f"Users: {users.count()} {users.take(1)}")
      
followers = sc.textFile("followers.txt")
print(f"Followers: {followers.count()} {followers.take(1)}")

Users: 7 ['1,BarackObama,Barack Obama']
Followers: 8 ['2 1']


Import packages

In [4]:
from graphframes import *
from pyspark.sql import SQLContext, Row

Create the users RDD and parse each line into a `Row` holding the user id, the attribute list, and the number of attributes:

In [5]:
sqlContext = SQLContext(sc)

usersRows = users \
    .map(lambda l: l.split(",")) \
    .map(lambda l: Row(id=l[0], attr=l[1:], attr_size=len(l[1:])))

print(usersRows.take(10))
usersDf = sqlContext.createDataFrame(usersRows)

[Row(attr=['BarackObama', 'Barack Obama'], attr_size=2, id='1'), Row(attr=['ladygaga', 'Goddess of Love'], attr_size=2, id='2'), Row(attr=['jeresig', 'John Resig'], attr_size=2, id='3'), Row(attr=['justinbieber', 'Justin Bieber'], attr_size=2, id='4'), Row(attr=['matei_zaharia', 'Matei Zaharia'], attr_size=2, id='6'), Row(attr=['odersky', 'Martin Odersky'], attr_size=2, id='7'), Row(attr=['anonsys'], attr_size=1, id='8')]


Parse the edge data, which is already in userId -> userId format. GraphFrames requires the edge columns to be named `src` and `dst`:

In [6]:
followersRows = followers \
    .map(lambda l: l.split(" ")) \
    .map(lambda l: Row(src=l[0], dst=l[1]))

followersRows.take(10)
followersDf = sqlContext.createDataFrame(followersRows)
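
The two parsing steps can be tried on single lines without Spark. In the sketch below, `parse_user` and `parse_edge` are hypothetical helper names. The first sample line is copied from the output above, while `8,anonsys` and `2 1` are reconstructed from the parsed rows and the separators used in the code, so treat them as assumed file contents.

```python
def parse_user(line):
    """Split 'id,attr1,attr2,...' into (id, attribute list, attribute count)."""
    user_id, *attrs = line.split(",")
    return user_id, attrs, len(attrs)


def parse_edge(line):
    """Split 'src dst' into a (src, dst) pair of string ids."""
    src, dst = line.split(" ")
    return src, dst


print(parse_user("1,BarackObama,Barack Obama"))
print(parse_user("8,anonsys"))   # assumed raw line for the user with one attribute
print(parse_edge("2 1"))         # assumed raw line: user 2 follows user 1
```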

Build the graph from the vertex and edge DataFrames:

In [7]:
graph = GraphFrame(usersDf, followersDf)
graph.vertices.show()
graph.edges.show()

+--------------------+---------+---+
|                attr|attr_size| id|
+--------------------+---------+---+
|[BarackObama, Bar...|        2|  1|
|[ladygaga, Goddes...|        2|  2|
|[jeresig, John Re...|        2|  3|
|[justinbieber, Ju...|        2|  4|
|[matei_zaharia, M...|        2|  6|
|[odersky, Martin ...|        2|  7|
|           [anonsys]|        1|  8|
+--------------------+---------+---+

+---+---+
|dst|src|
+---+---+
|  1|  2|
|  1|  4|
|  2|  1|
|  3|  6|
|  3|  7|
|  6|  7|
|  7|  6|
|  7|  3|
+---+---+



Restrict the graph to users with usernames and names:

In [8]:
subgraph = graph.filterVertices("attr_size > 1")
subgraph.vertices.show()
subgraph.edges.show()

+--------------------+---------+---+
|                attr|attr_size| id|
+--------------------+---------+---+
|[BarackObama, Bar...|        2|  1|
|[ladygaga, Goddes...|        2|  2|
|[jeresig, John Re...|        2|  3|
|[justinbieber, Ju...|        2|  4|
|[matei_zaharia, M...|        2|  6|
|[odersky, Martin ...|        2|  7|
+--------------------+---------+---+

+---+---+
|dst|src|
+---+---+
|  7|  3|
|  7|  6|
|  3|  7|
|  3|  6|
|  6|  7|
|  1|  4|
|  1|  2|
|  2|  1|
+---+---+
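
Filtering vertices also removes any edge that touches a dropped vertex. In this dataset the removed user has no edges, so the edge list is unchanged. The plain-Python sketch below illustrates the general behavior on a smaller example. The function name `filter_vertices` is hypothetical, and the edge from user 8 is made up to show an edge being dropped.

```python
def filter_vertices(vertices, edges, keep):
    """Keep vertices for which keep(attrs) is true, then drop edges
    whose source or destination was removed."""
    kept = {vid: attrs for vid, attrs in vertices.items() if keep(attrs)}
    kept_edges = [(src, dst) for src, dst in edges if src in kept and dst in kept]
    return kept, kept_edges


# A few vertices copied from the table above.
vertices = {
    "1": ["BarackObama", "Barack Obama"],
    "2": ["ladygaga", "Goddess of Love"],
    "8": ["anonsys"],
}
# The ("8", "1") edge is hypothetical; it does not exist in followers.txt.
edges = [("2", "1"), ("1", "2"), ("8", "1")]

kept, kept_edges = filter_vertices(vertices, edges, lambda attrs: len(attrs) > 1)
print(sorted(kept), kept_edges)
```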



Compute the PageRank

In [9]:
pagerankGraph = subgraph.pageRank(0.15, maxIter=1)

Show the vertices with their PageRank scores and the edges with their weights:

In [13]:
pagerankGraph.vertices.show()
pagerankGraph.edges.show()

+--------------------+---------+---+------------------+
|                attr|attr_size| id|          pagerank|
+--------------------+---------+---+------------------+
|[BarackObama, Bar...|        2|  1|1.8499999999999999|
|[jeresig, John Re...|        2|  3|               1.0|
|[ladygaga, Goddes...|        2|  2|               1.0|
|[justinbieber, Ju...|        2|  4|              0.15|
|[odersky, Martin ...|        2|  7|1.4249999999999998|
|[matei_zaharia, M...|        2|  6|             0.575|
+--------------------+---------+---+------------------+

+---+---+------+
|dst|src|weight|
+---+---+------+
|  2|  1|   1.0|
|  3|  6|   0.5|
|  6|  7|   0.5|
|  7|  3|   1.0|
|  1|  4|   1.0|
|  3|  7|   0.5|
|  1|  2|   1.0|
|  7|  6|   0.5|
+---+---+------+
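
The scores above can be reproduced by hand, which helps show what a single PageRank iteration does. The sketch below is a simplified plain-Python version under two assumptions: every vertex starts with a rank of 1.0, and each edge carries a weight of 1 divided by the out-degree of its source, as the `weight` column suggests. It is not the library's implementation.

```python
from collections import defaultdict

# Edges as (src, dst), copied from the edge table above.
edges = [("2", "1"), ("4", "1"), ("1", "2"), ("6", "3"),
         ("7", "3"), ("7", "6"), ("6", "7"), ("3", "7")]
vertices = ["1", "2", "3", "4", "6", "7"]
reset_probability = 0.15

# Count outgoing edges per vertex to derive the edge weights.
out_degree = defaultdict(int)
for src, _dst in edges:
    out_degree[src] += 1

# Assumed starting rank of 1.0 for every vertex.
rank = {v: 1.0 for v in vertices}

# Each vertex receives rank[src] * weight from every incoming edge.
incoming = defaultdict(float)
for src, dst in edges:
    incoming[dst] += rank[src] / out_degree[src]

# One update: reset probability plus the damped incoming contribution.
rank = {v: reset_probability + (1 - reset_probability) * incoming[v] for v in vertices}

for v in sorted(rank, key=rank.get, reverse=True):
    print(v, round(rank[v], 3))
```

Running more iterations would repeat the last two steps with the updated ranks.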



List the users sorted by PageRank, highest first:

In [18]:
pagerankGraph.vertices.sort(pagerankGraph.vertices.pagerank.desc()).take(10)

[Row(attr=['BarackObama', 'Barack Obama'], attr_size=2, id='1', pagerank=1.8499999999999999),
 Row(attr=['odersky', 'Martin Odersky'], attr_size=2, id='7', pagerank=1.4249999999999998),
 Row(attr=['jeresig', 'John Resig'], attr_size=2, id='3', pagerank=1.0),
 Row(attr=['ladygaga', 'Goddess of Love'], attr_size=2, id='2', pagerank=1.0),
 Row(attr=['matei_zaharia', 'Matei Zaharia'], attr_size=2, id='6', pagerank=0.575),
 Row(attr=['justinbieber', 'Justin Bieber'], attr_size=2, id='4', pagerank=0.15)]

<div class="alert alert-success alertsuccess" style="margin-top: 20px">
    <strong>Tip</strong>: Enjoyed using Jupyter notebooks with Spark? Get yourself a free 
    <a href="http://cocl.us/DSX_on_Cloud">IBM Cloud</a> account where you can use Data Science Experience notebooks
    and have <em>two</em> Spark executors for free!
</div>

## Summary

Having completed this exercise, you should have some familiarity with using the Spark libraries. In particular, you used Spark SQL to query data inside Spark. You used Spark Streaming to process an incoming stream of data in small batches. You used Spark's MLlib to run the *k*-means algorithm to find the best place to hail a cab. Finally, you used GraphFrames, in place of Spark's GraphX library, to perform parallel graph calculations on a dataset to find the attributes of the top users.

This notebook is part of the free course on **Cognitive Class** called *Spark Fundamentals I*. If you accessed this notebook outside the course, you can take this free, self-paced course online at: http://cocl.us/Spark_Fundamentals_I

### About the Authors:  
Hi! It's Alex Aklson, one of the authors of this notebook. I hope you found this lab educational! There is much more to learn about Spark but you are well on your way. Feel free to connect with me if you have any questions.
<hr>