In [24]:
import os
from pathlib import Path

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

In [25]:
# STEP 1.0 - Connect to DB (Supabase cloud Postgres)

load_dotenv()

# A SQLAlchemy engine is used instead of a raw psycopg2 connection because
# passing a psycopg2 connection to pandas triggers a UserWarning.

# Every setting below must come from the environment (.env); fail early with a
# clear message instead of building a URL that contains the text "None".
_DB_ENV_VARS = (
    "DB_SUPABASE_USER",
    "DB_SUPABASE_PASS",
    "DB_SUPABASE_HOST",
    "DB_SUPABASE_PORT",
    "DB_SUPABASE_NAME",
)
_db_env = {name: os.getenv(name) for name in _DB_ENV_VARS}
_missing_db_env = [name for name, value in _db_env.items() if not value]
if _missing_db_env:
    raise RuntimeError(
        "Missing database environment variables: " + ", ".join(_missing_db_env)
    )

# URL.create takes each component separately, so special characters in the
# password (e.g. '@', ':' or '/') no longer corrupt the connection URL.
conn = create_engine(
    URL.create(
        drivername="postgresql+psycopg2",
        username=_db_env["DB_SUPABASE_USER"],
        password=_db_env["DB_SUPABASE_PASS"],
        host=_db_env["DB_SUPABASE_HOST"],
        port=int(_db_env["DB_SUPABASE_PORT"]),
        database=_db_env["DB_SUPABASE_NAME"],
    ),
    connect_args={"sslmode": "require"},
)

# create_engine() is lazy; open (and immediately release) one connection so the
# success message below is only printed when the database is really reachable.
with conn.connect():
    pass

print("Engine created and connected to database")

Engine created and connected to database


In [26]:
# STEP 1.1 — BASE (mirrors the `base` CTE in SQL) - Read data and create the base dataframe

# Load only postings whose mandatory fields are all present; posted_date is
# parsed to datetime while reading.
job_postings = pd.read_sql("""
    SELECT *
    FROM job_postings
    WHERE posted_date IS NOT NULL
      AND job_id IS NOT NULL
      AND employment_type IS NOT NULL
      AND location_id IS NOT NULL
      AND remote_option IS NOT NULL
""", conn, parse_dates=["posted_date"])

# NOTE: no additional pd.to_datetime(..., errors="coerce") clean-up pass is run
# here; per the original author's note, posted_date is already normalised by
# the upstream pipeline.

# Add year column extracted from posted_date
job_postings["year"] = job_postings["posted_date"].dt.year.astype(int)

# 1 when at least one salary bound is present, otherwise 0
job_postings["has_salary"] = (
    job_postings["min_salary"].notna() |
    job_postings["max_salary"].notna()
).astype(int)

print(len(job_postings), "rows loaded from job_postings table")

917389 rows loaded from job_postings table


In [37]:
# STEP 1.2 — RANKED (ROW_NUMBER PARTITION BY year) - Balance the number of jobs kept per year

# Upper bound on the number of postings kept for each year.
MAX_JOBS_PER_YEAR = 230_000

# The frame must be sorted before ranking, otherwise cumcount() numbers rows in
# arbitrary order. Within a year, postings with salary info come first, then the
# most recent ones. job_id is the final tie-breaker: the SQL query has no
# ORDER BY, so without it rows sharing the same posted_date could be ranked
# differently from one run to the next.
job_postings = job_postings.sort_values(
    by=["year", "has_salary", "posted_date", "job_id"],
    ascending=[True, False, False, True]
)

# cumcount() is 0-based; +1 makes the first row of each year rank 1,
# like ROW_NUMBER() in SQL.
job_postings["rn"] = job_postings.groupby("year").cumcount() + 1

# Keep at most MAX_JOBS_PER_YEAR top-ranked postings per year
df_filtered_jobs_500k = job_postings[job_postings["rn"] <= MAX_JOBS_PER_YEAR].copy()

