In [1]:
sc

<pyspark.context.SparkContext at 0x100676850>

In [2]:
# from __future__ import print_function

import re
import sys
from operator import add

In [87]:
base_path = "./1_10/"
# base_path = "./lenta.ru/1_1000/"

# 1. Parse documents

In [88]:
import pandas as pd
urls_df = pd.read_csv(base_path + 'urls.txt', sep=str('\t'), header=None)
urls_df.columns = ["doc_id", "url"]
urls_df = urls_df.set_index("doc_id")
doc_urls_dict = urls_df.to_dict()["url"]

In [89]:
from base64 import b64decode
import zlib
from bs4 import BeautifulSoup

doc_urls_dict_bc = sc.broadcast(doc_urls_dict)

def parse_doc_line(line):
    doc_id, doc_base64 = line.split('\t')
    doc_gzip = b64decode(doc_base64)
    doc_html = zlib.decompress(doc_gzip).decode('utf-8')
    
    return int(doc_id), doc_html

def extract_links(doc):
    doc_id, doc_html = doc
    
    doc_links = []
    boc_bs = BeautifulSoup(doc_html, 'lxml')
    for a in boc_bs.find_all('a', href=True):
        href = a['href']
        if href.startswith('mailto'):
            continue
        if href.startswith('/'):
            href = 'http://lenta.ru' + href
        if not href.startswith('http') and not href.startswith('www'):
            pass
        else:
            doc_links.append(href.strip())
            
    return doc_urls_dict_bc.value[doc_id].strip(), doc_links


lines = sc.textFile(base_path + "docs*")
docs = lines.map(lambda line: parse_doc_line(line))
doc_links = docs.map(lambda doc: extract_links(doc))
doc_links = doc_links.filter(lambda x: len(x[1]))
adjacency_list = doc_links.flatMapValues(lambda x: x)
to_save = adjacency_list.map(lambda urls: "%s\t%s" % urls)
to_save = to_save.distinct()
out_path_stage1 = './out_stage1/%s' % base_path.replace('/', '_').replace('.', '_')
to_save.saveAsTextFile(out_path_stage1)

# 2. Page Rank

In [90]:
N = sc.textFile(base_path + "docs*").count()
gamma = 0.85

In [91]:
N

56455

In [92]:
def compute_contribs(urls, rank):
    num_urls = len(urls)
    for url in urls:
        yield (url, rank / num_urls)

def parse_neighbors(urls):
    parts = urls.split('\t')
    return parts[0], parts[1]

In [93]:
lines = sc.textFile(out_path_stage1 + '/part-*')
doc_links = lines.map(lambda urls: parse_neighbors(urls)).distinct().groupByKey().cache()

In [94]:
ranks = doc_links.map(lambda url_neighbors: (url_neighbors[0], 1/float(N)))

In [95]:
for iteration in range(4):
    contribs = doc_links.join(ranks)\
                    .flatMap(lambda url_urls_rank: compute_contribs(url_urls_rank[1][0], url_urls_rank[1][1]))

    ranks = contribs.reduceByKey(add).mapValues(lambda sum_contrib: sum_contrib * gamma + (1 - gamma)/float(N))

In [96]:
ranks_sorted = ranks.sortBy(lambda a: a[1], ascending=False)

In [97]:
# out_path_stage2 = './out_stage2/%s' % base_path.replace('/', '_').replace('.', '_')
# to_save = ranks_sorted.map(lambda x: "%s\t%s" % x)
# to_save.saveAsTextFile(out_path_stage2)

In [115]:
for (link, rank) in ranks_sorted.take(30):
    print("%s has rank: %s." % (link, rank))

http://ads.adfox.ru/202433/goDefaultLink?p1=biozy&p2=v has rank: 4.2482341664e-05.
http://lenta.ru/rubrics/sport/ has rank: 4.21980741976e-05.
http://dom.lenta.ru has rank: 4.21980741976e-05.
http://lenta.ru/rubrics/russia/ has rank: 4.21980741976e-05.
http://motor.ru has rank: 4.21980741976e-05.
http://lenta.ru/rubrics/forces/ has rank: 4.21980741976e-05.
http://orphus.ru has rank: 4.21980741976e-05.
http://lenta.ru/rubrics/life/ has rank: 4.21980741976e-05.
http://lenta.ru/info/ has rank: 4.21980741976e-05.
http://lenta.ru/parts/photo/ has rank: 4.21980741976e-05.
http://lenta.ru/rubrics/media/ has rank: 4.21980741976e-05.
http://lenta.ru/rubrics/ussr/ has rank: 4.21980741976e-05.
http://lenta.ru/parts/video/ has rank: 4.21980741976e-05.
http://dom.lenta.ru/ has rank: 4.21980741976e-05.
http://lenta.ru/rubrics/science/ has rank: 4.21980741976e-05.
http://motor.ru/ has rank: 4.21980741976e-05.
http://lenta.ru/rubrics/economics/ has rank: 4.21980741976e-05.
http://vk.com/lentaru has ra

