In [1]:
# Kết nối Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
! wget -q https://archive.apache.org/dist/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz
# ! cp /content/drive/MyDrive/midterm_MMDS/pyspark/spark-3.5.4-bin-hadoop3.tgz .
# ! cp /content/drive/MyDrive/MMDS/spark/spark-3.5.4-bin-hadoop3.tgz .

! tar xf spark-3.5.4-bin-hadoop3.tgz
! pip install -q findspark

In [3]:
! du -sh spark-3.5.4-bin-hadoop3.tgz

383M	spark-3.5.4-bin-hadoop3.tgz


In [4]:
import findspark
import pyspark as spark

findspark.init()
print(spark.__version__)

3.5.1


In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Distributed PageRank") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "16") \
    .getOrCreate()

print("SparkSession đã được khởi tạo thành công.")

SparkSession đã được khởi tạo thành công.


In [6]:
from pyspark.sql.functions import col, count, lit, sum as sum_agg, abs, row_number
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window

In [7]:
def preprocess_data(spark, df):
    """
    Tiền xử lý dữ liệu: ánh xạ URL thành chỉ số và tạo DataFrame cạnh.

    Parameters:
    spark (SparkSession): SparkSession
    df (DataFrame): DataFrame chứa cột src và dst

    Returns:
    tuple: (edges_df, n, url_to_idx, idx_to_url)
    """
    # Lấy danh sách URL duy nhất và gán chỉ số
    urls_df = df.select("src").union(df.select("dst")).distinct().withColumnRenamed("src", "url")
    window = Window.orderBy("url")
    url_idx_df = urls_df.withColumn("idx", row_number().over(window) - 1)

    # Join để ánh xạ src và dst thành chỉ số
    edges_df = df.join(url_idx_df.withColumnRenamed("url", "src"), "src", "left") \
                 .withColumnRenamed("idx", "src_idx") \
                 .join(url_idx_df.withColumnRenamed("url", "dst"), "dst", "left") \
                 .withColumnRenamed("idx", "dst_idx") \
                 .select("src_idx", "dst_idx") \
                 .filter(col("src_idx").isNotNull() & col("dst_idx").isNotNull())

    # Đếm số node (số hàng trong url_idx_df)
    n = url_idx_df.count()


    print(f"Số trang web (node): {n}")
    print(f"Số liên kết (edge): {edges_df.count()}")
    print(f"Kiểu của n: {type(n)}")

    return edges_df, n, {}, {}

In [8]:
def compute_out_degree(spark, edges_df, n):
    """
    Tính số liên kết ra và xử lý dead ends.

    Parameters:
    spark (SparkSession): SparkSession
    edges_df (DataFrame): DataFrame chứa cạnh
    n (int): Số node

    Returns:
    DataFrame: out_degree_df chứa số liên kết ra
    """
    # Đảm bảo n là int
    n = int(n)

    # Tính out-degree
    out_degree_df = edges_df.groupBy("src_idx").agg(count("dst_idx").alias("out_degree"))

    # Tạo danh sách tất cả node
    all_nodes_df = spark.range(n).select(col("id").alias("node_idx"))

    # Kết hợp để xử lý dead ends
    out_degree_df = all_nodes_df.join(out_degree_df,
                                      all_nodes_df.node_idx == out_degree_df.src_idx,
                                      "left_outer") \
                                .select("node_idx", col("out_degree").alias("out_degree")) \
                                .fillna({"out_degree": 0})

    print("Đã tính số liên kết ra cho tất cả các node.")

    return out_degree_df

