# Data Cleaning for 3NF in PostgreSQL using Polars

In [1]:
# Standard library

import os
import sys
import json
import ast
from pathlib import Path

# Third-party libraries
import numpy as np
import pandas as pd
import polars as pl
from sqlalchemy import create_engine, text
import ast


# Display settings: never truncate rows, columns, or long string cells when
# frames are printed (the skill columns hold long list/dict literals).
pl.Config.set_tbl_rows(-1)   # show all rows
pl.Config.set_tbl_cols(-1)   # show all columns
pl.Config.set_fmt_str_lengths(10_000)

# Location of the raw CSV, relative to the notebook's working directory
csv_path = Path("../data/data_jobs.csv")

# Load the CSV into a Polars DataFrame
df = pl.read_csv(csv_path)

# Initial data exploration
print(f"Data shape: {df.shape}")
print(f"\nColumns: {df.columns}")

# Per-column null counts. Only the last expression of a cell is rendered, so
# anything else that should be visible has to be printed explicitly.
print(df.null_count())

# Last expression of the cell: rendered as the cell output
df.head(5)

Data shape: (785741, 17)

Columns: ['job_title_short', 'job_title', 'job_location', 'job_via', 'job_schedule_type', 'job_work_from_home', 'search_location', 'job_posted_date', 'job_no_degree_mention', 'job_health_insurance', 'job_country', 'salary_rate', 'salary_year_avg', 'salary_hour_avg', 'company_name', 'job_skills', 'job_type_skills']
shape: (1, 17)
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│ job ┆ job ┆ job ┆ job ┆ job ┆ job ┆ sea ┆ job ┆ job ┆ job ┆ job ┆ sal ┆ sal ┆ sal ┆ com ┆ job ┆ job │
│ _ti ┆ _ti ┆ _lo ┆ _vi ┆ _sc ┆ _wo ┆ rch ┆ _po ┆ _no ┆ _he ┆ _co ┆ ary ┆ ary ┆ ary ┆ pan ┆ _sk ┆ _ty │
│ tle ┆ tle ┆ cat ┆ a   ┆ hed ┆ rk_ ┆ _lo ┆ ste ┆ _de ┆ alt ┆ unt ┆ _ra ┆ _ye ┆ _ho ┆ y_n ┆ ill ┆ pe_ │
│ _sh ┆ --- ┆ ion ┆ --- ┆ ule ┆ fro ┆ cat ┆ d_d ┆ gre ┆ h_i ┆ ry  ┆ te  ┆ ar_ ┆ ur_ ┆ ame ┆ s   ┆ ski │
│ ort ┆ u32 ┆ --- ┆ u32 ┆ _ty ┆ m_h ┆ ion ┆ ate ┆ e_m ┆ nsu ┆ --- ┆ --- ┆ avg ┆ avg ┆ --- ┆ --- ┆ lls │
│ --- ┆     ┆ u32 ┆

