In [0]:
import pyspark
import urllib

from pyspark.sql.functions import *


In [0]:
parameters = sc.textFile('/FileStore/tables/parameters-2.txt').collect()
input_file = parameters[0]
iterations = int(parameters[1])
output_file = parameters[2]

In [0]:
# Getting AWS keys 
aws_df = spark.read.format("csv").option("header","true").option("sep", ",").load(input_file)
ACCESS_KEY = aws_df.where(col('User name') == 'BigData').select('Access key ID').collect()[0]['Access key ID']
SECRET_KEY = aws_df.where(col('User name') == 'BigData').select('Secret access key').collect()[0]['Secret access key']
 
ENCODED_SEC_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
# Mounting AWS-S3 Bucket. Returns true if successful
S3_BUCKET = "bdass2"
MOUNT_NAME = "/mnt/bigdata1"
 
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SEC_KEY, S3_BUCKET)
dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

In [0]:
# Checking if the bucket was mounted successfully
display(dbutils.fs.ls(MOUNT_NAME))

path,name,size,modificationTime
dbfs:/mnt/bigdata1/Page_rank/,Page_rank/,0,0
dbfs:/mnt/bigdata1/Tweets.csv,Tweets.csv,3421431,1657837611000
dbfs:/mnt/bigdata1/Tweets_classifier/,Tweets_classifier/,0,0
dbfs:/mnt/bigdata1/airport_data.csv,airport_data.csv,1424563,1657711813000
dbfs:/mnt/bigdata1/page_rank_output.csv,page_rank_output.csv,23666,1657841070000


In [0]:
# Get the airport_data.csv file
file_locn = MOUNT_NAME + "/airport_data.csv"
 
sep = ","
file_type = "csv"
header = "true"
infer_schema = "true"
 
data = spark.read.format(file_type).option("inferSchema", infer_schema).option("sep", sep).option("header", header).load(file_locn)

In [0]:
display(data)

ORIGIN,ORIGIN_CITY_NAME,DEST,DEST_CITY_NAME
06A,"Kizhuyak, AK",A43,"Kodiak Island, AK"
09A,"Homer, AK",ADQ,"Kodiak, AK"
09A,"Homer, AK",ADQ,"Kodiak, AK"
1G4,"Peach Springs, AZ",BLD,"Boulder City, NV"
1G4,"Peach Springs, AZ",BLD,"Boulder City, NV"
1G4,"Peach Springs, AZ",BLD,"Boulder City, NV"
1VA,"Warrenton, VA",TN8,"Nashville, TN"
2AK,"Deer Park, AK",PTL,"Port Armstrong, AK"
2NC,"Asheboro, NC",MXE,"Maxton, NC"
7AK,"Akun, AK",DUT,"Unalaska, AK"


In [0]:
#We only need the origin_code, dest_code.
connection_data = data.rdd.map(lambda x: (x[0], x[2]))
connection_data.collect()

