#### Aggregations

In [1]:
# Small (key, value) pair RDD reused by the aggregation examples in this section.
pairRDD = sc.parallelize([
    (1, 5),
    (10, 1),
    (10, 12),
    (1, 10),
    (2, 15),
])

<b>Calculate Per Key average</b>

In [3]:
# Pair every value with a count of 1 so sums and counts can be reduced together.
mappedRDD = pairRDD.mapValues(lambda value: (value, 1))

In [5]:
# Add the (value, count) pairs element-wise per key, giving (sum, count) for each key.
intermediateRDD = mappedRDD.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)

In [6]:
# Divide sum by count to get the per-key average.
# float() forces true division: on Python 2, int / int truncates (13 / 2 -> 6),
# which silently turned the averages 6.5 and 7.5 into 6 and 7.
intermediateRDD.mapValues(lambda sum_count: float(sum_count[0]) / sum_count[1]).collect()

[(10, 6.5), (2, 15.0), (1, 7.5)]

#### infamous word count problem

In [11]:
# Load the sample README from the local filesystem (file: scheme) for the word count.
# NOTE(review): the absolute path is specific to the author's machine — adjust before re-running.
lines = sc.textFile('file:////home/cloudera/Desktop/pyspark/README.txt')

In [16]:
# Split each line on single spaces, then emit a (lower-cased token, 1) pair per token.
# split(" ") keeps empty tokens, so the empty string is counted as a word too.
wordsRDD = (
    lines
    .flatMap(lambda line: line.split(" "))
    .map(lambda word: (word.lower(), 1))
)

In [17]:
# Sum the per-occurrence 1s for each word, then bring the totals back to the driver.
countRDD = wordsRDD.reduceByKey(lambda left, right: left + right)
countRDD.collect()

[(u'a', 3),
 (u'', 2),
 (u'van', 1),
 (u'langauege', 1),
 (u'this', 1),
 (u'is', 3),
 (u'sample', 1),
 (u'python', 2),
 (u'file', 1),
 (u'langauge', 1),
 (u'developed', 1),
 (u'rosam', 1),
 (u'functional', 1),
 (u'by', 1),
 (u'guido', 1)]

#### This can be implemented much faster by using countByValue()

In [20]:
# countByValue() is an action: it returns the per-token counts to the driver as a dict,
# so despite its name wordsRDD is no longer an RDD after this cell.
# Tokens are not lower-cased here, so the counts are case-sensitive.
wordsRDD = (
    lines
    .flatMap(lambda line: line.split(" "))
    .countByValue()
)

In [21]:
# Display the dict of counts returned by countByValue().
wordsRDD

defaultdict(int,
            {u'': 2,
             u'Guido': 1,
             u'Python': 2,
             u'Rosam': 1,
             u'This': 1,
             u'Van': 1,
             u'a': 3,
             u'by': 1,
             u'developed': 1,
             u'file': 1,
             u'functional': 1,
             u'is': 3,
             u'langauege': 1,
             u'langauge': 1,
             u'sample': 1})

Note: countByValue() is an action, not a transformation — it returns the counts to the driver as a dictionary instead of producing a new RDD.

##### combineByKey()

In [22]:
# Per-key (sum, count): tag each value with a count of 1, then add the pairs element-wise.
(
    pairRDD
    .mapValues(lambda value: (value, 1))
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .collect()
)

[(10, (13, 2)), (2, (15, 1)), (1, (15, 2))]

Per-key (sum, count) using combineByKey() — the intermediate result needed for the per-key average

In [25]:
# Per-key (sum, count) built with combineByKey(). The three functions are, in order:
#   createCombiner - turns the first value seen for a key into a (value, 1) accumulator
#   mergeValue     - folds one more value into an existing (sum, count) accumulator
#   mergeCombiners - adds two (sum, count) accumulators element-wise
pairRDD.combineByKey(
    lambda value: (value, 1),
    lambda acc, value: (acc[0] + value, acc[1] + 1),
    lambda left, right: (left[0] + right[0], left[1] + right[1])
).collect()

[(10, (13, 2)), (2, (15, 1)), (1, (15, 2))]

##### How combineByKey() works

Signature :<br>
<b>combineByKey(createCombiner, mergeValue, mergeCombiners)</b><br>
<b><u>createCombiner()</u></b> - If a new key (element) is found in a partition, then the <b>initial value</b> for that key is created by executing the createCombiner() function. Please note this happens every time a key is found for the first time in a partition, rather than once per key in the whole RDD.<br>
<b><u>mergeValue()</u></b> - If a key is found again in the same partition, then mergeValue() gets executed, in which the initial (accumulated) value will be the first parameter.<br>
<b><u>mergeCombiners()</u></b> - Executed at last to combine the results of each partition.

#### Find average marks per student