print(df_filtered_jobs_500k.shape)

(493473, 15)


In [28]:
# STEP 1.3 — LOAD DIMENSION TABLES

# Small lookup tables are loaded in full.
companies   = pd.read_sql("SELECT * FROM companies", conn)
locations   = pd.read_sql("SELECT * FROM locations", conn)
role_names  = pd.read_sql("SELECT * FROM role_names", conn)
skills      = pd.read_sql("SELECT * FROM skills", conn)


def _load_rows_for_jobs(engine, table_name, job_id_values):
    """Load all rows of `table_name` whose job_id is in `job_id_values`.

    Parameters
    ----------
    engine : the SQLAlchemy engine to query.
    table_name : name of a table with a `job_id` column. It is interpolated
        into the SQL text, so it must be a trusted literal, never user input.
    job_id_values : tuple of job ids, passed as a bound parameter.

    Returns
    -------
    DataFrame with the matching rows. An empty `job_id_values` yields an empty
    frame that still has the table's columns (`IN ()` would be invalid SQL).
    """
    if not job_id_values:
        return pd.read_sql(f"SELECT * FROM {table_name} WHERE FALSE", engine)
    return pd.read_sql(
        f"""
        SELECT *
        FROM {table_name}
        WHERE job_id IN %(job_ids)s
        """,
        engine,
        params={"job_ids": job_id_values},
    )


# Restrict the bridge tables to the jobs that survived the per-year cap.
job_ids = set(df_filtered_jobs_500k["job_id"])
_job_id_values = tuple(job_ids)  # built once and reused by all three queries

job_levels = _load_rows_for_jobs(conn, "job_levels", _job_id_values)
job_skills = _load_rows_for_jobs(conn, "job_skills", _job_id_values)
job_roles = _load_rows_for_jobs(conn, "job_roles", _job_id_values)

In [29]:
# STEP 1.4 — AGGREGATE (GROUP_CONCAT)


def _join_unique(values):
    """Join the distinct non-null entries of a Series, sorted, with "; "."""
    return "; ".join(sorted(values.dropna().unique()))


def _collapse_per_job(frame, value_col, out_col):
    """Return one row per job_id with `value_col` collapsed into `out_col`."""
    per_job = frame.groupby("job_id")[value_col].apply(_join_unique)
    return per_job.reset_index(name=out_col)


# Role ids are resolved to role names before collapsing.
roles_agg = _collapse_per_job(
    job_roles.merge(role_names, on="role_id", how="left"),
    "role_name",
    "roles_list",
)

# job_levels already carries the level label, so no lookup join is needed.
levels_agg = _collapse_per_job(job_levels, "job_level", "levels_list")

# Skill ids are resolved to skill names before collapsing.
skills_agg = _collapse_per_job(
    job_skills.merge(skills, on="skill_id", how="left"),
    "skill_name",
    "skills_list",
)

print(f"Count roles_agg: {len(roles_agg)}")
print(f"Count levels_agg: {len(levels_agg)}")
print(f"Count skills_agg: {len(skills_agg)}")

Count roles_agg: 484895
Count levels_agg: 12432
Count skills_agg: 399675


In [30]:
# Print a preview of the per-job skills aggregation (job_id, skills_list) built in STEP 1.4.
print(skills_agg)

         job_id              skills_list
0           165                    Shell
1           166                    Shell
2           167                    Shell
3           188                    Shell
4           349  Learn; Machine Learning
...         ...                      ...
399670  1098847             Shell; Visio
399671  1098848                     Word
399672  1098849               Git; Shell
399673  1098850             Excel; Shell
399674  1098851            Ansible; Flow

[399675 rows x 2 columns]


In [31]:
# STEP 1.5 — JOIN FULL TABLES