Out[49]: [('06A', 'A43'),
 ('09A', 'ADQ'),
 ('09A', 'ADQ'),
 ('1G4', 'BLD'),
 ('1G4', 'BLD'),
 ('1G4', 'BLD'),
 ('1VA', 'TN8'),
 ('2AK', 'PTL'),
 ('2NC', 'MXE'),
 ('7AK', 'DUT'),
 ('7AK', 'KQA'),
 ('7AK', 'DUT'),
 ('A1K', 'SCC'),
 ('A1K', 'A1K'),
 ('A20', 'ANC'),
 ('A27', 'FAI'),
 ('A43', 'KWP'),
 ('A43', 'EHR'),
 ('A43', 'KPY'),
 ('A43', '06A'),
 ('A43', 'DGB'),
 ('A43', 'BFB'),
 ('A43', 'KKB'),
 ('A43', 'ALZ'),
 ('A43', 'KKB'),
 ('A43', 'KZB'),
 ('A43', 'ALZ'),
 ('ABE', 'CVG'),
 ('ABE', 'ILN'),
 ('ABE', 'AFW'),
 ('ABE', 'YIP'),
 ('ABE', 'HRL'),
 ('ABE', 'STC'),
 ('ABE', 'ORD'),
 ('ABE', 'FNT'),
 ('ABE', 'MEM'),
 ('ABE', 'IND'),
 ('ABE', 'ORD'),
 ('ABE', 'LGA'),
 ('ABE', 'EWR'),
 ('ABE', 'ILN'),
 ('ABE', 'SBD'),
 ('ABE', 'CLE'),
 ('ABE', 'DTW'),
 ('ABE', 'ATL'),
 ('ABE', 'DTW'),
 ('ABE', 'LGA'),
 ('ABE', 'ATL'),
 ('ABE', 'HPN'),
 ('ABE', 'EWR'),
 ('ABE', 'ORD'),
 ('ABE', 'IAD'),
 ('ABE', 'EWR'),
 ('ABE', 'CLT'),
 ('ABE', 'CLT'),
 ('ABE', 'BNA'),
 ('ABE', 'EWR'),
 ('ABE', 'MDT'),
 ('AB

In [0]:
#Grouping by on the keys to get the outlinks of every airport in a list
outlink_connections = connection_data.groupByKey().mapValues(list)
outlink_connections.collect()

Out[50]: [('06A', ['A43']),
 ('09A', ['ADQ', 'ADQ']),
 ('1G4', ['BLD', 'BLD', 'BLD']),
 ('1VA', ['TN8']),
 ('2AK', ['PTL']),
 ('2NC', ['MXE']),
 ('7AK', ['DUT', 'KQA', 'DUT']),
 ('A1K', ['SCC', 'A1K']),
 ('A20', ['ANC']),
 ('A27', ['FAI']),
 ('A43',
  ['KWP',
   'EHR',
   'KPY',
   '06A',
   'DGB',
   'BFB',
   'KKB',
   'ALZ',
   'KKB',
   'KZB',
   'ALZ']),
 ('ABE',
  ['CVG',
   'ILN',
   'AFW',
   'YIP',
   'HRL',
   'STC',
   'ORD',
   'FNT',
   'MEM',
   'IND',
   'ORD',
   'LGA',
   'EWR',
   'ILN',
   'SBD',
   'CLE',
   'DTW',
   'ATL',
   'DTW',
   'LGA',
   'ATL',
   'HPN',
   'EWR',
   'ORD',
   'IAD',
   'EWR',
   'CLT',
   'CLT',
   'BNA',
   'EWR',
   'MDT',
   'SFB',
   'BNA',
   'PIE',
   'FLL',
   'BOS',
   'SRQ',
   'PBI',
   'PGD',
   'GPT']),
 ('ABI', ['LBB', 'DFW', 'AFW', 'DFW', 'DFW', 'DFW']),
 ('ABL',
  ['OTZ',
   'IAN',
   'SHG',
   'OTZ',
   'ORV',
   'SHG',
   'OBU',
   'OTZ',
   'RDB',
   'SHG',
   'IAN']),
 ('ABQ',
  ['LAX',
   'PDX',
   'EWR',
   'SEA',
   

In [0]:
#Getting distinct source nodes and initiallizing pagerank
src_nodes = data.rdd.map(lambda x: (x[0], 10))

#Getting distince destination nodes and initiallizing pagerank
dest_nodes = data.rdd.map(lambda x: (x[2], 10))

#Creating a list of all distinct nodes
nodes = src_nodes.union(dest_nodes).distinct()
nodes.collect()


Out[51]: [('09A', 10),
 ('1VA', 10),
 ('2AK', 10),
 ('2NC', 10),
 ('A1K', 10),
 ('A43', 10),
 ('ABE', 10),
 ('ABR', 10),
 ('ACT', 10),
 ('ACV', 10),
 ('ACY', 10),
 ('ADK', 10),
 ('ADQ', 10),
 ('AEX', 10),
 ('AFW', 10),
 ('AGS', 10),
 ('AHN', 10),
 ('AIA', 10),
 ('AIN', 10),
 ('AKB', 10),
 ('AKK', 10),
 ('AKN', 10),
 ('AKP', 10),
 ('ALN', 10),
 ('ALZ', 10),
 ('ANV', 10),
 ('APF', 10),
 ('APN', 10),
 ('ARC', 10),
 ('ART', 10),
 ('ATL', 10),
 ('ATT', 10),
 ('ATW', 10),
 ('AUG', 10),
 ('AUK', 10),
 ('AUO', 10),
 ('AUS', 10),
 ('AVP', 10),
 ('AVW', 10),
 ('AZ3', 10),
 ('AZA', 10),
 ('BAF', 10),
 ('BCT', 10),
 ('BDR', 10),
 ('BED', 10),
 ('BFD', 10),
 ('BFF', 10),
 ('BFI', 10),
 ('BFL', 10),
 ('BGM', 10),
 ('BGR', 10),
 ('BHM', 10),
 ('BID', 10),
 ('BIS', 10),
 ('BJC', 10),
 ('BJI', 10),
 ('BKC', 10),
 ('BKW', 10),
 ('BLD', 10),
 ('BLM', 10),
 ('BMI', 10),
 ('BOS', 10),
 ('BPT', 10),
 ('BQK', 10),
 ('BQN', 10),
 ('BRL', 10),
 ('BRY', 10),
 ('BTL', 10),
 ('BTT', 10),
 ('BTV', 10),
 ('BVU', 10

In [0]:
#Total node count.
N = nodes.count()

#Alpha
alpha = 0.15


#Finding out the dangling nodes and assigning them the bias. We will use them after 1st iteration.
dangling = src_nodes.subtractByKey(dest_nodes).map(lambda x: (x[0], alpha/N))
dangling.collect()


Out[52]: [('ALN', 0.00016574585635359117),
 ('AZ3', 0.00016574585635359117),
 ('BJC', 0.00016574585635359117),
 ('CEF', 0.00016574585635359117),
 ('CIC', 0.00016574585635359117),
 ('N1Y', 0.00016574585635359117),
 ('UOX', 0.00016574585635359117),
 ('VWD', 0.00016574585635359117),
 ('BCB', 0.00016574585635359117),
 ('ERV', 0.00016574585635359117),
 ('MVW', 0.00016574585635359117),
 ('PAQ', 0.00016574585635359117),
 ('SDX', 0.00016574585635359117),
 ('UXK', 0.00016574585635359117)]

In [0]:
#Running iterative PageRank
for i in range(iterations):
    nodes = outlink_connections.join(nodes)\
    .flatMap(lambda x : [(i, float(x[1][1])/len(x[1][0])) for i in x[1][0]])\
    .reduceByKey(lambda x,y: x+y)\
    .map(lambda x: (x[0], ((1 - alpha) * x[1]) + (alpha / N))).union(dangling)
    
nodes.count()


Out[53]: 905

In [0]:
#Sorting them in descending order.
ranking = nodes.sortBy(lambda x: -x[1])
ranking.collect()

Out[54]: [('ORD', 0.021289161977041525),
 ('DEN', 0.02044281734511347),
 ('ATL', 0.015671040102450975),
 ('DFW', 0.0144159048897202),
 ('ANC', 0.014300905153635338),
 ('FAI', 0.012414733478146199),
 ('LAS', 0.012270965185394887),
 ('BET', 0.012016010397682447),
 ('LAX', 0.01172475955121434),
 ('MSP', 0.01132500521100855),
 ('MCO', 0.011127407225370835),
 ('IAH', 0.011017073304921203),
 ('PHX', 0.01077003246826954),
 ('EWR', 0.010687940122868207),
 ('SEA', 0.010246093474706912),
 ('CLT', 0.010078170285926356),
 ('IAD', 0.008879188558951786),
 ('DTW', 0.008830197492640536),
 ('SLC', 0.008427030062986954),
 ('MIA', 0.008397877838100472),
 ('PHL', 0.008300298308946507),
 ('FLL', 0.00802096994323125),
 ('SFO', 0.00784817683848519),
 ('MEM', 0.007538234974546215),
 ('BOS', 0.007393989388510759),
 ('SDF', 0.0073453005309737016),
 ('BNA', 0.0072914422636011856),
 ('TPA', 0.006992508567338046),
 ('AUS', 0.0068692602394275935),
 ('JFK', 0.006119561397861505),
 ('WFB', 0.006048434275865864),
 ('B

In [0]:
#Writing the output file.
page_ranking = ranking.toDF(["Airport_code","PageRank"])
page_ranking.write.format("csv").option("header", "true").mode("overwrite").save(output_file)

In [0]:
display(page_ranking)

Airport_code,PageRank
ORD,0.0212891619770415
DEN,0.0204428173451134
ATL,0.0156710401024509
DFW,0.0144159048897202
ANC,0.0143009051536353
FAI,0.0124147334781461
LAS,0.0122709651853948
BET,0.0120160103976824
LAX,0.0117247595512143
MSP,0.0113250052110085
