In [1]:
sc

<pyspark.context.SparkContext at 0x7f8aa0181310>

In [2]:
import sklearn as sk
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
import zlib
from base64 import b64decode

### parse urls

In [3]:
urls_nums_dict = {}
nums_urls_dict = {}
with open('1_10/urls.txt', 'r') as f:
    for line in f:
        num, url = line.strip().split('\t')
        urls_nums_dict[url] = int(num)
        nums_urls_dict[int(num)] = url
print len(nums_urls_dict)
for num, key in enumerate(nums_urls_dict):
    if num == 0:
        print key, nums_urls_dict[key]
    

564549
0 http://lenta.ru/news/2010/07/09/return/


### create link graph and delete hanging vertex

In [6]:
def parse_doc(line, urls_nums_dict):
    doc_num, doc_base64 = line.strip().split('\t')
    #doc_base64 += '=' * (-len(doc_base64) % 4)
    doc_gzip = b64decode(doc_base64)
    doc_html = zlib.decompress(doc_gzip).decode('utf-8')
    soup = BeautifulSoup(doc_html, 'html.parser')
    links = []
    for a in soup.find_all('a', href=True):
        link = a['href']
        #print link
        if link.startswith('/'):
            url = 'http://lenta.ru' + link
            #print url
            if url in urls_nums_dict:
                #url
                #print urls_nums_dict[url]
                if urls_nums_dict[url] not in links: 
                    links.append(urls_nums_dict[url])
    return (int(doc_num), links)

def delHangingVertex(elem, hanging_vertex):
    result = []
    for num_vertex in elem[1]:
        if num_vertex not in hanging_vertex:
            result.append(num_vertex)
    return (elem[0], result)
    
    
input_data = sc.textFile('1_10/docs-*.txt')
urls_nums_dict_var = sc.broadcast(urls_nums_dict)

link_graph = input_data.map(lambda line: parse_doc(line, urls_nums_dict_var.value))
hanging_vertex = link_graph.filter(lambda x: len(x[1]) == 0).keys().collect()

while (len(hanging_vertex) > 0):
    hanging_vertex_var = sc.broadcast(hanging_vertex)
    link_graph = link_graph.filter(lambda x: len(x[1])).map(lambda x: delHangingVertex(x, hanging_vertex_var.value))
    hanging_vertex = link_graph.filter(lambda x: len(x[1]) == 0).keys().collect()

In [7]:
#link_graph.count()
for item in link_graph.take(10):
    print item

