# Delta Lake tables
Use this notebook to explore Delta Lake functionality.

In [4]:
from pyspark.sql.types import StructType, IntegerType, StringType, DoubleType

# Define the schema explicitly instead of relying on Spark to infer it
schema = StructType() \
    .add("ProductID", IntegerType(), True) \
    .add("ProductName", StringType(), True) \
    .add("Category", StringType(), True) \
    .add("ListPrice", DoubleType(), True)

df = spark.read.format("csv").option("header", "true").schema(schema).load("Files/products/products.csv")
# df is now a Spark DataFrame containing the CSV data from "Files/products/products.csv"
display(df)

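Without a schema (and without `inferSchema`), Spark reads every CSV column as a string; supplying a schema gives typed columns without the extra pass over the data that inference requires. The plain-Python sketch below only illustrates that idea and is not how Spark parses CSV: a hypothetical `SCHEMA` list of (name, type) pairs is matched against the header row, each field is cast, and empty fields become `None` because every column above is declared nullable. The sample rows are made up.

```python
import csv
import io

# Hypothetical schema mirroring the StructType above: (column name, Python type)
SCHEMA = [
    ("ProductID", int),
    ("ProductName", str),
    ("Category", str),
    ("ListPrice", float),
]


def read_typed_csv(text):
    """Parse CSV text that has a header row, casting each field using SCHEMA.

    Empty fields become None, mimicking nullable columns.
    """
    reader = csv.DictReader(io.StringIO(text))
    rows = []
    for record in reader:
        row = {}
        for name, cast in SCHEMA:
            raw = record.get(name)
            row[name] = cast(raw) if raw not in (None, "") else None
        rows.append(row)
    return rows


# Made-up sample data for illustration only
sample = """ProductID,ProductName,Category,ListPrice
771,Example Bike,Mountain Bikes,3399.99
772,Example Helmet,Helmets,
"""

rows = read_typed_csv(sample)
print(rows[0])
```

Unlike Spark's default permissive CSV mode, this sketch raises `ValueError` on a value that cannot be cast instead of returning null.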

In [5]:
# No path: a managed table, stored in the lakehouse's default Tables location
df.write.format("delta").saveAsTable("managed_products")

In [6]:
# Explicit path: an external table, whose data files live at the given location
df.write.format("delta").saveAsTable("external_products", path="abfss://DeltaLake1@onelake.dfs.fabric.microsoft.com/Products.Lakehouse/Files/external_products")

In [7]:
%%sql
DESCRIBE FORMATTED managed_products;
-- Managed table (output excerpt):
/*
# Detailed Table Information,
Name,spark_catalog.products.managed_products
Type,MANAGED
Location,abfss://xxxxxxxxx-xxxx-xxxxx-xxxxx-xxxxx@onelake.dfs.fabric.microsoft.com/1dabxxxx-abd2-xxc3-a34a-xxxxxxd/Tables/managed_products
Provider,delta
Owner,trusted-service-user
Table Properties,[delta.minReaderVersion=1,delta.minWriterVersion=2,delta.parquet.vorder.enabled=true]
*/

<Spark SQL result set with 12 rows and 3 fields>

In [8]:
%%sql
DESCRIBE FORMATTED external_products;
-- External table (output excerpt):
/*
# Detailed Table Information,
Name,spark_catalog.products.external_products
Type,EXTERNAL
Location,abfss://DeltaLake1@onelake.dfs.fabric.microsoft.com/Products.Lakehouse/Files/external_products
Provider,delta
Owner,trusted-service-user
Table Properties,[delta.minReaderVersion=1,delta.minWriterVersion=2,delta.parquet.vorder.enabled=true]
*/

<Spark SQL result set with 12 rows and 3 fields>

In [9]:
%%sql
DROP TABLE managed_products;
DROP TABLE external_products;

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [10]:
%%sql
CREATE TABLE products
USING DELTA
LOCATION 'Files/external_products';

<Spark SQL result set with 0 rows and 0 fields>

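The `products` table created above is an external table. The LOCATION clause registers it over the existing Delta files in `Files/external_products`, which survived the earlier `DROP TABLE external_products` because dropping an external table leaves its data in place. A managed table would instead store its data files in the default location managed by the system (the lakehouse `Tables` folder, as the `managed_products` output above shows).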

## Managed Table vs. External Table

### 1. Managed Table
- **Definition:** The system fully manages the data files. When you create a managed table, the system (such as a database, lakehouse, or data warehouse) decides where the data is stored.
- **Storage:** Data is stored in the default location, within the system's managed storage.
- **Lifecycle:** Dropping a managed table deletes both the table definition and the data files.
- **Advantages:**
  - *Simpler management:* The system handles where and how the data is stored.
  - *Automatic cleanup:* Dropping the table removes the data, so no manual cleanup is needed.

### 2. External Table
- **Definition:** The data files are stored outside the managed storage, at a path you choose on a file system or in cloud storage (e.g., AWS S3, Azure Blob Storage).
- **Storage:** Data is stored at the location you specify, for example with the LOCATION clause or the `path` option of `saveAsTable`.
- **Lifecycle:** Dropping the table removes only the table definition; the data files remain intact.
- **Advantages:**
  - *Flexibility:* Useful when data is shared between systems or accessed by multiple platforms.
  - *Persistence:* Dropping the table does not delete the data, which helps when the data must be kept for other purposes.
  - *Control:* You decide where the data is stored, including integration with external storage systems.

### Summary
- **Managed tables:** The system manages storage, and the data is deleted automatically when the table is dropped. Simple and easy to use.
- **External tables:** You manage the storage location, and the data persists after the table is dropped. More control and flexibility.

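The drop semantics above can be sketched as a toy, in-memory catalog. This is a hypothetical model for illustration only (the class and method names are not Spark APIs): a table saved without a path gets a location under a managed root and is treated as managed, a table saved with a path is external, and `drop_table` deletes the stored data only for managed tables. The final step mirrors `CREATE TABLE ... LOCATION`, which re-registers data that an external drop left behind.

```python
from dataclasses import dataclass, field


@dataclass
class ToyCatalog:
    """Hypothetical toy catalog, not Spark's metastore.

    `storage` maps a storage path to its rows; `tables` maps a table name
    to a (location, is_managed) pair.
    """
    managed_root: str = "Tables"
    storage: dict = field(default_factory=dict)
    tables: dict = field(default_factory=dict)

    def save_as_table(self, name, rows, path=None):
        # No path given -> managed table stored under the managed root
        managed = path is None
        location = f"{self.managed_root}/{name}" if managed else path
        self.storage[location] = list(rows)
        self.tables[name] = (location, managed)
        return location

    def create_table_at(self, name, path):
        # Like CREATE TABLE ... LOCATION: register existing data as external
        if path not in self.storage:
            raise FileNotFoundError(path)
        self.tables[name] = (path, False)

    def drop_table(self, name):
        location, managed = self.tables.pop(name)
        if managed:
            # Managed: the data is deleted along with the definition
            del self.storage[location]
        # External: only the definition is removed; the data stays in storage


catalog = ToyCatalog()
rows = [{"ProductID": 1, "ListPrice": 10.0}]
catalog.save_as_table("managed_products", rows)
catalog.save_as_table("external_products", rows, path="Files/external_products")
catalog.drop_table("managed_products")
catalog.drop_table("external_products")
print(sorted(catalog.storage))  # ['Files/external_products']
catalog.create_table_at("products", "Files/external_products")
```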
In [11]:
%%sql
UPDATE products
SET ListPrice = ListPrice * 0.9
WHERE Category = 'Mountain Bikes';

<Spark SQL result set with 1 rows and 1 fields>

In [12]:
%%sql
DESCRIBE HISTORY products;

<Spark SQL result set with 2 rows and 15 fields>

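The two rows returned by `DESCRIBE HISTORY` correspond to two table versions: version 0 from the initial write and version 1 from the UPDATE. Delta Lake does not modify existing data files in place; each change is committed as a new table version in the transaction log, and the history is listed newest first. The sketch below is a simplified, hypothetical in-memory model: it stores each version as a full snapshot (real Delta versions reference data files instead), applies the same 10% Mountain Bikes discount as the UPDATE cell, and uses placeholder operation labels.

```python
import copy


class ToyDeltaLog:
    """Hypothetical model of versioned commits, not the real Delta log."""

    def __init__(self, rows):
        # Version 0: the initial write ("WRITE" is a placeholder label)
        self.commits = [{"version": 0, "operation": "WRITE", "rows": copy.deepcopy(rows)}]

    def update(self, predicate, assign):
        # Copy the latest snapshot, change matching rows, commit a new version
        latest = copy.deepcopy(self.commits[-1]["rows"])
        changed = 0
        for row in latest:
            if predicate(row):
                assign(row)
                changed += 1
        self.commits.append({"version": len(self.commits), "operation": "UPDATE", "rows": latest})
        return changed

    def history(self):
        # Newest first, like DESCRIBE HISTORY
        return [(c["version"], c["operation"]) for c in reversed(self.commits)]


# Made-up rows for illustration
rows = [
    {"ProductName": "Example Bike", "Category": "Mountain Bikes", "ListPrice": 1000.0},
    {"ProductName": "Example Helmet", "Category": "Helmets", "ListPrice": 50.0},
]
log = ToyDeltaLog(rows)
# Same logic as: UPDATE products SET ListPrice = ListPrice * 0.9 WHERE Category = 'Mountain Bikes'
changed = log.update(lambda r: r["Category"] == "Mountain Bikes",
                     lambda r: r.update(ListPrice=r["ListPrice"] * 0.9))
print(log.history())  # [(1, 'UPDATE'), (0, 'WRITE')]
```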
In [13]:
delta_table_path = 'Files/external_products'
# Get the current data
current_data = spark.read.format("delta").load(delta_table_path)
display(current_data)
# Get the version 0 data
original_data = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
display(original_data)

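Reading with `versionAsOf` asks Delta Lake for the table as it was at that commit, so `original_data` shows the prices from before the UPDATE while `current_data` shows the discounted ones. A minimal sketch of the version lookup, assuming a hypothetical list of per-version snapshots (real Delta reconstructs a version by replaying its transaction log):

```python
def read_version(snapshots, version_as_of=None):
    """Return the rows of a given version; None means the latest version.

    `snapshots` is a hypothetical list in which index i holds the rows of version i.
    """
    if version_as_of is None:
        return snapshots[-1]
    if not 0 <= version_as_of < len(snapshots):
        raise ValueError(f"version {version_as_of} does not exist")
    return snapshots[version_as_of]


# Made-up version 0 data
v0 = [{"Category": "Mountain Bikes", "ListPrice": 1000.0},
      {"Category": "Helmets", "ListPrice": 50.0}]
# Version 1 applies the 10% discount from the UPDATE cell
v1 = [{**row, "ListPrice": row["ListPrice"] * 0.9} if row["Category"] == "Mountain Bikes" else row
      for row in v0]
snapshots = [v0, v1]

current = read_version(snapshots)                     # like load(path)
original = read_version(snapshots, version_as_of=0)   # like option("versionAsOf", 0)
```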

In [14]:
%%sql
-- Create a temporary view
CREATE OR REPLACE TEMPORARY VIEW products_view
AS
SELECT Category, COUNT(*) AS NumProducts, MIN(ListPrice) AS MinPrice,
       MAX(ListPrice) AS MaxPrice, AVG(ListPrice) AS AvgPrice
FROM products
GROUP BY Category;
SELECT *
FROM products_view
ORDER BY Category;

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 37 rows and 5 fields>

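The `products_view` query groups products by category and computes a count and price statistics per group. The plain-Python sketch below, using made-up rows and a hypothetical function name, reproduces the same semantics: `COUNT(*)` counts every row, while `MIN`, `MAX`, and `AVG` skip null prices, and results are ordered by category. The last line mirrors the following cell's `ORDER BY NumProducts DESC LIMIT 10`.

```python
from collections import defaultdict


def summarize_by_category(rows):
    """Group rows by Category like the products_view query.

    COUNT(*) counts every row; MIN/MAX/AVG skip None prices, as SQL skips nulls.
    Results are ordered by Category, matching ORDER BY Category.
    """
    counts = defaultdict(int)
    prices = defaultdict(list)
    for row in rows:
        category = row["Category"]
        counts[category] += 1
        if row["ListPrice"] is not None:
            prices[category].append(row["ListPrice"])
    summary = []
    for category in sorted(counts):
        p = prices[category]
        summary.append({
            "Category": category,
            "NumProducts": counts[category],
            "MinPrice": min(p) if p else None,
            "MaxPrice": max(p) if p else None,
            "AvgPrice": sum(p) / len(p) if p else None,
        })
    return summary


# Made-up rows for illustration
rows = [
    {"Category": "Road Bikes", "ListPrice": 1000.0},
    {"Category": "Helmets", "ListPrice": 30.0},
    {"Category": "Helmets", "ListPrice": 40.0},
    {"Category": "Helmets", "ListPrice": None},
]
summary = summarize_by_category(rows)

# Top categories by product count, like ORDER BY NumProducts DESC LIMIT 10
top = sorted(summary, key=lambda r: r["NumProducts"], reverse=True)[:10]
```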
In [15]:
%%sql
SELECT Category, NumProducts
FROM products_view
ORDER BY NumProducts DESC
LIMIT 10;

<Spark SQL result set with 10 rows and 2 fields>

In [17]:
from pyspark.sql.functions import col

# Query the temporary view and sort categories by average price, highest first
df_products = spark.sql("SELECT Category, MinPrice, MaxPrice, AvgPrice FROM products_view").orderBy(col("AvgPrice").desc())
display(df_products.limit(6))


In [18]:
from notebookutils import mssparkutils
from pyspark.sql.types import StructType, StructField, StringType

# Create a folder to act as the streaming source
inputPath = 'Files/data/'
mssparkutils.fs.mkdirs(inputPath)

# Create a stream that reads JSON files from the folder using an explicit schema,
# considering at most one new file per micro-batch (maxFilesPerTrigger = 1)
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)
# Write some event data to the folder
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''
mssparkutils.fs.put(inputPath + "data.txt", device_data, True)
print("Source stream created...")

True

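With `maxFilesPerTrigger` set to 1, the file source considers at most one new file in each trigger and remembers which files it has already processed, so each file is read once. The toy, single-process model below illustrates that behaviour; the function name is hypothetical, files are assumed to be listed in arrival order, and Spark's real file source tracks more state than this. The per-file record counts (9 and 7) come from the two JSON payloads written in this notebook, which is why the query returns 9 rows and later 16 rows.

```python
def plan_micro_batch(files_in_folder, seen, max_files_per_trigger=1):
    """Return the next batch of unseen files (at most max_files_per_trigger).

    Hypothetical helper: `files_in_folder` is assumed to be in arrival order,
    and `seen` is a set of already-processed file names, updated in place.
    """
    batch = [f for f in files_in_folder if f not in seen][:max_files_per_trigger]
    seen.update(batch)
    return batch


# Record counts per file, from the two JSON payloads written in this notebook
records_per_file = {"data.txt": 9, "more-data.txt": 7}

folder = ["data.txt"]
seen = set()
total_rows = 0
for _ in range(2):  # two triggers; the second finds no new file
    for f in plan_micro_batch(folder, seen):
        total_rows += records_per_file[f]
print(total_rows)  # 9

folder.append("more-data.txt")
for f in plan_micro_batch(folder, seen):
    total_rows += records_per_file[f]
print(total_rows)  # 16
```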
In [19]:
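delta_stream_table_path = 'Tables/iotdevicedata'
checkpointpath = 'Files/delta/checkpoint'
# Start writing the stream to a Delta table; the checkpoint location records
# the query's progress so a restarted query can resume where it stopped
deltastream = iotstream.writeStream.format("delta") \
    .option("checkpointLocation", checkpointpath) \
    .start(delta_stream_table_path)
print("Streaming to delta sink...")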

Streaming to delta sink...

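The checkpoint location is what lets a restarted streaming query resume instead of reprocessing, and duplicating, input it has already written. The sketch below is a hypothetical, simplified stand-in for that idea: it persists the set of committed source files to a JSON file and skips them on restart. Spark's real checkpoint has a different layout, and Spark and the Delta sink add safeguards (for example, against a crash between writing data and recording progress) that this toy omits.

```python
import json
import os
import tempfile


def run_query(source_files, checkpoint_file, sink):
    """Hypothetical toy query: process files not yet recorded in the checkpoint."""
    done = set()
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file) as fh:
            done = set(json.load(fh))
    for name, rows in source_files.items():
        if name in done:
            continue  # already committed in an earlier run
        sink.extend(rows)
        done.add(name)
        # Record progress after each file so a restart resumes from here
        with open(checkpoint_file, "w") as fh:
            json.dump(sorted(done), fh)


# Made-up source data: file name -> rows
source = {"data.txt": ["r1", "r2"], "more-data.txt": ["r3"]}
sink = []
with tempfile.TemporaryDirectory() as tmp:
    ckpt = os.path.join(tmp, "checkpoint.json")
    run_query(source, ckpt, sink)
    run_query(source, ckpt, sink)  # restart: nothing is reprocessed
print(len(sink))  # 3
```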

In [20]:
%%sql
SELECT * FROM IotDeviceData;

<Spark SQL result set with 9 rows and 2 fields>

In [21]:
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''
mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)

True

In [22]:
%%sql
SELECT * FROM IotDeviceData;

<Spark SQL result set with 16 rows and 2 fields>

In [23]:
deltastream.stop()