Join operation có 3 tiêu chí:

* Broadcast joins for unbalanced datasets
* Sort-merge joins for large-scale data
* Multiple joins optimization


Tất cả các phương thức join trên đều được tự động nhận diện tùy vào 2 bảng cần join

Load data vào và build bảng chứa company_detail

TODO là phần cần chỉnh sửa

CHƯA CHẠY TEST

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("JSONL_Loader") \
    .getOrCreate()

In [None]:
from pyspark.sql import SparkSession

# 1. Khởi tạo Spark Session (Bắt buộc)
# Thêm config dfs.client.use.datanode.hostname để fix lỗi kết nối DataNode
spark = SparkSession.builder \
    .appName("Doc Data Tu HDFS") \
    .config("spark.hadoop.dfs.client.use.datanode.hostname", "true") \
    .getOrCreate()

# 2. Đường dẫn HDFS (Sửa lại host/port cho đúng với setup của bạn)
# Nếu chạy từ local máy tính kết nối vào Docker: dùng localhost hoặc host.docker.internal
hdfs_path = "hdfs://host.docker.internal:9000/path/to/file.jsonl" 
# Hoặc nếu chạy production/internal network:
# hdfs_path = "hdfs://namenode:9000/path/to/file.jsonl"

# 3. Đọc file
# Do Spark hoạt động theo cơ chế Lazy Evaluation, dòng này chỉ mới check metadata
df_raw = spark.read.json(hdfs_path)

# 4. In Schema (Dòng này mới thực sự kích hoạt việc kết nối tới NameNode/DataNode)
df_raw.printSchema()

In [None]:
# chưa cần chạy vội
hdfs_path = "hdfs://<your_hdfs_namenode>:<port>/path/to/your/file.jsonl"
# hdfs_path = "hdfs://host.docker.internal:9000/path/to/file.jsonl"

df_raw = spark.read.json(hdfs_path)
df_raw.printSchema()

In [None]:
# Thử nghiệm

from pyspark.sql.functions import col

df_flat = df_raw.select(
    col("job_title"),
    col("company_name").alias("main_company_name"), # Đổi tên để tránh trùng lặp
    col("salary"),
    # Trích xuất các trường con từ 'company_detail'
    col("company_detail.company_name").alias("detail_company_name"),
    col("company_detail.declaration"),
    col("company_detail.`Company type`").alias("company_type"), # Dùng dấu `` cho tên cột có khoảng trắng
    col("company_detail.`Company industry`").alias("company_industry"),
    col("company_detail.`Company size`").alias("company_size"),
    col("company_detail.Country"),
    col("company_detail.`Working days`").alias("working_days"),
    col("company_detail.`Overtime policy`").alias("overtime_policy")
    
)

df_flat.printSchema()
df_flat.show(5)

In [None]:
df_company_detail = df_raw.select(
    
    
    # Trích xuất các trường con từ 'company_detail'
    col("company_detail.company_name").alias("company_key"),
    col("company_detail.declaration"),
    col("company_detail.`Company type`").alias("company_type"), # Dùng dấu `` cho tên cột có khoảng trắng
    col("company_detail.`Company industry`").alias("company_industry"),
    col("company_detail.`Company size`").alias("company_size"),
    col("company_detail.Country"),
    col("company_detail.`Working days`").alias("working_days"),
    col("company_detail.`Overtime policy`").alias("overtime_policy")
    
).distinct()

# df chính để join broadcast
df_job = df_raw.select(
    col("*"),
    col("company_name").alias("company_key_for_join")
).drop("company_detail") # Xóa cột lồng nhau sau khi đã trích xuất Khóa


**Xử lý _broadcast join for unbalanced dataset_** bằng cách sử dụng trường thông tin company_detail chỉ có ở IT_viec rồi join với các bảng chung khác (bởi vì 1 công ty có thể đăng tuyển ở nhiều web).

Cụ thể hơn thì load bảng company_detail này vào bộ nhớ cache của tất cả các worker để làm việc nhanh chóng hơn.

