# Import libraries

In [None]:
pip install requests beautifulsoup4 tldextract


Collecting tldextract
  Downloading tldextract-5.3.0-py3-none-any.whl.metadata (11 kB)
Collecting requests-file>=1.4 (from tldextract)
  Downloading requests_file-2.1.0-py2.py3-none-any.whl.metadata (1.7 kB)
Downloading tldextract-5.3.0-py3-none-any.whl (107 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m107.4/107.4 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading requests_file-2.1.0-py2.py3-none-any.whl (4.2 kB)
Installing collected packages: requests-file, tldextract
Successfully installed requests-file-2.1.0 tldextract-5.3.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import regexp_replace, when, col,udf,count
from pyspark.sql.types import DoubleType, StringType, StructType, StructField
import pyspark.sql.window as W

import time
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse,urlunparse

# Crawl

In [None]:
# Khởi tạo SparkSession
spark = SparkSession.builder.appName("Crawler").getOrCreate()

# Domain gốc
base_domain = "it.tdtu.edu.vn"
base_url = "https://it.tdtu.edu.vn"

# Kiểm tra URL có cùng domain
def is_same_domain(url):
    try:
        netloc = urlparse(url).netloc
        return base_domain in netloc
    except:
        return False

# Kiểm tra URL có phải file tĩnh (loại trừ)
def is_static_file(url):
    static_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg',
                         '.css', '.js', '.ico', '.pdf', '.mp4', '.avi',
                         '.mov', '.wmv', '.flv', '.mp3', '.zip', '.rar',
                         '.exe', '.tar', '.gz']
    url_lower = url.lower()
    return any(url_lower.endswith(ext) for ext in static_extensions)

# Crawl hàm crawl 1 trang
def crawl_page(url):
    print(f"Crawling: {url}")
    try:
        response = requests.get(url, timeout=5)
        if response.status_code != 200:
            print(f"Failed to retrieve {url} - status code {response.status_code}")
            return []
        soup = BeautifulSoup(response.text, 'html.parser')
        links = []
        for a in soup.find_all('a', href=True):
            href = a['href']
            full_url = urljoin(url, href)
            if is_same_domain(full_url) and not is_static_file(full_url):
                links.append(full_url)
        return links
    except Exception as e:
        print(f"Error crawling {url}: {e}")
        return []

# Main crawl process
from collections import deque

visited = set()
queue = deque([base_url])
edges = []

start_time = time.time()

while queue:
    current_url = queue.popleft()
    if current_url in visited:
        continue
    visited.add(current_url)
    out_links = crawl_page(current_url)
    for link in out_links:
        edges.append((current_url, link))
        if link not in visited and link not in queue:
            queue.append(link)

end_time = time.time()
print(f"Crawling finished. Total pages crawled: {len(visited)}. Time taken: {end_time - start_time:.2f} seconds.")

# Tạo DataFrame PySpark từ edges
schema = StructType([
    StructField("source_url", StringType(), False),
    StructField("dest_url", StringType(), False)
])

edges_df = spark.createDataFrame(edges, schema)
edges_df.show(10, truncate=False)



Crawling: https://it.tdtu.edu.vn
Crawling: https://it.tdtu.edu.vn#main-content
Crawling: https://it.tdtu.edu.vn/en
Crawling: https://it.tdtu.edu.vn/giao-vien
Crawling: https://it.tdtu.edu.vn/
Crawling: https://it.tdtu.edu.vn/gioi-thieu
Crawling: https://it.tdtu.edu.vn/giao-duc
Crawling: https://it.tdtu.edu.vn/khoa-hoc-cong-nghe
Crawling: https://it.tdtu.edu.vn/tin-tuc-khoa
Crawling: https://it.tdtu.edu.vn/doanh-nghiep
Crawling: https://it.tdtu.edu.vn/tuyen-sinh
Crawling: https://it.tdtu.edu.vn/vien-chuc
Failed to retrieve https://it.tdtu.edu.vn/vien-chuc - status code 404
Crawling: https://it.tdtu.edu.vn/sinh-vien
Failed to retrieve https://it.tdtu.edu.vn/sinh-vien - status code 404
Crawling: https://it.tdtu.edu.vn/iccies
Crawling: https://it.tdtu.edu.vn/tuyen-sinh/thac-si-nganh-khoa-hoc-may-tinh
Crawling: https://it.tdtu.edu.vn/truong-dai-hoc-ton-duc-thang-ky-ket-hop-tac-voi-tap-doan-ttc
Crawling: https://it.tdtu.edu.vn/tdtu-to-chuc-doi-thoai-sinh-vien-hoc-ky-2-nam-hoc-2024-2025
Crawl


