## Paired RDDs

In [3]:
# findspark.init() runs before `import pyspark`; findspark's job is to locate the
# Spark installation and put it on sys.path so that the import below can succeed.
import findspark
findspark.init()
import pyspark

from pyspark.sql import SparkSession
from pyspark.sql import functions  # NOTE(review): not referenced in the cells shown here

# Reuse the active SparkSession if there is one, otherwise start a new one named "Page-2".
spark = SparkSession.builder.appName("Page-2").getOrCreate()

# Silence all Python warnings from this point on.
import warnings
warnings.filterwarnings('ignore')

# Bare last expression: the notebook displays the session object.
spark

In [4]:
# Six sample letters, obtained by splitting a space-separated string on whitespace.
l = "b d m t e u".split()

# Distribute the letters across two partitions, then bring them back to the driver to inspect.
rdd1 = spark.sparkContext.parallelize(l, numSlices=2)
rdd1.collect()

['b', 'd', 'm', 't', 'e', 'u']

In [5]:
def vowelCheckFunction(x: str) -> int:
    """Classify a single letter as vowel or consonant.

    The comparison is case-insensitive, so ``"A"`` and ``"a"`` are both
    treated as vowels.

    Args:
        x: The letter to classify.

    Returns:
        1 if ``x`` is one of a, e, i, o, u (in either case), otherwise 0.
        Anything that is not exactly one of those letters -- including the
        empty string, multi-character strings and non-string values --
        yields 0.
    """
    # A tuple (not a string) is used for the membership test so that "" and
    # substrings such as "ae" are not mistaken for vowels.
    vowels = ("a", "e", "i", "o", "u")

    # Non-strings keep returning 0 instead of failing on .lower().
    if isinstance(x, str) and x.lower() in vowels:
        return 1
    return 0

vowelCheckFunction("a")

1

In [6]:
# Build a paired RDD: each letter becomes the key and its vowel flag (1 or 0) the value.
rdd2 = rdd1.map(lambda letter: (letter, vowelCheckFunction(letter)))
rdd2.collect()

                                                                                

[('b', 0), ('d', 0), ('m', 0), ('t', 0), ('e', 1), ('u', 1)]

In [7]:
# keys(): keep only the first element (the key) of every pair
rdd2.keys().collect()

['b', 'd', 'm', 't', 'e', 'u']

In [8]:
# values(): keep only the second element (the value) of every pair
rdd2.values().collect()

[0, 0, 0, 0, 1, 1]

### **Aggregation Operations**

---

In [10]:
# Bulb life-test observations, one record per bulb:
#   [filament type, bulb power, life in hours]
filament_data = [
    ["filamentA", "100W", 605],
    ["filamentB", "100W", 683],
    ["filamentB", "100W", 691],
    ["filamentB", "200W", 561],
    ["filamentA", "200W", 530],
    ["filamentA", "100W", 619],
    ["filamentB", "100W", 686],
    ["filamentB", "200W", 600],
    ["filamentB", "100W", 696],
    ["filamentA", "200W", 579],
    ["filamentA", "200W", 520],
    ["filamentA", "100W", 622],
    ["filamentA", "100W", 668],
    ["filamentB", "200W", 569],
    ["filamentB", "200W", 555],
    ["filamentA", "200W", 541],
]

# Distribute the records as an RDD; no partition count is passed, so Spark uses its default.
fil_rdd = spark.sparkContext.parallelize(filament_data)

# Peek at the first three records.
fil_rdd.take(3)

[['filamentA', '100W', 605],
 ['filamentB', '100W', 683],
 ['filamentB', '100W', 691]]

In [18]:
# Number of records in the RDD.
fil_rdd.count()

16

In [19]:
# Number of partitions the RDD is split into.
fil_rdd.getNumPartitions()

4

In [12]:
# Pair each record's filament type (key) with its life in hours (value); bulb power is dropped.
fil_paired_rdd1 = fil_rdd.map(lambda record: (record[0], record[2]))
fil_paired_rdd1.take(4)

[('filamentA', 605),
 ('filamentB', 683),
 ('filamentB', 691),
 ('filamentB', 561)]

#### **Mean life based on filament type**

In [16]:
# To average per filament type, first turn every value into a [life, 1] pair, so that
# adding pairs element-wise later yields both the total life and the number of bulbs.
fil_paired_rdd1_1 = fil_paired_rdd1.map(lambda kv: (kv[0], [kv[1], 1]))
fil_paired_rdd1_1.take(5)

[('filamentA', [605, 1]),
 ('filamentB', [683, 1]),
 ('filamentB', [691, 1]),
 ('filamentB', [561, 1]),
 ('filamentA', [530, 1])]

In [21]:
# Per key, add the [life, count] pairs element-wise -> [total life, number of bulbs].
fil_paired_rdd1_1_sum_count = fil_paired_rdd1_1.reduceByKey(
    lambda left, right: [left[0] + right[0], left[1] + right[1]]
)
fil_paired_rdd1_1_sum_count.collect()

[('filamentB', [5041, 8]), ('filamentA', [4684, 8])]