Cách thức hoạt động: Apache Spark sẽ gửi toàn bộ bảng nhỏ hơn đến bộ nhớ đệm (cache) của mỗi node trong cluster. Việc join sau đó diễn ra cục bộ trên mỗi node mà không cần trao đổi dữ liệu qua mạng (shuffle), giúp giảm đáng kể độ trễ.

**Broadcast tự động**

In [None]:
from pyspark.sql import SparkSession
# ngưỡng 20MB

# cấu hình từ đầu
spark_auto = SparkSession.builder \
    .appName("AutoBroadcastJoin") \
    .config("spark.sql.autoBroadcastJoinThreshold", "20971520") \
    .getOrCreate()
    
# cấu hình trong lúc chạy
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 15728640)  # 15MB

# Spark tự động kiểm tra kích thước df_nho và áp dụng Broadcast Join
df_ket_qua_auto = df_job.join(
    df_company_detail,
    df_job["company_key_for_join"] == df_company_detail["company_key"],
    "inner"
)

# Để xem kế hoạch thực thi (Execution Plan) và xác nhận Broadcast Join được áp dụng:
print("--- Execution Plan (Auto Broadcast) ---")
df_ket_qua_auto.explain()

**Thủ công**

Với dữ liệu của mình thì company_detail chỉ khoảng 300-400KB, còn df trong bộ nhớ ram thì khoảng 1.1 MB - 3.8 MB

In [None]:
from pyspark.sql.functions import broadcast

df_ket_qua_manual = df_job.join(
    # Sử dụng hàm broadcast() trên DataFrame nhỏ hơn
    broadcast(df_company_detail), 
    df_job["company_key_for_join"] == df_company_detail["company_key"],
    "inner"
)

# Xem kết quả
df_ket_qua_manual.show()

# Xem kế hoạch thực thi để xác nhận (chắc chắn sẽ là BroadcastHashJoin)
print("--- Execution Plan (Manual Broadcast) ---")
df_ket_qua_manual.explain()

**Xử lý _Sort-merge joins for large-scale data_** (nghĩa là xử lý 2 bảng lớn)

Cơ chế: shuffle - sort - merge. Shuffle trên toàn cụm để các key join phù hợp nằm về cùng worker/partition. Sort để sắp xếp các mẫu dữ liệu trong 1 partition 1 cách có trật tự. Merge giống như trong sắp xếp merge-sort, duyệt qua từng mẫu của 2 bên để khớp key join với nhau.

Triển khai:

Đây là tự động nếu gọi join 2 bảng lớn

Vì chưa biết cần join những bảng lớn nào nên cứ partition làm sao cho tối ưu bước lưu trữ. Đến bước này thì shuffle lại. Bước shuffle là bước có cost lớn nhất nên cần tối ưu, nếu bắt buộc (thường dùng) thì partition trước. Nên sử dụng 2 bảng có tính đồng nhất cao để join để bớt bước shuffle

Tải thêm dataset từ huggingface để join
* IT resume https://huggingface.co/datasets/datasetmaster/resumes
* IT Job Roles Skills Dataset - kaggle/huggingface https://huggingface.co/datasets/NxtGenIntern/IT_Job_Roles_Skills_Certifications_Dataset

Triển khai:
* Với IT resume: explode cột skill của 2 bảng job và resume rồi join merge để kiểm tra sự chồng chéo skill giữa 2 bảng, nó sẽ tạo ra một bảng trung gian cực lớn được join theo từng kĩ năng, sau đó có thể ứng dụng tiếp. 
  * Ý nghĩa: 
    * Nhu cầu tương đối: Kỹ năng nào được yêu cầu nhiều (Job_ID lặp lại nhiều) và có nhiều ứng viên (Resume_ID lặp lại nhiều) cùng một lúc. 
    * Mức độ cạnh tranh: Bằng cách GROUP BY Skill_Name và COUNT (Job_ID) / COUNT (Resume_ID), bạn có thể đo lường mức độ cạnh tranh thực tế của một kỹ năng trên thị trường.

Ý tưởng:
* IT job - IT skill theo các key trung gian: 
  * IT role(vị trí công việc): chi tiết hơn về công việc
  * skill name: đề xuất các certificate với mỗi công việc, skill
