In [None]:
# ==============================================
# Cell 1: Install dependencies (uncomment if needed)
# ==============================================
# !pip install requests pandas sqlalchemy psycopg2-binary

In [46]:
# ==============================================
# Cell 2: Imports
# ==============================================
import requests
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime
import time
import re
import html

In [110]:
# ==============================================
# Cell 3: PostgreSQL setup
# ==============================================
OL_DB_USER = '********'
OL_DB_PASSWORD = '********'
OL_DB_HOST = 'jde08-ip-p2-angbj1976-c47c.c.aivencloud.com:15241'
OL_DB_NAME = 'Interim_Project_DB'

OL_engine = create_engine(f'postgresql+psycopg2://{OL_DB_USER}:{OL_DB_PASSWORD}@{OL_DB_HOST}/{OL_DB_NAME}')
print("Connected to Online PostgreSQL")

L_DB_USER = '********'
L_DB_PASSWORD = '********'
L_DB_HOST = 'localhost'
L_DB_NAME = 'TestDB'

L_engine = create_engine(f'postgresql+psycopg2://{L_DB_USER}:{L_DB_PASSWORD}@{L_DB_HOST}/{L_DB_NAME}')
print("Connected to PostgreSQL")

Connected to Online PostgreSQL


In [3]:
# ==============================================
# Cell 4: Create staging jobs table (drop first if necessary)
# For purpose of saving the fetched jobs till date
# ==============================================
# Uncomment this if you want a fresh table
# with engine.connect() as conn:
#     conn.execute(text("DROP TABLE IF EXISTS jobs"))
#     conn.commit()

create_table_query = """
CREATE TABLE IF NOT EXISTS jobs (
    job_id SERIAL PRIMARY KEY,
    remote_job_id VARCHAR(100) UNIQUE,
    title VARCHAR(500),
    company VARCHAR(255),
    location_geo VARCHAR(100),
    job_type VARCHAR(100),
    salary_min VARCHAR(100),
    salary_max VARCHAR(100),
    salary_currency VARCHAR(20),
    description TEXT,
    pub_date TIMESTAMP,
    job_url VARCHAR(500),
    source VARCHAR(100)
);
"""
# Amend the engine.connect to corresponding l/OL for Local or Online DB
with L_engine.connect() as conn:
    conn.execute(text(create_table_query))
    conn.commit()
print("Table jobs ready")


Table jobs ready


### **API Query Parameters (optional)**  
Refer for more details : https://jobicy.com/jobs-rss-feed  
count - Number of listings to return (default: 100, range: 1-100)  
geo - Filter by job region (default: all regions)  
industry - Filter by job category (default: all categories)  
tag - Search by job title and description (default: all jobs)  
Eg: *https://jobicy.com/api/v2/remote-jobs?count=100&geo=usa&industry=marketing&tag=seo*  
Eg: *https://jobicy.com/api/v2/remote-jobs?count=100&geo=singapore*

In [58]:
# ==============================================
# Cell 5: Function to fetch jobs from Jobicy API
# ==============================================

def fetch_jobs(api_url, delay=1):
    """
    Fetch up to 100 jobs from Jobicy.
    The full API URL must already be included in the function call
    """
    print("Requesting:", api_url)
    resp = requests.get(api_url)

    # Error checking
    # Other status codes include : 200 OK → Request succeeded ; 201 Created ; 301/302 Redirect ; 400 Bad request ;
    #                              401 Unauthorized ; 403 Forbidden ; 404 Not found ; 500 Internal server error
    if resp.status_code != 200:
        print(f"Failed to fetch jobs: {resp.status_code}")
        return []
    
    
    data = resp.json()
    jobs = data.get("jobs", [])

    print(f"Fetched {len(jobs)} jobs.")
    time.sleep(delay)

    return jobs

In [96]:
# ==============================================
# Cell 6: Fetch jobs
# ==============================================

# Amend URL as per reference Jobicy API page https://jobicy.com/jobs-rss-feed
API_URL = "https://jobicy.com/api/v2/remote-jobs?count=100&tag=sql"

# Jobs fetched from the API is a list of jobs
jobs = fetch_jobs(API_URL, delay=1)
print(f"Total jobs fetched: {len(jobs)}")

Requesting: https://jobicy.com/api/v2/remote-jobs?count=100&tag=sql
Fetched 1 jobs.
Total jobs fetched: 1


## <u>Take Note</u>
#### Cell 7 & 8 is for use if the data is to be uploaded into a staging DB first
#### Can skip to Cell 8A if the data is to be processed directly

