### Resilient Distributed Datasets

#### Two ways to create an RDD
* parallelize - from an in-memory collection
* textFile - from a text file

### Summary of RDD APIs

#### Action (PairRDDFunctions)

* collectAsMap()
* countByKey()
* lookup()

#### Map Operations

* flatMap()
* map()
* mapPartitions()
* mapPartitionsWithIndex()

#### Set Operations

* cartesian()
* distinct()
* intersection()
* subtract()
* union()

#### Other Operations

* filter()
* groupBy()
* toDebugString

#### PairRDDFunctions (Single RDD)

* combineByKey()
* foldByKey()
* groupByKey()
* mapValues()
* reduceByKey()

#### PairRDDFunctions (Two RDD)

* cogroup()
* join()
* leftOuterJoin()
* rightOuterJoin()

#### Sorting

* sortByKey()
* takeOrdered()
* top()

#### Partition

* Hash-partition
* Partitioner set operations
* Partitioner unset operations
* Range-partition

#### Shuffling
* coalesce()
* partitionBy()
* repartition()


#### Action (RDD)

* aggregate()
* collect()
* count()
* countByValue()
* first()
* fold()
* foreach()
* reduce()
* saveAsTextFile()
* take()
* takeOrdered()
* takeSample()
* top()

##### RDD creation from a collection - mostly for learning & debugging purposes

In [1]:
# sc - the SparkContext, the entry point to the Spark framework
rdd = sc.parallelize([1,2,3,4,5],2)

# map - a transformation; it converts each element from one form to another

rdd = rdd.map(lambda x: x*2)

# collect - an action
# Transformations do not run until an action is invoked.
# This delayed execution is known as lazy evaluation.
# Because Spark sees the whole chain before it runs anything, it can plan the execution efficiently.
rdd.collect()

[2, 4, 6, 8, 10]
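
The effect of lazy evaluation can be imitated without Spark. The following is a plain-Python sketch, not Spark code: a generator stands in for a transformation and `list()` stands in for an action. The names `calls` and `double` are made up for the illustration.

```python
calls = []  # records every time the mapping function actually runs


def double(x):
    calls.append(x)
    return x * 2


# Building the pipeline runs nothing yet (comparable to a transformation).
lazy = (double(x) for x in [1, 2, 3, 4, 5])
calls_before = list(calls)  # still empty at this point

# Consuming the generator plays the role of an action such as collect().
result = list(lazy)
```

`calls_before` stays empty because `double` is only invoked once the generator is consumed, which mirrors how `map` does no work until `collect()` is called.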

In [3]:
# Returns the number of elements in the RDD
rdd.count()

5

In [4]:
# glom - collects the elements of each partition into its own list
rdd.glom().collect()

[[2, 4], [6, 8, 10]]

##### Creating an RDD using textFile from Baby_Names__Beginning_2007.csv

In [6]:
baby_names = sc.textFile('Baby_Names__Beginning_2007.csv')

In [7]:
# Returns the first 5 lines
baby_names.take(5)

[u'Year,First Name,County,Sex,Count',
 u'2013,GAVIN,ST LAWRENCE,M,9',
 u'2013,LEVI,ST LAWRENCE,M,9',
 u'2013,LOGAN,NEW YORK,M,44',
 u'2013,HUDSON,NEW YORK,M,49']

In [12]:
# Randomly samples roughly 1% of the rows (fraction=0.01), without replacement
baby_names.sample(False, 0.01, seed=12).count()

484
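
Sampling without replacement by a fraction keeps each row independently with that probability, so the returned count is only approximately `fraction * total`. Below is a plain-Python sketch of that idea; it is not Spark's sampler, and `bernoulli_sample` and the row count of 50000 are hypothetical stand-ins chosen for the illustration.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = range(50000)  # hypothetical number of rows, not the real file size
sampled = bernoulli_sample(rows, 0.01, seed=12)
sample_size = len(sampled)  # close to 500, but not exactly 500
```

Because every row is an independent coin flip, two different seeds usually give slightly different counts.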

In [13]:
# Pair RDD: an RDD whose elements are (key, value) tuples
l = [('awi',1000),('jack',2000),('jill',3000),('bill',500)]
rdd_pair = sc.parallelize(l)

In [15]:
rdd_pair.collect()

[('awi', 1000), ('jack', 2000), ('jill', 3000), ('bill', 500)]

In [8]:
f = lambda x,y: x + 100 + y

In [17]:
rdd.collect()

[2, 4, 6, 8, 10]

In [18]:
# map is a transformation
str_rdd = rdd.map(lambda x : str(x)+'Hello')
str_rdd.collect()

['2Hello', '4Hello', '6Hello', '8Hello', '10Hello']

In [10]:
l = rdd.map(lambda x: x*2).collect()


In [20]:
def f(x):
    print x
    return x+2

l = range(100)
rdd= sc.parallelize(l)
rdd = rdd.map(f)
rdd.count()


100

In [12]:
# The first map adds 2 to every number, filter keeps the even results, and the second map turns each number into a string.
# The collect action then brings all the data back to the driver.
sc.parallelize(l).map(lambda x: x+2).filter(lambda x: x%2 == 0).map(lambda x: str(x)+' Num').collect()

['2 Num',
 '4 Num',
 '6 Num',
 '8 Num',
 '10 Num',
 '12 Num',
 '14 Num',
 '16 Num',
 '18 Num',
 '20 Num',
 '22 Num',
 '24 Num',
 '26 Num',
 '28 Num',
 '30 Num',
 '32 Num',
 '34 Num',
 '36 Num',
 '38 Num',
 '40 Num',
 '42 Num',
 '44 Num',
 '46 Num',
 '48 Num',
 '50 Num',
 '52 Num',
 '54 Num',
 '56 Num',
 '58 Num',
 '60 Num',
 '62 Num',
 '64 Num',
 '66 Num',
 '68 Num',
 '70 Num',
 '72 Num',
 '74 Num',
 '76 Num',
 '78 Num',
 '80 Num',
 '82 Num',
 '84 Num',
 '86 Num',
 '88 Num',
 '90 Num',
 '92 Num',
 '94 Num',
 '96 Num',
 '98 Num',
 '100 Num']

In [24]:
# flatMap - performs a 1-to-N mapping and flattens the results into one list
data = [4,5,6,7,8]
rdd = sc.parallelize(data)

print 'FlatMap ',rdd.flatMap(lambda d: (d,d*d,d*d*d)).collect()

print 'Map ',rdd.map(lambda d: (d,d*d,d*d*d)).collect()

FlatMap  [4, 16, 64, 5, 25, 125, 6, 36, 216, 7, 49, 343, 8, 64, 512]
Map  [(4, 16, 64), (5, 25, 125), (6, 36, 216), (7, 49, 343), (8, 64, 512)]
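
The difference between the two calls can be reproduced in plain Python. This is a small sketch without Spark: a list comprehension stands in for `map` and `itertools.chain.from_iterable` stands in for the flattening step of `flatMap`. The helper name `powers` is made up for the example.

```python
from itertools import chain

data = [4, 5, 6, 7, 8]


def powers(d):
    """Return the value, its square and its cube."""
    return (d, d * d, d * d * d)


# map: exactly one output element (here a tuple) per input element
mapped = [powers(d) for d in data]

# flatMap: each returned tuple is unpacked into the output list
flat_mapped = list(chain.from_iterable(powers(d) for d in data))
```

`mapped` has 5 tuples, while `flat_mapped` has 15 plain numbers.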


In [32]:
# cache() marks the RDD to be kept in memory once it has been computed
rdd = sc.parallelize(range(10000))
rdd.cache()
rdd1 = rdd.map(lambda x: x+2)
rdd2 = rdd.map(lambda x:x+3)
print rdd1.count() # first action: rdd is computed and cached here
print rdd2.count() # reuses the cached rdd

# Remove the data from the cache
rdd.unpersist()

10000
10000


ParallelCollectionRDD[49] at parallelize at PythonRDD.scala:480

In [14]:
rdd.filter(lambda x: x%2 == 0).map(lambda x: str(x)+' Num').collect()

['2 Num',
 '4 Num',
 '6 Num',
 '8 Num',
 '10 Num',
 '12 Num',
 '14 Num',
 '16 Num',
 '18 Num',
 '20 Num',
 '22 Num',
 '24 Num',
 '26 Num',
 '28 Num',
 '30 Num',
 '32 Num',
 '34 Num',
 '36 Num',
 '38 Num',
 '40 Num',
 '42 Num',
 '44 Num',
 '46 Num',
 '48 Num',
 '50 Num',
 '52 Num',
 '54 Num',
 '56 Num',
 '58 Num',
 '60 Num',
 '62 Num',
 '64 Num',
 '66 Num',
 '68 Num',
 '70 Num',
 '72 Num',
 '74 Num',
 '76 Num',
 '78 Num',
 '80 Num',
 '82 Num',
 '84 Num',
 '86 Num',
 '88 Num',
 '90 Num',
 '92 Num',
 '94 Num',
 '96 Num',
 '98 Num',
 '100 Num']

In [15]:
rdd.filter(lambda x: x%2 != 0).map(lambda x: x*2).collect()

[6,
 10,
 14,
 18,
 22,
 26,
 30,
 34,
 38,
 42,
 46,
 50,
 54,
 58,
 62,
 66,
 70,
 74,
 78,
 82,
 86,
 90,
 94,
 98,
 102,
 106,
 110,
 114,
 118,
 122,
 126,
 130,
 134,
 138,
 142,
 146,
 150,
 154,
 158,
 162,
 166,
 170,
 174,
 178,
 182,
 186,
 190,
 194,
 198,
 202]

### Set Operations

In [33]:
rdd1 = sc.parallelize(range(1,10))
rdd2 = sc.parallelize(range(11,20))
rdd3 = sc.parallelize(range(5,10))

In [34]:
# Transformation
rdd1.cartesian(rdd2).collect()

[(1, 11),
 (1, 12),
 (1, 13),
 (1, 14),
 (1, 15),
 (1, 16),
 (1, 17),
 (1, 18),
 (1, 19),
 (2, 11),
 (2, 12),
 (2, 13),
 (2, 14),
 (2, 15),
 (2, 16),
 (2, 17),
 (2, 18),
 (2, 19),
 (3, 11),
 (3, 12),
 (3, 13),
 (3, 14),
 (3, 15),
 (3, 16),
 (3, 17),
 (3, 18),
 (3, 19),
 (4, 11),
 (4, 12),
 (4, 13),
 (4, 14),
 (4, 15),
 (4, 16),
 (4, 17),
 (4, 18),
 (4, 19),
 (5, 11),
 (5, 12),
 (5, 13),
 (5, 14),
 (5, 15),
 (5, 16),
 (5, 17),
 (5, 18),
 (5, 19),
 (6, 11),
 (6, 12),
 (6, 13),
 (6, 14),
 (6, 15),
 (6, 16),
 (6, 17),
 (6, 18),
 (6, 19),
 (7, 11),
 (7, 12),
 (7, 13),
 (7, 14),
 (7, 15),
 (7, 16),
 (7, 17),
 (7, 18),
 (7, 19),
 (8, 11),
 (8, 12),
 (8, 13),
 (8, 14),
 (8, 15),
 (8, 16),
 (8, 17),
 (8, 18),
 (8, 19),
 (9, 11),
 (9, 12),
 (9, 13),
 (9, 14),
 (9, 15),
 (9, 16),
 (9, 17),
 (9, 18),
 (9, 19)]

In [18]:
#Union
print rdd1.union(rdd2).collect()

#Intersection
print rdd1.intersection(rdd3).collect()

[8, 6, 9, 5, 7]

In [19]:
# Checkpointing saves the RDD to the checkpoint directory so that it can be restored after a failure instead of being recomputed
sc.setCheckpointDir('ckpt')
rdd.checkpoint()

In [20]:
# 4 is the number of partitions
sc.parallelize(range(10),4).glom().collect()

[[0, 1], [2, 3], [4, 5], [6, 7, 8, 9]]

In [21]:
sc.parallelize(range(10),4).coalesce(2).glom().collect()

[[0, 1, 2, 3], [4, 5, 6, 7, 8, 9]]

In [22]:
x = sc.parallelize([("a", 1), ("b", 4),("a",9)])

In [23]:
y = sc.parallelize([("a",5)])

### cogroup

In [24]:
new_rdd = x.cogroup(y)

In [25]:
n_rdd = y.cogroup(x)

In [27]:
n_rdd.collect()

In [26]:
[(a, map(list,b)) for a, b in y.cogroup(x).collect()]

[('a', [[5], [1, 9]]), ('b', [[], [4]])]

In [27]:
new_rdd.collect()

[('a',
  (<pyspark.resultiterable.ResultIterable at 0x7f17934c50d0>,
   <pyspark.resultiterable.ResultIterable at 0x7f17934c6ed0>)),
 ('b',
  (<pyspark.resultiterable.ResultIterable at 0x7f17934c6b10>,
   <pyspark.resultiterable.ResultIterable at 0x7f17934c6fd0>))]

In [28]:
# Convert each ResultIterable into a list so that the grouped values are readable
[(a, map(list,b)) for a, b in x.cogroup(y).collect()]

[('a', [[1, 9], [5]]), ('b', [[4], []])]
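
What `cogroup` computes can be sketched in plain Python. This is only an illustration of the semantics on small lists, not how Spark implements it. The function name `cogroup` below is a local helper, and the key order it produces is an artifact of this sketch (Spark makes no such ordering promise).

```python
def cogroup(left, right):
    """For every key in either input, pair up the values from left and right."""
    keys = []
    for key, _ in left + right:
        if key not in keys:
            keys.append(key)  # remember keys in first-seen order
    return [
        (key,
         ([v for k, v in left if k == key],
          [v for k, v in right if k == key]))
        for key in keys
    ]


x = [("a", 1), ("b", 4), ("a", 9)]
y = [("a", 5)]

x_y = cogroup(x, y)  # values from x come first in each pair
y_x = cogroup(y, x)  # values from y come first in each pair
```

A key that is missing on one side still appears in the result, with an empty list for that side.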

In [29]:
# cogroup returns (key, (values from x, values from y)); this picks the values from y for the first key
list(x.cogroup(y).collect()[0][1][1])

[5]

In [30]:
rdd = sc.parallelize([('a',(11,12)),('b',(13,14))])

In [31]:
rdd.collectAsMap()

{'a': (11, 12), 'b': (13, 14)}

In [32]:
sc.parallelize(['a','c','a','d','c']).countByValue()

defaultdict(int, {'a': 2, 'c': 2, 'd': 1})

In [33]:
from operator import add

In [34]:
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

In [35]:
rdd = sc.parallelize(range(51))
rdd.histogram(2)

([0, 25, 50], [25, 26])

In [36]:
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

In [37]:
x.collectAsMap()

{'a': 1, 'b': 1}

In [38]:
sc.parallelize([("a", 1), ("b", 1), ("a", 2),("a",8),("c",4), ("a", 12),("a",18),("c",14)],2).glom().collect()

[[('a', 1), ('b', 1), ('a', 2), ('a', 8)],
 [('c', 4), ('a', 12), ('a', 18), ('c', 14)]]

In [39]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2),("a",8),("c",4), ("a", 12),("a",18),("c",14)],2)

### combineByKey

In [40]:
# Print trace for the rdd above (first partition, second partition, then the cross-partition merge):
# In MyStr
# In MyStr
# In MyConcat
# In MyConcat
# In MyStr
# In MyStr
# In MyConcat
# In MyConcat
# In myPartConcat

# Invoked once per partition the first time a key appears; d is the corresponding value
def mystr(d):
    print 'In MyStr'
    return d

# Invoked for the second and later occurrences of the same key within the same partition
def myconcat(a,b):
    print 'In MyConcat'
    return a + b

# Merges the combiners of the same key across partitions
def mypartConcat(a,b):
    print 'In myPartConcat'
    return a + b

# mystr converts a value of type V into a combiner of type C
rdd.combineByKey(mystr, myconcat, mypartConcat).collect()

[('a', 41), ('c', 18), ('b', 1)]
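
The three-function contract of `combineByKey` can be traced in plain Python. The sketch below is an assumption-laden emulation, not Spark's implementation: `combine_by_key` is a hypothetical helper, and the two partitions are copied from the `glom()` output shown earlier. It also counts how often each function is called.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Emulate combineByKey over a list of partitions (lists of pairs)."""
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)
    merged = {}
    for combiners in per_partition:
        for key, comb in combiners.items():
            if key in merged:
                merged[key] = merge_combiners(merged[key], comb)
            else:
                merged[key] = comb
    return merged


counts = {"create": 0, "merge_value": 0, "merge_combiners": 0}


def create(v):
    counts["create"] += 1
    return v


def merge_value(c, v):
    counts["merge_value"] += 1
    return c + v


def merge_combiners(c1, c2):
    counts["merge_combiners"] += 1
    return c1 + c2


partitions = [
    [("a", 1), ("b", 1), ("a", 2), ("a", 8)],
    [("c", 4), ("a", 12), ("a", 18), ("c", 14)],
]
sums = combine_by_key(partitions, create, merge_value, merge_combiners)
```

Only key `a` occurs in both partitions, so the cross-partition merge function runs exactly once.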

In [43]:
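# Note: judging by the execution counts, this cell ran after the next two cells,
# which still operate on the 8-element pair RDD defined above.
l = [1,2,3,4,5,6]
rdd = sc.parallelize(l)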


In [41]:
def f(x):
    print x
    return x

rdd.map(f).count()

8

In [42]:
# Invoked once per partition the first time a key appears; d is the corresponding value
# The combiner is a (sum, count) tuple
def mystr(d):
    print 'In MyStr'
    return (d,1)

# Invoked for the second and later occurrences of the same key within the same partition
def myconcat(a,b):
    print 'In MyConcat'
    return (a[0] + b,a[1]+1)

# Merges the (sum, count) combiners of the same key across partitions
def mypartConcat(a,b):
    print 'In myPartConcat'
    return (a[0] + b[0],a[1] + b[1])

# mystr converts a value of type V into a combiner of type C
data = rdd.combineByKey(mystr, myconcat, mypartConcat)
# Divide sum by count to get the average value per key
data.map(lambda x: (x[0], x[1][0]/(x[1][1]*1.0))).collect()
#map(lambda x: (x[0], float(x[1][0])/x[1][1]),data)

[('a', 8.2), ('c', 9.0), ('b', 1.0)]

In [43]:
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])

In [44]:
rdd3 = rdd1.leftOuterJoin(rdd2)

In [45]:
rdd3.collect()

[('a', (1, 4)), ('a', (1, 1)), ('c', (10, None)), ('b', (4, '6'))]

In [46]:
rdd1.join(rdd2).collect()

[('a', (1, 4)), ('a', (1, 1)), ('b', (4, '6'))]
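
The difference between `join` and `leftOuterJoin` can be sketched on plain lists. This is a nested-loop illustration of the semantics only, not Spark's implementation. `inner_join` and `left_outer_join` are hypothetical helpers, and the result order here follows the left input, which Spark does not guarantee.

```python
def inner_join(left, right):
    """Emit one pair for every matching (left value, right value) per key."""
    return [(k, (lv, rv)) for k, lv in left for rk, rv in right if rk == k]


def left_outer_join(left, right):
    """Like inner_join, but keep unmatched left keys with None on the right."""
    out = []
    for k, lv in left:
        matches = [rv for rk, rv in right if rk == k]
        if matches:
            out.extend((k, (lv, rv)) for rv in matches)
        else:
            out.append((k, (lv, None)))
    return out


left = [('a', 1), ('b', 4), ('c', 10)]
right = [('a', 4), ('a', 1), ('b', '6'), ('d', 15)]

joined = inner_join(left, right)
left_joined = left_outer_join(left, right)
```

Key `d` appears in neither result because it exists only on the right side; key `c` survives only in the left outer join.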

In [47]:
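# repartition returns a new RDD; rdd1 itself is unchanged, so it still has a single partition below
rdd1.repartition(4)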

MapPartitionsRDD[132] at coalesce at NativeMethodAccessorImpl.java:0

In [48]:
len(rdd1.glom().collect())

1

In [49]:
rdd1.glom().collect()

[[('a', 1), ('b', 4), ('c', 10)]]

In [50]:
#rdd1.saveAsTextFile('/path/')

In [51]:
rdd1.collect()

[('a', 1), ('b', 4), ('c', 10)]

In [53]:
def f(x):
    print x

In [55]:
# foreach runs f on the executors, so the output is printed there rather than returned to the driver
rdd1.foreach(f)

### Partition

In [56]:
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator): yield sum(iterator)
rdd.mapPartitions(f).collect()

[3, 7]

In [57]:
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(splitIndex, iterator): yield (splitIndex,sum(iterator))
rdd.mapPartitionsWithIndex(f).collect()

[(0, 3), (1, 7)]
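
Both partition-level calls hand the function an iterator over one whole partition rather than a single element. A plain-Python sketch of that calling convention follows. `map_partitions_with_index` is a hypothetical helper, and the partition layout `[[1, 2], [3, 4]]` is an assumption that is consistent with the sums 3 and 7 shown above.

```python
def map_partitions_with_index(partitions, func):
    """Call func(index, iterator) once per partition and chain the outputs."""
    results = []
    for index, part in enumerate(partitions):
        results.extend(func(index, iter(part)))
    return results


def sum_partition(split_index, iterator):
    # One output element per partition: (partition index, sum of its values)
    yield (split_index, sum(iterator))


partitions = [[1, 2], [3, 4]]  # assumed layout for 4 elements in 2 partitions
per_partition_sums = map_partitions_with_index(partitions, sum_partition)
```

The function runs twice in total, once per partition, not once per element.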

In [60]:
pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
pairs.partitionBy(3, lambda x: x%3).glom().collect()

[[(3, 3)], [(1, 1), (4, 4), (4, 4), (1, 1)], [(2, 2), (2, 2)]]
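
The placement of each pair follows from the partition function applied to its key. Below is a plain-Python sketch of that bucketing; `partition_by` is a hypothetical helper, not Spark's code, and it takes the result of the function modulo the number of partitions.

```python
def partition_by(pairs, num_partitions, partition_func):
    """Place each (key, value) pair in bucket partition_func(key) % num_partitions."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        buckets[partition_func(key) % num_partitions].append((key, value))
    return buckets


pairs = [(x, x) for x in [1, 2, 3, 4, 2, 4, 1]]
buckets = partition_by(pairs, 3, lambda key: key % 3)
```

All pairs with the same key always land in the same bucket, which is what key-based operations rely on.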

### Aggregate

In [58]:
data = sc.parallelize([('awi',4),('bcd',6),('jkl',88),('qek',99)])
data.aggregate(5,lambda x,y: x + y[1], lambda x,y: x + y)

207

In [59]:
rdd = sc.parallelize([1,2,3,4],2)
rdd.aggregate((5,1), lambda acc,data: (acc[0] + data, acc[1] +1), lambda acc1,acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1]))

(25, 7)
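
Both results include the zero value more than once: it seeds every partition and it also seeds the final merge. The plain-Python sketch below emulates that behaviour. `aggregate` here is a hypothetical helper, and the partition layouts are assumptions (one partition for the first example, `[[1, 2], [3, 4]]` for the second) that are consistent with the outputs shown.

```python
from functools import reduce


def aggregate(partitions, zero, seq_op, comb_op):
    """Fold each partition starting from zero, then merge the partials from zero."""
    partials = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, partials, zero)


# First example: a single partition, so zero (5) is added twice: 5 + 197 + 5
single = [[('awi', 4), ('bcd', 6), ('jkl', 88), ('qek', 99)]]
total = aggregate(single, 5, lambda x, y: x + y[1], lambda x, y: x + y)

# Second example: two partitions, so zero (5, 1) is applied three times
two_parts = [[1, 2], [3, 4]]
sum_count = aggregate(
    two_parts,
    (5, 1),
    lambda acc, data: (acc[0] + data, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
)
```

This is why a neutral zero value such as `0` or `(0, 0)` is the usual choice when the plain sum or count is wanted.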

### UniqueId

In [61]:
sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect()

[('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
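
The ids above are unique but not consecutive. With n partitions, the items of partition k receive the ids k, n+k, 2n+k, and so on. A plain-Python sketch of that rule follows; `zip_with_unique_id` is a hypothetical helper, and the partition layout `[['a'], ['b', 'c'], ['d', 'e']]` is an assumption that is consistent with the output shown.

```python
def zip_with_unique_id(partitions):
    """Give item i of partition k the id i * n + k, where n is the partition count."""
    n = len(partitions)
    return [
        (item, i * n + k)
        for k, part in enumerate(partitions)
        for i, item in enumerate(part)
    ]


partitions = [["a"], ["b", "c"], ["d", "e"]]  # assumed layout across 3 partitions
with_ids = zip_with_unique_id(partitions)
```

Because each partition can compute its ids from its own index alone, no counting across partitions is needed.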