In [8]:
from google.colab import drive
drive.mount('/content/drive',force_remount=True)

Mounted at /content/drive


In [12]:

import os
os.chdir('/content/drive/Shareddrives/PySpark')
!ls

'PySpark basics'  'PySpark RDDs.ipynb'	 spark-3.0.1-bin-hadoop2.7


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q http://www.gtlib.gatech.edu/pub/apache/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
# !tar -xvf spark-3.0.1-bin-hadoop2.7.tgz

In [14]:
!pip install -q findspark

In [15]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/drive/Shareddrives/PySpark/spark-3.0.1-bin-hadoop2.7"

In [17]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext

## Create a Paired RDD

- create a paired RDD is by using the map() method. 


**Creating an RDD with Single Elements**

In [19]:
elements = ['b' , 'd', 'm', 't', 'e', 'u']

element_rdd = sc.parallelize(elements,2)

In [21]:
def identifyvowel(point):
  if point in ['a','e','i','o','u']:
    return 1
  else:
    return 0

In [24]:
# This makes everything into key_value pair 

pair_rdd = element_rdd.map(lambda x: (x, identifyvowel(x)))

pair_rdd.collect()

[('b', 0), ('d', 0), ('m', 0), ('t', 0), ('e', 1), ('u', 1)]

In [28]:
# capturing the keys 
pair_rdd.keys().collect()

pair_rdd.values().collect()


[0, 0, 0, 0, 1, 1]

## Aggregation of the data 

calculate the following:
* Mean life in hours for bulbs of each filament type
* Mean life in hours for bulbs of each power level
* Mean life in hours based on both filament type and power

In [30]:
filDataSingle = [['filamentA','100W',605],
['filamentB','100W',683],
['filamentB','100W',691],
['filamentB','200W',561],
['filamentA','200W',530],
['filamentA','100W',619],
['filamentB','100W',686],
['filamentB','200W',600],
['filamentB','100W',696],
['filamentA','200W',579],
['filamentA','200W',520],
['filamentA','100W',622],
['filamentA','100W',668],
['filamentB','200W',569],
['filamentB','200W',555],
['filamentA','200W',541]]

bulb_rdd = sc.parallelize(filDataSingle,2)

In [31]:
bulb_rdd.take(3)

[['filamentA', '100W', 605],
 ['filamentB', '100W', 683],
 ['filamentB', '100W', 691]]

In [33]:
# to find the mean, add 1 as a counter for the rdd 

filament_rdd = bulb_rdd.map(lambda x: (x[0],[x[2],1]))

filament_rdd.take(3)

[('filamentA', [605, 1]), ('filamentB', [683, 1]), ('filamentB', [691, 1])]

- reduceByKey() 
  - function applies aggregation operators key wise. 
  - It takes an aggregation function as input and applies that function on the values of each RDD key.

In [34]:
agg_rdd = filament_rdd.reduceByKey(lambda l1,l2:[l1[0]+l2[0], l1[1]+l2[1]])

agg_rdd.collect()

[('filamentB', [5041, 8]), ('filamentA', [4684, 8])]

In [40]:
mean_rdd = agg_rdd.map(lambda l :[l[0],float(l[1][0])/l[1][1]])

mean_rdd.collect()

[['filamentB', 630.125], ['filamentA', 585.5]]

* Mean Lifetime Based on Bulb Power

In [41]:
power_rdd = bulb_rdd.map(lambda x: (x[1],[x[2],1]))

power_rdd.collect()

[('100W', [605, 1]),
 ('100W', [683, 1]),
 ('100W', [691, 1]),
 ('200W', [561, 1]),
 ('200W', [530, 1]),
 ('100W', [619, 1]),
 ('100W', [686, 1]),
 ('200W', [600, 1]),
 ('100W', [696, 1]),
 ('200W', [579, 1]),
 ('200W', [520, 1]),
 ('100W', [622, 1]),
 ('100W', [668, 1]),
 ('200W', [569, 1]),
 ('200W', [555, 1]),
 ('200W', [541, 1])]

In [45]:
agg_rdd_power = power_rdd.reduceByKey(lambda l1,l2:[l1[0]+l2[0], l1[1]+l2[1]])

agg_rdd_power.collect()

[('100W', [5270, 8]), ('200W', [4455, 8])]

