In [0]:
%sql
-- Create the schema the Python cells below write into (bronze.jobs_table, bronze.load_log); no-op if it already exists.
CREATE SCHEMA IF NOT EXISTS bronze;

In [0]:
# Import necessary modules
import os
from datetime import datetime, timezone

import pandas as pd
import requests
from pyspark.sql import SparkSession

# API Configuration
API_URL = "https://findwork.dev/api/jobs/"

# Never hardcode the API token: notebooks get shared, exported and committed.
# Supply it via the FINDWORK_API_TOKEN environment variable (on Databricks, e.g. a
# cluster environment variable backed by a secret scope).
# NOTE: the token previously hardcoded here should be treated as leaked and revoked.
API_TOKEN = os.environ.get("FINDWORK_API_TOKEN")
if not API_TOKEN:
    # Fail fast: without a token every API call would be rejected further down.
    raise RuntimeError(
        "FINDWORK_API_TOKEN is not set; provide the Findwork API token before running this notebook"
    )

In [0]:
# Helper functions: fetch jobs from the Findwork API, load them into the bronze schema, and log each load event

def fetch_all_jobs(url=None, token=None, *, timeout=30, session=None):
    """
    Fetch every job from the Findwork API by following its pagination links.

    Args:
        url: URL of the first page; defaults to API_URL.
        token: API token, sent as "Authorization: Token <token>"; defaults to API_TOKEN.
        timeout: per-request timeout in seconds, so a stalled connection cannot hang
            the notebook indefinitely.
        session: optional object exposing a requests-style ``get`` (e.g. a
            ``requests.Session`` for connection reuse); defaults to the ``requests`` module.

    Returns:
        list: the concatenated ``results`` entries of every page, in page order.

    Raises:
        RuntimeError: if any page responds with a status other than 200.
    """
    url = API_URL if url is None else url
    token = API_TOKEN if token is None else token
    client = requests if session is None else session
    headers = {'Authorization': f'Token {token}'}

    all_jobs = []
    while url:
        response = client.get(url, headers=headers, timeout=timeout)

        if response.status_code != 200:
            # Fail loudly instead of returning a partial list: the caller overwrites
            # the target table with whatever this function returns.
            raise RuntimeError(
                f"Findwork API request to {url} failed with status "
                f"{response.status_code}: {response.text}"
            )

        data = response.json()
        all_jobs.extend(data['results'])
        # URL of the next page; falsy on the last page, which ends the loop.
        url = data.get('next')
        print(f"Fetched {len(all_jobs)} jobs so far...")

    return all_jobs


def load_to_database(jobs_data, table_name, schema="bronze", mode="overwrite"):
    """
    Write job records to a Spark table in the given schema.

    Args:
        jobs_data: sequence of job records (e.g. the list of dicts returned by
            fetch_all_jobs); anything ``pd.DataFrame`` accepts and that supports ``len``.
        table_name: target table name, without the schema prefix.
        schema: schema/database to write into (default "bronze").
        mode: Spark save mode. The default "overwrite" replaces the table's contents
            with this batch, creating the table if it does not exist.

    Returns:
        The Spark DataFrame that was written.

    Raises:
        ValueError: if jobs_data is empty. There is no data to infer a schema from,
            and overwriting with nothing would discard the existing table contents.
    """
    full_table_name = f'{schema}.{table_name}'

    if len(jobs_data) == 0:
        raise ValueError(f"No records to load into '{full_table_name}'")

    # Convert to pandas first; Spark then infers the table schema from the pandas frame.
    df_pandas = pd.DataFrame(jobs_data)
    df_spark = spark.createDataFrame(df_pandas)

    # Default "overwrite": each run replaces the table with the latest full fetch.
    df_spark.write.mode(mode).saveAsTable(full_table_name)

    print(f"Successfully loaded {len(jobs_data)} records to table '{full_table_name}'")

    return df_spark


def log_load_event(table_name, record_count, log_table="bronze.load_log"):
    """
    Append one audit row describing a completed load to the load-log table.

    Args:
        table_name: name of the table that was loaded.
        record_count: number of records written in that load.
        log_table: fully qualified log table to append to (default "bronze.load_log").
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC time and
    # drop tzinfo so the stored ISO string keeps the same naive format as earlier rows.
    load_timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    log_df = spark.createDataFrame([{
        "table_name": table_name,
        "records_loaded": record_count,
        "load_timestamp": load_timestamp,
    }])
    # Append (never overwrite) so the log accumulates one row per load.
    log_df.write.mode("append").saveAsTable(log_table)


In [0]:
# Main execution
if __name__ == "__main__":
    target_table = "jobs_table"

    # Fetch data from API
    print("Fetching jobs from API...")
    jobs = fetch_all_jobs()

    # An empty fetch cannot be loaded meaningfully (the load step overwrites the
    # table), so stop here with a clear error instead.
    if not jobs:
        raise RuntimeError(f"No jobs fetched from the API; not loading bronze.{target_table}")

    # Load to Databricks
    print("Loading data to Database...")
    df = load_to_database(jobs, target_table)

    # Log load event
    print("Logging load event...")
    log_load_event(target_table, len(jobs))

    print("Jobs Successfully Loaded")