(0, [548964, 22116, 421943, 230318, 160577, 344192, 237530, 400516, 104512, 139038, 450829, 376651, 546615, 437861, 194459, 12739, 292708, 21557, 539319, 120369, 165839, 562111, 65668, 292479, 545151, 107460, 371742, 101766, 276498, 66931, 219685, 446892, 82112, 468431, 332228, 397925, 493616, 525520, 377520, 397989, 254306, 55, 7003, 265759, 235714, 268573, 384934, 192269, 256070, 357923, 374324, 190304, 158629, 216499, 348721, 211991, 241736, 520936, 484893, 355097, 320597, 79761, 318329, 400678, 312011, 26984, 557578, 522713, 57740, 281835, 516741, 33106, 261457, 554624, 386139, 1301, 392491, 216453, 256626, 232845, 479370, 81738, 47941, 310936, 164761, 143214, 360517, 442188, 249145, 555301, 402018, 553746, 18382, 336213, 357417, 82101, 186102, 326176, 407656, 562529, 522142, 312126, 395976, 11949, 53049, 493453, 94029, 84445, 275747, 294200, 155532, 554159, 315918, 130864])
(1, [548964, 22116, 421943, 230318, 344192, 237530, 400516, 104512, 326176, 384561, 315918, 130864])
(2, [54

### count pagerank

In [8]:
def flatMapRank(elem):
    result = []
    count_vertex = len(elem[1])
    stream_weight = 1. / count_vertex
    for num_vertex in elem[1]:
        result.append([num_vertex, stream_weight])
    return result

def RankForOtherIter(elem):
    result = []
    count_vertex = len(elem[1][0])
    stream_weight = elem[1][1] * 1. / count_vertex
    for num_vertex in elem[1][0]:
        result.append([num_vertex, stream_weight])
    return result

In [9]:
vertex_count = link_graph.count()
factor = 0.8

In [10]:
print vertex_count, factor

56454 0.8


In [11]:
link_graph = link_graph.cache()
rank = link_graph.flatMap(lambda x: flatMapRank(x)).reduceByKey(lambda a, b: a + b).\
    mapValues(lambda x: (1. - factor) / vertex_count + factor * x).cache()

In [12]:
for i in range(10):
    join_rank_graph = link_graph.join(rank)
    rank = join_rank_graph.flatMap(lambda x: RankForOtherIter(x)).reduceByKey(lambda a, b: a + b).\
        mapValues(lambda x: (1. - factor) / vertex_count + factor * x)

In [13]:
rank = rank.map(lambda a: (a[1], a[0])).sortByKey(ascending=False).map(lambda a: (a[1], a[0])).collect()

In [14]:
for i in range(30):
    print nums_urls_dict[rank[i][0]], rank[i][1]
    

http://lenta.ru/ 2.96546123032e-05
http://lenta.ru/parts/video/ 2.96546123032e-05
http://lenta.ru/info/ 2.96546123032e-05
http://lenta.ru/rubrics/ww1/ 2.96546123032e-05
http://lenta.ru/parts/infographics/ 2.96546123032e-05
http://lenta.ru/specprojects/ 2.96546123032e-05
http://lenta.ru/parts/text/ 2.96546123032e-05
http://lenta.ru/rss 2.96546123032e-05
http://lenta.ru/rubrics/weapons/ 2.96546123032e-05
http://lenta.ru/rubrics/forces/ 2.96546123032e-05
http://lenta.ru/rubrics/business/ 2.94628815374e-05
http://lenta.ru/photo/2015/01/05/portartur/ 2.73046035069e-05
http://lenta.ru/rubrics/library/ 1.06005377355e-05
http://lenta.ru/photo/2014/12/20/newyeartoys/ 9.32739886992e-06
http://lenta.ru/articles/2014/12/25/wwichristmas/ 8.515565597e-06
http://lenta.ru/news/2014/12/25/chelsea/ 8.46553224437e-06
http://lenta.ru/rubrics/russia/regions/ 7.62867241204e-06
http://lenta.ru/rubrics/russia/moscow/ 7.62867241204e-06
http://lenta.ru/news/2014/11/24/arrows/ 5.60403278005e-06
http://lenta.ru/n

## Hits algorithm

### read data

In [15]:
def makeInvGraph(elem):
    result = []
    proccessVertex = elem[0]
    for nowVertex in elem[1]:
        result.append([nowVertex, proccessVertex])
    return result

input_data = sc.textFile('1_10/docs-*.txt')
urls_nums_dict_var = sc.broadcast(urls_nums_dict)

graph = input_data.map(lambda line: parse_doc(line, urls_nums_dict_var.value)).cache()
inv_graph = graph.flatMap(lambda x: makeInvGraph(x)).map(lambda x: (x[0], [x[1]])).\
            reduceByKey(lambda a, b: a + b).cache()

In [16]:
#for item in graph.take(10):
#    print item
for item in inv_graph.take(10):
    print item

(168784, [39331])
(164864, [10114])
(329112, [23283])
(298424, [28843])
(221704, [43312])
(210952, [828])
(259280, [44849])
(404264, [53737])
(407568, [10532])
(139608, [9443, 24448, 29295, 45665])


### initialize hub and auto for each node

In [17]:
auth_val = graph.map(lambda x: (x[0], 1.)).cache()
hub_val = graph.map(lambda x: (x[0], 1.)).cache()

## cut graph

In [18]:
hanging_vertex = inv_graph.filter(lambda x: len(x[1]) == 0).keys().collect()
while (len(hanging_vertex) > 0):
    hanging_vertex_var = sc.broadcast(hanging_vertex)
    inv_graph = inv_graph.filter(lambda x: len(x[1])).map(lambda x: delHangingVertex(x, hanging_vertex_var.value)).cache()
    hanging_vertex = inv_graph.filter(lambda x: len(x[1]) == 0).keys().collect()

In [19]:
hanging_vertex = graph.filter(lambda x: len(x[1]) == 0).keys().collect()
while (len(hanging_vertex) > 0):
    hanging_vertex_var = sc.broadcast(hanging_vertex)
    graph = graph.filter(lambda x: len(x[1])).map(lambda x: delHangingVertex(x, hanging_vertex_var.value)).cache()
    hanging_vertex = graph.filter(lambda x: len(x[1]) == 0).keys().collect()

### count auto and hub values for each vertex

In [20]:
for i in range(10):
    new_hub_val = inv_graph.join(auth_val).flatMap(lambda x: RankForOtherIter(x)).reduceByKey(lambda a, b:a+b)
    new_auth_val = graph.join(hub_val).flatMap(lambda x: RankForOtherIter(x)).reduceByKey(lambda a, b:a+b)
    #hub_norm_val = np.sqrt(new_hub_val.map(lambda x: x[1]**2).sum())
    #hub_norm_val_sc = sc.broadcast(hub_norm_val)
    #auth_norm_val = np.sqrt(new_auth_val.map(lambda x: x[1] ** 2).sum())
    #auth_norm_val_sc = sc.broadcast(auth_norm_val)
    #new_hub_val = new_hub_val.map(lambda x: (x[0], x[1] / hub_norm_val_sc))
    #new_auth_val = auth_norm_val.map(lambda x: (x[0], x[1] / auth_norm_val_sc))
    auth_val = new_auth_val.cache()
    hub_val = new_hub_val.cache()

### ouput result

In [21]:
auth_val = auth_val.sortBy(lambda x: x[1], ascending=False).collect()
hub_val = hub_val.sortBy(lambda x: x[1], ascending=False).collect()

In [22]:
for i in range(30):
    print nums_urls_dict[auth_val[i][0]], auth_val[i][1]

http://lenta.ru/ 92.921484877
http://lenta.ru/info/ 92.921484877
http://lenta.ru/specprojects/ 92.921484877
http://lenta.ru/parts/text/ 92.921484877
http://lenta.ru/rss 92.921484877
http://lenta.ru/parts/video/ 92.8297567597
http://lenta.ru/parts/infographics/ 92.8297567597
http://lenta.ru/rubrics/weapons/ 92.8297567597
http://lenta.ru/rubrics/ww1/ 92.3974181184
http://lenta.ru/rubrics/forces/ 92.298677507
http://lenta.ru/rubrics/business/ 89.3820960625
http://lenta.ru/photo/2015/01/05/portartur/ 58.5526465034
http://lenta.ru/rubrics/library/ 49.8744094999
http://lenta.ru/photo/2014/12/20/newyeartoys/ 44.1612056952
http://lenta.ru/news/2014/11/24/arrows/ 32.5682967257
http://lenta.ru/news/2014/12/04/transvesti/ 32.0923497863
http://lenta.ru/news/2014/12/22/hmmwv/ 28.7085644586
http://lenta.ru/articles/2014/12/25/wwichristmas/ 28.2435440821
http://lenta.ru/articles/2014/10/30/handmaid/ 27.5855166092
http://lenta.ru/articles/2014/11/03/traub/ 27.5811947327
http://lenta.ru/articles/2014/1

In [23]:
for i in range(30):
    print nums_urls_dict[hub_val[i][0]], hub_val[i][1]

http://lenta.ru/world/2000/07/07/nicaragua/ 33.7555035088
http://lenta.ru/oddly/2001/05/17/smile/ 4.55474302783
http://lenta.ru/news/2006/08/09/shvydkoi/ 4.42679898264
http://lenta.ru/news/2007/12/14/latvia/ 4.42614992331
http://lenta.ru/russia/2002/07/12/nazdratenko/ 4.4247393084
http://lenta.ru/economy/2001/09/14/oil/ 4.38122408143
http://lenta.ru/news/2005/09/29/fight/ 4.37921293047
http://lenta.ru/news/2010/12/12/gneva/ 4.37632915719
http://lenta.ru/articles/2010/08/16/starcraft/ 4.36324770178
http://lenta.ru/news/2012/11/27/radio/ 4.30901358579
http://lenta.ru/news/2006/11/09/poll/ 4.30326212741
http://lenta.ru/news/2006/02/16/mkkk/ 4.28867154797
http://lenta.ru/news/2005/07/28/footfans/ 4.28180547864
http://lenta.ru/sport/2001/08/24/draw/ 4.2576692132
http://lenta.ru/news/2011/11/16/blast/ 4.25489538765
http://lenta.ru/news/2009/03/25/wiki/ 4.25305446942
http://lenta.ru/news/2009/11/25/plead/ 4.25209348867
http://lenta.ru/news/2010/12/18/beefheart/ 4.2379199379
http://lenta.ru/ar