## Transformations

Transformations are lazy operations on a RDD that create one or many new RDDs.
* Transformations are functions that take a RDD as the input and produce one or many RDDs as the output. They do not change the input RDD (since RDDs are immutable and hence cannot be modified), but always produce one or more new RDDs by applying the computations they represent.
* By applying transformations you incrementally build a RDD lineage with all the parent RDDs of the final RDD(s).
* Transformations are lazy, i.e. are not executed immediately. Only after calling an action are transformations executed.

###  Transformations Types

Spark carefully distinguish "transformation" operation in two types:
![Narrow vs wide transformations Image](assets/narrow_vs_wide_transformations.png)

* **Narrow Transformations ** are the result of map, filter and such that is from the data from a single partition only, i.e. it is self-sustained.
    * An output RDD has partitions with records that originate from a single partition in the parent RDD. Only a limited subset of partitions used to calculate the result.
    * Spark groups narrow transformations as a stage which is called pipelining.
    

* **Wide Transformations ** are the result of groupByKey and reduceByKey. The data required to compute the records in a single partition may reside in many partitions of the parent RDD.
    * All of the tuples with the same key must end up in the same partition, processed by the same task. To satisfy these operations, Spark must execute RDD shuffle, which transfers data across cluster and results in a new stage with a new set of partitions.


###  Transformations dependecies

The figure below gives a quick overview of the flow of a spark job
![spark schedule process Image](assets/spark_schedule-process.png)


One of the challenges in providing RDDs as an abstraction is choosing a representation for them that can 
track lineage across a wide range of transformations. The most interesting question in designing this  interface is how to represent dependencies between RDDs.

It is both sufficient and useful to classify  dependencies into two types: 
* **Narrow dependencies**, where each partition of the parent RDD is used by at most one 
partition  of the child RDD.
* **Wide dependencies**, where multiple child partitions may depend on it.

![Transformation dependencies Image](assets/Transformation_Dependencies.png)


### General Transformations

* **filter** (f)
        
    Return a new RDD containing only the elements that satisfy a predicate.

In [1]:
import pyspark
sc = pyspark.SparkContext('local[*]')

In [2]:
rdd = sc.parallelize(range(-5,5))          # Rang (-5, 5)
filtered_rdd = rdd.filter(lambda x: x>=0)  # Returns the positive
print(filtered_rdd.collect())

[0, 1, 2, 3, 4]


* **map** (f, preservesPartitioning=False)
    
    Return a new RDD by applying a function to each element of this RDD.

In [3]:
def add1(x):
    return(x+1)

squared_rdd = (filtered_rdd
               .map(add1)
               .map(lambda x: (x, x*x))) 
print(squared_rdd.collect())