job_title_short,job_title,job_location,job_via,job_schedule_type,job_work_from_home,search_location,job_posted_date,job_no_degree_mention,job_health_insurance,job_country,salary_rate,salary_year_avg,salary_hour_avg,company_name,job_skills,job_type_skills
str,str,str,str,str,bool,str,str,bool,bool,str,str,f64,f64,str,str,str
"""Senior Data Engineer""","""Senior Clinical Data Engineer / Principal Clinical Data Engineer ...""","""Watertown, CT""","""via Work Nearby""","""Full-time""",False,"""Texas, United States""","""2023-06-16 13:44:15""",False,False,"""United States""",,,,"""Boehringer Ingelheim""",,
"""Data Analyst""","""Data Analyst""","""Guadalajara, Jalisco, Mexico""","""via BeBee México""","""Full-time""",False,"""Mexico""","""2023-01-14 13:18:07""",False,False,"""Mexico""",,,,"""Hewlett Packard Enterprise""","""['r', 'python', 'sql', 'nosql', 'power bi', 'tableau']""","""{'analyst_tools': ['power bi', 'tableau'], 'programming': ['r', 'python', 'sql', 'nosql']}"""
"""Data Engineer""","""Data Engineer/Scientist/Analyst, Mid or Senior (m/f/x)""","""Berlin, Germany""","""via LinkedIn""","""Full-time""",False,"""Germany""","""2023-10-10 13:14:55""",False,False,"""Germany""",,,,"""ALPHA Augmented Services""","""['python', 'sql', 'c#', 'azure', 'airflow', 'dax', 'docker', 'kubernetes', 'jenkins']""","""{'analyst_tools': ['dax'], 'cloud': ['azure'], 'libraries': ['airflow'], 'other': ['docker', 'kubernetes', 'jenkins'], 'programming': ['python', 'sql', 'c#']}"""
"""Data Engineer""","""LEAD ENGINEER - PRINCIPAL ANALYST - PRINCIPAL ENGINEER - DATA...""","""San Antonio, TX""","""via Diversity.com""","""Full-time""",False,"""Texas, United States""","""2023-07-04 13:01:41""",True,False,"""United States""",,,,"""Southwest Research Institute""","""['python', 'c++', 'java', 'matlab', 'aws', 'tensorflow', 'keras', 'pytorch']""","""{'cloud': ['aws'], 'libraries': ['tensorflow', 'keras', 'pytorch'], 'programming': ['python', 'c++', 'java', 'matlab']}"""
"""Data Engineer""","""Data Engineer- Sr Jobs""","""Washington, DC""","""via Clearance Jobs""","""Full-time""",False,"""Sudan""","""2023-08-07 14:29:36""",False,False,"""Sudan""",,,,"""Kristina Daniel""","""['bash', 'python', 'oracle', 'aws', 'ansible', 'puppet', 'jenkins', 'gitlab', 'git']""","""{'cloud': ['oracle', 'aws'], 'other': ['ansible', 'puppet', 'jenkins', 'gitlab', 'git'], 'programming': ['bash', 'python']}"""


In [2]:
# Null counts per column
print("Null value counts:")
null_summary = df.null_count()
print(null_summary)

# Column dtypes: first as a single list, then one "name: dtype" line per column
print(f"Data types: {df.dtypes}")
for col, dtype in zip(df.columns, df.dtypes):
    print(f" {col}: {dtype}")

# Peek at the raw (still string-typed) skill columns
print("\n Sample job_skills (first 5 rows):")
print(df.head(5).select("job_skills"))
print("\nSample job_type_skills (first 3):")
print(df.head(3).select("job_type_skills"))



Null value counts:
shape: (1, 17)
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│ job ┆ job ┆ job ┆ job ┆ job ┆ job ┆ sea ┆ job ┆ job ┆ job ┆ job ┆ sal ┆ sal ┆ sal ┆ com ┆ job ┆ job │
│ _ti ┆ _ti ┆ _lo ┆ _vi ┆ _sc ┆ _wo ┆ rch ┆ _po ┆ _no ┆ _he ┆ _co ┆ ary ┆ ary ┆ ary ┆ pan ┆ _sk ┆ _ty │
│ tle ┆ tle ┆ cat ┆ a   ┆ hed ┆ rk_ ┆ _lo ┆ ste ┆ _de ┆ alt ┆ unt ┆ _ra ┆ _ye ┆ _ho ┆ y_n ┆ ill ┆ pe_ │
│ _sh ┆ --- ┆ ion ┆ --- ┆ ule ┆ fro ┆ cat ┆ d_d ┆ gre ┆ h_i ┆ ry  ┆ te  ┆ ar_ ┆ ur_ ┆ ame ┆ s   ┆ ski │
│ ort ┆ u32 ┆ --- ┆ u32 ┆ _ty ┆ m_h ┆ ion ┆ ate ┆ e_m ┆ nsu ┆ --- ┆ --- ┆ avg ┆ avg ┆ --- ┆ --- ┆ lls │
│ --- ┆     ┆ u32 ┆     ┆ pe  ┆ ome ┆ --- ┆ --- ┆ ent ┆ ran ┆ u32 ┆ u32 ┆ --- ┆ --- ┆ u32 ┆ u32 ┆ --- │
│ u32 ┆     ┆     ┆     ┆ --- ┆ --- ┆ u32 ┆ u32 ┆ ion ┆ ce  ┆     ┆     ┆ u32 ┆ u32 ┆     ┆     ┆ u32 │
│     ┆     ┆     ┆     ┆ u32 ┆ u32 ┆     ┆     ┆ --- ┆ --- ┆     ┆     ┆     ┆     ┆     ┆     ┆     │
│     ┆     ┆     ┆     ┆     

In [3]:
# Trim leading/trailing whitespace from every string column
string_columns = pl.col(pl.Utf8)
df = df.with_columns(string_columns.str.strip_chars())

