Lab instructions (Microsoft Fabric – Delta Lake exercise): https://microsoftlearning.github.io/mslearn-fabric/Instructions/Labs/03-delta-lake.html#create-a-lakehouse-and-upload-data

In [26]:
from pyspark.sql.types import StructType, IntegerType, StringType, DoubleType

# Location of the uploaded product catalogue inside the lakehouse Files area.
# NOTE(review): the lab's download is presumably named products.csv; this lakehouse holds
# product.csv (the load below ran successfully with that name) — keep path and upload in sync.
products_csv_path = "Files/products/product.csv"

# Explicit schema so every column gets a concrete type (all columns are nullable).
schema = (
    StructType()
    .add("ProductID", IntegerType(), True)
    .add("ProductName", StringType(), True)
    .add("Category", StringType(), True)
    .add("ListPrice", DoubleType(), True)
)

# The first CSV row holds the column names, so it is treated as a header, not data.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load(products_csv_path)
)
# df is a Spark DataFrame with the contents of products_csv_path, typed by `schema`.
display(df)

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 30, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1a07d98d-eb50-401f-bd96-b25e1d40b72a)

In [27]:
# Create a managed Delta table: the lakehouse owns both the table metadata and its data
# files, so dropping the table later also removes the data.
# mode("overwrite") keeps the cell re-runnable — the default save mode ("errorifexists")
# raises if managed_products already exists.
df.write.format("delta").mode("overwrite").saveAsTable("managed_products")

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 31, Finished, Available, Finished)

In [28]:
# Create an external (unmanaged) Delta table whose data lives at an explicit path in the
# lakehouse Files area. Dropping this table removes only the catalog entry; the files stay.
# NOTE(review): this ABFS path embeds the IDs of one specific workspace and lakehouse —
# replace it with the ABFS path of your own Files folder when running elsewhere.
external_products_path = (
    "abfss://1820e0ca-6318-4d70-a0e4-692382506724@onelake.dfs.fabric.microsoft.com/"
    "d4808097-5433-42d4-ae9e-0e3f57bc86e4/Files/external_products"
)

# mode("overwrite") keeps the cell re-runnable: with the default "errorifexists" mode Delta
# rejects creating a table over a non-empty location, and the files written here outlive
# DROP TABLE, so a second run would otherwise fail.
df.write.format("delta").mode("overwrite").saveAsTable(
    "external_products", path=external_products_path
)

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 32, Finished, Available, Finished)

In [29]:
%%sql
-- Column list plus detailed table metadata (such as Type and Location) for the managed table.
DESC TABLE EXTENDED managed_products;

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 33, Finished, Available, Finished)

<Spark SQL result set with 12 rows and 3 fields>

In [30]:
%%sql
-- Same detailed metadata for the external table, for comparison with the managed one.
DESC TABLE EXTENDED external_products;

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 34, Finished, Available, Finished)

<Spark SQL result set with 12 rows and 3 fields>

In [31]:
%%sql
-- Drop both tables; IF EXISTS makes the cell safe to re-run.
-- The managed table's data files go with it. The external table's Delta files stay in
-- Files/external_products, and the next section builds a new table on top of them.
DROP TABLE IF EXISTS managed_products;
DROP TABLE IF EXISTS external_products;

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 36, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

**Use SQL to create a Delta table**

In [32]:
%%sql
-- Register a table over the Delta files left behind by the dropped external table.
-- No column list is given: the schema is taken from the Delta log at that location.
-- IF NOT EXISTS lets the cell be re-run without failing on the existing table.
CREATE TABLE IF NOT EXISTS products
USING DELTA
LOCATION 'Files/external_products';

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 37, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [33]:
%%sql
-- Read back every row of the new table, naming the columns of the original products schema.
SELECT ProductID, ProductName, Category, ListPrice
FROM products;

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 38, Finished, Available, Finished)

<Spark SQL result set with 295 rows and 4 fields>

**Explore table versioning**

In [34]:
%%sql
-- Apply a 10% discount to every 'Mountain Bikes' product. The change is committed as a
-- new version of the Delta table, visible in its history.
UPDATE products
SET ListPrice = 0.9 * ListPrice
WHERE Category = 'Mountain Bikes';

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 39, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>

In [35]:
%%sql
-- One row per commit to the products table, newest first.
DESC HISTORY products;

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 40, Finished, Available, Finished)

<Spark SQL result set with 2 rows and 15 fields>

In [36]:
# Both reads go straight to the Delta files behind the products table.
delta_table_path = 'Files/external_products'

# Latest snapshot of the table, including the discount applied by the UPDATE.
current_data = (
    spark.read
    .format("delta")
    .load(delta_table_path)
)
display(current_data)

