<a href="https://colab.research.google.com/github/blancavazquez/CursoDatosMasivosII/blob/2023-I/notebooks/3a_HIST_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Algoritmo de HITS usando pyspark

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
openjdk-8-jdk-headless is already the newest version (8u342-b07-0ubuntu1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 20 not upgraded.


In [2]:
import pyspark
from pyspark import SparkContext
from operator import add
import re, sys
import time

In [3]:
def computeAuth(urls, hub):
    """Calculates hub contributions to the auth of other URLs."""
    num_urls = len(urls)
    for url in urls: yield (url, hub)

def computeHub(urls, auth):
    """Calculates auth contributions to the hub of other URLs."""
    num_urls = len(urls)
    for url in urls: yield (url, auth)

def outNeighbors(urls):
    """Parses a urls pair string into urls pair."""
    parts = re.split(r'\s+', urls)
    return parts[0], parts[1]

def inNeighbors(urls):
    """Parses a urls pair string into urls pair."""
    parts = re.split(r'\s+', urls)
    return parts[1], parts[0]

In [4]:
sc = SparkContext(appName="HITS_pySpark") # Initialize the spark context.

In [5]:
lineas = sc.textFile('/content/drive/MyDrive/Colab Notebooks/datos_vinculos/data_hist.txt')

In [6]:
lineas.collect()

['1 2', '1 3', '1 4', '2 1', '2 4', '3 5', '4 2', '4 3']

In [7]:
# create RDD in format (URL)
out_links = lineas.map(lambda urls: outNeighbors(urls)).distinct().groupByKey()
out_links.collect()

[('1', <pyspark.resultiterable.ResultIterable at 0x7f6cd7e13f50>),
 ('4', <pyspark.resultiterable.ResultIterable at 0x7f6cdef68750>),
 ('3', <pyspark.resultiterable.ResultIterable at 0x7f6cd7e13ed0>),
 ('2', <pyspark.resultiterable.ResultIterable at 0x7f6cdef681d0>)]

In [8]:
# create RDD in format (URL)
in_links = lineas.map(lambda urls: inNeighbors(urls)).distinct().groupByKey()
in_links.collect()

[('4', <pyspark.resultiterable.ResultIterable at 0x7f6cd7dba1d0>),
 ('1', <pyspark.resultiterable.ResultIterable at 0x7f6cd7dba090>),
 ('5', <pyspark.resultiterable.ResultIterable at 0x7f6cd7dba410>),
 ('2', <pyspark.resultiterable.ResultIterable at 0x7f6cd7dba590>),
 ('3', <pyspark.resultiterable.ResultIterable at 0x7f6cd875e450>)]

In [9]:
# Initialize hub of each URL to 1
hubs = out_links.map(lambda x: (x[0], 1.0))
hubs.collect()

[('1', 1.0), ('4', 1.0), ('3', 1.0), ('2', 1.0)]

In [10]:
# Initialize auths of each URL to 1
auths = in_links.map(lambda x: (x[0], 1.0))
auths.collect()

[('4', 1.0), ('1', 1.0), ('5', 1.0), ('2', 1.0), ('3', 1.0)]

In [11]:
# Calculates and updates hub & auth ranks continuously using HITS algorithm.
for iteration in range(10):
  # Calculates URL contributions to the rank of other URLs.
  
  # Here we are contributing auth of a link present in the outgoing list of a link whose hub is given
   print("Iteration:", iteration)
   auth_contribs = out_links.join(hubs).flatMap(lambda url_urls_rank:computeAuth(url_urls_rank[1][0], url_urls_rank[1][1]))
   auths = auth_contribs.reduceByKey(add)
   #print("auths:",auths.take(5))
   max_value = auths.max(lambda x:x[1])[1]
   auths = auths.mapValues(lambda rank: rank/max_value) #normalize
   #print("auths (normalizado):",auths.take(5))

   # Here we are contributing hub of a link present in the incoming list of a link whose auth is given
   hub_contribs = in_links.join(auths).flatMap(lambda url_urls_rank:computeHub(url_urls_rank[1][0], url_urls_rank[1][1]))
   hubs = hub_contribs.reduceByKey(add)
   max_value = hubs.max(lambda x:x[1])[1]
   hubs = hubs.mapValues(lambda rank:rank/max_value)#normalize

Iteration: 0
Iteration: 1
Iteration: 2
Iteration: 3
Iteration: 4
Iteration: 5
Iteration: 6
Iteration: 7
Iteration: 8
Iteration: 9


In [12]:
# Output final
for (link, rank) in auths.collect():
  print(link, "has auth:", (link, rank))

print("**************************************")
for (link, rank) in hubs.collect():
  print(link, "has hub:", (link, rank))

1 has auth: ('1', 0.12838756187947561)
2 has auth: ('2', 0.6149109120448921)
4 has auth: ('4', 0.4866324223620656)
3 has auth: ('3', 0.6149109120448921)
5 has auth: ('5', 2.1303163408019257e-07)
**************************************
3 has hub: ('3', 1.2411145506532355e-07)
4 has hub: ('4', 0.7164897209651799)
2 has hub: ('2', 0.3583084055475835)
1 has hub: ('1', 1.0)


In [13]:
sc.stop()