In [3]:
import findspark
import os
findspark.init()
import pyspark
sc = pyspark.SparkContext()

In [4]:
spark_home = os.environ.get('SPARK_HOME', None)

In [5]:
print spark_home

/usr/local/opt/apache-spark/libexec


In [42]:
import numpy as np
from sklearn.datasets import fetch_20newsgroups
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from sklearn.linear_model import SGDClassifier
import csv

In [83]:
from pyspark import SparkConf, SparkContext
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

In [84]:
text_file = sc.textFile("./Spark_INFO.md")

In [85]:
type(text_file)

pyspark.rdd.RDD

## Doing things to the data 

In [86]:
text_file.take(10)

[u'',
 u'# SPARK',
 u'',
 u'## WHAT IS Spark?',
 u'',
 u'    * Resilient Distributed Datasets (RDDs)',
 u'    * Processes on a Cluster',
 u'    * Lazy',
 u'    * Interactives Shells',
 u'']

In [87]:
lines_not_empty = text_file.filter(lambda x: len(x) > 0)

There's 20 lines I threw out

In [88]:
lines_not_empty.count(), text_file.count()

(38, 58)

In [89]:
lines_not_empty.take(10)

[u'# SPARK',
 u'## WHAT IS Spark?',
 u'    * Resilient Distributed Datasets (RDDs)',
 u'    * Processes on a Cluster',
 u'    * Lazy',
 u'    * Interactives Shells',
 u'### RDD:',
 u'The main abstraction Spark provides is a Resilient Distributed Dataset (RDD), which is a fault-tolerant collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.',
 u'An example of a RDD: ',
 u'data = [1, 2, 3, 4, 5]   ']

#### Flat map made the list of words into one long list of words.  

In [90]:
words = text_file.flatMap(lambda x: x.split())

In [91]:
words.take(10)

[u'#',
 u'SPARK',
 u'##',
 u'WHAT',
 u'IS',
 u'Spark?',
 u'*',
 u'Resilient',
 u'Distributed',
 u'Datasets']

In [92]:
words = words.map(lambda x: x.replace('|', '').replace('.', '').\
                 replace('-', '').replace(' ', '').replace('&', '').replace('#', '').upper())

In [93]:
words.take(10)

[u'',
 u'SPARK',
 u'',
 u'WHAT',
 u'IS',
 u'SPARK?',
 u'*',
 u'RESILIENT',
 u'DISTRIBUTED',
 u'DATASETS']

In [94]:
import re

#### Filtering out things that match the regex there

In [95]:
words = words.filter(lambda x: re.match('[A-Z]+', x))

#### This is a mapper that returns a count of "1' for each words

In [96]:
word_counts = words.map(lambda x: (x, 1))

In [97]:
word_counts = word_counts.reduceByKey(lambda a, b: a + b)

In [98]:
word_counts.take(10)

[(u'WE', 7),
 (u'WORKERS', 1),
 (u'EFFECIENT', 1),
 (u'COMPONENTS)', 1),
 (u'RESILIENT', 2),
 (u'NODES', 5),
 (u'WITH', 1),
 (u'COMPUTATIONS', 1),
 (u'PARTIONS', 1),
 (u'ACTION', 1)]

#### Sorting

In [99]:
word_counts = word_counts.map(lambda x: (x[1], x[0]))

In [100]:
word_counts = word_counts.sortByKey(False)

In [101]:
word_counts.take(10)

[(17, u'THE'),
 (12, u'IS'),
 (9, u'A'),
 (9, u'TO'),
 (8, u'SPARK'),
 (8, u'THAT'),
 (7, u'WE'),
 (7, u'IN'),
 (6, u'BE'),
 (6, u'ON')]

## Word counting

In [102]:
lines = sc.parallelize(['Its fun to have fun,', 'but you have to know how.'])

In [103]:
rd1 = lines.map(lambda x: x.replace('|', '').\
                replace('.', '').replace('-', '').replace('&', '').replace('#','').upper())

In [104]:
rd1.take(10)

['ITS FUN TO HAVE FUN,', 'BUT YOU HAVE TO KNOW HOW']

In [105]:
rd2 = rd1.flatMap(lambda x: x.split())

In [106]:
rd2.take(20)

['ITS', 'FUN', 'TO', 'HAVE', 'FUN,', 'BUT', 'YOU', 'HAVE', 'TO', 'KNOW', 'HOW']

#### flatMap flattened to a list of words instead of a list of list of words

## More complicated example

In [107]:
ngd = fetch_20newsgroups(shuffle = True, remove = ("headers", "footers", "quotes"), random_state = 6)


In [108]:
mrd1 = sc.parallelize(ngd.data)

In [109]:
print np.shape(ngd.data)

(11314,)


In [110]:
mrd1.take(1)

[u'\n\n\n\n\nTheir should be no difference in the drive itself between IBM-PC and Mac.\nThe two main differences are the formatting of the disk itself (but with\nthe correct software each can read the others) and maybe the cable\n(depends on your SCSI board on IBM-PC).\n\nIf you get some Mac softawre to allow mounting of ANY IBM-formatted disk\nand the correct cable you should br able to mount and read your IBM-PC\nsyquest.\n\ngood luck,\n\n--Paul\n\n-- \n  +-------------------------------------------------------------------------+\n  | Paul Hardwick  |  Technical Consulting  |  InterNet: hardwick@panix.com |\n  | P.O. Box 1482  |  for MVS (SP/XA/ESA)   |  Voice:    (212) 535-0998     |\n  | NY, NY 10274   |  and 3rd party addons  |  Fax:      (212) Pending      |\n  +-------------------------------------------------------------------------+']