# Enrich every selected posting with its company and location attributes and
# with the aggregated role / level / skill lists. Left joins keep postings that
# have no match in a lookup table (the added columns are then NaN).
df_final_jobs_500k = (
    df_filtered_jobs_500k
    .merge(companies, on="company_id", how="left")
    .merge(locations, on="location_id", how="left")
    .merge(roles_agg, on="job_id", how="left")
    .merge(levels_agg, on="job_id", how="left")
    .merge(skills_agg, on="job_id", how="left")
)

# A left join against a lookup table must not add rows; if it does, one of the
# lookup tables has a duplicated key and postings were silently multiplied.
assert len(df_final_jobs_500k) == len(df_filtered_jobs_500k), (
    "Row count changed during the joins: "
    f"{len(df_filtered_jobs_500k)} -> {len(df_final_jobs_500k)} "
    "(duplicate key in a lookup table?)"
)

print(f"Count df_final_jobs_500k: {len(df_final_jobs_500k)}")

Count df_final_jobs_500k: 493473


In [32]:
# Display the first 100 rows of the fully joined jobs table for a visual check.
df_final_jobs_500k.head(100)

Unnamed: 0,job_id,company_id,location_id,posted_date,min_salary,max_salary,currency,required_exp_years,education_level,employment_type,...,industry,city,country,country_iso,latitude,longitude,population,roles_list,levels_list,skills_list
0,87299,13621,3185,2021-11-30,,,,5.0,Bachelor,Full-time,...,Manufacturing,Hyderabad,Pakistan,PK,29.973460,69.413998,2.165653e+08,Data Engineer,,Azure; Shell; Windows
1,87496,13702,3184,2021-04-17,,,,,Master,Full-time,...,Technology,Bengaluru,India,IN,22.925006,79.593704,1.366418e+09,Data Scientist,Senior,
2,34274,6066,1356,2022-12-01,,,,,,Full-time,...,Technology,San Antonio Canada,Mexico,MX,23.935372,-102.576350,1.275755e+08,Data Engineer,,Matplotlib; Python
3,88023,13913,3193,2022-10-13,,,,,Bachelor,Full-time,...,Technology,Mumbai,India,IN,22.925006,79.593704,1.366418e+09,Data Engineer,Senior,
4,36538,6430,166,2022-10-03,,,,,,Part-time,...,Technology,Sidney,United States of America,US,45.705628,-112.599436,3.282395e+08,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,803,386,4,2022-01-06,,,,,,Full-time,...,Technology,Vienna,United States of America,US,45.705628,-112.599436,3.282395e+08,,Intern,Shell
96,805,386,4,2022-01-06,,,,,,Full-time,...,Technology,Vienna,United States of America,US,45.705628,-112.599436,3.282395e+08,,Intern,Shell
97,87487,13698,3183,2022-01-02,,,,5.0,,Full-time,...,Technology,Ingija,Serbia,RS,44.233037,20.819652,6.944975e+06,Data Engineer,,
98,78203,12298,1406,2023-12-28,399825.0,399825.0,,,Master,Part-time,...,Technology,London Village,Kiribati,KI,1.873200,-157.429600,3.120000e+05,Data Engineer,Mid,


In [33]:
# STEP 1.6 — GET 50k SAMPLES

SAMPLE_SIZE = 50_000   # target number of rows in the sample export
SAMPLE_SEED = 42       # fixed seed so the same rows are drawn on every run

# DataFrame.sample raises ValueError when asked for more rows than exist
# (sampling without replacement), so cap the request at the available rows.
df_final_jobs_sample_50k = df_final_jobs_500k.sample(
    n=min(SAMPLE_SIZE, len(df_final_jobs_500k)),
    random_state=SAMPLE_SEED
).reset_index(drop=True)

print(f"Count final_jobs_sample_50k: {len(df_final_jobs_sample_50k)}")
df_final_jobs_sample_50k.head(100)

Count final_jobs_sample_50k: 50000