In [97]:
# ==============================================
# Cell 7: Insert jobs into PostgreSQL (Staging DB)
# ==============================================

with L_engine.connect() as conn:
    for job in jobs:
        job_data = {
            "remote_job_id": job.get("id"),
            "title": job.get("jobTitle"),
            "company": job.get("companyName"),
            "location_geo": job.get("jobGeo"),
            "job_type": ", ".join(job.get("jobType")) if job.get("jobType") else None,
            "salary_min": job.get("salaryMin"),         # <-- corrected
            "salary_max": job.get("salaryMax"),         # <-- corrected
            "salary_currency": job.get("salaryCurrency"),
            "description": job.get("jobDescription"),
            "pub_date": job.get("pubDate"),
            "job_url": job.get("url"),
            "source": "Jobicy"
        }
        insert = text("""
            INSERT INTO jobs (
                remote_job_id, title, company, location_geo,
                job_type, salary_min, salary_max, salary_currency,
                description, pub_date, job_url, source
            ) VALUES (
                :remote_job_id, :title, :company, :location_geo,
                :job_type, :salary_min, :salary_max, :salary_currency,
                :description, :pub_date, :job_url, :source
            )
            ON CONFLICT (remote_job_id) DO NOTHING
        """)
        conn.execute(insert, job_data)
    conn.commit()
print("Jobs inserted into Staging DB")


Jobs inserted into Staging DB


In [98]:
# ==============================================
# Cell 8: Load into Pandas and preview
# ==============================================
df = pd.read_sql("SELECT * FROM jobs ORDER BY pub_date DESC NULLS LAST", L_engine)

# 1. Retain only the required columns
df = df[["remote_job_id", "title", "company", "location_geo", "salary_min", "salary_max", "description"]]
# 2. Add prefix "BEN_" to remote_job_id
df["remote_job_id"] = "BEN_" + df["remote_job_id"].astype(str)
# Optional: reset index for clean DataFrame
df = df.reset_index(drop=True)

print(df.head())
print(df.shape)

  remote_job_id                                  title    company  \
0    BEN_137551  Mathematical Formalization Specialist   Labelbox   
1    BEN_137550     Customer Onboarding Manager | EMEA       Deel   
2    BEN_137548            Marketing Analytics Manager  Consensys   
3    BEN_137542               Quality Engineer (m/f/d)    NVision   
4    BEN_137531         AI Pilot Vibe Coding Assistant   Mindrift   

  location_geo salary_min salary_max  \
0      Germany       None       None   
1         EMEA       None       None   
2   EMEA,  USA     105000     187000   
3      Germany       None       None   
4  Philippines       None       None   

                                         description  
