<a href="https://cognitiveclass.ai"><img src = "https://ibm.box.com/shared/static/9gegpsmnsoo25ikkbl4qzlvlyjbgxs5x.png" width = 400> </a>

<h1 align = "center"> Spark Fundamentals 1 - Introduction to Spark </h1>
<h2 align = "center"> Lab 2a. Scala - Working with RDD operations </h2>
<br align = "left">

**Related free online courses:**  

Related courses can be found in the following learning paths:

- [Spark Fundamentals path](https://cognitiveclass.ai/learn/spark/)
- [Big Data Fundamentals path](https://cognitiveclass.ai/learn/big-data/)

<img src = "http://spark.apache.org/images/spark-logo.png" height = 100 align = 'left'>

### Starting with Spark using Scala

### Run the following lines of code to get the data

In [1]:
// import the package that lets us run shell commands within the notebook
import sys.process._

In [2]:
// download the data from the IBM server
// this may take ~30 seconds depending on your internet speed
"wget https://ibm.box.com/shared/static/j8skrriqeqw66f51iyz911zyqai64j2g.zip".!

--2019-12-06 16:40:16--  https://ibm.box.com/shared/static/j8skrriqeqw66f51iyz911zyqai64j2g.zip
Resolving ibm.box.com (ibm.box.com)... 107.152.27.197, 107.152.26.197
Connecting to ibm.box.com (ibm.box.com)|107.152.27.197|:443... connected.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: /public/static/j8skrriqeqw66f51iyz911zyqai64j2g.zip [following]
--2019-12-06 16:40:17--  https://ibm.box.com/public/static/j8skrriqeqw66f51iyz911zyqai64j2g.zip
Reusing existing connection to ibm.box.com:443.
HTTP request sent, awaiting response... 301 Moved Permanently
Location: https://ibm.ent.box.com/public/static/j8skrriqeqw66f51iyz911zyqai64j2g.zip [following]
--2019-12-06 16:40:17--  https://ibm.ent.box.com/public/static/j8skrriqeqw66f51iyz911zyqai64j2g.zip
Resolving ibm.ent.box.com (ibm.ent.box.com)... 107.152.26.211, 107.152.27.211
Connecting to ibm.ent.box.com (ibm.ent.box.com)|107.152.26.211|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: h



0

In [3]:
sc.version

2.3.3

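In [4]:
// sys.process does not go through a shell, so the *zip wildcard is only expanded if we call sh -c explicitly
Seq("sh", "-c", "ls *zip").!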

ls: cannot access *zip: No such file or directory




2

In [5]:
// unzip the archive's contents into the "resources" directory
"unzip -q -o -d ./resources j8skrriqeqw66f51iyz911zyqai64j2g.zip".!

println("Data Extracted!")



Data Extracted!


In [6]:
// list the extracted files
"ls -1 ./resources/LabData/" !

README.md
followers.txt
notebook.log
nyctaxi.csv
nyctaxi100.csv
nyctaxisub.csv
nycweather.csv
pom.xml
taxistreams.py
users.txt




0

Now we are going to create an RDD from the README file. It is created with the SparkContext's `textFile` method, just as in the previous lab. Creating an RDD from a file is lazy, like a transformation, so nothing is actually read yet: we are only telling Spark that we want a `readme` RDD.

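Run the code in the following cell. Because `textFile` is lazy, it returns a reference to an RDD, which we have named `readme`. The `collect()` call at the end of the cell is an action: it triggers the actual read and returns every line to the driver as an array.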

In [7]:
val readme = sc.textFile("./resources/LabData/README.md")

readme.collect()

readme = ./resources/LabData/README.md MapPartitionsRDD[1] at textFile at <console>:32




Array(# Apache Spark, "", Spark is a fast and general cluster computing system for Big Data. It provides, high-level APIs in Scala, Java, Python, and R, and an optimized engine that, supports general computation graphs for data analysis. It also supports a, rich set of higher-level tools including Spark SQL for SQL and DataFrames,, MLlib for machine learning, GraphX for graph processing,, and Spark Streaming for stream processing., "", <http://spark.apache.org/>, "", "", ## Online Documentation, "", You can find the latest Spark documentation, including a programming, guide, on the [project web page](http://spark.apache.org/documentation.html), ...
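Spark's laziness has a rough analogue in plain Scala collection views, which the local sketch below uses to make the idea concrete. It is an analogy, not Spark code: the hypothetical `lines` list stands in for the README, `.view.map` plays the role of a transformation, and `.toList` plays the role of an action such as `collect()`. The counter shows that the mapping function does not run until the result is forced.

```scala
// Local analogy only: a Scala view defers work roughly the way an RDD transformation does.
val lines = List("# Apache Spark", "", "Spark is a fast and general cluster computing system")

var evaluated = 0 // counts how many times the mapping function has actually run

// "Transformation": builds a lazy view; no element is processed yet.
val lengths = lines.view.map { line =>
  evaluated += 1
  line.length
}
println(s"after map: evaluated = $evaluated")

// "Action": forcing the view runs the function once per element.
val result = lengths.toList
println(s"after toList: evaluated = $evaluated, result = $result")
```

Spark goes further than a view: it records the whole chain of transformations as a lineage graph and runs it across the cluster only when an action asks for a result.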

Let’s perform some RDD actions on this text file. Count the number of items (lines) in the RDD using this command:

In [8]:
readme.count()

98

Let’s run another action. Run this command to find the first item in the RDD:

In [9]:
readme.first()

# Apache Spark

Now let’s try a transformation. Use the filter transformation to return a new RDD with a subset of the items in the file. Type in this command:

In [10]:
val linesWithSpark = readme.filter(line => line.contains("Spark"))
linesWithSpark.count()

linesWithSpark = MapPartitionsRDD[2] at filter at <console>:32


18

Again, `filter` returned a pointer to an RDD holding the results of the filter transformation; only the `count()` action triggered any computation.

You can even chain transformations and actions together. To find out how many lines contain the word “Spark”, type in:

In [11]:
readme.filter(line => line.contains("Spark")).count()

18

### More on RDD Operations

This section builds upon the previous one. Here you will see that RDDs can be used for more complex computations: you will find the number of words in the line of the README file that has the most words.

In [12]:
readme.map(line => line.split(" ").size).
                    reduce((a, b) => if (a > b) a else b)

14

There are two parts to this. The first maps each line to an integer value, the number of words in that line. In the second part, `reduce` is called to find the largest of those counts. The arguments to `map` and `reduce` are Scala function literals (closures), but you can use any language feature or Scala/Java library.

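In the next step, you use the `Math.max()` function to show that you can indeed use a Java library instead.
Import the `java.lang.Math` class (classes in `java.lang` are imported automatically, so this line is optional, but it makes the dependency explicit):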

In [13]:
import java.lang.Math

Now run with the max function:

In [14]:
readme.map(line => line.split(" ").size).
        reduce((a, b) => Math.max(a, b))

14
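Both cells above return only the largest word count, not the line that has it. If you want the line itself, one option is to reduce over (line, count) pairs and keep the pair with the larger count. The sketch below shows both variants on a few hypothetical lines held in a plain Scala `List`, whose `map` and `reduce` behave like the RDD versions here; in the notebook you would start from `readme` instead of `lines`. Note that `split(" ")` turns an empty line into one empty string, so a blank line counts as one word.

```scala
// Hypothetical sample lines standing in for the README RDD.
val lines = List(
  "# Apache Spark",
  "",
  "Spark is a fast and general cluster computing system for Big Data."
)

// Variant 1 (as in the cells above): only the largest word count.
val maxWords = lines.map(line => line.split(" ").size).reduce((a, b) => math.max(a, b))

// Variant 2: keep each line next to its count, then keep the pair with the larger count.
val (longestLine, longestCount) = lines
  .map(line => (line, line.split(" ").size))
  .reduce((a, b) => if (a._2 >= b._2) a else b)

println(s"max words = $maxWords")
println(s"longest line = '$longestLine' ($longestCount words)")
```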

Spark can also express the MapReduce data-flow pattern. We can use it to do a word count on the README file.

In [15]:
val wordCounts = readme.flatMap(line => line.split(" ")).
                        map(word => (word, 1)).
                        reduceByKey((a,b) => a + b)

wordCounts = ShuffledRDD[8] at reduceByKey at <console>:35


ShuffledRDD[8] at reduceByKey at <console>:35

Here we combined the `flatMap`, `map`, and `reduceByKey` functions to count how many times each word appears in the README file.
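To see what `flatMap`, `map`, and `reduceByKey` each contribute, here is the same pipeline as a local sketch on an ordinary Scala `List` (the sample lines are hypothetical). Plain collections have no `reduceByKey`, so the sketch groups the pairs by word and then reduces each group's values, which yields the same (word, count) pairs that `reduceByKey` computes on an RDD.

```scala
// Hypothetical sample lines.
val lines = List("Spark is fast", "Spark is general", "")

// flatMap: one line -> many words (a blank line yields a single empty string)
val words = lines.flatMap(line => line.split(" "))

// map: each word -> (word, 1)
val pairs = words.map(word => (word, 1))

// Local stand-in for reduceByKey: group the pairs by key, then add up each group's 1s.
val wordCounts: Map[String, Int] = pairs
  .groupBy(_._1)
  .map { case (word, group) => (word, group.map(_._2).reduce(_ + _)) }

wordCounts.toList.sortBy(_._1).foreach(println)
```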

To collect the word counts, use the collect action.

#### Note that `collect()` brings all of the data back to the driver node. This is acceptable for a small dataset, but for a large dataset it can cause an out-of-memory error. Use `collect()` for testing only; a safer approach is `take(n)`, e.g. `wordCounts.take(n).foreach(println)`.

In [27]:
wordCounts.collect().foreach(println)

(package,1)
(this,1)
(Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1)
(Because,1)
(Python,2)
(cluster.,1)
(its,1)
([run,1)
(general,2)
(have,1)
(pre-built,1)
(locally.,1)
(locally,2)
(changed,1)
(sc.parallelize(1,1)
(only,1)
(several,1)
(This,2)
(basic,1)
(Configuration,1)
(learning,,1)
(documentation,3)
(YARN,,1)
(graph,1)
(Hive,2)
(first,1)
(["Specifying,1)
("yarn",1)
(page](http://spark.apache.org/documentation.html),1)
([params]`.,1)
(application,1)
([project,2)
(prefer,1)
(SparkPi,2)
(<http://spark.apache.org/>,1)
(engine,1)
(version,1)
(file,1)
(documentation,,1)
(MASTER,1)
(example,3)
(distribution.,1)
(are,1)
(params,1)
(scala>,1)
(DataFrames,,1)
(provides,1)
(refer,2)
(configure,1)
(Interactive,2)
(R,,1)
(can,6)
(build,3)
(when,1)
(easiest,1)
(Apache,1)
(systems.,1)
(Distributions"](http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html),1)
(works,1)
(how,2)
(package.,1)
(1000).count(),1)
(Note,1)
(Data.,1)

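You can also print the collected array in a single call:

println(wordCounts.collect().mkString("\n"))

println(wordCounts.collect().deep)

`deep` exists in the older Scala versions that Spark 2.x runs on, but Scala 2.13 removed it, so `mkString` is the more portable choice.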


### <span style="color: red">YOUR TURN:</span> 

#### In the cell below, determine the most frequent CHARACTER in the README and how many times it is used.

In [16]:
// WRITE YOUR CODE BELOW
readme.flatMap(line => line.split(" ")).
                        map(word => (word, 1)).
                        reduceByKey((a,b) => a + b).
                        reduce((a, b) => if (a._2 > b._2) a else b)

("",67)

Highlight the text below to reveal the answer:

<textarea rows="6" cols="80" style="color: white">
val mostFrequent = readme.flatMap(line => line.split(" ")).
                        map(word => (word, 1)).
                        reduceByKey((a,b) => a + b).
                        reduce((a, b) => if (a._2 > b._2) a else b)

println(mostFrequent)
</textarea>
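The question asks about characters, but the suggested answer counts whitespace-separated tokens: its winner, the empty string, comes from blank lines and runs of consecutive spaces. If you want characters, one option is to `flatMap` over each line's characters instead of its words. The local sketch below does that on two hypothetical lines; whether spaces count is a design choice, and this sketch skips whitespace. In the notebook the equivalent would start from `readme.flatMap(line => line.toList)` and use `reduceByKey`; treat that translation as an untested assumption.

```scala
// Hypothetical sample lines standing in for the README RDD.
val lines = List("# Apache Spark", "Spark is fast")

val (topChar, topCount) = lines
  .flatMap(line => line.toList)      // one element per character
  .filter(c => !c.isWhitespace)      // design choice: ignore spaces and tabs
  .map(c => (c, 1))
  .groupBy(_._1)                     // local stand-in for reduceByKey
  .map { case (c, group) => (c, group.size) }
  .reduce((a, b) => if (a._2 >= b._2) a else b)

println(s"most frequent character: '$topChar' ($topCount times)")
```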

## Analysing a log file

Let's analyze a log file. First, load `notebook.log` from the `./resources/LabData` directory.

In [17]:
val logFile = sc.textFile("./resources/LabData/notebook.log")

logFile = ./resources/LabData/notebook.log MapPartitionsRDD[13] at textFile at <console>:31


./resources/LabData/notebook.log MapPartitionsRDD[13] at textFile at <console>:31

Keep only the lines that contain INFO (or ERROR, if the particular log has it):

In [18]:
val info = logFile.filter(line => line.contains("INFO"))

info = MapPartitionsRDD[14] at filter at <console>:33


MapPartitionsRDD[14] at filter at <console>:33

Count the lines:

In [19]:
info.count()

13438

Count the INFO lines that contain the lowercase string "spark" by combining a transformation and an action:

In [20]:
info.filter(line => line.contains("spark")).count()

156
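`contains` is case-sensitive, so the cell above matches lowercase "spark" (as in `sparkDriver` or `/tmp/spark-...`) but not "Spark". The local sketch below uses a few hypothetical log lines to show the difference; in the notebook the case-insensitive version would be `info.filter(line => line.toLowerCase.contains("spark")).count()`.

```scala
// Hypothetical log lines, for illustration only.
val logLines = List(
  "INFO Utils: Successfully started service 'sparkDriver' on port 53333.",
  "INFO SparkContext: starting job",
  "INFO MemoryStore: block stored"
)

// Case-sensitive: matches "sparkDriver" but not "SparkContext".
val exact = logLines.count(line => line.contains("spark"))

// Case-insensitive: lower-case each line before testing it.
val anyCase = logLines.count(line => line.toLowerCase.contains("spark"))

println(s"exact = $exact, anyCase = $anyCase")
```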

Fetch those lines as an array of strings and print them:

In [21]:
info.filter(line => line.contains("spark")).collect() foreach println

15/10/14 14:29:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.17.0.22:53333]
15/10/14 14:29:23 INFO Utils: Successfully started service 'sparkDriver' on port 53333.
15/10/14 14:29:23 INFO DiskBlockManager: Created local directory at /tmp/spark-fe150378-7bad-42b6-876b-d14e2c193eb6/blockmgr-c142f2f1-ebb6-4612-945b-0a67d156230a
15/10/14 14:29:23 INFO HttpFileServer: HTTP File server directory is /tmp/spark-fe150378-7bad-42b6-876b-d14e2c193eb6/httpd-ed3f4ab0-7218-48bc-9d8a-3981b1cfe574
15/10/14 14:29:24 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35726.
15/10/15 15:33:42 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.17.0.22:47412]
15/10/15 15:33:42 INFO Utils: Successfully started service 'sparkDriver' on port 47412.
15/10/15 15:33:42 INFO DiskBlockManager: Created local directory at /tmp/spark-fc035223-3b43-43d1-8d7d-a22dda6b0d46/blockmgr-aad4e583

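Recall the DAG (directed acyclic graph) discussed earlier. Each RDD records its lineage, the chain of transformations that produced it, and this lineage is what provides fault tolerance in Spark: if a partition is lost, Spark recomputes it from the source data by replaying those transformations. You can view an RDD's lineage with the `toDebugString` method. In its output, the number in parentheses is the number of partitions, and each indented line is a parent RDD.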

In [22]:
println(info.toDebugString)

(2) MapPartitionsRDD[14] at filter at <console>:33 []
 |  ./resources/LabData/notebook.log MapPartitionsRDD[13] at textFile at <console>:31 []
 |  ./resources/LabData/notebook.log HadoopRDD[12] at textFile at <console>:31 []


## Joining RDDs

Next, you are going to create RDDs for the README and POM files in the `./resources/LabData` directory.

In [23]:
val readmeFile = sc.textFile("./resources/LabData/README.md")
val pom = sc.textFile("./resources/LabData/pom.xml")

readmeFile = ./resources/LabData/README.md MapPartitionsRDD[18] at textFile at <console>:31
pom = ./resources/LabData/pom.xml MapPartitionsRDD[20] at textFile at <console>:32


./resources/LabData/pom.xml MapPartitionsRDD[20] at textFile at <console>:32

How many lines in each file contain the word "Spark"?

In [24]:
println(readmeFile.filter(line => line.contains("Spark")).count())
println(pom.filter(line => line.contains("Spark")).count())

18
2


Now do a word count on each RDD so that the results are (K,V) pairs of (word, count). Here `_ + _` is shorthand for the function literal `(a, b) => a + b`.

In [25]:
val readmeCount = readmeFile.
                    flatMap(line => line.split(" ")).
                    map(word => (word, 1)).
                    reduceByKey(_ + _)

val pomCount = pom.
                flatMap(line => line.split(" ")).
                map(word => (word, 1)).
                reduceByKey(_ + _)

readmeCount = ShuffledRDD[25] at reduceByKey at <console>:37
pomCount = ShuffledRDD[28] at reduceByKey at <console>:42


ShuffledRDD[28] at reduceByKey at <console>:42

To see the array for either of them, just call the collect function on it.

In [26]:
println("Readme Count\n")
readmeCount.collect() foreach println

Readme Count

(package,1)
(this,1)
(Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1)
(Because,1)
(Python,2)
(cluster.,1)
([run,1)
(its,1)
(YARN,,1)
(have,1)
(general,2)
(pre-built,1)
(locally,2)
(changed,1)
(locally.,1)
(sc.parallelize(1,1)
(only,1)
(Configuration,1)
(This,2)
(first,1)
(basic,1)
(documentation,3)
(learning,,1)
(graph,1)
(Hive,2)
(several,1)
(["Specifying,1)
("yarn",1)
(page](http://spark.apache.org/documentation.html),1)
([params]`.,1)
(application,1)
([project,2)
(prefer,1)
(SparkPi,2)
(<http://spark.apache.org/>,1)
(engine,1)
(version,1)
(file,1)
(documentation,,1)
(MASTER,1)
(example,3)
(distribution.,1)
(are,1)
(params,1)
(scala>,1)
(systems.,1)
(provides,1)
(refer,2)
(configure,1)
(Interactive,2)
(R,,1)
(DataFrames,,1)
(can,6)
(build,3)
(when,1)
(Distributions"](http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html),1)
(how,2)
(easiest,1)
(works,1)
(Apache,1)
(package.,1)
(1000).count(),1)
(Not

In [27]:
println("Pom Count\n")
pomCount.collect() foreach println

Pom Count

(<id>kinesis-asl</id>,1)
(Unless,1)
(this,3)
(under,4)
(implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer",1)
(<scope>provided</scope>,8)
(</properties>,6)
(version="1.0",1)
(<artifactId>maven-install-plugin</artifactId>,1)
(<plugins>,1)
(express,1)
(</transformer>,2)
(<version>3.2.0</version>,1)
(we,1)
(WITHOUT,1)
(<groupId>commons-io</groupId>,1)
(<artifactId>commons-math</artifactId>,1)
("AS,1)
(<artifactId>compress-lzf</artifactId>,1)
(<scope>${hbase.deps.scope}</scope>,6)
(<artifactId>jersey-json</artifactId>,1)
(<id>flume-provided</id>,1)
(IS",1)
(already,1)
(...-->,1)
(ANY,1)
(disable,1)
(<configuration>,3)
(<packaging>jar</packaging>,1)
(2.0,1)
(<groupId>org.scala-lang</groupId>,1)
(file,3)
(<shadedArtifactAttached>false</shadedArtifactAttached>,1)
(<artifactSet>,1)
(<version>${hbase.version}</version>,7)
(<groupId>commons-codec</groupId>,1)
(are,1)
(<version>${project.version}</version>,12)
(<artifactId>hadoop-client</artifactId>,1)

Now let's join these two RDDs. Given datasets of (K,V) and (K,W) pairs, `join` returns a dataset of (K, (V,W)) pairs for the keys that appear in both. Let's join the two word counts and then cache the result.

In [28]:
val joined = readmeCount.join(pomCount)
joined.cache()

joined = MapPartitionsRDD[31] at join at <console>:37


MapPartitionsRDD[31] at join at <console>:37
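`join` behaves like an inner join: a word reaches the result only if it occurs in both files, which is why `joined` is much smaller than either word count. The local sketch below shows the same rule with two small `Map`s; the "Spark" and "the" counts are taken from the joined output, while "Python" and "artifactId" are hypothetical unmatched keys. On RDDs, `leftOuterJoin` and `fullOuterJoin` keep unmatched keys instead, wrapping the missing side in an `Option`.

```scala
// Word counts for two files ("Python" and "artifactId" are hypothetical unmatched keys).
val readmeCount = Map("Spark" -> 14, "the" -> 21, "Python" -> 2)
val pomCount    = Map("Spark" -> 1, "the" -> 10, "artifactId" -> 30)

// Inner join: keep only keys present in both maps, pairing their values as (V, W).
val joined: Map[String, (Int, Int)] =
  readmeCount
    .filter { case (word, _) => pomCount.contains(word) }
    .map { case (word, v) => (word, (v, pomCount(word))) }

joined.toList.sortBy(_._1).foreach(println)
```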

Let's see what's in the joined RDD.

In [29]:
joined.collect.foreach(println)

(file,(1,3))
(are,(1,1))
(Apache,(1,2))
(is,(6,2))
(uses,(1,1))
(this,(1,3))
(one,(2,1))
(with,(4,2))
(,(67,2931))
(The,(1,2))
(the,(21,10))
(not,(1,1))
(be,(2,1))
(on,(6,1))
(at,(2,1))
(use,(3,1))
(or,(3,3))
(of,(5,2))
(Spark,(14,1))
(following,(2,1))
(which,(2,1))
(See,(1,2))
(for,(12,2))
(an,(3,1))
(You,(3,2))
(in,(5,3))
(you,(4,1))
(that,(3,1))
(a,(10,1))
(to,(14,5))
(and,(10,1))


Let's add the two values together to get the total count. The `map` below turns each (K, (V,W)) pair into (K, V+W). The `._1` and `._2` accessors return the first and second elements of a tuple, so `k._2._1` is the README count and `k._2._2` is the POM count.

In [30]:
val joinedSum = joined.map(k => (k._1, (k._2)._1 + (k._2)._2))
joinedSum.collect() foreach println

(file,4)
(are,2)
(Apache,3)
(is,8)
(uses,2)
(this,4)
(one,3)
(with,6)
(,2998)
(The,3)
(the,31)
(not,2)
(be,3)
(on,7)
(at,3)
(use,4)
(or,6)
(of,7)
(Spark,15)
(following,3)
(which,3)
(See,3)
(for,14)
(an,4)
(You,5)
(in,8)
(you,5)
(that,4)
(a,11)
(to,19)
(and,11)


joinedSum = MapPartitionsRDD[32] at map at <console>:39


MapPartitionsRDD[32] at map at <console>:39
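Because only the value changes, the same sum can also be written with pattern matching, and on a pair RDD that body fits inside `mapValues`, e.g. `joined.mapValues { case (v, w) => v + w }`, which leaves the keys (and any partitioner) untouched. The local sketch below applies both forms to three pairs copied from the joined output above and checks that they agree.

```scala
// A few (word, (readmeCount, pomCount)) pairs copied from the joined output above.
val joined = List(("file", (1, 3)), ("is", (6, 2)), ("Spark", (14, 1)))

// Tuple-accessor form, as in the notebook cell.
val viaAccessors = joined.map(k => (k._1, k._2._1 + k._2._2))

// Pattern-matching form; on an RDD this body could go inside mapValues.
val viaPattern = joined.map { case (word, (v, w)) => (word, v + w) }

println(viaPattern)
```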

To check that the sums are correct, print the first five elements of the joined and joinedSum RDDs:

In [31]:
println("Joined Individual\n")
joined.take(5).foreach(println)

println("\n\nJoined Sum\n")
joinedSum.take(5).foreach(println)

Joined Individual

(file,(1,3))
(are,(1,1))
(Apache,(1,2))
(is,(6,2))
(uses,(1,1))


Joined Sum

(file,4)
(are,2)
(Apache,3)
(is,8)
(uses,2)


## Shared variables

Broadcast variables allow the programmer to keep a read-only variable cached on each worker node rather than shipping a copy of it with every task. They can be used, for example, to give every node a copy of a large input dataset efficiently. After a broadcast variable is created from a value `v` with `sc.broadcast(v)`, it should be used instead of `v` in any functions run on the cluster, so that `v` is not shipped to the nodes more than once. In addition, `v` should not be modified after it is broadcast, so that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).

Read more here: [http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables](http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables)

Let's create a broadcast variable:

In [32]:
val broadcastVar = sc.broadcast(Array(1,2,3))

broadcastVar = Broadcast(26)


Broadcast(26)

To get the value, type in:

In [33]:
broadcastVar.value

Array(1, 2, 3)

Accumulators are variables that are only added to through an associative and commutative operation. They are used to implement counters and sums efficiently in parallel. Spark natively supports numeric accumulators and standard mutable collections, and programmers can add support for new types. Only the driver can read an accumulator's value; tasks running on the workers can only add to it. (`sc.accumulator`, used below, still works in Spark 2.x but is deprecated; newer code uses `sc.longAccumulator` instead.)

Create the accumulator variable. Type in:

In [34]:
val accum = sc.accumulator(0)

accum = 0




0

Next, parallelize an array of four integers and use `foreach` to add each integer to the accumulator. Type in:

In [35]:
sc.parallelize(Array(1,2,3,4)).foreach(x => accum += x)

To get the current value of the accumulator variable, type in:

In [36]:
accum.value

10

You should get a value of 10.
This command can only be invoked on the driver side. The worker nodes can only increment the accumulator.
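The requirement that accumulator updates be associative and commutative exists because each task adds into its own partial value and Spark merges those partials in no guaranteed order. The local sketch below mimics that with plain Scala lists (the `combine` helper and both partitionings are hypothetical): it folds each "partition" and then merges the partial results. Addition gives the same total however the data is split, while subtraction does not, which is why it would be unsafe as an accumulator operation.

```scala
// Fold each "partition" from the zero value, then merge the partial results in list order.
def combine(partitions: List[List[Int]], op: (Int, Int) => Int): Int =
  partitions.map(_.foldLeft(0)(op)).reduceLeft(op)

// The same values 1..4, split into partitions two different ways (hypothetical).
val splitA = List(List(1, 2), List(3), List(4))
val splitB = List(List(4), List(3, 1), List(2))

val addA = combine(splitA, _ + _)
val addB = combine(splitB, _ + _)
val subA = combine(splitA, _ - _)
val subB = combine(splitB, _ - _)

println(s"addition: $addA vs $addB, subtraction: $subA vs $subB")
```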

## Key-value pairs

You have already seen key-value pairs in the Joining RDDs section. Here is a brief example of how to create a key-value pair and access its values. Remember that certain operations, such as `reduceByKey` and `join`, work only on RDDs of key-value pairs.

Create a key-value pair of two characters. Type in:

In [37]:
val pair = ('a', 'b')

pair = (a,b)


(a,b)

Access the first element with the *._1* method and the second with *._2*:

In [38]:
pair._1

a

In [39]:
pair._2

b
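Besides `._1` and `._2`, Scala offers other common ways to work with pairs: the `->` arrow builds a pair, pattern matching unpacks one into named values, and `swap` exchanges the two elements (the sample application below relies on it). The short sketch below uses only the standard library.

```scala
val pair = ('a', 'b')

// `->` is another way to build the same two-element tuple.
val samePair = 'a' -> 'b'

// Pattern matching unpacks the tuple into named values.
val (key, value) = pair

// swap exchanges the two elements.
val swapped = pair.swap

println(s"key = $key, value = $value, swapped = $swapped")
```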

## Sample Application

In this section, you will use a subset of New York City taxi trip data to determine the top 10 medallion numbers by number of trips.

In [40]:
val taxi = sc.textFile("./resources/LabData/nyctaxi.csv")

taxi = ./resources/LabData/nyctaxi.csv MapPartitionsRDD[35] at textFile at <console>:31


./resources/LabData/nyctaxi.csv MapPartitionsRDD[35] at textFile at <console>:31

To view the first five rows, invoke the `take` action. Type in:

In [41]:
taxi.take(5).foreach(println)

"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"
"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000E+001,-7.40144200000000E+001,"A93D1F7F8998FFB75EEF477EB6077516","68BC16A99E915E44ADA7E639B4DD5F59",2,"2013-01-11 21:48:00",+4.06760670000000E+001,-7.39810790000000E+001,1,,+4.08000000000000E+000,900,"VTS"
"2a80cfaa425dcec0861e02ae44354500","1-b72234b58a7b0018a1ec5d2ea0797e32","2013-01-11 04:28:00",+4.08190960000000E+001,-7.39467470000000E+001,"64CE1B03FDE343BB8DFB512123A525A4","60150AA39B2F654ED6F0C3AF8174A48A",1,"2013-01-11 04:07:00",+4.07280540000000E+001,-7.40020370000000E+001,1,,+8.53000000000000E+000,1260,"VTS"
"29b3f4a30dea6688d4c289c96758d87e","1-387ec30eac5abda89d2abefdf947b2c1","2013-01-11 22:02:00",+4.07277180000000E+00

Note that the first line is the header. Normally you would filter it out, but here it does not affect the results: it only adds one extra key (the quoted string "medallion") with a count of 1, which cannot make the top 10.
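If you do want to drop the header, a common idiom is to read it with `first()` and filter it out, for example `val header = taxi.first()` followed by `taxi.filter(line => line != header)`. The idiom also removes any data line identical to the header, which is unlikely to matter here. The local sketch below applies it to a hypothetical three-line CSV held in a `List`, where `head` stands in for `first()`.

```scala
// Hypothetical CSV text: a header line followed by two data rows.
val taxi = List(
  "\"_id\",\"medallion\"",
  "\"r1\",\"68BC16A99E915E44ADA7E639B4DD5F59\"",
  "\"r2\",\"60150AA39B2F654ED6F0C3AF8174A48A\""
)

val header = taxi.head                     // RDD equivalent: taxi.first()
val rows   = taxi.filter(line => line != header)

println(s"${rows.size} data rows")
```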

To parse out the values, including the medallion numbers, you need to first create a new RDD by splitting the lines of the RDD using the comma as the delimiter. Type in:

In [42]:
val taxiParse = taxi.map(line=>line.split(","))

taxiParse = MapPartitionsRDD[36] at map at <console>:33


MapPartitionsRDD[36] at map at <console>:33

Now create key-value pairs where the key is the medallion number and the value is 1. Later we sum the values for each key to find the number of trips each taxi took, which lets us see which taxi took the most trips. Map each medallion to the value 1. Type in:

In [43]:
val taxiMedKey = taxiParse.map(vals=>(vals(6), 1))

taxiMedKey = MapPartitionsRDD[37] at map at <console>:35


MapPartitionsRDD[37] at map at <console>:35

`vals(6)` is the seventh column (indices start at 0), which holds the medallion number.

Next, use the `reduceByKey` function to count the number of occurrences of each key.

In [44]:
val taxiMedCounts = taxiMedKey.reduceByKey((v1,v2)=>v1+v2)

taxiMedCounts.take(5).foreach(println)

("A9907052C8BBDED5079252EFE6177ECF",195)
("26DE3DC2FCBB37A233BE231BA6F7364E",173)
("BA60553FAA4FE1A36BBF77B4C10D3003",171)
("67DD83EA2A67933B2724269121BF45BB",196)
("AD57F6329C387766186E1B3838A9CEDD",214)


taxiMedCounts = ShuffledRDD[38] at reduceByKey at <console>:37


ShuffledRDD[38] at reduceByKey at <console>:37

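Finally, swap each (medallion, count) pair to (count, medallion). Tuples are ordered by their first element, so `top(10)` then returns the 10 pairs with the highest counts, in descending order, and the loop prints them with the medallion first.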

In [45]:
for (pair <-taxiMedCounts.map(_.swap).top(10)) println("Taxi Medallion %s had %s Trips".format(pair._2, pair._1))

Taxi Medallion "FE4C521F3C1AC6F2598DEF00DDD43029" had 415 Trips
Taxi Medallion "F5BB809E7858A669C9A1E8A12A3CCF81" had 411 Trips
Taxi Medallion "8CE240F0796D072D5DCFE06A364FB5A0" had 406 Trips
Taxi Medallion "0310297769C8B049C0EA8E87C697F755" had 402 Trips
Taxi Medallion "B6585890F68EE02702F32DECDEABC2A8" had 399 Trips
Taxi Medallion "33955A2FCAF62C6E91A11AE97D96C99A" had 395 Trips
Taxi Medallion "4F7C132D3130970CFA892CC858F5ECB5" had 391 Trips
Taxi Medallion "78833E177D45E4BC520222FFBBAC5B77" had 383 Trips
Taxi Medallion "E097412FE23295A691BEEE56F28FB9E2" had 380 Trips
Taxi Medallion "C14289566BAAD9AEDD0751E5E9C73FBD" had 377 Trips
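The swap-then-`top` step relies on Scala's default ordering for tuples, which compares the first elements (the counts) and only looks at the second on ties. On an RDD, `top(10)` keeps just the 10 largest elements from each partition before merging them, so the full list of counts is never sorted on the driver. The local sketch below, on a handful of hypothetical (medallion, count) pairs, shows the swapped ordering next to an alternative that sorts the original pairs by count. On an RDD, `taxiMedCounts.top(10)(Ordering.by[(String, Int), Int](_._2))` should likewise avoid the swap, though that variant is untested here.

```scala
// Hypothetical (medallion, tripCount) pairs.
val medCounts = List(("M1", 195), ("M2", 415), ("M3", 173), ("M4", 411))

// Swap to (count, medallion), then sort descending with the default tuple ordering.
val topTwo = medCounts.map(_.swap).sorted(Ordering[(Int, String)].reverse).take(2)

// Alternative without swapping: order the original pairs by their count.
val topTwoBy = medCounts.sortBy(p => -p._2).take(2)

topTwo.foreach { case (count, med) => println(s"Taxi Medallion $med had $count Trips") }
```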


Each step above was written as a separate statement, but you can just as well chain everything into a single expression:

In [46]:
val taxiMedCountsOneLine = taxi.map(line=>line.split(',')).map(vals=>(vals(6),1)).reduceByKey(_ + _)

taxiMedCountsOneLine = ShuffledRDD[43] at reduceByKey at <console>:33


ShuffledRDD[43] at reduceByKey at <console>:33

Run the same loop as above to print the top 10 medallions from taxiMedCountsOneLine:

In [47]:
for (pair <-taxiMedCountsOneLine.map(_.swap).top(10)) println("Taxi Medallion %s had %s Trips".format(pair._2, pair._1))

Taxi Medallion "FE4C521F3C1AC6F2598DEF00DDD43029" had 415 Trips
Taxi Medallion "F5BB809E7858A669C9A1E8A12A3CCF81" had 411 Trips
Taxi Medallion "8CE240F0796D072D5DCFE06A364FB5A0" had 406 Trips
Taxi Medallion "0310297769C8B049C0EA8E87C697F755" had 402 Trips
Taxi Medallion "B6585890F68EE02702F32DECDEABC2A8" had 399 Trips
Taxi Medallion "33955A2FCAF62C6E91A11AE97D96C99A" had 395 Trips
Taxi Medallion "4F7C132D3130970CFA892CC858F5ECB5" had 391 Trips
Taxi Medallion "78833E177D45E4BC520222FFBBAC5B77" had 383 Trips
Taxi Medallion "E097412FE23295A691BEEE56F28FB9E2" had 380 Trips
Taxi Medallion "C14289566BAAD9AEDD0751E5E9C73FBD" had 377 Trips


Let's cache taxiMedCountsOneLine to see the difference caching makes. If you run with the log level set to INFO, the log shows how long each job takes. First, mark the RDD for caching:

In [None]:
taxiMedCountsOneLine.cache()

Caching is lazy, so you have to invoke an action before the RDD is actually cached. Note how long this takes (either from the INFO log or simply by observation):

In [None]:
taxiMedCountsOneLine.count()

Run it again to see the difference.

In [None]:
taxiMedCountsOneLine.count()

The bigger the dataset, the more noticeable the difference will be. In a sample file such as ours, the difference may be negligible.

<div class="alert alert-success alertsuccess" style="margin-top: 20px">
**Tip**: Enjoyed using Jupyter notebooks with Spark? Get yourself a free 
    <a href="http://cocl.us/DSX_on_Cloud">IBM Cloud</a> account where you can use Data Science Experience notebooks
    and have *two* Spark executors for free!
</div>

### Summary
Having completed this exercise, you should now be able to describe Spark’s primary data abstraction, understand how to create parallelized collections and external datasets, work with Resilient Distributed Dataset (RDD) operations, and utilize shared variables and key-value pairs.

This notebook is part of the free course on **Cognitive Class** called *Spark Fundamentals I*. If you accessed this notebook outside the course, you can take this free, self-paced online course at: https://cognitiveclass.ai/courses/what-is-spark/

### About the Authors:  
Hi! It's [Alex Aklson](https://www.linkedin.com/in/aklson/), one of the authors of this notebook. I hope you found this lab educational! There is much more to learn about Spark but you are well on your way. Feel free to connect with me if you have any questions.
<hr>