[(1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]


 * **flatMap** (f, preservesPartitioning=False)

    Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.

In [4]:
squaredflat_rdd = (filtered_rdd
                   .map(add1)
                   .flatMap(lambda x: (x, x*x)))
print(squaredflat_rdd.collect())

[1, 1, 2, 4, 3, 9, 4, 16, 5, 25]


* **distinct**(numPartitions=None)

    Return a new RDD containing the distinct elements in this RDD.

In [5]:
distinct_rdd = squaredflat_rdd.distinct()
print(distinct_rdd.collect())

[2, 4, 16, 1, 3, 9, 5, 25]


* **groupBy**(f, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc35dbc8e60>)

    Return an RDD of grouped items.

In [6]:
# Group values depending on the remainder of the division by 3
grouped_rdd = distinct_rdd.groupBy(lambda x: x%3)
print(grouped_rdd.collect())
print([(x,sorted(y)) for (x,y) in grouped_rdd.collect()])

[(2, <pyspark.resultiterable.ResultIterable object at 0x7f551bec2a00>), (0, <pyspark.resultiterable.ResultIterable object at 0x7f551befa190>), (1, <pyspark.resultiterable.ResultIterable object at 0x7f551befad30>)]
[(2, [2, 5]), (0, [3, 9]), (1, [1, 4, 16, 25])]


### Math/Statistical Transformations

   
* **sample** (withReplacement, fraction, seed=None)

    Return a sampled subset of this RDD.

    Parameters:	
        * withReplacement – can elements be sampled multiple times (replaced when sampled out)
        * fraction – expected size of the sample as a fraction of this RDD’s size without replacement: probability that each element is chosen; fraction must be [0, 1] with replacement: expected number of times each element is chosen; fraction must be >= 0
        * seed – seed for the random number generator

In [7]:
s1 = squaredflat_rdd.sample(False, 0.5).collect()
s2 = squaredflat_rdd.sample(True, 2).collect()
s3 = squaredflat_rdd.sample(False, 0.8).collect()
print('s1={0}\ns2={1}\ns3={2}'.format(s1, s2, s3))

s1=[4, 16, 5, 25]
s2=[1, 1, 1, 1, 1, 1, 2, 4, 4, 3, 3, 9, 4, 16, 16, 16, 5, 25, 25]
s3=[1, 1, 2, 4, 3, 9, 4, 16, 5]


### Set Theory /Relationan Transformations (transformations on two RDDs)

* **union**(other)

    Return the union of this RDD and another one.

In [8]:
rdda = sc.parallelize(['a', 'b', 'c'])
rddb = sc.parallelize(['c', 'd', 'e'])
rddu = rdda.union(rddb)
print(rddu.collect())

['a', 'b', 'c', 'c', 'd', 'e']


* **intersection** (other)

    Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.
    * This method performs a shuffle internally. 

In [9]:
rddi = rdda.intersection(rddb)
print(rddi.collect())

['c']


* **subtract** (other, numPartitions=None)

    Return each value in self that is not contained in other.

In [10]:
rdds = rdda.subtract(rddb)
print(rdds.collect())

['b', 'a']


* **zip** (other)

    Zips this RDD with another one, returning key-value pairs with the first element in each RDD second element in each RDD, etc. 
    * Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).

In [11]:
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))
x.zip(y).collect()

[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

## Actions

Actions are RDD operations that produce non-RDD values. They materialize a value in a Spark program. In other words, a RDD operation that returns a value of any type but RDD[T] is an action.

* The purpose of this action was to get data, from an RDD distributed across the nodes, back to our driver program. 
* If the results are too large to fit on the driver memory an important overhead can generated or even the driver can crash (there is an opportunity to write them directly to HDFS, instead). 

### Getting values actions


* **collect**()

    Return a list that contains all of the elements in this RDD.

    *Note: This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.*

In [12]:
rdd = sc.parallelize(list("abracadabra")).cache()
listrdd = rdd.collect()
print(listrdd)

['a', 'b', 'r', 'a', 'c', 'a', 'd', 'a', 'b', 'r', 'a']


* **take** (num)

    Take the first num elements of the RDD.

    It works by first scanning one partition, and use the results from that partition to estimate the number of additional partitions needed to satisfy the limit.



* **sample** (withReplacement, fraction, seed=None)¶

    Return a sampled subset of this RDD.
    * withReplacement – can elements be sampled multiple times (replaced when sampled out)
    * fraction – expected size of the sample as a fraction of this RDD’s size without replacement: probability that each element is chosen; fraction must be [0, 1] with replacement: expected number of times each element is chosen; fraction must be >= 0
    * seed – seed for the random number generator



* **takeSample** (withReplacement, num, seed=None)

    Return a fixed-size sampled subset of this RDD.

In [13]:
print(rdd.collect())
t = rdd.take(4)
print(t)
s = rdd.sample(False, 0.5)
print(s.collect())
ts = rdd.takeSample(False, 4)
print(ts)

['a', 'b', 'r', 'a', 'c', 'a', 'd', 'a', 'b', 'r', 'a']
['a', 'b', 'r', 'a']
['a', 'b', 'a', 'c', 'a', 'b', 'a']
['a', 'd', 'b', 'a']


* **top** (num, key=None)

    Get the top N elements from an RDD.
    
    
* **takeOrdered** (num, key=None)

    Get the N elements from an RDD ordered in ascending order or as specified by the optional key function.

In [14]:
rdd = sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).cache()
t = rdd.top(4)
print(t)
o = rdd.takeOrdered(4)
print(o)
i = rdd.takeOrdered(4, lambda x: -x)
print(i)

