In [None]:
import org.apache.spark.sql._
import scala.sys.process._

In [None]:
// So now we need to configure Spark to use Iceberg
// See https://iceberg.apache.org/docs/1.6.0/spark-configuration/ & https://iceberg.apache.org/docs/1.6.0/spark-getting-started/
// We'll use the "hadoop" (aka file) catalog & /high-performance-spark-examples/warehouse for the location
val spark = (SparkSession.builder.master("local[*]")
             // Setup the extensions
             .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
             .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
             .config("spark.sql.catalog.local.type", "hadoop")
             .config("spark.sql.catalog.local.warehouse", "/high-performance-spark-examples/warehouse")
             .getOrCreate()
             )
import spark._

In [None]:
spark.sparkContext.uiWebUrl.get

In [None]:
// Load the current data
val df = spark.read.option("header", "true").option("inferSchema", "true").csv("/high-performance-spark-examples/data/fetched/2021")

In [None]:
// Drop existing table if present & create new table
spark.sql("DROP TABLE IF EXISTS local.uk_gender_pay_data")

In [None]:
// Write the data out
df.write.saveAsTable("local.uk_gender_pay_data")

In [None]:
"ls /high-performance-spark-examples/warehouse/uk_gender_pay_data/metadata/".!!

In [None]:
"cat  /high-performance-spark-examples/warehouse/uk_gender_pay_data/metadata/v1.metadata.json".!!

In [None]:
// Java SDK time imports
import java.util.HashMap
import java.util.Map

import org.apache.iceberg.Table
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog


// And to handle java types
import scala.jdk.CollectionConverters._

In [None]:
// Create a local Iceberg Catalog client. Here we're using the "hadoop catalog"
// The spark hadoop conf can be got from: spark.sparkContext.hadoopConfiguration
// Here we make the Catalog, it's kind of funky. Spark also has methods which return tables but they're Spark tables so
// which aren't the type we want
val catalog = new HadoopCatalog(spark.sparkContext.hadoopConfiguration, "/high-performance-spark-examples/warehouse")

In [None]:
// Now we want to load the table. To do that we need to make a TableIdentifier of the same table we wrote to. Note it'll just be
// the table name no need for the "local" prefix.
// See https://iceberg.apache.org/javadoc/1.6.0/org/apache/iceberg/catalog/TableIdentifier.html
val name = TableIdentifier.of("uk_gender_pay_data")

In [None]:
val table = catalog.loadTable(name)

In [None]:
// Now we want to get the snapshots from the table. There are a few different ways we can do this:
// 1) Using the Iceberg Table API (see https://iceberg.apache.org/javadoc/1.6.0/org/apache/iceberg/Table.html)
// 2) Using the Iceberg + Spark SQL special query interface https://iceberg.apache.org/javadoc/1.6.0/org/apache/iceberg/Table.html
val snapshots = table.snapshots().asScala.toList
snapshots

In [None]:
val snapshot = snapshots(0).snapshotId()

In [None]:
val altSnapshotQuery = spark.sql("SELECT * FROM local.uk_gender_pay_data.snapshots")
altSnapshotQuery.show()

In [None]:
val altSnapshotId = spark.sql("SELECT snapshot_id FROM local.uk_gender_pay_data.snapshots").collect()(0)

In [None]:
spark.sql("SELECT * FROM local.uk_gender_pay_data WHERE isnull(responsibleperson) LIMIT 5").show()

In [None]:
// We can also list snapshots with the select
spark.sql("SELECT * FROM local.uk_gender_pay_data.snapshots").show()

In [None]:
// And the files!
// We can also list snapshots with the select
spark.sql("SELECT * FROM local.uk_gender_pay_data.files").show()

In [None]:
// Lets take a quick look and see
spark.sql("SELECT * FROM local.uk_gender_pay_data WHERE isnull(responsibleperson) LIMIT 5").show()

In [None]:
spark.sql("DELETE FROM local.uk_gender_pay_data WHERE isnull(responsibleperson)")

In [None]:
// Make sure the data is gone
spark.sql("SELECT * FROM local.uk_gender_pay_data WHERE isnull(responsibleperson) LIMIT 5").show()

In [None]:
// Yay! ok now lets travel back in time
// We can do this with SQL or with a read option
spark.sql(f"SELECT * FROM local.uk_gender_pay_data VERSION AS OF ${snapshot} WHERE isnull(responsibleperson) LIMIT 5").show()

