Pair RDDs contains records consisting of key and value.

In [2]:
from pyspark.sql import *

In [3]:
from pyspark import SparkContext

In [4]:
sc = SparkContext('local', 'test_app')

In [4]:
l = ['a', 'b', 'c', 'e', 'i', 'o']
rdd = sc.parallelize(l)


In [5]:
def test(x): return 1 if x in ('a', 'e', 'i', 'o', 'u') else 0

In [6]:
pair_rdd = rdd.map(lambda x: (x, test(x)))
pair_rdd.collect()

[('a', 1), ('b', 0), ('c', 0), ('e', 1), ('i', 1), ('o', 1)]

In [7]:
pair_rdd.keys().collect()

['a', 'b', 'c', 'e', 'i', 'o']

In [8]:
pair_rdd.values().collect()

[1, 0, 0, 1, 1, 1]

# RDD.keyBy(func) 
1. keyBy() transformation creates a tuple consisting of key, value pair.
2. key is derived by function we pass 
3. value is complete tuple from which key was derived


In [9]:
import pprint
filDataSingle = [['filamentA','100W',605],['filamentB','100W',683], ['filamentB','100W',691], ['filamentB','200W',561],['filamentA','200W',530], ['filamentA','100W',619],['filamentB','100W',686], ['filamentB','200W',600],['filamentB','100W',696], ['filamentA','200W',579],['filamentA','200W',520], ['filamentA','100W',622], ['filamentA','100W',668], ['filamentB','200W',569],['filamentB','200W',555],['filamentA','200W',541]]
pprint.pprint(filDataSingle)


[['filamentA', '100W', 605],
 ['filamentB', '100W', 683],
 ['filamentB', '100W', 691],
 ['filamentB', '200W', 561],
 ['filamentA', '200W', 530],
 ['filamentA', '100W', 619],
 ['filamentB', '100W', 686],
 ['filamentB', '200W', 600],
 ['filamentB', '100W', 696],
 ['filamentA', '200W', 579],
 ['filamentA', '200W', 520],
 ['filamentA', '100W', 622],
 ['filamentA', '100W', 668],
 ['filamentB', '200W', 569],
 ['filamentB', '200W', 555],
 ['filamentA', '200W', 541]]


In [10]:
fdata = sc.parallelize(filDataSingle)
f = fdata.keyBy(lambda x: x[0])
f.collect()

[('filamentA', ['filamentA', '100W', 605]),
 ('filamentB', ['filamentB', '100W', 683]),
 ('filamentB', ['filamentB', '100W', 691]),
 ('filamentB', ['filamentB', '200W', 561]),
 ('filamentA', ['filamentA', '200W', 530]),
 ('filamentA', ['filamentA', '100W', 619]),
 ('filamentB', ['filamentB', '100W', 686]),
 ('filamentB', ['filamentB', '200W', 600]),
 ('filamentB', ['filamentB', '100W', 696]),
 ('filamentA', ['filamentA', '200W', 579]),
 ('filamentA', ['filamentA', '200W', 520]),
 ('filamentA', ['filamentA', '100W', 622]),
 ('filamentA', ['filamentA', '100W', 668]),
 ('filamentB', ['filamentB', '200W', 569]),
 ('filamentB', ['filamentB', '200W', 555]),
 ('filamentA', ['filamentA', '200W', 541])]

# RDD.mapValues(func)
    1. mapValues() transformation passes each value in Pair RDD through a function w/o changing its key

In [11]:
f.mapValues(lambda x: x[2] + 1).collect()

[('filamentA', 606),
 ('filamentB', 684),
 ('filamentB', 692),
 ('filamentB', 562),
 ('filamentA', 531),
 ('filamentA', 620),
 ('filamentB', 687),
 ('filamentB', 601),
 ('filamentB', 697),
 ('filamentA', 580),
 ('filamentA', 521),
 ('filamentA', 623),
 ('filamentA', 669),
 ('filamentB', 570),
 ('filamentB', 556),
 ('filamentA', 542)]

# RDD.flatMapValues(func)
    1. passes each value from Pair Rdd to a func w/o changing its key and produces a flattened list

In [12]:
l = sc.parallelize(['Delhi, 32|36', 'Kolkata, 23|45|12', 'Mumbai, 34,23,34'])
kvpairs = l.map(lambda x:x.split(','))
# print kvpairs
kvpairs.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0], int(x[1]))).collect()


[('Delhi', 32),
 ('Delhi', 36),
 ('Kolkata', 23),
 ('Kolkata', 45),
 ('Kolkata', 12),
 ('Mumbai', 34)]