# Parse job_posted_date ("YYYY-MM-DD HH:MM:SS") and tag the result as UTC.
# strict=False: values that do not match the format become null instead of
# raising, so the null count of this column is worth checking afterwards.
posted_at = pl.col("job_posted_date").str.strptime(
    pl.Datetime, "%Y-%m-%d %H:%M:%S", strict=False
)
df = df.with_columns(
    posted_at.dt.replace_time_zone("UTC").alias("job_posted_date")
)

# Confirm the new dtype, then list the full schema
print(f"Parsed job_posted_date: {df['job_posted_date'].dtype}")
for col, dtype in zip(df.columns, df.dtypes):
    print(f" {col}: {dtype}")
    

Parsed job_posted_date: Datetime(time_unit='us', time_zone='UTC')
 job_title_short: String
 job_title: String
 job_location: String
 job_via: String
 job_schedule_type: String
 job_work_from_home: Boolean
 search_location: String
 job_posted_date: Datetime(time_unit='us', time_zone='UTC')
 job_no_degree_mention: Boolean
 job_health_insurance: Boolean
 job_country: String
 salary_rate: String
 salary_year_avg: Float64
 salary_hour_avg: Float64
 company_name: String
 job_skills: String
 job_type_skills: String


In [4]:
# Standardize Boolean columns

bool_cols = [c for c, t in df.schema.items() if t == pl.Boolean]

print(bool_cols)

# Normalise each flag through its text form: a value counts as True only when
# its lowercase text is one of the accepted spellings.
# NOTE(review): bool_cols only holds columns already typed Boolean, so this
# round-trip looks like a no-op — confirm how is_in treats nulls before
# simplifying it away.
df = df.with_columns(
    pl.col(bool_cols)
    .cast(pl.String)
    .str.to_lowercase()
    .is_in(["true", "1", "yes", "y"])
    .cast(pl.Boolean)
)

# Verify the boolean columns kept their dtype
print(df.select(bool_cols).schema)

# Ensure numeric columns are floats
num_cols = [c for c, t in df.schema.items() if t == pl.Float64]

print(num_cols)

# strict=False: values that cannot be cast become null instead of raising
df = df.with_columns([
    pl.col("salary_year_avg").cast(pl.Float64, strict=False),
    pl.col("salary_hour_avg").cast(pl.Float64, strict=False)
])

# Null counts before filling. Printed explicitly: a bare expression in the
# middle of a cell is evaluated and then silently discarded.
print(df.select(num_cols).null_count())

# NOTE(review): after this step a missing salary is indistinguishable from a
# salary of 0, which skews averages downstream — confirm 0 is really wanted
# rather than keeping NULL in PostgreSQL.
df = df.with_columns(
    pl.col(num_cols).fill_null(0)
)

# Null counts after filling (expected to be all zeros)
print(df.select(num_cols).null_count())

# Add a surrogate job id (row index starting at 1); the guard makes the cell
# safe to re-run.
if "job_id" not in df.columns:
    df = df.with_row_index(name="job_id", offset=1)
    
    

['job_work_from_home', 'job_no_degree_mention', 'job_health_insurance']
Schema({'job_work_from_home': Boolean, 'job_no_degree_mention': Boolean, 'job_health_insurance': Boolean})
['salary_year_avg', 'salary_hour_avg']


In [5]:
# Parse JSON Columns (job_skills and job_type_skills)

# job_skills arrives as a Python-style list literal, e.g. "['r', 'python']".
# Swapping single quotes for double quotes turns it into valid JSON, which is
# then decoded into List(String). literal=True because the quote is a plain
# character, not a regular expression.
# NOTE(review): this assumes no skill name itself contains a quote character —
# confirm against the data.
df = df.with_columns(
    pl.col("job_skills")
    .str.replace_all("'", '"', literal=True)
    .str.json_decode(pl.List(pl.Utf8))
    .alias("job_skills")
)

print(df.schema["job_skills"])

# Clean the elements inside each list: null lists become empty lists, and
# every skill is trimmed and lowercased.
df = df.with_columns(
    pl.col("job_skills")
      .fill_null(pl.lit([]))
      .list.eval(
          pl.element()
            .str.strip_chars()
            .str.to_lowercase()
      )
      .alias("job_skills")
)

