In [None]:
# Section 1
# Stage the workspace files on DBFS so the cells below can read them via dbfs:/ paths.
staged_files = [
    ("file:/Workspace/Shared/sales_data.csv", "dbfs:/FileStore/sales_data.csv"),
    ("file:/Workspace/Shared/customer_data.json", "dbfs:/FileStore/customer_data.json"),
    ("file:/Workspace/Shared/new_sales_data.csv", "dbfs:/FileStore/new_sales_data.csv"),
]
for workspace_path, dbfs_path in staged_files:
    dbutils.fs.cp(workspace_path, dbfs_path)

# Step 1: read sales_data.csv; the first row is the header and column types are inferred.
sales_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("dbfs:/FileStore/sales_data.csv")
)
sales_df.show()

# Step 2: persist the sales DataFrame in Delta format, replacing any previous contents.
(
    sales_df.write
    .format("delta")
    .mode("overwrite")
    .save("dbfs:/FileStore/sales_delta_table")
)

+-------+----------+----------+--------+--------+-----+
|OrderID| OrderDate|CustomerID| Product|Quantity|Price|
+-------+----------+----------+--------+--------+-----+
|   1001|2024-01-15|      C001|Widget A|      10| 25.5|
|   1002|2024-01-16|      C002|Widget B|       5|15.75|
|   1003|2024-01-16|      C001|Widget C|       8| 22.5|
|   1004|2024-01-17|      C003|Widget A|      15| 25.5|
|   1005|2024-01-18|      C004|Widget D|       7| 30.0|
|   1006|2024-01-19|      C002|Widget B|       9|15.75|
|   1007|2024-01-20|      C005|Widget C|      12| 22.5|
|   1008|2024-01-21|      C003|Widget A|      10| 25.5|
+-------+----------+----------+--------+--------+-----+



In [None]:
# Step 3: read customer_data.json with an explicit schema rather than inference.
from pyspark.sql.types import StructType, StructField, StringType, DateType

# (column name, Spark type) pairs; every column is declared nullable.
customer_fields = [
    ("CustomerID", StringType()),
    ("CustomerName", StringType()),
    ("Region", StringType()),
    ("SignupDate", DateType()),
]
customer_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in customer_fields]
)

customer_df = spark.read.schema(customer_schema).json("dbfs:/FileStore/customer_data.json")
customer_df.show()

# Step 4: persist the customer DataFrame in Delta format, replacing any previous contents.
(
    customer_df.write
    .format("delta")
    .mode("overwrite")
    .save("dbfs:/FileStore/customer_delta_table")
)

+----------+-------------+------+----------+
|CustomerID| CustomerName|Region|SignupDate|
+----------+-------------+------+----------+
|      C001|     John Doe| North|2022-07-01|
|      C002|   Jane Smith| South|2023-02-15|
|      C003|Emily Johnson|  East|2021-11-20|
|      C004|Michael Brown|  West|2022-12-05|
|      C005|  Linda Davis| North|2023-03-10|
+----------+-------------+------+----------+



In [None]:
# Step 5: turn existing Parquet data into Delta tables by reading it and rewriting it
# in Delta format under dbfs:/FileStore/delta/.
# NOTE(review): no visible cell creates these Parquet files - presumably they already
# exist in the workspace; confirm before running on a fresh environment.
# NOTE: this rebinds customer_df and sales_df, which earlier cells loaded from JSON/CSV.

customer_df = spark.read.parquet("dbfs:/FileStore/customer_data.parquet")
(
    customer_df.write
    .format("delta")
    .mode("overwrite")
    .save("dbfs:/FileStore/delta/customer_delta_table")
)

sales_df = spark.read.parquet("dbfs:/FileStore/sales_data.parquet")
(
    sales_df.write
    .format("delta")
    .mode("overwrite")
    .save("dbfs:/FileStore/delta/sales_delta_table")
)

