In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

def get_iceberg_spark_session():
    """Return a SparkSession configured for the Iceberg demo.

    The session registers a Hadoop-type Iceberg catalog named
    ``iceberg_catalog`` rooted at ``/warehouse/iceberg/test`` (catalog caching
    disabled), loads the Iceberg SQL extensions, and leaves ``spark_catalog``
    as the default catalog with ``/warehouse/test`` as the SQL warehouse dir.

    Returns:
        The result of ``SparkSession.builder...getOrCreate()``.
        NOTE(review): because this ends in ``getOrCreate()``, an already
        running session is presumably reused instead of a new one being
        built with these settings -- confirm when combining with other
        session helpers in the same kernel.
    """
    iceberg_settings = {
        "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3",
        "spark.sql.catalog.iceberg_catalog": "org.apache.iceberg.spark.SparkCatalog",
        "spark.sql.catalog.iceberg_catalog.type": "hadoop",
        "spark.sql.catalog.iceberg_catalog.warehouse": "/warehouse/iceberg/test",
        "spark.sql.catalog.iceberg_catalog.cache-enabled": "false",
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        "spark.sql.defaultCatalog": "spark_catalog",
        "spark.sql.warehouse.dir": "/warehouse/test",
    }

    session_builder = SparkSession.builder.appName("Iceberg-Demo")
    for setting_name, setting_value in iceberg_settings.items():
        session_builder = session_builder.config(setting_name, setting_value)
    return session_builder.getOrCreate()

def get_hudi_spark_session():
    """Return a SparkSession configured for the Hudi demo.

    The session pulls in the Hudi Spark 3.5 bundle, replaces
    ``spark_catalog`` with ``HoodieCatalog``, loads the Hudi SQL extension,
    switches serialization to Kryo with the Hudi registrar, and uses
    ``/warehouse/hudi`` as the SQL warehouse dir.

    Returns:
        The result of ``SparkSession.builder...getOrCreate()``.
        NOTE(review): because this ends in ``getOrCreate()``, an already
        running session is presumably reused instead of a new one being
        built with these settings -- confirm when combining with other
        session helpers in the same kernel.
    """
    hudi_settings = {
        "spark.jars.packages": "org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
        "spark.kryo.registrator": "org.apache.spark.HoodieSparkKryoRegistrar",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.warehouse.dir": "/warehouse/hudi",
    }

    session_builder = SparkSession.builder.appName("Hudi-Demo")
    for setting_name, setting_value in hudi_settings.items():
        session_builder = session_builder.config(setting_name, setting_value)
    return session_builder.getOrCreate()

In [1]:
def create_sample_data(spark):
    """Build the three-row demo DataFrame, print it, and return it timestamped.

    Args:
        spark: Session used to create the DataFrame.

    Returns:
        A DataFrame with columns ``id``, ``name``, ``date`` plus a
        ``time_col`` column populated by ``current_timestamp()``.
    """
    column_names = ["id", "name", "date"]
    sample_rows = [
        (4, "John", "2024-01-01"),
        (5, "Jane", "2024-01-02"),
        (6, "Bob", "2024-01-03"),
    ]

    base_df = spark.createDataFrame(sample_rows, column_names)
    # Printed before time_col is added, so the displayed frame has three columns.
    base_df.show()

    stamped_df = base_df.withColumn("time_col", current_timestamp())
    return stamped_df

In [2]:
def write_to_iceberg(df):
    """Append ``df`` to ``iceberg_catalog.default.users_test`` and print the table.

    The target table must already exist. If it does not yet have a
    ``time_col`` column, the column is added before the append. The column
    check makes the function safe to call repeatedly: an unconditional
    ``ALTER TABLE ... ADD COLUMNS`` fails on every call after the first
    because the column is then already present.

    Args:
        df: DataFrame to append. It is expected to include a ``time_col``
            timestamp column (as produced by ``create_sample_data``).
    """
    table_name = "iceberg_catalog.default.users_test"
    spark = get_iceberg_spark_session()

    # To (re)create the table from the DataFrame instead of appending, use:
    # df.writeTo(table_name).using("iceberg").createOrReplace()

    # Only evolve the schema when needed; names are compared case-insensitively.
    existing_columns = {column.lower() for column in spark.table(table_name).columns}
    if "time_col" not in existing_columns:
        spark.sql(f"ALTER TABLE {table_name} ADD COLUMNS (time_col TIMESTAMP)")

    df.writeTo(table_name).append()

    result = spark.sql(f"SELECT * FROM {table_name}")
    result.show()

def write_to_hudi(df):
    """Upsert ``df`` into the Hudi table ``hudi_test`` at ``/warehouse/hudi/hudi_test/``.

    Records are keyed by ``id`` and ``time_col`` is the precombine field.
    The write uses save mode ``append``.

    Args:
        df: DataFrame to write; must contain the ``id`` and ``time_col``
            columns referenced by the write options.
    """
    # The returned session handle is not used below; the write goes through
    # the DataFrame's own writer.
    get_hudi_spark_session()

    hudi_write_options = {
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.table.name": "hudi_test",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.precombine.field": "time_col",
        "hoodie.table.name": "hudi_test",
    }

    hudi_writer = df.write.format("hudi")
    for option_name, option_value in hudi_write_options.items():
        hudi_writer = hudi_writer.option(option_name, option_value)

    hudi_writer.mode("append").save("/warehouse/hudi/hudi_test/")

+---+----+----------+
| id|name|      date|
+---+----+----------+
|  4|John|2024-01-01|
|  5|Jane|2024-01-02|
|  6| Bob|2024-01-03|
+---+----+----------+



In [10]:
# Example usage: build the sample rows once, then write them to both table formats.
# NOTE(review): both helpers end in getOrCreate(), so the second call presumably
# returns the session created by the first rather than a separately configured
# one -- confirm the Hudi settings are actually in effect before relying on them.
iceberg_spark = get_iceberg_spark_session()
hudi_spark = get_hudi_spark_session()

# The sample DataFrame is created on the Iceberg session and reused for both writes.
users_df = create_sample_data(iceberg_spark)
write_to_iceberg(users_df)
write_to_hudi(users_df)

DataFrame[]