# 3 HIST

In [129]:
import math

In [130]:
lines = sc.textFile(out_path_stage1 + '/part-*')
doc_links = lines.map(lambda urls: parse_neighbors(urls)).distinct().groupByKey().cache()
lines = sc.textFile(out_path_stage1 + '/part-*')
invert_doc_links = lines.map(lambda urls: parse_neighbors(urls)[::-1]).distinct().groupByKey().cache()

In [131]:
auth_score = invert_doc_links.map(lambda url_neighbors: (url_neighbors[0], 1/math.sqrt(N)))
hub_score = doc_links.map(lambda url_neighbors: (url_neighbors[0], 1/math.sqrt(N)))

In [132]:
for _ in range(1):
    # update all authority values first
    hub_contribs = invert_doc_links\
                        .join(auth_score)\
                        .flatMap(lambda url_urls_rank:\
                                 compute_contribs(url_urls_rank[1][0], url_urls_rank[1][1]))

    hub_score = hub_contribs\
                    .reduceByKey(add)\
                    .mapValues(lambda x: x)

    hub_norm = math.sqrt(hub_score.map(lambda x: x[1]**2).sum())
    hub_norm_bc = sc.broadcast(hub_norm)
    hub_score = hub_score.map(lambda x: (x[0], x[1]/hub_norm_bc.value))

    # then update all hub values
    auth_contribs = doc_links\
                    .join(hub_score)\
                    .flatMap(lambda url_urls_rank:\
                                 compute_contribs(url_urls_rank[1][0], url_urls_rank[1][1]))

    auth_score = auth_contribs\
                    .reduceByKey(add)\
                    .mapValues(lambda x: x)

    auth_norm = math.sqrt(auth_score.map(lambda x: x[1]**2).sum())
    auth_norm_bc = sc.broadcast(auth_norm)
    auth_score = auth_score.map(lambda x: (x[0], x[1]/auth_norm_bc.value))

In [133]:
auth_score_sorted = auth_score.sortBy(lambda a: a[1], ascending=False)
hub_score_sorted = hub_score.sortBy(lambda a: a[1], ascending=False)

In [134]:
for (link, rank) in hub_score_sorted.take(30):
    print("%s has hub: %s." % (link, rank))

http://lenta.ru/lib has hub: 0.471454349149.
http://lenta.ru/2009/03/31/ has hub: 0.0578326952318.
http://lenta.ru/2008/05/29 has hub: 0.0577821205397.
http://lenta.ru/2009/01/23 has hub: 0.0577284294997.
http://lenta.ru/2009/03/18 has hub: 0.0574488477191.
http://lenta.ru/2009/05/13 has hub: 0.0574219332306.
http://lenta.ru/2010/03/23/ has hub: 0.0574060937419.
http://lenta.ru/2009/06/02 has hub: 0.0573672235107.
http://lenta.ru/news/2009/02/11/ has hub: 0.056633267469.
http://lenta.ru/2008/07/02 has hub: 0.0561851248401.
http://lenta.ru/2011/04/20 has hub: 0.0558182336089.
http://lenta.ru/2009/07/10 has hub: 0.055598437454.
http://lenta.ru/2008/09/25 has hub: 0.0555979221885.
http://lenta.ru/2008/10/09 has hub: 0.0549042424073.
http://lenta.ru/2008/08/08/ has hub: 0.0548080221404.
http://lenta.ru/2010/06/30 has hub: 0.0547290512639.
http://lenta.ru/2010/04/06 has hub: 0.0541940427941.
http://lenta.ru/2008/04/16/ has hub: 0.0541677801821.
http://lenta.ru/2008/02/22 has hub: 0.05398848

In [135]:
for (link, rank) in auth_score_sorted.take(30):
    print("%s has authority: %s." % (link, rank))

http://lenta.ru/rubrics/russia/ has authority: 0.143536839393.
http://vk.com/lentaru has authority: 0.143536839393.
http://facebook.com/lenta.ru has authority: 0.143536839393.
http://lenta.ru/specprojects/ has authority: 0.143536839393.
http://lenta.ru/rubrics/culture/ has authority: 0.143536839393.
http://lenta.ru/rubrics/media/ has authority: 0.143536839393.
http://lenta.ru/rss has authority: 0.143536839393.
http://lenta.ru/rubrics/science/ has authority: 0.143536839393.
http://motor.ru/ has authority: 0.143536839393.
http://lenta.ru/parts/photo/ has authority: 0.143536839393.
http://lenta.ru/rubrics/world/ has authority: 0.143536839393.
http://dom.lenta.ru/ has authority: 0.143536839393.
http://lenta.ru/rubrics/ussr/ has authority: 0.143536839393.
http://lenta.ru/parts/text/ has authority: 0.143536839393.
http://lenta.ru/rubrics/economics/ has authority: 0.143536839393.
http://lenta.ru/rubrics/sport/ has authority: 0.143536839393.
http://lenta.ru/rubrics/life/ has authority: 0.14353