In [None]:
import os
import warnings

from pyspark.sql import SparkSession

In [None]:
# Connection settings for your Snowflake account.
# Secrets are read from environment variables so they are never saved in the notebook.

# Account identifier in the form "<account_locator>.<region>" (e.g. "ab12345.eu-west-1").
SF_ACCOUNT = os.environ.get("SNOWFLAKE_ACCOUNT", "")
SF_URL = f"https://{SF_ACCOUNT}.snowflakecomputing.com"
HORIZON_CATALOG_URI = f"{SF_URL}/polaris/api/catalog"

SF_USER = "SPARK_USER"
CATALOG_NAME = "ICEBERG_TEST_DB"
SCHEMA_NAME = "DEMO"
SF_WAREHOUSE = "MYWH"
ICEBERG_TABLE_NAME = "USERINFORMATION"

# Programmatic access token (PAT) for SF_USER. Export it before launching Jupyter:
#   export SNOWFLAKE_PAT=...
PAT_TOKEN = os.environ.get("SNOWFLAKE_PAT", "")

HORIZON_SESSION_ROLE = "ICEBERG_DATA_ENGINEER"
STORAGE_REGION = "eu-west-1"

# The runtime JARs target Spark 4.0, which needs Java 17+. The default is a Homebrew
# openjdk@17 install; set SPARK_JAVA_HOME to use another JDK without editing this cell.
# (A dedicated variable is used so an unrelated system-wide JAVA_HOME is not picked up.)
JAVA_HOME = os.environ.get(
    "SPARK_JAVA_HOME",
    "/opt/homebrew/Cellar/openjdk@17/17.0.18/libexec/openjdk.jdk/Contents/Home/",
)
os.environ["JAVA_HOME"] = JAVA_HOME

# Spark-side catalog names registered in the session cell.
HORIZONCATALOG = "horizon_catalog"
HORIZON_SPARK_CATALOG = "horizon_spark"

In [None]:
# Connector/runtime JARs live in ./jars, resolved against the kernel's working directory.
jars_dir = os.path.abspath("jars")

# Add more JARs to this list as needed; order is preserved in the joined string below.
target_jars = [
    os.path.join(jars_dir, "iceberg-aws-bundle-1.10.1.jar"),
    os.path.join(jars_dir, "snowflake-jdbc-3.24.0.jar"),
    os.path.join(jars_dir, "spark-snowflake_2.13-3.1.6.jar"),
    os.path.join(jars_dir, "iceberg-spark-runtime-4.0_2.13-1.10.1.jar"),
]

# Spark typically only logs a missing spark.jars entry, and the failure surfaces much later
# as a ClassNotFoundException, so report missing files up front.
missing_jars = [path for path in target_jars if not os.path.isfile(path)]
if missing_jars:
    warnings.warn(
        "JAR file(s) not found; Spark will not be able to load the catalogs:\n  "
        + "\n  ".join(missing_jars)
    )

# spark.jars expects a comma-separated list of paths.
jars_string = ",".join(target_jars)

In [None]:
# Fail fast on missing connection settings: the builder itself would still succeed, and the
# problem would typically only surface later as an opaque error on the first catalog query.
if not SF_ACCOUNT or not PAT_TOKEN:
    raise ValueError("Set SF_ACCOUNT and PAT_TOKEN in the configuration cell before starting Spark.")

# Stop any session left over from a previous run of this cell.
# NOTE: PySpark reuses the same JVM for the lifetime of the kernel, so JVM-level settings
# (JAVA_HOME, spark.driver.extraClassPath, possibly spark.jars) may only take effect after a
# kernel restart.
try:
    spark.stop()
except NameError:
    pass  # fresh kernel: no session has been created yet
except Exception as exc:  # best-effort cleanup; don't block creating the new session
    warnings.warn(f"Could not stop the previous Spark session: {exc!r}")

# spark.jars is comma-separated, but a JVM classpath uses the platform path separator
# (":" on macOS/Linux, ";" on Windows), so re-join the same JAR list for extraClassPath.
driver_classpath = os.pathsep.join(jars_string.split(","))