# Explode into one row per (job_id, skill), then drop null/empty skill names
# (an empty list explodes to a single null row) and duplicate pairs.
job_skills_rel = (
    df
    .select(["job_id", "job_skills"])
    .explode("job_skills")  # 1 row per skill
    .rename({"job_skills": "skill_name"})
    .filter(pl.col("skill_name").is_not_null() & (pl.col("skill_name") != ""))
    .unique()
)

def parse_skill_dict(x):
    """Parse a Python-literal dict string such as "{'cloud': ['aws']}".

    Args:
        x: Raw cell value; anything that is not None is converted with str().

    Returns:
        The parsed, non-empty dict, or None when the value is missing, is an
        empty placeholder, cannot be parsed, or parses to something that is
        not a dict (callers iterate the result with ``.items()``).
    """
    if x is None:
        return None
    s = str(x).strip()
    if s in ("", "null", "None", "[]", "{}"):
        return None
    try:
        # literal_eval only evaluates literals, so it is safe on file content
        value = ast.literal_eval(s)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # The exceptions literal_eval is documented to raise on malformed input
        return None
    if not isinstance(value, dict) or not value:
        return None
    return value

# Parse job_type_skills into Python dicts, kept in an Object column
skills_obj_expr = pl.col("job_type_skills").map_elements(
    parse_skill_dict, return_dtype=pl.Object
)
parsed = df.with_columns(skills_obj_expr.alias("skills_obj"))


def _flatten_skill_groups(groups):
    """Turn {group: [skill, ...]} into a list of skill_group/skill_name dicts."""
    pairs = []
    for group, names in groups.items():
        # A group whose value is None or empty contributes no pairs
        for name in names or []:
            pairs.append({"skill_group": group, "skill_name": name})
    return pairs


# Shared pieces used by the pipelines below
pair_list_dtype = pl.List(
    pl.Struct([
        pl.Field("skill_group", pl.Utf8),
        pl.Field("skill_name", pl.Utf8),
    ])
)
non_empty_skill = pl.col("skill_name").is_not_null() & (pl.col("skill_name") != "")
linked_columns = ["job_id", "skill_group", "skill_name"]

# 1) Normalize job_type_skills (skills_obj dict) into long rows:
#    job_id, skill_group, skill_name
job_type_skills_rel = (
    parsed
    .select(["job_id", "skills_obj"])
    .drop_nulls("skills_obj")
    .with_columns(
        pl.col("skills_obj")
        .map_elements(_flatten_skill_groups, return_dtype=pair_list_dtype)
        .alias("pairs")
    )
    .explode("pairs")   # one row per (group, skill) pair
    .unnest("pairs")    # struct fields become skill_group / skill_name columns
    .with_columns(
        [
            pl.col(name).str.strip_chars().str.to_lowercase()
            for name in ("skill_group", "skill_name")
        ]
    )
    .filter(non_empty_skill)
    .unique()
)

# 2) Add a nullable group to job_skills_rel so schemas match for the union
job_skills_rel2 = job_skills_rel.with_columns(
    pl.lit(None).cast(pl.Utf8).alias("skill_group")
).select(linked_columns)

# 3) Union both sources + dedupe
job_skill_all = pl.concat(
    [job_skills_rel2, job_type_skills_rel.select(linked_columns)]
).unique()

# 4) Build dim_skill: one row per distinct skill name, ids assigned in
#    alphabetical order starting at 1
distinct_skills = job_skill_all.select("skill_name").unique().sort("skill_name")
dim_skill = distinct_skills.with_row_index("skill_id", offset=1)

# 5) Build bridge table (pure 3NF): distinct (job_id, skill_id) pairs
bridge_job_skill = (
    job_skill_all
    .join(dim_skill, on="skill_name", how="left")
    .select(["job_id", "skill_id"])
    .unique()
)

# 6) Quick sanity check
sanity = job_skill_all.select(
    pl.len().alias("rows"),
    pl.n_unique("job_id").alias("jobs_with_skills"),
    pl.n_unique("skill_name").alias("unique_skills"),
)
print(sanity)

print("sample job_skills:")
print(df.select("job_skills").head(5))
print("\nSample job_type_skills:")
print(df.select("job_type_skills").head(5))