In [9]:
def compute_pagerank(spark, edges_df, n, out_degree_df, d=0.85, max_iter=50, tol=1e-4):
    """
    Tính PageRank phân tán, tránh lỗi unhashable type Column.

    Parameters:
    spark (SparkSession): SparkSession
    edges_df (DataFrame): DataFrame chứa cạnh
    n (int): Số node
    out_degree_df (DataFrame): DataFrame chứa out-degree
    d (float): Damping factor
    max_iter (int): Số lần lặp tối đa
    tol (float): Ngưỡng hội tụ

    Returns:
    DataFrame: DataFrame chứa (node_idx, pagerank)
    """
    # Đảm bảo n là int
    n = int(n)

    # Khởi tạo vector PageRank
    r = spark.createDataFrame([(i, 1.0 / n) for i in range(n)], ["node_idx", "pagerank"]).cache()

    for iteration in range(max_iter):
        # Tính đóng góp, sử dụng join với out_degree_df
        contrib_df = edges_df.join(r, edges_df.src_idx == r.node_idx, "inner") \
                            .join(out_degree_df, edges_df.src_idx == out_degree_df.node_idx, "inner") \
                            .select(edges_df.dst_idx,
                                    (d * col("pagerank") / col("out_degree")).alias("contrib")) \
                            .where(col("out_degree") > 0)

        # Gộp đóng góp và thêm teleport
        new_r = contrib_df.groupBy("dst_idx").agg(sum_agg("contrib").alias("sum_contrib")) \
                         .withColumn("pagerank", col("sum_contrib") + lit((1 - d) / n)) \
                         .select(col("dst_idx").alias("node_idx"), "pagerank")

        # Kết hợp với tất cả node để xử lý node không nhận liên kết
        all_nodes_df = spark.range(n).select(col("id").alias("node_idx"))
        new_r = all_nodes_df.join(new_r, all_nodes_df.node_idx == new_r.node_idx, "left_outer") \
                           .select(all_nodes_df.node_idx,
                                   col("pagerank").alias("pagerank")) \
                           .fillna({"pagerank": (1 - d) / n}).cache()

        # Chuẩn hóa tổng PageRank (sử dụng approx sum để tránh collect nếu cần)
        total_pr_df = new_r.agg(sum_agg("pagerank").alias("total"))
        total_pr = total_pr_df.first()["total"]  # first() thay collect()[0]

        new_r = new_r.withColumn("pagerank", col("pagerank") / lit(total_pr))

        # Kiểm tra hội tụ (sử dụng first() thay collect())
        diff_df = new_r.join(r, "node_idx", "inner") \
                      .withColumn("diff", abs(col("new_r.pagerank") - col("r.pagerank"))) \
                      .agg(sum_agg("diff").alias("l1_norm"))
        diff = diff_df.first()["l1_norm"]

        print(f"Iteration {iteration + 1}, L1 norm: {diff:.6f}")

        # Unpersist cũ, cập nhật r
        r.unpersist()
        r = new_r
        if diff < tol:
            print(f"Hội tụ sau {iteration + 1} lần lặp")
            break

    if iteration == max_iter - 1:
        print(f"Đạt số lần lặp tối đa ({max_iter})")

    r.unpersist()
    return new_r


In [11]:
def display_results(pagerank_df, idx_to_url, top_n=10, d=0.85, n=4622):
    """
    Hiển thị top trang web và thống kê teleport.

    Parameters:
    pagerank_df (DataFrame): DataFrame chứa (node_idx, pagerank)
    idx_to_url (dict): Ánh xạ từ chỉ số sang URL
    top_n (int): Số trang hiển thị
    d (float): Damping factor
    n (int): Số node
    """
    n = int(n)

    # Tính teleport value
    teleport_value = (1 - d) / n

    # Lấy top N trực tiếp từ Spark
    top_df = pagerank_df.orderBy(col("pagerank").desc()).limit(top_n)
    top_rows = top_df.collect()  # Thu thập chỉ top_n hàng

    # Tính tổng và dead ends phân tán
    total_pr_df = pagerank_df.agg(sum_agg("pagerank").alias("total"))
    total_pr = total_pr_df.first()["total"]

    dead_ends_df = pagerank_df.withColumn("is_dead_end", abs(col("pagerank") - lit(teleport_value)) < 1e-10)
    dead_ends_count = dead_ends_df.filter(col("is_dead_end")).count()

    print(f"Nodes relying only on teleport: {dead_ends_count}")

    print("\nTop trang web theo PageRank:")
    for row in top_rows:
        url = idx_to_url.get(row["node_idx"])
        print(f"URL: {url}\nScore: {row['pagerank']:.6f}\n")

    print(f"Tổng PageRank: {total_pr:.4f}")

In [12]:
from pyspark.sql import SparkSession
import pandas as pd
import os

