# Spark Interoperability with Snowflake Iceberg V3 Tables

This notebook demonstrates querying Snowflake-managed Iceberg V3 tables from Apache Spark,
with access controls enforced via Snowflake Horizon catalog.

## Prerequisites
- Apache Spark 4.0+ (required for VARIANT support)
- Python 3.10+
- Completed the Snowflake setup from the main guide

## Cloud Storage Bundles

Iceberg requires cloud-specific SDK bundles to access storage. The Spark config below uses **AWS** by default.

| Cloud Provider | Bundle to Add |
|----------------|---------------|
| **AWS S3** | `org.apache.iceberg:iceberg-aws-bundle:1.10.1` |
| **Google Cloud** | `org.apache.iceberg:iceberg-gcp-bundle:1.10.1` |
| **Azure** | `org.apache.iceberg:iceberg-azure-bundle:1.10.1` |

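Update `spark.jars.packages` in both Spark session cells below if you use GCP or Azure storage. The analyst session also sets `io-impl` to `org.apache.iceberg.aws.s3.S3FileIO`, which is AWS-specific. For other clouds, change it to the matching FileIO (`org.apache.iceberg.gcp.gcs.GCSFileIO` or `org.apache.iceberg.azure.adlsv2.ADLSFileIO`).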
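You can parameterize the bundle choice instead of editing it by hand. The following is a minimal sketch. It assumes a hypothetical `build_jars_packages()` helper, which is not part of any library. The helper maps a cloud name to the bundle artifacts in the table above and returns the comma-separated string that `spark.jars.packages` expects. Versions default to the ones used in this notebook.

```python
# Hypothetical helper: build the spark.jars.packages value for a given cloud.
# Bundle artifact names mirror the cloud storage table above.
CLOUD_BUNDLES = {
    "aws": "iceberg-aws-bundle",
    "gcp": "iceberg-gcp-bundle",
    "azure": "iceberg-azure-bundle",
}


def build_jars_packages(
    cloud: str,
    scala_version: str = "2.13",
    iceberg_version: str = "1.10.1",
    jdbc_version: str = "3.24.0",
    connector_version: str = "3.1.6",
) -> str:
    """Return a comma-separated Maven coordinate list for spark.jars.packages."""
    try:
        bundle = CLOUD_BUNDLES[cloud.lower()]
    except KeyError:
        raise ValueError(
            f"Unsupported cloud {cloud!r}; expected one of {sorted(CLOUD_BUNDLES)}"
        ) from None
    packages = [
        f"org.apache.iceberg:iceberg-spark-runtime-4.0_{scala_version}:{iceberg_version}",
        f"org.apache.iceberg:{bundle}:{iceberg_version}",
        f"net.snowflake:snowflake-jdbc:{jdbc_version}",
        f"net.snowflake:spark-snowflake_{scala_version}:{connector_version}",
    ]
    return ",".join(packages)


print(build_jars_packages("gcp"))
```

Pass the returned string to `.config("spark.jars.packages", ...)` in place of the hard-coded list.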

In [None]:
import os
from dotenv import load_dotenv

# Load configuration from the same config.env used during setup
load_dotenv('config.env')

SNOWFLAKE_ACCOUNT = os.getenv('SNOWFLAKE_ACCOUNT')
SNOWFLAKE_USER = os.getenv('SNOWFLAKE_USER')
SNOWFLAKE_DATABASE = os.getenv('SNOWFLAKE_DATABASE', 'FLEET_ANALYTICS_DB')

# Account identifier used in the Iceberg REST endpoint hostname.
# Use one of these formats (see https://docs.snowflake.com/en/user-guide/admin-account-identifier):
#   - orgname-accountname (e.g., "myorg-myaccount")
#   - account_locator.region.cloud (e.g., "xy12345.us-west-2.aws")
# In the hostname, replace any underscores in the account name with hyphens.
# To find yours, run in Snowflake: SELECT CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME();
SNOWFLAKE_ACCOUNT_URL = os.getenv('SNOWFLAKE_ACCOUNT_URL', SNOWFLAKE_ACCOUNT)
SNOWFLAKE_ACCOUNTADMIN_TOKEN = os.getenv('SNOWFLAKE_ACCOUNTADMIN_TOKEN')
SNOWFLAKE_FLEET_ANALYST_TOKEN = os.getenv('SNOWFLAKE_FLEET_ANALYST_TOKEN')
SCALA_VERSION = '2.13'
ICEBERG_VERSION = '1.10.1'

print(f"Snowflake Account (REST endpoint): {SNOWFLAKE_ACCOUNT_URL}")
print(f"Database: {SNOWFLAKE_DATABASE}")
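`os.getenv()` silently returns `None` for unset variables, so a missing token otherwise surfaces later as an opaque authentication error. The following is a minimal sketch of a fail-fast check. `require_env()` is a hypothetical helper; the variable names in the usage comment are the ones this notebook reads from `config.env`.

```python
import os


def require_env(*names: str) -> dict:
    """Return {name: value} for each variable, raising if any are unset or empty."""
    values = {name: os.getenv(name) for name in names}
    missing = [name for name, value in values.items() if not value]
    if missing:
        raise RuntimeError(f"Missing required settings in config.env: {', '.join(missing)}")
    return values


# Hypothetical usage, run after load_dotenv('config.env'):
# require_env("SNOWFLAKE_ACCOUNT", "SNOWFLAKE_USER",
#             "SNOWFLAKE_ACCOUNTADMIN_TOKEN", "SNOWFLAKE_FLEET_ANALYST_TOKEN")
```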

## Create Spark Session with Horizon Catalog

In [None]:
from pyspark.sql import SparkSession

# Configuration
SNOWFLAKE_ROLE = 'ACCOUNTADMIN'
SNOWFLAKE_SCHEMA = 'RAW'
SNOWFLAKE_WAREHOUSE = os.getenv('SNOWFLAKE_WAREHOUSE', 'FLEET_ANALYTICS_WH')
SF_URL = f"{SNOWFLAKE_ACCOUNT_URL}.snowflakecomputing.com"

# Snowflake JDBC driver and Spark connector versions.
# Note: Snowflake Connector masking support requires Spark 3.5, while VARIANT support
# requires Spark 4.0; check the connector's supported Spark versions before running.
SNOWFLAKE_JDBC_VERSION = "3.24.0"
SNOWFLAKE_SPARK_CONNECTOR_VERSION = "3.1.6"

# Create Spark session with Iceberg and Snowflake catalog configuration
# Note: 
# - driver.host and bindAddress ensure Spark uses localhost (avoids VPN issues)
# - Using Spark 4.0 + Iceberg 1.10.1 for native VARIANT support
spark = SparkSession.builder \
    .appName("Fleet Analytics - Iceberg V3 Interop") \
    .master("local[*]") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.jars.packages", 
            f"org.apache.iceberg:iceberg-spark-runtime-4.0_{SCALA_VERSION}:{ICEBERG_VERSION},"
            f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION},"
            f"net.snowflake:snowflake-jdbc:{SNOWFLAKE_JDBC_VERSION},"
            f"net.snowflake:spark-snowflake_{SCALA_VERSION}:{SNOWFLAKE_SPARK_CONNECTOR_VERSION}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.horizon", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.horizon.type", "rest") \
    .config("spark.sql.catalog.horizon.uri", f"https://{SNOWFLAKE_ACCOUNT_URL}.snowflakecomputing.com/polaris/api/catalog") \
    .config("spark.sql.catalog.horizon.credential", SNOWFLAKE_ACCOUNTADMIN_TOKEN) \
    .config("spark.sql.catalog.horizon.warehouse", SNOWFLAKE_DATABASE) \
    .config("spark.sql.catalog.horizon.scope", f"session:role:{SNOWFLAKE_ROLE}") \
    .config("spark.sql.catalog.horizon.header.X-Iceberg-Access-Delegation", "vended-credentials") \
    .config("spark.snowflake.sfURL", SF_URL) \
    .config("spark.snowflake.sfUser", SNOWFLAKE_USER) \
    .config("spark.snowflake.sfPassword", SNOWFLAKE_ACCOUNTADMIN_TOKEN) \
    .config("spark.snowflake.sfDatabase", SNOWFLAKE_DATABASE) \
    .config("spark.snowflake.sfSchema", SNOWFLAKE_SCHEMA) \
    .config("spark.snowflake.sfRole", SNOWFLAKE_ROLE) \
    .config("spark.snowflake.sfWarehouse", SNOWFLAKE_WAREHOUSE) \
    .config("spark.sql.iceberg.vectorization.enabled", "false") \
    .getOrCreate()

print("Spark session created successfully!")
print(f"Spark version: {spark.version}")
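The analyst session later in this notebook repeats the catalog settings above almost verbatim. To keep the two sessions from drifting apart, you can generate the settings from a function. `horizon_catalog_conf()` below is a hypothetical helper. It returns the same `spark.sql.catalog.horizon.*` keys used in the cell above as a plain dict, which you can then apply one pair at a time with `SparkSession.builder.config(key, value)`.

```python
def horizon_catalog_conf(
    account_url: str,
    database: str,
    role: str,
    token: str,
    catalog: str = "horizon",
) -> dict:
    """Iceberg REST catalog settings for Snowflake Horizon, keyed by Spark config name."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": f"https://{account_url}.snowflakecomputing.com/polaris/api/catalog",
        f"{prefix}.credential": token,
        f"{prefix}.warehouse": database,
        f"{prefix}.scope": f"session:role:{role}",
        f"{prefix}.header.X-Iceberg-Access-Delegation": "vended-credentials",
    }


conf = horizon_catalog_conf("myorg-myaccount", "FLEET_ANALYTICS_DB", "ACCOUNTADMIN", "<token>")
for key in conf:
    # Print only the keys; the values include the access token.
    print(key)

# Applying it (requires pyspark):
# builder = SparkSession.builder.appName("Fleet Analytics - Iceberg V3 Interop")
# for key, value in conf.items():
#     builder = builder.config(key, value)
```

For the analyst session, override the top-level key with `org.apache.spark.sql.snowflake.catalog.SnowflakeFallbackCatalog` and add the `io-impl` setting used there.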

## List Available Tables (including Dynamic Tables)

In [None]:
# Show all tables visible to Spark
spark.sql("SHOW TABLES IN horizon.RAW").show(truncate=False)
spark.sql("SHOW TABLES IN horizon.CURATED").show(truncate=False)
spark.sql("SHOW TABLES IN horizon.ANALYTICS").show(truncate=False)

In [None]:
# Inspect the schema: TELEMETRY_DATA is a VARIANT column in this Iceberg V3 table
spark.sql("DESCRIBE TABLE horizon.RAW.VEHICLE_TELEMETRY_STREAM").show(truncate=False)

## Query Data and Extract VARIANT Fields

In [None]:
# Use Spark's variant_get() function to extract nested fields from the VARIANT column.
# This table has no masking policy, so Spark reads the Iceberg data directly and no Snowflake compute is used.
df = spark.sql("""
    SELECT 
        VEHICLE_ID,
        EVENT_TIMESTAMP,
        variant_get(TELEMETRY_DATA, '$.speed_mph', 'float') AS speed_mph,
        variant_get(TELEMETRY_DATA, '$.engine.temperature_f', 'int') AS engine_temp,
        variant_get(TELEMETRY_DATA, '$.location.lat', 'float') AS latitude,
        variant_get(TELEMETRY_DATA, '$.location.lon', 'float') AS longitude
    FROM horizon.RAW.VEHICLE_TELEMETRY_STREAM
    WHERE variant_get(TELEMETRY_DATA, '$.speed_mph', 'float') > 60
    LIMIT 10
""")
df.show()
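When you pull many fields out of the same VARIANT column, you can generate the `variant_get()` expressions from a mapping instead of writing each one by hand. The following is a minimal sketch. `variant_select()` and `TELEMETRY_FIELDS` are hypothetical names. The helper emits the same `variant_get(column, path, type) AS alias` expressions used in the query above.

```python
def variant_select(column: str, fields: dict) -> str:
    """Build a comma-separated list of variant_get() projections.

    `fields` maps an output alias to a (json_path, spark_type) pair.
    Paths and types are interpolated as-is, so they must be trusted constants.
    """
    exprs = [
        f"variant_get({column}, '{path}', '{spark_type}') AS {alias}"
        for alias, (path, spark_type) in fields.items()
    ]
    return ",\n        ".join(exprs)


TELEMETRY_FIELDS = {
    "speed_mph": ("$.speed_mph", "float"),
    "engine_temp": ("$.engine.temperature_f", "int"),
    "latitude": ("$.location.lat", "float"),
    "longitude": ("$.location.lon", "float"),
}

query = f"""
    SELECT
        VEHICLE_ID,
        EVENT_TIMESTAMP,
        {variant_select('TELEMETRY_DATA', TELEMETRY_FIELDS)}
    FROM horizon.RAW.VEHICLE_TELEMETRY_STREAM
    LIMIT 10
"""
print(query)
# spark.sql(query).show()
```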

## Demonstrate Access Control (Masking Enforcement)

In [None]:
# To enforce masking policies from Spark, we need the Snowflake Connector for Spark
# which routes queries through Snowflake for policy evaluation
# See: https://docs.snowflake.com/en/user-guide/tables-iceberg-query-using-external-query-engine-snowflake-horizon-enforce-access-policies

# Stop the previous Spark session first
spark.stop()

# Analyst configuration: reuses the schema, warehouse, URL, and package versions defined above,
# but authenticates as the FLEET_ANALYST role
SNOWFLAKE_ROLE = 'FLEET_ANALYST'

spark_analyst = SparkSession.builder \
    .appName("Fleet Analytics - Analyst View") \
    .master("local[*]") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.jars.packages", 
            f"org.apache.iceberg:iceberg-spark-runtime-4.0_{SCALA_VERSION}:{ICEBERG_VERSION},"
            f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION},"
            f"net.snowflake:snowflake-jdbc:{SNOWFLAKE_JDBC_VERSION},"
            f"net.snowflake:spark-snowflake_{SCALA_VERSION}:{SNOWFLAKE_SPARK_CONNECTOR_VERSION}") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.defaultCatalog", "horizon") \
    .config("spark.sql.catalog.horizon", "org.apache.spark.sql.snowflake.catalog.SnowflakeFallbackCatalog") \
    .config("spark.sql.catalog.horizon.type", "rest") \
    .config("spark.sql.catalog.horizon.uri", f"https://{SNOWFLAKE_ACCOUNT_URL}.snowflakecomputing.com/polaris/api/catalog") \
    .config("spark.sql.catalog.horizon.warehouse", SNOWFLAKE_DATABASE) \
    .config("spark.sql.catalog.horizon.scope", f"session:role:{SNOWFLAKE_ROLE}") \
    .config("spark.sql.catalog.horizon.credential", SNOWFLAKE_FLEET_ANALYST_TOKEN) \
    .config("spark.sql.catalog.horizon.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.horizon.header.X-Iceberg-Access-Delegation", "vended-credentials") \
    .config("spark.snowflake.sfURL", SF_URL) \
    .config("spark.snowflake.sfUser", SNOWFLAKE_USER) \
    .config("spark.snowflake.sfPassword", SNOWFLAKE_FLEET_ANALYST_TOKEN) \
    .config("spark.snowflake.sfDatabase", SNOWFLAKE_DATABASE) \
    .config("spark.snowflake.sfSchema", SNOWFLAKE_SCHEMA) \
    .config("spark.snowflake.sfRole", SNOWFLAKE_ROLE) \
    .config("spark.snowflake.sfWarehouse", SNOWFLAKE_WAREHOUSE) \
    .config("spark.sql.iceberg.vectorization.enabled", "false") \
    .getOrCreate()

spark_analyst.sparkContext.setLogLevel("ERROR")
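The routing described at the top of this cell amounts to a per-table decision. Spark reads tables without masking policies directly from storage through the Iceberg REST catalog, while queries on tables with masking policies go through Snowflake for policy evaluation. The following is a toy model of that decision, for illustration only. `TableInfo` and `choose_read_path()` are hypothetical; they are not the Snowflake Connector's API or its implementation.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class TableInfo:
    """Hypothetical description of a table as seen by the client."""
    name: str
    masking_policies: List[str] = field(default_factory=list)


def choose_read_path(table: TableInfo) -> str:
    """Toy model: decide how a SELECT against `table` is served."""
    if table.masking_policies:
        # Policies must be evaluated by Snowflake, so the query runs on a Snowflake warehouse.
        return "snowflake"
    # No policies: Spark reads the Iceberg files directly with vended credentials.
    return "iceberg-direct"


for t in [
    TableInfo("RAW.VEHICLE_TELEMETRY_STREAM"),
    TableInfo("RAW.VEHICLE_REGISTRY", ["<masking policy>"]),
]:
    print(t.name, "->", choose_read_path(t))
```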

In [None]:
# Tables are still listed through the Iceberg REST catalog
spark_analyst.sql("SHOW TABLES IN horizon.RAW").show(truncate=False)

In [None]:
# VEHICLE_REGISTRY has a masking policy, so this query is routed through Snowflake, which returns masked results for the FLEET_ANALYST role
spark_analyst.sql("""
    SELECT
        VEHICLE_ID,
        MAKE,
        MODEL,
        YEAR,
        LICENSE_PLATE,
        DRIVER_NAME,
        DRIVER_EMAIL,
        DRIVER_PHONE,
        FLEET_REGION
    FROM horizon.RAW.VEHICLE_REGISTRY
""").show(truncate=True)

## Summary

This notebook demonstrated:
1. **Connecting Spark to the Snowflake Horizon Iceberg catalog** through the Iceberg REST API
2. **Listing and describing tables**, including Dynamic Iceberg Tables
3. **Querying VARIANT data** by extracting nested fields with `variant_get()`
4. **Access control enforcement**: Snowflake evaluates masking policies and returns the masked results to Spark