In [1]:
from google.cloud import storage
import os
from io import BytesIO
import pandas as pd
import re
from pyspark.sql import SparkSession
from operator import add

In [2]:
# Cloud Storage bucket that holds the edge-list dataset
bucket_name = "jupyter_dataset"

# Client is created with default settings (no explicit arguments are passed)
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)

# Handle to the edge-list object in the bucket; its content is fetched in the next cell
blob = storage.blob.Blob(name="twitter_combined.txt", bucket=bucket)

In [3]:
# Fetch the raw contents of the blob
content = blob.download_as_string()

# Create a pandas dataframe with a single string column, one "source target" edge per row.
# header=None is required here: every line of the file is treated as an edge
# downstream, so letting read_csv infer a header would silently turn the first
# edge into the column name and drop it from the graph.
train = pd.read_csv(BytesIO(content), header=None, names=["edge"])

# This is the entry point to programming Spark with the Dataset and DataFrame API.
spark = SparkSession.builder.appName("PythonPageRank").getOrCreate()

# Create a spark dataframe from a pandas dataframe
df = spark.createDataFrame(train)

In [4]:
def computeContribs(urls, rank):
    """Yield one (url, share) pair per neighbour, splitting `rank` evenly.

    Each url in `urls` receives rank / len(urls). Nothing is yielded (and no
    division happens) when `urls` is empty.
    """
    out_degree = len(urls)
    # The division stays inside the generator so it is only evaluated per item
    yield from ((neighbor, rank / out_degree) for neighbor in urls)
        
def parseNeighbors(urls):
    """Parses a urls pair string into urls pair.

    Args:
        urls: one edge line of the form "source target", separated by whitespace.

    Returns:
        tuple: (source, target) as strings, i.e. the first two tokens of the line.

    Raises:
        IndexError: if the line contains fewer than two tokens.
    """
    # Strip first: splitting a line with leading whitespace would otherwise
    # produce an empty first token and return ('', source) instead of the edge.
    parts = re.split(r'\s+', urls.strip())
    return parts[0], parts[1]

# Baseline model

In [5]:
# Take the raw edge strings from the first column of the Spark dataframe
lines = df.rdd.map(lambda row: row[0])

# Build the adjacency list (source, [distinct targets]); cached because it is reused by the join below
links = lines.map(parseNeighbors).distinct().groupByKey().mapValues(list).cache()

# Every node that has outgoing links starts with rank 1.0
ranks = links.mapValues(lambda _: 1.0)

# Baseline: a single PageRank update, no further iterations.
# Each node splits its current rank evenly among its neighbours ...
contribs = links.join(ranks).flatMap(
    lambda node: computeContribs(node[1][0], node[1][1])
)
# ... and each node's new rank is 0.15 + 0.85 * (sum of the contributions it received)
ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

# Collect the (link, rank) pairs to the driver and build the pandas frame in one step
nonirDF = pd.DataFrame(ranks.collect(), columns=["link_nonir", "rank_nonir"])
nonirDF.head(10)

Unnamed: 0,link_nonir,rank_nonir
0,2367911,22.031627
1,22462180,88.561301
2,86221475,2.324324
3,148519842,3.450056
4,227650565,0.793556
5,168224294,3.343081
6,62917600,1.466638
7,233598791,2.441054
8,196680777,6.600378
9,205017448,0.653865


In [6]:
# Keep only the 100 highest-ranked nodes of the baseline model (original row labels are retained)
nonirDF = nonirDF.sort_values("rank_nonir", ascending=False).head(100)
nonirDF

Unnamed: 0,link_nonir,rank_nonir
61156,115485051,265.781690
76198,813286,214.505979
71104,40981798,161.782843
50900,43003845,120.145804
50971,7861312,111.746345
...,...,...
56084,15661871,25.893690
61178,15666380,25.868746
20766,2384071,25.695547
40795,26281970,25.647672


# PageRank model with 10 iterations

In [7]:
# Take the raw edge strings from the first column of the Spark dataframe
lines = df.rdd.map(lambda row: row[0])

# Build the adjacency list (source, [distinct targets]); cached because it is joined in every iteration
links = lines.map(parseNeighbors).distinct().groupByKey().mapValues(list).cache()

# Every node that has outgoing links starts with rank 1.0
ranks = links.mapValues(lambda _: 1.0)

# Run 10 PageRank updates
for _ in range(10):

    # Each node splits its current rank evenly among its neighbours
    contribs = links.join(ranks).flatMap(
        lambda node: computeContribs(node[1][0], node[1][1])
    )

    # New rank is 0.15 + 0.85 * (sum of the contributions the node received)
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

# Collect the (link, rank) pairs to the driver and build the pandas frame in one step
ir10DF = pd.DataFrame(ranks.collect(), columns=["link_ir10", "rank_ir10"])
ir10DF.head(10)

Unnamed: 0,link_ir10,rank_ir10
0,19049747,2.548166
1,48621884,1.7153
2,21195122,3.603616
3,132660302,0.180709
4,14521330,1.729154
5,1065921,7.069719
6,108247668,1.774052
7,24249821,0.5211
8,237420957,3.595281
9,443940778,0.769982


In [8]:
# Keep only the 100 highest-ranked nodes of the 10-iteration model (original row labels are retained)
ir10DF = ir10DF.sort_values("rank_ir10", ascending=False).head(100)
ir10DF

Unnamed: 0,link_ir10,rank_ir10
18514,115485051,469.415036
57869,116485573,423.489825
35936,813286,149.033961
54466,11348282,83.764977
13006,40981798,83.144568
...,...,...
59044,20609518,23.779876
6514,10228272,23.750302
36014,43166813,23.611549
36005,15962096,23.589795


