# Delta Lake tables
Use this notebook to explore Delta Lake functionality.


In [1]:
from pyspark.sql.types import StructType, IntegerType, DoubleType, StringType 

# Define the schema
schema = StructType() \
    .add("ProductID", IntegerType(), nullable=True) \
    .add("ProductName", StringType(), nullable=True) \
    .add("Category", StringType(), nullable=True) \
    .add("ListPrice", DoubleType(), nullable=True)

df = spark.read.format('csv').option("header", "true").schema(schema).load('Files/products/products.csv')
# df is now a Spark DataFrame containing the CSV data from "Files/products/products.csv"
display(df)


### You can save the DataFrame as a Delta table by using the saveAsTable method. Delta Lake supports the creation of both managed and external tables:

###### **Managed** Delta tables benefit from higher performance, as Fabric manages both the schema metadata and the data files. 
###### **External** tables store their data files in a location you specify, while Fabric manages only the table metadata.

In [2]:
# Create a managed table
df.write.format('delta').saveAsTable('managed_products')


**Note:** The triangle icon next to the table name indicates a Delta table.

The files for managed tables are stored in the Tables folder in the lakehouse. A folder named managed_products has been created, which stores the Parquet files and the _delta_log folder for the table.

The ABFS path is the fully qualified path to the lakehouse Files folder \
**ABFS path:** abfss://376436a8-fa6c-4f70-8116-2f3c3f3a3a64@onelake.dfs.fabric.microsoft.com/542b7a1d-d0d6-4e33-a393-ee3cffa2970e/Files
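
As a rough illustration of how this path is put together, the sketch below splits it with Python's standard `urllib.parse`. The helper name `split_onelake_path` is hypothetical, and reading the first GUID as the workspace ID and the second as the lakehouse ID is an assumption based on the usual OneLake URI layout, so check it against your own workspace.

```python
from urllib.parse import urlsplit


def split_onelake_path(abfs_path: str) -> dict:
    """Split an abfss:// URI into its parts (hypothetical helper, not a Fabric API)."""
    parts = urlsplit(abfs_path)
    if parts.scheme != "abfss":
        raise ValueError(f"expected an abfss:// URI, got {abfs_path!r}")
    # Assumption: the first path segment is the lakehouse ID, the rest is the folder.
    item_id, _, folder = parts.path.lstrip("/").partition("/")
    return {
        "workspace": parts.username,  # text before the '@'
        "host": parts.hostname,       # text after the '@'
        "lakehouse": item_id,
        "folder": folder,
    }


abfs_path = "abfss://376436a8-fa6c-4f70-8116-2f3c3f3a3a64@onelake.dfs.fabric.microsoft.com/542b7a1d-d0d6-4e33-a393-ee3cffa2970e/Files"
info = split_onelake_path(abfs_path)

# The external table location is simply a subfolder of the Files folder.
external_table_path = f"{abfs_path}/external_products"

print(info)
print(external_table_path)
```

Building the external table location by appending a folder name to this path is the same pattern the next cell uses.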

In [3]:
# Create an external table
abfs_path = 'abfss://376436a8-fa6c-4f70-8116-2f3c3f3a3a64@onelake.dfs.fabric.microsoft.com/542b7a1d-d0d6-4e33-a393-ee3cffa2970e/Files'
df.write.format('delta').saveAsTable('external_products', path=f'{abfs_path}/external_products')


In [8]:
%%sql
-- Compare managed and external tables
DESCRIBE FORMATTED managed_products;

DESCRIBE FORMATTED external_products;


<Spark SQL result set with 12 rows and 3 fields>

<Spark SQL result set with 12 rows and 3 fields>

In [9]:
%%sql
DROP TABLE managed_products;
DROP TABLE external_products;


<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In the Explorer pane, <mark>refresh the Tables folder to verify that</mark> **no tables are listed in the Tables node**. \
In the Explorer pane, <mark>refresh the Files folder and verify that</mark> **the external_products folder has not been deleted. Select this folder to view the Parquet data files and _delta_log folder**. \
<mark>The metadata for the external table was deleted, but not the data files.</mark>

In [4]:
%%sql
-- Use SQL to create a Delta table
CREATE TABLE products
USING DELTA
LOCATION 'Files/external_products'


<Spark SQL result set with 0 rows and 0 fields>

In [5]:
%%sql
SELECT * 
FROM products;


<Spark SQL result set with 295 rows and 4 fields>

###### <mark>Transaction history</mark> for Delta tables is <mark>stored in JSON files in the _delta_log folder</mark>. <mark>You can use this transaction log to manage data versioning</mark>.
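
To make the log format a little more concrete, here is a minimal sketch that lists the commits found in a `_delta_log` folder using only the Python standard library. It relies on two details of the Delta transaction log: each commit file is named with a zero-padded version number, and each line in it is one JSON action. The helper `list_commits` is hypothetical, the sample commit files are made up for the demo, and the sketch assumes the folder is reachable as a local or mounted path.

```python
import json
import tempfile
from pathlib import Path


def list_commits(log_dir):
    """Return (version, operation) pairs for the commit files in a _delta_log folder."""
    commits = []
    for log_file in sorted(Path(log_dir).glob("*.json")):
        # Commit files look like 00000000000000000001.json; skip anything else.
        if not log_file.stem.isdigit():
            continue
        operation = None
        for line in log_file.read_text().splitlines():
            if not line.strip():
                continue
            action = json.loads(line)  # one JSON action per line
            if "commitInfo" in action:
                operation = action["commitInfo"].get("operation")
        commits.append((int(log_file.stem), operation))
    return commits


# Demo with synthetic commit files (illustrative values, not real table history).
with tempfile.TemporaryDirectory() as tmp:
    log_dir = Path(tmp) / "_delta_log"
    log_dir.mkdir()
    for version, op in {0: "WRITE", 1: "UPDATE"}.items():
        commit = {"commitInfo": {"operation": op}}
        (log_dir / f"{version:020d}.json").write_text(json.dumps(commit) + "\n")
    demo_commits = list_commits(log_dir)

print(demo_commits)
```

This is roughly the information that `DESCRIBE HISTORY` surfaces in tabular form, without needing a Spark session.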

In [6]:
%%sql
-- Explore table versioning
-- The code implements a 10% reduction in the price for mountain bikes
UPDATE products
SET ListPrice = ListPrice * 0.9
WHERE Category = 'Mountain Bikes';


<Spark SQL result set with 1 rows and 1 fields>

In [7]:
%%sql
-- Versioning
DESCRIBE HISTORY products;


<Spark SQL result set with 3 rows and 15 fields>

In [8]:
delta_table_path = 'Files/external_products'

# Get the current data 
current_data = spark.read.format('delta').load(delta_table_path)
display(current_data)

# Get version 0 (the data before the price reduction)
original_data = spark.read.format('delta').option('versionAsOf', 0).load(delta_table_path)
display(original_data)


##### Two result sets are returned - one containing the data after the price reduction, and the other showing the original version of the data.
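
One way to picture why version 0 is still readable after the UPDATE: the table state at a given version is the set of data files you get by replaying the log's `add` and `remove` actions up to that version. The sketch below is a deliberately simplified model of that idea in plain Python. The function `files_at_version` and the file names are made up, and it ignores checkpoints, deletion vectors, and everything else a real Delta reader handles.

```python
def files_at_version(commits, version):
    """Replay add/remove actions up to `version` and return the live data files."""
    live = set()
    for commit_version, actions in sorted(commits.items()):
        if commit_version > version:
            break
        for action in actions:
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live


# Made-up log: version 0 writes one file, version 1 (an UPDATE) rewrites it.
commits = {
    0: [{"add": {"path": "part-0000-a.parquet"}}],
    1: [
        {"remove": {"path": "part-0000-a.parquet"}},
        {"add": {"path": "part-0000-b.parquet"}},
    ],
}

print(files_at_version(commits, 0))  # {'part-0000-a.parquet'}
print(files_at_version(commits, 1))  # {'part-0000-b.parquet'}
```

In this model the old file is only marked as removed in the log, which is why an earlier version can still be reconstructed from it.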

In [11]:
%%sql
-- Analyze Delta table data with SQL queries
CREATE OR REPLACE TEMPORARY VIEW products_view
AS
    SELECT Category, COUNT(*) AS NumProducts, MIN(ListPrice) AS MinPrice, MAX(ListPrice) AS MaxPrice, AVG(ListPrice) AS AvgPrice
    FROM products
    GROUP BY Category;

SELECT *
FROM products_view
ORDER BY Category;



<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 37 rows and 5 fields>

In [14]:
%%sql
SELECT Category, NumProducts
FROM products_view
ORDER BY NumProducts DESC
LIMIT 10;


<Spark SQL result set with 10 rows and 2 fields>

In [16]:
from pyspark.sql.functions import col

df_products = spark.sql('SELECT Category, MinPrice, MaxPrice, AvgPrice FROM products_view').orderBy(col('AvgPrice').desc())
display(df_products.limit(6))


###### **Delta Lake supports streaming data**. Delta tables can be a sink or a source for data streams created using the <mark>Spark Structured Streaming API</mark>. Here, a Delta table is used as a sink for streaming data in a simulated Internet of Things (IoT) scenario.

In [22]:
# Use Delta tables for streaming data
from notebookutils import mssparkutils
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a folder
inputPath = 'Files/stream_data/'
mssparkutils.fs.mkdirs(inputPath)

# Write some event data to the folder
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + 'data.json', device_data, True)

# Create a stream that reads data from the folder using a JSON schema
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])

iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)


print('Source stream created...')


Source stream created...


###### The code you just ran created a streaming data source based on a folder to which some data has been saved, representing readings from hypothetical IoT devices.

In [24]:
# Write the stream to a delta table
delta_stream_table_path = 'Tables/iotdevicedata'
checkpointpath = 'Files/delta/checkpoint'
deltastream = iotstream.writeStream.format('delta').option('checkpointLocation', checkpointpath).start(delta_stream_table_path)
print('streaming to delta sink...')


streaming to delta sink...


In [25]:
%%sql
SELECT *
FROM iotdevicedata


<Spark SQL result set with 9 rows and 2 fields>

In [26]:
# Add more data to the source stream
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + 'more_data.json', more_data, True)
print('New streaming data added to the source folder...')

New streaming data added to the source folder...


In [27]:
%%sql
SELECT *
FROM iotdevicedata


<Spark SQL result set with 16 rows and 2 fields>
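
The two row counts follow directly from the input files: the first file holds 9 events and the second adds 7, so the table grows from 9 to 16 rows once the stream has picked up both. As a quick sanity check outside Spark, the sketch below parses the same JSON lines with the standard library and tallies them. The helper `parse_events` is a hypothetical name used only here.

```python
import json
from collections import Counter

device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''


def parse_events(text):
    """Parse newline-delimited JSON into a list of dicts."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


first_batch = parse_events(device_data)
all_events = first_batch + parse_events(more_data)

# Tally events per (device, status) pair across both files.
status_counts = Counter((e["device"], e["status"]) for e in all_events)

print(len(first_batch), len(all_events))  # 9 16
print(status_counts)
```

If a query against the table returns fewer rows than expected, the stream may simply not have processed the newest file yet, so rerunning the query after a short wait is a reasonable first step.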

In [28]:
deltastream.stop()