In [1]:
# (student, subject, marks) records, one student per row, spread over 3 partitions.
student_rdd = sc.parallelize([
    ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), ("Joseph", "Biology", 82),
    ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80),
    ("Tina", "Maths", 78), ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87),
    ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), ("Thomas", "Biology", 74),
    ("Cory", "Maths", 56), ("Cory", "Physics", 65), ("Cory", "Chemistry", 71), ("Cory", "Biology", 68),
    ("Jackeline", "Maths", 86), ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83),
    ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), ("Juan", "Biology", 60),
], 3)

In [3]:
# Key by student name and carry (marks, 1.0); the float count keeps the later division exact.
mapped_rdd = student_rdd.map(lambda record: (record[0], (record[2], 1.0)))

In [7]:
# Add the (marks, count) pairs element-wise to get (total marks, number of marks) per student.
reduced_rdd = mapped_rdd.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)

In [8]:
# Average = total marks / number of marks for each student.
reduced_rdd.mapValues(lambda sum_count: sum_count[0] / sum_count[1]).collect()

[('Thomas', 86.25),
 ('Jimmy', 77.0),
 ('Juan', 64.0),
 ('Cory', 65.0),
 ('Jackeline', 76.5),
 ('Joseph', 82.5),
 ('Tina', 76.5)]

##### Above problem using combineByKey()

In [14]:
# Per-student average with combineByKey(); the accumulator is a (sum, count) pair.
# The count starts as 1.0 in createCombiner so that it is a float on every path:
# with (x, 1), a key that has only one mark in each partition never goes through
# mergeValue, keeps an int count, and the final division truncates on Python 2.
(
    student_rdd
    .map(lambda record: (record[0], record[2]))
    .combineByKey(
        lambda mark: (mark, 1.0),                                     # createCombiner
        lambda acc, mark: (acc[0] + mark, acc[1] + 1.0),              # mergeValue
        lambda left, right: (left[0] + right[0], left[1] + right[1])  # mergeCombiners
    )
    .mapValues(lambda sum_count: sum_count[0] / sum_count[1])
    .collect()
)

[('Thomas', 86.25),
 ('Jimmy', 77.0),
 ('Juan', 64.0),
 ('Cory', 65.0),
 ('Jackeline', 76.5),
 ('Joseph', 82.5),
 ('Tina', 76.5)]

In [3]:
#find the sum of marks per student

In [14]:
# Key by student name and pair each mark with a count of 1.
student_rdd_mapped = student_rdd.map(lambda record: (record[0], (record[2], 1)))

In [15]:
# Peek at the five largest elements; tuples compare by name first, so the result is in descending name order.
student_rdd_mapped.top(5)

[('Tina', (87, 1)),
 ('Tina', (78, 1)),
 ('Tina', (73, 1)),
 ('Tina', (68, 1)),
 ('Thomas', (93, 1))]

In [16]:
# Sum of marks (and number of marks) per student, as a (sum, count) pair.
student_rdd_mapped.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
).collect()

[('Thomas', (345, 4)),
 ('Jimmy', (308, 4)),
 ('Juan', (256, 4)),
 ('Cory', (260, 4)),
 ('Jackeline', (306, 4)),
 ('Joseph', (330, 4)),
 ('Tina', (306, 4))]

Note: When using reduceByKey(), the function you pass must return a value of the same type as the values of the RDD.

##### Tuning the level of Parallelism

In [1]:
data = [("a", 1), ("b", 2), ("a", 12)]

# Default parallelism: the number of output partitions is left to Spark.
sc.parallelize(data).reduceByKey(lambda left, right: left + right)

# Custom parallelism: the second argument requests 10 output partitions.
sc.parallelize(data).reduceByKey(lambda left, right: left + right, 10)

PythonRDD[10] at RDD at PythonRDD.scala:43

 <b>repartition() and coalesce()</b>

In [19]:
#repartition()
#shuffles the data across the nodes to create a new set of partitions.
#It returns a new RDD; the result is not assigned here, so student_rdd itself is unchanged.
student_rdd.repartition(2)

MapPartitionsRDD[54] at repartition at NativeMethodAccessorImpl.java:-2

In [20]:
# Partition count of student_rdd itself (the repartitioned RDD was not assigned back to it).
student_rdd.getNumPartitions()

3

In [22]:
# Rebind student_rdd to the repartitioned RDD so the new partitioning is used by later cells.
student_rdd = student_rdd.repartition(2)

In [23]:
# Confirm the partition count after the reassignment.
student_rdd.getNumPartitions()

2

<b>Note</b>: repartition() is really expensive since it involves shuffling the data.

coalesce() avoids shuffling the data, but only if the number of partitions is being decreased.<br>
Before using it, check the current number of partitions (getNumPartitions()) to confirm that the target is smaller.

In [24]:
# Collapse student_rdd into a single partition with coalesce() and rebind the name.
student_rdd = student_rdd.coalesce(1)

In [25]:
# Confirm the partition count after coalesce().
student_rdd.getNumPartitions()