# PageRank model with 20 iterations

In [9]:
# Take the raw edge strings from the first column of the Spark dataframe
lines = df.rdd.map(lambda row: row[0])

# Build the adjacency list (source, [distinct targets]); cached because it is joined in every iteration
links = lines.map(parseNeighbors).distinct().groupByKey().mapValues(list).cache()

# Every node that has outgoing links starts with rank 1.0
ranks = links.mapValues(lambda _: 1.0)

# Run 20 PageRank updates
for _ in range(20):

    # Each node splits its current rank evenly among its neighbours
    contribs = links.join(ranks).flatMap(
        lambda node: computeContribs(node[1][0], node[1][1])
    )

    # New rank is 0.15 + 0.85 * (sum of the contributions the node received)
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)

# Collect the (link, rank) pairs to the driver and build the pandas frame in one step
ir20DF = pd.DataFrame(ranks.collect(), columns=["link_ir20", "rank_ir20"])
ir20DF.head(10)

Unnamed: 0,link_ir20,rank_ir20
0,337131276,1.061288
1,28420827,8.01441
2,21879024,9.093576
3,228794007,1.837216
4,12611642,19.933591
5,242479442,2.553447
6,20108560,5.299323
7,36846350,0.27435
8,15839528,0.603607
9,79184073,0.470396


In [10]:
# Keep only the 100 highest-ranked nodes of the 20-iteration model (original row labels are retained)
ir20DF = ir20DF.sort_values("rank_ir20", ascending=False).head(100)
ir20DF

Unnamed: 0,link_ir20,rank_ir20
67655,115485051,539.591521
65182,116485573,464.329380
22728,813286,145.325111
1454,11348282,82.290522
57059,40981798,80.309592
...,...,...
35792,9451052,23.080099
42061,10228272,23.054582
43565,14464369,23.045354
30463,15962096,22.956502


In [18]:
# Put the three top lists side by side and write them to a CSV file.
# The ir10/ir20 values are attached positionally (as plain lists) rather than
# aligned on the index, because each frame carries its own unrelated row labels.
# assign() returns a new frame instead of inserting columns into the result of
# an earlier .iloc slice, which pandas may flag with SettingWithCopyWarning.
nonirDF = nonirDF.assign(
    link_ir10=ir10DF['link_ir10'].tolist(),
    rank_ir10=ir10DF['rank_ir10'].tolist(),
    link_ir20=ir20DF['link_ir20'].tolist(),
    rank_ir20=ir20DF['rank_ir20'].tolist(),
)
nonirDF.to_csv('PageRank_3models.csv')

# Overlap Calculation

In [24]:
#Calculate overlapping
def overlappingCalculation(link1, link2, frame=None):
    """Return the fraction of rows' worth of values shared by two columns.

    Args:
        link1: name of the first column to compare.
        link2: name of the second column to compare.
        frame: DataFrame holding both columns. Defaults to the notebook-level
            ``nonirDF`` so existing two-argument calls keep working.

    Returns:
        float: number of distinct values present in both columns divided by
        the number of rows in the frame; 0.0 for an empty frame.
    """
    table = nonirDF if frame is None else frame
    total = len(table)
    if total == 0:
        return 0.0
    overlappingLink = set(table[link1]) & set(table[link2])
    # Normalise by the real row count instead of a hard-coded 0.01 (= 1/100),
    # so the rate stays correct when the top-N cut-off is not 100.
    return len(overlappingLink) / total

In [25]:
# Overlap between the baseline and the 10-iteration top lists
overlappingCalculation(link1='link_nonir', link2='link_ir10')

0.72

In [26]:
# Overlap between the baseline and the 20-iteration top lists
overlappingCalculation(link1='link_nonir', link2='link_ir20')

0.72

In [27]:
# Overlap between the 10-iteration and the 20-iteration top lists
overlappingCalculation(link1='link_ir10', link2='link_ir20')

1.0

In [28]:
# Overall overlap: share of nodes that appear in the top list of all three models
overlappingLink = (
    set(nonirDF['link_nonir'])
    & set(nonirDF['link_ir10'])
    & set(nonirDF['link_ir20'])
)
# Normalise by the real row count instead of a hard-coded 0.01 (= 1/100),
# so the rate stays correct when the top-N cut-off is not 100.
overlappingRate = len(overlappingLink) / len(nonirDF) if len(nonirDF) else 0.0
overlappingRate

0.72

# Wilcoxon Signed-Rank Test

In [32]:
from scipy.stats import wilcoxon
# Pair each node of the 10-iteration top list with its 20-iteration rank
# (left join keeps every ir10 row), then drop the redundant join key column.
compareDF = ir10DF.merge(
    ir20DF, how='left', left_on='link_ir10', right_on='link_ir20'
).drop(columns=['link_ir20'])
compareDF

Unnamed: 0,link_ir10,rank_ir10,rank_ir20
0,115485051,469.415036,539.591521
1,116485573,423.489825,464.329380
2,813286,149.033961,145.325111
3,11348282,83.764977,82.290522
4,40981798,83.144568,80.309592
...,...,...,...
95,20609518,23.779876,23.187638
96,10228272,23.750302,23.054582
97,43166813,23.611549,23.253604
98,15962096,23.589795,22.956502


In [35]:
# Paired test on the rank values of the same nodes under 10 vs 20 iterations
wilcoxon(
    x=compareDF['rank_ir10'],
    y=compareDF['rank_ir20'],
    zero_method='wilcox',
    correction=False,
)

WilcoxonResult(statistic=1005.0, pvalue=1.7297891814322726e-07)