class DistributedPageRank:
    def __init__(self, csv_path, driver_memory="4g", executor_memory="4g"):
        if not os.path.exists(csv_path):
            raise FileNotFoundError(f"File {csv_path} không tồn tại")

        self.spark = SparkSession.builder \
            .appName("Distributed PageRank") \
            .config("spark.driver.memory", driver_memory) \
            .config("spark.executor.memory", executor_memory) \
            .getOrCreate()

        self.csv_path = csv_path
        self.df = self.spark.read.csv(csv_path, header=True)
        if not all(col in self.df.columns for col in ["src", "dst"]):
            raise ValueError("File CSV phải có cột 'src' và 'dst'")

        self.n = 0
        self.edges_df = None
        self.url_to_idx = {}
        self.idx_to_url = {}
        self.out_degree_df = None
        self.pagerank_df = None

    def preprocess_data(self):
        self.edges_df, self.n, self.url_to_idx, self.idx_to_url = preprocess_data(self.spark, self.df)

    def compute_out_degree(self):
        self.out_degree_df = compute_out_degree(self.spark, self.edges_df, self.n)

    def compute_pagerank(self, d=0.85, max_iter=15, tol=1e-6):
        self.pagerank_df = compute_pagerank(self.spark, self.edges_df, self.n,
                                          self.out_degree_df, d, max_iter, tol)

    def display_results(self, top_n=10):
        if self.pagerank_df is None:
            print("Chưa tính PageRank. Vui lòng chạy compute_pagerank() trước.")
            return
        display_results(self.pagerank_df, self.idx_to_url, top_n, d=0.85, n=self.n)

    def save_results(self, output_path):
        self.pagerank_df.write.csv(f"{output_path}/pagerank", mode="overwrite")
        pd.DataFrame(self.idx_to_url.items(), columns=["idx", "url"]) \
          .to_csv(f"{output_path}/url_mapping.csv", index=False)
        print(f"Kết quả đã được lưu tại {output_path}")

    def cleanup(self):
        if self.edges_df:
            self.edges_df.unpersist()
        if self.pagerank_df:
            self.pagerank_df.unpersist()
        if self.out_degree_df:
            self.out_degree_df.unpersist()
        self.spark.stop()
        print("SparkSession đã được dừng.")

In [15]:
# Đường dẫn
csv_path = "/content/drive/MyDrive/MMDS/Final/source/dataset_link_web1.csv"
output_path = "/content/drive/MyDrive/MMDS/Final/output"

pr = DistributedPageRank(csv_path)
pr.preprocess_data()
pr.compute_out_degree()
pr.compute_pagerank(d=0.85, max_iter=15, tol=1e-6)
pr.display_results(top_n=10)
pr.save_results(output_path)


Số trang web (node): 2479
Số liên kết (edge): 39856
Kiểu của n: <class 'int'>
Đã tính số liên kết ra cho tất cả các node.
Iteration 1, L1 norm: 1.506295
Iteration 2, L1 norm: 0.573540
Iteration 3, L1 norm: 0.154672
Iteration 4, L1 norm: 0.056789
Iteration 5, L1 norm: 0.021351
Iteration 6, L1 norm: 0.014512
Iteration 7, L1 norm: 0.009896
Iteration 8, L1 norm: 0.007265
Iteration 9, L1 norm: 0.005205
Iteration 10, L1 norm: 0.003763
Iteration 11, L1 norm: 0.002707
Iteration 12, L1 norm: 0.001950
Iteration 13, L1 norm: 0.001403
Iteration 14, L1 norm: 0.001009
Iteration 15, L1 norm: 0.000726
Đạt số lần lặp tối đa (15)
Nodes relying only on teleport: 0

Top trang web theo PageRank:
URL: https://it.tdtu.edu.vn/
Score: 0.041608

URL: https://it.tdtu.edu.vn/en
Score: 0.038987

URL: https://it.tdtu.edu.vn/vien-chuc
Score: 0.032443

URL: https://it.tdtu.edu.vn/sinh-vien
Score: 0.032443

URL: https://it.tdtu.edu.vn/tin-tuc/tuyen-dung
Score: 0.031879

URL: https://it.tdtu.edu.vn/gioi-thieu
Score: 0.