In [0]:
import os
from urllib.parse import urlparse

from pyspark import SparkFiles
class airportPageRank:
    """PageRank over an airport graph built from a flight-records CSV.

    Every CSV row contributes one directed edge ORIGIN -> DEST, so an origin
    with several rows to the same destination sends proportionally more rank
    along that route. Uses the notebook-level ``spark`` session.
    """

    def __init__(self, inputFile, maxIter, alpha=0.15, initialRank=10):
        """Register the CSV with Spark and load it as a DataFrame.

        Args:
            inputFile: path or URL of the flights CSV; it must have a header
                row and ORIGIN / DEST columns.
            maxIter: number of PageRank iterations to run (must be >= 1).
            alpha: teleport probability; each iteration adds ``alpha / N`` to
                every ranked airport and damps the rest by ``1 - alpha``.
            initialRank: starting rank given to every origin airport.
                NOTE(review): the ``alpha / N`` and ``mass / N`` terms assume
                ranks sum to 1, which suggests ``1 / N``; the default of 10 is
                kept for backward compatibility.
        """
        self.maxIter = maxIter
        self.alpha = alpha
        self.initialRank = initialRank
        spark.sparkContext.addFile(inputFile)
        # SparkFiles.get takes the file name addFile registered (the last path
        # segment), so derive it from inputFile instead of hard-coding it.
        fileName = os.path.basename(urlparse(inputFile).path)
        self.flights = spark.read.csv("file://" + SparkFiles.get(fileName), header=True, inferSchema=True)

    def getCount(self):
        """Count distinct airports (graph vertices) and store it in ``self.N``.

        Also stores the distinct ORIGIN and DEST DataFrames on ``self.origin``
        and ``self.destination``; ``getDangNodes`` reads them, so this must
        run first.
        """
        self.destination = self.flights.select(['DEST']).distinct()
        self.origin = self.flights.select(['ORIGIN']).distinct()
        # DataFrame.union matches columns by position, so origins and
        # destinations land in one column before de-duplication.
        self.N = self.origin.union(self.destination).distinct().count()

    def getNodePair(self):
        """Build ``self.flightSrcDest``: an RDD of (ORIGIN, DEST), one per CSV row."""
        flights = self.flights.select(['ORIGIN', 'DEST'])
        self.flightSrcDest = flights.rdd.map(lambda row: (row.ORIGIN, row.DEST))

    def getDangNodes(self):
        """Build ``self.danglingNodes``: airports that appear only as a DEST.

        Such airports have no outgoing flight in the data, so rank they hold
        cannot follow any edge. Each is paired with a placeholder value so the
        RDD can be the right side of a ``join``. Requires ``getCount`` first.
        """
        dang = self.destination.subtract(self.origin)
        self.danglingNodes = dang.rdd.map(lambda x: (x.DEST, "no_dest"))

    def getPageRank(self):
        """Run ``self.maxIter`` PageRank iterations and return the ranks.

        Each iteration computes, for every airport ``v`` that appears as a DEST:

            PR'(v) = alpha / N + (1 - alpha) * (incoming(v) + m / N)

        where ``incoming(v)`` is the rank flowing into ``v`` (each origin splits
        its rank evenly over its outgoing flights) and ``m`` is the rank held by
        dangling airports at the start of the iteration; having no flight to
        follow, that rank is spread uniformly instead.

        Airports that never appear as a DEST receive no rank and are absent
        from the result.

        Returns:
            RDD of (airport, rank) pairs sorted by descending rank.

        Raises:
            ValueError: if ``self.maxIter`` is less than 1.
        """
        if self.maxIter < 1:
            raise ValueError("maxIter must be at least 1, got %r" % (self.maxIter,))

        self.getCount()
        self.getDangNodes()
        self.getNodePair()

        # Copy attributes into locals: Spark pickles the lambdas below, and
        # capturing `self` would drag the DataFrames along with them.
        numNodes = self.N
        alpha = self.alpha
        initialRank = self.initialRank
        edges = self.flightSrcDest
        danglingNodes = self.danglingNodes

        # Number of outgoing flights per origin; identical in every iteration.
        outDegree = edges.map(lambda e: (e[0], 1)).reduceByKey(lambda x, y: x + y).cache()

        # (origin, (dest, rank of origin)). Only origins are seeded, so no rank
        # starts on a dangling airport and the first dangling mass is 0.
        flightPageRank = edges.map(lambda e: (e[0], (e[1], initialRank)))
        danglingMass = 0
        pageRank = None

        for _ in range(self.maxIter):
            # Each origin splits its rank evenly across its flights; sum what
            # arrives at each destination.
            incoming = (flightPageRank.join(outDegree)
                        .map(lambda kv: (kv[1][0][0], kv[1][0][1] / kv[1][1]))
                        .reduceByKey(lambda x, y: x + y))

            # Bind this iteration's mass through a default argument: RDDs are
            # lazy, and a plain closure would see whatever value the name has
            # when Spark eventually serializes the function.
            newPageRank = incoming.map(
                lambda kv, m=danglingMass: (kv[0], (kv[1] + m / numNodes) * (1 - alpha) + alpha / numNodes)
            ).cache()

            # Rank now held by dangling airports cannot follow a flight next
            # iteration, so it becomes the uniformly spread mass then. This
            # action also materializes the cached ranks above.
            danglingMass = newPageRank.join(danglingNodes).map(lambda kv: kv[1][0]).sum()

            if pageRank is not None:
                pageRank.unpersist()  # superseded by the now-cached newPageRank
            pageRank = newPageRank
            flightPageRank = edges.join(pageRank)

        return pageRank.sortBy(lambda x: -x[1])

In [0]:
# Flight data for January 2021 (CSV with a header row and ORIGIN / DEST columns).
Url = "https://raw.githubusercontent.com/Anirudh-thakur/AirportPageRanking/main/Jan_2021.csv"
MAX_ITER = 25  # number of PageRank iterations

pageRankings = airportPageRank(Url, MAX_ITER)
outputRdd = pageRankings.getPageRank()
# Column names for the (airport, rank) pairs returned by getPageRank.
schema = ["AirportNode", "PageRankValue"]
output = spark.createDataFrame(outputRdd, schema)
display(output)

AirportNode,PageRankValue
ORD,4.228510253615341
DEN,3.987748669200255
DFW,3.156090863365515
ATL,3.1432237847491478
PHX,2.675551631881731
LAX,2.643896942502539
CLT,2.603437357020547
LAS,2.335036483945933
MCO,2.316826119001359
IAH,2.2807504963769785


In [0]:
PageRank output (CSV): https://raw.githubusercontent.com/Anirudh-thakur/AirportPageRanking/main/Output/Q2.1/AirportPageRank.csv