In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lit, coalesce, collect_list, size, sum as Fsum
import requests
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup

spark = SparkSession.builder \
    .appName("Task04_PageRank") \
    .getOrCreate()

In [3]:
def normalize_url(base_url, link):
    """
    Only keep links in the same domain as base_url; strip fragments and trailing slash.
    """
    parsed = urlparse(link)

    if parsed.netloc and parsed.netloc != urlparse(base_url).netloc:
        return None

    url = urljoin(base_url, parsed.path or "")
    # remove fragment
    url = url.split('#')[0]
    # strip trailing slash
    if url.endswith('/') and len(url) > len(base_url):
        url = url[:-1]
    return url

def crawl_domain(start_url, max_pages=10000, max_depth=5):
    """
    BFS crawl within start_url domain, up to max_pages or max_depth.
    Returns a list of (src, dst) URL tuples.
    """
    from collections import deque

    visited = set([start_url])
    queue   = deque([(start_url, 0)])
    edges   = []
    static_ext = {'.jpg','.jpeg','.png','.gif','.svg',
                  '.css','.js','.ico','.pdf','.mp4','.woff','.ttf'}

    while queue and len(visited) < max_pages:
        src, depth = queue.popleft()
        if depth >= max_depth:
            continue
        try:
            resp = requests.get(src, timeout=5)
            if 'text/html' not in resp.headers.get('Content-Type',''):
                continue
            soup = BeautifulSoup(resp.text, 'html.parser')
            for a in soup.find_all('a', href=True):
                dst = normalize_url(start_url, a['href'])
                if not dst:
                    continue
                # skip static assets
                if any(dst.lower().endswith(ext) for ext in static_ext):
                    continue
                edges.append((src, dst))
                if dst not in visited:
                    visited.add(dst)
                    queue.append((dst, depth + 1))
        except Exception:
            # timeouts, parse errors, etc.
            pass

    return edges


In [4]:
start_url = "https://it.tdtu.edu.vn"

edges_list = crawl_domain(start_url,
                          max_pages=5000,
                          max_depth=3)

edges_df = spark.createDataFrame(edges_list, schema=["src", "dst"]) \
                .cache()

class PageRank:
    def __init__(self, edges_df, damping=0.85):
        self.edges   = edges_df
        self.damping = damping

    def run(self, num_iters=10):
        # 1) collect all unique pages
        src_pages = self.edges.select("src").distinct()
        dst_pages = self.edges.select("dst").distinct()
        pages = src_pages.union(dst_pages).distinct().cache()

        # total count
        N = pages.count()

        # 2) initial uniform rank
        ranks = pages.withColumn("rank", lit(1.0 / N))

        # 3) adjacency list: for each src, list of neighbors
        links = self.edges.groupBy("src") \
                          .agg(collect_list("dst").alias("neighbors")) \
                          .cache()

        for i in range(num_iters):
            # 4a) compute contributions from non‑dangling nodes
            contribs = links.join(ranks, links.src == ranks.src) \
                            .select(
                                explode("neighbors").alias("dst"),
                                (col("rank") / size("neighbors")).alias("contrib")
                            )

            sums = contribs.groupBy("dst") \
                           .agg(Fsum("contrib").alias("total_contrib"))

            # 4b) dangling nodes (no out‑links) keep their rank and distribute evenly
            dangling = ranks.join(links, ranks.src == links.src, how="left_anti")
            dangling_sum = dangling.agg(Fsum("rank")).first()[0] or 0.0
            dangling_share = dangling_sum / N

            # 4c) teleportation (random jump) factor
            teleport = (1.0 - self.damping) / N

            # 4d) new ranks
            ranks = pages.withColumnRenamed("src", "dst") \
                         .join(sums, "dst", how="left") \
                         .select(
                              col("dst").alias("src"),
                              (lit(teleport) +
                               lit(self.damping) * (
                                 coalesce(col("total_contrib"), lit(0.0))
                                 + lit(dangling_share)
                               )
                              ).alias("rank")
                         )

        # cache final ranks
        self.ranks = ranks.cache()
        return self.ranks


In [5]:
pr = PageRank(edges_df, damping=0.85)
final_ranks = pr.run(num_iters=20)

# Show top 20 pages
final_ranks.orderBy(col("rank").desc()) \
           .show(20, truncate=False)

# write out to CSV
final_ranks.orderBy(col("rank").desc()) \
           .write.csv("pagerank_it_tdtu.csv",
                      header=True,
                      mode="overwrite")

# Gracefully stop Spark
spark.stop()

+-----------------------------------------------------------------------------------------------------------------------+--------------------+
|src                                                                                                                    |rank                |
+-----------------------------------------------------------------------------------------------------------------------+--------------------+
|https://it.tdtu.edu.vn                                                                                                 |0.07659125231741104 |
|https://it.tdtu.edu.vn/giao-duc                                                                                        |0.044229181122106276|
|https://it.tdtu.edu.vn/tuyen-sinh                                                                                      |0.03657383534849928 |
|https://it.tdtu.edu.vn/gioi-thieu                                                                                      |0.03623801188773295 |