
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>




# Delta Lake Lab
##### Tasks
1. Write sales data to Delta
1. Modify sales data to show item count instead of item array
1. Rewrite sales data to same Delta path
1. Create table and view version history
1. Time travel to read previous version

In [0]:
%run ../Includes/Classroom-Setup

Resetting the learning environment:
| removing the working directory "dbfs:/mnt/dbacademy-users/labuser6023680@vocareum.com/apache-spark-programming-with-databricks"...(0 seconds)

Skipping install of existing datasets to "dbfs:/mnt/dbacademy-datasets/apache-spark-programming-with-databricks/v03"

Validating the locally installed datasets:
| listing local files...(4 seconds)
| validation completed...(4 seconds total)

Creating & using the schema "labuser6023680_cng7_da_asp" in the catalog "hive_metastore"...(6 seconds)

Predefined tables in "labuser6023680_cng7_da_asp":
| -none-

Predefined paths variables:
| DA.paths.working_dir: dbfs:/mnt/dbacademy-users/labuser6023680@vocareum.com/apache-spark-programming-with-databricks
| DA.paths.user_db:     dbfs:/mnt/dbacademy-users/labuser6023680@vocareum.com/apache-spark-programming-with-databricks/database.db
| DA.paths.datasets:    dbfs:/mnt/dbacademy-datasets/apache-spark-programming-with-databricks/v03
| DA.paths.checkpoints: dbfs:/mnt/dba

In [0]:
sales_df = spark.read.parquet(f"{DA.paths.datasets}/ecommerce/sales/sales.parquet")
delta_sales_path = f"{DA.paths.working_dir}/delta-sales"




### 1. Write sales data to Delta
Write **`sales_df`** to **`delta_sales_path`**

In [0]:
# Save sales_df in Delta format, replacing anything already at the path
sales_df.write.format("delta").mode("overwrite").save(delta_sales_path)




**1.1: CHECK YOUR WORK**

In [0]:
assert len(dbutils.fs.ls(delta_sales_path)) > 0
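
The check above only confirms that the directory is not empty. A slightly stricter check, sketched below, looks for the `_delta_log` transaction log directory that Delta writes alongside the data files. The helper name `has_delta_log` and the `FakeFileInfo` stand-in are assumptions made for illustration; in the notebook you would pass the result of `dbutils.fs.ls(delta_sales_path)` instead.

```python
from collections import namedtuple


def has_delta_log(entries):
    """Return True if any listed entry is the `_delta_log` directory."""
    # Directory names in a listing may carry a trailing slash, so strip it.
    return any(entry.name.rstrip("/") == "_delta_log" for entry in entries)


# Hypothetical stand-in for the objects returned by dbutils.fs.ls;
# only the `name` attribute is used by the helper.
FakeFileInfo = namedtuple("FakeFileInfo", ["path", "name"])
listing = [
    FakeFileInfo("dbfs:/tmp/delta-sales/_delta_log/", "_delta_log/"),
    FakeFileInfo("dbfs:/tmp/delta-sales/part-00000.snappy.parquet",
                 "part-00000.snappy.parquet"),
]
print(has_delta_log(listing))
```

In the lab this would read `assert has_delta_log(dbutils.fs.ls(delta_sales_path))`.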



### 2. Modify sales data to show item count instead of item array
Replace each value in the **`items`** column with the integer size of its items array.
Assign the resulting DataFrame to **`updated_sales_df`**.

In [0]:
# size() returns the number of elements in the items array

from pyspark.sql.functions import size, col

updated_sales_df = sales_df.withColumn("items", size(col("items")))
display(updated_sales_df)

order_id,email,transaction_timestamp,total_item_quantity,purchase_revenue_in_usd,unique_items,items
257437,kmunoz@powell-duran.com,1592194221828900,1,1995.0,1,1
282611,bmurillo@hotmail.com,1592504237604072,1,940.5,1,1
257448,bradley74@gmail.com,1592200438030141,1,945.0,1,1
257440,jameshardin@campbell-morris.biz,1592197217716495,1,1045.0,1,1
283949,whardin@hotmail.com,1592510720760323,1,535.5,1,1
257444,emily88@cobb.com,1592199040703476,1,1045.0,1,1
257449,craig61@luna-oliver.com,1592200459769596,1,1195.0,1,1
257441,johnsonashley@mcclain.com,1592197729873798,1,945.0,1,1
264191,maxwelltara@edwards.com,1592306255847870,2,993.6,2,2
286727,rojasjorge@yahoo.com,1592533048926949,1,535.5,1,1





**2.1: CHECK YOUR WORK**

In [0]:
from pyspark.sql.types import IntegerType

assert updated_sales_df.schema[6].dataType == IntegerType()
print("All tests pass")

All tests pass




### 3. Rewrite sales data to same Delta path
Write **`updated_sales_df`** to the same Delta location **`delta_sales_path`**.

<img src="https://files.training.databricks.com/images/icon_hint_32.png" alt="Hint"> This write fails unless you set an option to overwrite the schema, because the **`items`** column changes from an array to an integer.

In [0]:
# overwriteSchema lets the overwrite replace the array column with an integer column
updated_sales_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(delta_sales_path)
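
The overwrite pattern from steps 1 and 3 can be wrapped in a small helper so that changing the schema is an explicit choice. This is only a sketch: the names `delta_overwrite_options` and `overwrite_delta` are hypothetical and not part of the lab or of any library.

```python
def delta_overwrite_options(allow_schema_change=False):
    """Build the writer options for overwriting a Delta table."""
    options = {}
    if allow_schema_change:
        # Needed when the new DataFrame's schema differs from the table's.
        options["overwriteSchema"] = "true"
    return options


def overwrite_delta(df, path, allow_schema_change=False):
    """Overwrite the Delta table at `path` with the rows of `df`."""
    (df.write.format("delta")
       .mode("overwrite")
       .options(**delta_overwrite_options(allow_schema_change))
       .save(path))


print(delta_overwrite_options(allow_schema_change=True))
```

With this helper, step 1 would be `overwrite_delta(sales_df, delta_sales_path)` and step 3 would be `overwrite_delta(updated_sales_df, delta_sales_path, allow_schema_change=True)`.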




**3.1: CHECK YOUR WORK**

In [0]:
assert spark.read.format("delta").load(delta_sales_path).schema[6].dataType == IntegerType()
print("All tests pass")

All tests pass




### 4. Create table and view version history
Use `spark.sql()` to run SQL statements that perform the following steps.
- Drop table **`sales_delta`** if it exists
- Create **`sales_delta`** table using the **`delta_sales_path`** location
- List version history for the **`sales_delta`** table

For example, a SQL query passed to `spark.sql()` looks like `spark.sql("SELECT * FROM sales_data")`.

In [0]:
# Register the existing Delta files at delta_sales_path as the sales_delta table
spark.sql("DROP TABLE IF EXISTS sales_delta")
spark.sql(f"CREATE TABLE sales_delta USING DELTA LOCATION '{delta_sales_path}'")

Out[17]: DataFrame[]

In [0]:
# Each write to the table appears as one row in its history
display(spark.sql("DESCRIBE HISTORY sales_delta"))

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2024-03-05T14:14:11.000+0000,3786691366414762,labuser6023680@vocareum.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(486870689447368),0304-081701-vx23oic7,0.0,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 210370, numOutputBytes -> 5687697)",,Databricks-Runtime/11.3.x-scala2.12
0,2024-03-05T14:13:11.000+0000,3786691366414762,labuser6023680@vocareum.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(486870689447368),0304-081701-vx23oic7,,WriteSerializable,False,"Map(numFiles -> 2, numOutputRows -> 210370, numOutputBytes -> 6473080)",,Databricks-Runtime/11.3.x-scala2.12
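
The history above has one row per write, so the version to time travel to can be derived from it rather than hard-coded. The sketch below assumes rows that support `row["version"]` lookups, as the rows returned by `collect()` do; the helper name `previous_version` and the sample rows are illustrative only.

```python
def previous_version(history_rows):
    """Return the version just before the latest one, or None if there is none."""
    versions = sorted(row["version"] for row in history_rows)
    return versions[-2] if len(versions) >= 2 else None


# Rows shaped like the DESCRIBE HISTORY output above (two columns kept).
history = [
    {"version": 1, "operation": "WRITE"},
    {"version": 0, "operation": "WRITE"},
]
print(previous_version(history))  # → 0
```

In the notebook, the input would be `spark.sql("DESCRIBE HISTORY sales_delta").collect()`.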





**4.1: CHECK YOUR WORK**

In [0]:
sales_delta_df = spark.sql("SELECT * FROM sales_delta")
assert sales_delta_df.count() == 210370
assert sales_delta_df.schema[6].dataType == IntegerType()
print("All tests pass")

All tests pass




### 5. Time travel to read previous version
Read the Delta table at **`delta_sales_path`** as of version 0.
Assign the resulting DataFrame to **`old_sales_df`**.

In [0]:
# versionAsOf selects the table snapshot created by the first write (version 0)
old_sales_df = spark.read.format("delta").option("versionAsOf", 0).load(delta_sales_path)
display(old_sales_df)

order_id,email,transaction_timestamp,total_item_quantity,purchase_revenue_in_usd,unique_items,items
257437,kmunoz@powell-duran.com,1592194221828900,1,1995.0,1,"List(List(null, M_PREM_K, Premium King Mattress, 1995.0, 1995.0, 1))"
282611,bmurillo@hotmail.com,1592504237604072,1,940.5,1,"List(List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1))"
257448,bradley74@gmail.com,1592200438030141,1,945.0,1,"List(List(null, M_STAN_F, Standard Full Mattress, 945.0, 945.0, 1))"
257440,jameshardin@campbell-morris.biz,1592197217716495,1,1045.0,1,"List(List(null, M_STAN_Q, Standard Queen Mattress, 1045.0, 1045.0, 1))"
283949,whardin@hotmail.com,1592510720760323,1,535.5,1,"List(List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1))"
257444,emily88@cobb.com,1592199040703476,1,1045.0,1,"List(List(null, M_STAN_Q, Standard Queen Mattress, 1045.0, 1045.0, 1))"
257449,craig61@luna-oliver.com,1592200459769596,1,1195.0,1,"List(List(null, M_STAN_K, Standard King Mattress, 1195.0, 1195.0, 1))"
257441,johnsonashley@mcclain.com,1592197729873798,1,945.0,1,"List(List(null, M_STAN_F, Standard Full Mattress, 945.0, 945.0, 1))"
264191,maxwelltara@edwards.com,1592306255847870,2,993.6,2,"List(List(NEWBED10, M_STAN_Q, Standard Queen Mattress, 940.5, 1045.0, 1), List(NEWBED10, P_FOAM_S, Standard Foam Pillow, 53.1, 59.0, 1))"
286727,rojasjorge@yahoo.com,1592533048926949,1,535.5,1,"List(List(NEWBED10, M_STAN_T, Standard Twin Mattress, 535.5, 595.0, 1))"
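
Time travel is also available through SQL and by timestamp. The sketch below builds the corresponding query strings for the `sales_delta` table from step 4; the helper name `time_travel_query` is hypothetical, and the timestamp is only an example value that would need to fall within the table's history.

```python
def time_travel_query(table, version=None, timestamp=None):
    """Build a SELECT that reads `table` at a past version or timestamp."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        return f"SELECT * FROM {table} VERSION AS OF {int(version)}"
    return f"SELECT * FROM {table} TIMESTAMP AS OF '{timestamp}'"


print(time_travel_query("sales_delta", version=0))
print(time_travel_query("sales_delta", timestamp="2024-03-05 14:13:30"))
```

Either string can be passed to `spark.sql()`. The DataFrame reader offers the same choice through the `versionAsOf` and `timestampAsOf` options.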





**5.1: CHECK YOUR WORK**

In [0]:
assert old_sales_df.select(size(col("items"))).first()[0] == 1
print("All tests pass")

All tests pass




### Clean up classroom

In [0]:
DA.cleanup()

Resetting the learning environment:
| dropping the schema "labuser6023680_cng7_da_asp"...(1 seconds)
| removing the working directory "dbfs:/mnt/dbacademy-users/labuser6023680@vocareum.com/apache-spark-programming-with-databricks"...(1 seconds)

Validating the locally installed datasets:
| listing local files...(3 seconds)
| validation completed...(3 seconds total)


&copy; 2023 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>