# Time travel: the same table as of version 0, i.e. before the UPDATE.
# A fresh reader is used on purpose — .option() mutates the reader it is called on.
original_data = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load(delta_table_path)
)
display(original_data)

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 41, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 652828cf-fa90-4925-bff2-b5885b60439d)

SynapseWidget(Synapse.DataFrame, a9cd8ef4-8255-4376-9b1b-0539c2615568)

**Analyze Delta table data with SQL queries**

In [37]:
%%sql
-- Per-category price summary, registered as a session-scoped view so later cells can reuse it.
CREATE OR REPLACE TEMP VIEW products_view AS
SELECT
    Category,
    COUNT(*)       AS NumProducts,
    MIN(ListPrice) AS MinPrice,
    MAX(ListPrice) AS MaxPrice,
    AVG(ListPrice) AS AvgPrice
FROM products
GROUP BY Category;

SELECT Category, NumProducts, MinPrice, MaxPrice, AvgPrice
FROM products_view
ORDER BY Category;
        

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 43, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 37 rows and 5 fields>

In [38]:
%%sql
-- The ten categories with the most products. Category is a secondary sort key so that
-- categories with equal counts are ordered, and cut at the LIMIT, deterministically.
SELECT Category, NumProducts
    FROM products_view
    ORDER BY NumProducts DESC, Category
    LIMIT 10;

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 44, Finished, Available, Finished)

<Spark SQL result set with 10 rows and 2 fields>

In [39]:
from pyspark.sql.functions import col, desc

# The same view queried through the DataFrame API: categories ranked by average list
# price, most expensive first.
df_products = (
    spark.table("products_view")
    .select("Category", "MinPrice", "MaxPrice", "AvgPrice")
    .orderBy(desc("AvgPrice"))
)
# Show only the top six categories.
display(df_products.limit(6))

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 45, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 35141ea1-77b0-4631-b4ea-47b95b65045d)

**Use Delta tables for streaming data**

In [40]:
from notebookutils import mssparkutils
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Folder the stream will watch. It must exist before the streaming reader is defined.
inputPath = 'Files/data/'
mssparkutils.fs.mkdirs(inputPath)

# Schema of each JSON event, declared up front for the streaming file source.
jsonSchema = (
    StructType()
    .add("device", StringType(), False)
    .add("status", StringType(), False)
)

# Streaming DataFrame over the folder; each micro-batch picks up at most one new file.
iotstream = (
    spark.readStream
    .schema(jsonSchema)
    .option("maxFilesPerTrigger", 1)
    .json(inputPath)
)

# First batch of device events, one JSON object per line.
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

# Third argument True: overwrite data.txt if it is already there.
mssparkutils.fs.put(inputPath + "data.txt", device_data, True)

print("Source stream created...")

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 46, Finished, Available, Finished)

Source stream created...


In [41]:
# Start a streaming query that appends the incoming events as Delta files under Tables/,
# where the lakehouse exposes them as the iotdevicedata table queried below.
delta_stream_table_path = 'Tables/iotdevicedata'
# The checkpoint records which input files were already processed, so a restarted query
# resumes from there instead of re-ingesting them.
checkpointpath = 'Files/delta/checkpoint'
deltastream = (
    iotstream.writeStream
    .format("delta")
    .option("checkpointLocation", checkpointpath)
    .start(delta_stream_table_path)
)
print("Streaming to delta sink...")

# start() returns immediately. Wait until the files already in the input folder are committed
# to the Delta table, so the next cell's query does not race the first micro-batch
# (under Run All it could otherwise find no table or no rows).
deltastream.processAllAvailable()

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 47, Finished, Available, Finished)

Streaming to delta sink...


In [42]:
%%sql
-- Events written to the Delta table by the streaming query so far.
SELECT device, status
FROM IotDeviceData;

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 48, Finished, Available, Finished)

<Spark SQL result set with 9 rows and 2 fields>

In [43]:
# Drop a second file of device events into the watched folder; the running stream picks it
# up and appends its rows to the Delta table.
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

# Third argument True: overwrite more-data.txt if it already exists.
mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)

# Block until the new file has been processed and committed, so the next query
# deterministically includes its rows instead of racing the stream.
deltastream.processAllAvailable()

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 49, Finished, Available, Finished)

True

In [44]:
%%sql
-- Same query after the second file: the rows from more-data.txt now appear too.
SELECT device, status
FROM IotDeviceData;

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 50, Finished, Available, Finished)

<Spark SQL result set with 16 rows and 2 fields>

In [45]:
# Shut down the streaming query so it stops watching the input folder.
# Only a still-active query needs stopping.
if deltastream.isActive:
    deltastream.stop()

StatementMeta(, 7fb31d3b-6123-4851-a711-ecd6006fb08a, 51, Finished, Available, Finished)