# Finding Spark

Whenever we work in Spark, the first thing we need is the SparkContext (`sc`), our connection to Spark. We are going to use the `findspark` module to make `pyspark` importable and get access to the SparkContext. First we need to install the module:

In [1]:
! pip install findspark



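Next we tell `findspark` where Spark is installed, which for us is on the local VM. We also set `PYSPARK_SUBMIT_ARGS` so that the Databricks `spark-csv` package is loaded; we use it later in the DataFrames example: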

In [2]:
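import findspark
import os
# Point findspark at the local Spark installation so that pyspark can be imported
findspark.init(os.getenv('HOME') + '/spark-1.6.0-bin-hadoop2.6')
# Ask spark-submit to fetch the spark-csv package (used for the DataFrames example)
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.3.0 pyspark-shell'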

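Now we can import `pyspark` and create the SparkContext. If `sc` already exists (for example when the cell is re-run) we reuse it, because only one SparkContext can be active at a time: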

In [3]:
import pyspark
try: 
    print(sc)
except NameError:
    sc = pyspark.SparkContext()
    print(sc)

<pyspark.context.SparkContext object at 0x7f0d4831f510>


# Creating an RDD

From the Spark documentation:

_"A Resilient Distributed Dataset (RDD), the basic abstraction in Spark, represents an immutable, partitioned collection of elements that can be operated on in parallel."_

_"Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel."_ 

For example, here is how to create a parallelized collection holding the numbers 1 to 5:


In [7]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

print(distData)

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423


The RDD is managed through the SparkContext: its data is held by Spark executors, which may run on this machine or on a remote cluster rather than inside the notebook kernel.

We apply transformations and actions to the RDD. The Spark driver manages the RDD and, when needed, executes operations in parallel, for example to add up the elements of a list.

Spark's API is heavily functional (Spark itself is written in Scala). For example, `map`, `reduce` and `filter` operations are supported; they take functions, often lambda functions, as arguments.
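Python's own built-ins follow the same functional style, so the pipeline used later in this notebook can be previewed without Spark. This is a plain-Python analogy only (no RDDs involved): `filter`, `map` and `functools.reduce` each take a function as an argument, just as the RDD methods do.

```python
from functools import reduce
from operator import add

data = [1, 2, 3, 4, 5]

# Each step receives a function (here, lambdas) as its argument, as in Spark.
big = filter(lambda x: x > 3, data)       # keep the elements 4 and 5
squares = map(lambda x: x ** 2, big)      # square them: 16 and 25
total = reduce(add, squares)              # 16 + 25

print(total)  # 41
```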

### Basics - transformations and actions

An RDD is not loaded into memory when it is defined; it is a recipe for computing the data (for example from a file, or from a local collection as above). Spark lets us apply transformations to the RDD, but they are not computed immediately: Spark is intentionally lazy. Nothing is computed until we execute an action. An action makes the Spark driver schedule tasks, which run on the executors across the Spark cluster; each task applies the transformations to its partition of the data, and the results of the action are returned to the driver.
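The same lazy behaviour can be seen in plain Python with a generator expression. This is only an analogy, with no Spark involved: building the generator runs nothing, and the work happens only when the generator is consumed, much as a Spark action triggers the transformations. The helper `square` and the `calls` log are illustrative names.

```python
calls = []

def square(x):
    calls.append(x)          # log every time the function actually runs
    return x ** 2

data = [1, 2, 3, 4, 5]

# Building the generator is like chaining transformations: nothing runs yet.
pipeline = (square(x) for x in data if x > 3)
calls_before_action = list(calls)    # still empty

# Consuming it is like running an action: now the work happens.
result = list(pipeline)              # [16, 25]
# calls is now [4, 5]
```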

In [8]:
distData = sc.parallelize(data) \
                .filter(lambda x : x > 3)

In [9]:
distData = sc.parallelize(data) \
                .filter(lambda x : x > 3) \
                .map(lambda x : x ** 2)
type(distData)

pyspark.rdd.PipelinedRDD

<img src='files/resources/ic_info_outline_black_24dp_2x.png' align='left'> `filter()` applies a predicate (a function returning `True` or `False`) to each element of the RDD; the elements for which it returns `True` are kept in the output RDD.  
`map()` applies a function to each element of the RDD; here we transform each element `x` into its square, `x ** 2`.

Actions force the computation of the pipeline and return the result to the driver. Here `reduce(add)` sums the filtered, squared values:

In [10]:
from operator import add

distData = sc.parallelize(data) \
                .filter(lambda x : x > 3) \
                .map(lambda x : x ** 2) \
                .reduce(add)

print(type(distData))

print(distData)

<type 'int'>
41


<img src='files/resources/ic_info_outline_black_24dp_2x.png' align='left'>`reduce()` merges the values using a binary function that must be associative and commutative. For example, with the `add` function and values [1,2,3,4], a sequential reduce first computes 1+2=3, then adds the result to the next value, 3+3=6, then 6+4=10, until the list has been processed. Spark reduces each partition in parallel and then combines the partial results, so the answer must not depend on how the elements are grouped (associativity: (a+b)+c = a+(b+c)) or on their order (commutativity: a+b = b+a).
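To see why grouping matters, here is a toy model of a two-stage reduce: reduce each partition, then combine the partial results. It is a plain-Python sketch of the idea, not Spark's implementation, and `partitioned_reduce` is a hypothetical helper. With `add` the answer is the same however the data is split; with subtraction, which is not associative, it changes. (Spark may also combine partial results in any order, which is why commutativity is required too.)

```python
from functools import reduce
from operator import add, sub

def partitioned_reduce(func, data, num_partitions):
    """Hypothetical toy model: reduce each partition, then combine the partials."""
    size = -(-len(data) // num_partitions)            # ceiling division
    partitions = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(func, p) for p in partitions]  # one result per partition
    return reduce(func, partials)                     # combine partial results

data = [1, 2, 3, 4]

# add is associative and commutative: partitioning makes no difference.
print([partitioned_reduce(add, data, n) for n in (1, 2, 4)])  # [10, 10, 10]

# sub is not: (1-2) - (3-4) = 0, but ((1-2)-3)-4 = -8.
print([partitioned_reduce(sub, data, n) for n in (1, 2, 4)])  # [-8, 0, -8]
```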



<img src='files/resources/ic_assignment_black_24dp_2x.png' align='left'>Run the examples above. Compare the `type` of the object before and after the `reduce` action is applied. Why does it change?

### External Data Sources

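We can also create RDDs from external data sources, such as HDFS, Amazon S3 and local files. Here we will create a text file RDD. Note that we must use absolute paths, since the file is read by Spark rather than by this notebook, so paths relative to the notebook's working directory are not reliable. On a real cluster, a local file must also be available at the same path on every worker node: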

In [11]:
rdd = sc.textFile(os.getcwd()+'/data/bike-items-clean.txt')
print(rdd)

rdd.take(2)

MapPartitionsRDD[6] at textFile at NativeMethodAccessorImpl.java:-2


[u'cycling bicycle mtb bike fixie gloss carbon fiber riser bar handlebar,description feature easy to use made of high quality carbon fiber with the special design can save for a long time the carbon fiber handlebar is made of high quality carbon fiber so that you can use it relieved this quick disassembling carbon fiber handlebar is easy to use and one of the best gifts to your friends specification material carbon fiber color black handlebar clamp diameter mm length package included x cycling carbon fiber rise',
 u'bicycle rims x red speed internal hub wheel set beach cruiser bike,clyde james cycles x speed internal hub red wheel set most orders ship within days after receiving payment threw paypal the shipping rates listed are only for residential or commercial destinations in the continental united states please send a message to quote to other destinations we only ship the item to paypal verified address that is sent to us at the time of checkout please verify the correct shipping 

### Caching RDDs

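In the previous example the `take()` action forces Spark to actually read the file from disk into memory (only as much of it as is needed to return the requested records).

By default Spark recomputes an RDD from its lineage every time an action runs on it. If we are going to use an RDD repeatedly, for example interactively, we can cache it: `cache()` marks the RDD for caching, and the first action that computes it keeps its partitions in memory so that later actions reuse them instead of re-reading the file. Caching does not lose fault tolerance: if a cached partition is lost, for example because a node fails, Spark still knows how to recompute it from the lineage.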
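The difference between recomputing and caching can be sketched with a toy class. `ToyRDD` is entirely hypothetical and is not the PySpark API; it only records a function that rebuilds its data (its "lineage") and, once `cache()` has been called, keeps the result of the next action. The `reads` list counts how often the pretend source file is read.

```python
class ToyRDD(object):
    """Hypothetical toy model of lineage and caching; not the PySpark API."""

    def __init__(self, compute):
        self._compute = compute      # function that rebuilds the data from scratch
        self._use_cache = False
        self._cached = None

    def map(self, f):
        # Transformation: only records how to build the new data; runs nothing now.
        return ToyRDD(lambda: [f(x) for x in self.collect()])

    def cache(self):
        # Lazy, like Spark's cache(): the data is stored by the next action.
        self._use_cache = True
        return self

    def collect(self):
        # Action: serve cached data if present, otherwise recompute from the lineage.
        if self._cached is not None:
            return self._cached
        data = self._compute()
        if self._use_cache:
            self._cached = data
        return data


reads = []

def read_source():
    reads.append(1)                  # count how often the "file" is read
    return ["a b", "c d e"]

lines = ToyRDD(read_source)
lines.collect()
lines.collect()
reads_without_cache = len(reads)     # 2: every action re-read the source

lines.cache()
lines.collect()                      # reads once more and fills the cache
lengths = lines.map(len).collect()   # [3, 5], built from the cached data
reads_with_cache = len(reads) - reads_without_cache   # 1
```

As in Spark, calling `cache()` by itself stores nothing; only the next action does.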

In this example we cache the RDD and then extract records with `take()`. Note the quicker execution of subsequent calls to the `take()` method:

In [119]:
%%timeit 
rdd = sc.textFile(os.getcwd()+'/data/bike-items-clean.txt')
rdd.take(10)

10 loops, best of 3: 98.4 ms per loop


In [120]:
%%timeit 
rdd = sc.textFile(os.getcwd()+'/data/bike-items-clean.txt')
rdd.cache()
rdd.take(10)

1 loops, best of 3: 220 ms per loop


In [121]:
%%timeit 
rdd.take(10)

10 loops, best of 3: 68.1 ms per loop


### Counting Words

To illustrate RDD basics, consider the simple program below, which counts the words in the text file RDD we created earlier, ignoring lines with two or fewer words:

In [12]:
words_per_line = rdd.map(lambda s: len(s.split())).filter(lambda x : x > 2)

total_words = words_per_line.reduce(lambda x,y : x+y)

print(total_words)

733764


<img src='files/resources/ic_info_outline_black_24dp_2x.png' align='left'>  To reiterate: defining `words_per_line` only records transformations of the RDD; no computation happens.  
Work starts when we apply an action such as `reduce()`.

### Inspecting RDDs

We can inspect the transformations applied to the RDD using the `toDebugString()` method:

In [123]:
print(words_per_line.toDebugString())

(1) PythonRDD[708] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[529] at textFile at null:-1 []
 |  /home/otter/data-science-for-search/data/bike-items-clean.txt HadoopRDD[528] at textFile at null:-1 []


We can also look at the DAG and the jobs on the cluster in the Spark web UI, which by default runs at [http://localhost:4040](http://localhost:4040) while the SparkContext is active.

![Spark DAG](resources/spark-dag.png)

### Term frequency in Spark

There are many ways to compute term frequencies in Spark, and some are more efficient than others.

Method 1 - use map and reduce operations

* map() - split each line on the comma and keep the item title, discarding the item description
* flatMap() - split each title on whitespace and flatten, giving one word per element
* map() - transform each word into a (word, 1) key-value tuple
* reduceByKey() - add up the values for each key; this requires a shuffle
* collectAsMap() - return the results to the driver as a dictionary of (word, count) pairs


<img src='files/resources/ic_info_outline_black_24dp_2x.png' align='left'>`reduceByKey` merges the values for each key using an associative and commutative reduce function. For example, if a key had values [1,2,3,4], reducing by key with addition gives 1+2=3, then 3+3=6, then 6+4=10. Spark first combines the values for each key within each partition, then shuffles the partial results so that all values for a key end up in one partition, and combines them there; this is why the function must give the same answer however the values are grouped or ordered.
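The five steps of Method 1 can be mirrored in plain Python on a single machine, which is a handy way to check what each step produces. This sketch uses no Spark: the two sample lines are modelled on the `take(2)` output above with the descriptions shortened, and a `defaultdict` stands in for `reduceByKey`.

```python
from collections import defaultdict

# Two lines in the "title,description" layout of bike-items-clean.txt (descriptions shortened)
lines = [
    "cycling bicycle mtb bike fixie gloss carbon fiber riser bar handlebar,description feature easy to use",
    "bicycle rims x red speed internal hub wheel set beach cruiser bike,clyde james cycles x speed internal hub red wheel set",
]

titles = [line.split(',')[0] for line in lines]          # map(): keep the title
words = [w for title in titles for w in title.split()]   # flatMap(): one word per element
pairs = [(w, 1) for w in words]                          # map(): (word, 1)

counts = defaultdict(int)                                # reduceByKey(add), done locally
for word, one in pairs:
    counts[word] += one

terms = dict(counts)                                     # collectAsMap()
print(terms['bike'])  # 2
```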

In [13]:
terms1 = rdd.map(lambda s : s.split(',')[0]) \
            .flatMap(lambda s : s.split()) \
            .map(lambda w : (w, 1)) \
            .reduceByKey(lambda x,y : x+y) \
            .collectAsMap()



print(terms1['bike'])

#?rdd.collectAsMap()

4534


Method 2 - use a higher-level Spark action

* map() - split each line on the comma and keep the item title, discarding the item description
* flatMap() - split each title on whitespace and flatten, giving one word per element
* countByValue() - return the count of each unique value in the RDD to the driver as a dictionary of (value, count) pairs
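On a single machine, `countByValue()` behaves much like Python's `collections.Counter`: both map each distinct value to the number of times it occurs. The sketch below is a plain-Python analogy, not Spark code; the two sample titles are taken from the `take(2)` output shown earlier.

```python
from collections import Counter

lines = [
    "cycling bicycle mtb bike fixie gloss carbon fiber riser bar handlebar,description feature easy to use",
    "bicycle rims x red speed internal hub wheel set beach cruiser bike,clyde james cycles",
]

# map() then flatMap(): titles only, one word per element
words = [w for line in lines for w in line.split(',')[0].split()]

terms2 = Counter(words)   # like countByValue(): {value: count}
print(terms2['bike'])     # 2
```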


In [125]:
terms2 = rdd.map(lambda s : s.split(',')[0]) \
            .flatMap(lambda s : s.split()) \
            .countByValue()
        
print(terms2['bike'])

#?rdd.countByValue()

4534


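The output of `toDebugString()` gets more interesting with bigger pipelines. The indented `+-` branch marks a stage boundary: `reduceByKey` introduces a shuffle (`ShuffledRDD`):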

In [126]:
print(rdd.flatMap(lambda s : s.split()) \
            .map(lambda w : (w, 1)) \
            .reduceByKey(lambda x,y : x+y).toDebugString())

(1) PythonRDD[719] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[718] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[717] at partitionBy at null:-1 []
 +-(1) PairwiseRDD[716] at reduceByKey at <ipython-input-126-3254cf225807>:1 []
    |  PythonRDD[715] at reduceByKey at <ipython-input-126-3254cf225807>:1 []
    |  MapPartitionsRDD[529] at textFile at null:-1 []
    |  /home/otter/data-science-for-search/data/bike-items-clean.txt HadoopRDD[528] at textFile at null:-1 []


<img src='files/resources/ic_assignment_black_24dp_2x.png' align='left'>Experiment by breaking down the pipelines and inspecting the contents of the RDDs using `take()`.  
Make sure you understand the difference between `map()` and `flatMap()` before continuing.
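As a warm-up, the distinction can be seen in plain Python with list comprehensions (an analogy only, no Spark). `map()` produces exactly one output element per input element, so splitting titles gives a list of lists; `flatMap()` lets each input produce zero or more outputs and flattens them into a single sequence. The titles are shortened from the item data.

```python
titles = ["ZIPP VUKA CARBON", "Mavic Crossride"]

# map(): one output per input, here one list of words per title
mapped = [t.split() for t in titles]

# flatMap(): the words of all titles, flattened into one sequence
flat_mapped = [w for t in titles for w in t.split()]

print(mapped)       # [['ZIPP', 'VUKA', 'CARBON'], ['Mavic', 'Crossride']]
print(flat_mapped)  # ['ZIPP', 'VUKA', 'CARBON', 'Mavic', 'Crossride']
```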

### Spark DataFrames API

The Spark DataFrames API is becoming a very popular abstraction on top of RDDs, largely because the idea of a data frame is familiar from R and pandas. Let's take a look!

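CSV parsing is surprisingly complex (we dodged it above by using a clean text file). In Spark 1.6, CSV support comes from the Databricks `spark-csv` package, which we requested via `PYSPARK_SUBMIT_ARGS` at the start, and it plugs into the DataFrames reader. Here we read a headerless file and rename the default column names `C0` and `C1`: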

In [16]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv') \
        .options(header='false', inferSchema='true') \
        .load(os.getcwd() + '/data/bike-item-titles.txt') \
        .selectExpr("C0 as id","C1 as item_title")

df.show()

+---+--------------------+
| id|          item_title|
+---+--------------------+
|  2|ZIPP VUKA CARBON ...|
|  3|Cycling Bicycle M...|
|  4|BICYCLE RIMS 26"x...|
|  5|Mavic Crossride 2...|
|  7|ROTOR QXL Aero Ov...|
|  8|Yakima 4 pack SKS...|
|  9|Sram Force Carbon...|
| 10|THE ORIGINAL SQUI...|
| 11|BV Bike Rear Sadd...|
| 12|HELIX BMX ROUND D...|
| 13|Waterproof Bicycl...|
| 14|Brand New CycleOp...|
| 15|Planet Bike LED S...|
| 16|Bike Bicycle Head...|
| 17|New Helmet Teenag...|
| 18|2 Pcs Bike Roller...|
| 19|FSA BICYCLE COMPR...|
| 20|Kenda Tube 26 X1....|
| 21|Bicycle Lock Set ...|
| 22|NEW DT Swiss 350 ...|
+---+--------------------+
only showing top 20 rows



In [17]:
df.schema

StructType(List(StructField(id,IntegerType,true),StructField(item_title,StringType,true)))

The DataFrames API also has a functional model: operations such as `filter()` can be applied to DataFrame objects. Here we count the rows with `id` between 5 and 10:

In [20]:
df.filter(df['id'] >=5).filter(df['id'] <= 10).count()

#df.filter(df['id'] >=5).filter(df['id'] <= 10).collect()

#df.filter(df['id'] >=5).filter(df['id'] <= 10).explain()

5

The DataFrames API also has a SQL interface. First we register the DataFrame as a table:

In [19]:
sqlContext.registerDataFrameAsTable(df,'bikeitems')
sqlContext.tableNames()

[u'bikeitems']

Now we can execute SQL against the table:

In [131]:
sqlContext.sql("select id, item_title from bikeitems where id between 5 and 10").show()

+---+--------------------+
| id|          item_title|
+---+--------------------+
|  5|Mavic Crossride 2...|
|  7|ROTOR QXL Aero Ov...|
|  8|Yakima 4 pack SKS...|
|  9|Sram Force Carbon...|
| 10|THE ORIGINAL SQUI...|
+---+--------------------+
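As the output shows, SQL's `between` includes both endpoints, so the query selects the same rows as the two chained `filter()` calls earlier. The sketch below checks this with Python's built-in `sqlite3` module as a stand-in for Spark SQL; the ids are those of the first rows shown above, and the titles are left out.

```python
import sqlite3

# ids from the first rows of the DataFrame shown above
ids = [2, 3, 4, 5, 7, 8, 9, 10, 11, 12]

conn = sqlite3.connect(":memory:")
conn.execute("create table bikeitems (id integer)")
conn.executemany("insert into bikeitems values (?)", [(i,) for i in ids])

sql_ids = [row[0] for row in conn.execute(
    "select id from bikeitems where id between 5 and 10 order by id")]
filter_ids = [i for i in ids if i >= 5 and i <= 10]   # the two chained filters
conn.close()

print(sql_ids)  # [5, 7, 8, 9, 10]
```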



We can convert back and forth between RDDs and DataFrames. Note that `rdd` now holds `Row` objects rather than lines of text:

In [21]:
rdd = df.rdd
df2 = rdd.toDF()

rdd.take(10)

[Row(id=2, item_title=u'ZIPP VUKA CARBON AERO BASE BAR AND EXTENSIONS COMPLETE TRIATHLON TT TRI CYCLING'),
 Row(id=3, item_title=u'Cycling Bicycle MTB Bike Fixie Gloss 3K Carbon Fiber Riser Bar Handlebar 31.8mm'),
 Row(id=4, item_title=u'BICYCLE RIMS 26"x 50MM RED 3 SPEED INTERNAL HUB WHEEL SET BEACH CRUISER BIKE'),
 Row(id=5, item_title=u'Mavic Crossride 26" Mountain bike wheels and WTB Weirwolf Tires'),
 Row(id=7, item_title=u'ROTOR QXL Aero Oval Road Chainring BCD110x5 53t'),
 Row(id=8, item_title=u'Yakima 4 pack SKS lock cores & 2 keys - A142 - roof rack locking cylinders'),
 Row(id=9, item_title=u'Sram Force Carbon Crank Gxp 110 Bcd No Chainrings 175 mm (2700)'),
 Row(id=10, item_title=u'THE ORIGINAL SQUIRT LONG LASTING DRY CHAIN BICYCLE LUBE WAX BASED'),
 Row(id=11, item_title=u'BV Bike Rear Saddle Bag Cycling Seat Post Pouch Bicycle Tail Storage NEW SB1-L'),
 Row(id=12, item_title=u'HELIX BMX ROUND DROPOUT SAVERS -FITS NEARLY ALL FRAMES -Fits 3/8" AND 10mm Axles')]

### Inverted Index

Let's imagine we want to use Spark to compute an inverted index for our set of documents: a mapping from each word to the ids of the item titles that contain it.
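Before building it with Spark, here is the same idea in plain Python on a few of the `(id, item_title)` rows shown above. This is a single-machine sketch, not Spark code: the inner loop plays the role of `flatMap()` emitting `(word, id)` pairs, and appending to a `defaultdict(list)` plays the role of `groupByKey()`. Like the Spark cells below, it splits on single spaces and keeps the original case.

```python
from collections import defaultdict

# (id, item_title) pairs from the first rows shown above
rows = [
    (2, 'ZIPP VUKA CARBON AERO BASE BAR AND EXTENSIONS COMPLETE TRIATHLON TT TRI CYCLING'),
    (3, 'Cycling Bicycle MTB Bike Fixie Gloss 3K Carbon Fiber Riser Bar Handlebar 31.8mm'),
    (5, 'Mavic Crossride 26" Mountain bike wheels and WTB Weirwolf Tires'),
]

index = defaultdict(list)
for doc_id, title in rows:             # flatMap(): a (word, id) pair for every word
    for word in title.split(' '):
        index[word].append(doc_id)     # groupByKey(): gather the ids for each word

print(index['Carbon'])  # [3]
print(index['CARBON'])  # [2]
```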

In [133]:
rdd.take(10)

[Row(id=2, item_title=u'ZIPP VUKA CARBON AERO BASE BAR AND EXTENSIONS COMPLETE TRIATHLON TT TRI CYCLING'),
 Row(id=3, item_title=u'Cycling Bicycle MTB Bike Fixie Gloss 3K Carbon Fiber Riser Bar Handlebar 31.8mm'),
 Row(id=4, item_title=u'BICYCLE RIMS 26"x 50MM RED 3 SPEED INTERNAL HUB WHEEL SET BEACH CRUISER BIKE'),
 Row(id=5, item_title=u'Mavic Crossride 26" Mountain bike wheels and WTB Weirwolf Tires'),
 Row(id=7, item_title=u'ROTOR QXL Aero Oval Road Chainring BCD110x5 53t'),
 Row(id=8, item_title=u'Yakima 4 pack SKS lock cores & 2 keys - A142 - roof rack locking cylinders'),
 Row(id=9, item_title=u'Sram Force Carbon Crank Gxp 110 Bcd No Chainrings 175 mm (2700)'),
 Row(id=10, item_title=u'THE ORIGINAL SQUIRT LONG LASTING DRY CHAIN BICYCLE LUBE WAX BASED'),
 Row(id=11, item_title=u'BV Bike Rear Saddle Bag Cycling Seat Post Pouch Bicycle Tail Storage NEW SB1-L'),
 Row(id=12, item_title=u'HELIX BMX ROUND DROPOUT SAVERS -FITS NEARLY ALL FRAMES -Fits 3/8" AND 10mm Axles')]

In [134]:
index = rdd.flatMap(lambda row : [ ( word, row[0]) for word in row[1].split(' ') ] ) 
index.take(20)

[(u'ZIPP', 2),
 (u'VUKA', 2),
 (u'CARBON', 2),
 (u'AERO', 2),
 (u'BASE', 2),
 (u'BAR', 2),
 (u'AND', 2),
 (u'EXTENSIONS', 2),
 (u'COMPLETE', 2),
 (u'TRIATHLON', 2),
 (u'TT', 2),
 (u'TRI', 2),
 (u'CYCLING', 2),
 (u'Cycling', 3),
 (u'Bicycle', 3),
 (u'MTB', 3),
 (u'Bike', 3),
 (u'Fixie', 3),
 (u'Gloss', 3),
 (u'3K', 3)]

In [135]:
index = rdd.flatMap(lambda row : [ (word,  row[0]) for word in row[1].split(' ') ] ) \
            .groupByKey()
index.take(10)

[(u'', <pyspark.resultiterable.ResultIterable at 0x7f65152915d0>),
 (u'Powerlock-New', <pyspark.resultiterable.ResultIterable at 0x7f6515291ad0>),
 (u'BLACK/SILVER', <pyspark.resultiterable.ResultIterable at 0x7f6515291f90>),
 (u'SecurityIng', <pyspark.resultiterable.ResultIterable at 0x7f6515291cd0>),
 (u'SporstWear', <pyspark.resultiterable.ResultIterable at 0x7f6515291a90>),
 (u'(28.6)', <pyspark.resultiterable.ResultIterable at 0x7f65152919d0>),
 (u'S-5', <pyspark.resultiterable.ResultIterable at 0x7f6515291290>),
 (u'Interloc', <pyspark.resultiterable.ResultIterable at 0x7f6515291550>),
 (u'S-2', <pyspark.resultiterable.ResultIterable at 0x7f6515291fd0>),
 (u'yellow', <pyspark.resultiterable.ResultIterable at 0x7f6515291410>)]

In [136]:
index = rdd.flatMap(lambda row : [ (word,  row[0]) for word in row[1].split(' ') ] ) \
            .groupByKey() \
            .map(lambda x : (x[0], list(x[1])))
index.filter(lambda x : x[0] == 'Unicycle').collect()

[(u'Unicycle', [2138, 3748, 7232, 8777])]

In [137]:
index = rdd.flatMap(lambda row : [ (word,  row[0]) for word in row[1].split(' ') ] ) \
            .groupByKey() \
            .map(lambda x : (x[0], list(x[1]))).cache()

In [138]:
index.filter(lambda x : x[0] == 'Unicycle').take(10)

[(u'Unicycle', [2138, 3748, 7232, 8777])]

<img src='files/resources/ic_assignment_black_24dp_2x.png' align='left'>The index has upper and lower case tokens, for example 'Unicycle' and 'unicycle'.  
Can you modify the index to normalise the tokens to lowercase?

This has been a very quick overview, but you are now in a good position to try out more of the examples in the [Spark documentation](http://spark.apache.org/docs/latest/).