In [None]:
# 2. Data Management
# 1. Load the new_sales_data.csv file into a DataFrame using an explicit schema.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# OrderDate is read as a plain string here; all columns are declared nullable.
schema = StructType([
    StructField("OrderID", IntegerType(), True),
    StructField("OrderDate", StringType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("Product", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Price", DoubleType(), True)
])

new_sales_df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load("dbfs:/FileStore/new_sales_data.csv")
)

# Preview the incoming rows once; the DataFrame is not transformed afterwards,
# so a second show() would only rescan the CSV and print the same table.
new_sales_df.show()

# 2. Write the new DataFrame as a Delta Table.
# overwriteSchema lets this write replace the schema of a table already at this path.
(
    new_sales_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("dbfs:/FileStore/delta/new_sales_delta_table")
)

# 3. Perform a MERGE INTO operation to update and insert records into the existing Delta table.
# Rows are matched on OrderID: matches are updated in place, the rest are inserted.
merge_metrics = spark.sql("""
MERGE INTO delta.`dbfs:/FileStore/delta/sales_delta_table` AS target
USING delta.`dbfs:/FileStore/delta/new_sales_delta_table` AS source
ON target.OrderID = source.OrderID
WHEN MATCHED THEN
  UPDATE SET
    target.OrderDate = source.OrderDate,
    target.CustomerID = source.CustomerID,
    target.Product = source.Product,
    target.Quantity = source.Quantity,
    target.Price = source.Price
WHEN NOT MATCHED THEN
  INSERT (OrderID, OrderDate, CustomerID, Product, Quantity, Price)
  VALUES (source.OrderID, source.OrderDate, source.CustomerID, source.Product, source.Quantity, source.Price)
""")

# Show the affected/updated/deleted/inserted row counts instead of just the result's schema repr.
merge_metrics.show()

+-------+----------+----------+--------+--------+-----+
|OrderID| OrderDate|CustomerID| Product|Quantity|Price|
+-------+----------+----------+--------+--------+-----+
|   1009|2024-01-22|      C006|Widget E|      14| 20.0|
|   1010|2024-01-23|      C007|Widget F|       6| 35.0|
|   1002|2024-01-16|      C002|Widget B|      10|15.75|
+-------+----------+----------+--------+--------+-----+

+-------+----------+----------+--------+--------+-----+
|OrderID| OrderDate|CustomerID| Product|Quantity|Price|
+-------+----------+----------+--------+--------+-----+
|   1009|2024-01-22|      C006|Widget E|      14| 20.0|
|   1010|2024-01-23|      C007|Widget F|       6| 35.0|
|   1002|2024-01-16|      C002|Widget B|      10|15.75|
+-------+----------+----------+--------+--------+-----+



DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [None]:
# 3. Optimize Delta Table
# Step 1: run OPTIMIZE on the sales Delta table, Z-ordering its data by OrderDate.
optimize_sql = "OPTIMIZE delta.`dbfs:/FileStore/delta/sales_delta_table`ZORDER BY (OrderDate)"
spark.sql(optimize_sql)

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,totalClusterParallelism:bigint,totalScheduledTasks:bigint,autoCompactParallelismStats:struct<maxClusterActiveParallelism:bigint,minClusterActiveParallelism:bigint,maxSessionActiveParallelism:bigint,minSessionActiveParallelism:bigint>,de

In [None]:
# 4. Advanced Features
# Step 1: list the change history recorded for the sales Delta table, untruncated.
history_sql = "DESCRIBE HISTORY delta.`dbfs:/FileStore/delta/sales_delta_table`"
history_df = spark.sql(history_sql)
history_df.show(truncate=False)

# Step 2: VACUUM the table with a retention window of 168 hours (7 days).
vacuum_sql = "VACUUM delta.`dbfs:/FileStore/delta/sales_delta_table` RETAIN 168 HOURS"
spark.sql(vacuum_sql)

+-------+-------------------+----------------+----------------------------------+---------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

DataFrame[path: string]

In [None]:
# 5. Hands-on Exercises
# Exercise 1 - data versioning: use Delta time travel to read the sales table
# as it was at a specific version.
version_number = 2  # table version to query; must exist in the table's history
time_travel_sql = f"SELECT * FROM delta.`dbfs:/FileStore/delta/sales_delta_table` VERSION AS OF {version_number}"
historical_df = spark.sql(time_travel_sql)
historical_df.show(truncate=False)

+-------+----------+----------+--------+--------+-----+
|OrderID|OrderDate |CustomerID|Product |Quantity|Price|
+-------+----------+----------+--------+--------+-----+
|1001   |2024-01-15|C001      |Widget A|10      |25.5 |
|1003   |2024-01-16|C001      |Widget C|8       |22.5 |
|1004   |2024-01-17|C003      |Widget A|15      |25.5 |
|1005   |2024-01-18|C004      |Widget D|7       |30.0 |
|1006   |2024-01-19|C002      |Widget B|9       |15.75|
|1007   |2024-01-20|C005      |Widget C|12      |22.5 |
|1008   |2024-01-21|C003      |Widget A|10      |25.5 |
|1002   |2024-01-16|C002      |Widget B|10      |15.75|
|1009   |2024-01-22|C006      |Widget E|14      |20.0 |
|1010   |2024-01-23|C007      |Widget F|6       |35.0 |
+-------+----------+----------+--------+--------+-----+



In [None]:
# Exercise 2 - Building a Reliable Data Lake with Delta Lake:
# reload the new sales file under a strict schema, refresh the staging Delta table,
# merge it into the sales table, then print both table schemas for comparison.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# NOTE(review): every field is declared nullable=False; confirm the CSV reader really
# enforces this, since file-based sources may relax declared nullability.
strict_fields = [
    ("OrderID", IntegerType()),
    ("OrderDate", StringType()),
    ("CustomerID", StringType()),
    ("Product", StringType()),
    ("Quantity", IntegerType()),
    ("Price", DoubleType()),
]
schema = StructType(
    [StructField(field_name, field_type, False) for field_name, field_type in strict_fields]
)

new_sales_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(schema)
    .load("dbfs:/FileStore/new_sales_data.csv")
)

