# Getting started with PySpark
#### Create spark context and other python imports

In [1]:
%matplotlib inline
from __future__ import print_function, division
from pyspark import SparkConf
from pyspark import SparkContext 
from StringIO import StringIO
import matplotlib.pyplot as plt
import numpy as np
from pprint import pprint

conf = SparkConf()
conf.set('spark.executor.instances', 2)
sc = SparkContext()

## A few preliminaries

#### <code>lambda</code> expressions
It is helpful to know lambda expressions and about unpacking arguments arguments for them.

<code>lambda</code> is a statement for an expression that can have a closure.  It's like a function or other callable, but in one line without a return statement.

<code>lambda</code> is useful in pyspark because there is a need for a lot of simple callables.

<code>c=lambda (key,(x,y)): (key, x * y)</code> 

makes c a callable, that can be used like this:

<code>c(('key1', (3, 4)))</code> 

to return 
<code>('key1', 12)</code> 

#### mapping / reducing
In pyspark and other parallel computing frameworks, there is a concept of 'mapping'.

E.g. mapping across a list of tuples and summing the second element of each tuple
```python
x = [(1, 2), (2, 3)]

sum(map(lambda x:x[1], x)) # answer is 5 in python
# or
sc.parallelize(x).map(lambda x:x[1]).sum() # answer is 5 in pyspark
```
In mapping and reducing with pyspark it can be helpful to unpack the arguments to a lambda used in a mapper.