# RDD.groupByKey(numPartitions, partitionFunc=<hash_fn>)

1. groups the values by key in each key-value PAir RDD
2. numPartitions ~ how many groups is has to create
3. partitionFunc ~ defaults to sparks's built-in hash partitioner

In [14]:
x = sc.parallelize([
    ("USA", 1), ("USA", 2), ("India", 1),
    ("UK", 1), ("India", 4), ("India", 9),
    ("USA", 8), ("USA", 3), ("India", 4),
    ("UK", 6), ("UK", 9), ("UK", 5)])
x = x.groupByKey()
print x.collect()

[('India', <pyspark.resultiterable.ResultIterable object at 0x117c73610>), ('USA', <pyspark.resultiterable.ResultIterable object at 0x117c63bd0>), ('UK', <pyspark.resultiterable.ResultIterable object at 0x117c639d0>)]


In [15]:
x.mapValues(lambda y: [a for a in y]).collect()

[('India', [1, 4, 9, 4]), ('USA', [1, 2, 8, 3]), ('UK', [1, 6, 9, 5])]

In [16]:
x = x.mapValues(lambda y: sum([a for a in y])/float(len(y)))
x.collect()


[('India', 4.5), ('USA', 3.5), ('UK', 5.25)]

# RDD.reduceByKey(func, numPartitions, partitionFunc)

In [17]:
x = sc.parallelize([
    ("USA", 1), ("USA", 2), ("India", 1),
    ("UK", 1), ("India", 4), ("India", 9),
    ("USA", 8), ("USA", 3), ("India", 4),
    ("UK", 6), ("UK", 9), ("UK", 5)])
tupls = x.reduceByKey(lambda x, y: x+y)
tupls.collect()

[('India', 18), ('USA', 14), ('UK', 21)]

In [18]:
x.sortByKey(lambda x: x[0]).collect()

[('India', 1),
 ('India', 4),
 ('India', 9),
 ('India', 4),
 ('UK', 1),
 ('UK', 6),
 ('UK', 9),
 ('UK', 5),
 ('USA', 1),
 ('USA', 2),
 ('USA', 8),
 ('USA', 3)]

In [15]:
from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc)

In [16]:
df = sqlContext.createDataFrame(filDataSingle)

In [17]:
df = df.withColumnRenamed("_1", "filament_type").withColumnRenamed("_2", "wattage").withColumnRenamed("_3", "lifetime")

In [30]:
a = df.groupBy("filament_type").agg({"lifetime":"avg"})
a.collect()

[Row(filament_type=u'filamentB', avg(lifetime)=630.125),
 Row(filament_type=u'filamentA', avg(lifetime)=585.5)]

In [19]:
print r

MapPartitionsRDD[20] at javaToPython at NativeMethodAccessorImpl.java:0


Calculating Avg with PairRDDs based on filament type

In [21]:
filDataRDD = sc.parallelize(filDataSingle)

In [24]:
fil = filDataRDD.map(lambda x: (x[0], [x[2],1]))
fil.take(5)

[('filamentA', [605, 1]),
 ('filamentB', [683, 1]),
 ('filamentB', [691, 1]),
 ('filamentB', [561, 1]),
 ('filamentA', [530, 1])]

In [35]:
fil2 = fil.reduceByKey(lambda x,y:[x[0]+y[0], x[1] + y[1]])
fil2.map(lambda l: (l[0], float(l[1][0])/l[1][1])).take(4)

[('filamentB', 630.125), ('filamentA', 585.5)]

calculate Mean value based on Power type

In [36]:
fil2 = filDataRDD.map(lambda x: (x[1], [x[2], 1]))
fil22 = fil2.reduceByKey(lambda x,y:[x[0] +y[0], x[1] + y[1]])
fil22.map(lambda l: (l[0], float(l[1][0])/l[1][1])).take(4)

[('100W', 658.75), ('200W', 556.875)]

Finding Mean Lifetime Based on Filament Type And Power #exercise

# Join Data

In [20]:
import pprint
studentsData = [['si1','Robin','M'],['si2','Maria','F'],['si3','Julie','F'], ['si4','Bob',  'M'], ['si6','William','M']]
subjectsData = [['Python', 'si1'], ['Java', 'si3'], ['Java', 'si1'],['Python', 'si2'],['Ruby', 'si3'],['C++', 'si4'],['C', 'si5'],['Python', 'si4'],['Java', 'si2']]
pprint.pprint(studentsData)


