In [0]:
import requests
import csv
from pyspark.sql import SparkSession

def _flatten_user(user):
    """Project one raw user record onto the five flat columns kept by the ETL."""

    def _text(value):
        # Keep missing values as None; stringify anything else so every column
        # agrees with the all-string Spark schema declared by the loader.
        return None if value is None else str(value)

    # `or {}` also covers an explicit JSON null: `.get(key, {})` only falls back
    # when the key is absent, so a null value would crash the chained `.get`.
    address = user.get("address") or {}
    company = user.get("company") or {}
    return {
        "name": _text(user.get("name")),
        "username": _text(user.get("username")),
        "email": _text(user.get("email")),
        "city": _text(address.get("city")),
        "company": _text(company.get("name")),
    }


def extract_transform_load(api_url, output_file, table_name):
    """
    Mini ETL Simulation:
    1. Extract data from API
    2. Transform (flatten selected fields)
    3. Load into CSV (local)
    4. Load into Spark table

    Parameters:
        api_url: URL fetched with an HTTP GET; the response body must be a
            JSON array of user objects.
        output_file: Path of the CSV file to write (overwritten on each run).
        table_name: Name of the Spark table the rows are appended to.

    Returns:
        None. Failures are not raised to the caller: request errors and any
        other exception are caught and reported with a printed message.

    Note:
        The table write uses append mode, so every successful call adds the
        fetched rows again.
    """
    # Single source of truth for the CSV header and the Spark column order.
    columns = ["name", "username", "email", "city", "company"]

    try:
        # Extract
        response = requests.get(api_url, timeout=10)
        response.raise_for_status()
        users = response.json()

        # An error payload is often a JSON object; iterating it would walk its
        # keys and fail later with a confusing message.
        if not isinstance(users, list):
            raise ValueError(
                f"Expected a JSON array of users, got {type(users).__name__}"
            )

        # Transform
        parsed_users = [_flatten_user(user) for user in users]

        # ✅ Load to CSV (stored in driver local path)
        with open(output_file, mode="w", newline="", encoding="utf-8") as file:
            writer = csv.DictWriter(file, fieldnames=columns)
            writer.writeheader()
            writer.writerows(parsed_users)

        print(f"✅ Data saved to CSV: {output_file}")

        # ✅ Load into Spark table (append mode)
        spark = SparkSession.builder.getOrCreate()
        # Explicit schema: inference from dicts cannot handle an empty result
        # or a column whose values are all None.
        schema = ", ".join(f"{column} string" for column in columns)
        rows = [
            tuple(record[column] for column in columns)
            for record in parsed_users
        ]
        df = spark.createDataFrame(rows, schema=schema)
        df.write.mode("append").saveAsTable(table_name)

        print(f"✅ Data appended to table: {table_name}")

    except requests.exceptions.RequestException as e:
        print(f"❌ API Request failed: {e}")
    except Exception as e:
        print(f"❌ Error during ETL process: {e}")


# Kick off one ETL run when this cell/script is executed directly.
if __name__ == "__main__":
    extract_transform_load(
        api_url="https://jsonplaceholder.typicode.com/users",
        output_file="api_users.csv",   # local file on the driver
        table_name="api_users_table",  # Spark SQL table
    )


In [0]:
%sql
-- Show every row currently stored in default.api_users_table.
-- NOTE(review): assumes the ETL cell's saveAsTable("api_users_table") resolved
-- to the `default` database; confirm the session's current database.
SELECT *
FROM default.api_users_table;