1

#### Grouping Data

In [5]:
# Group whole records by student name (field 0); each group is returned as a ResultIterable.
student_rdd.groupBy(lambda record: record[0]).collect()

[('Thomas', <pyspark.resultiterable.ResultIterable at 0x7fb0a449b4d0>),
 ('Jimmy', <pyspark.resultiterable.ResultIterable at 0x7fb0a449bad0>),
 ('Juan', <pyspark.resultiterable.ResultIterable at 0x7fb0a449b6d0>),
 ('Cory', <pyspark.resultiterable.ResultIterable at 0x7fb0a449b650>),
 ('Jackeline', <pyspark.resultiterable.ResultIterable at 0x7fb0a449b890>),
 ('Joseph', <pyspark.resultiterable.ResultIterable at 0x7fb0a449b5d0>),
 ('Tina', <pyspark.resultiterable.ResultIterable at 0x7fb0a449b490>)]

In [6]:
# Sample records for the grouping examples.
# NOTE(review): the third record has only two fields, unlike the others — confirm whether a value is missing.
sampleRDD = sc.parallelize([
    ('pp', 'India', 10),
    ('aa', 'Israel', 25),
    ('pp', 'India'),
    ('pp', 'India', 100),
    ('aa', 'Israel', 14),
    ('ss', 'Switerzealand', 11),
])

SyntaxError: unexpected EOF while parsing (<ipython-input-10-7833bd6fa1d9>, line 1)

In [11]:
# Three-field integer records; the first field is the id used as the grouping key below.
tempRDD = sc.parallelize([
    (3922774869, 10, 1),
    (3922774869, 11, 1),
    (3922774869, 12, 2),
    (3922774869, 13, 2),
    (1779744180, 10, 1),
    (1779744180, 11, 1),
    (3922774869, 14, 3),
    (3922774869, 15, 2),
    (1779744180, 16, 1),
    (3922774869, 12, 1),
    (3922774869, 13, 1),
    (1779744180, 14, 1),
    (1779744180, 15, 1),
    (1779744180, 16, 1),
    (3922774869, 14, 2),
    (3922774869, 15, 1),
    (1779744180, 16, 1),
    (1779744180, 17, 1),
    (3922774869, 16, 4),
])

Output expectation : (1779744180, (10,1), (11,1), (12,2), (13,2) ...)
(3922774869, (10,1), (11,1), (12,3), (13,4) ...)

In [29]:
# Key each record by its first field; the remaining two fields become the value tuple.
k = tempRDD.map(lambda record: (record[0], record[1:]))

In [30]:
# Inspect the re-keyed records: each is (first field, (remaining two fields)).
k.collect()

[(3922774869, (10, 1)),
 (3922774869, (11, 1)),
 (3922774869, (12, 2)),
 (3922774869, (13, 2)),
 (1779744180, (10, 1)),
 (1779744180, (11, 1)),
 (3922774869, (14, 3)),
 (3922774869, (15, 2)),
 (1779744180, (16, 1)),
 (3922774869, (12, 1)),
 (3922774869, (13, 1)),
 (1779744180, (14, 1)),
 (1779744180, (15, 1)),
 (1779744180, (16, 1)),
 (3922774869, (14, 2)),
 (3922774869, (15, 1)),
 (1779744180, (16, 1)),
 (1779744180, (17, 1)),
 (3922774869, (16, 4))]

In [39]:
def h(x, y):
    """Append ``y`` to the list ``x`` in place and return that same list.

    ``y`` is always added as a single element, even when it is itself a list.
    """
    accumulated = x
    accumulated.append(y)
    return accumulated

In [40]:
# seqFunc (h) appends one value to a partition's list; combFunc has to merge two
# partial lists, so it concatenates them. Reusing h as combFunc appended the second
# partial list as a single nested element instead of merging its items.
k.aggregateByKey([], h, lambda left, right: left + right).collect()

[(1779744180,
  [(10, 1), (11, 1), (16, 1), (14, 1), (15, 1), (16, 1), (16, 1), (17, 1)]),
 (3922774869,
  [(10, 1),
   (11, 1),
   (12, 2),
   (13, 2),
   (14, 3),
   (15, 2),
   (12, 1),
   (13, 1),
   (14, 2),
   (15, 1),
   (16, 4)])]

In [31]:
# Tuple + tuple concatenates, so all value tuples of a key are flattened into one long tuple.
k.reduceByKey(lambda left, right: left + right).collect()

[(1779744180, (10, 1, 11, 1, 16, 1, 14, 1, 15, 1, 16, 1, 16, 1, 17, 1)),
 (3922774869,
  (10,
   1,
   11,
   1,
   12,
   2,
   13,
   2,
   14,
   3,
   15,
   2,
   12,
   1,
   13,
   1,
   14,
   2,
   15,
   1,
   16,
   4))]