[['si1', 'Robin', 'M'],
 ['si2', 'Maria', 'F'],
 ['si3', 'Julie', 'F'],
 ['si4', 'Bob', 'M'],
 ['si6', 'William', 'M']]


In [21]:
pprint.pprint(subjectsData)

[['Python', 'si1'],
 ['Java', 'si3'],
 ['Java', 'si1'],
 ['Python', 'si2'],
 ['Ruby', 'si3'],
 ['C++', 'si4'],
 ['C', 'si5'],
 ['Python', 'si4'],
 ['Java', 'si2']]


In [25]:
studentsRDD = sc.parallelize(studentsData, 2).keyBy(lambda x:x[0])
studentsRDD.collect()

[('si1', ['si1', 'Robin', 'M']),
 ('si2', ['si2', 'Maria', 'F']),
 ('si3', ['si3', 'Julie', 'F']),
 ('si4', ['si4', 'Bob', 'M']),
 ('si6', ['si6', 'William', 'M'])]

In [26]:
subjectsRDD = sc.parallelize(subjectsData, 2).keyBy(lambda x:x[1])

In [27]:
studentsRDD.join(subjectsRDD).take(10)

[('si3', (['si3', 'Julie', 'F'], ['Java', 'si3'])),
 ('si3', (['si3', 'Julie', 'F'], ['Ruby', 'si3'])),
 ('si2', (['si2', 'Maria', 'F'], ['Python', 'si2'])),
 ('si2', (['si2', 'Maria', 'F'], ['Java', 'si2'])),
 ('si1', (['si1', 'Robin', 'M'], ['Python', 'si1'])),
 ('si1', (['si1', 'Robin', 'M'], ['Java', 'si1'])),
 ('si4', (['si4', 'Bob', 'M'], ['C++', 'si4'])),
 ('si4', (['si4', 'Bob', 'M'], ['Python', 'si4']))]

In [97]:
studentsRDD.leftOuterJoin(subjectsRDD).collect()

[('si3', (['si3', 'Julie', 'F'], ['Java', 'si3'])),
 ('si3', (['si3', 'Julie', 'F'], ['Ruby', 'si3'])),
 ('si2', (['si2', 'Maria', 'F'], ['Python', 'si2'])),
 ('si2', (['si2', 'Maria', 'F'], ['Java', 'si2'])),
 ('si6', (['si6', 'William', 'M'], None)),
 ('si1', (['si1', 'Robin', 'M'], ['Python', 'si1'])),
 ('si1', (['si1', 'Robin', 'M'], ['Java', 'si1'])),
 ('si4', (['si4', 'Bob', 'M'], ['C++', 'si4'])),
 ('si4', (['si4', 'Bob', 'M'], ['Python', 'si4']))]

In [98]:
studentsRDD.rightOuterJoin(subjectsRDD).collect()

[('si3', (['si3', 'Julie', 'F'], ['Java', 'si3'])),
 ('si3', (['si3', 'Julie', 'F'], ['Ruby', 'si3'])),
 ('si2', (['si2', 'Maria', 'F'], ['Python', 'si2'])),
 ('si2', (['si2', 'Maria', 'F'], ['Java', 'si2'])),
 ('si1', (['si1', 'Robin', 'M'], ['Python', 'si1'])),
 ('si1', (['si1', 'Robin', 'M'], ['Java', 'si1'])),
 ('si5', (None, ['C', 'si5'])),
 ('si4', (['si4', 'Bob', 'M'], ['C++', 'si4'])),
 ('si4', (['si4', 'Bob', 'M'], ['Python', 'si4']))]

In [99]:
studentsRDD.fullOuterJoin(subjectsRDD).collect()

[('si3', (['si3', 'Julie', 'F'], ['Java', 'si3'])),
 ('si3', (['si3', 'Julie', 'F'], ['Ruby', 'si3'])),
 ('si2', (['si2', 'Maria', 'F'], ['Python', 'si2'])),
 ('si2', (['si2', 'Maria', 'F'], ['Java', 'si2'])),
 ('si6', (['si6', 'William', 'M'], None)),
 ('si1', (['si1', 'Robin', 'M'], ['Python', 'si1'])),
 ('si1', (['si1', 'Robin', 'M'], ['Java', 'si1'])),
 ('si5', (None, ['C', 'si5'])),
 ('si4', (['si4', 'Bob', 'M'], ['C++', 'si4'])),
 ('si4', (['si4', 'Bob', 'M'], ['Python', 'si4']))]