[10, 9, 7, 6]
[1, 2, 3, 4]
[10, 9, 7, 6]


### Count Actions

* **count**()

    Return the number of elements in this RDD.
    
    
* **countApprox** (timeout, confidence=0.95)

    Approximate version of count() that returns a potentially incomplete result within a timeout, even if not all tasks have finished.
    
    
    
* **countApproxDistinct** (relativeSD=0.05)

    Return approximate number of distinct elements in the RDD.
    
    * The algorithm used is based on streamlib’s implementation of “HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm”, available here.   
    * relativeSD – Relative accuracy. Smaller values create counters that require more space. It must be greater than 0.000017.

In [15]:
rdd = sc.parallelize([i % 20 for i in range(1000)])
print(rdd.count())
print(rdd.distinct().count())
print(900 < rdd.countApprox(1, 0.4) < 1100)
print(15 < rdd.countApproxDistinct(0.5) < 25)

1000
20
True
True


* **countByValue**()

    Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.

In [16]:
rdd = sc.parallelize(list("abracadabra")).cache()
dicc = rdd.countByValue()
print(dicc.items())

dict_items([('a', 5), ('b', 2), ('r', 2), ('c', 1), ('d', 1)])


### Agregation Actions

* **reduce** (f)

    Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally.

In [17]:
rdd = sc.parallelize(range(1,5))
r = rdd.reduce(lambda x,y: x*y) # r = 1*2*3*4 = 24
from operator import add
s = rdd.reduce(add) # s = 1+2+3+4 = 10
print(s)

10


* **fold** (zeroValue, op)

    Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral “zero value.”
    Similar to `reduce`, but providing the identity value for the function (eg 0 for $+$; 1 for $\times$, or `[]` for concatenation of lists)
   
    * The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
    * This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.

   Similar to `reduce`, but providing the identity value for the function (eg 0 for $+$; 1 for $\times$, or `[]` for concatenation of lists)

In [18]:
rdd = sc.parallelize([list(range(1,5)), list(range(-10,-3)), ['a', 'b', 'c']])
print(rdd.collect())
f = rdd.fold([], lambda x,y : x+y)
print(f)
sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)

[[1, 2, 3, 4], [-10, -9, -8, -7, -6, -5, -4], ['a', 'b', 'c']]
[1, 2, 3, 4, -10, -9, -8, -7, -6, -5, -4, 'a', 'b', 'c']


15

* **aggregate** (zeroValue, seqOp, combOp)

    Aggregate the elements of each partition, and then the results for all the partitions, using a given combine functions and a neutral “zero value.”
    * The functions op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
    * The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into an U and one operation for merging two U

In [19]:
l = [1, 2, 3, 4, 5, 6, 7, 8]
rdd = sc.parallelize(l, 2)
seqOp  = (lambda acc, val: (acc[0]+[val], acc[1]*val, acc[2]+1))
combOp = (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]*acc2[1], acc1[2]+acc2[2]))
a = rdd.aggregate(([], 1., 0), seqOp, combOp) 
print(a)
sc.parallelize([]).aggregate((0, 0, 0), seqOp, combOp)

([1, 2, 3, 4, 5, 6, 7, 8], 40320.0, 8)


(0, 0, 0)

## Pair RDD Transformations

They are RDDs that work with key pair / value (Pair RDDs)
* Data types very used in Big Data (MapReduce)
* Spark has special operations for handling

Creating Pair RDDs

    * From a list of tuples

In [20]:
prdd = sc.parallelize([('a',2), ('b',5), ('a',3)])
print(prdd.collect())
prdd = sc.parallelize(zip(['a', 'b', 'c'], range(3)))
print(prdd.collect())

[('a', 2), ('b', 5), ('a', 3)]
[('a', 0), ('b', 1), ('c', 2)]


    * From another RDD