List(String)
shape: (1, 3)
┌─────────┬──────────────────┬───────────────┐
│ rows    ┆ jobs_with_skills ┆ unique_skills │
│ ---     ┆ ---              ┆ ---           │
│ u32     ┆ u32              ┆ u32           │
╞═════════╪══════════════════╪═══════════════╡
│ 7255181 ┆ 668704           ┆ 252           │
└─────────┴──────────────────┴───────────────┘
sample job_skills:
shape: (5, 1)
┌────────────────────────────────┐
│ job_skills                     │
│ ---                            │
│ list[str]                      │
╞════════════════════════════════╡
│ []                             │
│ ["r", "python", … "tableau"]   │
│ ["python", "sql", … "jenkins"] │
│ ["python", "c++", … "pytorch"] │
│ ["bash", "python", … "git"]    │
└────────────────────────────────┘

Sample job_type_skills:
shape: (5, 1)
┌───────────────────────────────────────────────────────────────────────────────────────────────┐
│ job_type_skills                                                                        

In [6]:
# Handle null values

print("null counts after cleaning:")
print(df.null_count())

# Replace empty strings with null in every string column, so that "missing"
# has a single representation
string_cols = [name for name, dtype in df.schema.items() if dtype == pl.Utf8]
df = df.with_columns(
    [
        pl.when(pl.col(name) == "").then(None).otherwise(pl.col(name)).alias(name)
        for name in string_cols
    ]
)

null counts after cleaning:
shape: (1, 18)
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│ job ┆ job ┆ job ┆ job ┆ job ┆ job ┆ job ┆ sea ┆ job ┆ job ┆ job ┆ job ┆ sal ┆ sal ┆ sal ┆ com ┆ job ┆ job │
│ _id ┆ _ti ┆ _ti ┆ _lo ┆ _vi ┆ _sc ┆ _wo ┆ rch ┆ _po ┆ _no ┆ _he ┆ _co ┆ ary ┆ ary ┆ ary ┆ pan ┆ _sk ┆ _ty │
│ --- ┆ tle ┆ tle ┆ cat ┆ a   ┆ hed ┆ rk_ ┆ _lo ┆ ste ┆ _de ┆ alt ┆ unt ┆ _ra ┆ _ye ┆ _ho ┆ y_n ┆ ill ┆ pe_ │
│ u32 ┆ _sh ┆ --- ┆ ion ┆ --- ┆ ule ┆ fro ┆ cat ┆ d_d ┆ gre ┆ h_i ┆ ry  ┆ te  ┆ ar_ ┆ ur_ ┆ ame ┆ s   ┆ ski │
│     ┆ ort ┆ u32 ┆ --- ┆ u32 ┆ _ty ┆ m_h ┆ ion ┆ ate ┆ e_m ┆ nsu ┆ --- ┆ --- ┆ avg ┆ avg ┆ --- ┆ --- ┆ lls │
│     ┆ --- ┆     ┆ u32 ┆     ┆ pe  ┆ ome ┆ --- ┆ --- ┆ ent ┆ ran ┆ u32 ┆ u32 ┆ --- ┆ --- ┆ u32 ┆ u32 ┆ --- │
│     ┆ u32 ┆     ┆     ┆     ┆ --- ┆ --- ┆ u32 ┆ u32 ┆ ion ┆ ce  ┆     ┆     ┆ u32 ┆ u32 ┆     ┆     ┆ u32 │
│     ┆     ┆     ┆     ┆     ┆ u32 ┆ u32 ┆     ┆     ┆ --- ┆ --- ┆     ┆    

In [7]:
# Check for duplicates
#
# job_id is a generated row index, unique for every row, so comparing whole
# rows including it can never find a duplicate. Compare on every column
# except job_id instead. is_duplicated() flags each row that has at least one
# identical twin, so the sum counts all rows involved in a duplicate group.
duplicate_count = df.select(pl.exclude("job_id")).is_duplicated().sum()

print(f"duplicate count: {duplicate_count}")

duplicate count: 0


In [8]:
# Columns the 3NF preparation relies on

EXPECTED_COLS = {
    "job_id",
    "job_title_short",
    "job_title",
    "company_name",
    "job_location",
    "job_via",
    "job_schedule_type",
    "job_work_from_home",
    "search_location",
    "job_posted_date",
    "job_no_degree_mention",
    "job_health_insurance",
    "job_country",
    "salary_rate",
    "salary_year_avg",
    "salary_hour_avg",
    "job_skills",
    "job_type_skills"
}

# Column presence first, so the dtype checks below only run on columns that exist
assert EXPECTED_COLS <= set(df.columns), "Missing expected columns"

col_types = df.schema

