In [None]:
from pyspark.sql import SparkSession

# --conf spark.sql.catalog.spark_catalog.uri=thrift://127.0.0.1:9083

# Build (or reuse) a local Spark session with two Iceberg-backed catalogs:
#   - `spark_catalog`: Iceberg's SparkSessionCatalog, hadoop type, rooted at ./hive-warehouse
#   - `iceberg`:       an Iceberg SparkCatalog, hadoop type, rooted at ./iceberg-warehouse
# The Iceberg SQL extensions are registered as well; the later cells rely on
# `CALL iceberg.system.*` procedures.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("iceberg_streaming")
    # Session catalog.
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", "./hive-warehouse")
    # Dedicated `iceberg` catalog used by every query in this notebook.
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.type", "hadoop")
    .config("spark.sql.catalog.iceberg.warehouse", "./iceberg-warehouse")
    # Iceberg SQL extensions.
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)

In [None]:
# Make sure the `db` namespace exists before listing it: on a fresh warehouse
# the Hadoop catalog has no `db` directory yet and `SHOW TABLES IN iceberg.db`
# would fail, which breaks Restart & Run All. The statement is a no-op when
# the namespace is already there.
spark.sql("create namespace if not exists iceberg.db")

# List the tables currently registered under iceberg.db (untruncated output).
spark.sql("show tables in iceberg.db").show(truncate=False)

In [None]:
# Load the raw JSON activity data and expose it to Spark SQL as a temp view.
path = "./data/activity-data/"
static = spark.read.json(path)
static.createOrReplaceTempView("activityTempView")
print(static.schema)

# Seed the Iceberg table with a small sample (10 rows), partitioned by device.
# `if not exists` keeps this cell idempotent: re-running the notebook against
# an existing warehouse no longer fails with "table already exists".
# (Plain string: the statement has no placeholders, so no f-string is needed.)
sql = """
    create table if not exists iceberg.db.activity
    using iceberg
    partitioned by (device)
    as select * from activityTempView limit 10
    """
spark.sql(sql)

In [None]:
# Inspect the table's `files` metadata table, rendered as a pandas DataFrame.
(
    spark
    .sql("select * from iceberg.db.activity.files;")
    .toPandas()
)

In [None]:
# Inspect the table's `snapshots` metadata table, rendered as a pandas DataFrame.
(
    spark
    .sql("select * from iceberg.db.activity.snapshots;")
    .toPandas()
)

In [None]:
# Compaction: run Iceberg's rewrite_data_files procedure on the activity table
# and print the procedure's result.
(
    spark.sql(
        "CALL iceberg.system.rewrite_data_files("
        "table => 'iceberg.db.activity')"
    )
    .show()
)

In [None]:
# Expire snapshots older than the given timestamp, always keeping the most
# recent one (retain_last=>1). The procedure's result is shown as a pandas
# DataFrame.
(
    spark.sql(
        "CALL iceberg.system.expire_snapshots("
        "table => 'iceberg.db.activity', "
        "older_than => timestamp '2023-04-14 21:17:55', "
        "retain_last=>1)"
    )
    .toPandas()
)

In [None]:
# Orphan files include both data and metadata files.
# This call passes dry_run=>true; the variant without dry_run is kept below,
# commented out, for when the cleanup should actually be performed.
(
    spark.sql(
        "CALL iceberg.system.remove_orphan_files("
        "table => 'iceberg.db.activity', "
        "older_than => timestamp '2023-04-14 21:17:55', "
        "dry_run=>true)"
    )
    .toPandas()
)
# spark.sql("CALL iceberg.system.remove_orphan_files(table => 'iceberg.db.activity', older_than => timestamp '2023-04-14 21:17:55')").toPandas()