In [1]:
# from pyspark import SparkConf, SparkContext
import findspark
findspark.init('/Users/amogh/spark-2.4.4-bin-hadoop2.7/')

In [8]:
from __future__ import print_function
import re,sys
from operator import add
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql import SQLContext as sqlcontext
from pyspark.sql.functions import *

class PageRank:
    def __init__(self,path,max_iter=10,init_rank=1.0):
        self.path = path
        self.max_iter = max_iter
        self.init_rank = init_rank

    def fit(self):
        spark = SparkSession.builder.appName("PageRank").getOrCreate()
        data = spark.read.text(self.path).rdd.map(lambda row: row[0])
#         data = spark.read.format('csv').options(delimiter='\t').load(self.path).rdd.map(lambda row:row[0])
        '''
        OUTPUT: data.collect() will retrieve the content inside the RDD.
        ['article1\tarticle2','article1\tarticle4','article2\tarticle3','article3\tarticle1','article4\tarticle2','article5\tarticle6']
        '''
        
        adj_list = data.map(lambda data: self.create_adj_list(data)).distinct().groupByKey().cache()
        
        '''
        OUTPUT: links.collect() will give a <K, Iterables<V> pairs)
        [('article1', <pyspark.resultiterable.ResultIterable at 0x1c6e180d4c8>),
         ('article2', <pyspark.resultiterable.ResultIterable at 0x1c6e180da48>),
         ('article3', <pyspark.resultiterable.ResultIterable at 0x1c6e180d488>),
         ('article4', <pyspark.resultiterable.ResultIterable at 0x1c6e180dd08>),
         ('article5', <pyspark.resultiterable.ResultIterable at 0x1c6e180d0c8>)]
 
        links.mapValues(list).collect() will give the values of the key instead of Iterables
        [('article1', ['article2', 'article4']),
         ('article2', ['article3']),
         ('article3', ['article1']),
         ('article4', ['article2']),
         ('article5', ['article6'])]
        '''
        
        ranks = adj_list.map(lambda key: (key[0], self.init_rank))
        '''
        Initializes the ranks to 1.
        Output: initial_ranks.collect()
        [('article1', 1),
         ('article2', 1),
         ('article3', 1),
         ('article4', 1),
         ('article5', 1)]
        '''
        for iteration in range(self.max_iter):
            mapped_adj_list = self.mapper(adj_list,ranks)
            '''
            OUTPUT: mapped_adj_list.collect()
            [('article2', 0.5), ('article4', 0.5), ('article3', 1.0), ('article1', 1.0), ('article2', 1.0), ('article6', 1.0)]
            '''
            ranks = self.reducer(mapped_adj_list)
            print("RANKER IS", ranks.collect())
            '''
            OUTPUT: ranks.collect()
            [('article2', 1.4249999999999998), ('article3', 1.0), ('article1', 1.0), ('article6', 1.0), ('article4', 0.575)]
            '''
            
        #Print page rank values
        for (link, rank) in ranks.collect():
            print("%s has rank: %s." % (link, rank))
        
        schema = StructType([StructField(str(i), StringType(), True) for i in range(2)])
        df = spark.createDataFrame(ranks, schema)
        print(df.columns)
#         df.sort(df.columns[0].asc()).collect()
        top_df = df.sort(asc(df.columns[0]))
        top_df = top_df.sort(asc(df.columns[1]))
        top_df.limit(3).repartition(1).write.csv("outt1t.csv", sep='\t')
        
        spark.stop()
    
    def mapper(self, adj_list, ranks):
        # Generates Key-Value pair where Key is the node and the value is the page rank value of its incoming node
        return adj_list.join(ranks).flatMap(lambda rank: self.getRank(rank[1][0], rank[1][1]))
        
    def reducer(self,mapped_adj_list):
        # Aggregates the output from the mapper along with dampning effect
        return mapped_adj_list.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)    

    def create_adj_list(self,link):
        k,v = re.split(r'\s+', link)
        return k,v
    
    def getRank(self,links, rank):
        num_link = len(links)
        for link in links:
            yield (link, rank / num_link)

    def read(self):
        return spark.read.text(self.path).rdd.map(lambda row: row[0]) 

In [9]:
pg = PageRank('url2.txt',max_iter=2,init_rank=1)
pg.fit()

RANKER IS [('C', 0.3625), ('B', 3.125), ('D', 1.2125), ('E', 0.3625), ('F', 0.3625), ('A', 0.575)]
RANKER IS [('C', 0.8140625), ('D', 1.3028125), ('B', 1.5896875), ('A', 0.6653125), ('E', 0.8140625), ('F', 0.8140625)]
C has rank: 0.8140625.
D has rank: 1.3028125.
B has rank: 1.5896875.
A has rank: 0.6653125.
E has rank: 0.8140625.
F has rank: 0.8140625.
['0', '1']


AnalysisException: 'path file:/C:/Users/amogh/Appledore/Spring-2020/Computer-Systems/outt1t.csv already exists.;'