In [0]:
# Ingest the raw October 2019 events CSV. The first row is treated as the
# header and column types are inferred from the data.
events_oct = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/Volumes/workspace/ecommerce_idc/ecommerce_data_idc/2019-Oct.csv")
)

In [0]:
# Persist the loaded events in Delta format at a fixed storage location,
# replacing anything previously written there.
path = "/Volumes/workspace/ecommerce_idc/ecommerce_data_idc/delta/events_oct"
delta_writer = events_oct.write.format("delta").mode("overwrite")
delta_writer.save(path)

In [0]:
# Load the Delta table back from storage and preview its first five rows.
oct_delta = spark.read.load(path, format="delta")
preview_rows = oct_delta.limit(5)
display(preview_rows)


event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
2019-10-29T09:17:32.000Z,view,1004858,2053013555631882655,electronics.smartphone,samsung,131.02,559389453,d71e7146-3d1a-4b96-9d87-fdd314d4694c
2019-10-29T09:17:32.000Z,view,1004858,2053013555631882655,electronics.smartphone,samsung,131.02,565343162,cfbbc662-af26-4ee7-be22-7a3d507bb2bf
2019-10-29T09:17:33.000Z,purchase,5800792,2053013553945772349,electronics.audio.subwoofer,,162.17,536723169,567d676d-5c64-4646-adb2-c8c6ff9fc8fb
2019-10-29T09:17:33.000Z,view,1005064,2053013555631882655,electronics.smartphone,xiaomi,187.65,513355340,74a71cdb-c262-4f96-8e58-00bfeacf05e3
2019-10-29T09:17:33.000Z,view,1003315,2053013555631882655,electronics.smartphone,apple,965.17,512786243,3331167a-db7f-4b4a-bc31-30f4c4aa93f3


In [0]:
# Print the column names and types of the DataFrame read back from Delta so
# they can be compared with the source data (expected to be unchanged by the
# conversion).
oct_delta.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [0]:
# Count the records in the Delta table and show the total as the cell result.
row_count = oct_delta.count()
row_count


42448764

In [0]:
# List the transaction-log directory of the Delta table to confirm that a
# _delta_log folder was created under the table location.
# The location is derived from `path` (assigned in an earlier cell) rather
# than repeated as a literal, so this check always inspects the same table
# that was written.
dbutils.fs.ls(f"{path}/_delta_log")


[FileInfo(path='dbfs:/Volumes/workspace/ecommerce_idc/ecommerce_data_idc/delta/events_oct/_delta_log/00000000000000000000.crc', name='00000000000000000000.crc', size=52239, modificationTime=1768220596000),
 FileInfo(path='dbfs:/Volumes/workspace/ecommerce_idc/ecommerce_data_idc/delta/events_oct/_delta_log/00000000000000000000.json', name='00000000000000000000.json', size=51534, modificationTime=1768220595000),
 FileInfo(path='dbfs:/Volumes/workspace/ecommerce_idc/ecommerce_data_idc/delta/events_oct/_delta_log/_staged_commits/', name='_staged_commits/', size=0, modificationTime=1768221364045)]

In [0]:
# Register the Spark DataFrame as a Delta table named oct_events_delta,
# overwriting any existing table contents.
# The mergeSchema option is set to allow schema evolution on write.
(
    oct_delta.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable("oct_events_delta")
)

In [0]:
%sql
-- Create (or replace) a second Delta table populated with every row of
-- oct_events_delta.
CREATE OR REPLACE TABLE oct_events_sql_delta
USING DELTA
AS
SELECT *
FROM oct_events_delta

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Preview five rows of the SQL-created Delta table.
SELECT *
FROM oct_events_sql_delta
LIMIT 5

event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
2019-10-01T00:00:00.000Z,view,44600062,2103807459595387724,,shiseido,35.79,541312140,72d76fde-8bb3-4e00-8c23-a032dfed738c
2019-10-01T00:00:00.000Z,view,3900821,2053013552326770905,appliances.environment.water_heater,aqua,33.2,554748717,9333dfbd-b87a-4708-9857-6336556b0fcc
2019-10-01T00:00:01.000Z,view,17200506,2053013559792632471,furniture.living_room.sofa,,543.1,519107250,566511c2-e2e3-422b-b695-cf8e6e792ca8
2019-10-01T00:00:01.000Z,view,1307067,2053013558920217191,computers.notebook,lenovo,251.74,550050854,7c90fc70-0e80-4590-96f3-13c02c18c713
2019-10-01T00:00:04.000Z,view,1004237,2053013555631882655,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


In [0]:
# Re-open the Delta table from its storage location and print its schema,
# which the schema-enforcement cells below are checked against.
path = "/Volumes/workspace/ecommerce_idc/ecommerce_data_idc/delta/events_oct"
oct_delta = spark.read.load(path, format="delta")
oct_delta.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [0]:

# Build a single record that does not match the table schema: product_id is
# normally an integer, but here it is deliberately given as a string.
# event_time and category_id are also supplied as plain strings.
bad_columns = [
    "event_time",
    "event_type",
    "product_id",
    "category_id",
    "category_code",
    "brand",
    "price",
    "user_id",
    "user_session",
]

bad_data = [
    (
        "2019-10-01T00:00:00.000+00:00",
        "view",
        "P1004237",
        "cat01",
        "electronics.smartphone",
        "apple",
        1081.98,
        535871217,
        "c6bd7419-2748-4c56-95b4-8cec9ff8b80d",
    )
]