# Key and dtype checks on the main frame
assert df.get_column("job_id").null_count() == 0, "job_id cannot be null"
assert col_types["job_posted_date"] == pl.Datetime, "job_posted_date must be datetime"
for flag in ("job_work_from_home", "job_no_degree_mention", "job_health_insurance"):
    assert col_types[flag] == pl.Boolean, f"{flag} must be boolean"
for text_col in ("job_country", "salary_rate"):
    assert col_types[text_col] == pl.Utf8, f"{text_col} must be string"
for amount in ("salary_year_avg", "salary_hour_avg"):
    assert col_types[amount] == pl.Float64, f"{amount} must be float"

# Checks on the exploded job/skill relation
blank_skill = pl.col("skill_name").is_null() | (pl.col("skill_name") == "")
assert job_skills_rel.schema["skill_name"] == pl.Utf8, "skill_name must be a string after explode"
assert job_skills_rel.filter(blank_skill).height == 0, \
    "skill_name should not contain null/empty values"
assert job_skills_rel.unique(["job_id", "skill_name"]).height == job_skills_rel.height, \
    "Duplicate (job_id, skill_name) pairs found"


print("validations passed")

# Summary

banner = "=" * 60
print(banner)
print("CLEANED DATA SUMMARY")
print(banner)
print(f"Total rows: {df.shape[0]:,}")
print(f"Total columns: {df.shape[1]}")
print("\nSchema:")
print(df.schema)
print("\nNull counts:")
print(df.null_count())
print("\nSample of cleaned data:")
df.head(5)

validations passed
CLEANED DATA SUMMARY
Total rows: 785,741
Total columns: 18

Schema:
Schema({'job_id': UInt32, 'job_title_short': String, 'job_title': String, 'job_location': String, 'job_via': String, 'job_schedule_type': String, 'job_work_from_home': Boolean, 'search_location': String, 'job_posted_date': Datetime(time_unit='us', time_zone='UTC'), 'job_no_degree_mention': Boolean, 'job_health_insurance': Boolean, 'job_country': String, 'salary_rate': String, 'salary_year_avg': Float64, 'salary_hour_avg': Float64, 'company_name': String, 'job_skills': List(String), 'job_type_skills': String})

Null counts:
shape: (1, 18)
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│ job ┆ job ┆ job ┆ job ┆ job ┆ job ┆ job ┆ sea ┆ job ┆ job ┆ job ┆ job ┆ sal ┆ sal ┆ sal ┆ com ┆ job ┆ job │
│ _id ┆ _ti ┆ _ti ┆ _lo ┆ _vi ┆ _sc ┆ _wo ┆ rch ┆ _po ┆ _no ┆ _he ┆ _co ┆ ary ┆ ary ┆ ary ┆ pan ┆ _sk ┆ _ty │
│ --- ┆ tle ┆ tle ┆ cat ┆ a   ┆ hed ┆ r

job_id,job_title_short,job_title,job_location,job_via,job_schedule_type,job_work_from_home,search_location,job_posted_date,job_no_degree_mention,job_health_insurance,job_country,salary_rate,salary_year_avg,salary_hour_avg,company_name,job_skills,job_type_skills
u32,str,str,str,str,str,bool,str,"datetime[μs, UTC]",bool,bool,str,str,f64,f64,str,list[str],str
1,"""Senior Data Engineer""","""Senior Clinical Data Engineer / Principal Clinical Data Engineer ...""","""Watertown, CT""","""via Work Nearby""","""Full-time""",False,"""Texas, United States""",2023-06-16 13:44:15 UTC,False,False,"""United States""",,0.0,0.0,"""Boehringer Ingelheim""",[],
2,"""Data Analyst""","""Data Analyst""","""Guadalajara, Jalisco, Mexico""","""via BeBee México""","""Full-time""",False,"""Mexico""",2023-01-14 13:18:07 UTC,False,False,"""Mexico""",,0.0,0.0,"""Hewlett Packard Enterprise""","[""r"", ""python"", … ""tableau""]","""{'analyst_tools': ['power bi', 'tableau'], 'programming': ['r', 'python', 'sql', 'nosql']}"""
3,"""Data Engineer""","""Data Engineer/Scientist/Analyst, Mid or Senior (m/f/x)""","""Berlin, Germany""","""via LinkedIn""","""Full-time""",False,"""Germany""",2023-10-10 13:14:55 UTC,False,False,"""Germany""",,0.0,0.0,"""ALPHA Augmented Services""","[""python"", ""sql"", … ""jenkins""]","""{'analyst_tools': ['dax'], 'cloud': ['azure'], 'libraries': ['airflow'], 'other': ['docker', 'kubernetes', 'jenkins'], 'programming': ['python', 'sql', 'c#']}"""
4,"""Data Engineer""","""LEAD ENGINEER - PRINCIPAL ANALYST - PRINCIPAL ENGINEER - DATA...""","""San Antonio, TX""","""via Diversity.com""","""Full-time""",False,"""Texas, United States""",2023-07-04 13:01:41 UTC,True,False,"""United States""",,0.0,0.0,"""Southwest Research Institute""","[""python"", ""c++"", … ""pytorch""]","""{'cloud': ['aws'], 'libraries': ['tensorflow', 'keras', 'pytorch'], 'programming': ['python', 'c++', 'java', 'matlab']}"""
5,"""Data Engineer""","""Data Engineer- Sr Jobs""","""Washington, DC""","""via Clearance Jobs""","""Full-time""",False,"""Sudan""",2023-08-07 14:29:36 UTC,False,False,"""Sudan""",,0.0,0.0,"""Kristina Daniel""","[""bash"", ""python"", … ""git""]","""{'cloud': ['oracle', 'aws'], 'other': ['ansible', 'puppet', 'jenkins', 'gitlab', 'git'], 'programming': ['bash', 'python']}"""