In [46]:
mean_rdd_power = agg_rdd_power.map(lambda x: [x[0],float(x[1][0]/x[1][1])])

mean_rdd_power.collect()

[['100W', 658.75], ['200W', 556.875]]

**Mean Lifetime Based on Filament Type
and Power**

In [57]:
both_rdd = bulb_rdd.map( lambda val : [(val[0], val[1]),[val[2],1]])
both_rdd.collect()

[[('filamentA', '100W'), [605, 1]],
 [('filamentB', '100W'), [683, 1]],
 [('filamentB', '100W'), [691, 1]],
 [('filamentB', '200W'), [561, 1]],
 [('filamentA', '200W'), [530, 1]],
 [('filamentA', '100W'), [619, 1]],
 [('filamentB', '100W'), [686, 1]],
 [('filamentB', '200W'), [600, 1]],
 [('filamentB', '100W'), [696, 1]],
 [('filamentA', '200W'), [579, 1]],
 [('filamentA', '200W'), [520, 1]],
 [('filamentA', '100W'), [622, 1]],
 [('filamentA', '100W'), [668, 1]],
 [('filamentB', '200W'), [569, 1]],
 [('filamentB', '200W'), [555, 1]],
 [('filamentA', '200W'), [541, 1]]]

In [58]:
both_agg = both_rdd.reduceByKey(lambda l1,l2:[l1[0]+l2[0], l1[1]+l2[1]])

both_agg.collect()

[(('filamentB', '100W'), [2756, 4]),
 (('filamentA', '200W'), [2170, 4]),
 (('filamentA', '100W'), [2514, 4]),
 (('filamentB', '200W'), [2285, 4])]

In [59]:
mean_both = both_agg.map(lambda x: [x[0], float( x[1][0]  /  x[1][1] )  ]  )

mean_both.collect()

[[('filamentB', '100W'), 689.0],
 [('filamentA', '200W'), 542.5],
 [('filamentA', '100W'), 628.5],
 [('filamentB', '200W'), 571.25]]

## Join Data 

perform the following on the Students and Subjects tables:

* Inner join (common in both) **join()**
  - An inner join returns all the keys that are common to both tables. 
  - It discards the key elements that are not common to both tables
* Left outer join (left table elements) **leftOuterJoin()**
  - left outer join includes all keys in the left table and excludes uncommon keys from  the right table. 
  - A left outer join can be performed by using the *leftOuterJoin()* function
defined on the RDD in PySpark.
* Right outer join **rightOuterJoin()**
  - every key of the second table is included, but from the first table, only those keys that are common to both tables are included.
* Full outer join **fullOuterJoin()** 
  - include all keys from both tables, go for a full outer join.

In [60]:
studentData = [['si1','Robin','M'],
['si2','Maria','F'],
['si3','Julie','F'],
['si4','Bob', 'M'],
['si6','William','M']]

In [61]:
subjectsData = [['si1','Python'],
 ['si3','Java'],
['si1','Java'],
 ['si2','Python'],
['si3','Ruby'],
 ['si4','C++'],
 ['si5','C'],
['si4','Python'],
 ['si2','Java']]

In [62]:
studentRDD = sc.parallelize(studentData, 2)

subjectRDD = sc.parallelize(subjectsData,2)

In [68]:
# making pair RDD 

stu_rdd = studentRDD.map(lambda x: (x[0],[x[1],x[2]]))
sub_rdd = subjectRDD.map(lambda x: (x[0],[x[1]]))

**INNER JOIN**

In [69]:
inner_rdd = stu_rdd.join(sub_rdd)

inner_rdd.collect()

[('si4', (['Bob', 'M'], ['C++'])),
 ('si4', (['Bob', 'M'], ['Python'])),
 ('si3', (['Julie', 'F'], ['Java'])),
 ('si3', (['Julie', 'F'], ['Ruby'])),
 ('si1', (['Robin', 'M'], ['Python'])),
 ('si1', (['Robin', 'M'], ['Java'])),
 ('si2', (['Maria', 'F'], ['Python'])),
 ('si2', (['Maria', 'F'], ['Java']))]

**LEFT OUTER JOIN**

In [70]:
left_rdd = stu_rdd.leftOuterJoin(sub_rdd)

left_rdd.collect()

