In [None]:

# Depending on how this notebook is launched, sc may already be instantiated.
# If we start IPython with the PySpark kernel, the SparkContext is already created and imported,
# but in a plain IPython session we need to do the import and instantiation ourselves (uncomment below).

#from pyspark import SparkContext
#sc = SparkContext()

## RDD Basics

Everything starts from the "SparkContext". This object lives on the driver node and controls the whole application.

In [1]:
sc

<pyspark.context.SparkContext at 0x7f56b4c41990>

We can use the SparkContext to create an RDD from some local data.

In [2]:
myRDD = sc.parallelize(range(50))

In [3]:
myRDD

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423
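sc.parallelize() cuts the local collection into partitions, and each partition becomes a unit of work on the cluster. The plain-Python sketch below (no Spark needed) imitates the even-slicing arithmetic we understand Spark to use for this; the helper name slice_evenly and the choice of 4 slices are our own assumptions. On the real RDD, myRDD.getNumPartitions() reports how many partitions exist and myRDD.glom().collect() returns one list per partition, which is a handy way to compare.

```python
def slice_evenly(data, num_slices):
    """Hypothetical helper: split `data` into `num_slices` contiguous,
    near-equal chunks, one per partition."""
    data = list(data)
    n = len(data)
    # Slice i covers positions [i*n // k, (i+1)*n // k)
    return [data[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]

partitions = slice_evenly(range(50), 4)
print([len(p) for p in partitions])  # [12, 13, 12, 13]
```

The slices are contiguous and their sizes differ by at most one, so no single partition ends up with a disproportionate share of the work.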

We can also use the SparkContext to load external data, such as text or CSV files on the local filesystem or on a distributed filesystem like HDFS.

In [20]:
beer = sc.textFile("../data/beer.txt")

In [21]:
beer

MapPartitionsRDD[11] at textFile at NativeMethodAccessorImpl.java:-2

The RDD is distributed across the cluster, so if we want to "view" any of it, we have to bring a piece of it back to the driver node.

In [22]:
beer.take(2)

[u'name calories sodium alcohol cost', u'Budweiser 144 15 4.7 0.43']
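The difference between take(n) and collect() is how much data travels back to the driver: take(n) stops once it has n elements, while collect() ships the whole RDD. The plain-Python sketch below models an RDD as a list of partitions (our simplification) and uses the first three lines of beer.txt from the outputs in this notebook. Spark's own take() is more elaborate (it starts with a small number of partitions and only launches more work if it hasn't found enough elements); the sketch just scans partitions in order.

```python
def take(partitions, n):
    """Return (first n elements, number of partitions scanned)."""
    result, scanned = [], 0
    for part in partitions:
        if len(result) >= n:
            break  # enough data: later partitions are never touched
        scanned += 1
        result.extend(part[: n - len(result)])
    return result, scanned

def collect(partitions):
    """Ship every element of every partition back to the driver."""
    return [item for part in partitions for item in part]

# First three lines of beer.txt, arbitrarily split into two partitions.
partitions = [
    ["name calories sodium alcohol cost", "Budweiser 144 15 4.7 0.43"],
    ["Schlitz 151 19 4.9 0.43"],
]
first_two, scanned = take(partitions, 2)
print(first_two, scanned)        # only the first partition was scanned
print(len(collect(partitions)))  # 3: everything comes back
```

On a large RDD this is why take() is the safe way to peek, while collect() can overwhelm the driver's memory.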

We see that the result of sc.textFile() is an RDD in which each element is a string corresponding to one line of the file. We probably want to parse these lines into something a little more useful.

In [23]:
beer.map(lambda line: line.split(" ")).take(3)

[[u'name', u'calories', u'sodium', u'alcohol', u'cost'],
 [u'Budweiser', u'144', u'15', u'4.7', u'0.43'],
 [u'Schlitz', u'151', u'19', u'4.9', u'0.43']]

Let's get rid of that first line, since it represents the header.

In [24]:
beer = beer.map(lambda line: line.split(" ")).filter(lambda l:l[0]!="name")

In [25]:
beer.take(2)

[[u'Budweiser', u'144', u'15', u'4.7', u'0.43'],
 [u'Schlitz', u'151', u'19', u'4.9', u'0.43']]
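An alternative way to drop the header, applied to the raw RDD returned by sc.textFile(), is to grab it with first() and filter out lines equal to it: header = beer.first(), then beer.filter(lambda line: line != header). It is also worth knowing that split(" ") produces empty strings wherever there are repeated spaces, whereas split() with no argument splits on any run of whitespace. The plain-Python sketch below shows both ideas on the first three lines of beer.txt; the list comprehension stands in for the map and filter calls.

```python
# First three lines of beer.txt, as shown in the outputs above.
raw_lines = [
    "name calories sodium alcohol cost",
    "Budweiser 144 15 4.7 0.43",
    "Schlitz 151 19 4.9 0.43",
]

header = raw_lines[0]  # local stand-in for beer.first()

# filter(line != header) + map(split), written as a list comprehension
rows = [line.split() for line in raw_lines if line != header]

# split(" ") keeps empty strings on repeated spaces; split() does not.
print("Budweiser  144".split(" "))  # ['Budweiser', '', '144']
print("Budweiser  144".split())     # ['Budweiser', '144']
```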

Now that we have an RDD of lists of strings, we want to parse each list into something more useful: a named tuple with numeric fields.

In [13]:
from collections import namedtuple

In [15]:
Beer = namedtuple("Beer", ("name", "calories","sodium","alcohol","cost"))

In [29]:
def parse(line):
    n = line[0]
    values = [float(val) for val in line[1:]]
    return Beer(n, *values)

In [30]:
beer_parsed = beer.map(parse)

In [31]:
beer_parsed.take(2)

[Beer(name=u'Budweiser', calories=144.0, sodium=15.0, alcohol=4.7, cost=0.43),
 Beer(name=u'Schlitz', calories=151.0, sodium=19.0, alcohol=4.9, cost=0.43)]
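parse() assumes every row has exactly five fields and numeric values after the name; any malformed row would raise an exception inside a task and make the job fail once an action runs. A more defensive variant, sketched below with a hypothetical helper parse_or_none, returns None for bad rows so they can be filtered out, e.g. beer.map(parse_or_none).filter(lambda b: b is not None) in PySpark. The sample rows, including the malformed ones, are illustrative.

```python
from collections import namedtuple

Beer = namedtuple("Beer", ("name", "calories", "sodium", "alcohol", "cost"))

def parse_or_none(fields):
    """Hypothetical helper: return a Beer, or None if the row is malformed."""
    if len(fields) != len(Beer._fields):
        return None  # wrong number of columns
    try:
        return Beer(fields[0], *[float(v) for v in fields[1:]])
    except ValueError:
        return None  # a numeric column did not parse

rows = [
    ["Budweiser", "144", "15", "4.7", "0.43"],
    ["Schlitz", "151", "19", "4.9"],                     # missing a field
    ["name", "calories", "sodium", "alcohol", "cost"],   # header: float() fails
]
# Local equivalent of rdd.map(parse_or_none).filter(lambda b: b is not None)
parsed = [b for b in map(parse_or_none, rows) if b is not None]
print(parsed)
```

With this variant, the header filter from earlier becomes a safety net rather than a requirement, since the header row fails to parse anyway.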

In [34]:
beer_parsed.filter(lambda a: a.cost<.4).collect()

[Beer(name=u'Old_Milwaukee', calories=145.0, sodium=23.0, alcohol=4.6, cost=0.28),
 Beer(name=u'Pabst_Extra_Light', calories=68.0, sodium=15.0, alcohol=2.3, cost=0.38)]

In [37]:
beer_parsed.map(lambda a : (a.cost,1)).reduceByKey(lambda a,b: a+b).collect()

[(0.5, 1),
 (0.4, 2),
 (0.77, 1),
 (0.44, 1),
 (0.48, 1),
 (0.76, 1),
 (0.42, 1),
 (0.38, 1),
 (0.28, 1),
 (0.46, 2),
 (0.43, 5),
 (0.47, 1),
 (0.79, 1),
 (0.73, 1)]
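The (cost, 1) pairs plus reduceByKey(lambda a, b: a + b) form a word-count style tally of how many beers share each price. reduceByKey first combines values per key inside each partition and then merges those partial results across partitions after a shuffle, which is why the function must be associative and commutative. The plain-Python sketch below imitates those two stages; the helper name and the pair data are illustrative. For a simple tally like this one, PySpark also offers beer_parsed.map(lambda b: b.cost).countByValue(), which returns a dict directly to the driver.

```python
def reduce_by_key(partitions, func):
    """Hypothetical local imitation of RDD.reduceByKey over a list of partitions."""
    # Stage 1: combine values per key within each partition (map-side combine)
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)
    # Stage 2: merge the per-partition results (stand-in for the shuffle)
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

pairs = [[(0.43, 1), (0.43, 1), (0.5, 1)], [(0.43, 1), (0.4, 1)]]
print(reduce_by_key(pairs, lambda a, b: a + b))  # {0.43: 3, 0.5: 1, 0.4: 1}
```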

## MLlib Example

In [38]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel, LabeledPoint

Let's set up a classification example in which we predict whether a beer costs more than 0.5 (label 1) or not (label 0).

In [43]:
def prepPoint(p):
    expensive = 1 if p.cost >.5 else 0
    return LabeledPoint(expensive,[p.calories, p.sodium, p.alcohol])

In [44]:
labeledPoints = beer_parsed.map(prepPoint)

In [45]:
labeledPoints.collect()

[LabeledPoint(0.0, [144.0,15.0,4.7]),
 LabeledPoint(0.0, [151.0,19.0,4.9]),
 LabeledPoint(0.0, [157.0,15.0,0.9]),
 LabeledPoint(1.0, [170.0,7.0,5.2]),
 LabeledPoint(1.0, [152.0,11.0,5.0]),
 LabeledPoint(0.0, [145.0,23.0,4.6]),
 LabeledPoint(0.0, [175.0,24.0,5.5]),
 LabeledPoint(0.0, [149.0,27.0,4.7]),
 LabeledPoint(0.0, [99.0,10.0,4.3]),
 LabeledPoint(0.0, [113.0,8.0,3.7]),
 LabeledPoint(0.0, [140.0,18.0,4.6]),
 LabeledPoint(0.0, [102.0,15.0,4.1]),
 LabeledPoint(0.0, [135.0,11.0,4.2]),
 LabeledPoint(1.0, [150.0,19.0,4.7]),
 LabeledPoint(1.0, [149.0,6.0,5.0]),
 LabeledPoint(0.0, [68.0,15.0,2.3]),
 LabeledPoint(0.0, [139.0,19.0,4.4]),
 LabeledPoint(0.0, [144.0,24.0,4.9]),
 LabeledPoint(0.0, [72.0,6.0,2.9]),
 LabeledPoint(0.0, [97.0,7.0,4.2])]

In [46]:
model = LogisticRegressionWithLBFGS.train(labeledPoints)

In [47]:
# Evaluating the model on training data
labelsAndPredictions = labeledPoints.map(lambda p: (p.label, model.predict(p.features)))
# index into the (label, prediction) pair; lambda (v, p): ... tuple unpacking is Python 2 only
trainErr = labelsAndPredictions.filter(lambda vp: vp[0] != vp[1]).count() / float(labeledPoints.count())
print("Training Error = " + str(trainErr))

Training Error = 0.1
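What model.predict() computes for a binary logistic regression is simple: a margin w·x + b, the sigmoid of that margin as the probability of label 1, and a threshold (0.5 by default) to turn it into a 0/1 prediction. The fitted values are available on the real model as model.weights and model.intercept; as far as we know, LogisticRegressionWithLBFGS.train() defaults to intercept=False, so the intercept here is likely 0.0. The sketch below reimplements the prediction and the training-error calculation in plain Python; the weights are made up for illustration and are NOT the fitted model's values, and the three data points are taken from the labeledPoints output above.

```python
import math

def predict(weights, intercept, features, threshold=0.5):
    """Binary logistic-regression prediction: 1.0 if P(y=1 | x) > threshold."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    prob = 1.0 / (1.0 + math.exp(-margin))
    return 1.0 if prob > threshold else 0.0

def error_rate(labels_and_preds):
    """Fraction of (label, prediction) pairs that disagree."""
    wrong = sum(1 for label, pred in labels_and_preds if label != pred)
    return wrong / float(len(labels_and_preds))

# Hypothetical weights for (calories, sodium, alcohol); not learned from the data.
weights, intercept = [0.02, -0.3, 0.5], 0.0
data = [(0.0, [144.0, 15.0, 4.7]), (1.0, [170.0, 7.0, 5.2]), (0.0, [68.0, 15.0, 2.3])]

pairs = [(label, predict(weights, intercept, x)) for label, x in data]
print(pairs)
print(error_rate(pairs))
```

Because the model is evaluated on the same points it was trained on, a training error like the 0.1 above is an optimistic estimate of how it would do on new beers.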


The MLlib interface and style of thinking are very similar to what we're used to with scikit-learn. But this logistic regression model is estimated on a whole RDD, which could easily hold billions of data points. All the cleverness and innovation lies in making LogisticRegressionWithLBFGS.train() work efficiently in a distributed computing environment.
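One reason logistic regression distributes well is that its gradient is a sum over data points, so each partition can compute a partial sum locally and only small vectors need to be combined. MLlib's actual optimizer is L-BFGS and, as we understand it, it evaluates the loss and gradient with a tree-shaped aggregation over partitions (RDD.treeAggregate). The plain-Python sketch below swaps in ordinary gradient descent and a simple reduce to show the idea; the toy data, learning rate, and helper names are all hypothetical.

```python
import math
from functools import reduce

def partition_gradient(weights, rows):
    """Sum of logistic-loss gradients (p - y) * x over one partition's rows."""
    grad = [0.0] * len(weights)
    for label, x in rows:
        margin = sum(w * xi for w, xi in zip(weights, x))
        prob = 1.0 / (1.0 + math.exp(-margin))
        for j, xj in enumerate(x):
            grad[j] += (prob - label) * xj
    return grad

def add_vectors(a, b):
    return [ai + bi for ai, bi in zip(a, b)]

def distributed_gradient(weights, partitions):
    # In Spark these per-partition sums would run on executors in parallel.
    partials = [partition_gradient(weights, p) for p in partitions]
    # Combine the small partial vectors (Spark would use an aggregation step).
    return reduce(add_vectors, partials)

# Hypothetical toy data; the first feature is a constant 1.0 acting as a bias.
partitions = [
    [(0.0, [1.0, 0.2]), (1.0, [1.0, 0.9])],
    [(1.0, [1.0, 0.8]), (0.0, [1.0, 0.1])],
]

weights = [0.0, 0.0]
learning_rate = 0.5
for _ in range(100):  # plain gradient descent, NOT L-BFGS
    g = distributed_gradient(weights, partitions)
    weights = [w - learning_rate * gi for w, gi in zip(weights, g)]
print(weights)
```

Only the gradient vectors (here of length 2) move between "executors" and the driver each iteration, no matter how many rows each partition holds.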