In [1]:
from pyspark import SparkContext
sc = SparkContext("local", "count app")
sc.setLogLevel("ERROR")


### Example - Time between earthquakes

Suppose that earthquakes of a certain magnitude in a specific region can be modeled as a Poisson process with a mean of $\lambda = 4.5$ earthquakes per day.  Let $X$ be the time (in days) until the third earthquake.  It can be shown that $X$ has a Gamma distribution with shape $\alpha = 3$ (number of events) and scale $\beta = \frac{1}{\lambda}=\frac{1}{4.5}$ (average time between consecutive earthquakes).  We can use Python's `random.gammavariate` to simulate the distribution.
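As a rough sanity check on the Gamma claim, the sketch below simulates the waiting time directly by adding three independent exponential gaps (each with rate $\lambda = 4.5$) and compares the sample mean with $\alpha\beta = 3/4.5$. The names `LAMBDA`, `ALPHA`, and `time_until_third_quake`, the seed, and the sample size are illustrative assumptions, not part of the original notebook.

```python
from random import Random

LAMBDA = 4.5   # mean number of earthquakes per day
ALPHA = 3      # we wait for the third earthquake

rng = Random(0)  # fixed seed so the sketch is repeatable

def time_until_third_quake():
    # In a Poisson process the gaps between events are independent
    # exponential variables with rate LAMBDA (mean 1/LAMBDA days).
    return sum(rng.expovariate(LAMBDA) for _ in range(ALPHA))

samples = [time_until_third_quake() for _ in range(100_000)]
sample_mean = sum(samples) / len(samples)
theoretical_mean = ALPHA / LAMBDA  # alpha * beta, with beta = 1/LAMBDA
print(sample_mean, theoretical_mean)
```

The two printed numbers should agree to roughly two decimal places.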

In [2]:
from random import gammavariate
help(gammavariate)

Help on method gammavariate in module random:

gammavariate(alpha, beta) method of random.Random instance
    Gamma distribution.  Not the gamma function!
    
    Conditions on the parameters are alpha > 0 and beta > 0.
    
    The probability distribution function is:
    
                x ** (alpha - 1) * math.exp(-x / beta)
      pdf(x) =  --------------------------------------
                  math.gamma(alpha) * beta ** alpha



In [3]:
from composable.sequence import head
N = 1000000
time_between_3_quakes = [gammavariate(3,1/4.5) for i in range(N)]
time_between_3_quakes >> head(5)

[0.4998853733082485,
 0.13343252481057666,
 0.47471498369050863,
 0.35024326794863625,
 1.1036031572902247]

## Three `for` loop patterns

Almost all `for` loops reinvent one of the following patterns.

1. **Map**ping a function/transformation onto each value.
2. **Filter**ing the values by some boolean condition.
3. **Reduc**ing the values to one or more statistics.
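Before working through the earthquake data, here is a minimal sketch of the three patterns on a tiny made-up list, using only Python's built-in `map` and `filter` and `functools.reduce`. The list `values` and the lambdas are chosen purely for illustration.

```python
from functools import reduce

values = [3, 8, 1, 6]

doubled = list(map(lambda v: 2 * v, values))       # map: transform each value
large = list(filter(lambda v: v > 2, values))      # filter: keep values passing a test
total = reduce(lambda acc, v: acc + v, values, 0)  # reduce: fold values into one result

print(doubled, large, total)
```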

### Map example - Convert the times from days to hours.

In [4]:
# Loop solution
time_in_hours = []
for t in time_between_3_quakes:
    time_in_hours.append(t*24)
time_in_hours >> head(5)

[11.997248959397965,
 3.20238059545384,
 11.393159608572207,
 8.40583843076727,
 26.486475774965392]

In [5]:
# Comprehension solution
time_in_hours = [t*24 for t in time_between_3_quakes]
time_in_hours >> head(5)

[11.997248959397965,
 3.20238059545384,
 11.393159608572207,
 8.40583843076727,
 26.486475774965392]

In [6]:
# better built-in python way
time_in_hours = map(lambda t: t*24, time_between_3_quakes) >> head(5)
time_in_hours

<itertools.islice at 0x7f9fd1ed4130>

In [7]:
[x for x in time_in_hours]

[11.997248959397965,
 3.20238059545384,
 11.393159608572207,
 8.40583843076727,
 26.486475774965392]
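The `itertools.islice` object shown above appears because the built-in `map` is lazy: it returns an iterator that computes values only when asked, and it can be consumed only once. The sketch below illustrates this on a small made-up list; the variable names are assumptions for the example.

```python
from itertools import islice

times = [0.5, 0.25, 1.5]

hours = map(lambda t: t * 24, times)   # nothing is computed yet
first_two = list(islice(hours, 2))     # pulls two values through the map
rest = list(hours)                     # only the unconsumed value remains
again = list(hours)                    # the iterator is now exhausted
print(first_two, rest, again)
```

If the results are needed more than once, collect them into a list first.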

In [8]:
# With pipeable functions
#from composable.strict import map
from composable import strict as cs

(time_between_3_quakes
 >> cs.map(lambda t: t*24)
 >> head(5)
)

[11.997248959397965,
 3.20238059545384,
 11.393159608572207,
 8.40583843076727,
 26.486475774965392]

### Filter Example - Keep all values less than 1 day.

In [9]:
# loop solution
less_than_1_day = []
for t in time_between_3_quakes:
    if t < 1:
        less_than_1_day.append(t)
less_than_1_day >> head(5)

[0.4998853733082485,
 0.13343252481057666,
 0.47471498369050863,
 0.35024326794863625,
 0.49701079206183535]

In [10]:
# comprehension solution
less_than_1_day = [t for t in time_between_3_quakes if t < 1]
less_than_1_day >> head(5)

[0.4998853733082485,
 0.13343252481057666,
 0.47471498369050863,
 0.35024326794863625,
 0.49701079206183535]

`filter()` is also built into Python.
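A minimal sketch of the built-in version, on a short made-up list rather than the full simulation. Like `map`, the built-in `filter` returns a lazy iterator, so the example wraps it in `list` to see the values.

```python
times = [0.5, 1.2, 0.3, 2.0]

# Keep only the waiting times shorter than one day.
short_waits = list(filter(lambda t: t < 1, times))
print(short_waits)
```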

In [11]:
# pipeable functions
#from composable.strict import filter

(time_between_3_quakes
 >> cs.filter(lambda t: t < 1)
 >> head(5)
)

[0.4998853733082485,
 0.13343252481057666,
 0.47471498369050863,
 0.35024326794863625,
 0.49701079206183535]

### Reduce Example - Accumulating the maximum

In [12]:
## Loop solution
max_time = 0 # safe since Gamma is non-negative
for t in time_between_3_quakes:
    max_time = max(max_time, t) # update step
max_time

4.264879900932709

In [13]:
# Functional solution
from functools import reduce

reduce(lambda m, t: max(m, t), time_between_3_quakes, 0)

4.264879900932709

In [14]:
help(reduce)

Help on built-in function reduce in module _functools:

reduce(...)
    reduce(function, iterable[, initial]) -> value
    
    Apply a function of two arguments cumulatively to the items of a sequence
    or iterable, from left to right, so as to reduce the iterable to a single
    value.  For example, reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) calculates
    ((((1+2)+3)+4)+5).  If initial is present, it is placed before the items
    of the iterable in the calculation, and serves as a default when the
    iterable is empty.
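
The role of the optional `initial` argument described in the help text can be seen in a short sketch. The inputs are made up, and `empty_error` is just an illustrative name for recording what happens when there is nothing to reduce.

```python
from functools import reduce
from operator import add

no_init = reduce(add, [1, 2, 3, 4, 5])         # ((((1+2)+3)+4)+5)
with_init = reduce(add, [1, 2, 3, 4, 5], 100)  # (((((100+1)+2)+3)+4)+5)
empty_with_init = reduce(add, [], 0)           # initial doubles as the default

try:
    reduce(add, [])                            # no initial and nothing to reduce
    empty_error = None
except TypeError as err:
    empty_error = type(err).__name__

print(no_init, with_init, empty_with_init, empty_error)
```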



In [15]:
# Pipeable solution
from composable import pipeable

update_max = lambda m, t: max(m, t)

@pipeable
def my_reduce(func, xs, init = None):
    if init is None:
        return reduce(func, xs) # Uses first value as init
    else:
        return reduce(func, xs, init)

In [16]:
# with init = 0
(time_between_3_quakes
 >> my_reduce(update_max, init = 0)
)

4.264879900932709

In [17]:
# with init = first value
(time_between_3_quakes
 >> my_reduce(update_max)
)

4.264879900932709

## So Iverson, why are you such a piping fanboi!?!??

Legos

In [19]:
# Find the number of times less than 1 hour
(time_between_3_quakes
 >> cs.map(lambda t: 24*t)
 >> cs.filter(lambda t: t < 1)
 >> my_reduce(lambda cnt, t: cnt + 1, init = 0)
)

948

### <font color="red"> Task 6.1.1 </font>

Explain

> Reusable, generic components that are immediately compatible with each other.

## So ... about those loops ...

<img src="./img/no_more_for_loops.png"/>

### Loops don't work well on multiple or multi-core machines

<img src="./img/loop_problems.png">

### What about functions?

* Using [lambda calculus](https://en.wikipedia.org/wiki/Lambda_calculus) we can show that a terminating program built from pure functions (no side effects) gives the same result regardless of the order of evaluation.
* This explains why `pyspark` uses functional idioms like `map`, `filter`, and `reduce`.
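
A small sketch of what order independence buys us, under the assumption that the function being mapped is pure. The data, the chunking, and the names `to_hours`, `chunks`, and `order` are all made up for illustration; real schedulers are far more involved.

```python
import random

def to_hours(t):
    # Pure: the result depends only on the argument, and nothing else changes.
    return 24 * t

times = [0.5, 0.25, 1.5, 2.0, 0.75, 1.0]

# Split the work into chunks, as separate cores or machines might.
chunks = [times[0:2], times[2:4], times[4:6]]

# Process the chunks in a shuffled order, remembering where each belongs.
order = list(range(len(chunks)))
random.Random(1).shuffle(order)
results = {i: [to_hours(t) for t in chunks[i]] for i in order}

# Reassemble by chunk index and compare with the in-order computation.
reassembled = [h for i in sorted(results) for h in results[i]]
sequential = [to_hours(t) for t in times]
print(reassembled == sequential)
```

Because `to_hours` touches no shared state, the chunks can be processed in any order (or at the same time) without changing the answer.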

##  The `pyspark.RDD` 

> A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned  collection of elements that can be operated on in parallel.

In [20]:
times_RDD = sc.parallelize(time_between_3_quakes)

In [23]:
# Find the number of times less than 1 hour
(times_RDD
 .map(lambda t: 24*t)
 .filter(lambda t: t < 1)
 .map(lambda t: 1)
 .cache()
 .reduce(lambda a, b: a + b) # RDD.reduce needs a commutative, associative operator
)

                                                                                

948

In [25]:
help(times_RDD.cache)

Help on method cache in module pyspark.rdd:

cache() -> 'RDD[T]' method of pyspark.rdd.RDD instance
    Persist this RDD with the default storage level (`MEMORY_ONLY`).



In [24]:
help(times_RDD.reduce)

Help on method reduce in module pyspark.rdd:

reduce(f: Callable[[~T, ~T], ~T]) -> ~T method of pyspark.rdd.RDD instance
    Reduces the elements of this RDD using the specified commutative and
    associative binary operator. Currently reduces partitions locally.
    
    Examples
    --------
    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
    15
    >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
    10
    >>> sc.parallelize([]).reduce(add)
    Traceback (most recent call last):
        ...
    ValueError: Can not reduce() empty RDD
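
The help text asks for a commutative and associative operator because each partition is reduced on its own and the partial results are then combined. The pure-Python sketch below imitates that two-stage process with a hypothetical helper, `partitioned_reduce` (it is not a Spark API), and shows how a counting lambda that ignores its second argument breaks once there is more than one partition.

```python
from functools import reduce
from operator import add

def partitioned_reduce(f, xs, num_partitions):
    # Hypothetical stand-in for a distributed reduce: reduce each
    # partition on its own, then reduce the per-partition results.
    size = -(-len(xs) // num_partitions)  # ceiling division
    parts = [xs[i:i + size] for i in range(0, len(xs), size)]
    return reduce(f, [reduce(f, part) for part in parts])

ones = [1] * 12  # what .map(lambda t: 1) produces for 12 elements

count_up = lambda cnt, t: cnt + 1  # ignores t, so it is not associative

add_one_part = partitioned_reduce(add, ones, 1)
add_four_parts = partitioned_reduce(add, ones, 4)
count_one_part = partitioned_reduce(count_up, ones, 1)
count_four_parts = partitioned_reduce(count_up, ones, 4)
print(add_one_part, add_four_parts, count_one_part, count_four_parts)
```

With `add`, both layouts give 12. With `count_up`, a single partition still gives 12, but four partitions give 6, because combining the partial counts adds 1 per partition instead of adding the counts themselves.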



<font color="red"><h2> Task 6.1.2 </h2></font>

Use our three functional idioms to compute the average time (in seconds) of all times greater than 1 hour, in two ways.

1. In Python using the various `pipeable` functions presented earlier.
2. Using `pyspark` RDDs.

**Hint 1.** Computing the mean requires that we (A) compute both the total and count, then (B) divide.

**Hint 2.** Allow yourself two passes through the data for 1. and 2.

In [28]:
# pipeable function solution
from operator import add

filtered_time_seconds = (time_between_3_quakes
    >> cs.filter(lambda t: t*24 > 1)
    >> cs.map(lambda t: t*24*3600)    
)

total = filtered_time_seconds >> my_reduce(add)
count = filtered_time_seconds >> my_reduce(lambda x,y: x+1, init=0)
total/count

57707.45826491965

In [29]:
total/len(filtered_time_seconds)

57707.45826491965

In [30]:
sum(filtered_time_seconds)/len(filtered_time_seconds)

57707.45826491965

In [32]:
# pyspark RDD solution

filtered_RDD = (times_RDD
    .filter(lambda t: t*24 > 1)
    .map(lambda t: t*24*3600)
) 

rdd_total = filtered_RDD.reduce(add)
rdd_count = filtered_RDD.map(lambda x: 1).reduce(add)
rdd_total/rdd_count

                                                                                

57707.45826491965

In [33]:
rdd_total / filtered_RDD.count()

                                                                                

57707.45826491965
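
The solutions above make two passes, one for the total and one for the count. As a sketch of a single-pass alternative, each value can be mapped to a `(total, count)` pair and the pairs added component-wise. The data below is made up, and plain `functools.reduce` is used so the example runs without Spark; since pairwise addition is commutative and associative, the same lambda should also be suitable for an RDD's `reduce`.

```python
from functools import reduce

seconds = [4000.0, 5000.0, 9000.0]  # made-up values for illustration

# Map each value to a (total, count) pair, then add pairs component-wise.
pairs = map(lambda s: (s, 1), seconds)
total, count = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), pairs)
mean = total / count
print(mean)
```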

<font color="red"><h2> Task 6.1.3 </h2></font>

The variance of a random variable is the average squared difference between $X$ and its mean.  Use the three functional idioms to compute the variance of the times in three ways:

1. In Python using a `for` loop.
2. In Python using the various `pipeable` functions presented earlier.
3. Using `pyspark` RDDs.

**Hint 1.** It can be shown that the mean of our distribution is $\alpha\beta = \frac{3}{4.5}$.

**Hint 2.** Subtract, then square, then average.

**Hint 3.** In this case, we can show $V(X) = \alpha\beta^2 = \frac{3}{4.5^2}$.  Use this to check your approximation.
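
One way to see where the values in Hints 1 and 3 come from, assuming the standard fact that the gaps between events in a Poisson process are independent exponential variables with rate $\lambda$ (the symbols $T_1, T_2, T_3$ are introduced here for the three gaps):

$$
X = T_1 + T_2 + T_3, \qquad E(T_i) = \frac{1}{\lambda}, \qquad V(T_i) = \frac{1}{\lambda^2}
$$

$$
E(X) = \frac{3}{\lambda} = \alpha\beta = \frac{3}{4.5} \approx 0.667, \qquad V(X) = \frac{3}{\lambda^2} = \alpha\beta^2 = \frac{3}{4.5^2} \approx 0.148
$$

The variances add because the gaps are independent.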

In [35]:
# best solution
from statistics import variance
variance(time_between_3_quakes)

0.14848240995529485
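
Note that `statistics.variance` is the sample variance: it estimates the mean from the data and divides by $n - 1$. This is why its result can differ slightly from an estimate that plugs in a known mean and divides by $n$. The sketch below contrasts the two on a tiny made-up data set, using `statistics.pvariance` with its optional `mu` argument; the value `mu=2.0` is an arbitrary assumed "known" mean.

```python
from statistics import pvariance, variance

data = [1.0, 2.0, 3.0, 4.0]

sample_var = variance(data)               # divides by n - 1, uses the sample mean
population_var = pvariance(data)          # divides by n, uses the sample mean
known_mean_var = pvariance(data, mu=2.0)  # divides by n, uses the mean we supply
print(sample_var, population_var, known_mean_var)
```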

In [37]:
MU = 3/4.5

In [41]:
# for loop solution
total = 0
count = 0

for t in time_between_3_quakes:
    total += (t-MU)**2
    count += 1

total/count

0.14848267075058352

In [36]:
3 / 4.5**2

0.14814814814814814

In [39]:
# pipeable function solution
standardized = (time_between_3_quakes
    >> cs.map(lambda t: (t-MU)**2)
)

total = standardized >> my_reduce(add)
count = standardized >> my_reduce(lambda x,y: x+1, init=0)
total/count

0.14848267075058352

In [40]:
# pyspark RDD solution
standardized_RDD = (times_RDD
    .map(lambda t: (t-MU)**2)
) 

rdd_total = standardized_RDD.reduce(add)
rdd_count = standardized_RDD.map(lambda x: 1).reduce(add)
rdd_total/rdd_count

                                                                                

0.14848267075058352