def _horizon_rest_catalog_conf(catalog: str) -> dict[str, str]:
    """Build the Spark conf entries shared by both Horizon catalogs.

    Registers ``catalog`` as a SnowflakeFallbackCatalog with ``catalog-impl`` set to Iceberg's
    SparkCatalog, pointed at the Horizon Iceberg REST endpoint, authenticated with the PAT via
    OAuth2 and requesting vended storage credentials.

    Args:
        catalog: Spark catalog name (the ``<name>`` in ``spark.sql.catalog.<name>``).

    Returns:
        Mapping of Spark conf keys to string values.
    """
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.spark.sql.snowflake.catalog.SnowflakeFallbackCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": HORIZON_CATALOG_URI,
        f"{prefix}.warehouse": CATALOG_NAME,
        f"{prefix}.credential": PAT_TOKEN,
        f"{prefix}.scope": f"session:role:{HORIZON_SESSION_ROLE}",
        f"{prefix}.header.X-Iceberg-Access-Delegation": "vended-credentials",
        f"{prefix}.rest.auth.type": "oauth2",
        f"{prefix}.oauth2-server-uri": f"{HORIZON_CATALOG_URI}/v1/oauth/tokens",
    }


# ============================================================
# Catalog 1: Horizon Iceberg REST Catalog (NO policy enforcement)
# NOTE(review): registered with the same SnowflakeFallbackCatalog class as Catalog 2, which
# contradicts "NO policy enforcement" -- confirm which class is intended here.
# ============================================================
horizon_catalog_conf = {
    **_horizon_rest_catalog_conf(HORIZONCATALOG),
    f"spark.sql.catalog.{HORIZONCATALOG}.client.region": STORAGE_REGION,
    f"spark.sql.catalog.{HORIZONCATALOG}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    # NOTE(review): Iceberg's documented key is "io-impl"; "file-io-impl" looks redundant.
    f"spark.sql.catalog.{HORIZONCATALOG}.file-io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
}

# ============================================================
# Catalog 2: Snowflake Spark Connector Catalog (WITH policy enforcement)
# ============================================================
horizon_spark_catalog_conf = _horizon_rest_catalog_conf(HORIZON_SPARK_CATALOG)

spark_conf = {
    # Local driver bound to loopback; port "0" means "pick any free port".
    "spark.ui.port": "0",
    "spark.driver.bindAddress": "127.0.0.1",
    "spark.driver.host": "127.0.0.1",
    "spark.driver.port": "0",
    "spark.blockManager.port": "0",
    # JAR dependencies for Iceberg (AWS bundle) and the Snowflake connector for Spark.
    "spark.jars": jars_string,
    "spark.driver.extraClassPath": driver_classpath,
    # Iceberg SQL extensions; unqualified table names resolve against Catalog 1.
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.defaultCatalog": HORIZONCATALOG,
    "spark.sql.iceberg.vectorization.enabled": "false",
    **horizon_catalog_conf,
    **horizon_spark_catalog_conf,
    # Snowflake connector settings for policy evaluation (the PAT is used as the password).
    "spark.snowflake.sfURL": SF_URL,
    "spark.snowflake.sfUser": SF_USER,
    "spark.snowflake.sfPassword": PAT_TOKEN,
    "spark.snowflake.sfDatabase": CATALOG_NAME,
    "spark.snowflake.sfSchema": SCHEMA_NAME,  # optional
    "spark.snowflake.sfRole": HORIZON_SESSION_ROLE,
    "spark.snowflake.sfWarehouse": SF_WAREHOUSE,
}

builder = SparkSession.builder.master("local[*]")
for conf_key, conf_value in spark_conf.items():
    builder = builder.config(conf_key, conf_value)

spark = builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Catalogs are contacted lazily, so this confirms the session, not catalog connectivity.
print(f"Spark {spark.version} session started; Horizon catalog URI: {HORIZON_CATALOG_URI}")

In [None]:
# Spark runtime version, displayed as this cell's rich output.
spark_version = spark.version
spark_version

In [None]:
# Namespaces (schemas) visible in the default catalog configured for the session.
spark.sql("SHOW NAMESPACES").show(truncate=False)

In [None]:
# Tables in the configured schema (SCHEMA_NAME from the configuration cell).
spark.sql(f"SHOW TABLES IN {SCHEMA_NAME}").show(truncate=True)

In [None]:
# Preview rows of the RDI table in the configured schema.
# NOTE(review): the configuration cell defines ICEBERG_TABLE_NAME = "USERINFORMATION" but this
# query reads RDI -- confirm the intended table.
# Output may contain personal data; clear it before sharing or committing the notebook.
spark.sql(f"SELECT * FROM {SCHEMA_NAME}.RDI").show(truncate=True)