In [23]:
# Mean life = total life / number of bulbs; the count is kept next to the mean.
fil_paired_rdd_1_mean_count = fil_paired_rdd1_1_sum_count.map(
    lambda kv: [kv[0], float(kv[1][0]) / kv[1][1], kv[1][1]]
)
fil_paired_rdd_1_mean_count.collect()

[['filamentB', 630.125, 8], ['filamentA', 585.5, 8]]

#### **Mean life based on bulb power**

In [26]:
# Key by bulb power this time: (power, life in hours), then (power, [life, 1])
# so that a later element-wise sum gives total life and number of bulbs.
fil_paired_rdd2 = fil_rdd.map(lambda record: (record[1], record[2]))
fil_paired_rdd2_1 = fil_paired_rdd2.map(lambda kv: (kv[0], [kv[1], 1]))
fil_paired_rdd2_1.take(5)

[('100W', [605, 1]),
 ('100W', [683, 1]),
 ('100W', [691, 1]),
 ('200W', [561, 1]),
 ('200W', [530, 1])]

In [27]:
# Per bulb power, add the [life, count] pairs element-wise -> [total life, number of bulbs].
fil_paired_rdd2_1_sum_count = fil_paired_rdd2_1.reduceByKey(
    lambda left, right: [left[0] + right[0], left[1] + right[1]]
)
fil_paired_rdd2_1_sum_count.collect()

[('100W', [5270, 8]), ('200W', [4455, 8])]

In [28]:
# Mean life per bulb power = total life / number of bulbs (only the mean is emitted).
fil_paired_rdd2_1_mean_count = fil_paired_rdd2_1_sum_count.map(
    lambda kv: [kv[0], float(kv[1][0] / kv[1][1])]
)
fil_paired_rdd2_1_mean_count.collect()

[['100W', 658.75], ['200W', 556.875]]

#### **Mean life time based on filament type as well as bulb power**

In [29]:
# Reminder of the raw record layout: [filament type, bulb power, life in hours].
fil_rdd.take(4)

[['filamentA', '100W', 605],
 ['filamentB', '100W', 683],
 ['filamentB', '100W', 691],
 ['filamentB', '200W', 561]]

In [30]:
# Key every record by the (filament type, bulb power) combination, with [life, 1] as the value.
# NOTE(review): each pair is emitted as a 2-element list rather than a tuple; the
# reduceByKey in the next cell accepts it, but the other paired RDDs here use tuples.
fil_rdd_2 = fil_rdd.map(lambda record: [(record[0], record[1]), [record[2], 1]])
fil_rdd_2.collect()

[[('filamentA', '100W'), [605, 1]],
 [('filamentB', '100W'), [683, 1]],
 [('filamentB', '100W'), [691, 1]],
 [('filamentB', '200W'), [561, 1]],
 [('filamentA', '200W'), [530, 1]],
 [('filamentA', '100W'), [619, 1]],
 [('filamentB', '100W'), [686, 1]],
 [('filamentB', '200W'), [600, 1]],
 [('filamentB', '100W'), [696, 1]],
 [('filamentA', '200W'), [579, 1]],
 [('filamentA', '200W'), [520, 1]],
 [('filamentA', '100W'), [622, 1]],
 [('filamentA', '100W'), [668, 1]],
 [('filamentB', '200W'), [569, 1]],
 [('filamentB', '200W'), [555, 1]],
 [('filamentA', '200W'), [541, 1]]]

In [31]:
# Per (filament type, bulb power) key, add the [life, count] pairs element-wise.
fil_rdd_2_sum_count = fil_rdd_2.reduceByKey(
    lambda left, right: [left[0] + right[0], left[1] + right[1]]
)
fil_rdd_2_sum_count.collect()

[(('filamentA', '200W'), [2170, 4]),
 (('filamentB', '200W'), [2285, 4]),
 (('filamentB', '100W'), [2756, 4]),
 (('filamentA', '100W'), [2514, 4])]

In [32]:
# Mean life per (filament type, bulb power) = total life / number of bulbs.
fil_rdd_2_mean = fil_rdd_2_sum_count.map(
    lambda kv: [kv[0], float(kv[1][0] / kv[1][1])]
)
fil_rdd_2_mean.collect()

[[('filamentA', '200W'), 542.5],
 [('filamentB', '200W'), 571.25],
 [('filamentB', '100W'), 689.0],
 [('filamentA', '100W'), 628.5]]

### **Join Operations**

---

- **Inner join**: Returns only the keys that are present in both tables; keys found in just one table are discarded.

    - `join()`

- **Left outer join**: Keeps every key from the left table; keys that exist only in the right table are dropped, and left keys with no match get `None` as the right-hand value.

    - `leftOuterJoin()`

- **Right outer join**: Keeps every key from the right (second) table; keys that exist only in the left (first) table are dropped, and right keys with no match get `None` as the left-hand value.

    - `rightOuterJoin()`

- **Full outer join**: Keeps all keys from both tables; wherever one side has no match, its value is `None`.

    - `fullOuterJoin()`


