In [2]:
import os

from pyspark.sql import SparkSession

# Step 1: Create a SparkSession with an Iceberg catalog named "nessie" that is
# tracked by a Nessie server and stores data in a MinIO (S3A) warehouse.
#
# Every connection setting can be overridden through an environment variable.
# The defaults are exactly the values this notebook previously hard-coded, so
# behaviour is unchanged when none of the variables are set.
#
# NOTE(review): the Nessie URI defaults to the REST API *v1* endpoint, which is
# what this cell was run against. Nessie also serves /api/v2, but switching
# requires that the Nessie client bundled with the Iceberg runtime supports
# v2. Confirm the client version before pointing NESSIE_URI at /api/v2.
NESSIE_URI = os.environ.get("NESSIE_URI", "http://nessie:19120/api/v1")
NESSIE_REF = os.environ.get("NESSIE_REF", "main")
WAREHOUSE = os.environ.get("ICEBERG_WAREHOUSE", "s3a://warehouse/")
S3_ENDPOINT = os.environ.get("S3_ENDPOINT", "http://minio:9000")
# Read credentials from the environment so real secrets never need to be
# committed with the notebook. The defaults are the previously hard-coded
# local-dev values.
S3_ACCESS_KEY = os.environ.get("S3_ACCESS_KEY", "minioadmin")
S3_SECRET_KEY = os.environ.get("S3_SECRET_KEY", "minioadmin")

# Source files and target Iceberg identifiers used by the steps below.
SOURCE_PATH = "s3a://cities-countries/countries/csv/*.csv"
TARGET_NAMESPACE = "nessie.silver_layer"
TARGET_TABLE = f"{TARGET_NAMESPACE}.countries_table"

spark = (
    SparkSession.builder
    # Name the application after what it actually does (countries CSV ->
    # Iceberg silver layer) so it is identifiable in the Spark UI.
    .appName("Countries CSV to Iceberg Silver")
    # Register an Iceberg catalog called "nessie" whose metadata lives in Nessie.
    .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    .config("spark.sql.catalog.nessie.uri", NESSIE_URI)
    # Nessie branch/reference that reads and writes are committed against.
    .config("spark.sql.catalog.nessie.ref", NESSIE_REF)
    .config("spark.sql.catalog.nessie.warehouse", WAREHOUSE)
    # S3A filesystem settings for the MinIO object store.
    .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
    # Put the bucket name in the URL path rather than in the hostname; the
    # endpoint above is a single host, not per-bucket DNS names.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # The endpoint above is plain http://, so disable TLS for S3A connections.
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .getOrCreate()
)

# Step 2: Load every CSV under SOURCE_PATH. The first row is treated as the
# header. inferSchema asks Spark to derive column types, which costs an extra
# pass over the data.
csv_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(SOURCE_PATH)
)

# Optional: inspect the inferred schema and a sample of rows.
csv_df.printSchema()
csv_df.show(5)

# Step 3: Create the target namespace if it does not exist yet.
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {TARGET_NAMESPACE}")

# Step 4: Write to the Iceberg table. createOrReplace() replaces any existing
# table definition and contents with this DataFrame, i.e. a full reload.
csv_df.writeTo(TARGET_TABLE).createOrReplace()

# Step 5: Verify by reading a few rows back from the catalog.
spark.read.table(TARGET_TABLE).show(5)


                                                                                

root
 |-- id: integer (nullable = true)
 |-- capital: string (nullable = true)
 |-- currency_code: string (nullable = true)
 |-- fips_code: string (nullable = true)
 |-- country_iso2: string (nullable = true)
 |-- country_iso3: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- country_id: integer (nullable = true)
 |-- country_name: string (nullable = true)
 |-- currency_name: string (nullable = true)
 |-- country_iso_numeric: integer (nullable = true)
 |-- phone_prefix: string (nullable = true)
 |-- population: integer (nullable = true)

+-----+--------------+-------------+---------+------------+------------+---------+----------+------------+-------------+-------------------+------------+----------+
|   id|       capital|currency_code|fips_code|country_iso2|country_iso3|continent|country_id|country_name|currency_name|country_iso_numeric|phone_prefix|population|
+-----+--------------+-------------+---------+------------+------------+---------+----------+-----------

[Stage 4:>                                                          (0 + 1) / 1]

+-----+--------------+-------------+---------+------------+------------+---------+----------+------------+-------------+-------------------+------------+----------+
|   id|       capital|currency_code|fips_code|country_iso2|country_iso3|continent|country_id|country_name|currency_name|country_iso_numeric|phone_prefix|population|
+-----+--------------+-------------+---------+------------+------------+---------+----------+------------+-------------+-------------------+------------+----------+
|96617|Port-au-Prince|          HTG|       HA|          HT|         HTI|       NA|       101|       Haiti|       Gourde|                332|         509|   9648924|
|96618|      Budapest|          HUF|       HU|          HU|         HUN|       EU|       102|     Hungary|       Forint|                348|          36|   9982000|
|96619|       Jakarta|          IDR|       ID|          ID|         IDN|       AS|       103|   Indonesia|       Rupiah|                360|          62| 242968342|
|96620|   

                                                                                