# Gravitino Federation Test
This notebook uses a single Spark session connected to Gravitino to query an Iceberg table and a Hudi table, and then joins them in one query.

## 1. Environment Setup

In [None]:
import os
import subprocess

# Java version requested from the macOS `java_home` helper.
# Iceberg/Gravitino need Java 11 or newer; this notebook asks for 19.
JAVA_VERSION = '19'

# Resolve JAVA_HOME. `/usr/libexec/java_home` only exists on macOS, and
# check_output raises CalledProcessError on a non-zero exit, so fall back to a
# JAVA_HOME that is already exported instead of dying with an opaque traceback.
try:
    java_home = subprocess.check_output(
        ['/usr/libexec/java_home', '-v', JAVA_VERSION],
        text=True
    ).strip()
except (OSError, subprocess.CalledProcessError) as exc:
    java_home = os.environ.get('JAVA_HOME', '').strip()
    if not java_home:
        raise RuntimeError(
            f"Could not locate a Java {JAVA_VERSION} installation via "
            "/usr/libexec/java_home and JAVA_HOME is not set. "
            "Install a suitable JDK or export JAVA_HOME before running this notebook."
        ) from exc
os.environ['JAVA_HOME'] = java_home

# Set Spark 3.5 (local, unpacked distribution) and the Python interpreter
# used by both the driver and the workers.
os.environ['SPARK_HOME'] = os.path.expanduser('~/Documents/spark-3.5.3-bin-hadoop3')
os.environ['PYSPARK_PYTHON'] = 'python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3'

# Surface a wrong Spark location here rather than as an import error later.
if not os.path.isdir(os.environ['SPARK_HOME']):
    print(f"WARNING: SPARK_HOME does not exist: {os.environ['SPARK_HOME']}")

print(f"JAVA_HOME: {os.environ['JAVA_HOME']}")
print(f"SPARK_HOME: {os.environ['SPARK_HOME']}")

## 2. Create Spark Session with Gravitino

In [None]:
import glob
import os
import subprocess
import sys

# The environment MUST be configured BEFORE pyspark is imported.
# `/usr/libexec/java_home` only exists on macOS and check_output raises on a
# non-zero exit, so fall back to an already-exported JAVA_HOME when it fails.
try:
    java_home = subprocess.check_output(['/usr/libexec/java_home', '-v', '19'], text=True).strip()
except (OSError, subprocess.CalledProcessError) as exc:
    java_home = os.environ.get('JAVA_HOME', '').strip()
    if not java_home:
        raise RuntimeError(
            "Could not locate a Java 19 installation via /usr/libexec/java_home "
            "and JAVA_HOME is not set. Install a suitable JDK or export JAVA_HOME."
        ) from exc
os.environ['JAVA_HOME'] = java_home
os.environ['SPARK_HOME'] = os.path.expanduser('~/Documents/spark-3.5.3-bin-hadoop3')
os.environ['PYSPARK_PYTHON'] = 'python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'python3'

# Put the pyspark bundled with SPARK_HOME ahead of any pip-installed pyspark.
spark_python = os.path.join(os.environ['SPARK_HOME'], 'python')

# Locate the bundled py4j archive by pattern so a different py4j version in the
# distribution still works; keep the previously hard-coded name as a fallback.
py4j_candidates = sorted(glob.glob(os.path.join(spark_python, 'lib', 'py4j-*-src.zip')))
if py4j_candidates:
    spark_py4j = py4j_candidates[-1]
else:
    spark_py4j = os.path.join(spark_python, 'lib', 'py4j-0.10.9.7-src.zip')

# Insert at the beginning to override pip pyspark. Existing copies are removed
# first so re-running this cell does not keep growing sys.path. The final order
# is py4j archive first, then the Spark python directory.
for path_entry in (spark_python, spark_py4j):
    while path_entry in sys.path:
        sys.path.remove(path_entry)
    sys.path.insert(0, path_entry)

# Only now is it safe to import pyspark.
from pyspark.sql import SparkSession

# Maven coordinates fetched at session start-up (comma-separated).
spark_packages = (
    "org.apache.gravitino:gravitino-spark-connector-runtime-3.5_2.12:1.1.0,"
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,"
    "org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0"
)

# SQL extensions for both table formats (comma-separated).
spark_extensions = (
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
    "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
)

