In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit
from pyspark.sql.types import *
import pyspark.sql.functions as F
spark = SparkSession.builder.appName('Graphs-HW2').getOrCreate()

In [2]:
import pandas as pd
import numpy as np

In [3]:
pr = spark.read.load('pr_graph.txt', format="text")

In [4]:
pr.createOrReplaceTempView('pr_sdf_view')
pr_sdf=spark.sql('SELECT CAST(split(value, " ")[0] AS integer) AS from_node ,CAST(split(value, " ")[1] AS integer) AS to_node, 0 AS contrib FROM pr_sdf_view')

In [5]:
pr_sdf.withColumn('pg', lit(0.001)).show()

+---------+-------+-------+-----+
|from_node|to_node|contrib|   pg|
+---------+-------+-------+-----+
|        1|      2|      0|0.001|
|        1|      3|      0|0.001|
|        1|      4|      0|0.001|
|        1|      5|      0|0.001|
|        2|      3|      0|0.001|
|        2|      5|      0|0.001|
|        3|      2|      0|0.001|
|        4|      5|      0|0.001|
|        5|      1|      0|0.001|
|        5|      6|      0|0.001|
|        5|      7|      0|0.001|
|        6|      7|      0|0.001|
|        7|      6|      0|0.001|
|        7|      2|      0|0.001|
|        7|      7|      0|0.001|
|        5|      4|      0|0.001|
+---------+-------+-------+-----+



In [6]:
def pagerank(G,num_iter):
    
    G.cache()
    G.createOrReplaceTempView('g_view')
    
    #getting the list of all the nodes in the graph and storing them in a nodes data frame
    nodes=G.select('from_node').distinct().sort('from_node')
    nodes=nodes.union(G.select('to_node').distinct().sort('to_node'))
    nodes=nodes.select('from_node').distinct().sort('from_node')
    
    #calculating the number of nodes to get the intitial page rank
    n=nodes.count()
    n=float(1)/n
    
    #defining the user-defined fns
    my_fn = F.udf(lambda pg, connects: div(pg,connects), DoubleType())
    my_pg = F.udf(lambda x: alpha_beta(x), DoubleType())
    
    #proportion of weight 
    p_o_w=pr_sdf.groupBy("from_node").count().withColumnRenamed("count", "pofw")
    p_o_w.createOrReplaceTempView('p_o_w_view')
    q=('SELECT g.from_node,g.to_node,p.pofw AS connects, CAST(' +str(n)+' AS float) AS pg FROM g_view AS g ,p_o_w_view AS p WHERE g.from_node=p.from_node')
    g=spark.sql(q)
    #print
    g.show()
    
    #datafram that contains the pagerank after every iteration
    pgi=nodes.withColumn("pg", lit(0))
    #print
    pgi.show()
    
    for i in range(num_iter):
        #print(i)
        g.cache()
        g.show()
        
        g.createOrReplaceTempView('g_view')
        df1= g.select('from_node','to_node',my_fn('pg','connects').alias('page_rank'))
        #df1.show()
        df1.createOrReplaceTempView('df1_view')
        
        q1=('SELECT to_node, sum(page_rank) as page_rank from df1_view GROUP BY to_node')
        pgi=spark.sql(q1)
        pgi=pgi.select('to_node',my_pg('page_rank').alias('pg'))
        #pgi.show()
        pgi.createOrReplaceTempView('pgi_view')
        q=('SELECT g.from_node,g.to_node,g.connects, p.pg FROM g_view g JOIN pgi_view p ON p.to_node=g.from_node')
        g=spark.sql(q)
        
    return pgi
        
def div(a,b):
    return float(a)/b

def alpha_beta(x):
    alpha=0.85
    beta=0.15
    return ((alpha*x)+beta)

## Step 5

In [7]:
pr_sdf.count()

16

In [8]:
pr_sdf.show()

+---------+-------+-------+
|from_node|to_node|contrib|
+---------+-------+-------+
|        1|      2|      0|
|        1|      3|      0|
|        1|      4|      0|
|        1|      5|      0|
|        2|      3|      0|
|        2|      5|      0|
|        3|      2|      0|
|        4|      5|      0|
|        5|      1|      0|
|        5|      6|      0|
|        5|      7|      0|
|        6|      7|      0|
|        7|      6|      0|
|        7|      2|      0|
|        7|      7|      0|
|        5|      4|      0|
+---------+-------+-------+



In [9]:
pagerank(pr_sdf,5).show()

+---------+-------+--------+----------+
|from_node|to_node|connects|        pg|
+---------+-------+--------+----------+
|        1|      2|       4|0.14285715|
|        1|      3|       4|0.14285715|
|        1|      4|       4|0.14285715|
|        1|      5|       4|0.14285715|
|        2|      3|       2|0.14285715|
|        2|      5|       2|0.14285715|
|        3|      2|       1|0.14285715|
|        4|      5|       1|0.14285715|
|        5|      1|       4|0.14285715|
|        5|      6|       4|0.14285715|
|        5|      7|       4|0.14285715|
|        6|      7|       1|0.14285715|
|        7|      6|       3|0.14285715|
|        7|      2|       3|0.14285715|
|        7|      7|       3|0.14285715|
|        5|      4|       4|0.14285715|
+---------+-------+--------+----------+

+---------+---+
|from_node| pg|
+---------+---+
|        1|  0|
|        2|  0|
|        3|  0|
|        4|  0|
|        5|  0|
|        6|  0|
|        7|  0|
+---------+---+

+---------+-------+---

In [10]:
pr_sdf.show()

+---------+-------+-------+
|from_node|to_node|contrib|
+---------+-------+-------+
|        1|      2|      0|
|        1|      3|      0|
|        1|      4|      0|
|        1|      5|      0|
|        2|      3|      0|
|        2|      5|      0|
|        3|      2|      0|
|        4|      5|      0|
|        5|      1|      0|
|        5|      6|      0|
|        5|      7|      0|
|        6|      7|      0|
|        7|      6|      0|
|        7|      2|      0|
|        7|      7|      0|
|        5|      4|      0|
+---------+-------+-------+