In [9]:

# Load credentials
from urllib.parse import quote

CREDENTIALS_PATH = Path.cwd().parent / "credentials.json"
with open(CREDENTIALS_PATH, encoding="utf-8") as f:
    creds = json.load(f)

PG_HOST = creds["PG_HOST"]
PG_PORT = creds["PG_PORT"]
PG_DB   = creds["PG_DB"]
PG_USER = creds["PG_USER"]
PG_PASS = creds.get("PG_PASS", "")  # optional: empty means no password

# Percent-encode user and password before placing them in the URL; otherwise
# characters such as "@", ":", "/" or "#" in a password break URL parsing.
# quote(..., safe="") is used rather than quote_plus so that a space becomes
# "%20", which decodes correctly whichever unquoting the URL parser applies.
url_user = quote(str(PG_USER), safe="")
if PG_PASS:
    url_pass = quote(str(PG_PASS), safe="")
    DATABASE_URL = f"postgresql+psycopg2://{url_user}:{url_pass}@{PG_HOST}:{PG_PORT}/{PG_DB}"
else:
    DATABASE_URL = f"postgresql+psycopg2://{url_user}@{PG_HOST}:{PG_PORT}/{PG_DB}"

engine = create_engine(DATABASE_URL)

# Quick connection test: raises here if the database cannot be reached
pd.read_sql("SELECT 1 AS ok", engine)

# Convert Polars to Pandas
pdf = df.to_pandas()

# Find columns holding ndarray values (Polars list columns arrive in pandas
# as arrays, which psycopg2 cannot adapt)
array_cols = [c for c in pdf.columns if pdf[c].apply(lambda v: isinstance(v, np.ndarray)).any()]
print("array_cols:", array_cols)

# Convert them to JSON strings so Postgres can store them; values that are
# neither arrays, lists nor dicts are passed through unchanged
for c in array_cols:
    pdf[c] = pdf[c].apply(
        lambda v: json.dumps(v.tolist()) if isinstance(v, np.ndarray)
        else (json.dumps(v) if isinstance(v, (list, dict)) else v)
    )

# #Export the FIXED pandas df (pdf)

# pdf.to_sql(
#     name="data_jobs_raw",
#     con=engine,
#     if_exists="replace",
#     index=False,
#     method="multi",
#     chunksize=5000
# )

# #Export tables Job_skills and Job_type_skills

# # Polars → Pandas
# dim_skill_pd = dim_skill.to_pandas()
# bridge_job_skill_pd = bridge_job_skill.to_pandas()

# # Export to Postgres
# dim_skill_pd.to_sql(
#     "dim_skill",
#     engine,
#     if_exists="replace",   # change to "append" once stable
#     index=False
# )

# bridge_job_skill_pd.to_sql(
#     "bridge_job_skill",
#     engine,
#     if_exists="replace",   # change to "append" once stable
#     index=False
# )

# #verify load

# pd.read_sql("SELECT COUNT(*) AS n FROM data_jobs_raw", engine)


array_cols: ['job_skills']