* IT resume - IT skill:
  * skill/role: đề xuất certificate cần thêm với skill, role ứng tuyển hiện tại

In [1]:
from datasets import load_dataset

IT_skill = load_dataset("NxtGenIntern/IT_Job_Roles_Skills_Certifications_Dataset")
IT_skill

  from .autonotebook import tqdm as notebook_tqdm


DatasetDict({
    train: Dataset({
        features: ['Job Title', 'Job Description', 'Skills', 'Certifications'],
        num_rows: 207
    })
})

In [2]:
from datasets import load_dataset
IT_skill_df = IT_skill["train"]
IT_skill_df = IT_skill_df.to_pandas()
print(type(IT_skill_df))
IT_skill_df.head()


<class 'pandas.core.frame.DataFrame'>


Unnamed: 0,Job Title,Job Description,Skills,Certifications
0,Admin Big Data,Responsible for managing and overseeing big da...,"Hadoop, Spark, MapReduce, Data Lakes, Data War...","Cloudera Certified Professional (CCP), Hortonw..."
1,Ansible Operations Engineer,Focuses on automating IT processes using Ansib...,"Ansible, Linux, Automation, Cloud Platforms, C...",Red Hat Certified Specialist in Ansible Automa...
2,Artifactory Administrator,Manages the Artifactory repository for build a...,"Artifactory, CI/CD, Jenkins, Docker, Maven, Gr...","JFrog Artifactory Certification, DevOps Instit..."
3,Artificial intelligence / Machine Learning Eng...,No description available,,
4,Artificial Intelligence / Machine Learning Leader,"Leads AI/ML projects and teams, defining strat...","AI Strategy, Machine Learning, Team Management...","AI-900: Microsoft Azure AI Fundamentals, Certi..."


In [3]:
from huggingface_hub import hf_hub_download
import pandas as pd

REPO_ID = "datasetmaster/resumes"
FILENAME = "master_resumes.jsonl"


IT_resume_df = pd.read_json(
    hf_hub_download(repo_id=REPO_ID, filename=FILENAME, repo_type="dataset"),
    lines = True
)

print(type(IT_resume_df))
IT_resume_df.head()

<class 'pandas.core.frame.DataFrame'>