# NOTE: getOrCreate() returns an already-running session if one exists, so
# restart the kernel after changing any of the settings below.
spark = (
    SparkSession.builder
    .appName("Gravitino Federation Test")
    .config("spark.plugins", "org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin")
    .config("spark.sql.gravitino.uri", "http://localhost:8090")
    .config("spark.sql.gravitino.metalake", "test_metalake")
    .config("spark.sql.gravitino.enableIcebergSupport", "true")
    .config("spark.jars.packages", spark_packages)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.sql.extensions", spark_extensions)
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")
print(f"Spark version: {spark.version}")
print(f"Spark home: {os.environ['SPARK_HOME']}")

## 3. List Available Catalogs

In [None]:
# Note: hive_catalog and iceberg_catalog may take a few moments to show up in
# this listing, but they can be queried as soon as the Spark session exists.
catalogs_df = spark.sql("SHOW CATALOGS")
catalogs_df.show()

## 4. Query Iceberg Table (from Tabular REST Catalog)

In [None]:
# Enumerate the schemas exposed by iceberg_catalog
iceberg_schemas_df = spark.sql("SHOW SCHEMAS IN iceberg_catalog")
iceberg_schemas_df.show()

In [None]:
# Enumerate the tables inside iceberg_catalog.test_db
iceberg_tables_df = spark.sql("SHOW TABLES IN iceberg_catalog.test_db")
iceberg_tables_df.show()

In [None]:
# Read every row of the Iceberg sales table, ordered by transaction id
iceberg_query = """
    SELECT * FROM iceberg_catalog.test_db.sales_iceberg
    ORDER BY transaction_id
"""
iceberg_df = spark.sql(iceberg_query)

print("Iceberg table (sales_iceberg):")
iceberg_df.show()
# count() triggers a second evaluation of the query
iceberg_row_count = iceberg_df.count()
print(f"Row count: {iceberg_row_count}")

## 5. Query Hudi Table (from Hive Metastore)

In [None]:
# Enumerate the schemas exposed by hive_catalog
# (Hudi tables are reached through Gravitino's hive provider)
hive_schemas_df = spark.sql("SHOW SCHEMAS IN hive_catalog")
hive_schemas_df.show()

In [None]:
# Enumerate the tables inside hive_catalog.test_db
hive_tables_df = spark.sql("SHOW TABLES IN hive_catalog.test_db")
hive_tables_df.show()

In [None]:
# Read the Hudi customer table through Gravitino's hive_catalog,
# ordered by transaction id
hudi_query = """
    SELECT transaction_id, customer_tier, discount
    FROM hive_catalog.test_db.customer_info_hudi
    ORDER BY transaction_id
"""
hudi_df = spark.sql(hudi_query)

print("Hudi table (customer_info_hudi):")
hudi_df.show()
# count() triggers a second evaluation of the query
hudi_row_count = hudi_df.count()
print(f"Row count: {hudi_row_count}")

## 6. Cross-Format Join (Iceberg + Hudi)

In [None]:
# Inner-join the Iceberg sales table with the Hudi customer table on
# transaction_id, all through Gravitino, and derive the discounted amount
join_query = """
    SELECT
        i.transaction_id,
        i.customer_name,
        i.amount,
        h.customer_tier,
        h.discount,
        (i.amount - h.discount) as final_amount
    FROM iceberg_catalog.test_db.sales_iceberg i
    INNER JOIN hive_catalog.test_db.customer_info_hudi h
    ON i.transaction_id = h.transaction_id
    ORDER BY i.transaction_id
"""
joined_df = spark.sql(join_query)

print("Cross-format join result:")
joined_df.show()
# count() triggers a second evaluation of the join
joined_row_count = joined_df.count()
print(f"Joined row count: {joined_row_count}")

## 7. Aggregation on Federated Data

In [None]:
# Per customer tier: number of transactions, summed amount and mean discount,
# computed over the Iceberg/Hudi join and sorted by summed amount (largest first)
tier_aggregation_query = """
    SELECT
        h.customer_tier,
        COUNT(*) as transaction_count,
        SUM(i.amount) as total_amount,
        AVG(h.discount) as avg_discount
    FROM iceberg_catalog.test_db.sales_iceberg i
    INNER JOIN hive_catalog.test_db.customer_info_hudi h
    ON i.transaction_id = h.transaction_id
    GROUP BY h.customer_tier
    ORDER BY total_amount DESC
"""
agg_df = spark.sql(tier_aggregation_query)

print("Aggregation by customer tier:")
agg_df.show()

## 8. Summary

In [None]:
# Recap of what the notebook exercised, printed one item per line
summary_lines = [
    "=== Test Complete ===",
    "- Single Gravitino connection configured",
    "- Queried Iceberg table from Tabular REST catalog",
    "- Queried Hudi table from Hive Metastore",
    "- Performed cross-format join successfully",
    "- Executed aggregations on federated data",
]
for summary_line in summary_lines:
    print(summary_line)

## 9. Cleanup (Optional)

In [None]:
# Stop the Spark session when done.
# The call is left commented out; uncomment the next line to stop the session.
# spark.stop()