In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import lit, current_date
import requests

# 1️⃣ Spark session — getOrCreate() hands back the already-active session if one exists
spark = (
    SparkSession.builder
    .appName("reqres_api_to_delta")
    .getOrCreate()
)

# 2️⃣ Users endpoint of the reqres demo API (paged via ?page=N)
base_url = "https://reqres.in/api/users"

# 3️⃣ Fetch all paginated data
# Every page's records live under the "data" key; an empty/missing "data"
# list marks the end of the result set.
REQUEST_TIMEOUT_S = 30  # seconds; without a timeout a stalled connection hangs the cell forever
MAX_PAGES = 1000        # hard safety cap so a misbehaving API cannot loop us indefinitely

all_data = []
page = 1

with requests.Session() as session:  # reuse one connection across page requests
    while page <= MAX_PAGES:
        response = session.get(base_url, params={"page": page}, timeout=REQUEST_TIMEOUT_S)
        # Fail loudly on HTTP errors: otherwise an error body has no "data" key and
        # would be mistaken for "no more pages", silently truncating the load.
        response.raise_for_status()
        json_data = response.json()

        page_rows = json_data.get("data")
        if not page_rows:  # stop when no more data
            break
        all_data.extend(page_rows)

        # NOTE(review): reqres responses are assumed to carry "total_pages"; when present
        # it lets us stop without requesting an extra empty page. If absent, the
        # empty-page check above still terminates the loop.
        total_pages = json_data.get("total_pages")
        if total_pages is not None and page >= total_pages:
            break
        page += 1
    else:
        # Only reached when the loop condition fails, i.e. the page cap was hit.
        raise RuntimeError(f"Pagination did not finish within {MAX_PAGES} pages of {base_url}")

# 4️⃣ Explicit schema: integer `id`, string for everything else, all nullable
schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("email", StringType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("avatar", StringType(), True)
)

# 5️⃣ One row per user record collected from the API
df = spark.createDataFrame(all_data, schema)

# 6️⃣ Tag every row with its source site and the date of this load
df_final = (
    df
    .withColumn("site_address", lit("reqres.in"))
    .withColumn("load_date", current_date())
)

# 7️⃣ Preview the enriched DataFrame
display(df_final)

# 8️⃣ Safe Delta write
delta_path = "/Volumes/assignment/question2/write"

# Remove the old folder first so leftover files (or a stale transaction log)
# cannot interfere with the new Delta table. This also discards any existing
# Delta history stored under the folder.
try:
    dbutils.fs.rm(delta_path, True)  # recursive delete
except Exception as exc:
    # Best-effort cleanup: a missing folder on the first run is expected, but
    # report why cleanup was skipped instead of silently swallowing every error
    # (the previous bare `except:` also caught KeyboardInterrupt/SystemExit).
    print(f"Cleanup of {delta_path} skipped: {exc}")

# Write DataFrame as Delta; overwriteSchema lets the column set change between runs.
df_final.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(delta_path)


In [0]:
# Read back what the previous cell wrote: a Delta table at `delta_path`.
# (The old CSV read of `<delta_path>/file.csv` could never succeed, because the
# write cell recursively deletes that folder before saving the Delta table.)
spark.read.format("delta").load(delta_path).display()