bad_df = spark.createDataFrame(bad_data, bad_columns)

bad_df.display()


event_time,event_type,product_id,category_id,category_code,brand,price,user_id,user_session
2019-10-01T00:00:00.000+00:00,view,P1004237,cat01,electronics.smartphone,apple,1081.98,535871217,c6bd7419-2748-4c56-95b4-8cec9ff8b80d


In [0]:
# Attempt to append the mismatched record to the Delta table. The write is
# expected to be rejected; the error is caught and printed so the notebook
# keeps running and the rejection reason is visible.
# `path` is the Delta table location assigned in an earlier cell; reusing it
# avoids repeating the location as a literal.
try:
    bad_df.write.format("delta").mode("append").save(path)
except Exception as e:
    print(f"Schema enforcement: {e}")


Schema enforcement: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'event_time' and 'event_time'.

JVM stacktrace:
com.databricks.sql.transaction.tahoe.DeltaAnalysisException
	at com.databricks.sql.transaction.tahoe.schema.SchemaMergingUtils$.$anonfun$mergeDataTypes$1(SchemaMergingUtils.scala:231)
	at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:936)
	at com.databricks.sql.transaction.tahoe.schema.SchemaMergingUtils$.merge$1(SchemaMergingUtils.scala:217)
	at com.databricks.sql.transaction.tahoe.schema.SchemaMergingUtils$.mergeDataTypes(SchemaMergingUtils.scala:335)
	at com.databricks.sql.transaction.tahoe.schema.SchemaMergingUtils$.mergeSchemas(SchemaMergingUtils.scala:179)
	at com.databricks.sql.transaction.tahoe.schema.ImplicitMetadataOperation$.mergeSchema(ImplicitMetadataOperation.scala:359)
	at com.databricks.sql.transaction.tahoe.schema.ImplicitMetadataOperation.updateMetadata(ImplicitMetadataOperation.scala:112)
	at com.databricks.sql.transaction.tahoe.schema.I

In [0]:
from pyspark.sql.functions import to_timestamp

# Parse the string event_time into a timestamp so that column matches the
# table's type. The pattern covers millisecond precision and a zone offset
# such as +00:00. All other columns of bad_df are left as they are.
event_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"
parsed_event_time = to_timestamp("event_time", event_time_format)

fixed_df = bad_df.withColumn("event_time", parsed_event_time)

In [0]:
# Retry the append with event_time converted to a timestamp. product_id is
# still a string, so the write is still expected to be rejected; the error is
# caught and printed.
# `path` is the Delta table location assigned in an earlier cell; reusing it
# avoids repeating the location as a literal.
try:
    fixed_df.write.format("delta").mode("append").save(path)
except Exception as e:
    print(f"Schema enforcement: {e}")

Schema enforcement: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'product_id' and 'product_id'.

JVM stacktrace:
com.databricks.sql.transaction.tahoe.DeltaAnalysisException
	at com.databricks.sql.transaction.tahoe.schema.SchemaMergingUtils$.$anonfun$mergeDataTypes$1(SchemaMergingUtils.scala:231)
	at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:936)
	at com.databricks.sql.transaction.tahoe.schema.SchemaMergingUtils$.merge$1(SchemaMergingUtils.scala:217)
	at com.databricks.sql.transaction.tahoe.schema.SchemaMergingUtils$.mergeDataTypes(SchemaMergingUtils.scala:335)
	at com.databricks.sql.transaction.tahoe.schema.SchemaMergingUtils$.mergeSchemas(SchemaMergingUtils.scala:179)
	at com.databricks.sql.transaction.tahoe.schema.ImplicitMetadataOperation$.mergeSchema(ImplicitMetadataOperation.scala:359)
	at com.databricks.sql.transaction.tahoe.schema.ImplicitMetadataOperation.updateMetadata(ImplicitMetadataOperation.scala:112)
	at com.databricks.sql.transaction.tahoe.schema.I

In [0]:
# Test schema enforcement with a DataFrame whose columns (x, y, z) do not
# appear in the Delta table at all.
# The DataFrame is built outside the try block so that only a failure of the
# write itself is reported as a schema-enforcement error.
wrong_schema = spark.createDataFrame([("a","b","c")], ["x","y","z"])

# `path` is the Delta table location assigned in an earlier cell; reusing it
# avoids repeating the location as a literal.
try:
    wrong_schema.write.format("delta").mode("append").save(path)
except Exception as e:
    print(f"Schema enforcement: {e}")

Schema enforcement: [_LEGACY_ERROR_TEMP_DELTA_0007] A schema mismatch detected when writing to the Delta table (Table ID: e15a1909-dd0b-4ba3-a818-341a931c7593).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- event_time: timestamp (nullable = true)
-- event_type: string (nullable = true)
-- product_id: integer (nullable = true)
-- category_id: long (nullable = true)
-- category_code: string (nullable = true)
-- brand: string (nullable = true)
-- price: double (nullable = true)
-- user_id: integer (nullable = true)
-- user_session: string (nullable = true)


Data schema:
root
-- x: string (nullable = true)
-- y: string (nullable = true)
-- z: string (nullable = true)

         
Table ACLs are enabled in this cluster, so automati