Unnamed: 0,personal_info,experience,education,skills,projects,certifications,achievements,workshops,publications,teaching_experience,internships
0,"{'name': 'Unknown', 'email': 'Unknown', 'phone...","[{'company': 'Fresher', 'company_info': {'indu...","[{'degree': {'level': 'ME', 'field': 'Computer...",{'technical': {'programming_languages': [{'nam...,"[{'name': 'Unknown', 'description': 'Unknown',...",,,,,,
1,"{'name': 'Unknown', 'email': 'Unknown', 'phone...","[{'company': 'Delta Controls, Dubai FZCO', 'co...","[{'degree': {'level': 'B.E', 'field': 'Electro...",{'technical': {'project_management': [{'name':...,"[{'name': 'FGP/WPMP', 'description': 'Led syst...",,,,,,
2,"{'name': 'Not Provided', 'email': 'Not Provide...","[{'company': 'Parkar Consulting and Labs', 'co...","[{'degree': {'level': 'B.E.', 'field': 'Not Pr...",{'technical': {'programming_languages': [{'nam...,"[{'name': 'FPGA Implementation', 'description'...",,,,,,
3,"{'name': 'Unknown', 'email': 'Unknown', 'phone...","[{'company': 'Delta Controls, Dubai FZCO', 'co...","[{'degree': {'level': 'B.E', 'field': 'Electro...",{'technical': {'project_management': [{'name':...,"[{'name': 'FGP/WPMP', 'description': 'Led syst...",,,,,,
4,"{'name': '', 'email': '', 'phone': '', 'locati...","[{'company': 'Atos Syntel', 'company_info': {'...",[{'degree': {'level': 'Bachelor of Engineering...,{'technical': {'programming_languages': [{'nam...,[],"{""name"": ""ESD Program"", ""issuer"": ""Zensar Tech...","[Treasurer in IEEE student branch at JSCOE, Pu...","[{'name': 'Medical IoT', 'issuer': 'IEEE Stand...",,,


Cấu hình ngưỡng của broadcast cực nhỏ để khi chạy join() thì auto là sort-merge join

In [4]:
# cấu hình broadcast để thử nghiệm với dữ liệu nhỏ
from pyspark.sql import SparkSession

# cấu hình từ đầu
spark_auto = SparkSession.builder \
    .appName("AutoBroadcastJoin") \
    .config("spark.sql.autoBroadcastJoinThreshold", "20971520") \
    .getOrCreate()

# cấu hình trong lúc chạy
spark_auto.conf.set("spark.sql.autoBroadcastJoinThreshold", 32)  # 15MB



In [None]:
# TODO: Thay thế Job Title thành các cột tương ứng
IT_job_detail_df = IT_job_df.join(
    IT_skill_df,
    IT_job_df['Job Title']== IT_skill_df['Job Title'],
    'inner'
)



Xử lý job - resume : explode + join theo key là skill

In [None]:
# Explode cột skill trong 2 bảng
IT_job_exploded_df = IT_job_detail_df.withColumn("Skills", F.explode("Skills"))
IT_resume_exploded_df = IT_resume_exploded_df.withColumn("Skills", F.explode("Skills"))

# Join 2 bảng với cột đã có, tạo ra một bảng cực lớn
IT_job_resume_df = IT_job_exploded_df.join(
    IT_skill_exploded_df,
    IT_job_exploded_df['Skills'] == IT_skill_exploded_df['Skills'],
    'inner'
)

In [None]:
# Lưu df dạng Parquet vào HDFS
# TODO: Cần thay đổi đường dẫn HDFS phù hợp với hệ thống 
try:
    IT_job_detail_df.write.mode("overwrite").parquet("hdfs://namenode_host:9000/user/your_user/mydata_parquet")
    print("✅ Lưu Parquet thành công vào HDFS")
except Exception as e:
    print(f"❌ Lỗi khi lưu Parquet: {e}")

**Xử lý _Multiple Joins Optimization_** 

Mục tiêu là giảm số lần shuffle và điều chỉnh thứ tự join để tối ưu

Tối ưu thứ tự join (Thực hiện phép join giúp giảm kích thước data nhanh nhất trước):
* Join các bảng nhỏ nhất trước: Join bảng thực tế (fact table) với các bảng chiều (dimension tables) nhỏ trước, đặc biệt là các bảng có thể được broadcast. Điều này giúp giảm kích thước của tập kết quả trung gian trước khi join với các bảng lớn hơn.

* Sử dụng CBO (Cost-Based Optimizer): Spark có một bộ tối ưu hóa truy vấn (Query Optimizer) mạnh mẽ. Nó cố gắng tự động tìm ra thứ tự join tốt nhất dựa trên các số liệu thống kê (cardinality, kích thước bảng, v.v.). Trong đây thì cần thu thập thông kê các bảng lớn trước. ANALYZE TABLE ten_bang COMPUTE STATISTICS FOR ALL COLUMNS;

Giảm shuffle:
* Thường xuyên broadcast các bảng nhỏ
* Dùng bucket join

Triển khai với 3 bảng job, company, skill:
* join 2 bảng job và company trước rồi join bảng kết quả thu được với bảng skill
* Có broadcast các bảng bé, partitioning bảng job với key job title trước để giảm số lần shuffle
* Có thể sử dụng SQL Hint để tối ưu hóa hơn, 

In [None]:
# TODO: Thay thế company thành các tên cột tương ứng
IT_job_company_df = IT_job_df.join(
    broadcast(IT_company_df),
    IT_job_df['Company']== IT_company_df['Company'],
    'inner'
)

IT_job_company_skill_df = IT_job_company_df.join(
    IT_skill_df,
    IT_job_company_df['Job Title']== IT_skill_df['Job Title'],
    'inner'
)

In [None]:
# TODO: Dùng CBO thử cho tác vụ này