[('si4', (['Bob', 'M'], ['C++'])),
 ('si4', (['Bob', 'M'], ['Python'])),
 ('si6', (['William', 'M'], None)),
 ('si3', (['Julie', 'F'], ['Java'])),
 ('si3', (['Julie', 'F'], ['Ruby'])),
 ('si1', (['Robin', 'M'], ['Python'])),
 ('si1', (['Robin', 'M'], ['Java'])),
 ('si2', (['Maria', 'F'], ['Python'])),
 ('si2', (['Maria', 'F'], ['Java']))]

**RIGHT OUTER JOIN**

In [71]:
right_rdd = stu_rdd.rightOuterJoin(sub_rdd)

right_rdd.collect()

[('si4', (['Bob', 'M'], ['C++'])),
 ('si4', (['Bob', 'M'], ['Python'])),
 ('si3', (['Julie', 'F'], ['Java'])),
 ('si3', (['Julie', 'F'], ['Ruby'])),
 ('si5', (None, ['C'])),
 ('si1', (['Robin', 'M'], ['Python'])),
 ('si1', (['Robin', 'M'], ['Java'])),
 ('si2', (['Maria', 'F'], ['Python'])),
 ('si2', (['Maria', 'F'], ['Java']))]

**FULL OUTER JOIN**

In [72]:
fullOuter_rdd = stu_rdd.fullOuterJoin(sub_rdd)

fullOuter_rdd.collect()

[('si4', (['Bob', 'M'], ['C++'])),
 ('si4', (['Bob', 'M'], ['Python'])),
 ('si6', (['William', 'M'], None)),
 ('si3', (['Julie', 'F'], ['Java'])),
 ('si3', (['Julie', 'F'], ['Ruby'])),
 ('si5', (None, ['C'])),
 ('si1', (['Robin', 'M'], ['Python'])),
 ('si1', (['Robin', 'M'], ['Java'])),
 ('si2', (['Maria', 'F'], ['Python'])),
 ('si2', (['Maria', 'F'], ['Java']))]

## Page - Rank

The page rank of a particular web page indicates its relative importance within a group of web pages. The higher the page rank, the higher up it will appear in a search result list.

The importance of a page is defined by the importance of all the web pages that provide an outbound link to the web page in consideration. For example, say that web page X has very high relative importance. Web page X is outbounding to web page Y; hence, web page Y will also have high importance.

- Algorithm 

1. Initialize each page with a page rank of 1 or some arbitrary
number.
2. For i in someSequence, do the following:

  a. Calculate the contribution of each inbound page.

  b. Calculate the sum of the contributions.

  c. Determine the updated page rank as follows:
  
updated page rank = 1 – s + s × summation of contributions

In [73]:
pageLinks = [['a' ,['b','c','d']],
['c', ['b']],
['b', ['d','c']],
['d', ['a','c']]]



In [75]:
# initialize page ranks (step 1)
pageRanks = [['a',1],['c',1],
['b',1],
['d',1]]

In [76]:
"""
function that will take two arguments. 
- The first argument : web page URIs, which provide the outbound links to other web pages.
- The second argument: rank of the web page accessed through the outbound links that are the first argument. 

@ The function will return the contribution to all the web pages in the first argument
"""

def rankContribution(uris, rank):
  numberOfUris = len(uris)
  rankContribution = float(rank) / numberOfUris
  newrank =[]
  for uri in uris:
    newrank.append((uri, rankContribution))
  return newrank

In [77]:
pageLinksRDD = sc.parallelize(pageLinks, 2)
pageRanksRDD = sc.parallelize(pageRanks, 2)

In [78]:
# define the iteration parameters 
numIter = 20
s = 0.85

In [79]:
for i in range(numIter):
  linksRank = pageLinksRDD.join(pageRanksRDD)
  contributedRDD = linksRank.flatMap(lambda x : rankContribution(x[1][0],x[1][1]))
  sumRanks = contributedRDD.reduceByKey(lambda v1,v2 : v1+v2)
  pageRanksRDD = sumRanks.map(lambda x : (x[0],(1-s)+s*x[1]))

In [80]:
pageRanksRDD.collect()

[('a', 0.5217268024809147),
 ('b', 1.357243795127982),
 ('c', 1.2463781024360086),
 ('d', 0.8746512999550939)]