Unnamed: 0,job_id,company_id,location_id,posted_date,min_salary,max_salary,currency,required_exp_years,education_level,employment_type,...,industry,city,country,country_iso,latitude,longitude,population,roles_list,levels_list,skills_list
0,1052241,41288,4609,2025-01-06,,,,,,Full-time,...,,Singapore,Singapore,SG,1.352100,103.819800,5900000.0,Data Engineer,,AWS; Airflow; BigQuery; Hadoop; Jenkins; Kuber...
1,951091,11347,673,2024-08-29,,,,,,Full-time,...,Technology,Croydon,United States of America,US,45.705628,-112.599436,328239523.0,Data Engineer,,
2,108298,4930,3992,2025-12-10,,,,,Master,Part-time,...,Technology,Mexico,United States of America,US,45.705628,-112.599436,328239523.0,Data Engineer,Intern,AWS
3,261692,58630,5370,2023-10-23,,,,,,Full-time,...,,,Italy,IT,42.751183,12.140788,60297396.0,Data Engineer,,
4,928698,33500,5380,2024-09-06,,,,,,Full-time,...,,Brazil,United States of America,US,45.705628,-112.599436,328239523.0,Data Engineer,,Java; React; SQL
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,684959,124485,5395,2023-10-15,,,,,,Full-time,...,,Belgium,United States of America,US,45.705628,-112.599436,328239523.0,Data Analyst,,
96,231010,3102,5370,2023-09-30,,,,,,Full-time,...,Technology,,Italy,IT,42.751183,12.140788,60297396.0,Data Engineer,,
97,953609,12634,1732,2024-11-12,,,,,,Full-time,...,Technology,Munich,Germany,DE,51.133723,10.288485,83132799.0,Data Scientist,,Alteryx; Git; PowerPoint; Python; SQL; Tableau
98,714406,132082,4654,2023-09-02,,,,,,Full-time,...,,Boston Heights,United States of America,US,45.705628,-112.599436,328239523.0,Data Analyst,,Looker; Python; SQL; Tableau


In [34]:
# STEP 1.7: EXPORT CSV

# The project root is taken to be the parent of the current working directory.
ROOT = Path.cwd().parents[0]
# No input directory is defined: the data is read directly from the cloud DB, not from CSV.
OUTPUT_DIR = ROOT.joinpath("analysis", "data")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)  # create the folder tree on first run

# Destination files for the full extract and for the sample
output_path_final_500k = OUTPUT_DIR.joinpath("final_jobs_500k.csv")
output_path_sample_50k = OUTPUT_DIR.joinpath("final_jobs_sample_50k.csv")

def confirm_overwrite(path: Path) -> bool:
    """Decide whether `path` may be written.

    Returns True straight away when nothing exists at `path`. Otherwise the
    user is prompted on stdin, and only the answer "y" (case-insensitive,
    surrounding whitespace ignored) allows the overwrite.
    """
    if not path.exists():
        return True
    reply = input(f"⚠️ File '{path.name}' đã tồn tại. Ghi đè? (y/n): ")
    return reply.strip().lower() == "y"

# Export the full extract first, then the sample. A file that already exists
# is only replaced after the user confirms; otherwise that export is skipped.
for frame, target in (
    (df_final_jobs_500k, output_path_final_500k),
    (df_final_jobs_sample_50k, output_path_sample_50k),
):
    if not confirm_overwrite(target):
        print(f"⏭️ Skip: {target}")
        continue
    # utf-8-sig writes a BOM at the start of the file
    frame.to_csv(target, index=False, encoding="utf-8-sig")
    print(f"✅ Exported: {target}")


✅ Exported: d:\Work_Study\IT\Data\Projects\data_industry_insights\analysis\data\final_jobs_500k.csv
✅ Exported: d:\Work_Study\IT\Data\Projects\data_industry_insights\analysis\data\final_jobs_sample_50k.csv
