In [0]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.utils import AnalysisException

In [0]:
# Load each month's raw parquet export on its own so the two row counts can be
# checked individually.
df_oct = spark.read.parquet("/Volumes/workspace/ecommerce/ecommerce_data/parquet/oct/")
df_nov = spark.read.parquet("/Volumes/workspace/ecommerce/ecommerce_data/parquet/nov/")

# count() is an action: each call scans its dataset and prints one total
# (October first, then November).
for monthly_events in (df_oct, df_nov):
    print(monthly_events.count())


42448764
67501979


In [0]:
# Read every dataset under the parquet/ directory into a single DataFrame.
# The "*" glob picks up all sub-directories found there (oct and nov according
# to the original note -- confirm nothing else lives under this path).
df_events = spark.read.format("parquet").load(
    "/Volumes/workspace/ecommerce/ecommerce_data/parquet/*"
)

# Background:
#   - Parquet files are just data files.
#   - Delta = parquet files + ACID + schema enforcement + versioning + metadata.

In [0]:
# Persist the events in Delta format (ACID, schema enforcement, versioning).
# mode("overwrite") replaces whatever was previously stored at this path.
events_delta_path = "/Volumes/workspace/ecommerce/ecommerce_data/delta/events_octnov"
(
    df_events.write
    .format("delta")
    .mode("overwrite")
    .save(events_delta_path)
)

# Read the Delta data back straight from its path and preview the first 5 rows.
df = spark.read.format("delta").load(events_delta_path)
df.show(5)

# Delta files can be used directly from PySpark without creating any table.
# Running SQL queries against them requires a registered table name, which is
# why a managed table is created below.
#
# Author's note: Unity Catalog is not available in Community Edition, and
# registering the Delta path as an external SQL table is disabled there, so the
# statement below throws an error. It is kept only as a record of what failed:
# spark.sql(""" CREATE TABLE IF NOT EXISTS events_octnov USING DELTA
#          LOCATION '/Volumes/workspace/ecommerce/ecommerce_data/delta/events_octnov' """)

# Create (or replace) the managed table from the same source DataFrame.
(
    df_events.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("events_octnov")
)

In [0]:
# Inspect the managed table's Delta metadata (format, location, number of
# files, size in bytes, ...). truncate=False keeps the wide columns readable.
events_table_detail = spark.sql("DESCRIBE DETAIL events_octnov")
events_table_detail.show(truncate=False)

+------+------------------------------------+-------------------------------+-----------+--------+-----------------------+-------------------+----------------+-----------------+--------+-----------+-------------------------------------+----------------+----------------+-----------------------------------------+---------------------------------------------------------------+-------------+
|format|id                                  |name                           |description|location|createdAt              |lastModified       |partitionColumns|clusteringColumns|numFiles|sizeInBytes|properties                           |minReaderVersion|minWriterVersion|tableFeatures                            |statistics                                                     |clusterByAuto|
+------+------------------------------------+-------------------------------+-----------+--------+-----------------------+-------------------+----------------+-----------------+--------+-----------+--------------------

In [0]:
%sql
-- Peek at five rows of the managed events table (no ORDER BY, so which rows
-- come back is not guaranteed).
SELECT *
FROM events_octnov
LIMIT 5

event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
2019-11-08T08:12:45.000Z,view,3500093,2053013555287949705,,,33.46,564462223,b9c1f95f-c19d-4507-93c0-e133f5c2b299
2019-11-08T08:12:45.000Z,view,1304849,2053013558920217191,computers.notebook,acer,2312.13,531136086,8ba7b208-cf77-4d2d-b4c0-195b635d1853
2019-11-08T08:12:45.000Z,view,1005173,2053013555631882655,electronics.smartphone,samsung,643.23,562236381,1834e2be-320b-4625-ade6-a282fb95d0d5
2019-11-08T08:12:45.000Z,view,26403432,2053013563651392361,,,109.66,516475705,bf672c37-a1d4-46c4-b3b0-7560c836c6ff
2019-11-08T08:12:45.000Z,view,12709849,2053013553559896355,,viatti,40.67,516203795,6767efef-5041-4f73-aeb9-77eac7d4ea8a


In [0]:
# Test Delta Lake schema enforcement with an intentionally wrong schema.
#
# The mismatching DataFrame is built outside the try block so that a failure
# while creating it is not misreported as schema enforcement.
wrong_schema = spark.createDataFrame(
    [("a", "b", "c")],
    ["col1", "col2", "col3"]
)

try:
    # Appending columns col1/col2/col3 to the events table is expected to be
    # rejected by Delta.
    wrong_schema.write.format("delta") \
        .mode("append") \
        .save("/Volumes/workspace/ecommerce/ecommerce_data/delta/events_octnov")

except AnalysisException as e:
    # Only analysis errors (the schema mismatch) are treated as the expected
    # outcome; unrelated failures such as I/O or permission errors propagate
    # instead of being labelled "schema enforcement".
    print("Schema enforcement triggered:")
    print(e)

else:
    # Reaching this branch means the bad rows were actually written, so fail
    # loudly rather than letting the notebook continue with a polluted table.
    raise RuntimeError(
        "Schema enforcement was NOT triggered: the mismatching append succeeded "
        "and the events_octnov Delta table now contains unexpected data."
    )

Schema enforcement triggered:
[_LEGACY_ERROR_TEMP_DELTA_0007] A schema mismatch detected when writing to the Delta table (Table ID: 64676808-1a15-46e4-83fe-ce9aa73f1fcf).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- event_time: timestamp (nullable = true)
-- event_type: string (nullable = true)
-- product_id: integer (nullable = true)
-- category_id: long (nullable = true)
-- category_code: string (nullable = true)
-- brand: string (nullable = true)
-- price: double (nullable = true)
-- user_id: integer (nullable = true)
-- user_session: string (nullable = true)


Data schema:
root
-- col1: string (nullable = true)
-- col2: string (nullable = true)
-- col3: string (nullable = true)

         
Table ACLs are enabled in this c

In [0]:
# Handle duplicate inserts: append only the rows that are not already present
# in the Delta table, so re-running this cell with the same data inserts nothing.
delta_table = DeltaTable.forPath(
    spark,
    "/Volumes/workspace/ecommerce/ecommerce_data/delta/events_octnov"
)

# event_time alone does not identify an event (many distinct events share the
# same second), so matching only on it would silently drop legitimate new rows.
# A row is treated as a duplicate only when every column is equal. The
# null-safe operator <=> is required because columns such as category_code and
# brand contain nulls, and a plain "=" never matches a null.
duplicate_match_condition = " AND ".join(
    f"t.`{column}` <=> s.`{column}`" for column in df_events.columns
)

# An insert-only MERGE does not de-duplicate the source itself: every unmatched
# source row is inserted. Collapse exact duplicate rows in the batch first.
new_events = df_events.dropDuplicates()

delta_table.alias("t").merge(
    new_events.alias("s"),
    duplicate_match_condition   # match condition
).whenNotMatchedInsertAll().execute()

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]