# Replace the staging Delta table with the freshly loaded rows.
(
    new_sales_df.write
    .format("delta")
    .mode("overwrite")
    .save("dbfs:/FileStore/delta/new_sales_delta_table")
)

# Upsert on OrderID: matching orders are updated, unmatched orders are inserted.
merge_sql = """
MERGE INTO delta.`dbfs:/FileStore/delta/sales_delta_table` AS target
USING delta.`dbfs:/FileStore/delta/new_sales_delta_table` AS source
ON target.OrderID = source.OrderID
WHEN MATCHED THEN
  UPDATE SET
    target.OrderDate = source.OrderDate,
    target.CustomerID = source.CustomerID,
    target.Product = source.Product,
    target.Quantity = source.Quantity,
    target.Price = source.Price
WHEN NOT MATCHED THEN
  INSERT (OrderID, OrderDate, CustomerID, Product, Quantity, Price)
  VALUES (source.OrderID, source.OrderDate, source.CustomerID, source.Product, source.Quantity, source.Price)
"""
spark.sql(merge_sql)

# Print the column types of the target table first, then of the staging table.
for describe_sql in (
    "DESCRIBE delta.`dbfs:/FileStore/delta/sales_delta_table`",
    "DESCRIBE delta.`dbfs:/FileStore/delta/new_sales_delta_table`",
):
    spark.sql(describe_sql).show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|   OrderID|      int|   NULL|
| OrderDate|     date|   NULL|
|CustomerID|   string|   NULL|
|   Product|   string|   NULL|
|  Quantity|      int|   NULL|
|     Price|   double|   NULL|
+----------+---------+-------+

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|   OrderID|      int|   NULL|
| OrderDate|   string|   NULL|
|CustomerID|   string|   NULL|
|   Product|   string|   NULL|
|  Quantity|      int|   NULL|
|     Price|   double|   NULL|
+----------+---------+-------+



In [None]:
# Storage maintenance for the sales Delta table:
# first OPTIMIZE with Z-ordering on OrderDate, then VACUUM with a 168-hour (7-day) retention window.
optimize_sql = "OPTIMIZE delta.`dbfs:/FileStore/delta/sales_delta_table`ZORDER BY (OrderDate)"
vacuum_sql = "VACUUM delta.`dbfs:/FileStore/delta/sales_delta_table` RETAIN 168 HOURS"

spark.sql(optimize_sql)
spark.sql(vacuum_sql)

DataFrame[path: string]