# Delta Lake tables
Use this notebook to explore Delta Lake functionality.


## Create a schema
The code below defines a schema, uses it to load the "products.csv" file into a DataFrame, and displays the result.

In [1]:
from pyspark.sql.types import StructType, IntegerType, StringType, DoubleType

# Define the schema explicitly instead of inferring it from the file
schema = StructType() \
    .add("ProductID", IntegerType(), True) \
    .add("ProductName", StringType(), True) \
    .add("Category", StringType(), True) \
    .add("ListPrice", DoubleType(), True)

# Load the CSV file (its first row is a header) using the schema above
df = spark.read.format("csv").option("header", "true").schema(schema).load("Files/products/products.csv")

# df is now a Spark DataFrame containing the data from "Files/products/products.csv"

display(df)

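The following plain-Python sketch is not Spark code. It approximates what `.schema(schema)` together with `option("header", "true")` does in the cell above: skip the header row, cast each field by column position to its declared type, and turn empty fields into nulls (`None`). The `SCHEMA` list, the `read_with_schema` helper, and the sample rows are hypothetical and are not the contents of products.csv.

```python
import csv
import io

# Hypothetical stand-in for the StructType above: (column name, type converter).
# Every column is nullable, so an empty field becomes None instead of raising.
SCHEMA = [
    ("ProductID", int),
    ("ProductName", str),
    ("Category", str),
    ("ListPrice", float),
]


def read_with_schema(text, schema):
    """Parse CSV text that has a header row, casting each field by its position."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header row, as option("header", "true") does
    rows = []
    for record in reader:
        row = {}
        for (name, cast), raw in zip(schema, record):
            row[name] = cast(raw) if raw != "" else None
        rows.append(row)
    return rows


# Hypothetical sample data (the second product has no price)
sample = (
    "ProductID,ProductName,Category,ListPrice\n"
    "1,Bike A,Mountain Bikes,100.0\n"
    "2,Helmet B,Helmets,\n"
)
rows = read_with_schema(sample, SCHEMA)
for row in rows:
    print(row)
```

When a schema is supplied, Spark by default applies it to the columns in order rather than matching header names. This sketch copies that positional behavior.
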
## Create managed and external Delta tables

In [2]:
# Create a managed Delta table: Spark manages both its metadata and its data files
df.write.format("delta").saveAsTable("managed_products")

In [3]:
# Set the file path and create an external Delta table stored at that path
dfPath = "abfss://8fb3e71f-f4ce-4f5f-a159-4da9b4cfe6f0@onelake.dfs.fabric.microsoft.com/a4042216-07c2-46e2-840a-5a5414a9b6fd/Files/external_products"

df.write.format("delta").saveAsTable("external_products", path=dfPath)

Show the details of both tables, including each table's type (managed or external) and its storage location.

In [4]:
%%sql
DESCRIBE FORMATTED managed_products;

<Spark SQL result set with 12 rows and 3 fields>

In [5]:
%%sql
DESCRIBE FORMATTED external_products;

<Spark SQL result set with 12 rows and 3 fields>

Drop both tables. Dropping the managed table deletes its metadata and its data files. Dropping the external table removes only its metadata, so its Parquet data files and Delta transaction log files remain in Files/external_products.

In [6]:
%%sql
DROP TABLE managed_products;
DROP TABLE external_products;

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

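To make the drop behavior concrete, here is a toy plain-Python model of a catalog and a file store. It is not the Spark or Delta API. Dropping a managed table deletes its files. Dropping an external table only unregisters it, so a new table can later be pointed at the surviving files, which is what a `CREATE TABLE ... LOCATION` statement does. All helper names, paths, and file names are hypothetical.

```python
# Toy file store: location -> list of files (stands in for the lakehouse storage).
storage = {}
# Toy catalog: table name -> (table type, location).
catalog = {}


def create_table(name, table_type, location, files):
    """Write the table's files and register it in the catalog."""
    storage[location] = list(files)
    catalog[name] = (table_type, location)


def drop_table(name):
    """Unregister the table; delete its files only if it is managed."""
    table_type, location = catalog.pop(name)
    if table_type == "MANAGED":
        del storage[location]


def register_existing(name, location):
    """Point a new (external) table at files that already exist."""
    if location not in storage:
        raise FileNotFoundError(location)
    catalog[name] = ("EXTERNAL", location)


delta_files = ["part-0000.parquet", "_delta_log/00000000000000000000.json"]
create_table("managed_products", "MANAGED", "Tables/managed_products", delta_files)
create_table("external_products", "EXTERNAL", "Files/external_products", delta_files)

drop_table("managed_products")   # files deleted
drop_table("external_products")  # files kept

register_existing("products", "Files/external_products")
print(sorted(storage))
print(catalog)
```
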
Create a Delta table with SQL on top of the files that the dropped external table left in Files/external_products.

In [1]:
%%sql
CREATE TABLE products
USING DELTA
LOCATION 'Files/external_products';

<Spark SQL result set with 0 rows and 0 fields>

Query the new table to check that it contains the original data.

In [4]:
%%sql
SELECT * FROM products
LIMIT 100;

<Spark SQL result set with 100 rows and 4 fields>

## Testing table versioning

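Apply a 10% price reduction to products in the Mountain Bikes category. Like every write to a Delta table, this UPDATE creates a new table version.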

In [5]:
%%sql
UPDATE products
SET ListPrice = ListPrice * 0.9
WHERE Category = 'Mountain Bikes';

<Spark SQL result set with 1 rows and 1 fields>

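Delta Lake does not edit Parquet data files in place. In the copy-on-write approach, an UPDATE like the one above rewrites each data file that contains a matching row. It then commits a new version that records the old file as removed and the new file as added. Newer Delta Lake releases can use deletion vectors to avoid some of these rewrites. The plain-Python sketch below models only copy-on-write, and its file names, rows, and helper functions are hypothetical.

```python
# Hypothetical table made of two immutable data files
files = {
    "part-0.parquet": [
        {"ProductID": 1, "Category": "Mountain Bikes", "ListPrice": 100.0},
        {"ProductID": 2, "Category": "Road Bikes", "ListPrice": 200.0},
    ],
    "part-1.parquet": [
        {"ProductID": 3, "Category": "Helmets", "ListPrice": 50.0},
    ],
}


def is_mountain_bike(row):
    return row["Category"] == "Mountain Bikes"


def discount(row):
    # Same arithmetic as SET ListPrice = ListPrice * 0.9
    return {**row, "ListPrice": row["ListPrice"] * 0.9}


def copy_on_write_update(files, predicate, change, prefix):
    """Rewrite only the files that contain matching rows.

    Returns the new file map and the commit actions: each rewritten file is
    recorded as a "remove" of the old file plus an "add" of its replacement.
    The input map and its rows are left unmodified.
    """
    new_files = dict(files)
    actions = []
    for i, name in enumerate(sorted(files)):
        rows = files[name]
        if not any(predicate(r) for r in rows):
            continue  # untouched files are reused as-is by the new version
        new_name = f"{prefix}-{i}.parquet"
        new_files[new_name] = [change(r) if predicate(r) else dict(r) for r in rows]
        del new_files[name]
        actions += [("remove", name), ("add", new_name)]
    return new_files, actions


files_v1, commit = copy_on_write_update(files, is_mountain_bike, discount, "v1-part")
print(commit)
```

Because the old file is only marked as removed and not deleted, the previous version stays readable. That is what makes time travel possible.
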
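List the table history. It has one row per table version: version 0 from the original write and version 1 from the UPDATE above.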

In [6]:
%%sql
DESCRIBE HISTORY products;

<Spark SQL result set with 2 rows and 15 fields>

Load both the current data and the first version (version 0) of the table, using Delta time travel through the versionAsOf option.

In [7]:
delta_table_path = 'Files/external_products'

# Get the current data
current_data = spark.read.format("delta").load(delta_table_path)
display(current_data)

# Get the version 0 data
original_data = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
display(original_data)

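Time travel works because the Delta transaction log records each version as a set of add and remove actions. The table at version N is what you get by replaying versions 0 through N in order. The plain-Python sketch below models that replay, plus a newest-first listing in the spirit of `DESCRIBE HISTORY`. It does not read the real `_delta_log` format, and the log contents are hypothetical.

```python
# Hypothetical log: index = version, value = the actions committed in that version
log = [
    # version 0: the initial write
    [("add", "part-0.parquet"), ("add", "part-1.parquet")],
    # version 1: the UPDATE rewrote part-0.parquet
    [("remove", "part-0.parquet"), ("add", "v1-part-0.parquet")],
]


def snapshot(log, version=None):
    """Return the set of live data files as of a version (default: latest)."""
    if version is None:
        version = len(log) - 1
    if not 0 <= version < len(log):
        raise ValueError(f"version {version} does not exist")
    live = set()
    for actions in log[: version + 1]:
        for action, path in actions:
            if action == "add":
                live.add(path)
            else:
                live.discard(path)
    return live


def history(log):
    """Newest-first (version, number of actions) pairs, like DESCRIBE HISTORY ordering."""
    return [(v, len(actions)) for v, actions in reversed(list(enumerate(log)))]


print(sorted(snapshot(log)))             # current version
print(sorted(snapshot(log, version=0)))  # like option("versionAsOf", 0)
print(history(log))
```

Reading an old version only works while its data files still exist. Running VACUUM removes unreferenced files older than the retention period, which limits how far back versionAsOf can reach.
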
Create a temporary view that summarizes the products by category, then query it ordered by Category.

In [8]:
%%sql
-- Create a temporary view
CREATE OR REPLACE TEMPORARY VIEW products_view
AS
    SELECT Category, COUNT(*) AS NumProducts, MIN(ListPrice) AS MinPrice, MAX(ListPrice) AS MaxPrice, AVG(ListPrice) AS AvgPrice
    FROM products
    GROUP BY Category;

SELECT *
FROM products_view
ORDER BY Category;

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 37 rows and 5 fields>

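For reference, this plain-Python sketch computes the same columns as products_view on a few hypothetical rows. It follows SQL aggregate semantics: `COUNT(*)` counts every row, while `MIN`, `MAX`, and `AVG` skip null prices. The `summarize` helper and the sample data are illustrative only.

```python
from collections import defaultdict

# Hypothetical rows; only Category and ListPrice matter for the view
products = [
    {"Category": "Mountain Bikes", "ListPrice": 90.0},
    {"Category": "Mountain Bikes", "ListPrice": 110.0},
    {"Category": "Helmets", "ListPrice": 35.0},
    {"Category": "Helmets", "ListPrice": None},  # a product with no price
]


def summarize(rows):
    """Group rows by Category and compute NumProducts, MinPrice, MaxPrice, AvgPrice."""
    groups = defaultdict(list)
    for r in rows:
        groups[r["Category"]].append(r["ListPrice"])
    result = []
    for category, prices in groups.items():
        known = [p for p in prices if p is not None]  # aggregates ignore nulls
        result.append({
            "Category": category,
            "NumProducts": len(prices),  # COUNT(*) counts every row
            "MinPrice": min(known) if known else None,
            "MaxPrice": max(known) if known else None,
            "AvgPrice": sum(known) / len(known) if known else None,
        })
    return result


# ORDER BY Category
view = sorted(summarize(products), key=lambda r: r["Category"])
for row in view:
    print(row)
```
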
Get the 10 categories with the most products from products_view.

In [9]:
%%sql
SELECT Category, NumProducts
FROM products_view
ORDER BY NumProducts DESC
LIMIT 10;

<Spark SQL result set with 10 rows and 2 fields>

Query the same view from PySpark, this time returning the six categories with the highest average price.

In [10]:
from pyspark.sql.functions import col

# Query the temporary view and sort by average price, highest first
df_products = spark.sql("SELECT Category, MinPrice, MaxPrice, AvgPrice FROM products_view").orderBy(col("AvgPrice").desc())
display(df_products.limit(6))

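Both queries above follow the same "sort descending, keep the first n" pattern: `ORDER BY NumProducts DESC LIMIT 10` in SQL and `orderBy(col("AvgPrice").desc())` with `limit(6)` in PySpark. The plain-Python sketch below expresses that pattern with `heapq.nlargest` over hypothetical rows shaped like products_view. The `top_n` helper and the numbers are illustrative, not results from this notebook.

```python
import heapq

# Hypothetical rows shaped like products_view
view = [
    {"Category": "Helmets", "NumProducts": 3, "AvgPrice": 35.0},
    {"Category": "Mountain Bikes", "NumProducts": 32, "AvgPrice": 1500.0},
    {"Category": "Road Bikes", "NumProducts": 43, "AvgPrice": 1200.0},
    {"Category": "Socks", "NumProducts": 4, "AvgPrice": 10.0},
]


def top_n(rows, key, n):
    """Equivalent to ORDER BY key DESC LIMIT n, without sorting the whole list."""
    return heapq.nlargest(n, rows, key=lambda r: r[key])


by_count = [r["Category"] for r in top_n(view, "NumProducts", 2)]
by_price = [r["Category"] for r in top_n(view, "AvgPrice", 2)]
print(by_count)
print(by_price)
```

SQL does not guarantee any particular order among rows that tie on the sort key. To make results repeatable, add a second sort column such as Category.
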
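## Streaming data with Delta tables

Create an input folder, define a streaming source that reads JSON files from it one file per trigger, and write a first file of device events to the folder.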

In [11]:
from notebookutils import mssparkutils
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a folder
inputPath = 'Files/data/'
mssparkutils.fs.mkdirs(inputPath)

# Create a stream that reads data from the folder, using a JSON schema
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write some event data to the folder
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "data.txt", device_data, True)

print("Source stream created...")

Source stream created...

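The `maxFilesPerTrigger` option caps how many new files the JSON source picks up per micro-batch. With a value of 1, each file in the folder becomes its own batch. The plain-Python sketch below models that planning step and the JSON Lines parsing against the two-column schema. The file contents and helper names are hypothetical. As a simplification, files are ordered by name here, while Spark orders them by modification time.

```python
import json


def plan_batches(files, seen, max_files_per_trigger):
    """Split not-yet-seen files into micro-batches of at most max_files_per_trigger."""
    pending = [f for f in sorted(files) if f not in seen]
    return [pending[i:i + max_files_per_trigger]
            for i in range(0, len(pending), max_files_per_trigger)]


def parse_json_lines(text):
    """Parse one JSON object per non-empty line, keeping only the schema columns."""
    rows = []
    for line in text.splitlines():
        if line.strip():
            record = json.loads(line)
            rows.append({"device": record.get("device"), "status": record.get("status")})
    return rows


# Hypothetical folder contents
folder = {
    "data.txt": '{"device":"Dev1","status":"ok"}\n{"device":"Dev2","status":"error"}',
    "more-data.txt": '{"device":"Dev1","status":"ok"}',
}

batches = plan_batches(folder, seen=set(), max_files_per_trigger=1)
for batch in batches:
    rows = [row for name in batch for row in parse_json_lines(folder[name])]
    print(batch, rows)
```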

In [12]:
# Write the stream to a Delta table; the checkpoint records the stream's progress
delta_stream_table_path = 'Tables/iotdevicedata'
checkpointpath = 'Files/delta/checkpoint'
deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
print("Streaming to delta sink...")

Streaming to delta sink...


In [13]:
%%sql
SELECT * FROM IotDeviceData;

<Spark SQL result set with 9 rows and 2 fields>

In [14]:
# Add more data to the source stream
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)

True

In [15]:
%%sql
SELECT * FROM IotDeviceData;

<Spark SQL result set with 16 rows and 2 fields>

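The second query returns 16 rows because the stream processed only the new file: 9 rows from data.txt plus 7 from more-data.txt. The checkpoint at Files/delta/checkpoint lets the query track which input has already been written to the sink. The plain-Python sketch below models that bookkeeping with placeholder rows that match the two files' line counts. The class and its in-memory "checkpoint" are hypothetical and much simpler than Spark's real checkpoint files.

```python
import json


class ToyStreamingQuery:
    """Appends rows from new files to a sink and remembers processed files."""

    def __init__(self):
        self.checkpoint = set()  # names of source files already committed
        self.sink = []           # rows appended to the (toy) Delta table

    def trigger(self, folder):
        """Process every file not yet in the checkpoint, then record it."""
        new_files = sorted(f for f in folder if f not in self.checkpoint)
        for name in new_files:
            for line in folder[name].splitlines():
                if line.strip():
                    self.sink.append(json.loads(line))
        self.checkpoint.update(new_files)
        return len(new_files)


row = '{"device":"Dev1","status":"ok"}'  # placeholder event
folder = {"data.txt": "\n".join([row] * 9)}

query = ToyStreamingQuery()
query.trigger(folder)
after_first = len(query.sink)

folder["more-data.txt"] = "\n".join([row] * 7)
query.trigger(folder)
after_second = len(query.sink)

query.trigger(folder)  # nothing new, so nothing is appended again
print(after_first, after_second, len(query.sink))
```
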
Stop the streaming query.

In [16]:
deltastream.stop()