#### .glom() takes each partition and makes it an array of items (one item representing each part of the data the from each partition)

In [118]:
test = mrd1.glom()

In [123]:
mrd2 = mrd1.glom().map(lambda x: " ".join(x)).flatMap(lambda x: x.split('.')).map(lambda x: x.replace('\n', '').\
    lower()).map(lambda x: x.replace(' the ', ' '))

In [150]:
mrd2.take(2)

[u'their should be no difference in drive itself between ibm-pc and mac',
 u'the two main differences are formatting of disk itself (but withthe correct software each can read others) and maybe cable(depends on your scsi board on ibm-pc)']

In [160]:
bigrams = mrd2.map(lambda x: x.split()).flatMap(lambda x: [((x[i], x[i+1]), 1) for i in range(len(x)-1)]).reduceByKey(lambda a, b: a+b)

#### We split everything up into bigrams and then counted the number of them 

In [161]:
new1 = sc.parallelize(ngd.data).glom().map(lambda x: " ".join(x)).flatMap(lambda x: x.split('.')).\
    map(lambda x: x.replace('\n', '').lower()).map(lambda x: x.replace('the', '')).map(lambda x: x.split()).\
    flatMap(lambda x: [((x[i], x[i+1]), 1) for i in range(0, len(x)-1)]).\
    reduceByKey(lambda a, b: a + b, numPartitions = 12).\
    map(lambda x: (x[1], x[0])).sortByKey(False)

In [162]:
def countPartitions(id, iterator): 
    c = 0 
    for _ in iterator: 
        c += 1 
        yield (id, c) 

In [163]:
new1.mapPartitionsWithIndex(countPartitions).collectAsMap()

{0: 54600, 1: 29046, 2: 90094, 11: 706646}

## Logistic Regression with Spark MLlib

In [169]:
dat = pd.read_csv("sample_svm_data.txt", delimiter = ' ', header = None)
predictors = dat.columns.values[1:]

In [170]:
X = dat[predictors]
y = dat[0]

In [171]:
clf_pySGD = SGDClassifier(loss='log', alpha = 0.01, n_iter = 10000)
clf_pySGD.fit(X, y)
yhat = clf_pySGD.predict(X)
print clf_pySGD.score(X, y)

0.636645962733


In [172]:
cm = pd.crosstab(y, yhat, rownames=["Actual"], colnames=["Predicted"])
cm

Predicted,0,1
Actual,Unnamed: 1_level_1,Unnamed: 2_level_1
0,101,59
1,58,104


In [173]:
def parse_point(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

In [177]:
data = sc.textFile("sample_svm_data.txt")

In [178]:
data.take(1)

[u'1 0 2.52078447201548 0 0 0 2.004684436494304 2.000347299268466 0 2.228387042742021 2.228387042742023 0 0 0 0 0 0']

In [180]:
parsed_data = data.map(parse_point)

In [181]:
print type(parsed_data)
parsed_data.first()

<class 'pyspark.rdd.PipelinedRDD'>


LabeledPoint(1.0, [0.0,2.52078447202,0.0,0.0,0.0,2.00468443649,2.00034729927,0.0,2.22838704274,2.22838704274,0.0,0.0,0.0,0.0,0.0,0.0])

### Fit a model in Spark

`from pyspark.mllib.classification import LogisticRegressionWithSGD`

In [182]:
spark_clf = LogisticRegressionWithSGD.train(parsed_data)

print type(spark_clf)


<class 'pyspark.mllib.classification.LogisticRegressionModel'>


In [183]:
labels_and_predictions = parsed_data.map(lambda p: (p.label, spark_clf.predict(p.features)))

In [184]:
print type(labels_and_predictions)
print labels_and_predictions.take(10)

<class 'pyspark.rdd.PipelinedRDD'>
[(1.0, 1), (0.0, 1), (0.0, 0), (1.0, 1), (1.0, 0), (0.0, 1), (1.0, 1), (1.0, 1), (0.0, 0), (0.0, 0)]


In [186]:
print parsed_data.count()

322


In [187]:
yyhat = labels_and_predictions.reduceByKey(lambda x, y: x + y).collect()
landp = labels_and_predictions.map(lambda x : (x[1], x[0]))
yyhat_1 = landp.reduceByKey(lambda x, y: x + y).collect()


print yyhat
print yyhat_1
print labels_and_predictions.filter(lambda (x, y): x != y).count()
print labels_and_predictions.filter(lambda (x, y): x == y).count()

[(0.0, 59), (1.0, 104)]
[(0, 58.0), (1, 104.0)]
117
205


In [188]:
results = list(labels_and_predictions.take(1000))
y = np.array([x[0] for x in results])
yhat = np.array([x[1] for x in results])

In [189]:
cm = pd.crosstab(y, yhat, rownames=["Actual"], colnames=["Predicted"])
cm

Predicted,0,1
Actual,Unnamed: 1_level_1,Unnamed: 2_level_1
0,101,59
1,58,104
