# Just Enough Spark - a Jupyter Notebook

You can execute each statement to generate the output, or choose any option in the "Cell" menu. Modify the statements to try new things. 


## An RDD is an immutable collection
* Most methods only set up the execution graph for Spark
* Action methods execute the graph
* Partial results can be cached for reuse

*RDDs are constructed with methods on the SparkContext (sc) object*

### RDDs can be created from files, Cassandra tables, Scala collections, and many other sources
Let's first look at creating an RDD from a Scala collection. We use the parallelize function.

In [2]:
val myrdd = sc.parallelize(Seq(4,5,6))

In [3]:
myrdd

ParallelCollectionRDD[2] at parallelize at <console>:17

Now keep only the even numbers

In [5]:
val evenNumbers = myrdd.filter( x => x % 2 == 0)
evenNumbers

MapPartitionsRDD[4] at filter at <console>:19

Note that nothing has actually run yet: we have only set up the execution graph. We'll use the *action* method *collect* to execute it and return all of the results as an array.

In [6]:
evenNumbers.collect

Array(4, 6)
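
The notebook mentions caching but does not demonstrate it. The following is a minimal sketch, assuming the `sc` SparkContext that the notebook already provides. The names `numbers`, `squares`, `total`, `evenSquares`, and `stillTen` are illustrative and are not part of the original notebook.

```scala
// Sketch only: assumes `sc` is the SparkContext provided by the notebook.
val numbers = sc.parallelize(1 to 10)

// Transformations are lazy: map builds the graph, and cache() only marks
// the result to be kept in memory once an action has computed it.
val squares = numbers.map(x => x * x).cache()

// First action: runs the graph and populates the cache.
val total = squares.reduce(_ + _)

// Second action: can reuse the cached squares instead of recomputing the map.
val evenSquares = squares.filter(_ % 2 == 0).collect()

// RDDs are immutable: map and filter returned new RDDs,
// so the source RDD still holds all ten numbers.
val stillTen = numbers.count()
```

Caching pays off when the same intermediate RDD feeds more than one action; for an RDD that is used once, it only costs memory.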

## Creating RDDs from Cassandra Tables
* Can add a where clause to push the filter down to Cassandra
* Creates an RDD of CassandraRow objects
* .as will map each row to a case class or tuple for ease of use


In [9]:
val tracks = sc.cassandraTable("music","tracks_by_album")
tracks

CassandraTableScanRDD[6] at RDD at CassandraRDD.scala:15

In [10]:
tracks.take(2)

Array(CassandraRow{album_title: Duos For Violin and Cello, album_year: 2000, track_number: 1, album_genre: Classical, performer: Nigel Kennedy, track_title: Sonata for Violin and Cello - Allegro}, CassandraRow{album_title: Duos For Violin and Cello, album_year: 2000, track_number: 2, album_genre: Classical, performer: Nigel Kennedy, track_title: Sonata for Violin and Cello - Tres vif})

### Get the album and track in a tuple. This is the new syntax:

In [14]:
val albumTracks = sc.cassandraTable[(String,String)]("music","tracks_by_album").select("album_title","track_title")

The first 10 rows as tuples ....

In [17]:
albumTracks.take(10) foreach println

(Duos For Violin and Cello,Sonata for Violin and Cello - Allegro)
(Duos For Violin and Cello,Sonata for Violin and Cello - Tres vif)
(Duos For Violin and Cello,Sonata for Violin and Cello - Lent)
(Duos For Violin and Cello,Sonata for Violin and Cello - Vif, avec entrain)
(Duos For Violin and Cello,Passacaglia)
(Duos For Violin and Cello,Duo for Violin and Cello Op. 7 - Allegro serioso, non troppo)
(Duos For Violin and Cello,Duo for Violin and Cello Op. 7 - Adagio-Andante-Tempo I)
(Duos For Violin and Cello,Duo for Violin and Cello Op. 7 - Maestoso e largamente, ma non troppo lento-Presto)
(Duos For Violin and Cello,Two-Part Intervention No. 6 in E)
(Golden Boy Elvis,She's Not You)


### Create RDDs from Cassandra Tables and return an RDD of case class objects
.as() will map the RDD to a case class


In [18]:
case class Tracks(album_title: String,
                  year: Int,
                  number: Int,
                  album_genre: String,
                  performer: String,
                  track_title: String)

In [20]:
val tracks = sc.cassandraTable("music","tracks_by_album").as(Tracks)
tracks

CassandraTableScanRDD[19] at RDD at CassandraRDD.scala:15

In [25]:
tracks take 5 foreach println

Tracks(Duos For Violin and Cello,2000,1,Classical,Nigel Kennedy,Sonata for Violin and Cello - Allegro)
Tracks(Duos For Violin and Cello,2000,2,Classical,Nigel Kennedy,Sonata for Violin and Cello - Tres vif)
Tracks(Duos For Violin and Cello,2000,3,Classical,Nigel Kennedy,Sonata for Violin and Cello - Lent)
Tracks(Duos For Violin and Cello,2000,4,Classical,Nigel Kennedy,Sonata for Violin and Cello - Vif, avec entrain)
Tracks(Duos For Violin and Cello,2000,5,Classical,Nigel Kennedy,Passacaglia)


## Some other useful actions ...
* first – same as take(1)(0)
* collect – bring everything back to the caller as a Scala array
* saveToCassandra
* count


In [26]:
tracks.first

Tracks(Duos For Violin and Cello,2000,1,Classical,Nigel Kennedy,Sonata for Violin and Cello - Allegro)

In [27]:
tracks.count

36751

## Some Typical Transformations
filter, map, distinct

Show tracks from 1989

In [28]:
tracks.filter(x => x.year == 1989).take(10).foreach(println)


Tracks(CD Type Thing,1989,1,Rock,Big Drill Car,16 Lines)
Tracks(CD Type Thing,1989,2,Rock,Big Drill Car,Clamato #11)
Tracks(CD Type Thing,1989,3,Rock,Big Drill Car,No Need)
Tracks(CD Type Thing,1989,4,Rock,Big Drill Car,Brody)
Tracks(CD Type Thing,1989,5,Rock,Big Drill Car,In Green Fields)
Tracks(CD Type Thing,1989,6,Rock,Big Drill Car,Diamond Earrings)
Tracks(CD Type Thing,1989,7,Rock,Big Drill Car,Reform Before)
Tracks(CD Type Thing,1989,8,Rock,Big Drill Car,Head On)
Tracks(CD Type Thing,1989,9,Rock,Big Drill Car,Swanson)
Tracks(CD Type Thing,1989,10,Rock,Big Drill Car,About Us)


**This can also be accomplished with a .where function on the cassandraTable to push the work into Cassandra**

Map the Cassandra table rows to 2-tuples

In [30]:
tracks.map(x =>(x.album_title, x.track_title)).
   take(5).foreach(println)

(Duos For Violin and Cello,Sonata for Violin and Cello - Allegro)
(Duos For Violin and Cello,Sonata for Violin and Cello - Tres vif)
(Duos For Violin and Cello,Sonata for Violin and Cello - Lent)
(Duos For Violin and Cello,Sonata for Violin and Cello - Vif, avec entrain)
(Duos For Violin and Cello,Passacaglia)


Combine operations into a single graph or even a single statement

In [31]:
tracks.filter(x => x.year == 1990).map(x => (x.album_title, x.track_title)).take(5).foreach(println)


(Hits and Rarities,Fever)
(Hits and Rarities,You're the Boss)
(Hits and Rarities,There'll Be Some Changes Made)
(Hits and Rarities,I Was Only Kidding)
(Hits and Rarities,Begin the Beguine)
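
The list of typical transformations includes distinct, which the cells above do not exercise. Here is a small sketch of it, assuming the notebook's `sc`. The genre values are made-up sample data rather than rows from the music keyspace, and the names `genres` and `uniqueGenres` are illustrative.

```scala
// Sketch only: assumes `sc` is the notebook's SparkContext.
// The genre values are made-up sample data, not rows from the music keyspace.
val genres = sc.parallelize(Seq("Rock", "Classical", "Rock", "Jazz", "Classical"))

// distinct is a transformation; collect is the action that runs it.
// The order of the distinct results is not guaranteed, so sort on the driver.
val uniqueGenres = genres.distinct().collect().sorted
```

On the notebook's own data, the same pattern would presumably be `tracks.map(_.album_genre).distinct.collect`.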


## Pair RDDs – Special operations on RDDs of 2-tuples
* Think of each tuple as (Key,Value)
* countByKey
* groupByKey
* reduceByKey


In [33]:
val albumTracks = tracks.map(t => (t.album_title, t.track_title))

How many tracks in each album?

In [36]:
val trackCounts = albumTracks.countByKey

Why not sort the results in descending order? toList turns the map into a list of tuples, and sortBy on the negative of the count orders it from largest to smallest.

In [40]:
albumTracks.countByKey.toList.sortBy( t => -t._2 ) take 10 foreach println

(Greatest Hits,129)
(Who Was That Masked Man,108)
(Blue Grass 1950-1958,108)
(The Music of Bill Monroe, 1936-1994,98)
(Danzig 4,66)
(We Love Elvis,60)
(Memories Of Elvis,60)
(We Love Elvis Vol.2,60)
(The Elvis Presley Story,59)
(Elvis The Other Sides-Gold Award Hits, Volume 2,50)
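
countByKey returns the whole map to the driver, which is fine when the number of keys is modest. With many keys, one alternative is to count and rank on the cluster and bring back only the top rows. The sketch below assumes the notebook's `sc` and uses made-up pairs in place of albumTracks; `samplePairs` and `topAlbums` are illustrative names.

```scala
// Sketch only: assumes `sc` is the notebook's SparkContext.
// Made-up (album, track) pairs standing in for albumTracks.
val samplePairs = sc.parallelize(Seq(
  ("Album A", "Track 1"), ("Album A", "Track 2"), ("Album A", "Track 3"),
  ("Album B", "Track 1"),
  ("Album C", "Track 1"), ("Album C", "Track 2")))

// Count per key on the cluster, then let top() return only the n pairs
// with the largest counts to the driver, in descending order.
val topAlbums = samplePairs
  .mapValues(_ => 1L)
  .reduceByKey(_ + _)
  .top(2)(Ordering.by[(String, Long), Long](_._2))
```

Applied to the notebook's data, the same chain on albumTracks with `top(10)` should give a ranking comparable to the one above, although ties may come back in a different order.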


In [43]:
tracks.filter(_.album_title == "Greatest Hits").collect foreach println

Tracks(Greatest Hits,1995,1,Unknown,Wesley Willis,Rock n Roll McDonald's)
Tracks(Greatest Hits,1995,2,Unknown,Wesley Willis,Larry Nevers/ Walter Budzyn)
Tracks(Greatest Hits,1995,3,Unknown,Wesley Willis,Rick Sims)
Tracks(Greatest Hits,1995,4,Unknown,Wesley Willis,Outburst)
Tracks(Greatest Hits,1995,5,Unknown,Wesley Willis,Chronic Schizophrenia)
Tracks(Greatest Hits,1995,6,Unknown,Wesley Willis,Urge Overkill)
Tracks(Greatest Hits,1995,7,Unknown,Wesley Willis,Skrew)
Tracks(Greatest Hits,1995,8,Unknown,Wesley Willis,Tammy Smith)
Tracks(Greatest Hits,1995,9,Unknown,Wesley Willis,Vampire Bat)
Tracks(Greatest Hits,1995,10,Unknown,Wesley Willis,Elvis Presley)
Tracks(Greatest Hits,1995,11,Unknown,Wesley Willis,The Chicken Cow)
Tracks(Greatest Hits,1995,12,Unknown,Wesley Willis,Kris Kringle Was A Cat Thief)
Tracks(Greatest Hits,1995,13,Unknown,Wesley Willis,Eazy-E)
Tracks(Greatest Hits,1995,14,Unknown,Wesley Willis,Jesus Is the Answer)
Tracks(Greatest Hits,1995,15,Unknown,Wesley Willis,He's Doi