In [None]:
// Or the same with option + DF syntax
spark.read.option("snapshot-id", f"${snapshot}").table("local.uk_gender_pay_data").show()

In [None]:
spark.sql(f"SELECT * FROM local.uk_gender_pay_data.files").show()

In [None]:
spark.sql("DROP TABLE IF EXISTS local.uk_gender_pay_data_postcode")

In [None]:
// Write the data out partitioned
df.registerTempTable("temp_table")
// We could use the table write semantics but we can't do truncate() on that
spark.sql("CREATE TABLE local.uk_gender_pay_data_postcode USING iceberg PARTITIONED BY (truncate(1, PostCode)) AS select * from temp_table")

In [None]:
// Inspect the files again. This should look familiar ish
spark.sql("SELECT * FROM local.uk_gender_pay_data_postcode.files").show()

In [None]:
val year_dfs = 2022.to(2023).map(r => spark.read.option("header", "true").option("inferSchema", "true").csv(s"/high-performance-spark-examples/data/fetched/${r}"))

In [None]:
List("local.uk_gender_pay_data", "local.uk_gender_pay_data_postcode").foreach(table => year_dfs.foreach(df => df.write.mode("append").saveAsTable(table)))

In [None]:
val uncompacted_file_list = "ls -alh ../warehouse/uk_gender_pay_data/data/".!!

In [None]:
val uncompacted_metadata_file_list = "ls -alh ../warehouse/uk_gender_pay_data/metadata/".!!

In [None]:
spark.sql("SELECT * FROM local.uk_gender_pay_data.files").show()

In [None]:
spark.sql("SELECT * FROM local.uk_gender_pay_data.snapshots").show()

In [None]:
import org.apache.iceberg.spark.actions.SparkActions
// Iceberg actions
import org.apache.iceberg.actions.Action

In [None]:
// So far the logging has been... verbose but interesting, but the next stages it's actually too much
spark.sparkContext.setLogLevel("ERROR")

In [None]:
// Ok let's try and compact things down a little bit.
// You should look at SparkActions & use the rewrite data files operation.
// Consider specifying rewrite-all to true to force rewrites
// https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/spark/actions/SparkActions.html
SparkActions.get().rewriteDataFiles(table).option("target-file-size-bytes", (512L*1024L*1024L).toString).option("rewrite-all", "true").execute()

In [None]:
val compacted_file_list = "ls -alh ../warehouse/uk_gender_pay_data/data/".!!

In [None]:
// Remove the old snapshots but keep the latest one.
// This produces _so much logging_ by default that running in the NB would be slow (that's why we set the log level to error)
// Here your going to want to use the expireSnapshots action.
// Note: if you _just set_ retainLast it will keep all snapshots, retain last is like a safety mechanism that keeps the last K
// snapshots. To get rid of everything except the last expire everything older than right now.
SparkActions.get().expireSnapshots(table).expireOlderThan(System.currentTimeMillis()).retainLast(1).execute()

In [None]:
val compacted_and_expired_file_list = "ls -alh ../warehouse/uk_gender_pay_data/data/".!!

In [None]:
// Table is in an inconsistent state here, this is not "good"
spark.sql("REFRESH local.uk_gender_pay_data").show()
spark.sql("SELECT * FROM local.uk_gender_pay_data.files").show()

In [None]:
spark.sql("SELECT * FROM local.uk_gender_pay_data").show()

In [None]:
// Remove the orphaned files
SparkActions.get().deleteOrphanFiles(table).execute()

In [None]:
val cleaned_and_compacted_file_list = "ls ../warehouse/uk_gender_pay_data/data/".!!

In [None]:
spark.sql("SELECT * FROM local.uk_gender_pay_data.files").show()

In [None]:
// Lets go take a look at a quick side-by-side test
//cd /high-performance-spark-examples/spark-upgrade/;./e2e_demo/scala/run_demo.sh
//That'll be easier to run in a terminal than the .!! trick we've been doing

In [None]:
// Ok, let's try branching! Note: requires very recent Iceberg, so if you're doing this elsewhere might not be a party
// Relevant docs: https://iceberg.apache.org/docs/nightly/spark-ddl/#branching-and-tagging-ddl
// https://iceberg.apache.org/docs/nightly/spark-queries/#sql
spark.sql("ALTER TABLE local.uk_gender_pay_data CREATE BRANCH IF NOT EXISTS `new-software-branch`")
spark.sql("DELETE FROM local.uk_gender_pay_data.`branch_new-software-branch` WHERE isnull(DueDate)")