In [117]:
from google.cloud import storage
import os
from io import BytesIO
import pandas as pd
import re
from pyspark.sql import SparkSession
from operator import add



In [118]:
bucket_name = "yh160" 
storage_client = storage.Client()

bucket = storage_client.get_bucket(bucket_name)

# TODO: Specify the file name for blob object
blob = storage.blob.Blob("twitter_combined.txt",bucket)

In [119]:
# Convert to string
content = blob.download_as_string()

# Create a pandas dataframe
train = pd.read_csv(BytesIO(content))


# This is the entry point to programming Spark with the Dataset and DataFrame API.
spark = SparkSession.builder.appName("PythonPageRank").getOrCreate()

# Create a spark dataframe from a pandas dataframe
df = spark.createDataFrame(train)

In [120]:
df.head()

Row(214328887 34428380='17116707 28465635')

In [121]:
def computeContribs(urls, rank):
    """Calculates the URL contributions to the rank of other URLs."""
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)
        
def parseNeighbors(urls):
    """Parses a urls pair string into urls pair."""
    parts = re.split(r'\s+', urls)
    return parts[0], parts[1]

In [122]:
# Load in the input file, we will access the input from a spark dataframe here
#lines = spark.read.text('path/to/file').rdd.map(lambda r: r[0])
lines = df.rdd.map(lambda r: r[0])

# Loads all URLs from input file and initialize their neighbors
links = lines.map(lambda line : parseNeighbors(line)).distinct().groupByKey().mapValues(lambda x: list(x)).cache()

# Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one
ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

# Calculates and updates URL ranks continuously using PageRank algorithm
for iteration in range(int(10)):
    
    # Calculates URL contributions to the rank of other URLs
    contribs = links.join(ranks).flatMap(lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
    
    # Re-calculates URL ranks based on neighbor contributions
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
    
# Collects all URL ranks
for (link, rank) in ranks.collect():
    print("%s has rank: %s." % (link, rank))

92336981 has rank: 1.9430573040299834.
148745060 has rank: 2.748661895942804.
17654192 has rank: 3.5523979676709656.
163090431 has rank: 0.9654809080361126.
359757400 has rank: 0.7244978910054823.
22949668 has rank: 1.5732146468823758.
36767299 has rank: 6.078044315244232.
333021748 has rank: 0.5240168644292261.
74580436 has rank: 9.161264885814727.
109055708 has rank: 0.243633636313562.
50059478 has rank: 0.2558339639007986.
40470589 has rank: 0.3860862103681646.
55387080 has rank: 0.9814977375843512.
160664744 has rank: 0.26487427940525843.
125120339 has rank: 1.385268029102299.
128260873 has rank: 1.7432685988800662.
269304313 has rank: 0.7868989691767964.
232561503 has rank: 1.0556397032771976.
361142565 has rank: 3.311248542445357.
282679866 has rank: 0.40841315581781557.
93069445 has rank: 0.8437095625979838.
73738774 has rank: 0.6231924700089181.
120599128 has rank: 0.36771540729523683.
17434613 has rank: 2.7436156800215286.
160565602 has rank: 0.6822976765525969.
114873228 has 

In [153]:
m = ranks.collect()
m = sorted(m, key=lambda x: x[1], reverse=True)
i_10 = m[0:100]

In [154]:
# iterations 20
lines = df.rdd.map(lambda r: r[0])

links = lines.map(lambda line : parseNeighbors(line)).distinct().groupByKey().mapValues(lambda x: list(x)).cache()

ranks = links.map(lambda url_neighbors: (url_neighbors[0], 1.0))

for iteration in range(int(20)):  
    
    contribs = links.join(ranks).flatMap(lambda url_urls_rank: computeContribs(url_urls_rank[1][0], url_urls_rank[1][1]))
    
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * 0.85 + 0.15)
    
for (link, rank_20) in ranks.collect():
    print("%s has rank: %s." % (link, rank_20))

134940306 has rank: 1.8972685102228966.
56423330 has rank: 1.4259724799784859.
540912889 has rank: 0.7837726728179566.
331720696 has rank: 0.4751747060974346.
117901353 has rank: 1.1077166421064208.
358775055 has rank: 1.8229426877505666.
127713437 has rank: 0.7254157226125947.
99577011 has rank: 0.34452157921109594.
247741328 has rank: 0.9810031367431499.
87309986 has rank: 0.3023052574918338.
254610699 has rank: 0.8531411518912699.
247583674 has rank: 0.5829758433959589.
22839501 has rank: 1.5496614343717403.
14166096 has rank: 3.6342539612724987.
337131276 has rank: 1.0612883342961867.
20106852 has rank: 5.965738728107597.
110481357 has rank: 1.136990950527906.
498549038 has rank: 0.6550545231761161.
130897520 has rank: 2.071592367190333.
131061324 has rank: 0.4368219399307366.
217708587 has rank: 0.22107376472343254.
31062805 has rank: 0.622072715419052.
24083901 has rank: 7.684985823159327.
12611642 has rank: 19.93359133688113.
304679484 has rank: 2.8913691135552777.
303060652 has

In [155]:
n = ranks.collect()
n = sorted(n, key=lambda x: x[1], reverse=True)
i_20 = n[0:100]

In [156]:
#Connect to GCP bucket and assign the bucket_name and specify the file name
bucket_name = "yh160" #Assign the bucket name where your file is stored
storage_client = storage.Client()

bucket = storage_client.get_bucket(bucket_name)

In [157]:
blob = storage.blob.Blob("twitter_combined.txt",bucket)
blob

<Blob: yh160, twitter_combined.txt, None>

In [158]:
# Convert to string
content = blob.download_as_string()

# Create a pandas dataframe
train = pd.read_csv(BytesIO(content))


# This is the entry point to programming Spark with the Dataset and DataFrame API.
spark = SparkSession.builder.appName("PythonPageRank").getOrCreate()

# Create a spark dataframe from a pandas dataframe
df = spark.createDataFrame(train)

In [159]:
target=df.collect()

In [160]:
data = []
for i in range(0,len(target)):
    m = target[i][0]
    data.append(m)
    

In [161]:
from collections import Counter

x = Counter(data)
i_fo = x.most_common(100)

In [162]:
i_10p=pd.DataFrame(i_10, columns =['source1','target1'])
i_20p=pd.DataFrame(i_20, columns =['source2','target2'])
i_fop=pd.DataFrame(i_fo, columns =['source3','target3'])

In [163]:
merged = pd.concat( [i_10p, i_20p, i_fop], axis=1 )
merged.to_csv('merged.csv', index = False)

In [179]:
info = []
for index in range(0,len(i_fo)):
    inx = i_fo[index][0].split()[1]
    fol = i_fo[index][1]
    info.append([inx,fol])

In [186]:
k = 0
for m in range(0,len(info)):
    if info[m][0] in i_20:
        k+=1
print(k)

0


In [187]:
k = 0
for m in range(0,len(info)):
    if info[m][0] in i_10:
        k+=1
print(k)

0
