# Welcome to an IPython notebook

## Basic Spark functionality in Python

In [1]:
import numpy as np

RDDs

In [2]:
np.arange(10)

array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])

In [3]:
data = sc.parallelize(np.arange(10))
data

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:423

In [4]:
data.first()

0

In [5]:
data.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Map

In [6]:
def square(x):
    return np.square(x)

data.map(square).collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [7]:
data.map(lambda x: np.square(x)).collect()

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Filter

In [8]:
data.filter(lambda x: np.mod(x, 2) == 1). collect()

[1, 3, 5, 7, 9]

Reduce

In [9]:
data.reduce(lambda x, y: x+y)

45

## More advanced topics and examples

Loading data

In [10]:
iris = sc.textFile("example-data/series/iris_data.txt")
iris.first()

u'5.1\t3.5\t1.4\t0.2\tsetosa'

Preparing data + key-value pairs

In [11]:
split_iris = iris.map(lambda text: text.split('\t'))
split_iris.first()

[u'5.1', u'3.5', u'1.4', u'0.2', u'setosa']

In [12]:
data = split_iris.map(lambda l: (str(l[-1]), np.array(l[:-1], dtype='float')))
data.first()

('setosa', array([ 5.1,  3.5,  1.4,  0.2]))

Reduce by key

In [13]:
data.mapValues(lambda x: 1).reduceByKey(lambda x, y: x + y).collect()

[('versicolor', 50), ('virginica', 50), ('setosa', 50)]

In [14]:
data.mapValues(lambda x: np.array([x, 1])).reduceByKey(lambda x, y: x + y).mapValues(lambda x: x[0]/x[1]).collect()

[('versicolor', array([ 5.936,  2.77 ,  4.26 ,  1.326])),
 ('virginica', array([ 6.588,  2.974,  5.552,  2.026])),
 ('setosa', array([ 5.006,  3.428,  1.462,  0.246]))]

In [15]:
import seaborn as sns
sns.set_context('talk')
sns.set_style('darkgrid')



In [16]:
xy = data.filter(lambda (k, v): k == 'setosa').mapValues(lambda x: x[0:2]).values().collect()
xy = np.asarray(xy)

plt.scatter(xy[:,0], xy[:,1]);

NameError: name 'plt' is not defined

In [None]:
xy1 = data.filter(lambda (k, v): k == 'setosa').mapValues(lambda x: x[0:2]).values().collect()
xy1 = np.asarray(xy1)
xy2 = data.filter(lambda (k, v): k == 'virginica').mapValues(lambda x: x[0:2]).values().collect()
xy2 = np.asarray(xy2)
plt.scatter(xy1[:,0], xy1[:,1]);
plt.scatter(xy2[:,0], xy2[:,1], color='red');

In [17]:
import os 
file_dir = os.path.abspath('example-data/images/flowers')
only_names = [f for f in os.listdir(file_dir)]
filenames = [os.path.join(file_dir, f) for f in only_names]
file_rdd = sc.parallelize(filenames)
file_rdd.take(5)

['/home/carlilek/bin/spark-janelia/examples/example-data/images/flowers/image_1183.png',
 '/home/carlilek/bin/spark-janelia/examples/example-data/images/flowers/image_1114.png',
 '/home/carlilek/bin/spark-janelia/examples/example-data/images/flowers/image_1132.png',
 '/home/carlilek/bin/spark-janelia/examples/example-data/images/flowers/image_0663.png',
 '/home/carlilek/bin/spark-janelia/examples/example-data/images/flowers/image_1107.png']

In [18]:
images_rdd = file_rdd.map(lambda f: plt.imread(f))
num_displayed = 3
imgs = images_rdd.takeSample(False, num_displayed)

sns.set_style('white')

def plot_images(n, images): 
    plt.figure(figsize=(20,10))
    for i, img in enumerate(images): 
        plt.subplot(1,n,i)
        plt.imshow(img)
        
plot_images(num_displayed, imgs)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 65 in stage 19.0 failed 4 times, most recent failure: Lost task 65.3 in stage 19.0 (TID 546, h03u20.int.janelia.org): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark-current/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark-current/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/misc/local/spark-current/python/pyspark/rdd.py", line 2346, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/misc/local/spark-current/python/pyspark/rdd.py", line 2346, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/misc/local/spark-current/python/pyspark/rdd.py", line 2346, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/misc/local/spark-current/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/misc/local/spark-current/python/pyspark/rdd.py", line 1004, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/misc/local/spark-current/python/pyspark/rdd.py", line 1004, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "<ipython-input-18-79d6555e8302>", line 1, in <lambda>
NameError: global name 'plt' is not defined

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark-current/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/spark-current/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/misc/local/spark-current/python/pyspark/rdd.py", line 2346, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/misc/local/spark-current/python/pyspark/rdd.py", line 2346, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/misc/local/spark-current/python/pyspark/rdd.py", line 2346, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/misc/local/spark-current/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/misc/local/spark-current/python/pyspark/rdd.py", line 1004, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/misc/local/spark-current/python/pyspark/rdd.py", line 1004, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "<ipython-input-18-79d6555e8302>", line 1, in <lambda>
NameError: global name 'plt' is not defined

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
colors_rdd = images_rdd.map(lambda img: (sum(img)/(img.shape[0] * img.shape[1]), img))
        
dark = colors_rdd.sortBy(lambda (k,v): k).values().take(num_displayed)
plot_images(num_displayed, dark)
bright = colors_rdd.sortBy(lambda (k,v): -k).values().take(num_displayed)
plot_images(num_displayed, bright)

## Regression example

In [None]:
npoints = 100
x = np.random.randn(npoints)

def makeRegressionData(m, noise):
    return m*x + noise*np.random.randn(len(x))

nrecords = 50
y = [(i, makeRegressionData(np.random.uniform(-4, 4), np.random.uniform(0.1, 3))) for i in xrange(npoints)]
data = sc.parallelize(y)

nsamples = 8
sample = data.values().takeSample(False, nsamples)
sns.set_style('darkgrid')
plt.figure(figsize=(20,10))
for i in xrange(nsamples):
    plt.subplot(2,4,i);
    plt.plot(x, sample[i], '.');
    plt.xlim([-3, 3]);
    plt.ylim([-12, 12]);

In [None]:
from sklearn.linear_model import LinearRegression

def regress(x, y):
    result = LinearRegression().fit(x[None, :].T, y[None, :].T)
    return result.coef_[0,0], result.intercept_[0]

regModels = data.mapValues(lambda y: regress(x, y))

regModels.first()

In [None]:
joined = data.join(regModels)
joined.first()

In [None]:
nsamples = 8
sample = joined.values().takeSample(True, nsamples)
xs = np.linspace(-3, 3, 1000)
plt.figure(figsize=(20,10))
for i in xrange(nsamples):
    yvals = sample[i][0]
    m, b = sample[i][1]
    plt.subplot(2,4,i);
    plt.plot(x, yvals, '.');
    plt.plot(xs, m*xs + b)
    plt.xlim([-3, 3]);
    plt.ylim([-12, 12]);