# Delta Lake Lab 
## Unit 11: Using UniForm to seamlessly replicate Delta Lake commits as Iceberg commits & reading Delta Lake as Iceberg tables

This lab is powered by Dataproc Serverless Spark.


In this unit, we will:
1. Create a UniForm-enabled Delta Lake table with Iceberg as the target metadata format
2. Query the Delta Lake table
3. Query the Iceberg table
4. Update the Delta Lake table, then query the corresponding Iceberg table to check that the changes are visible there



### 1. Imports

In [1]:
import pandas as pd

from pyspark.sql.functions import month, date_format
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

from delta.tables import *

from google.cloud.exceptions import BadRequest
from google.cloud import bigquery

import sqlparse
import warnings
warnings.filterwarnings('ignore')

### 2. Create a Spark session powered by Cloud Dataproc 

In [2]:
spark = (
    SparkSession.builder
    .appName('Loan Analysis')
    .config('spark.databricks.delta.write.dataFilesToSubdir', True)
    .config('spark.databricks.delta.allowArbitraryProperties.enabled', True)
    .getOrCreate()
)
spark

23/12/03 02:03:40 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### 3. Declare variables

In [3]:
project_id_output = !gcloud config list --format "value(core.project)" 2>/dev/null
PROJECT_ID = project_id_output[0]
print("PROJECT_ID: ", PROJECT_ID)

PROJECT_ID:  delta-lake-diy-lab


In [4]:
project_name_output = !gcloud projects describe $PROJECT_ID | grep name | cut -d':' -f2 | xargs
PROJECT_NAME = project_name_output[0]
print("PROJECT_NAME: ", PROJECT_NAME)

PROJECT_NAME:  delta-lake-diy-lab


In [5]:
project_number_output = !gcloud projects describe $PROJECT_ID | grep projectNumber | cut -d':' -f2 | xargs
PROJECT_NUMBER = project_number_output[0]
print("PROJECT_NUMBER: ", PROJECT_NUMBER)

PROJECT_NUMBER:  11002190840


In [6]:
DATA_LAKE_ROOT_PATH = f"gs://dll-data-bucket-{PROJECT_NUMBER}"
DELTA_LAKE_AND_ICEBERG_DIR = f"{DATA_LAKE_ROOT_PATH}/delta-uniform-iceberg"

### 4. Create a UniForm enabled Delta Lake table with target format of Iceberg

In [7]:
# Create a Delta Lake dataset from the Parquet table
(
    spark.sql("SELECT addr_state, count(*) AS count FROM loan_db.loans_by_state_parquet GROUP BY addr_state")
    .write.mode("overwrite")
    .format("delta")
    .save(DELTA_LAKE_AND_ICEBERG_DIR)
)

ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/spark/conf/ivysettings.xml will be used
23/12/03 02:04:06 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [9]:
# Register a UniForm-enabled table in the Hive Metastore (HMS) on the Delta Lake dataset above,
# with Iceberg as the target format for metadata translation.
# Drop any previous definition, then create the external Delta Lake table.
spark.sql("DROP TABLE IF EXISTS loan_db.loans_by_state_delta_uniform;").show(truncate=False)
spark.sql(f"CREATE TABLE loan_db.loans_by_state_delta_uniform USING delta LOCATION \"{DELTA_LAKE_AND_ICEBERG_DIR}\"")

++
||
++
++



23/12/03 02:05:24 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider delta. Persisting data source table `spark_catalog`.`loan_db`.`loans_by_state_delta_uniform` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
23/12/03 02:05:24 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.


DataFrame[]

In [10]:
spark.sql("""
    ALTER TABLE loan_db.loans_by_state_delta_uniform SET TBLPROPERTIES (
        'delta.columnMapping.mode' = 'name',
        'delta.enableIcebergCompatV1' = 'true',
        'delta.universalFormat.enabledFormats' = 'iceberg'
    )
""").show(truncate=False)

You are setting a property: delta.enableIcebergCompatV1 that is not recognized by this version of Delta
You are setting a property: delta.universalFormat.enabledFormats that is not recognized by this version of Delta


Py4JJavaError: An error occurred while calling o82.sql.
: org.apache.spark.sql.delta.DeltaColumnMappingUnsupportedException: 
Your current table protocol version does not support changing column mapping modes
using delta.columnMapping.mode.

Required Delta protocol version for column mapping:
Protocol(2,5)
Your table's current Delta protocol version:
Protocol(1,2)

Please enable Column Mapping on your Delta table with mapping mode 'name'.
You can use one of the following commands.

If your table is already on the required protocol version:
ALTER TABLE table_name SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')

If your table is not on the required protocol version and requires a protocol upgrade:
ALTER TABLE table_name SET TBLPROPERTIES (
   'delta.columnMapping.mode' = 'name',
   'delta.minReaderVersion' = '2',
   'delta.minWriterVersion' = '5')

	at org.apache.spark.sql.delta.DeltaErrorsBase.changeColumnMappingModeOnOldProtocol(DeltaErrors.scala:1932)
	at org.apache.spark.sql.delta.DeltaErrorsBase.changeColumnMappingModeOnOldProtocol$(DeltaErrors.scala:1914)
	at org.apache.spark.sql.delta.DeltaErrors$.changeColumnMappingModeOnOldProtocol(DeltaErrors.scala:2794)
	at org.apache.spark.sql.delta.DeltaColumnMappingBase.verifyAndUpdateMetadataChange(DeltaColumnMapping.scala:127)
	at org.apache.spark.sql.delta.DeltaColumnMappingBase.verifyAndUpdateMetadataChange$(DeltaColumnMapping.scala:93)
	at org.apache.spark.sql.delta.DeltaColumnMapping$.verifyAndUpdateMetadataChange(DeltaColumnMapping.scala:611)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadataInternal(OptimisticTransaction.scala:415)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadataInternal$(OptimisticTransaction.scala:395)
	at org.apache.spark.sql.delta.OptimisticTransaction.updateMetadataInternal(OptimisticTransaction.scala:137)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata(OptimisticTransaction.scala:379)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata$(OptimisticTransaction.scala:372)
	at org.apache.spark.sql.delta.OptimisticTransaction.updateMetadata(OptimisticTransaction.scala:137)
	at org.apache.spark.sql.delta.commands.AlterTableSetPropertiesDeltaCommand.$anonfun$run$1(alterDeltaTableCommands.scala:126)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
	at org.apache.spark.sql.delta.commands.AlterTableSetPropertiesDeltaCommand.recordFrameProfile(alterDeltaTableCommands.scala:99)
	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
	at org.apache.spark.sql.delta.commands.AlterTableSetPropertiesDeltaCommand.recordOperation(alterDeltaTableCommands.scala:99)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
	at org.apache.spark.sql.delta.commands.AlterTableSetPropertiesDeltaCommand.recordDeltaOperation(alterDeltaTableCommands.scala:99)
	at org.apache.spark.sql.delta.commands.AlterTableSetPropertiesDeltaCommand.run(alterDeltaTableCommands.scala:106)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.$anonfun$alterTable$3(DeltaCatalog.scala:571)
	at scala.collection.immutable.HashMap.foreach(HashMap.scala:1076)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.$anonfun$alterTable$1(DeltaCatalog.scala:539)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.recordFrameProfile(DeltaCatalog.scala:57)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.alterTable(DeltaCatalog.scala:521)
	at org.apache.spark.sql.delta.catalog.DeltaCatalog.alterTable(DeltaCatalog.scala:57)
	at org.apache.spark.sql.execution.datasources.v2.AlterTableExec.run(AlterTableExec.scala:37)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:640)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:630)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:662)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
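
The error above reports that the table is on `Protocol(1,2)` while column mapping needs `Protocol(2,5)`, and it suggests setting `delta.minReaderVersion` and `delta.minWriterVersion` in the same `ALTER TABLE` statement. The following is a minimal sketch of that suggestion, not a verified fix: the helper names `uniform_properties` and `build_set_tblproperties` are hypothetical, and the protocol numbers are taken from the error message. Note also that the two "not recognized by this version of Delta" warnings suggest the Delta Lake runtime in this session may predate UniForm support, in which case the protocol upgrade alone may not be enough to produce Iceberg metadata.

```python
def uniform_properties(current_protocol, required_protocol=(2, 5)):
    """Return the table properties for enabling UniForm (Iceberg).

    current_protocol and required_protocol are (minReaderVersion, minWriterVersion)
    tuples. The default (2, 5) is the requirement quoted in the error message.
    """
    cur_reader, cur_writer = current_protocol
    req_reader, req_writer = required_protocol
    props = {}
    # Only request a protocol upgrade when the table is below the required version
    if cur_reader < req_reader or cur_writer < req_writer:
        props["delta.minReaderVersion"] = str(max(cur_reader, req_reader))
        props["delta.minWriterVersion"] = str(max(cur_writer, req_writer))
    # Same three properties as the failing cell above
    props["delta.columnMapping.mode"] = "name"
    props["delta.enableIcebergCompatV1"] = "true"
    props["delta.universalFormat.enabledFormats"] = "iceberg"
    return props


def build_set_tblproperties(table, properties):
    """Render an ALTER TABLE ... SET TBLPROPERTIES statement from a dict."""
    rendered = ",\n  ".join(f"'{key}' = '{value}'" for key, value in properties.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES (\n  {rendered}\n)"


# (1, 2) is the current protocol version reported in the error above
stmt = build_set_tblproperties(
    "loan_db.loans_by_state_delta_uniform",
    uniform_properties(current_protocol=(1, 2)),
)
print(stmt)
```

In the notebook, the generated statement would be passed to `spark.sql(stmt)`. A protocol upgrade cannot be undone in general, so it is worth trying this on a scratch copy of the table first.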


### 7. Create and query a BigLake table on the partitioned Delta Lake table manifest

#### 7.1. Create an external table in Spark on the data and explore it
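
The cells in this section use `PARTITIONED_DELTA_LAKE_DIR`, which is not declared anywhere in this notebook. A minimal sketch of a definition follows. The `delta-sample-partitioned` directory name is an assumption inferred from the manifest URI in section 7.3, and the project number is the value printed in section 3; in the notebook itself, `PROJECT_NUMBER` and `DATA_LAKE_ROOT_PATH` are already defined and need not be repeated.

```python
# Value printed in section 3; already defined in the notebook session
PROJECT_NUMBER = "11002190840"
DATA_LAKE_ROOT_PATH = f"gs://dll-data-bucket-{PROJECT_NUMBER}"

# Assumption: the partitioned Delta Lake dataset lives in the directory
# that the manifest URI in section 7.3 points into
PARTITIONED_DELTA_LAKE_DIR = f"{DATA_LAKE_ROOT_PATH}/delta-sample-partitioned"
print("PARTITIONED_DELTA_LAKE_DIR: ", PARTITIONED_DELTA_LAKE_DIR)
```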

In [None]:
# Drop any previous definition, then register an external Delta Lake table on the partitioned dataset
spark.sql("DROP TABLE IF EXISTS loan_db.loans_by_state_delta_partitioned;").show(truncate=False)
spark.sql(f"CREATE TABLE loan_db.loans_by_state_delta_partitioned USING delta LOCATION \"{PARTITIONED_DELTA_LAKE_DIR}\"").show(truncate=False)
spark.sql("SHOW TABLES IN loan_db").show(truncate=False)

#### 7.2. Explore the table data in Spark

In [None]:
spark.sql("SELECT * FROM loan_db.loans_by_state_delta_partitioned LIMIT 5").show(truncate=False)

#### 7.3. Create a BigLake table definition

In [None]:
DATASET = 'delta_dataset'
CONNECTION = 'us.biglake-connection'
TABLE_NAME = 'loans_deltalake_partitioned'
# Manifest files under the table's _symlink_format_manifest directory, one per partition
URI = f'gs://dll-data-bucket-{PROJECT_NUMBER}/delta-sample-partitioned/_symlink_format_manifest/*/manifest'
BIGLAKE_PARTITIONED_TABLE_DDL = (
    f'CREATE OR REPLACE EXTERNAL TABLE {DATASET}.{TABLE_NAME} '
    'WITH PARTITION COLUMNS(addr_state STRING) '
    f'WITH CONNECTION `{CONNECTION}` '
    'OPTIONS ('
    'format="PARQUET", '
    f'hive_partition_uri_prefix="{PARTITIONED_DELTA_LAKE_DIR}", '
    f'uris=["{URI}"],'
    "file_set_spec_type = 'NEW_LINE_DELIMITED_MANIFEST',"
    'max_staleness = INTERVAL 1 DAY,'
    "metadata_cache_mode = 'AUTOMATIC'"
    ');'
)



print(sqlparse.format(BIGLAKE_PARTITIONED_TABLE_DDL, reindent_aligned = True, keyword_case='upper'))
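
The cells in sections 7.4 and 7.5 call `fn_execute_bq_statement`, which this notebook does not define. The sketch below is one possible shape for such a helper, assuming it simply submits the SQL with the BigQuery Python client and returns the result as a pandas DataFrame; the original helper may differ (for example, it may catch `BadRequest`). The optional `client` parameter is an addition for illustration so that the function can be exercised without credentials.

```python
def fn_execute_bq_statement(sql, client=None):
    """Run a SQL statement in BigQuery and return the result as a pandas DataFrame.

    Hypothetical helper: `client` is an optional google.cloud.bigquery.Client.
    When omitted, a client is created with the default project and credentials.
    """
    if client is None:
        # Imported lazily so the function can be defined without the SDK installed
        from google.cloud import bigquery
        client = bigquery.Client()
    # client.query() submits the job; to_dataframe() waits for it and fetches the rows.
    # For DDL statements the resulting DataFrame is expected to be empty.
    query_job = client.query(sql)
    return query_job.to_dataframe()
```

With this definition in place, `fn_execute_bq_statement(BIGLAKE_PARTITIONED_TABLE_DDL)` would submit the DDL printed above. Errors from BigQuery propagate to the caller, which keeps failures visible in the notebook output.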

#### 7.4. Create the BigLake partitioned manifest based Delta Lake table

In [None]:
# You can also run this SQL in the BigQuery UI.
# This cell creates the table with the BigQuery Python SDK; the helper returns a pandas DataFrame.
PDF = fn_execute_bq_statement(BIGLAKE_PARTITIONED_TABLE_DDL)
print(PDF)

#### 7.5. Run a query against the BigLake partitioned Delta Lake table using the BigQuery Python SDK

In [None]:
BIGLAKE_SELECT_SQL = 'select addr_state,count from '  + DATASET + '.' + TABLE_NAME + ' LIMIT 5;'
print(BIGLAKE_SELECT_SQL)
PDF = fn_execute_bq_statement(BIGLAKE_SELECT_SQL)
print(PDF)

### THIS CONCLUDES THIS UNIT. PROCEED TO THE NEXT NOTEBOOK