In [21]:
linesrdd = sc.textFile("./Datasets/Books/Quijote.txt")
prdd = linesrdd.map(lambda x : (x.split(" ")[0], x))
print("Pair (1st word, line): {0}\n".format(prdd.takeSample(False, 1)[0]))

# keyBy(f): Create tuples of RDD elements using f to get the key.
nrdd = sc.parallelize(range(2,5))
prdd = nrdd.keyBy(lambda x: x*x)
print(prdd.collect())

# ZipWithIndex (): Zipe the RDD with the indexes of its elements.
# This method triggers a spark job when the RDD has more than one partition.
rdd = sc.parallelize(["a", "b", "c", "d", "e", "f", "g", "h"], 3)
prdd = rdd.zipWithIndex()
print(prdd.collect())

# ZipWithUniqueId (): Zipe the RDD with unique identifiers (long) for each element.
# The elements in the kth partition give the ids k, n + k, 2 * n + k, ... where n = number of partitions
# Does not fire a spark job
rdd = sc.parallelize(["a", "b", "c", "d", "e", "f", "g", "h"], 3)
print("RDD partition: {0}".format(rdd.glom().collect()))
prdd = rdd.zipWithUniqueId()
print(prdd.collect())

Pair (1st word, line): ('burlaban', 'burlaban de Sancho; pero él se las tenía tiesas a todos, maguera tonto,')

[(4, 2), (9, 3), (16, 4)]
[('a', 0), ('b', 1), ('c', 2), ('d', 3), ('e', 4), ('f', 5), ('g', 6), ('h', 7)]
RDD partition: [['a', 'b'], ['c', 'd'], ['e', 'f', 'g', 'h']]
[('a', 0), ('b', 3), ('c', 1), ('d', 4), ('e', 2), ('f', 5), ('g', 8), ('h', 11)]


    * Using a zip of two RDDs
      
      RDDs must have the same number of partitions and the same number of elements in each partition.

In [22]:
rdd1 = sc.parallelize(range(0,5), 2)
rdd2 = sc.parallelize(range(1000, 1005), 2)
prdd = rdd1.zip(rdd2)
print(prdd.collect())

[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]


### Transformations on a key / value RDD

#### Aggregation Transformations

* **reduceByKey** (func, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc35dbc8e60>)¶

    Merge the values for each key using an associative and commutative reduce function.

    * This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.

    * Output will be partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified. Default partitioner is hash-partition.

In [23]:
from operator import add
prdd   = sc.parallelize([("a", 2), ("b", 5), ("a", 8)]).cache()
redrdd = prdd.reduceByKey(add)
print(redrdd.collect())

[('b', 5), ('a', 10)]


* **groupByKey** (numPartitions=None, partitionFunc=<function portable_hash at 0x7fc35dbc8e60>)

    Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.

In [24]:
print(sorted(prdd.groupByKey().mapValues(len).collect()))
print(sorted(prdd.groupByKey().mapValues(list).collect()))

[('a', 2), ('b', 1)]
[('a', [2, 8]), ('b', [5])]


* **combineByKey** (createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash at 0x7fc35dbc8e60>)

    Generic function to combine the elements for each key using a custom set of aggregation functions.
    * Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C.
    * Users provide three functions:
        * createCombiner, which turns a V into a C (e.g., creates a one-element list)
        * mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
        * mergeCombiners, to combine two C’s into a single one.
    * In addition, users can control the partitioning of the output RDD.

In [25]:
# Calculate the average per key using combineByKey ()
l = sc.parallelize([("a",2), ("b",5), ("a",8), ("b", 6), ("b",1)])

# For each key, add and count the values
sumCount = l.combineByKey(
           (lambda x : (x, 1)),
           (lambda x, y : (x[0]+y, x[1]+1)),
           (lambda x, y : (x[0]+y[0], x[1]+y[1])))
print(sumCount.collect())

# Gets the mean of the values
m = sumCount.mapValues(lambda v : v[0]/v[1])
print(m.collect())

[('b', (12, 3)), ('a', (10, 2))]
[('b', 4.0), ('a', 5.0)]