Assuming this really is an XML document, what you're doing might work, but you should know that using an XML parser will be more reliable. To parse this document as XML, make sure you have the Python package 'lxml' installed, and pass the keyword argument `features="xml"` into the BeautifulSoup constructor.




  soup = BeautifulSoup(response.text, 'html.parser')


[1;30;43mKết quả truyền trực tuyến bị cắt bớt đến 5000 dòng cuối.[0m
Crawling: https://it.tdtu.edu.vn/user/login?destination=/fpt-telecom-tuyen-dung-ctv%23comment-form#main-content
Crawling: https://it.tdtu.edu.vn/user/login?destination=/sun-life-viet-nam-trien-khai-chuong-trinh-tech-up-your-career-2021%23comment-form#main-content
Crawling: https://it.tdtu.edu.vn/user/login?destination=/chuong-trinh-momo-talents-2021%23comment-form#main-content
Error crawling https://it.tdtu.edu.vn/user/login?destination=/chuong-trinh-momo-talents-2021%23comment-form#main-content: HTTPSConnectionPool(host='it.tdtu.edu.vn', port=443): Read timed out. (read timeout=5)
Crawling: https://it.tdtu.edu.vn/user/login?destination=/chuong-trinh-thuc-tap-sinh-internship-at-home-cua-dxc%23comment-form#main-content
Crawling: https://it.tdtu.edu.vn/user/login?destination=/fujinet-tuyen-dung-thang-62021%23comment-form#main-content
Crawling: https://it.tdtu.edu.vn/user/login?destination=/ekino-vietnam-tuyen-dung-vi-

# Data preprocessing

In [None]:
# Nhóm theo cả hai cột và đếm số lần xuất hiện
duplicate_pairs_df = edges_df.groupBy("source_url", "dest_url") \
    .agg(count("*").alias("count")) \
    .filter(col("count") > 1)

# Kiểm tra có dòng trùng lặp không
has_duplicates = duplicate_pairs_df.limit(1).count() > 0

print("Có cặp (source_url, dest_url) trùng lặp:", has_duplicates)

# Nếu muốn xem luôn các cặp bị trùng
duplicate_pairs_df.show(truncate=False)


Có cặp (source_url, dest_url) trùng lặp: True
+-----------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+-----+
|source_url                                                                                                                         |dest_url                                                     |count|
+-----------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------+-----+
|https://it.tdtu.edu.vn/node/1293                                                                                                   |https://it.tdtu.edu.vn/tuyen-sinh                            |2    |
|https://it.tdtu.edu.vn/en/gioi-thieu                                                                                               |https://it.td

In [None]:
duplicate_pairs_df.count()

55870

In [None]:

# Lọc các dòng có source_url == dest_url
self_loop_df = edges_df.filter(col("source_url") == col("dest_url"))

# Kiểm tra có dòng nào như vậy không
has_self_loop = self_loop_df.limit(1).count() > 0

print("Có dòng self-loop (source_url == dest_url):", has_self_loop)


Có dòng self-loop (source_url == dest_url): True


In [None]:
self_loop_df.count()

5156

In [None]:

# Bước 1: Loại bỏ các dòng self-loop
no_self_loop_df = edges_df.filter(col("source_url") != col("dest_url"))

# Bước 2: Xoá các dòng trùng lặp (giữ lại dòng đầu tiên của mỗi cặp source-dest)
deduped_df = no_self_loop_df.dropDuplicates(["source_url", "dest_url"])

# Kết quả cuối cùng
print("Data sau khi xoá self-loop và loại bỏ cặp (source_url, dest_url) trùng:")
deduped_df.show(truncate=False)


Data sau khi xoá self-loop và loại bỏ cặp (source_url, dest_url) trùng:
+-----------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------+
|source_url                                                                                                             |dest_url                                                                                                     |
+-----------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------+
|https://it.tdtu.edu.vn/giao-vien                                                                                       |https://it.tdtu.edu.vn/en                                                                                    |


In [None]:
deduped_df.count()

164646

In [None]:

# Bước 1: Chuẩn hoá URL: loại bỏ http:// và https://, rồi giữ lại bản https nếu có
def normalize_url_column(df, col_name):
    return df.withColumn(
        f"normalized_{col_name}",
        regexp_replace(regexp_replace(col(col_name), "^https?://", ""), "/$", "")
    )

# Chuẩn hoá cả hai cột source và dest
normalized_df = normalize_url_column(edges_df, "source_url")
normalized_df = normalize_url_column(normalized_df, "dest_url")

# Bước 2: Loại bỏ self-loop
no_self_loop_df = normalized_df.filter(
    col("normalized_source_url") != col("normalized_dest_url")
)

# Bước 3: Ưu tiên giữ bản ghi `https` nếu có cùng normalized domain
# Tạo thêm cột đánh dấu mức độ ưu tiên (1 nếu là https, 0 nếu là http)
prioritized_df = no_self_loop_df.withColumn(
    "source_priority", when(col("source_url").startswith("https"), 1).otherwise(0)
).withColumn(
    "dest_priority", when(col("dest_url").startswith("https"), 1).otherwise(0)
)

# Bước 4: Với mỗi cặp normalized, chọn bản ghi có priority cao nhất
from pyspark.sql import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("normalized_source_url", "normalized_dest_url") \
                    .orderBy(col("source_priority").desc(), col("dest_priority").desc())

deduped_df = prioritized_df.withColumn("row_num", row_number().over(window_spec)) \
                           .filter(col("row_num") == 1) \
                           .drop("row_num", "source_priority", "dest_priority",
                                 "normalized_source_url", "normalized_dest_url")

# Bước 5: Kết quả cuối cùng
print("Data sau khi xoá self-loop và loại bản http trùng:")
deduped_df.show(truncate=False)


Data sau khi xoá self-loop và loại bản http trùng:
+-------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
deduped_df.count()

82476

In [None]:
df=deduped_df

In [None]:

def remove_fragment(url):
    if url is None:
        return None
    parsed = urlparse(url)
    no_fragment = parsed._replace(fragment='')
    return urlunparse(no_fragment)

remove_fragment_udf = udf(remove_fragment, StringType())

df_clean = df.withColumn("source_url", remove_fragment_udf(col("source_url"))) \
             .withColumn("dest_url", remove_fragment_udf(col("dest_url")))


In [None]:
df_clean.count()

82476

In [None]:
df_clean.show(truncate=False)

+-------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:

# Bước 1: Loại bỏ các dòng self-loop
no_self_loop_df_1 = df_clean.filter(col("source_url") != col("dest_url"))

# Bước 2: Xoá các dòng trùng lặp (giữ lại dòng đầu tiên của mỗi cặp source-dest)
deduped_df_1 = no_self_loop_df_1.dropDuplicates(["source_url", "dest_url"])

# Kết quả cuối cùng
print("Data sau khi xoá self-loop và loại bỏ cặp (source_url, dest_url) trùng:")
deduped_df_1.show(truncate=False)


Data sau khi xoá self-loop và loại bỏ cặp (source_url, dest_url) trùng:
+---------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
|source_url                                                                                               |dest_url                                                                                               |
+---------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+
|https://it.tdtu.edu.vn/buoi-gap-go-tan-sinh-vien-khoa-cong-nghe-thong-tin                                |https://it.tdtu.edu.vn/en                                                                              |
|https://it.tdtu.edu.vn/career-talk-voi-cong-ty-robert-bosch-viet-nam-09112017  

In [None]:
deduped_df_1.count()

39831

In [None]:
# Đếm số dòng
num_rows = deduped_df_1.count()
print(f"Số dòng trong DataFrame: {num_rows}")

# Lưu DataFrame thành CSV (vd. đường dẫn lưu file)
deduped_df_1.write.mode("overwrite").csv("s", header=True)


Số dòng trong DataFrame: 39831


In [None]:
from pyspark.sql.functions import udf, col

# Hàm Python bạn tạo
def remove_percent_and_after(url: str) -> str:
    if url is None:
        return None
    pos = url.find('%')
    if pos != -1:
        return url[:pos]
    return url

# Đăng ký hàm này thành UDF
remove_percent_udf = udf(remove_percent_and_after, StringType())

# Áp dụng hàm UDF vào các cột
df_cleaned = df.withColumn(
    "source_clean", remove_percent_udf(col("source_url"))
).withColumn(
    "dest_clean", remove_percent_udf(col("dest_url"))
)


# Lọc các dòng có source_clean == dest_clean
df_self_links = df_cleaned.filter(col("source_clean") != col("dest_clean"))

# Hiển thị kết quả
df_self_links.select("source_clean", "dest_clean").show(truncate=False)

+---------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+
|source_clean                                                                                             |dest_clean                                                                              |
+---------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+
|https://it.tdtu.edu.vn/buoi-gap-go-tan-sinh-vien-khoa-cong-nghe-thong-tin                                |https://it.tdtu.edu.vn/en                                                               |
|https://it.tdtu.edu.vn/career-talk-voi-cong-ty-robert-bosch-viet-nam-09112017                            |https://it.tdtu.edu.vn/gioi-thieu                                                       |
|https://it.tdt

In [None]:
deduped_df = df_self_links.dropDuplicates(["source_clean", "dest_clean"])


In [None]:
df_1=deduped_df.drop("source_url","dest_url")

In [None]:
df_1.count()

39795

# Class PageRank

In [None]:
# Khởi tạo SparkSession
spark = SparkSession.builder \
    .appName("PageRank Implementation") \
    .getOrCreate()

# Định nghĩa schema cho file CSV
schema = StructType([
    StructField("source_url", StringType(), True),
    StructField("dest_url", StringType(), True)
])

In [None]:
# Đọc tất cả các file CSV và tạo một DataFrame duy nhất
def create_dataframe_from_csv(csv_files, schema=None):
    """
    Đọc nhiều file CSV và kết hợp thành một DataFrame duy nhất

    Parameters
    ----------
    csv_files : list
        Danh sách các đường dẫn file CSV
    schema : StructType, optional
        Schema cho DataFrame

    Returns
    -------
    df : DataFrame
        DataFrame được tạo từ các file CSV
    """
    dfs = []

    for file_path in csv_files:
        if schema:
            df = spark.read.format("csv").schema(schema).option("header", "true").load(file_path)
        else:
            df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(file_path)
        dfs.append(df)

    if not dfs:
        return None

    # Kết hợp tất cả các DataFrame
    combined_df = dfs[0]
    for df in dfs[1:]:
        combined_df = combined_df.union(df)

    return combined_df


In [None]:

class PageRank:
    def __init__(self, alpha=0.85, max_iter=100, tol=1.0e-6):
        self.alpha = alpha
        self.max_iter = max_iter
        self.tol = tol
        self.spark = SparkSession.builder.getOrCreate()

    def compute_pagerank(self, edges_df):
        src_nodes = edges_df.select("source_clean").distinct()
        dst_nodes = edges_df.select("dest_clean").distinct()
        all_nodes = src_nodes.union(dst_nodes).distinct()

        N = all_nodes.count()
        print(f"Total unique nodes: {N}")

        nodes_with_id = all_nodes.withColumnRenamed(
            all_nodes.columns[0], "node"
        ).withColumn("id", F.monotonically_increasing_id())

        edges_with_id = edges_df.join(
            nodes_with_id.withColumnRenamed("node", "source_clean").withColumnRenamed("id", "src_id"),
            on="source_clean"
        ).join(
            nodes_with_id.withColumnRenamed("node", "dest_clean").withColumnRenamed("id", "dst_id"),
            on="dest_clean"
        ).select("src_id", "dst_id")

        out_degrees = edges_with_id.groupBy("src_id").count().withColumnRenamed("count", "out_degree")

        edges_with_weight = edges_with_id.join(out_degrees, on="src_id", how="left").withColumn(
            "weight", 1.0 / F.col("out_degree")
        )

        all_node_ids = nodes_with_id.select("id")
        nodes_with_out_links = edges_with_id.select("src_id").distinct()
        dangling_nodes = all_node_ids.join(
            nodes_with_out_links.withColumnRenamed("src_id", "id"),
            on="id", how="left_anti"
        )

        dangling_count = dangling_nodes.count()
        print(f"Number of dangling nodes: {dangling_count}")

        initial_rank = 1.0 / N
        pageranks = nodes_with_id.select("id", F.lit(initial_rank).alias("rank"))

        for iteration in range(self.max_iter):
            prev_pageranks = pageranks

            if dangling_count > 0:
                dangling_sum = pageranks.join(
                    dangling_nodes, on="id", how="inner"
                ).select("rank").rdd.map(lambda row: row[0]).reduce(lambda a, b: a + b)
            else:
                dangling_sum = 0.0

            dangling_contribution = dangling_sum / N

            contributions = edges_with_weight.join(
                pageranks, edges_with_weight.src_id == pageranks.id, how="inner"
            ).select(
                edges_with_weight.dst_id.alias("id"),
                (edges_with_weight.weight * pageranks.rank * self.alpha).alias("contribution")
            )

            aggregated_contributions = contributions.groupBy("id").agg(
                F.sum("contribution").alias("sum_contributions")
            )

            teleport_value = (1.0 - self.alpha) / N
            pageranks = all_node_ids.join(
                aggregated_contributions, on="id", how="left"
            ).na.fill(0, ["sum_contributions"]).withColumn(
                "rank",
                F.col("sum_contributions") + F.lit(dangling_contribution * self.alpha) + F.lit(teleport_value)
            ).select("id", "rank")

            joined_ranks = prev_pageranks.join(
                pageranks.withColumnRenamed("rank", "new_rank"),
                on="id", how="inner"
            ).select(
                F.abs(F.col("new_rank") - F.col("rank")).alias("error")
            )

            error = joined_ranks.rdd.map(lambda row: row[0]).reduce(lambda a, b: a + b)

            print(f"Iteration {iteration+1}, Error: {error}")

            if error < self.tol * N:
                print(f"Converged after {iteration+1} iterations")
                break

        result = pageranks.join(
            nodes_with_id, on="id", how="inner"
        ).select("node", "rank").orderBy(F.desc("rank"))

        return result


In [None]:
# Sử dụng ví dụ:
if __name__ == "__main__":
    # Danh sách các file CSV
    csv_files = [
        "part-00000-5b8c25b8-0488-4aa9-963e-ccbb83cd7c26-c000.csv",
        "part-00001-5b8c25b8-0488-4aa9-963e-ccbb83cd7c26-c000.csv"
    ]

    # Đọc các file CSV
    # df = create_dataframe_from_csv(csv_files, schema)

    # Kiểm tra dữ liệu
    print("DataFrame schema:")
    df_1.printSchema()
    print("\nSample data:")
    df_1.show(5)
    print(f"Total rows: {df_1.count()}")

    # Khởi tạo lớp PageRank
    pagerank = PageRank(alpha=0.85, max_iter=100, tol=1.0e-6)

    # Tính PageRank
    result_df = pagerank.compute_pagerank(df_1)

    # Hiển thị 20 trang có PageRank cao nhất
    print("\nTop 20 pages by PageRank:")
    result_df.show(20, truncate=False)

    # Dừng SparkSession


DataFrame schema:
root
 |-- source_clean: string (nullable = true)
 |-- dest_clean: string (nullable = true)


Sample data:
+--------------------+--------------------+
|        source_clean|          dest_clean|
+--------------------+--------------------+
|https://it.tdtu.e...|https://it.tdtu.e...|
|https://it.tdtu.e...|https://it.tdtu.e...|
|https://it.tdtu.e...|https://it.tdtu.e...|
|https://it.tdtu.e...|https://it.tdtu.e...|
|https://it.tdtu.e...|https://it.tdtu.e...|
+--------------------+--------------------+
only showing top 5 rows

Total rows: 39795
Total unique nodes: 2409
Number of dangling nodes: 150
Iteration 1, Error: 1.4380859278465723
Iteration 2, Error: 0.5090778415079972
Iteration 3, Error: 0.14821014835772947
Iteration 4, Error: 0.055796332311507525
Iteration 5, Error: 0.019069417589099916
Iteration 6, Error: 0.012190579185299548
Iteration 7, Error: 0.008002808742455486
Iteration 8, Error: 0.00558518544318881
Iteration 9, Error: 0.0038702045280217534
Iteration 10, Erro