In [None]:
from __future__ import annotations

import json
import os
import warnings

from pyspark.sql import SparkSession

In [None]:
# Notebook parameters (presumably overridden by the Airflow/papermill run — confirm).
# Each one is parsed as a JSON object string by the cells below; leaving it empty
# makes those cells read the matching file under ../local_variables/ instead.
minio_connection: str = ""
nessie_connection: str = ""

In [None]:
# Resolve the MinIO connection settings. `minio_connection` should hold a JSON
# object string; if it does not parse (e.g. it was left empty for a local run),
# the settings are read from ../local_variables/minio_connection.json instead.

try:
    minio_conn = json.loads(minio_connection)
except json.JSONDecodeError as exc:
    if minio_connection.strip():
        # A non-empty but malformed value is most likely a misconfigured parameter;
        # surface it instead of silently switching to the local credentials file.
        warnings.warn(
            f"minio_connection is not valid JSON ({exc}); "
            "falling back to ../local_variables/minio_connection.json"
        )
    with open("../local_variables/minio_connection.json", encoding="utf-8") as minio_connection_file:
        minio_conn = json.load(minio_connection_file)

if not isinstance(minio_conn, dict):
    # Settings are read with minio_conn.get(...), which requires a mapping.
    raise TypeError(
        f"MinIO connection must be a JSON object, got {type(minio_conn).__name__}"
    )

In [None]:
# Resolve the Nessie connection settings. `nessie_connection` should hold a JSON
# object string; if it does not parse (e.g. it was left empty for a local run),
# the settings are read from ../local_variables/nessie_connection.json instead.

try:
    nessie_conn = json.loads(nessie_connection)
except json.JSONDecodeError as exc:
    if nessie_connection.strip():
        # A non-empty but malformed value is most likely a misconfigured parameter;
        # surface it instead of silently switching to the local settings file.
        warnings.warn(
            f"nessie_connection is not valid JSON ({exc}); "
            "falling back to ../local_variables/nessie_connection.json"
        )
    with open("../local_variables/nessie_connection.json", encoding="utf-8") as nessie_connection_file:
        nessie_conn = json.load(nessie_connection_file)

if not isinstance(nessie_conn, dict):
    # Settings are read with nessie_conn.get(...), which requires a mapping.
    raise TypeError(
        f"Nessie connection must be a JSON object, got {type(nessie_conn).__name__}"
    )

In [None]:
# Export the MinIO credentials as AWS_* environment variables. This has to run
# before the Spark session is built (presumably so the AWS SDK inside the Spark
# JVM picks them up through its default credential chain — confirm).
AWS_ENV_FROM_MINIO = {
    "AWS_REGION": "aws_region",
    "AWS_ACCESS_KEY_ID": "aws_access_key_id",
    "AWS_SECRET_ACCESS_KEY": "aws_secret_access_key",
}

_missing_minio_keys = [
    conn_key for conn_key in AWS_ENV_FROM_MINIO.values() if minio_conn.get(conn_key) is None
]
if _missing_minio_keys:
    # os.environ rejects None with a bare "str expected" TypeError; name the culprits.
    raise KeyError(
        f"MinIO connection is missing required key(s): {', '.join(_missing_minio_keys)}"
    )

for env_name, conn_key in AWS_ENV_FROM_MINIO.items():
    os.environ[env_name] = minio_conn[conn_key]

In [None]:
class LazySparkSession:
    """Builds a SparkSession wired to an Iceberg catalog named ``nessie``.

    Nothing is created at construction time; the session is built when
    :meth:`start` is called.
    """

    # Maven coordinates passed to ``spark.jars.packages`` (resolved at session start).
    packages = [
        "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.103.6",
        "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1",
        "software.amazon.awssdk:bundle:2.31.68",
        "software.amazon.awssdk:url-connection-client:2.31.68",
    ]

    # SQL extension classes passed to ``spark.sql.extensions``.
    extensions = [
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "org.projectnessie.spark.extensions.NessieSparkSessionExtensions",
    ]

    def start(
        self,
        app_name: str = "Default App Name",
        executor_memory: str = "1g",
        driver_memory: str = "1g",
        driver_maxresultsize: str = "1g",
        master_url: str = "local[*]",
        nessie_uri: str | None = None,
        warehouse: str = "s3a://nessie/warehouse/",
    ) -> SparkSession:
        """Build (or reuse, via ``getOrCreate``) the SparkSession.

        Args:
            app_name: Spark application name.
            executor_memory: value for ``spark.executor.memory``.
            driver_memory: value for ``spark.driver.memory``.
            driver_maxresultsize: value for ``spark.driver.maxResultSize``.
            master_url: Spark master URL.
            nessie_uri: endpoint for ``spark.sql.catalog.nessie.uri``. When omitted,
                it is read from the notebook-global ``nessie_conn["nessie_uri"]``.
            warehouse: value for ``spark.sql.catalog.nessie.warehouse``.

        Returns:
            The SparkSession, with its log level set to ``ERROR``.

        Raises:
            ValueError: if no Nessie URI is supplied or found in ``nessie_conn``.

        NOTE(review): because ``getOrCreate`` is used, an already-running session
        in this process may be returned; per PySpark docs, static settings such as
        packages and memory are then not re-applied.
        """
        if nessie_uri is None:
            nessie_uri = nessie_conn.get("nessie_uri")
        if not nessie_uri:
            # Fail fast instead of handing a missing URI to the catalog config.
            raise ValueError(
                "Nessie URI is not set; pass nessie_uri or define 'nessie_uri' in nessie_conn"
            )

        spark = (
            SparkSession.builder
            .appName(app_name)
            # Iceberg / Nessie settings: a REST-type Iceberg catalog named "nessie".
            .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog.nessie.uri", nessie_uri)
            .config("spark.sql.catalog.nessie.warehouse", warehouse)
            .config("spark.sql.catalog.nessie.type", "rest")
            # Spark resource settings.
            .config("spark.executor.memory", executor_memory)
            .config("spark.driver.memory", driver_memory)
            .config("spark.driver.maxResultSize", driver_maxresultsize)
            # SQL extensions and the jar packages that provide them.
            .config("spark.sql.extensions", ",".join(self.extensions))
            .config("spark.jars.packages", ",".join(self.packages))
            .master(master_url)
            .getOrCreate()
        )

        spark.sparkContext.setLogLevel("ERROR")

        return spark

In [None]:
# Use fully-qualified identifiers: `spark.sql.defaultCatalog` is not set, so an
# unqualified "coordenadas_geograficas.latitudes" would resolve against the
# session's default catalog rather than the `nessie` namespace created below.
COORDINATES_NAMESPACE = "nessie.coordenadas_geograficas"
LATITUDES_TABLE = f"{COORDINATES_NAMESPACE}.latitudes"

# The `with` block stops the SparkSession when it exits.
with LazySparkSession().start(app_name="Airflow Spark Iceberg Nessie Pipeline") as spark:

    print(f'The PySpark {spark.version} version is running...')

    spark.sql(f"CREATE DATABASE IF NOT EXISTS {COORDINATES_NAMESPACE}")

    df = spark.createDataFrame(
        [
            ("maceio", -9.66625, -35.7351),
            ("arapiraca", -9.75164, -36.6604),
        ],
        schema="cidade string,latitude double,longitude double",
    )

    df.writeTo(LATITUDES_TABLE).createOrReplace()
    spark.table(LATITUDES_TABLE).show()