```python
sc.parallelize(x).map(lambda (key, value): value).sum() # 5
```
(x was mapped as a Resilient Distributed Dataset, the key-values were split and the values were summed.

## Common pyspark usage patterns

#### <code>sc.parallelize</code>: create an RDD from a python iterable

<code>sc.parallelize</code> can be called on a list, tuple, generator, or other iterator.  Generators are useful because the full set of elements of the RDD never has to be held in memory in the python main process.

In [2]:
import random
def generate_rands(length=100):
    for idx in range(length):
        yield (random.uniform(0,1), random.uniform(0,1))
rand_rdd = sc.parallelize(generate_rands())

#### We have just created rand_rdd in parallel java processes for use with pyspark map reduce
If we want to get the RDD back to python, we can call <code>.collect()</code>

In [3]:
as_list_again = rand_rdd.collect()
as_list_again[:10]

[(0.8233133911430207, 0.23390036322252228),
 (0.5864887904722843, 0.7247920583468935),
 (0.868808963085547, 0.38927505714571997),
 (0.7997240345772992, 0.8648685339908081),
 (0.7887573866879661, 0.2956290317157453),
 (0.3067619246071419, 0.34909126160939163),
 (0.4059335260023892, 0.38823106121150397),
 (0.33601159510207335, 0.4990349540137352),
 (0.18695004090526468, 0.43572846130587106),
 (0.13497377912873232, 0.5993620897859301)]

#### But a better way to do it is to <code>take</code> a few elements rather than the full RDD
This avoids pulling the entire RDD into the main python process, which breaks parallelism.

In [4]:
rand_rdd.take(5)

[(0.8233133911430207, 0.23390036322252228),
 (0.5864887904722843, 0.7247920583468935),
 (0.868808963085547, 0.38927505714571997),
 (0.7997240345772992, 0.8648685339908081),
 (0.7887573866879661, 0.2956290317157453)]

#### <code>sc.binaryFiles</code>: read binary HDFS files into an RDD, matching a wildcard pattern

In [5]:
img_files = sc.binaryFiles('/imgs/malestaff*')  # wildcard hadoop distributed file system name
key, img = img_files.take(1)[0] # Don't call .collect() on this one!
pprint((key, img[:100])) # abbreviated the image blob to 100 characters

(u'hdfs://ip-10-111-177-131:9000/imgs/malestaffanonymanonym.11.jpg',
 '\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x08\x06\x06\x07\x06\x05\x08\x07\x07\x07\t\t\x08\n\x0c\x14\r\x0c\x0b\x0b\x0c\x19\x12\x13\x0f\x14\x1d\x1a\x1f\x1e\x1d\x1a\x1c\x1c $.\' ",#\x1c\x1c(7),01444\x1f\'9=82<.342\xff\xdb\x00C\x01\t\t\t\x0c\x0b\x0c')


#### <code>sc.pickleFile</code>: read results typically from pyspark that were output with <code>saveAsPickleFile</code>

In pyspark, an RDD may be persisted to the hadoop distributed file system (HDFS) with

<code>x.saveAsPickleFile(some_path)</code> where <code>x</code> is the RDD.

In [6]:
image_measurements = sc.pickleFile('hdfs:///t1/map_each_image/measures')
image_measurements.take(1)  # Don't call .collect() on this one!

[(u'hdfs://ip-10-111-177-131:9000/imgs/femaleasammaasamma.2.jpg',
  {'cen': array([ 0.0980597 ,  0.10067332,  0.07502033,  0.4832288 ,  0.74329001,
           0.3068403 ,  0.39556718,  0.33680019,  0.34318587,  0.69543546,
           0.5883016 ,  0.61652535,  0.97057652,  0.86680192,  0.84002519,
           0.26330549,  0.2297267 ,  0.24091083,  0.89458674,  0.70459908,
           0.66731501,  0.28936416,  0.39460137,  0.14140551,  0.04144318,
           0.04623584,  0.02837553,  0.40374291,  0.58303112,  0.23085584,
           0.53872192,  0.4576503 ,  0.44566652,  0.16590469,  0.16399746,
           0.13009502], dtype=float32),
   'histo': array([ 0.02744549,  0.03814275,  0.05321374,  0.16956136,  0.45035073,
           0.50272733,  0.75232422,  0.0308008 ,  0.04285328,  0.05870309,
           0.17326318,  0.51095122,  0.69764537,  0.76385945,  0.01479674,
           0.02571036,  0.03825854,  0.13062458,  0.30416244,  0.36708713,
           0.64770496], dtype=float32),
   'id': u'hd

#### <code>.map</code> applys a function to each element of the RDD
This functions maps x and y random numbers and calculates their squares and product

In [7]:
products = rand_rdd.map(lambda (x, y): (x**2, y**2, x*y))
products.take(5)

[(0.6778449400354206, 0.054709379915627854, 0.1925733012343191),
 (0.343969101349643, 0.5253235278427267, 0.4250824176437869),
 (0.7548290143377834, 0.15153507011580356, 0.33820565875384),
 (0.6395585314805933, 0.7479975810874095, 0.6916561533820831),
 (0.6221382150548297, 0.08739652439318914, 0.2331795824852051)]

#### <code>.reduce</code> applys an aggregation to an RDD
<code>.reduce</code> takes a callable which is called with two elements of the RDD (repeatedly).  The function must be idempotent, that is it must give the same answer if called more than once on the same inputs.  Summing is an example of an idempotent operation.

Here we calculate the correlation coefficient between the two columns by summing <code class="python">x\*\*2</code>, <code class="python">y\*\*2</code> and <code>x * y</code>

In [8]:
x_variance, y_variance, covar = products.reduce(lambda a, b: [ai + bi for ai, bi in zip(a,b)]) 
# in the above line a and b are both tuples 3 long (x square, y square, x * y)
correlation = covar / (y_variance * x_variance)
correlation

0.022629099386862826

#### <code>.flatMap</code> works like <code>.map</code> but <code>.flatMap</code> returns a list

It is useful for making more elements out of an element of an RDD, like this word count example.

In [9]:
documents = sc.parallelize(['useful for RDDs', 
                            'RDDs are useful', 
                            'flatMap is useful', 
                            'map/reduce is also useful'])
words = documents.flatMap(
    lambda x: [(word, 1) for word in x.split()]  #(from documents rdd to words rdd with 1 for counting)
)
print('words RDD')
pprint(words.collect())
print('Word counts')
words.reduceByKey(
    lambda a, b: a + b  # the word is not passed in, only the count which may be 1 or more when called
).sortBy(
    lambda (word, count): -count   # sort by negative count for descending
).collect()

words RDD
[('useful', 1),
 ('for', 1),
 ('RDDs', 1),
 ('RDDs', 1),
 ('are', 1),
 ('useful', 1),
 ('flatMap', 1),
 ('is', 1),
 ('useful', 1),
 ('map/reduce', 1),
 ('is', 1),
 ('also', 1),
 ('useful', 1)]
Word counts


[('useful', 4),
 ('RDDs', 2),
 ('is', 2),
 ('map/reduce', 1),
 ('are', 1),
 ('also', 1),
 ('flatMap', 1),
 ('for', 1)]

#### Inspecting an RDD, to see what it looks like, we have to call <code>take</code> to see elements of the RDD and we can do type inspection to figure out what is going on.

In [10]:
example = documents.take(1)[0] # take gives a list and we want 1st element
print('example is of type', type(example))

example is of type <type 'str'>


#### <code>.count()</code> gives the number of elements in an RDD

In [11]:
documents.count()

4

## Going through some map reduce ideas with the image files

#### Mapping the <code>load_image</code> function to filenames will load each filename's image into an RDD
The RDD consists of <code>(filename, image_numpy_3d_array)</code> tuples

In [12]:
def load_image(image):
    """Load one image, where image = (key, blob)"""
    from StringIO import StringIO
    from PIL import Image
    img = Image.open(StringIO(image[1]))
    return  image[0], np.asarray(img, dtype=np.uint8)

img_files = sc.binaryFiles('/imgs/malestaff*')  # wildcard hadoop distributed file system name
img_mapped = img_files.map(load_image, img_files)
key, img_3d = img_mapped.take(1)[0] # Don't call .collect() on this one
print(key, "has a shape of ", img_3d.shape)

hdfs://ip-10-111-177-131:9000/imgs/malestaffanonymanonym.11.jpg has a shape of  (200, 180, 3)


#### At this point <code>img_mapped</code> is a iterable of tuples (filename, image).  We can in turn map and reduce this RDD to create other RDDs.  Here we are calculating the percentiles in each color band for each image and printing out the first 5 images' percentils as a sample.

In [13]:
labels_bands = zip(('red','green','blue'), range(3))
for label, band in labels_bands:
    percentiles = img_mapped.map(
                        lambda (fname, img): np.percentile(img[:,:,band], (5, 25, 50, 75, 95))
                    )
    print('Percentiles for', label)
    pprint(percentiles.take(5))


Percentiles for red
[array([  31.,   85.,  114.,  136.,  254.]),
 array([  31. ,   84. ,  113.5,  136. ,  253. ]),
 array([  10.,   36.,   71.,   78.,  183.]),
 array([  10.,   37.,   70.,   78.,  184.]),
 array([  13.,   33.,   58.,   64.,  115.])]
Percentiles for green
[array([  27.,   75.,  133.,  180.,  203.]),
 array([  27.,   73.,  134.,  181.,  202.]),
 array([  12.,   34.,   84.,  109.,  132.]),
 array([  11.,   34.,   82.,  108.,  131.]),
 array([  13.,   32.,   61.,   95.,  104.])]
Percentiles for blue
[array([  20.,   44.,   76.,  112.,  209.]),
 array([  20.,   45.,   79.,  110.,  209.]),
 array([   7.,   26.,   42.,   55.,  129.]),
 array([   8.,   27.,   42.,   57.,  132.]),
 array([  11.,   35.,   53.,   66.,  109.])]


#### With spark, we can also reduce an RDD.  This will take an average of the red percentiles by summing and dividing by the count.

In [14]:
avgs = percentiles.reduce(
    lambda a, b: a + b  # adding numpy percentiles arrays a and b from 2 images
) / percentiles.count() # diving by the number of images
print(avgs)

[  16.03125     37.1796875   55.65625     79.8125     153.09375  ]


####Example of groupby operations, with grouping by mean color bins
Below <code>int(np.mean(img[:,:, band]) // 5) * 5</code> creates an integer key that is the pixel 0 to 255 value floored to the nearest 5.  The first map emits this key with the filename.  

In [15]:
for label, band in labels_bands:
    rand_groups = img_mapped.map(
                lambda (fname, img): (int(np.mean(img[:,:, band]) // 5) * 5, fname)
            ).groupByKey(
            ).map(
                lambda (group, fnames): (group, list(fnames))
            ).sortByKey(
            )
    print('Grouping by', label)
    pprint(rand_groups.collect())



Grouping by red
[(45,
  [u'hdfs://ip-10-111-177-131:9000/imgs/malestaffmartinmartin.3.jpg',
   u'hdfs://ip-10-111-177-131:9000/imgs/malestaffmartinmartin.6.jpg']),
 (50, [u'hdfs://ip-10-111-177-131:9000/imgs/malestaffdorajdoraj.3.jpg']),
 (55, [u'hdfs://ip-10-111-177-131:9000/imgs/malestaffdorajdoraj.13.jpg']),
 (60,
  [u'hdfs://ip-10-111-177-131:9000/imgs/malestaffieorfieorf.2.jpg',
   u'hdfs://ip-10-111-177-131:9000/imgs/malestaffieorfieorf.9.jpg',
   u'hdfs://ip-10-111-177-131:9000/imgs/malestaffrobinrobin.10.jpg',
   u'hdfs://ip-10-111-177-131:9000/imgs/malestaffrobinrobin.4.jpg']),
 (65,
  [u'hdfs://ip-10-111-177-131:9000/imgs/malestaffcwangcwang.12.jpg',
   u'hdfs://ip-10-111-177-131:9000/imgs/malestaffcwangcwang.5.jpg',
   u'hdfs://ip-10-111-177-131:9000/imgs/malestaffvoudcxvoudcx.17.jpg',
   u'hdfs://ip-10-111-177-131:9000/imgs/malestaffvoudcxvoudcx.7.jpg']),
 (80,
  [u'hdfs://ip-10-111-177-131:9000/imgs/malestaffmaccimacci.13.jpg',
   u'hdfs://ip-10-111-177-131:9000/imgs/males

#### The next few steps load the configuration yaml file for image-analyzer, set an output hdfs location, and apply the map_each_image function to write an RDD of image measurements to HDFS

In [17]:
import yaml
from map_each_image import map_each_image
config = yaml.load(open('config.yaml').read())
config['random_state'] = 42
# make a new filename each time for the demo starting with /%s/map_each_image/ % config['test_name']
output_hdfs = '/%s/map_each_image/measures_8'  % config['test_name']
input_file_spec = '/imgs/malestaff*'
measures = map_each_image(sc, config, input_file_spec, output_hdfs)
measures

PythonRDD[75] at RDD at PythonRDD.scala:43

#### later load the image measurements from HDFS using unpickling

In [18]:
measures = sc.pickleFile(output_hdfs)
measures.take(1) # don't call .collect() here

[(u'hdfs://ip-10-111-177-131:9000/imgs/malestaffanonymanonym.11.jpg',
  {'cen': array([ 0.35465017,  0.29722726,  0.25416034,  0.60304958,  0.53971756,
           0.37986243,  0.98485434,  0.81124085,  0.86357927,  0.36931238,
           0.47650814,  0.15291676,  0.22837859,  0.19009633,  0.1671942 ,
           0.67391962,  0.54674029,  0.57025176,  0.49174181,  0.73113638,
           0.32001153,  0.44407496,  0.39712024,  0.44328627,  0.12760539,
           0.11378499,  0.08404868,  0.98294377,  0.76325887,  0.68695372,
           0.40710312,  0.56513143,  0.23902495,  0.83226156,  0.65122527,
           0.6805672 ], dtype=float32),
   'histo': array([ 0.12638396,  0.22037019,  0.33601463,  0.44619676,  0.53434545,
           0.7464537 ,  0.99226081,  0.11083502,  0.17869972,  0.3017368 ,
           0.52392244,  0.70189315,  0.74901539,  0.79198784,  0.07672456,
           0.13942909,  0.17422667,  0.29805189,  0.43718517,  0.60197175,
           0.80762595], dtype=float32),
   'id': 

#### From the image analyzer code we know that the <code>'histo'</code> key is the 3 band histogram flattened into one array, so we can check its size and know where the median colors are.  For example, if it is 21 elements long, then the first 7 are the red histogram and the middle of those 7 elements is the median red color.

In [19]:
def get_median_from_histo(histo, band):
    lh = len(histo) // 3
    return histo[lh // 2 + 1 + band * lh]

#### As an exercise, sort the images by median color in each band, take the 5 image file names that are lowest in that color

In [20]:
for label, band in labels_bands:
    sorted_filenames = measures.sortBy(
        lambda (key, value): get_median_from_histo(value['histo'], band)
    ).map(
        lambda (key, value): key
    ).collect()
    print('Lowest 5 images in %s ' % label)
    pprint(sorted_filenames[:5])


Lowest 5 images in red 
[u'hdfs://ip-10-111-177-131:9000/imgs/malestaffmartinmartin.3.jpg',
 u'hdfs://ip-10-111-177-131:9000/imgs/malestaffmartinmartin.6.jpg',
 u'hdfs://ip-10-111-177-131:9000/imgs/malestaffmartinmartin.3.jpg',
 u'hdfs://ip-10-111-177-131:9000/imgs/malestaffmartinmartin.6.jpg',
 u'hdfs://ip-10-111-177-131:9000/imgs/malestaffmartinmartin.3.jpg']
Lowest 5 images in green 
[u'hdfs://ip-10-111-177-131:9000/imgs/malestaffmartinmartin.3.jpg',
 u'hdfs://ip-10-111-177-131:9000/imgs/malestaffmartinmartin.6.jpg',
 u'hdfs://ip-10-111-177-131:9000/imgs/malestaffdorajdoraj.13.jpg',
 u'hdfs://ip-10-111-177-131:9000/imgs/malestaffdorajdoraj.3.jpg',
 u'hdfs://ip-10-111-177-131:9000/imgs/malestaffdorajdoraj.3.jpg']
Lowest 5 images in blue 
[u'hdfs://ip-10-111-177-131:9000/imgs/malestaffmartinmartin.6.jpg',
 u'hdfs://ip-10-111-177-131:9000/imgs/malestaffmartinmartin.3.jpg',
 u'hdfs://ip-10-111-177-131:9000/imgs/malestaffieorfieorf.9.jpg',
 u'hdfs://ip-10-111-177-131:9000/imgs/malestaffi

####Make a function to print out images in the same order they are found in <code>measures</code>.

#### Joins in PySpark work on RDDs, using equality on the first item in tuples to determine the join

Here is are several join examples with a 2 lists of tuples that are spark RDDs from python lists

In [21]:
list_a = sc.parallelize([(1, 1), (3, 3), (5, 10)]) # a has some keys not in b's keys
list_b = sc.parallelize([(1, 2), (3, 4), (7, 12)]) # b has some keys not in a's keys
list_a.join(list_b).collect()

[(1, (1, 2)), (3, (3, 4))]

In [22]:
list_b.join(list_a).collect()

[(1, (2, 1)), (3, (4, 3))]

In [23]:
list_a.rightOuterJoin(list_b).collect()

[(1, (1, 2)), (3, (3, 4)), (7, (None, 12))]

In [24]:
list_b.leftOuterJoin(list_a).collect()

[(1, (2, 1)), (3, (4, 3)), (7, (12, None))]

In [25]:
list_b.rightOuterJoin(list_a).collect()

[(1, (2, 1)), (3, (4, 3)), (5, (None, 10))]

In [26]:
list_a.leftOuterJoin(list_b).collect()

[(1, (1, 2)), (3, (3, 4)), (5, (10, None))]

In [27]:
list_a.fullOuterJoin(list_b).collect()

[(1, (1, 2)), (3, (3, 4)), (5, (10, None)), (7, (None, 12))]

#### Joins with the image data
In the example images we have <code>hdfs:///imgs/</code> which includes the original and <code>hdfs:///fuzzy/</code> which is a fuzzy version of those.  We can map the measurements of fuzzy and original images and join on ward cluster hashes.  In each image's measures dictionary, the ward clusters are saved in a list, as shown above.  We can use <code>flatMap</code> to flatten that list and create an RDD with tuples of <code>(one_ward_cluster_hash, image_file_name)</code>.

In [28]:
parts = (config['test_name'], config['candidate_batch'])
candidates = sc.pickleFile('hdfs:///%s/candidates/%s/measures' % parts)

In [29]:
def flatten_ward(rdd):
    return rdd.flatMap(lambda (key, value): [(wc, key) for wc in value['ward']])

In [30]:
candidates_flat = flatten_ward(candidates)
originals_flat = flatten_ward(measures)
pprint(candidates_flat.take(1))
pprint(originals_flat.take(1))

[(-3831614152369362579,
  u'hdfs://ip-10-111-177-131:9000/fuzzy/femaleanpageanpage.20.jpg')]
[(-3831614152369362579,
  u'hdfs://ip-10-111-177-131:9000/imgs/malestaffanonymanonym.11.jpg')]


In [31]:
joined = candidates_flat.join(originals_flat)

In [32]:
joined.groupByKey().map(lambda (key, value): (key, list(value))).take(4)

[(-5265720812001719200,
  [(u'hdfs://ip-10-111-177-131:9000/fuzzy/femaleanpageanpage.20.jpg',
    u'hdfs://ip-10-111-177-131:9000/imgs/malestaffieorfieorf.9.jpg'),
   (u'hdfs://ip-10-111-177-131:9000/fuzzy/femaleanpageanpage.20.jpg',
    u'hdfs://ip-10-111-177-131:9000/imgs/malestaffsandmsandm.4.jpg'),
   (u'hdfs://ip-10-111-177-131:9000/fuzzy/femaleanpageanpage.20.jpg',
    u'hdfs://ip-10-111-177-131:9000/imgs/malestaffvoudcxvoudcx.7.jpg'),
   (u'hdfs://ip-10-111-177-131:9000/fuzzy/malecshubbcshubb.10.jpg',
    u'hdfs://ip-10-111-177-131:9000/imgs/malestaffieorfieorf.9.jpg'),
   (u'hdfs://ip-10-111-177-131:9000/fuzzy/malecshubbcshubb.10.jpg',
    u'hdfs://ip-10-111-177-131:9000/imgs/malestaffsandmsandm.4.jpg'),
   (u'hdfs://ip-10-111-177-131:9000/fuzzy/malecshubbcshubb.10.jpg',
    u'hdfs://ip-10-111-177-131:9000/imgs/malestaffvoudcxvoudcx.7.jpg'),
   (u'hdfs://ip-10-111-177-131:9000/fuzzy/malejdbenmjdbenm.20.jpg',
    u'hdfs://ip-10-111-177-131:9000/imgs/malestaffieorfieorf.9.jpg'),


### Explore_Spark_Results.ipynb in this directory goes further into the image example