(last updated:   8/23/16, Reshama)

# Spark
Spark is the future, and in many ways the present.  It lets us work with a lot of the concepts we've covered at scale, combining some of the best aspects of Hadoop with a smarter execution engine for problems that aren't really MapReduce. 

In this notebook we'll examine some of the primitives that Spark has for transforming data in a distributed fashion, and then use MLlib to implement machine learning in Spark.

## PySpark
PySpark is the Python binding for Spark, so that's how we'll investigate Spark in this notebook.  To get it up and running, you'll have to go through some gymnastics like the next few cells.

## Reference
Spark Programming Guide:  
http://spark.apache.org/docs/latest/programming-guide.html

## Table of Contents 
[Example 01 - word count](#1) 

[Example 02 - word count](#2)

[Example 03 - word count (more complicated)](#3)

[Example 04:  numerical (prime numbers)](#4)

[Example 05:  logistic regression with Spark MLlib](#5)


---

### Using Python 3 in this notebook (make sure kernel matches)

In [1]:
!python --version

Python 3.5.2 :: Continuum Analytics, Inc.


In [2]:
import os

In [3]:
# this might give you an error
#import findspark

In [4]:
# install findspark
!pip install findspark



In [5]:
# Finds the location of spark installation
import findspark
findspark.init()

In [6]:
# to see which packages, libraries are installed
#!pip freeze

##### A SparkContext is the main entry point for Spark functionality. It represents the connection to the Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster

In [7]:
# Fire up a Spark context
import pyspark

### Note: we can only run the cell below once (re-running the SparkContext() command will give an error)

In [8]:
sc = pyspark.SparkContext()
sc

<pyspark.context.SparkContext at 0x7f164c39b2e8>

In [9]:
spark_home = os.environ.get('SPARK_HOME', None)

In [10]:
print (spark_home)

/usr/local/spark


In [11]:
import numpy as np
from pprint import pprint
from sklearn.datasets import fetch_20newsgroups
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
from datetime import datetime
from io import StringIO
from collections import namedtuple
from operator import add, itemgetter
from sklearn.linear_model import SGDClassifier
import csv
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

### <a id='1'></a> Example 1:  Word Count

Now that we have PySpark up and running, let's try out the canonical word count example.

In [12]:
# Read in the Spark course file with a simple call to textFile
text_file = sc.textFile("0_spark_about.md")

In [13]:
type(text_file)

pyspark.rdd.RDD

In [14]:
# number of lines in text file
count = text_file.count()

In [15]:
count

78

In [16]:
# first 10 lines of file
text_file.take(10)

['',
 '# SPARK',
 '',
 "Spark is the future, Spark is right now.  The goal is essentially an end-to-end platform for data science on Big Data, the way sklearn is for small data.  It's not there yet, but it's growing daily.",
 '',
 '## WHAT IS Spark?',
 '',
 '\t* Spark is a fast and general engine for large-scale distributed data processing.',
 '\t* Spark can run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.',
 "\t* Spark improves on MapReduce's computation model with an advanced DAG (Directed Acyclic Graph) execution engine that supports cyclic data flow and in-memory computing."]

##### RDD = Resilient Distributed Dataset. This is an immutable, partitioned collection of elements that can be operated upon in parallel
Let's do some word counting on this RDD.

##### We can apply a filter using an anonymous function

In [26]:
lines_not_empty = text_file.filter(lambda x: len(x) > 0)

In [27]:
type(lines_not_empty)

pyspark.rdd.PipelinedRDD

In [28]:
lines_not_empty.count()

53

##### We can use the `take()` function to retrieve items from our RDDs

In [29]:
lines_not_empty.take(10)

['# SPARK',
 "Spark is the future, Spark is right now.  The goal is essentially an end-to-end platform for data science on Big Data, the way sklearn is for small data.  It's not there yet, but it's growing daily.",
 '## WHAT IS Spark?',
 '\t* Spark is a fast and general engine for large-scale distributed data processing.',
 '\t* Spark can run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.',
 "\t* Spark improves on MapReduce's computation model with an advanced DAG (Directed Acyclic Graph) execution engine that supports cyclic data flow and in-memory computing.",
 '\t* Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3.',
 '\t* Spark is written in Scala, but has bindings for Java, Python (PySpark), and R',
 '\t',
 '### Spark Modules']

##### flatMap() 
- flattens the return lists into a single list
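
To see what "flattening" means, here is a plain-Python sketch (no Spark involved) that contrasts the result shapes of `map` and `flatMap`. The list comprehensions only mimic the semantics on a single machine, and the sample lines are made up for illustration.

```python
# Plain-Python emulation of RDD.map vs RDD.flatMap (illustrative data, not Spark)
lines = ["Spark is fast", "", "RDDs are immutable"]

# map: one output element per input element, so we get a list of lists
mapped = [line.split() for line in lines]

# flatMap: the per-line lists are concatenated into one flat list
flat_mapped = [word for line in lines for word in line.split()]

print(mapped)       # [['Spark', 'is', 'fast'], [], ['RDDs', 'are', 'immutable']]
print(flat_mapped)  # ['Spark', 'is', 'fast', 'RDDs', 'are', 'immutable']
```

Note that the empty line contributes an empty list under `map` but nothing at all under `flatMap`.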

In [30]:
words = text_file.flatMap(lambda x: x.split())

In [31]:
# how many words in whole file
words.count()

637

In [24]:
words.take(50)

['#',
 'SPARK',
 'Spark',
 'is',
 'the',
 'future,',
 'Spark',
 'is',
 'right',
 'now.',
 'The',
 'goal',
 'is',
 'essentially',
 'an',
 'end-to-end',
 'platform',
 'for',
 'data',
 'science',
 'on',
 'Big',
 'Data,',
 'the',
 'way',
 'sklearn',
 'is',
 'for',
 'small',
 'data.',
 "It's",
 'not',
 'there',
 'yet,',
 'but',
 "it's",
 'growing',
 'daily.',
 '##',
 'WHAT',
 'IS',
 'Spark?',
 '*',
 'Spark',
 'is',
 'a',
 'fast',
 'and',
 'general',
 'engine']

##### The map function
- map returns a new RDD containing values created by applying the supplied lambda function to each value in the original RDD
- The cell below passes an anonymous Python function (a lambda) to map

In [25]:
words = words.map(lambda x: x.replace('|', '').replace('.', '').\
                  replace('-', '').replace(' ', '').replace('&', '').replace('#','').upper())

In [26]:
words.take(10)

['', 'SPARK', 'SPARK', 'IS', 'THE', 'FUTURE,', 'SPARK', 'IS', 'RIGHT', 'NOW']

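We have some ugly tokens in there.  Let's create a filter that keeps only the tokens that start with a letter (`re.match` only matches at the beginning of the string, so a token with a leading symbol or digit is dropped even if letters follow):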

In [27]:
import re
words = words.filter(lambda x: re.match('[A-Z]+', x))
words.count()

559
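
The difference between "starts with a letter" and "contains a letter" can be checked in plain Python. This is a small sketch on made-up tokens; `re.search` is shown only as a possible alternative, not as what the notebook cell uses.

```python
import re

# Illustrative tokens (assumed for this sketch, not taken from the file)
tokens = ["SPARK", "100X", "(DAG)", "", "*", "RDD,"]

# re.match only tests the beginning of the string
starts_with_letter = [t for t in tokens if re.match('[A-Z]+', t)]

# re.search scans the whole string, so it finds a letter anywhere
contains_letter = [t for t in tokens if re.search('[A-Z]', t)]

print(starts_with_letter)  # ['SPARK', 'RDD,']
print(contains_letter)     # ['SPARK', '100X', '(DAG)', 'RDD,']
```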

In [28]:
# parallelize() distributes a local Python collection to form an RDD.
# subtract() returns a new RDD; `words` itself is unchanged because the result is not assigned.
words.subtract(sc.parallelize(['IS', 'WHAT'])).take(10)

['FAULTTOLERANT',
 'SC',
 'SC',
 'ASSIGNED',
 'LAUNCHES',
 'THAT',
 'THAT',
 'THAT',
 'THAT',
 'THAT']

In [29]:
words.count()

559
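
The count is still 559 because `subtract()` returned a new RDD and left `words` alone. A rough single-machine emulation of that behavior, on made-up data, might look like the sketch below. Unlike this list comprehension, Spark does not promise to preserve element order, as the shuffled output above suggests.

```python
# Plain-Python emulation of RDD.subtract (illustrative data, not Spark)
words = ["SPARK", "IS", "THE", "FUTURE,", "SPARK", "IS", "WHAT"]
stop_words = {"IS", "WHAT"}

# Every occurrence of an element found in the other collection is dropped;
# duplicates of the remaining elements are kept
remaining = [w for w in words if w not in stop_words]

print(remaining)   # ['SPARK', 'THE', 'FUTURE,', 'SPARK']
print(len(words))  # 7 (the original collection is unchanged)
```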

In [30]:
words.take(10)

['SPARK',
 'SPARK',
 'IS',
 'THE',
 'FUTURE,',
 'SPARK',
 'IS',
 'RIGHT',
 'NOW',
 'THE']

##### A word counting mapper function

In [31]:
word_counts = words.map(lambda x: (x, 1))

In [32]:
type(word_counts)

pyspark.rdd.PipelinedRDD

In [33]:
word_counts.take(10)

[('SPARK', 1),
 ('SPARK', 1),
 ('IS', 1),
 ('THE', 1),
 ('FUTURE,', 1),
 ('SPARK', 1),
 ('IS', 1),
 ('RIGHT', 1),
 ('NOW', 1),
 ('THE', 1)]

In [34]:
word_counts.first()

('SPARK', 1)

In [35]:
# prints out whole RDD, LOTS OF OUTPUT
# word_counts.collect()

##### Now do a reduction
##### The reduceByKey function
- input must be tuples of the form (key, value)
- creates a new RDD containing a tuple for each unique value of the key
- the value in the output depends upon the supplied lambda function
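
As a rough mental model, `reduceByKey` behaves like the single-machine helper below. `reduce_by_key` is a hypothetical name introduced for this sketch; the real operation combines values within each partition first and then across partitions, which is why the supplied function should be associative and commutative.

```python
def reduce_by_key(pairs, func):
    """Combine the values of each key with func (single-machine sketch)."""
    result = {}
    for key, value in pairs:
        # First value for a key is stored as-is; later values are folded in
        result[key] = func(result[key], value) if key in result else value
    return result

pairs = [("SPARK", 1), ("IS", 1), ("THE", 1), ("SPARK", 1), ("IS", 1), ("SPARK", 1)]
counts = reduce_by_key(pairs, lambda a, b: a + b)
print(counts)  # {'SPARK': 3, 'IS': 2, 'THE': 1}
```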

In [36]:
word_counts = word_counts.reduceByKey(lambda a, b: a + b)

In [37]:
word_counts.take(20)

[('FAULTTOLERANT', 1),
 ('DATAFRAMES', 1),
 ('LAUNCHES', 1),
 ('BE', 6),
 ('THAT', 9),
 ('SERIALIZABLE', 1),
 ('WHERE', 2),
 ('LIKE', 1),
 ('COLLECTION', 2),
 ('PROVIDES', 1),
 ('EXTREMELY', 1),
 ('PROGRAM))', 1),
 ('PYTHON', 3),
 ('WHEN', 1),
 ('EFFICIENTLY!', 1),
 ('SETS', 1),
 ('THIS', 3),
 ('TURN', 1),
 ('MORE', 1),
 ('NEWER', 1)]

##### Now do another map that swaps the positions of the key and the value
##### so that the count becomes the key and we can sort by it

In [38]:
word_counts = word_counts.map(lambda x: (x[1], x[0]))

In [39]:
word_counts = word_counts.sortByKey(False)

In [40]:
word_counts.take(10)

[(28, 'SPARK'),
 (21, 'THE'),
 (18, 'IS'),
 (14, 'IN'),
 (13, 'FOR'),
 (13, 'DATA'),
 (11, 'TO'),
 (11, 'ON'),
 (10, 'A'),
 (9, 'THAT')]

### <a id='2'></a> Example 2:  Word Count

In [41]:
lines = sc.parallelize(['Its fun to have fun,','but you have to know how.']) 

In [42]:
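# Referencing the methods without calling them only shows that they exist;
# mapPartitionsWithSplit is deprecated in favor of mapPartitionsWithIndex
lines.mapPartitionsWithIndex
lines.mapPartitionsWithSplit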

<bound method RDD.mapPartitionsWithSplit of ParallelCollectionRDD[36] at parallelize at PythonRDD.scala:423>

In [43]:
rd1 = lines.map(lambda x: x.replace('|', '').\
                replace('.', '').replace('-', '').replace('&', '').replace('#','').upper())

In [44]:
rd1.take(10)

['ITS FUN TO HAVE FUN,', 'BUT YOU HAVE TO KNOW HOW']

In [45]:
rd2 = rd1.flatMap(lambda x: x.split())

In [46]:
rd2.take(20)

['ITS', 'FUN', 'TO', 'HAVE', 'FUN,', 'BUT', 'YOU', 'HAVE', 'TO', 'KNOW', 'HOW']

In [47]:
#create the tuples required for the reduce step, 1 is the value and this will be counted by the reduce lambda function
rd3 = rd2.map(lambda x: (x, 1))

In [48]:
rd4 = rd3.reduceByKey(lambda a, b: a + b)

In [49]:
rd4.take(20)

[('BUT', 1),
 ('TO', 2),
 ('HOW', 1),
 ('YOU', 1),
 ('ITS', 1),
 ('FUN,', 1),
 ('KNOW', 1),
 ('FUN', 1),
 ('HAVE', 2)]

In [50]:
#use another map function to swap the key, value positionally
rd5 = rd4.map(lambda x: (x[1], x[0]))

In [51]:
rd5.take(20)

[(1, 'BUT'),
 (2, 'TO'),
 (1, 'HOW'),
 (1, 'YOU'),
 (1, 'ITS'),
 (1, 'FUN,'),
 (1, 'KNOW'),
 (1, 'FUN'),
 (2, 'HAVE')]

##### The function sortByKey does exactly what it says, and sorts the tuples using the key value

In [52]:
rd6 = rd5.sortByKey(ascending=False)

In [53]:
rd6.take(20)

[(2, 'TO'),
 (2, 'HAVE'),
 (1, 'BUT'),
 (1, 'HOW'),
 (1, 'YOU'),
 (1, 'ITS'),
 (1, 'FUN,'),
 (1, 'KNOW'),
 (1, 'FUN')]
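
For an input this small, the result can be cross-checked without Spark. The sketch below uses `collections.Counter` as a stand-in for the map, reduceByKey, and sort steps; `clean` is a hypothetical helper that mirrors the `replace(...).upper()` chain used for `rd1`.

```python
from collections import Counter

lines = ['Its fun to have fun,', 'but you have to know how.']

def clean(line):
    """Drop the same punctuation characters as the rd1 step, then upper-case."""
    for ch in '|.-&#':
        line = line.replace(ch, '')
    return line.upper()

counts = Counter(word for line in lines for word in clean(line).split())

print(counts['TO'], counts['HAVE'])   # 2 2
print(counts['FUN'], counts['FUN,'])  # 1 1
```

`FUN` and `FUN,` are counted separately because the comma is not among the characters being stripped, which matches the Spark output above.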

### <a id='3'></a> Example 3:  Word Count (more complicated)

##### Let's use the 20 Newsgroups dataset

In [54]:
ngd = fetch_20newsgroups(shuffle = True, remove = ("headers", "footers", "quotes"), random_state = 6)

##### Create an RDD

In [55]:
mrd_one = sc.parallelize(ngd.data) 

In [56]:
type(mrd_one)

pyspark.rdd.RDD

In [57]:
mrd_one.take(2)

['\n\n\n\n\nTheir should be no difference in the drive itself between IBM-PC and Mac.\nThe two main differences are the formatting of the disk itself (but with\nthe correct software each can read the others) and maybe the cable\n(depends on your SCSI board on IBM-PC).\n\nIf you get some Mac softawre to allow mounting of ANY IBM-formatted disk\nand the correct cable you should br able to mount and read your IBM-PC\nsyquest.\n\ngood luck,\n\n--Paul\n\n-- \n  +-------------------------------------------------------------------------+\n  | Paul Hardwick  |  Technical Consulting  |  InterNet: hardwick@panix.com |\n  | P.O. Box 1482  |  for MVS (SP/XA/ESA)   |  Voice:    (212) 535-0998     |\n  | NY, NY 10274   |  and 3rd party addons  |  Fax:      (212) Pending      |\n  +-------------------------------------------------------------------------+',
 "\n\nFreedom of speech does not mean that others are compelled to give one\nthe means to speak publicly.  Some systems have regulations\nprohibi

##### `glom()` allows you to treat a partition as an array rather than as a single row at a time

In [58]:
test = mrd_one.glom()

In [59]:
type(test)

pyspark.rdd.PipelinedRDD

In [60]:
# prints out LOTS OF OUTPUT, SO I'M COMMENTING OUT
#test.take(1)

#### Chaining commands
##### The aim here is to get a list of sentences
1. Use `glom()` to convert the partitions to an array of documents
2. Use `map()` to join each partition's array of documents into 1 massive string with documents separated by a space
3. Use `flatMap()` to split the massive string by sentence into an array of sentences
4. Use `map()` to replace all newlines with '' and make everything lowercase
5. Use `map()` to remove all occurrences of "the"
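
The five steps can be traced on a toy corpus in plain Python. This is only a sketch: the three documents and the two-partition split are assumptions made for illustration, and nested lists stand in for what `glom()` returns.

```python
# Toy corpus split into two pretend partitions (illustrative, not Spark)
docs = ["The cat sat on\nthe mat.", "The dog ran.", "A bird flew over the house."]
partitions = [docs[:2], docs[2:]]          # step 1: what glom() would expose

joined = [" ".join(p) for p in partitions]                    # step 2
sentences = [s for text in joined for s in text.split('.')]   # step 3
cleaned = [s.replace('\n', '').lower() for s in sentences]    # step 4
no_the = [s.replace(' the ', ' ') for s in cleaned]           # step 5

print(no_the)
# ['the cat sat onthe mat', ' dog ran', '', 'a bird flew over house', '']
```

Two side effects are visible here: replacing newlines with `''` glues the neighboring words together (`onthe`), and a sentence-initial "the" survives because it is not surrounded by spaces on both sides.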

In [61]:
mrd2 = mrd_one.glom().map(lambda x: " ".join(x)).flatMap(lambda x: x.split('.')).map(lambda x: x.replace('\n', '').\
    lower()).map(lambda x: x.replace(' the ', ' '))

In [62]:
mrd2.take(2)

['their should be no difference in drive itself between ibm-pc and mac',
 'the two main differences are formatting of disk itself (but withthe correct software each can read others) and maybe cable(depends on your scsi board on ibm-pc)']

#### Exercise: Using the sentences write a mapping function to find all the bigrams
- Use a `map()` that splits the sentences (x) into a list of tokens via the `split()` function
- Use a `flatMap()` that loops through each list and returns something like `((x[i], x[i+1]), 1)` for all the tokens in `x`

In [63]:
bigrams = mrd2.map(lambda x: x.split()).flatMap(lambda x: [((x[i], x[i+1]), 1) for i in range(len(x)-1)]).reduceByKey(lambda a, b: a+b)

In [64]:
# Check out the 10 most frequent bigrams with takeOrdered()
bigrams.takeOrdered(10, key=(lambda x: -x[1]))

[(('to', 'be'), 2947),
 (('it', 'is'), 2889),
 (('is', 'a'), 2508),
 (('i', 'have'), 2174),
 (('if', 'you'), 2170),
 (('this', 'is'), 1851),
 (('of', 'a'), 1591),
 (('in', 'a'), 1531),
 (('i', 'am'), 1529),
 (('is', 'not'), 1490)]
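
The bigram-generating expression can be sanity-checked on a single sentence in plain Python. The sentence is made up, and `zip(tokens, tokens[1:])` is shown only as an equivalent spelling of the index-based comprehension.

```python
from collections import Counter

sentence = "to be or not to be"
tokens = sentence.split()

# Same expression as in the flatMap step: pair each token with its successor
pairs = [((tokens[i], tokens[i + 1]), 1) for i in range(len(tokens) - 1)]

# Equivalent way to produce the same bigrams
zipped = list(zip(tokens, tokens[1:]))

bigram_counts = Counter(bigram for bigram, _ in pairs)
print(len(pairs))                   # 5
print(bigram_counts[("to", "be")])  # 2
```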

##### Now let's count up the number of occurrences for each bigram
- Use a `reduceByKey()` to sum up the occurrences
- Use a `map()` to exchange the resulting keys with values
- Use a `sortByKey()` to sort the results in descending order

In [65]:
# try it out here

##### Use a `take()` to print out the top 10 bigrams!!

In [66]:
# try it out here

#### Spark supports the efficient parallel application of map and reduce operations by dividing data up into multiple partitions.
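- Partitions are spread across workers running on different nodes in a cluster. If a worker is lost, Spark can recompute the missing partitions from the RDD's lineage (the recorded chain of transformations), so failure of a single worker should not cause the RDD to become unavailable.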
- Many operations, including map and flatMap, can be applied independently to each partition, running as concurrent tasks limited by the number of available cores.
- When processing reduceByKey, Spark creates a number of output partitions determined by the *default* parallelism, which depends on the number of nodes and cores available to Spark.
- Data is effectively reshuffled so that input data from different input partitions with the same key value is passed to the same output partition and combined there using the specified reduce function. 
- sortByKey is another operation which transforms N input partitions to M output partitions.
- The number of partitions generated by the reduce stage can be controlled by supplying the desired number of partitions as an extra parameter to reduceByKey
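
The reshuffling idea can be sketched in plain Python: each pair is routed to an output partition computed from its key, so equal keys always meet in the same place. `shuffle_by_key` is a hypothetical helper, and Python's built-in `hash` on small integers is an assumption standing in for Spark's own partitioning function.

```python
from collections import defaultdict

def shuffle_by_key(pairs, num_partitions):
    """Route each (key, value) pair to a partition chosen from its key."""
    partitions = defaultdict(list)
    for key, value in pairs:
        # Stand-in partitioner: hash of the key modulo the partition count
        partitions[hash(key) % num_partitions].append((key, value))
    return dict(partitions)

pairs = [(1, 'a'), (2, 'b'), (1, 'c'), (5, 'd'), (2, 'e')]
parts = shuffle_by_key(pairs, num_partitions=4)
print(parts)
# {1: [(1, 'a'), (1, 'c'), (5, 'd')], 2: [(2, 'b'), (2, 'e')]}
```

Only two of the four partitions receive data here. The same kind of imbalance can show up in real jobs when keys hash unevenly or a few keys dominate.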

In [68]:
sc.defaultParallelism

2

In [69]:
new1 = sc.parallelize(ngd.data).glom().map(lambda x: " ".join(x)).flatMap(lambda x: x.split('.')).\
    map(lambda x: x.replace('\n', '').lower()).map(lambda x: x.replace('the', '')).map(lambda x: x.split()).\
    flatMap(lambda x: [((x[i], x[i+1]), 1) for i in range(0, len(x)-1)]).\
    reduceByKey(lambda a, b: a + b, numPartitions = 12).\
    map(lambda x: (x[1], x[0])).sortByKey(False)

In [70]:
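def countPartitions(index, iterator):
    # Count the elements of one partition and emit a single (index, count) pair.
    # Empty partitions emit nothing, so they do not appear in the collected map.
    c = 0
    for _ in iterator:
        c += 1
    if c > 0:
        yield (index, c)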

In [71]:
new1.mapPartitionsWithIndex(countPartitions).collectAsMap()

{0: 39666, 1: 43980, 2: 90094, 11: 706648}

In [72]:
new1.take(10)

[(2954, ('to', 'be')),
 (2895, ('it', 'is')),
 (2508, ('is', 'a')),
 (2178, ('i', 'have')),
 (2170, ('if', 'you')),
 (1854, ('this', 'is')),
 (1591, ('of', 'a')),
 (1531, ('in', 'a')),
 (1529, ('i', 'am')),
 (1496, ('is', 'not'))]

### <a id='4'></a> Example 4:  Numerical (prime numbers)

Let's use Spark to find all the primes in any range we specify.  Here's a function that determines if a number is prime:

In [73]:
def isprime(n):
    """
    check if integer n is a prime
    """
    
    # make sure n is a positive integer
    n = abs(int(n))
    
    # 0 and 1 are not primes
    if n < 2:
        return False
    
    # 2 is the only even prime number
    if n == 2:
        return True
    
    # all other even numbers are not primes
    if not n & 1:
        return False
    
    # range starts with 3 and only needs to go up to the square root of n
    # for all odd numbers
    for x in range(3, int(n**0.5)+1, 2):
        if n % x == 0:
            return False
    return True

In [74]:
# Create an RDD of numbers from 0 to 999,999
# python 2
#nums = sc.parallelize(xrange(1000000))

# python 3
nums = sc.parallelize(range(1000000))

nums.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

The `filter()` function allows us to supply a function that returns a boolean and filter an `RDD` by those entries that return True for that function.  Here's how we would use it to return prime numbers less than 1 million.

In [75]:
primes = nums.filter(isprime)

In [76]:
primes.take(10)

[2, 3, 5, 7, 11, 13, 17, 19, 23, 29]

In [77]:
# Compute the number of primes in the RDD
print(primes.count())

78498
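
As an independent check of that count, here is a single-machine sieve of Eratosthenes. It is a different technique from the `filter(isprime)` approach above, and `primes_below` is a name introduced only for this sketch.

```python
def primes_below(limit):
    """Return all primes p with p < limit using the sieve of Eratosthenes."""
    if limit <= 2:
        return []
    is_prime = bytearray([1]) * limit
    is_prime[0] = is_prime[1] = 0
    for i in range(2, int(limit ** 0.5) + 1):
        if is_prime[i]:
            # Cross out the multiples of i, starting at i*i
            is_prime[i * i::i] = bytearray(len(range(i * i, limit, i)))
    return [i for i, flag in enumerate(is_prime) if flag]

print(primes_below(30))            # [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
print(len(primes_below(1000000)))  # 78498
```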


### <a id='5'></a> Example 5:  Logistic Regression with Spark MLlib

MLlib is how Spark does Machine Learning.  It has a variety of (what should be!) familiar algorithms that are optimized to work in a distributed fashion!

#### Method 1:   Python

In [78]:
import pandas as pd
from sklearn.linear_model import SGDClassifier  # Stochastic Gradient Descent, linear classifier

In [79]:
dat = pd.read_csv("spark_data/sample_svm_data.txt", delimiter = ' ', header = None)

In [80]:
dat.head(1)

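|   | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0.0 | 2.520784 | 0.0 | 0.0 | 0.0 | 2.004684 | 2.000347 | 0.0 | 2.228387 | 2.228387 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |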


In [81]:
predictors = dat.columns.values[1:]
print(predictors)

[ 1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16]


In [82]:
X = dat[predictors]
y = dat[0]

In [83]:
clf_pySGD = SGDClassifier(loss='log', alpha = 0.01, n_iter = 10000)
clf_pySGD.fit(X, y)
yhat = clf_pySGD.predict(X)
print(clf_pySGD.score(X, y))

0.636645962733


In [84]:
cm = pd.crosstab(y, yhat, rownames=["Actual"], colnames=["Predicted"])
cm

| Actual | Predicted 0 | Predicted 1 |
|---|---|---|
| 0 | 101 | 59 |
| 1 | 58 | 104 |


#### Method 2:   Spark

##### LabeledPoint is a built-in PySpark class that pairs a label with a feature vector: (label, features)

In [85]:
def parse_point(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

In [86]:
data = sc.textFile("spark_data/sample_svm_data.txt")

In [87]:
data.take(1)

['1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0']
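
The parsing logic can be tried on this first record without Spark. In the sketch below, `parse_line` is a hypothetical stand-in for `parse_point` that returns a plain `(label, features)` tuple instead of a `LabeledPoint`.

```python
# First record of the file, copied from the output above
line = ('1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 '
        '0 2.228387042742021 2.228387042742023 0 0 0 0 0 0')

def parse_line(line):
    """Split a space-delimited record into (label, features)."""
    values = [float(x) for x in line.split(' ')]
    # The first value is the class label; the rest are the features
    return values[0], values[1:]

label, features = parse_line(line)
print(label)          # 1.0
print(len(features))  # 16
```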

##### Map each line to a LabeledPoint

In [88]:
parsed_data = data.map(parse_point)

In [89]:
type(parsed_data)
parsed_data.first()

LabeledPoint(1.0, [0.0,2.52078447202,0.0,0.0,0.0,2.00468443649,2.00034729927,0.0,2.22838704274,2.22838704274,0.0,0.0,0.0,0.0,0.0,0.0])

##### Use the Spark logistic regression model

In [90]:
spark_clf = LogisticRegressionWithSGD.train(parsed_data)
type(spark_clf)

pyspark.mllib.classification.LogisticRegressionModel

##### p is a labelled point

In [91]:
labels_and_predictions = parsed_data.map(lambda p: (p.label, spark_clf.predict(p.features)))

In [92]:
type(labels_and_predictions)
pprint(labels_and_predictions.take(10))

[(1.0, 1),
 (0.0, 1),
 (0.0, 0),
 (1.0, 1),
 (1.0, 0),
 (0.0, 1),
 (1.0, 1),
 (1.0, 1),
 (0.0, 0),
 (0.0, 0)]


In [93]:
print(parsed_data.count())

322


In [94]:
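# Summing the predictions (0 or 1) per actual label counts the predicted 1s for each label:
# false positives for label 0.0 and true positives for label 1.0
yyhat = labels_and_predictions.reduceByKey(lambda x, y: x + y).collect()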
yyhat

[(0.0, 59), (1.0, 104)]

In [95]:
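# Predicted 1s per actual label (false positives and true positives)
yyhat = labels_and_predictions \
  .reduceByKey(lambda x, y: x + y) \
  .collect()
# Swap to (prediction, label) and sum the labels:
# actual 1s per predicted class (false negatives and true positives)
landp = labels_and_predictions.map(lambda x : (x[1], x[0]))
yyhat_1 = landp.reduceByKey(lambda x, y: x + y).collect()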

print(yyhat)
print(yyhat_1)

[(0.0, 59), (1.0, 104)]
[(0, 58.0), (1, 104.0)]


In [96]:
# number of misclassified points, then number of correctly classified points
print(labels_and_predictions.filter(lambda x: x[0] != x[1]).count())
print(labels_and_predictions.filter(lambda x: x[0] == x[1]).count())

117
205


In [97]:
# take(1000) returns every pair here because the dataset has only 322 points
results = list(labels_and_predictions.take(1000))
y = np.array([x[0] for x in results])
yhat = np.array([x[1] for x in results])

In [98]:
cm = pd.crosstab(y, yhat, rownames=["Actual"], colnames=["Predicted"])
cm

| Actual | Predicted 0 | Predicted 1 |
|---|---|---|
| 0.0 | 101 | 59 |
| 1.0 | 58 | 104 |


In [99]:
training_error = labels_and_predictions.filter(lambda x: x[0] != x[1]).count()/float(parsed_data.count())
type(training_error)
print(training_error)

0.36335403726708076
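
The figures reported in this example are consistent with one another, which a few lines of arithmetic can confirm. The variable names below are introduced for this sketch; the counts are copied from the confusion matrix above.

```python
# Counts copied from the confusion matrix above
tn, fp = 101, 59   # actual 0: predicted 0, predicted 1
fn, tp = 58, 104   # actual 1: predicted 0, predicted 1

total = tn + fp + fn + tp
accuracy = (tn + tp) / total
training_error = (fp + fn) / total

print(total)    # 322
print(fp + fn)  # 117
print(tn + tp)  # 205
```

Here `fp` and `tp` are the per-label sums returned by the first `reduceByKey`, `fn` and `tp` are the sums from the swapped version, and `accuracy` agrees with the score printed by the scikit-learn model in Method 1.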


---

### On Your Own
Go back to any of your favorite classification datasets that we've dealt with and see if you can implement the classifier with Spark as we just did above.