###### Set up notebook 
Project Notebook. 

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS jobads_bronze;
CREATE SCHEMA IF NOT EXISTS jobads_silver;
CREATE SCHEMA IF NOT EXISTS jobads_gold;


##### Job ads lakehouse pipeline
##### Step: Bronze ingestion (API -> Delta)
##### Author: Phat Cao

######Getting api key 

In [0]:
import requests
from pyspark.sql import Row
from pyspark.sql.functions import current_timestamp, current_date, lit


In [0]:
APP_ID = dbutils.secrets.get(scope = "adzuna_scope",key = "app_id");
APP_KEY = dbutils.secrets.get(scope = "adzuna_scope",key = "app_key");


In [0]:
URL = "https://api.adzuna.com/v1/api/jobs/au/search/1"

params = {
    "app_id": APP_ID,
    "app_key": APP_KEY,
    "what": "data engineer",
    "where": "Melbourne VIC",
    "results_per_page": 50,
    "content-type": "application/json"
}
resp = requests.get(URL, params=params, timeout=30)
print("Status:", resp.status_code)

data = resp.json()
print("Top keys:", list(data.keys()))
print("Jobs returned:", len(data.get("results", [])))

##### Ingest data to bronze layers.

In [0]:
jobs = data.get("results", [])raw_json
rows = [Row(raw_json=json.dumps(job)) for job in jobs]
df_raw = spark.createDataFrame(rows)

# Add metadata columns
df_enriched = (
    df_raw
    .withColumn("ingest_timestamp", current_timestamp())
    .withColumn("ingest_date", current_date())
    .withColumn("ingest_source", lit("adzuna"))
    .withColumn("search_country", lit("au"))
    .withColumn("search_term", lit("data engineer"))
    .withColumn("search_location", lit("Melbourne VIC"))
)

# Try to write with overwrite mode if table does not exist
try:
    df_enriched.write.format("delta").mode("append").saveAsTable("jobads_bronze.job_ads_raw")
except Exception:
    df_enriched.write.format("delta").mode("overwrite").saveAsTable("jobads_bronze.job_ads_raw")

In [0]:
%sql
SELECT ingest_date, COUNT(*) 
FROM jobads_bronze.job_ads_raw
GROUP BY ingest_date
ORDER BY ingest_date DESC;