0  <p>Mathematical Formalization Specialist (Lean...  
1  <p><strong>Who we are is what we do.</strong><...  
2  <p>Consensys is the leading blockchain and web...  
3  <p>As a key member of our R&amp;D department, ...  
4  <p><em><strong>This opportunity is only for ca...  
(292, 7)


In [100]:
# ==============================================
# Cell 8A: Convert returned Jobs (type : List) to Dataframe for processing
# ==============================================

# Create a DataFrame
df_test = pd.DataFrame(jobs)

# Select and rename the columns
df_test = df.rename(columns={
        'id': 'remote_job_id',
        'jobTitle': 'title',
        'companyName': 'company',
        'jobGeo': 'location_geo',
        'salaryMin': 'salary_min',
        'salaryMax': 'salary_max',
        'jobDescription': 'description'
})[['remote_job_id', 'title', 'company', 'location_geo', 'salary_min', 'salary_max', 'description']]

# Display the first few rows
print(df_test.head())


  remote_job_id                                  title    company  \
0    BEN_137551  Mathematical Formalization Specialist   Labelbox   
1    BEN_137550     Customer Onboarding Manager | EMEA       Deel   
2    BEN_137548            Marketing Analytics Manager  Consensys   
3    BEN_137542               Quality Engineer (m/f/d)    NVision   
4    BEN_137531         AI Pilot Vibe Coding Assistant   Mindrift   

  location_geo salary_min salary_max  \
0      Germany       None       None   
1         EMEA       None       None   
2   EMEA,  USA     105000     187000   
3      Germany       None       None   
4  Philippines       None       None   

                                         description  
0  <p>Mathematical Formalization Specialist (Lean...  
1  <p><strong>Who we are is what we do.</strong><...  
2  <p>Consensys is the leading blockchain and web...  
3  <p>As a key member of our R&amp;D department, ...  
4  <p><em><strong>This opportunity is only for ca...  


In [101]:
# ==============================================
# Cell 9 : Standardized cleaning function for company names
# ==============================================

def clean_text_column(name):
    if not isinstance(name, str):
        return name

    # Lowercase and strip
    x = name.strip().lower()

    # Remove extra punctuation
    x = re.sub(r"[^a-z0-9\s]", " ", x)

    # Collapse multiple spaces
    x = " ".join(x.split())

    # Optional: remove trailing words like "pte ltd", "ltd", "solutions" if used in lookup rules
    remove_terms = [
        r"\bpte ltd\b",
        r"\bltd\b",
        r"\bprivate limited\b",
        r"\bplc\b",
        r"\bllc\b",
        r"\binc\b",
        r"\bltd\b"      
    ]

    for term in remove_terms:
        x = re.sub(term, "", x) 

    # Fix leftover "amp 038" to &
    x = x.replace("amp 038", "&")

    # Trim after removal
    x = " ".join(x.split())

    # Title-case after cleaning
    x = x.title()

    return x

# Apply standardize cleaning function to company name column
clean_df = df.copy()
clean_df["company"] = df["company"].apply(clean_text_column)
clean_df.head()

Unnamed: 0,remote_job_id,title,company,location_geo,salary_min,salary_max,description
0,BEN_137551,Mathematical Formalization Specialist,Labelbox,Germany,,,<p>Mathematical Formalization Specialist (Lean...
1,BEN_137550,Customer Onboarding Manager | EMEA,Deel,EMEA,,,<p><strong>Who we are is what we do.</strong><...
2,BEN_137548,Marketing Analytics Manager,Consensys,"EMEA, USA",105000.0,187000.0,<p>Consensys is the leading blockchain and web...
3,BEN_137542,Quality Engineer (m/f/d),Nvision,Germany,,,"<p>As a key member of our R&amp;D department, ..."
4,BEN_137531,AI Pilot Vibe Coding Assistant,Mindrift,Philippines,,,<p><em><strong>This opportunity is only for ca...


In [102]:
# ==============================================
# Cell 10 : Cleaning function for text columns for HTML tags 
# and more to deal with text in title and description column
# ==============================================

def clean_text_2(text):
    if not isinstance(text, str):
        return text

    # ---------------------------------------
    # 1. Decode HTML entities repeatedly
    # ---------------------------------------
    prev = None
    while prev != text:
        prev = text
        text = html.unescape(text)

    # ---------------------------------------
    # 2. Convert structural HTML to text
    # ---------------------------------------
    text = re.sub(r"(?i)<br\s*/?>", "\n", text)
    text = re.sub(r"(?i)</p>", "\n", text)
    text = re.sub(r"(?i)<p[^>]*>", "", text)
    text = re.sub(r"(?i)<li[^>]*>", "- ", text)
    text = re.sub(r"(?i)</li>", "\n", text)

    # ---------------------------------------
    # 3. Remove all remaining HTML tags
    # ---------------------------------------
    text = re.sub(r"<[^>]+>", "", text)

    # ---------------------------------------
    # 4. Fix encoding artifacts & HTML leftovers
    # ---------------------------------------
    replacements = {
        "â€™": "'",
        "â€œ": '"',
        "â€": '"',
        "â€“": "-",
        "â€”": "-",
        "â€˜": "'",
        "â€¢": "-",
        "â€¦": "...",
        "Â": "",
        "\xa0": " ",
    }
    for bad, good in replacements.items():
        text = text.replace(bad, good)

    # Handle leftover dash entities
    text = re.sub(r"&#8211;?|&#x2013;?|&ndash;?", "-", text, flags=re.IGNORECASE)

    # ---------------------------------------
    # 5. Normalize bullet characters
    # ---------------------------------------
    text = re.sub(r"[•●◦·]", "-", text)

    # ---------------------------------------
    # 6. Normalize dashes
    # ---------------------------------------
    text = re.sub(r"[–—]", "-", text)

    # ---------------------------------------
    # 7. Remove remaining non-ASCII
    # ---------------------------------------
    text = re.sub(r"[^\x00-\x7F]+", " ", text)

    # ---------------------------------------
    # 8. Collapse whitespace
    # ---------------------------------------
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n\s*\n+", "\n", text)  # collapse multiple blank lines

    text = text.strip()

    return text

# Apply cleaning to produce title and description columns
clean_df_2 = clean_df.copy()
clean_df_2["description"] = clean_df["description"].apply(clean_text_2)
clean_df_2["title"] = clean_df["title"].apply(clean_text_2)
clean_df_2.head()

Unnamed: 0,remote_job_id,title,company,location_geo,salary_min,salary_max,description
0,BEN_137551,Mathematical Formalization Specialist,Labelbox,Germany,,,Mathematical Formalization Specialist (Lean / ...
1,BEN_137550,Customer Onboarding Manager | EMEA,Deel,EMEA,,,Who we are is what we do.\nDeel is the all-in-...
2,BEN_137548,Marketing Analytics Manager,Consensys,"EMEA, USA",105000.0,187000.0,Consensys is the leading blockchain and web3 s...
3,BEN_137542,Quality Engineer (m/f/d),Nvision,Germany,,,"As a key member of our R&D department, you wil..."
4,BEN_137531,AI Pilot Vibe Coding Assistant,Mindrift,Philippines,,,This opportunity is only for candidates curren...


In [107]:
# ==============================================
# Cell 11 : Reference country table to match and input country code
# ==============================================

# Load dim_countries table from Postgres, standardized list of countries
query = "SELECT country_name, country_code FROM dim_countries"
dim_countries = pd.read_sql(query, OL_engine)

# clean up the locations indicating multiple countries
clean_df_2["location_geo2"] = clean_df_2["location_geo"].astype(str).str.split(",", n=1).str[0].str.strip()

# replace some shortformed, different format country names
replacements = {
    "USA": "United States of America",
    "Vietnam": "Viet Nam",
    "UK": "United Kingdom"
}
clean_df_2["location_geo2"] = clean_df_2["location_geo2"].replace(replacements)

# Option A: Using merge (safe for large datasets)
#clean_df_2 = clean_df_2.merge(dim_countries, how='left', left_on='location_geo2', right_on='country_name')
clean_df_2 = clean_df_2.merge(
    dim_countries.rename(columns={"country_name": "country_name_ref", "country_code": "country_code_ref"}),
    how="left",
    left_on="location_geo2",
    right_on="country_name_ref"
)
# Option B: Using map (faster for smaller datasets)
# country_map = dict(zip(dim_countries['country_name'], dim_countries['country_code']))
# df['country_code'] = df['location_geo'].map(country_map)

# -----------------------------
# Save rows where country_code_ref is NaN for further processing and remove them 
# -----------------------------
failed_rows = clean_df_2[clean_df_2["country_code_ref"].isna()]

if not failed_rows.empty:
    failed_rows.to_csv("country_update_fail.csv", index=False)
    print(f"{len(failed_rows)} rows failed to map a country code and were saved to 'country_update_fail.csv'.")
else:
    print("No rows failed to map a country code.")

# Remove those rows from the main DataFrame
clean_df_2 = clean_df_2[clean_df_2["country_code_ref"].notna()]

# Retain only required columns
clean_df_2 = clean_df_2[[
    "remote_job_id", "title", "company", 
    "salary_min", "salary_max", "description", 
    "location_geo2", "country_code_ref"
]]

clean_df_2.head()

33 rows failed to map a country code and were saved to 'country_update_fail.csv'.


Unnamed: 0,remote_job_id,title,company,salary_min,salary_max,description,location_geo2,country_code_ref
0,BEN_137551,Mathematical Formalization Specialist,Labelbox,,,Mathematical Formalization Specialist (Lean / ...,Germany,DE
3,BEN_137542,Quality Engineer (m/f/d),Nvision,,,"As a key member of our R&D department, you wil...",Germany,DE
4,BEN_137531,AI Pilot Vibe Coding Assistant,Mindrift,,,This opportunity is only for candidates curren...,Philippines,PH
5,BEN_137529,Enterprise Integrations Lead,Employment Hero,,,Who we areEmployment Hero is on a mission to m...,Australia,AU
6,BEN_137527,"Senior Backend Engineer, User Platform - Java ...",Canva,,,Job DescriptionJoin the team redefining how th...,Australia,AU


In [108]:
# ==============================================
# Cell 12 : Find distinct company name to prep for final table and update
# Rows with existing company name in dim_companies to be skipped
# ==============================================

# 1. Get existing companies from dim_companies
existing_companies_query = 'SELECT "company_id", "company_name" FROM dim_companies'
dim_companies_df = pd.read_sql(existing_companies_query, OL_engine)
dim_companies_df["company_name"] = dim_companies_df["company_name"].str.strip()
existing_set = set(dim_companies_df["company_name"])

# --------------------------------------------------
# 2. Prepare new companies from clean_df_2
df_to_insert = clean_df_2[["company", "country_code_ref"]].rename(
    columns={"company": "company_name", "country_code_ref": "country_code"}
)

# Remove duplicates within the new data
df_to_insert = df_to_insert.drop_duplicates(subset=["company_name"])

# Keep only companies not already in dim_companies
df_to_insert = df_to_insert[~df_to_insert["company_name"].str.strip().isin(existing_set)]

# --------------------------------------------------
# 3. Insert new companies into dim_companies
if not df_to_insert.empty:
    df_to_insert.to_sql(
        "dim_companies",
        OL_engine,
        if_exists="append",
        index=False
    )
    print(f"{len(df_to_insert)} new companies inserted.")
else:
    print("No new companies to insert.")

# --------------------------------------------------
# 4. Reload dim_companies to get all company_ids
dim_companies_df = pd.read_sql("SELECT company_id, company_name FROM dim_companies", OL_engine)
dim_companies_df["company_name"] = dim_companies_df["company_name"].str.strip()

# --------------------------------------------------
# 5. Map company_id back into clean_df_2
clean_df_2 = clean_df_2.merge(
    dim_companies_df,
    how="left",
    left_on="company",
    right_on="company_name"
)

# Drop extra company_name column from merge
clean_df_2 = clean_df_2.drop(columns=["company_name"])

print("Company IDs mapped back into clean_df_2")

# Drop extra columns
clean_df_2 = clean_df_2.drop(["company", "location_geo2", "country_code_ref"], axis=1)
clean_df_2.head()

52 new companies inserted.
Company IDs mapped back into clean_df_2


Unnamed: 0,remote_job_id,title,salary_min,salary_max,description,company_id
0,BEN_137551,Mathematical Formalization Specialist,,,Mathematical Formalization Specialist (Lean / ...,4217
1,BEN_137542,Quality Engineer (m/f/d),,,"As a key member of our R&D department, you wil...",4218
2,BEN_137531,AI Pilot Vibe Coding Assistant,,,This opportunity is only for candidates curren...,1441
3,BEN_137529,Enterprise Integrations Lead,,,Who we areEmployment Hero is on a mission to m...,1437
4,BEN_137527,"Senior Backend Engineer, User Platform - Java ...",,,Job DescriptionJoin the team redefining how th...,1438


In [41]:
# ==============================================
# THIS CELL NO NEED TO BE RUN.  
# Cell 14 : This is code for debugging purpose. 
# - DB connection test
# ==============================================

# Test query
test_query = "SELECT 1 AS test_col"
test_df = pd.read_sql(test_query, OL_engine)
test_df

# Identify the DB connection 
print(OL_engine)

# Test fetching some data from the DB
existing_companies_query = "SELECT * FROM dim_companies LIMIT 5"
dim_companies_df = pd.read_sql(existing_companies_query, OL_engine)
print(dim_companies_df)

# Query the DB for the table schema
existing_companies_col_query = "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'dim_companies';"
dim_companies_col_df = pd.read_sql(existing_companies_col_query, OL_engine)
print(dim_companies_col_df)

Engine(postgresql+psycopg2://avnadmin:***@jde08-ip-p2-angbj1976-c47c.c.aivencloud.com:15241/Interim_Project_DB)
   company_id             company_name country_code
0           1              Wells Fargo           US
1           2               Nationwide           US
2           3  Boston Consulting Group           US
3           4        American Airlines           US
4           5     Nikola Motor Company           US
    column_name          data_type
0    company_id            integer
1  company_name  character varying
2  country_code  character varying


In [109]:
# ==============================================
# Cell 13 : check remote_job_id if exist in fact_jobs, if so update not append
# insert all other rows. 
# ==============================================

# Map clean_df_2 columns to fact_jobs columns
df_upsert = clean_df_2.rename(columns={
    "remote_job_id": "joblistingid",
    "title": "job_title",
    "company_id": "company_id",
    "salary_min": "salary_min",
    "salary_max": "salary_max",
    "description": "description"
})

# Define SQL template
sql_template = """
INSERT INTO fact_jobs (joblistingid, company_id, job_title, salary_min, salary_max, description)
VALUES (:joblistingid, :company_id, :job_title, :salary_min, :salary_max, :description)
ON CONFLICT (joblistingid) 
DO UPDATE SET
    company_id = EXCLUDED.company_id,
    job_title = EXCLUDED.job_title,
    salary_min = EXCLUDED.salary_min,
    salary_max = EXCLUDED.salary_max,
    description = EXCLUDED.description;
"""

# Execute upsert row by row (corrected for SQLAlchemy 2.x)
with OL_engine.begin() as conn:  # begin() handles commit automatically
    for _, row in df_upsert.iterrows():
        conn.execute(text(sql_template), row.to_dict())

print(f"{len(df_upsert)} rows processed with upsert into fact_jobs.")



259 rows processed with upsert into fact_jobs.
