# Finding Spark

Whenever we work in Spark, the first thing we need is the Spark context (`sc`).  We are going to use the `findspark` module to get access to the Spark context.  First we need to install the module:

In [1]:
! pip install findspark



First we specify the path to Spark, which for us is on the local VM:

In [2]:
import findspark
import os
findspark.init(os.getenv('HOME') + '/spark-1.6.0-bin-hadoop2.6')
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.3.0 pyspark-shell'

Now we can import `pyspark` and get the Spark context:

In [3]:
import pyspark
try: 
    print(sc)
except NameError:
    sc = pyspark.SparkContext()
    print(sc)

<pyspark.context.SparkContext object at 0x7fd2840102d0>


# Creating an RDD

From the Spark documentation:

_"A Resilient Distributed Dataset (RDD), the basic abstraction in Spark, represents an immutable, partitioned collection of elements that can be operated on in parallel."_

_"Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel."_ 

For example, here is how to create a parallelized collection holding the numbers 1 to 5:


In [4]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

print(distData)

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423


The RDD exists in the Spark context, which may or may not be in the notebook kernel.

We apply transformations and actions to the RDD. Spark executes these operations in parallel, for example to add up the elements of a list.
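To picture what adding up a list in parallel means, here is a minimal plain-Python sketch. It is not Spark itself: the helper `split_into_partitions` and the choice of two partitions are assumptions made for illustration, and Spark may slice the data differently.

```python
from functools import reduce
from operator import add


def split_into_partitions(items, num_partitions):
    """Hypothetical helper: cut a list into contiguous slices."""
    size = -(-len(items) // num_partitions)  # ceiling division
    return [items[i:i + size] for i in range(0, len(items), size)]


data = [1, 2, 3, 4, 5]

# Each slice stands in for the part of the data held by one worker.
partitions = split_into_partitions(data, 2)

# Every "worker" adds up its own slice independently...
partial_sums = [reduce(add, part) for part in partitions]

# ...and the partial results are combined into the final answer.
total = reduce(add, partial_sums)

print(partitions)    # [[1, 2, 3], [4, 5]]
print(partial_sums)  # [6, 9]
print(total)         # 15
```

The final total is the same as adding the list in one pass, which is why a sum is a natural fit for this style of computation.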

Spark is heavily functional (it is built in Scala).  For example, map, reduce and filter operations are supported, and these take functions or lambda functions as arguments.

### Basics - transformations and actions

The RDD is not loaded in memory; it is just a pointer to the file.  Spark allows us to apply transformations to the RDD, but these are not computed immediately: Spark is intentionally lazy.  Nothing is computed until we execute an action, at which point the Spark driver creates tasks to run on separate nodes in the Spark cluster.  Each node executes the transformations and actions and returns the results to the driver.
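Lazy evaluation can be illustrated without Spark. The sketch below is only an analogy and assumes Python 3, where the built-in `map` and `filter` return lazy iterators. The `square` function and the `calls` list are hypothetical names used to record when work actually happens.

```python
from functools import reduce
from operator import add

calls = []  # records every value that square() is actually asked to process


def square(x):
    calls.append(x)
    return x ** 2


data = [1, 2, 3, 4, 5]

# "Transformations": this only describes the pipeline, nothing runs yet.
pipeline = map(square, filter(lambda x: x > 3, data))
calls_before_action = list(calls)

# "Action": reduce() pulls values through the pipeline and forces the work.
total = reduce(add, pipeline)

print(calls_before_action)  # [] because nothing was computed before the action
print(calls)                # [4, 5]
print(total)                # 41
```

As with an RDD, building the pipeline is cheap; the cost is only paid when a result is requested.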

In [5]:
distData = sc.parallelize(data) \
                .filter(lambda x : x > 3)

In [6]:
distData = sc.parallelize(data) \
                .filter(lambda x : x > 3) \
                .map(lambda x : x ** 2)
type(distData)

pyspark.rdd.PipelinedRDD

Actions trigger the computation and return the results:

In [7]:
from operator import add, mul 

distData = sc.parallelize(data) \
                .filter(lambda x : x > 3) \
                .map(lambda x : x ** 2) \
                .reduce(add)

type(distData)

int

In [8]:
distData = sc.parallelize(data) \
                .filter(lambda x : x > 3) \
                .map(lambda x : x ** 2) \
                .reduce(mul)

print(distData)

400


<img src='files/resources/ic_assignment_black_24dp_2x.png' align='left'>Run the examples above.  Compare the `type` of object before and after the reduce action is applied.  Why does it change?

### External Data Sources

We can also create RDDs from external data sources such as Hadoop, Amazon S3 and files. Here we will create a text file RDD.  Note that we must use absolute paths, since this code is pushed onto the Spark cluster and is not run in the context of this notebook:

In [12]:
rdd = sc.textFile(os.getcwd()+'/data/bike-item-titles.txt')
print(rdd)

rdd.take(10)

MapPartitionsRDD[8] at textFile at NativeMethodAccessorImpl.java:-2


[u'2,"ZIPP VUKA CARBON AERO BASE BAR AND EXTENSIONS COMPLETE TRIATHLON TT TRI CYCLING"',
 u'3,"Cycling Bicycle MTB Bike Fixie Gloss 3K Carbon Fiber Riser Bar Handlebar 31.8mm"',
 u'4,"BICYCLE RIMS 26""x 50MM RED 3 SPEED INTERNAL HUB WHEEL SET BEACH CRUISER BIKE"',
 u'5,"Mavic Crossride 26"" Mountain bike wheels and WTB Weirwolf Tires"',
 u'6,"New KCNC ARROW 7050 Alloy Stem ',
 u'7,"ROTOR QXL Aero Oval Road Chainring BCD110x5 53t"',
 u'8,"Yakima 4 pack SKS lock cores & 2 keys - A142 - roof rack locking cylinders"',
 u'9,"Sram Force Carbon Crank Gxp 110 Bcd No Chainrings 175 mm (2700)"',
 u'10,"THE ORIGINAL SQUIRT LONG LASTING DRY CHAIN BICYCLE LUBE WAX BASED"',
 u'11,"BV Bike Rear Saddle Bag Cycling Seat Post Pouch Bicycle Tail Storage NEW SB1-L"']
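The absolute path passed to `textFile` above can also be built from a relative one with the standard library. This is a small sketch; the relative location `data/bike-item-titles.txt` is the one used in this notebook, and the resulting absolute path depends on the current working directory.

```python
import os

# Relative to wherever the notebook happens to be running.
relative_path = os.path.join('data', 'bike-item-titles.txt')

# Resolved against the current working directory, so it can be
# interpreted the same way outside the notebook process.
absolute_path = os.path.abspath(relative_path)

print(os.path.isabs(relative_path))  # False
print(os.path.isabs(absolute_path))  # True
```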

### Counting Words

To illustrate RDD basics, consider the simple program below which counts the number of words in the text file rdd we created earlier:

In [41]:
# Split the whole line into words; s[0] would be only the line's first character
words_per_line = rdd.map(lambda s: len(s.split()))

total_words = words_per_line.reduce(add)

print(total_words)


<img src='files/resources/ic_info_outline_black_24dp_2x.png' align='left'>  To reiterate: `words_per_line` applies a transformation to the RDD, and it is not evaluated until we apply an action such as `reduce()`.  We can inspect the transformations applied to the RDD using the `toDebugString()` method:

In [57]:
print(words_per_line.toDebugString())

(1) PythonRDD[54] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[51] at textFile at NativeMethodAccessorImpl.java:-2 []
 |  /home/csumb/data-science-for-search/data/bike-item-titles.txt HadoopRDD[50] at textFile at NativeMethodAccessorImpl.java:-2 []


### Term frequency in Spark

There are many ways to do this; here are two:

In [43]:
terms1 = rdd.flatMap(lambda s : s.split(' ')) \
            .countByValue()

terms2 = rdd.flatMap(lambda s : s.split()) \
            .map(lambda w : (w, 1)) \
            .reduceByKey(lambda x,y : x+y) \
            .collectAsMap()

print(terms1['bike'])
print(terms2['bike'])

175
175


<img src='files/resources/ic_info_outline_black_24dp_2x.png' align='left'>`reduceByKey` merges the values for each key using an associative and commutative reduce function.  For example, if a key had the values [1,2,3,4], then `reduceByKey` first computes 1+2=3, then adds the result to the next value, 3+3=6, and then adds that result to the next value, 6+4=10, until the list has been processed.  Associative means the result does not depend on how the values are grouped, and commutative means it does not depend on their order, so partial results from different partitions can be combined safely.
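The idea behind reducing by key can be sketched in plain Python. This is an illustration of the semantics only, not how Spark implements it; `reduce_by_key` is a hypothetical helper and the sample pairs are made up for the example.

```python
from collections import defaultdict
from functools import reduce
from operator import add


def reduce_by_key(pairs, func):
    """Hypothetical helper: fold the values of each key with func."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


pairs = [('bike', 1), ('bike', 2), ('rim', 5), ('bike', 3), ('bike', 4)]
totals = reduce_by_key(pairs, add)
print(totals['bike'])  # 10
print(totals['rim'])   # 5

# Because addition is associative, grouping the values differently
# (as separate partitions would) gives the same answer.
left_to_right = ((1 + 2) + 3) + 4
two_partitions = (1 + 2) + (3 + 4)
print(left_to_right == two_partitions)  # True
```

A function such as subtraction would not be safe here, because its result changes with grouping and order.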

<img src='files/resources/ic_assignment_black_24dp_2x.png' align='left'>Experiment with the `flatMap` and `map` transformations.  
What are the differences?

The `toDebugString()` output starts to get more interesting with bigger pipelines:

In [58]:
print(rdd.flatMap(lambda s : s.split()) \
            .map(lambda w : (w, 1)) \
            .reduceByKey(lambda x,y : x+y).toDebugString())

(1) PythonRDD[125] at RDD at PythonRDD.scala:43 []
 |  MapPartitionsRDD[124] at mapPartitions at PythonRDD.scala:374 []
 |  ShuffledRDD[123] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(1) PairwiseRDD[122] at reduceByKey at <ipython-input-58-3254cf225807>:1 []
    |  PythonRDD[121] at reduceByKey at <ipython-input-58-3254cf225807>:1 []
    |  MapPartitionsRDD[90] at javaToPython at NativeMethodAccessorImpl.java:-2 []
    |  MapPartitionsRDD[89] at javaToPython at NativeMethodAccessorImpl.java:-2 []
    |  MapPartitionsRDD[88] at javaToPython at NativeMethodAccessorImpl.java:-2 []
    |  MapPartitionsRDD[87] at javaToPython at NativeMethodAccessorImpl.java:-2 []
    |  MapPartitionsRDD[86] at flatMap at CsvRelation.scala:104 []
    |  MapPartitionsRDD[85] at mapPartitions at CsvRelation.scala:90 []
    |  MapPartitionsRDD[84] at textFile at TextFile.scala:30 []
    |  /home/csumb/data-science-for-search/data/bike-item-titles.txt HadoopRDD[83] at textFile at TextFile.scala:3

### Spark DataFrames API

If you look carefully above, the text file is actually a CSV, and we did not parse the lines correctly.  CSV parsing is complex, but it is made easier by Spark DataFrames, an abstraction on top of RDDs.
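To see why splitting on whitespace is not enough, the sketch below compares a naive split with Python's standard `csv` module on one line copied from the file output shown earlier. It uses plain Python rather than Spark and is only meant to show the quoting problem.

```python
import csv
import io

# One raw line from the text file: the title is quoted, and the
# inch mark inside it is escaped as a doubled quote ("").
line = '4,"BICYCLE RIMS 26""x 50MM RED 3 SPEED INTERNAL HUB WHEEL SET BEACH CRUISER BIKE"'

# Naive whitespace split: the id and the first word are glued together.
naive_tokens = line.split()
print(naive_tokens[0])  # 4,"BICYCLE

# A CSV-aware parser separates the two fields and unescapes the quotes.
item_id, item_title = next(csv.reader(io.StringIO(line)))
print(item_id)      # 4
print(item_title)   # BICYCLE RIMS 26"x 50MM RED 3 SPEED INTERNAL HUB WHEEL SET BEACH CRUISER BIKE
```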

In [44]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv') \
        .options(header='false', inferSchema='true') \
        .load('/home/csumb/data-science-for-search/data/bike-item-titles.txt') \
        .selectExpr("C0 as id","C1 as item_title")

df.show()

+---+--------------------+
| id|          item_title|
+---+--------------------+
|  2|ZIPP VUKA CARBON ...|
|  3|Cycling Bicycle M...|
|  4|BICYCLE RIMS 26"x...|
|  5|Mavic Crossride 2...|
|  7|ROTOR QXL Aero Ov...|
|  8|Yakima 4 pack SKS...|
|  9|Sram Force Carbon...|
| 10|THE ORIGINAL SQUI...|
| 11|BV Bike Rear Sadd...|
| 12|HELIX BMX ROUND D...|
| 13|Waterproof Bicycl...|
| 14|Brand New CycleOp...|
| 15|Planet Bike LED S...|
| 16|Bike Bicycle Head...|
| 17|New Helmet Teenag...|
| 18|2 Pcs Bike Roller...|
| 19|FSA BICYCLE COMPR...|
| 20|Kenda Tube 26 X1....|
| 21|Bicycle Lock Set ...|
| 22|NEW DT Swiss 350 ...|
+---+--------------------+
only showing top 20 rows



In [45]:
df.schema

StructType(List(StructField(id,IntegerType,true),StructField(item_title,StringType,true)))

The DataFrames API has a functional model that can be applied to DataFrame objects:

In [46]:
df.filter(df['id'] >=5).filter(df['id'] <= 10)

#df.filter(df['id'] >=5).filter(df['id'] <= 10).count()

#df.filter(df['id'] >=5).filter(df['id'] <= 10).explain()

DataFrame[id: int, item_title: string]

It also has a SQL interface:

In [47]:
sqlContext.registerDataFrameAsTable(df,'bikeitems')
sqlContext.tableNames()

sqlContext.sql("select id, item_title from bikeitems where id between 5 and 10").show()

+---+--------------------+
| id|          item_title|
+---+--------------------+
|  5|Mavic Crossride 2...|
|  7|ROTOR QXL Aero Ov...|
|  8|Yakima 4 pack SKS...|
|  9|Sram Force Carbon...|
| 10|THE ORIGINAL SQUI...|
+---+--------------------+



We can convert back and forth between an RDD and a DataFrame:

In [48]:
rdd = df.rdd
df2 = rdd.toDF()

rdd.take(10)

[Row(id=2, item_title=u'ZIPP VUKA CARBON AERO BASE BAR AND EXTENSIONS COMPLETE TRIATHLON TT TRI CYCLING'),
 Row(id=3, item_title=u'Cycling Bicycle MTB Bike Fixie Gloss 3K Carbon Fiber Riser Bar Handlebar 31.8mm'),
 Row(id=4, item_title=u'BICYCLE RIMS 26"x 50MM RED 3 SPEED INTERNAL HUB WHEEL SET BEACH CRUISER BIKE'),
 Row(id=5, item_title=u'Mavic Crossride 26" Mountain bike wheels and WTB Weirwolf Tires'),
 Row(id=7, item_title=u'ROTOR QXL Aero Oval Road Chainring BCD110x5 53t'),
 Row(id=8, item_title=u'Yakima 4 pack SKS lock cores & 2 keys - A142 - roof rack locking cylinders'),
 Row(id=9, item_title=u'Sram Force Carbon Crank Gxp 110 Bcd No Chainrings 175 mm (2700)'),
 Row(id=10, item_title=u'THE ORIGINAL SQUIRT LONG LASTING DRY CHAIN BICYCLE LUBE WAX BASED'),
 Row(id=11, item_title=u'BV Bike Rear Saddle Bag Cycling Seat Post Pouch Bicycle Tail Storage NEW SB1-L'),
 Row(id=12, item_title=u'HELIX BMX ROUND DROPOUT SAVERS -FITS NEARLY ALL FRAMES -Fits 3/8" AND 10mm Axles')]

### Inverted Index

In [49]:
index = rdd.flatMap(lambda row : [ ( word, row[0]) for word in row[1].split(' ') ] ) 
index.take(10)

[(u'ZIPP', 2),
 (u'VUKA', 2),
 (u'CARBON', 2),
 (u'AERO', 2),
 (u'BASE', 2),
 (u'BAR', 2),
 (u'AND', 2),
 (u'EXTENSIONS', 2),
 (u'COMPLETE', 2),
 (u'TRIATHLON', 2)]

In [50]:
index = rdd.flatMap(lambda row : [ (word,  row[0]) for word in row[1].split(' ') ] ) \
            .groupByKey()
index.take(10)

[(u'', <pyspark.resultiterable.ResultIterable at 0x7f6224ec9890>),
 (u'Powerlock-New', <pyspark.resultiterable.ResultIterable at 0x7f6224ec9b90>),
 (u'BLACK/SILVER', <pyspark.resultiterable.ResultIterable at 0x7f6224ec9a90>),
 (u'SecurityIng', <pyspark.resultiterable.ResultIterable at 0x7f6224ec9c50>),
 (u'SporstWear', <pyspark.resultiterable.ResultIterable at 0x7f6224ec9950>),
 (u'(28.6)', <pyspark.resultiterable.ResultIterable at 0x7f6224ec97d0>),
 (u'S-5', <pyspark.resultiterable.ResultIterable at 0x7f6224ec9550>),
 (u'Interloc', <pyspark.resultiterable.ResultIterable at 0x7f6224ec9790>),
 (u'S-2', <pyspark.resultiterable.ResultIterable at 0x7f6224ec9510>),
 (u'yellow', <pyspark.resultiterable.ResultIterable at 0x7f6224ec9410>)]

In [51]:
index = rdd.flatMap(lambda row : [ (word,  row[0]) for word in row[1].split(' ') ] ) \
            .groupByKey() \
            .map(lambda x : (x[0], list(x[1])))
index.filter(lambda x : x[0] == 'Unicycle').collect()

[(u'Unicycle', [2138, 3748, 7232, 8777])]

In [52]:
index = rdd.flatMap(lambda row : [ (word,  row[0]) for word in row[1].split(' ') ] ) \
            .groupByKey() \
            .map(lambda x : (x[0], list(x[1]))).cache()

In [55]:
index.filter(lambda x : x[0] == 'Unicycle').take(10)

[(u'Unicycle', [2138, 3748, 7232, 8777])]
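The shape of the index built above can be pictured with a small plain-Python sketch. It mirrors the `flatMap` and `groupByKey` steps on three rows copied from the output earlier in this notebook; `build_index` is a hypothetical helper, not part of Spark.

```python
from collections import defaultdict

# (id, item_title) rows copied from the DataFrame output above.
rows = [
    (2, 'ZIPP VUKA CARBON AERO BASE BAR AND EXTENSIONS COMPLETE TRIATHLON TT TRI CYCLING'),
    (3, 'Cycling Bicycle MTB Bike Fixie Gloss 3K Carbon Fiber Riser Bar Handlebar 31.8mm'),
    (9, 'Sram Force Carbon Crank Gxp 110 Bcd No Chainrings 175 mm (2700)'),
]


def build_index(rows):
    """Hypothetical helper: map each word to the ids of the titles containing it."""
    index = defaultdict(list)
    for item_id, title in rows:
        for word in title.split(' '):   # like the flatMap step: emit (word, id)
            index[word].append(item_id)  # like groupByKey: collect ids per word
    return dict(index)


index = build_index(rows)
print(index['Carbon'])  # [3, 9]
print(index['CARBON'])  # [2]
```

Tokens are kept exactly as they appear in the titles, so differently cased spellings end up under separate keys.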

<img src='files/resources/ic_assignment_black_24dp_2x.png' align='left'>The index has upper and lower case tokens, for example 'Unicycle' and 'unicycle'.  
Can you modify the index to normalise the tokens to lowercase?

This is a very quick overview; however, you are now in a great spot to try out more of the examples from the [Spark documentation](http://spark.apache.org/docs/latest/).