In [33]:
# Student roster, one record per student: [student id, name, gender]
student_data = [
    record.split()
    for record in (
        "si1 Robin M",
        "si2 Maria F",
        "si3 Julie F",
        "si4 Bob M",
        "si6 William M",
    )
]

# Subject enrolments, one record per (student, subject): [student id, subject]
sub_data = [
    record.split()
    for record in (
        "si1 Python",
        "si3 Java",
        "si1 Java",
        "si2 Python",
        "si3 Ruby",
        "si4 C++",
        "si5 C",
        "si4 Python",
        "si2 Java",
    )
]

# Distribute both Python lists as RDDs (default partitioning).
student_rdd = spark.sparkContext.parallelize(student_data)
subject_rdd = spark.sparkContext.parallelize(sub_data)

In [34]:
# Show every student record, a blank line, then the first four subject records.
print(student_rdd.collect(), subject_rdd.take(4), sep="\n\n")

[['si1', 'Robin', 'M'], ['si2', 'Maria', 'F'], ['si3', 'Julie', 'F'], ['si4', 'Bob', 'M'], ['si6', 'William', 'M']]

[['si1', 'Python'], ['si3', 'Java'], ['si1', 'Java'], ['si2', 'Python']]


In [35]:
# Re-shape each student record into a pair: student id -> [name, gender].
student_rdd_paired = student_rdd.map(lambda rec: (rec[0], [rec[1], rec[2]]))
student_rdd_paired.take(4)

[('si1', ['Robin', 'M']),
 ('si2', ['Maria', 'F']),
 ('si3', ['Julie', 'F']),
 ('si4', ['Bob', 'M'])]

In [36]:
# Re-shape each subject record into a pair: student id -> subject.
subject_rdd_paired = subject_rdd.map(lambda rec: (rec[0], rec[1]))
subject_rdd_paired.take(3)

[('si1', 'Python'), ('si3', 'Java'), ('si1', 'Java')]

In [38]:
# Distinct student ids that appear in the subject RDD.
print(subject_rdd_paired.keys().distinct().collect()) # check keys of subject rdd



['si4', 'si3', 'si5', 'si1', 'si2']


                                                                                

In [39]:
# inner join: only student ids present in both RDDs survive (si5 and si6 do not appear);
# each result is (id, (student value, subject)), one row per matching subject

stu_sub_inner_join = student_rdd_paired.join(subject_rdd_paired)
stu_sub_inner_join.collect()

                                                                                

[('si3', (['Julie', 'F'], 'Java')),
 ('si3', (['Julie', 'F'], 'Ruby')),
 ('si2', (['Maria', 'F'], 'Python')),
 ('si2', (['Maria', 'F'], 'Java')),
 ('si4', (['Bob', 'M'], 'C++')),
 ('si4', (['Bob', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Java'))]

In [40]:
# left outer join: every student id is kept; si6 has no subject, so its right-hand value is None,
# while si5 (present only in the subject RDD) is dropped

stu_sub_left_outer_join = student_rdd_paired.leftOuterJoin(subject_rdd_paired)
stu_sub_left_outer_join.collect()

                                                                                

[('si3', (['Julie', 'F'], 'Java')),
 ('si3', (['Julie', 'F'], 'Ruby')),
 ('si2', (['Maria', 'F'], 'Python')),
 ('si2', (['Maria', 'F'], 'Java')),
 ('si4', (['Bob', 'M'], 'C++')),
 ('si4', (['Bob', 'M'], 'Python')),
 ('si6', (['William', 'M'], None)),
 ('si1', (['Robin', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Java'))]

In [41]:
# right outer join: every id in the subject RDD is kept; si5 has no student record, so its
# left-hand value is None, while si6 (present only in the student RDD) is dropped
stu_sub_right_outer_join = student_rdd_paired.rightOuterJoin(subject_rdd_paired)
stu_sub_right_outer_join.collect()

                                                                                

[('si3', (['Julie', 'F'], 'Java')),
 ('si3', (['Julie', 'F'], 'Ruby')),
 ('si5', (None, 'C')),
 ('si2', (['Maria', 'F'], 'Python')),
 ('si2', (['Maria', 'F'], 'Java')),
 ('si4', (['Bob', 'M'], 'C++')),
 ('si4', (['Bob', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Java'))]

In [42]:
# full outer join: ids from both RDDs are kept; the side with no match is None
# (si5 has no student record, si6 has no subject)

stu_sub_full_outer_join = student_rdd_paired.fullOuterJoin(subject_rdd_paired)
stu_sub_full_outer_join.collect()

                                                                                

[('si3', (['Julie', 'F'], 'Java')),
 ('si3', (['Julie', 'F'], 'Ruby')),
 ('si5', (None, 'C')),
 ('si2', (['Maria', 'F'], 'Python')),
 ('si2', (['Maria', 'F'], 'Java')),
 ('si4', (['Bob', 'M'], 'C++')),
 ('si4', (['Bob', 'M'], 'Python')),
 ('si6', (['William', 'M'], None)),
 ('si1', (['Robin', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Java'))]