Task 1: Creating a Delta Table Using Three Methods
1. Load the given CSV and JSON datasets into Databricks.

In [0]:

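from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize the SparkSession (Databricks notebooks already provide `spark`; getOrCreate reuses it)
spark = SparkSession.builder \
    .appName("DeltaLakesAssignment") \
    .getOrCreate()

# Copy the provided CSV and JSON datasets from the workspace into DBFS
dbutils.fs.cp("file:/Workspace/Shared/employee_data4.csv", "dbfs:/FileStore/streaming/input/employee_data.csv")
dbutils.fs.cp("file:/Workspace/Shared/new_employee_data4.csv", "dbfs:/FileStore/streaming/input/new_employee_data.csv")
dbutils.fs.cp("file:/Workspace/Shared/products4.json", "dbfs:/FileStore/streaming/input/products.json")


2. Create a Delta table using the following three methods:
Create a Delta table from a DataFrame.
Use SQL to create a Delta table.
Convert both the CSV and JSON files into Delta format.

In [0]:
# Method 1: Create a Delta table from a DataFrame
employee_data_df = spark.read.csv("dbfs:/FileStore/streaming/input/employee_data.csv", header=True, inferSchema=True)
employee_data_df.write.format("delta").mode("overwrite").save("/Workspace/Shared/employee_data_table")

# Method 2: Use SQL to create a Delta table (CTAS over the Delta files written above)
spark.sql("""
CREATE OR REPLACE TABLE employees_sql
USING DELTA
AS SELECT * FROM delta.`/Workspace/Shared/employee_data_table`
""")

# Method 3: Convert both the CSV and JSON files into Delta format
# (the target Delta paths below are example locations)
# For CSV
csv_df = spark.read.csv("dbfs:/FileStore/streaming/input/employee_data.csv", header=True, inferSchema=True)
csv_df.write.format("delta").mode("overwrite").save("dbfs:/FileStore/streaming/delta/employee_data_csv")

# For JSON (add .option("multiLine", True) if the file holds a single JSON array)
products_df = spark.read.json("dbfs:/FileStore/streaming/input/products.json")
products_df.write.format("delta").mode("overwrite").save("dbfs:/FileStore/streaming/delta/products_json")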

Task 2: Merge and Upsert (Slowly Changing Dimension - SCD)
1. Load the Delta table for employees created in Task 1.

In [0]:
# Load the employees Delta table written in Task 1 (Method 1)
employee_data_df = spark.read.format("delta").load("/Workspace/Shared/employee_data_table")
display(employee_data_df)

EmployeeID,EmployeeName,Department,JoiningDate,Salary
101,John,HR,2023-01-10,50000
102,Alice,Finance,2023-02-15,70000
103,Mark,Engineering,2023-03-20,85000
104,Emma,Sales,2023-04-01,55000
105,Liam,Marketing,2023-05-12,60000


2. Merge the new employee data into the employees Delta table.
3. If an employee exists, update their salary. If the employee is new, insert
their details.
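The merge condition and its two clauses can be modeled without Spark. The sketch below is a plain-Python illustration of what whenMatchedUpdate and whenNotMatchedInsert do to the rows; it is not the Delta Lake API, and the helper upsert_by_key is hypothetical. The target rows mirror the employee data displayed in this task, and the update batch (Alice at 75000, new hire Olivia) is assumed from the post-merge output shown in Task 3.

```python
from typing import Dict, List

Row = Dict[str, object]

# Hypothetical helper modeling MERGE semantics (not the Delta API)
def upsert_by_key(target: List[Row], source: List[Row], key: str,
                  update_cols: List[str]) -> List[Row]:
    """Update the listed columns on a key match; insert the source row otherwise."""
    # Index target rows by key, copying each row so the input is not mutated
    merged = {row[key]: dict(row) for row in target}
    for new in source:
        if new[key] in merged:
            # whenMatchedUpdate: only the listed columns change
            for c in update_cols:
                merged[new[key]][c] = new[c]
        else:
            # whenNotMatchedInsert: the whole source row is added
            merged[new[key]] = dict(new)
    return list(merged.values())

employees = [
    {"EmployeeID": 101, "EmployeeName": "John", "Department": "HR", "JoiningDate": "2023-01-10", "Salary": 50000},
    {"EmployeeID": 102, "EmployeeName": "Alice", "Department": "Finance", "JoiningDate": "2023-02-15", "Salary": 70000},
    {"EmployeeID": 103, "EmployeeName": "Mark", "Department": "Engineering", "JoiningDate": "2023-03-20", "Salary": 85000},
    {"EmployeeID": 104, "EmployeeName": "Emma", "Department": "Sales", "JoiningDate": "2023-04-01", "Salary": 55000},
    {"EmployeeID": 105, "EmployeeName": "Liam", "Department": "Marketing", "JoiningDate": "2023-05-12", "Salary": 60000},
]
# Assumed contents of the new employee batch
new_employees = [
    {"EmployeeID": 102, "EmployeeName": "Alice", "Department": "Finance", "JoiningDate": "2023-02-15", "Salary": 75000},
    {"EmployeeID": 106, "EmployeeName": "Olivia", "Department": "HR", "JoiningDate": "2023-06-10", "Salary": 65000},
]

result = upsert_by_key(employees, new_employees, "EmployeeID", ["Salary"])
for row in result:
    print(row["EmployeeID"], row["EmployeeName"], row["Salary"])
```

Only Salary is listed for the matched case, so Alice's other columns are left as they were, which is the same choice the whenMatchedUpdate clause makes.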

In [0]:
from delta.tables import DeltaTable

# Load the new employee data (salary changes and new hires)
new_employees_df = spark.read.csv("dbfs:/FileStore/streaming/input/new_employee_data.csv", header=True, inferSchema=True)

# Path of the employees Delta table that receives the merge
employee_data_path = "dbfs:/FileStore/streaming/input/employee_data"

# Write the original employee CSV to Delta at that path and open it as a DeltaTable
employee_data_df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/streaming/input/employee_data.csv")
employee_data_df.write.format("delta").mode("overwrite").save(employee_data_path)
deltaTable = DeltaTable.forPath(spark, employee_data_path)

# Upsert: update Salary when EmployeeID matches, otherwise insert the new employee
deltaTable.alias("old").merge(
    new_employees_df.alias("new"),
    "old.EmployeeID = new.EmployeeID"
).whenMatchedUpdate(set={
    "Salary": "new.Salary"
}).whenNotMatchedInsert(values={
    "EmployeeID": "new.EmployeeID",
    "EmployeeName": "new.EmployeeName",
    "Department": "new.Department",
    "JoiningDate": "new.JoiningDate",
    "Salary": "new.Salary"
}).execute()

Task 3: Internals of Delta Table

1. Explore the internals of the employees Delta table using Delta Lake features.
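A Delta table is a folder of Parquet data files plus a _delta_log directory of numbered JSON commits. Each commit records actions such as add and remove, and a table version is the set of files still live after replaying commits 0 through N. The plain-Python model below replays hypothetical add/remove actions shaped like the commit history in this task's output (three overwrites, a MERGE, then an auto OPTIMIZE that compacts two files into one). It is a simplification: it ignores checkpoints, deletion vectors, statistics, and the protocol and metadata actions that real commits also carry.

```python
from typing import Dict, List, Set

# Each commit is a list of actions, loosely mirroring the "add"/"remove" entries
# in _delta_log/<version>.json (file names are hypothetical)
commits: List[List[Dict[str, str]]] = [
    [{"add": "part-0.parquet"}],                                # v0 WRITE
    [{"remove": "part-0.parquet"}, {"add": "part-1.parquet"}],  # v1 WRITE (overwrite)
    [{"remove": "part-1.parquet"}, {"add": "part-2.parquet"}],  # v2 WRITE (overwrite)
    [{"add": "part-3.parquet"}],                                # v3 MERGE (deletion vector not modeled)
    [{"remove": "part-2.parquet"}, {"remove": "part-3.parquet"},
     {"add": "part-4.parquet"}],                                # v4 OPTIMIZE (2 files -> 1)
]

def live_files(commits: List[List[Dict[str, str]]], version: int) -> Set[str]:
    """Replay commits 0..version and return the data files in that snapshot."""
    files: Set[str] = set()
    for actions in commits[: version + 1]:
        for action in actions:
            if "add" in action:
                files.add(action["add"])
            elif "remove" in action:
                files.discard(action["remove"])
    return files

for v in range(len(commits)):
    print(v, sorted(live_files(commits, v)))
```

Files that drop out of the live set are not deleted from storage; they stay available for time travel until VACUUM removes them.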


In [0]:
# Show the history of the employees Delta table (one row per commit)
history_df = deltaTable.history()
display(history_df)

# Summary statistics (count, mean, stddev, min, max) of the current rows;
# table-level metadata such as location, numFiles and sizeInBytes comes from deltaTable.detail()
details_df = deltaTable.toDF()
display(details_df.describe())


# Show the current rows of the table; the data files behind them are listed by
# deltaTable.toDF().inputFiles()
files_df = deltaTable.toDF()
display(files_df)

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2024-09-17T05:37:31Z,8532116595080723,azuser2110_mml.local@techademy.com,OPTIMIZE,"Map(predicate -> [], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,List(1284719369368105),0911-102441-r0fo913u,3.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 2, numRemovedBytes -> 2790, p25FileSize -> 1505, numDeletionVectorsRemoved -> 1, minFileSize -> 1505, numAddedFiles -> 1, maxFileSize -> 1505, p75FileSize -> 1505, p50FileSize -> 1505, numAddedBytes -> 1505)",,Databricks-Runtime/15.4.x-photon-scala2.12
3,2024-09-17T05:37:28Z,8532116595080723,azuser2110_mml.local@techademy.com,MERGE,"Map(predicate -> [""(cast(EmployeeID#11432 as int) = EmployeeID#11039)""], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(1284719369368105),0911-102441-r0fo913u,2.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetBytesAdded -> 1378, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 1, numTargetRowsMatchedUpdated -> 1, executionTimeMs -> 4042, materializeSourceTimeMs -> 189, numTargetRowsInserted -> 1, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 2334, numTargetRowsUpdated -> 1, numOutputRows -> 2, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1470)",,Databricks-Runtime/15.4.x-photon-scala2.12
2,2024-09-17T05:37:23Z,8532116595080723,azuser2110_mml.local@techademy.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [])",,List(1284719369368105),0911-102441-r0fo913u,1.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 5, numOutputBytes -> 1412)",,Databricks-Runtime/15.4.x-photon-scala2.12
1,2024-09-17T05:35:47Z,8532116595080723,azuser2110_mml.local@techademy.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [])",,List(1284719369368105),0911-102441-r0fo913u,0.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 5, numOutputBytes -> 1412)",,Databricks-Runtime/15.4.x-photon-scala2.12
0,2024-09-17T05:34:57Z,8532116595080723,azuser2110_mml.local@techademy.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [])",,List(1284719369368105),0911-102441-r0fo913u,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 5, numOutputBytes -> 1412)",,Databricks-Runtime/15.4.x-photon-scala2.12


summary,EmployeeID,EmployeeName,Department,JoiningDate,Salary
count,6.0,6,6,6,6.0
mean,103.5,,,,62500.0
stddev,1.8708286933869709,,,,15545.631755148024
min,101.0,Alice,Engineering,2023-01-10,50000.0
max,106.0,Olivia,Sales,2023-06-10,85000.0


EmployeeID,EmployeeName,Department,JoiningDate,Salary
101,John,HR,2023-01-10,50000
103,Mark,Engineering,2023-03-20,85000
104,Emma,Sales,2023-04-01,55000
105,Liam,Marketing,2023-05-12,60000
102,Alice,Finance,2023-02-15,75000 # Updated Salary
106,Olivia,HR,2023-06-10,65000 # New Employee


2. Check the transaction history of the table.

In [0]:
# Check the transaction history of the employees Delta table
transaction_history_df = deltaTable.history()
display(transaction_history_df)

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
4,2024-09-17T05:37:31Z,8532116595080723,azuser2110_mml.local@techademy.com,OPTIMIZE,"Map(predicate -> [], auto -> true, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,List(1284719369368105),0911-102441-r0fo913u,3.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 2, numRemovedBytes -> 2790, p25FileSize -> 1505, numDeletionVectorsRemoved -> 1, minFileSize -> 1505, numAddedFiles -> 1, maxFileSize -> 1505, p75FileSize -> 1505, p50FileSize -> 1505, numAddedBytes -> 1505)",,Databricks-Runtime/15.4.x-photon-scala2.12
3,2024-09-17T05:37:28Z,8532116595080723,azuser2110_mml.local@techademy.com,MERGE,"Map(predicate -> [""(cast(EmployeeID#11432 as int) = EmployeeID#11039)""], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,List(1284719369368105),0911-102441-r0fo913u,2.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 1, numTargetBytesAdded -> 1378, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 1, numTargetRowsMatchedUpdated -> 1, executionTimeMs -> 4042, materializeSourceTimeMs -> 189, numTargetRowsInserted -> 1, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 2334, numTargetRowsUpdated -> 1, numOutputRows -> 2, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 2, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 1470)",,Databricks-Runtime/15.4.x-photon-scala2.12
2,2024-09-17T05:37:23Z,8532116595080723,azuser2110_mml.local@techademy.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [])",,List(1284719369368105),0911-102441-r0fo913u,1.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 5, numOutputBytes -> 1412)",,Databricks-Runtime/15.4.x-photon-scala2.12
1,2024-09-17T05:35:47Z,8532116595080723,azuser2110_mml.local@techademy.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [])",,List(1284719369368105),0911-102441-r0fo913u,0.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 5, numOutputBytes -> 1412)",,Databricks-Runtime/15.4.x-photon-scala2.12
0,2024-09-17T05:34:57Z,8532116595080723,azuser2110_mml.local@techademy.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [])",,List(1284719369368105),0911-102441-r0fo913u,,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 5, numOutputBytes -> 1412)",,Databricks-Runtime/15.4.x-photon-scala2.12


3. Perform Time Travel and retrieve the table before the previous merge operation.
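The lookup logic can be checked without Spark: the latest MERGE is the highest MERGE version in the history, and because that commit already contains the merged rows, the snapshot before it is one version lower. A plain-Python sketch using the (version, operation) pairs from the history output above; version_before_last is a hypothetical helper.

```python
from typing import List, Tuple

# (version, operation) pairs as listed by the history output, newest first
history: List[Tuple[int, str]] = [
    (4, "OPTIMIZE"),
    (3, "MERGE"),
    (2, "WRITE"),
    (1, "WRITE"),
    (0, "WRITE"),
]

def version_before_last(history: List[Tuple[int, str]], operation: str) -> int:
    """Return the version immediately preceding the most recent `operation`."""
    matches = [version for version, op in history if op == operation]
    if not matches:
        raise ValueError(f"no {operation} found in history")
    latest = max(matches)
    if latest == 0:
        raise ValueError(f"{operation} is version 0; nothing precedes it")
    return latest - 1

print(version_before_last(history, "MERGE"))  # 2
```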

In [0]:
# Time travel: read the table as it was just before the most recent MERGE
table_path = "dbfs:/FileStore/streaming/input/employee_data"
# The highest MERGE version is the latest merge; that commit already contains the merged rows,
# so the snapshot before the merge is the version just below it
last_merge_version = deltaTable.history().filter("operation = 'MERGE'").agg({"version": "max"}).collect()[0][0]
df_before_merge = spark.read.format("delta").option("versionAsOf", last_merge_version - 1).load(table_path)
display(df_before_merge)


Task 4: Optimize Delta Table
1. Optimize the employees Delta table for better performance.
2. Use Z-ordering on the Department column for improved query performance.
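Z-ordering sorts rows by a Z-order (Morton) value that interleaves the bits of the chosen columns, so rows that are close in every column tend to land in the same files, and data skipping on per-file min/max statistics can prune more files. With a single column such as Department, the effect reduces to clustering rows by that column. The sketch below shows only the bit interleaving for two small non-negative integers; the Delta Lake implementation first maps each column's values to range-based ids, and details vary, so treat this as illustrative.

```python
def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Morton (Z-order) code: bit i of x goes to position 2*i, bit i of y to 2*i + 1."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z

# Sorting a 4x4 grid of points by Z-value visits it in the characteristic "Z" pattern
points = [(x, y) for x in range(4) for y in range(4)]
z_sorted = sorted(points, key=lambda p: interleave_bits(*p))
print(z_sorted[:4])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```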


In [0]:

# Access the employees Delta table by path (it was saved by path, not registered by name)
deltaTable = DeltaTable.forPath(spark, "dbfs:/FileStore/streaming/input/employee_data")

# Optimize and execute Z-order by Department
deltaTable.optimize().executeZOrderBy("Department")

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

Task 5: Time Travel with Delta Table
1. Retrieve the employees Delta table as it was before the last merge.


In [0]:
# The merge already ran in Task 2, so no new merge is needed here.
# Find the most recent MERGE version and read the version just before it.
table_path = "dbfs:/FileStore/streaming/input/employee_data"
deltaTable = DeltaTable.forPath(spark, table_path)
last_merge_version = deltaTable.history().filter("operation = 'MERGE'").agg({"version": "max"}).collect()[0][0]
df_before_merge = spark.read.format("delta").option("versionAsOf", last_merge_version - 1).load(table_path)
display(df_before_merge)

EmployeeID,EmployeeName,Department,JoiningDate,Salary
101,John,HR,2023-01-10,50000
102,Alice,Finance,2023-02-15,70000
103,Mark,Engineering,2023-03-20,85000
104,Emma,Sales,2023-04-01,55000
105,Liam,Marketing,2023-05-12,60000


2. Query the table at a specific version to view the older records.

In [0]:
# Specify the version you want to query
version = 5

# Load the Delta table at the specified version
df_specific_version = spark.read.format("delta").option("versionAsOf", version).load("dbfs:/FileStore/streaming/input/employee_data")

# Display the DataFrame
display(df_specific_version)

EmployeeID,EmployeeName,Department,JoiningDate,Salary
101,John,HR,2023-01-10,50000
102,Alice,Finance,2023-02-15,70000
103,Mark,Engineering,2023-03-20,85000
104,Emma,Sales,2023-04-01,55000
105,Liam,Marketing,2023-05-12,60000


Task 6: Vacuum Delta Table
1. Use the vacuum operation on the employees Delta table to remove old versions
and free up disk space.

In [0]:
from pyspark.sql import DataFrame

# Remove data files that the current version no longer references and that are older than the default retention period (7 days)
vacuumed: DataFrame = spark.sql("VACUUM 'dbfs:/FileStore/streaming/input/employee_data'")

# Display the result of the vacuum operation
display(vacuumed)

path
dbfs:/FileStore/streaming/input/employee_data


2. Set the retention period to 7 days and ensure that old files are deleted.
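VACUUM deletes only files that the current table version no longer references and whose removal is older than the retention threshold; files in the current snapshot are never touched, and RETAIN 168 HOURS (7 days) equals the default threshold. The plain-Python model below applies that eligibility rule to hypothetical file names and removal timestamps. It covers only removed (tombstoned) files; real VACUUM also cleans up untracked files in the table directory that are older than the threshold, and Databricks rejects thresholds under 7 days unless spark.databricks.delta.retentionDurationCheck.enabled is set to false.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Set

def vacuum_candidates(live: Set[str], removed_at: Dict[str, datetime],
                      now: datetime, retain_hours: int = 168) -> List[str]:
    """Files eligible for deletion: not live and removed before now - retain_hours."""
    cutoff = now - timedelta(hours=retain_hours)
    return sorted(f for f, ts in removed_at.items()
                  if f not in live and ts < cutoff)

# Hypothetical snapshot: one live file and three files removed at different times
now = datetime(2024, 9, 24, 12, 0)
live = {"part-4.parquet"}
removed_at = {
    "part-0.parquet": datetime(2024, 9, 10, 9, 0),   # removed 14 days ago
    "part-2.parquet": datetime(2024, 9, 17, 5, 37),  # removed about 7.3 days ago
    "part-3.parquet": datetime(2024, 9, 23, 8, 0),   # removed yesterday
}
print(vacuum_candidates(live, removed_at, now))  # ['part-0.parquet', 'part-2.parquet']
```

The file removed yesterday survives a 168-hour VACUUM, which is why a compaction run today does not free space until its replaced files age past the threshold.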

In [0]:
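# Set the table's deleted-file retention period to 7 days, then vacuum files older than 168 hours
spark.sql("ALTER TABLE delta.`dbfs:/FileStore/streaming/input/employee_data` SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = 'interval 7 days')")
vacuumed: DataFrame = spark.sql("VACUUM 'dbfs:/FileStore/streaming/input/employee_data' RETAIN 168 HOURS")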

# Display the result of the vacuum operation
display(vacuumed)

path
dbfs:/FileStore/streaming/input/employee_data


Assignment: Structured Streaming and Transformations on Streams
Task 1: Create a folder for streaming CSV files.

1. Create a folder for streaming CSV files.
2. Set up a structured streaming source to continuously read CSV data from this
folder.
3. Ensure that the streaming query reads the data continuously in append mode and
displays the results in the console.
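A file-based streaming source remembers which files it has already processed: each trigger lists the folder, picks up only new files, and emits their rows, which is why append mode fits it. The plain-Python model below mimics that with a temporary folder and hypothetical CSV files and rows; Spark's real source keeps this bookkeeping in the checkpoint and supports options such as maxFilesPerTrigger.

```python
import csv
import os
import tempfile
from typing import List, Set

def micro_batch(folder: str, seen: Set[str]) -> List[dict]:
    """Read rows only from CSV files that earlier batches have not processed."""
    rows: List[dict] = []
    for name in sorted(os.listdir(folder)):
        if name.endswith(".csv") and name not in seen:
            with open(os.path.join(folder, name), newline="") as fh:
                rows.extend(csv.DictReader(fh))
            seen.add(name)
    return rows

def write_csv(folder: str, name: str, lines: List[str]) -> None:
    with open(os.path.join(folder, name), "w", newline="") as fh:
        fh.write("\n".join(lines) + "\n")

header = "TransactionID,ProductID,TransactionDate,Quantity,Price"
with tempfile.TemporaryDirectory() as folder:
    seen: Set[str] = set()
    write_csv(folder, "batch1.csv", [header, "T1,P1,2024-09-01,2,10"])
    first = micro_batch(folder, seen)
    write_csv(folder, "batch2.csv", [header, "T2,P2,2024-09-02,1,30", "T3,P1,2024-09-02,3,10"])
    second = micro_batch(folder, seen)
    third = micro_batch(folder, seen)  # nothing new arrived

print(len(first), len(second), len(third))  # 1 2 0
```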

In [0]:
# Create a folder for streaming CSV files (the name ends in .csv, but it is a directory)
dbutils.fs.mkdirs("/Workspace/Shared/transaction.csv")

# Define the schema for the CSV files (streaming file sources need an explicit schema)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("TransactionID", StringType(), True),
    StructField("ProductID", StringType(), True),
    StructField("TransactionDate", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Price", IntegerType(), True)
])

# Set up a structured streaming source to continuously read CSV data
csv_streaming_df = (
    spark
    .readStream
    .option("sep", ",")
    .option("header", "true")
    .schema(schema)
    .csv("/Workspace/Shared/transaction.csv")
)

# Write each new row to the console in append mode
# (on Databricks, console output appears in the driver logs)
query = (
    csv_streaming_df
    .writeStream
    .format("console")
    .outputMode("append")
    .start()
)

The streaming query is assigned to the variable 'query', so it can be stopped:

In [0]:
query.stop()

Task 2: Stream Transformations
1. Once the data is streaming in, perform transformations on the incoming data:
Add a new column TotalAmount (Quantity * Price).
Filter records where the Quantity is greater than 1.

In [0]:
from pyspark.sql.functions import col

# Perform transformations on the incoming streaming data
transformed_csv_streaming_df = csv_streaming_df \
    .withColumn("TotalAmount", col("Quantity") * col("Price")) \
    .filter(col("Quantity") > 1)

# Assign the transformed stream to a variable for further operations
filtered_csv_streaming_df = transformed_csv_streaming_df

2. Write the transformed stream to a memory sink to see the updated results
continuously.

In [0]:
query_filtered = (
    filtered_csv_streaming_df
    .writeStream
    .format("memory")
    .queryName("filtered_csv_data")
    .outputMode("append")
    .start()
)

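# Process the files already in the folder before querying the in-memory table
query_filtered.processAllAvailable()
display(spark.sql("SELECT * FROM filtered_csv_data"))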

TransactionID,ProductID,TransactionDate,Quantity,Price,TotalAmount


In [0]:
# Stop the execution of the stream
query_filtered.stop()

Task 3: Aggregations on Streaming Data
1. Implement an aggregation on the streaming data:
Group the data by ProductID and calculate the total sales for each
product (i.e., sum of Quantity * Price for each product).
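A streaming groupBy keeps a running total per key in state, and in complete mode it re-emits the whole result table after every micro-batch. The plain-Python sketch below models that state update for sum(Quantity * Price) by ProductID over two hypothetical micro-batches; run_complete_mode is a hypothetical helper, not a Spark API.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Batch = List[Tuple[str, int, int]]  # (ProductID, Quantity, Price)

def run_complete_mode(batches: List[Batch]) -> List[Dict[str, int]]:
    """Return the full TotalSales table emitted after each micro-batch."""
    state: Dict[str, int] = defaultdict(int)  # ProductID -> running TotalSales
    outputs = []
    for batch in batches:
        for product_id, quantity, price in batch:
            state[product_id] += quantity * price
        outputs.append(dict(state))  # complete mode: every key, not only the changed ones
    return outputs

batches = [
    [("P1", 2, 10), ("P2", 1, 30)],
    [("P1", 3, 10)],
]
for i, table in enumerate(run_complete_mode(batches)):
    print(i, table)
```

In update mode only P1 would be emitted after the second batch. Append mode is not allowed for this aggregation without a watermark, because a running total is never final.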

In [0]:
# Alias Spark's sum so it does not shadow Python's built-in sum
from pyspark.sql.functions import col, sum as spark_sum

# Group the data by ProductID and calculate the total sales for each product
product_sales_aggregation = csv_streaming_df.groupBy("ProductID").agg(spark_sum(col("Quantity") * col("Price")).alias("TotalSales"))

# Start the streaming query to display the aggregated data
query_aggregated = (
    product_sales_aggregation
    .writeStream
    .format("memory")
    .queryName("aggregated_product_sales")
    .outputMode("complete")
    .start()
)

# Process the available input, then display the aggregated data
query_aggregated.processAllAvailable()
display(spark.sql("SELECT * FROM aggregated_product_sales"))

ProductID,TotalSales


In [0]:
query_aggregated.stop()

Task 4: Writing Streaming Data to File Sinks
1. After transforming and aggregating the data, write the streaming results to a
Parquet sink.
2. Ensure that you configure a checkpoint location to store progress and ensure
recovery in case of failure.

In [0]:
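# File sinks such as Parquet accept only append mode, and this aggregation (no watermark)
# cannot run in append mode, so each micro-batch's complete result is written via foreachBatch
def write_product_sales(batch_df, batch_id):
    # In complete mode every batch holds the full aggregate, so overwrite the previous output
    batch_df.write.mode("overwrite").parquet("/Workspace/Shared/aggregated_data/aggregated_product_sales")

query_aggregated_to_parquet = (
    product_sales_aggregation
    .writeStream
    .outputMode("complete")
    .foreachBatch(write_product_sales)
    .option("checkpointLocation", "/mnt/delta/checkpoints/aggregated_product_sales")
    .start()
)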

Task 5: Handling Late Data using Watermarks
1. Introduce a watermark on the TransactionDate column to handle late data
arriving in the stream.
2. Set the watermark to 1 day to allow late data within a 24-hour period and
discard data that is older.
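The watermark is the maximum event time seen so far minus the allowed delay. Spark recomputes it at the end of each micro-batch and applies it in the next one, and it only matters for stateful operations such as windowed aggregations; Spark guarantees that rows within the delay are kept, while older rows may be dropped. The plain-Python model below applies the 1-day rule to hypothetical event times; apply_watermark is a hypothetical helper.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

Events = List[datetime]

def apply_watermark(batches: List[Events], delay: timedelta) -> List[Tuple[Events, Events]]:
    """Split each micro-batch into (kept, dropped) rows.

    The watermark used for a batch comes from earlier batches:
    max event time seen so far minus the allowed delay.
    """
    max_seen: Optional[datetime] = None
    results: List[Tuple[Events, Events]] = []
    for events in batches:
        watermark = max_seen - delay if max_seen is not None else None
        kept = [e for e in events if watermark is None or e >= watermark]
        dropped = [e for e in events if watermark is not None and e < watermark]
        results.append((kept, dropped))
        if events:
            batch_max = max(events)
            max_seen = batch_max if max_seen is None else max(max_seen, batch_max)
    return results

batches = [
    [datetime(2024, 9, 10, 12, 0)],                             # watermark afterwards: 2024-09-09 12:00
    [datetime(2024, 9, 9, 18, 0), datetime(2024, 9, 9, 6, 0)],  # 18:00 is within 1 day, 06:00 is not
]
for kept, dropped in apply_watermark(batches, timedelta(days=1)):
    print("kept:", kept, "dropped:", dropped)
```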

In [0]:
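from pyspark.sql.functions import col, window, sum as spark_sum
from pyspark.sql.types import TimestampType

# Convert TransactionDate column to timestamp type (event time)
transformed_csv_streaming_df = transformed_csv_streaming_df.withColumn("TransactionDate", col("TransactionDate").cast(TimestampType()))

# Set a 1-day watermark on TransactionDate: rows older than (latest event time - 1 day) may be dropped
transformed_csv_streaming_df = transformed_csv_streaming_df.withWatermark("TransactionDate", "1 day")

# The watermark only takes effect in a stateful operation, e.g. daily sales per product
daily_product_sales = (
    transformed_csv_streaming_df
    .groupBy(window(col("TransactionDate"), "1 day"), col("ProductID"))
    .agg(spark_sum("TotalAmount").alias("TotalSales"))
)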

Task 6: Streaming from Multiple Sources
1. Simulate a scenario where two streams of data are being ingested:
Stream 1: Incoming transaction data (same as Task 1).
Stream 2: Product information (CSV with columns: ProductID, ProductName,
Category).
2. Perform a join on the two streams using the ProductID column and display the
combined stream results.
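A stream-stream inner join has to buffer rows from both sides, because a matching product row may arrive before or after the transaction that needs it. Each new row is joined against the buffered rows of the other stream and then buffered itself; without watermarks, as in this task, that state grows without bound. A plain-Python model of this symmetric hash join with hypothetical arrivals (symmetric_hash_join is a hypothetical helper):

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def symmetric_hash_join(events: List[Tuple[str, dict]], key: str) -> List[dict]:
    """events: ("left" | "right", row) in arrival order; returns joined rows as produced."""
    buffers: Dict[str, Dict[object, List[dict]]] = {
        "left": defaultdict(list),
        "right": defaultdict(list),
    }
    output = []
    for side, row in events:
        other = "right" if side == "left" else "left"
        # Probe the other side's buffer for rows with the same key
        for match in buffers[other][row[key]]:
            left, right = (row, match) if side == "left" else (match, row)
            output.append({**left, **right})
        # Buffer this row so later arrivals on the other side can match it
        buffers[side][row[key]].append(row)
    return output

events = [
    ("left", {"TransactionID": "T1", "ProductID": "P1", "Quantity": 2}),
    ("right", {"ProductID": "P1", "ProductName": "Pen", "Category": "Stationery"}),
    ("right", {"ProductID": "P2", "ProductName": "Mug", "Category": "Kitchen"}),
    ("left", {"TransactionID": "T2", "ProductID": "P2", "Quantity": 1}),
    ("left", {"TransactionID": "T3", "ProductID": "P9", "Quantity": 5}),
]
for row in symmetric_hash_join(events, "ProductID"):
    print(row)
```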

In [0]:
from pyspark.sql.types import StructType, StructField, StringType

# Stream 1: incoming transaction data (the CSV stream from Task 1)
transaction_stream = csv_streaming_df

# Stream 2: product information CSV files (hypothetical folder; drop product CSVs into it)
product_folder = "/Workspace/Shared/products_stream"
dbutils.fs.mkdirs(product_folder)
product_schema = StructType([
    StructField("ProductID", StringType(), True),
    StructField("ProductName", StringType(), True),
    StructField("Category", StringType(), True)
])
product_stream = spark.readStream.option("header", "true").schema(product_schema).csv(product_folder)

# Stream-stream inner join on ProductID (joining on the column name keeps one ProductID column)
joined_stream = transaction_stream.join(product_stream, on="ProductID", how="inner")
# Display the joined stream (display() starts a streaming query in Databricks notebooks)
display(joined_stream)

Task 7: Stopping and Restarting Streaming Queries
1. Stop the streaming query and explore the results.
2. Restart the query and ensure that it continues from the last processed data by
utilizing the checkpoint.
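Restarting from a checkpoint works because the query records which input it has committed; on restart it reads the last committed position and continues after it instead of reprocessing. The plain-Python sketch below models that with a JSON file in a temporary directory and a list standing in for the source. Spark's real checkpoint layout (offsets, commits, and state folders) is more involved, and the file name and run_query helper here are hypothetical.

```python
import json
import os
import tempfile
from typing import List

def run_query(source: List[str], checkpoint: str, max_rows: int) -> List[str]:
    """Process up to max_rows records after the last committed offset, then commit."""
    offset = 0
    if os.path.exists(checkpoint):
        with open(checkpoint) as fh:
            offset = json.load(fh)["next_offset"]
    batch = source[offset: offset + max_rows]
    # Record progress so a restart resumes after this batch
    with open(checkpoint, "w") as fh:
        json.dump({"next_offset": offset + len(batch)}, fh)
    return batch

source = ["T1", "T2", "T3", "T4", "T5"]
with tempfile.TemporaryDirectory() as tmp:
    ckpt = os.path.join(tmp, "offsets.json")          # hypothetical checkpoint file
    first_run = run_query(source, ckpt, max_rows=3)   # query stopped after this batch
    second_run = run_query(source, ckpt, max_rows=3)  # restart resumes from the checkpoint
print(first_run, second_run)
```

Deleting the checkpoint (or pointing at a new location) is what makes a query start over from the beginning.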

In [0]:
# Hypothetical output and checkpoint locations for the joined stream
output_path = "/mnt/delta/joined_stream"
checkpoint_dir = "/mnt/delta/checkpoints/joined_stream"

def start_joined_query():
    # The checkpoint records which input has been processed, so a restart resumes from there
    return (joined_stream.writeStream.format("delta").outputMode("append")
            .option("checkpointLocation", checkpoint_dir).start(output_path))

# Start the query, process the data already available, then stop it and explore the results
joined_stream_query = start_joined_query()
joined_stream_query.processAllAvailable()
joined_stream_query.stop()
display(spark.read.format("delta").load(output_path))

# Restart with the same checkpoint so it continues from the last processed data
joined_stream_query_restart = start_joined_query()

Assignment: Creating a Complete ETL Pipeline using Delta Live Tables (DLT)
Task 1: Create an ETL Pipeline using DLT (Python)
1. Create a Delta Live Table pipeline using PySpark to perform the following:
Read the source data from a CSV or Parquet file.
Transform the data by performing the following:
Add a new column for TotalAmount which is the result of
multiplying Quantity by Price .
Filter records where the Quantity is greater than 1.
Load the transformed data into a Delta table.

2. Ensure the pipeline is repeatable and can handle incremental loads by re-running with new data.

In [0]:
orders_csv_path = 'file:/Workspace/Shared/orders.csv'
dbutils.fs.cp(orders_csv_path, "dbfs:/Workspace/Shared/orders.csv")

True

In [0]:
import dlt
from pyspark.sql.functions import col

# Read the raw orders from the CSV source
# (for incremental loads, read a folder with Auto Loader: spark.readStream.format("cloudFiles"))
@dlt.table(comment="Raw orders read from CSV")
def orders_raw():
    return spark.read.format("csv").option("header", True).load("dbfs:/Workspace/Shared/orders.csv")

# Transform data (add TotalAmount and filter Quantity > 1)
@dlt.table(comment="Orders with TotalAmount, Quantity > 1")
def orders_transformed():
    df = dlt.read("orders_raw")
    df = df.withColumn("TotalAmount", col("Quantity") * col("Price"))
    return df.filter(col("Quantity") > 1)

# Load the transformed data into a Delta table: DLT stores each returned DataFrame
# as a Delta table, so the function only returns data and performs no explicit write
@dlt.table(comment="Final transformed orders")
def orders_final():
    return dlt.read("orders_transformed")


Name,Type
OrderID,string
OrderDate,string
CustomerID,string
Product,string
Quantity,string
Price,string
TotalAmount,double


Task 2: Create an ETL Pipeline using DLT (SQL)
1. Create a similar Delta Live Table pipeline using SQL:
Use SQL to read the source data, perform the same transformations (as
above), and write the data into a Delta table.

Ensure the pipeline can process incremental data without losing records
or creating duplicates.
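Re-running an incremental load without losing records or creating duplicates comes down to two things: only new input is picked up, and rows are written keyed by a primary key (here OrderID), so a replayed row overwrites instead of duplicating. A plain-Python model of the keyed part, using order rows from the outputs in this section and the updated orders from Task 4 as the second batch; load_incremental is a hypothetical helper.

```python
from typing import Dict, List, Tuple

Order = Tuple[int, str, int, int]  # (OrderID, Product, Quantity, Price)

def load_incremental(table: Dict[int, Order], batch: List[Order]) -> Dict[int, Order]:
    """Upsert a batch keyed by OrderID; replaying the same batch changes nothing."""
    updated = dict(table)
    for order in batch:
        updated[order[0]] = order
    return updated

initial = [(101, "Laptop", 2, 1000), (102, "Phone", 1, 500), (103, "Tablet", 3, 300)]
new_batch = [(101, "Laptop", 2, 1200), (106, "Keyboard", 3, 50)]

table = load_incremental({}, initial)
table = load_incremental(table, new_batch)
replayed = load_incremental(table, new_batch)  # e.g. the pipeline re-runs on the same file

print(len(table), replayed == table)  # 4 True
```

In a DLT SQL pipeline, keyed upserts of this kind are what APPLY CHANGES INTO ... KEYS (OrderID) SEQUENCE BY <ordering column> is designed for, while streaming tables over Auto Loader handle picking up only new files.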

In [0]:
# PySpark: the orders CSV is read without a header, so columns are named _c0.._c5
# (_c3 = Product, _c4 = Quantity, _c5 = Price) and the header line appears as a data row
from pyspark.sql.functions import col, when

df = spark.read.format("csv").load("dbfs:/Workspace/Shared/orders.csv")
df.write.format("delta").mode("overwrite").save("dbfs:/FileStore/assignment17sep/delta/orders")

# Read data from Delta Table
df = spark.read.format("delta").load("dbfs:/FileStore/assignment17sep/delta/orders")
df.show()

# Insert new record (union matches columns by position)
df = df.union(spark.createDataFrame([(106, "2024-01-12", "C006", "Keyboard", 3, 50)], ["OrderID", "OrderDate", "CustomerID", "Product", "Quantity", "Price"]))
df.show()

# Update prices (increase Laptop prices by 10%, leave all other rows unchanged)
df = df.withColumn("_c5", when(col("_c3") == "Laptop", col("_c5") * 1.1).otherwise(col("_c5")))
df.show()

# Delete rows where Quantity < 2
df = df.filter(col("_c4") >= 2)
df.show()

+-------+----------+----------+-------+--------+-----+
|    _c0|       _c1|       _c2|    _c3|     _c4|  _c5|
+-------+----------+----------+-------+--------+-----+
|OrderID| OrderDate|CustomerID|Product|Quantity|Price|
|    101|2024-01-01|      C001| Laptop|       2| 1000|
|    102|2024-01-02|      C002|  Phone|       1|  500|
|    103|2024-01-03|      C003| Tablet|       3|  300|
|    104|2024-01-04|      C004|Monitor|       1|  150|
|    105|2024-01-05|      C005|  Mouse|       5|   20|
+-------+----------+----------+-------+--------+-----+

+-------+----------+----------+--------+--------+-----+
|    _c0|       _c1|       _c2|     _c3|     _c4|  _c5|
+-------+----------+----------+--------+--------+-----+
|OrderID| OrderDate|CustomerID| Product|Quantity|Price|
|    101|2024-01-01|      C001|  Laptop|       2| 1000|
|    102|2024-01-02|      C002|   Phone|       1|  500|
|    103|2024-01-03|      C003|  Tablet|       3|  300|
|    104|2024-01-04|      C004| Monitor|       1|  150|
|

Task 3: Perform Read, Write, Update, and Delete Operations on Delta Table
(SQL + PySpark)
1. Read the data from the Delta table created in Task 1 and Task 2.
2. Update the table by changing the price of a product (e.g., increase the price
of laptops by 10%).
3. Delete rows from the Delta table where the quantity is less than 2.
4. Insert a new record into the Delta table using PySpark or SQL.

In [0]:
# SQL + PySpark: read, update, delete and insert on a Delta table

# Write the orders CSV (with its header row) to Delta so the columns are named
orders_path = "dbfs:/FileStore/assignment17sep/delta/orders_final"
orders_df = spark.read.csv("dbfs:/Workspace/Shared/orders.csv", header=True, inferSchema=True)
orders_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(orders_path)

# Register the Delta location as a table and read it
spark.sql(f"CREATE TABLE IF NOT EXISTS delta_orders_table USING DELTA LOCATION '{orders_path}'")
spark.sql("SELECT * FROM delta_orders_table").show()

# Update prices (increase laptops by 10%)
spark.sql("UPDATE delta_orders_table SET Price = Price * 1.1 WHERE Product = 'Laptop'").show()

# Delete rows where Quantity < 2
spark.sql("DELETE FROM delta_orders_table WHERE Quantity < 2").show()

# Insert new record
spark.sql("INSERT INTO delta_orders_table (OrderID, OrderDate, CustomerID, Product, Quantity, Price) VALUES (106, '2024-01-12', 'C006', 'Keyboard', 3, 50)")

Task 4: Merge Data (Slowly Changing Dimension - SCD Type 2)
1. Create a new dataset representing updated orders with new prices and products.
Implement a MERGE operation to simulate a Slowly Changing Dimension Type 2
(SCD2) scenario. Ensure that:
The Quantity, Price, and TotalAmount columns are updated if there is
a match on OrderID.
If no match is found, insert the new record into the Delta table.

In [0]:
from pyspark.sql.functions import col

# Updated orders: 101 gets a new price, 106 is a new order
data = [
    (101, '2024-01-10', 'C001', 'Laptop', 2, 1200),
    (106, '2024-01-12', 'C006', 'Keyboard', 3, 50)
]
schema = ["OrderID", "OrderDate", "CustomerID", "Product", "Quantity", "Price"]

new_orders_df = spark.createDataFrame(data, schema=schema)
new_orders_df.createOrReplaceTempView("new_orders_data")

print("Merging new data into Delta table...")

# Target Delta table: the original orders plus a TotalAmount column
dbfs_path = 'dbfs:/FileStore/assignment17sep/delta/orders1'
orders_df = spark.read.csv("dbfs:/Workspace/Shared/orders.csv", header=True, inferSchema=True)
orders_df = orders_df.withColumn("TotalAmount", col("Quantity") * col("Price"))
orders_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(dbfs_path)

# Update Quantity, Price and TotalAmount on a matching OrderID; otherwise insert the order
spark.sql(f"""
MERGE INTO delta.`{dbfs_path}` AS target
USING new_orders_data AS source
ON target.OrderID = source.OrderID
WHEN MATCHED THEN UPDATE SET
    target.Quantity = source.Quantity,
    target.Price = source.Price,
    target.TotalAmount = source.Quantity * source.Price
WHEN NOT MATCHED THEN INSERT (OrderID, OrderDate, CustomerID, Product, Quantity, Price, TotalAmount)
VALUES (source.OrderID, source.OrderDate, source.CustomerID, source.Product, source.Quantity, source.Price, source.Quantity * source.Price)
""")

print("New data merged successfully!")


Merging new data into Delta table...
New data merged successfully!
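The MERGE above overwrites matched rows in place, which is Slowly Changing Dimension Type 1 behavior. A Type 2 dimension keeps history instead: the current row for a changed key is closed out and a new current row is inserted. A minimal plain-Python sketch of that rule, assuming hypothetical is_current, valid_from and valid_to bookkeeping columns that are not part of the orders data; scd2_merge is a hypothetical helper.

```python
from typing import List, Optional

def scd2_merge(dim: List[dict], updates: List[dict], key: str,
               tracked: List[str], as_of: str) -> List[dict]:
    """Expire the current row when a tracked column changes; insert new versions."""
    result = [dict(r) for r in dim]  # copy so the input is not mutated
    current = {r[key]: r for r in result if r["is_current"]}
    for upd in updates:
        old: Optional[dict] = current.get(upd[key])
        if old is not None and all(old[c] == upd[c] for c in tracked):
            continue  # nothing changed: keep the existing current row
        if old is not None:
            old["is_current"] = False  # close out the previous version
            old["valid_to"] = as_of
        new_row = {**upd, "is_current": True, "valid_from": as_of, "valid_to": None}
        result.append(new_row)
        current[upd[key]] = new_row
    return result

dim = [{"OrderID": 101, "Product": "Laptop", "Quantity": 2, "Price": 1000,
        "is_current": True, "valid_from": "2024-01-01", "valid_to": None}]
updates = [
    {"OrderID": 101, "Product": "Laptop", "Quantity": 2, "Price": 1200},
    {"OrderID": 106, "Product": "Keyboard", "Quantity": 3, "Price": 50},
]
for row in scd2_merge(dim, updates, "OrderID", ["Quantity", "Price"], "2024-01-10"):
    print(row)
```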


Task 5: Explore Delta Table Internals
1. Inspect the Delta table's transaction logs and explore the metadata using SQL
queries:
Display the history of changes to the Delta table using the DESCRIBE
HISTORY command.
Check the file size and modification times using DESCRIBE DETAIL.

In [0]:
%sql
-- View the history of changes
DESCRIBE HISTORY delta.`dbfs:/FileStore/assignment17sep/delta/orders`;

-- View the detailed metadata
DESCRIBE DETAIL delta.`dbfs:/FileStore/assignment17sep/delta/orders`;

format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics
delta,aae9b655-1e4a-43b4-baef-e17c900ef700,,,dbfs:/FileStore/assignment17sep/delta/orders,2024-09-17T09:18:52.338Z,2024-09-17T09:18:53Z,List(),List(),1,1559,Map(delta.enableDeletionVectors -> true),3,7,List(deletionVectors),"Map(numRowsDeletedByDeletionVectors -> 0, numDeletionVectors -> 0)"


Task 6: Time Travel in Delta Tables
1. Use time travel to query the Delta table as it existed at a previous point in
time.
Query the table as it existed before the last merge operation.
Demonstrate time travel by using both the version of the table and the
timestamp.
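TIMESTAMP AS OF resolves to the latest version committed at or before the given time, so a version number and a timestamp are two ways of naming the same snapshot. A plain-Python model of that resolution, reusing the commit timestamps from the employees table history earlier in this notebook (version_at is a hypothetical helper):

```python
from bisect import bisect_right
from datetime import datetime
from typing import List, Tuple

# (version, commit timestamp), oldest first
commits: List[Tuple[int, datetime]] = [
    (0, datetime(2024, 9, 17, 5, 34, 57)),
    (1, datetime(2024, 9, 17, 5, 35, 47)),
    (2, datetime(2024, 9, 17, 5, 37, 23)),
    (3, datetime(2024, 9, 17, 5, 37, 28)),  # MERGE
    (4, datetime(2024, 9, 17, 5, 37, 31)),  # OPTIMIZE
]

def version_at(commits: List[Tuple[int, datetime]], ts: datetime) -> int:
    """Return the latest version committed at or before ts."""
    times = [t for _, t in commits]
    idx = bisect_right(times, ts)
    if idx == 0:
        raise ValueError("timestamp is before the first commit")
    return commits[idx - 1][0]

print(version_at(commits, datetime(2024, 9, 17, 5, 37, 25)))  # 2, just before the MERGE
```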

In [0]:
%sql
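-- Query the table as it existed at an earlier version (version 0)
SELECT * FROM delta.`dbfs:/FileStore/assignment17sep/delta/orders` VERSION AS OF 0;
-- The same snapshot by timestamp: use any time from version 0's commit up to the next commit (see DESCRIBE HISTORY)
-- SELECT * FROM delta.`dbfs:/FileStore/assignment17sep/delta/orders` TIMESTAMP AS OF '<commit timestamp>';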

_c0,_c1,_c2,_c3,_c4,_c5
OrderID,OrderDate,CustomerID,Product,Quantity,Price
101,2024-01-01,C001,Laptop,2,1000
102,2024-01-02,C002,Phone,1,500
103,2024-01-03,C003,Tablet,3,300
104,2024-01-04,C004,Monitor,1,150
105,2024-01-05,C005,Mouse,5,20


Task 7: Optimize Delta Table
1. Optimize the Delta table for faster queries using Z-Ordering.
Optimize the table on the Product column to reduce I/O and improve
query performance.
2. Use vacuum to remove any old files that are no longer necessary after the
optimization process.

In [0]:
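# Optimize the table for faster queries with Z-ordering on Product
# (orders1 has named columns; the headerless orders table stores Product as _c3)
spark.sql("OPTIMIZE delta.`dbfs:/FileStore/assignment17sep/delta/orders1` ZORDER BY (Product)")

# Vacuum unreferenced files older than 7 days (168 hours);
# files replaced by today's OPTIMIZE are kept until they age past that threshold
spark.sql("VACUUM delta.`dbfs:/FileStore/assignment17sep/delta/orders1` RETAIN 168 HOURS")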

DataFrame[path: string]

Task 8: Converting Parquet Files to Delta Format
1. You are provided with Parquet files containing historical order data. Convert
these files into a Delta table format using either PySpark or SQL.
Perform a simple query on the resulting Delta table.

In [0]:
from pyspark.sql import SparkSession

# Step 1: Create sample order data as a stand-in for the provided historical Parquet files
data = [
    (101, '2024-01-10', 'C001', 'Laptop', 2, 1200), 
    (106, '2024-01-12', 'C006', 'Keyboard', 3, 50)
]
schema = ["OrderID", "OrderDate", "CustomerID", "Product", "Quantity", "Price"]
orders_df = spark.createDataFrame(data, schema=schema)

# Step 2: Write the DataFrame to a Parquet file
parquet_path = "dbfs:/Workspace/Shared/historical_orders.parquet"
orders_df.write.mode("overwrite").parquet(parquet_path)

# Step 3: Read the Parquet file into a DataFrame
historical_orders_df = spark.read.parquet(parquet_path)

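# Step 4: Convert the data into a Delta table by rewriting it in Delta format
# (alternatively, CONVERT TO DELTA parquet.`<path>` converts Parquet files in place without rewriting them)
delta_path = "dbfs:/FileStore/historical_orders/delta"
historical_orders_df.write.format("delta").mode("overwrite").save(delta_path)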

# Step 5: Create a temporary view and perform SQL queries
spark.read.format("delta").load(delta_path).createOrReplaceTempView("historical_orders_delta")
display(spark.sql("SELECT COUNT(*) AS TotalOrders FROM historical_orders_delta"))

TotalOrders
2