### Transformations on keys or values

* **keys**() returns an RDD with the keys
* **values**() returns an RDD with values
* **sortByKey**() returns a key/value RDD with sorted keys

In [26]:
print("RDD complete: {0}".format(prdd.collect()))
print("RDD with the keys: {0}".format(prdd.keys().collect()))
print("RDD with the values: {0}".format(prdd.values().collect()))
print("RDD with ordered keys: {0}".format(prdd.sortByKey().collect()))

RDD complete: [('a', 2), ('b', 5), ('a', 8)]
RDD with the keys: ['a', 'b', 'a']
RDD with the values: [2, 5, 8]
RDD with ordered keys: [('a', 2), ('a', 8), ('b', 5)]


* **MapValues**(func):  returns an RDD by applying a function to the values
* **FlatMapValues**(func): returns an RDD by applying a function on the values and "flattening" the output

In [27]:
mapv = prdd.mapValues(lambda x: (x, 10*x))
print(mapv.collect())
fmapv = prdd.flatMapValues(lambda x: (x, 10*x))
print(fmapv.collect())

[('a', (2, 20)), ('b', (5, 50)), ('a', (8, 80))]
[('a', 2), ('a', 20), ('b', 5), ('b', 50), ('a', 8), ('a', 80)]


#### Transformations on two key / value RDDs

* Join / rightOuterJoin / leftOuterJoin / fullOuterJoin performs inner / outer joins between the two RDDs

In [None]:
rdd1 = sc.parallelize([("a",2), ("b",5), ("a",8)]).cache()
rdd2 = sc.parallelize([("c",7), ("a",1)]).cache()

rdd3 = rdd1.join(rdd2)
print(rdd3.collect())

rdd4 = rdd1.leftOuterJoin(rdd2)
print(rdd4.collect())

rdd5 = rdd1.rightOuterJoin(rdd2)
print(rdd5.collect())

rdd6 = rdd1.fullOuterJoin(rdd2)
print(rdd6.collect())

* **SubtractByKey** removes elements with a key present in another RDD

In [None]:
rdd7 = rdd1.subtractByKey(rdd2)
print(rdd7.collect())

* **Cogroup** groups data that shares the same key in both RDDs

In [None]:
rdd8 = rdd1.cogroup(rdd2)
print(rdd8.mapValues(lambda v: [list(l) for l in v]).collectAsMap())

### Actions on key/value RDDs

* **CountByKey**() returns a map indicating the number of occurrences of each key

In [None]:
prdd = sc.parallelize([("a", 2), ("b", 5), ("a", 8)]).cache()
countMap = prdd.countByKey()
print(countMap)

* **CollectAsMap**() gets the RDD as a map

In [None]:
rddMap = prdd.collectAsMap()
print(rddMap)

* **Lookup**(key)  returns a list of values associated with a key

In [None]:
listA = prdd.lookup('a')
print(listA)

Numerical RDDs
--------------

Descriptive statistics functions implemented in Spark

  | Method              | Description
  | ------------------- | ----------------------------------
  | Stats()             | Summary of statistics
  | Mean()              | Arithmetic average
  | Sum(), max(), min() | Sum, maximum and minimum
  | Variance()          | Variance of the elements
  | SampleVariance()    | Variance of a sample
  | Stdev()             | Standard deviation
  | SampleStdev()       | Standard deviation of a sample
  | Histogram()         | Histogram

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pylab
from math import fabs

# An RDD with random data from a normal distribution
nrdd = sc.parallelize(np.random.normal(size=10000)).cache()

# Summary of statistics
sts = nrdd.stats()
print("Summary of statistics:\n {0}".format(sts))
# Filters outliers
stddev = sts.stdev()
avg = sts.mean()
frdd = nrdd.filter(lambda x: fabs(x - avg) < 3*stddev).cache()
print("Number of outliers: {0}".format(sts.count() - frdd.count()))

# Gets a histogram with 10 groups
x,y = frdd.histogram(10)
plt.bar(x[:-1],y,width=0.6)