In [1]:
df = spark.read.format("csv").option("header","true").load("Files/products/products.csv")
# df now is a Spark DataFrame containing CSV data from "Files/products/products.csv".
display(df)


# Create delta tables

A delta table can be either:

- Managed
- Unmanaged (external)

Managed tables are tables for which both the schema metadata and the data files are managed by Fabric. The data files for the table are created in the Tables folder.

**1. Create a managed table**

In [4]:
df.write.format("delta").saveAsTable("managed_products")

# This creates a table called managed_products in the Tables folder of the lakehouse. Refresh the folder to view the table.


**2. Create an external table**

To create an external table, for which the schema metadata is defined in the metastore for the lakehouse but the data files are stored in an external location, use the code below:

In [6]:
# The code below registers the external_products delta table in the Tables folder and writes its data files to Files/external_products. Confirm by refreshing both folders.

df.write.format("delta").saveAsTable("external_products", path="abfss://deltaTablesApache@onelake.dfs.fabric.microsoft.com/DT_Apache.Lakehouse/Files/external_products")


In [7]:
%%sql
-- Compare the managed and external tables using DESCRIBE FORMATTED

DESCRIBE FORMATTED managed_products;

-- In the results, view the Location property for the table, which should be a path to the OneLake storage for the lakehouse ending with /Tables/managed_products (you may need to widen the Data type column to see the full path).


<Spark SQL result set with 14 rows and 3 fields>

In [8]:
%%sql


-- Run the same command for the external table; its Location should end with /Files/external_products

DESCRIBE FORMATTED external_products;


<Spark SQL result set with 15 rows and 3 fields>

The files for the managed table are stored in the Tables folder in the OneLake storage for the lakehouse. In this case, a folder named managed_products has been created to store the Parquet files and the _delta_log folder for the table you created.
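
The Location comparison can also be done in code rather than by reading the result grid. The following is a minimal sketch: `table_location` is a hypothetical helper (not part of the notebook), and it assumes the `spark` session that the notebook provides and the `col_name` / `data_type` columns that DESCRIBE FORMATTED returns.

```python
def table_location(spark, table_name):
    """Return the Location value reported by DESCRIBE FORMATTED, or None."""
    rows = spark.sql(f"DESCRIBE FORMATTED {table_name}").collect()
    for row in rows:
        # The storage path sits in the data_type column of the row
        # whose col_name is "Location".
        if row["col_name"] == "Location":
            return row["data_type"]
    return None
```

In a notebook cell, `table_location(spark, "managed_products")` should return a path ending in /Tables/managed_products, while the same call for external_products should point into the Files folder.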

In [9]:
%%sql

-- Let's drop the two tables and confirm what happens:
DROP TABLE external_products;
DROP TABLE managed_products;



<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In the Lakehouse explorer pane, expand the Files folder and verify that the external_products folder has not been deleted. Select this folder to view the Parquet data files and _delta_log folder for the data that was previously in the external_products table. The table metadata for the external table was deleted, but the files were not affected.
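
The same check can be sketched in code. `check_after_drop` is a hypothetical helper; it assumes the notebook's `spark` session (with `spark.catalog.tableExists`, available from Spark 3.3) and a file-system object such as `mssparkutils.fs` passed in as `fs`.

```python
def check_after_drop(spark, fs, table_name, folder):
    """Report whether a table is still registered and which files remain."""
    # The metastore entry should be gone after DROP TABLE.
    still_registered = spark.catalog.tableExists(table_name)
    # fs.ls lists the folder contents; it raises if the folder no longer exists.
    remaining = [item.name for item in fs.ls(folder)]
    return still_registered, remaining
```

Calling `check_after_drop(spark, mssparkutils.fs, "external_products", "Files/external_products")` after the drop should report that the table is no longer registered while the data files and _delta_log are still listed.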

# Use SQL to create a table

In [10]:
%%sql

-- Create a table named products in the metastore, based on the delta files that already exist in Files/external_products
CREATE TABLE products
USING DELTA
LOCATION 'Files/external_products';




<Spark SQL result set with 0 rows and 0 fields>

- After running the code, refresh the Tables folder and notice that the products delta table is now listed there.
- You can also see that the **schema** (fields) of the products table **is the same** as that of the original dataframe saved in **external_products**.
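
The schema claim can be checked in code as well. This is a rough sketch: `same_schema` is a hypothetical helper that compares the `dtypes` lists (column name and type pairs) of the registered table and of the delta files, assuming the notebook's `spark` session.

```python
def same_schema(spark, table_name, delta_path):
    """Return True if the table and the delta folder expose identical columns."""
    table_cols = spark.table(table_name).dtypes
    file_cols = spark.read.format("delta").load(delta_path).dtypes
    # dtypes is a list of (column name, type string) tuples, so a plain
    # equality check covers names, types and column order.
    return table_cols == file_cols
```

For the table created above, `same_schema(spark, "products", "Files/external_products")` is expected to be true, because the table is only a metastore entry pointing at those files.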

In [11]:
%%sql

-- To confirm the above claim, let's check the entire table datasets in products

SELECT * FROM products;


<Spark SQL result set with 295 rows and 4 fields>

# Explore table versioning
Transaction history for delta tables is stored in JSON files in the _delta_log folder. You can use this transaction log to manage data versioning.
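
To make the log format concrete: each commit file holds one JSON object per line, and each object is a single action such as commitInfo, add, or remove. The sketch below summarizes one commit in plain Python. `summarize_commit` is a hypothetical helper, and `sample_commit` is invented, heavily trimmed content for illustration only; real commit files carry many more fields.

```python
import json
from collections import Counter


def summarize_commit(commit_text):
    """Count the action types in one commit file and return its operation name."""
    actions = Counter()
    operation = None
    for line in commit_text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        entry = json.loads(line)
        for action, body in entry.items():
            actions[action] += 1
            if action == "commitInfo":
                operation = body.get("operation")
    return operation, dict(actions)


# Hypothetical, trimmed commit content (not taken from the lakehouse).
sample_commit = "\n".join([
    '{"commitInfo": {"operation": "UPDATE"}}',
    '{"remove": {"path": "part-0000.parquet"}}',
    '{"add": {"path": "part-0001.parquet"}}',
])
print(summarize_commit(sample_commit))
```

An update typically shows up as a remove of the old data file plus an add of the rewritten one, which is what makes reading earlier versions possible.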

In [14]:
%%sql

-- Let's update the table and see how versioning is tracked
-- Multiply the ListPrice of Mountain Bikes by 0.9, which is a 10% reduction in the Mountain Bikes price

UPDATE products
SET ListPrice = ListPrice * 0.9
WHERE category = 'Mountain Bikes';


<Spark SQL result set with 1 rows and 1 fields>

In [15]:
%%sql

-- Let's describe and see the history of the table now 

DESCRIBE HISTORY products;


<Spark SQL result set with 3 rows and 15 fields>

The results above show the history of transactions recorded for the table.
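
If only the version numbers and operations are of interest, the history can be reduced in code. A minimal sketch, assuming the notebook's `spark` session and the `version` and `operation` columns of the DESCRIBE HISTORY output; `history_summary` is a hypothetical helper.

```python
def history_summary(spark, table_name):
    """Return (version, operation) pairs for a delta table, oldest first."""
    rows = spark.sql(f"DESCRIBE HISTORY {table_name}").collect()
    # DESCRIBE HISTORY lists the newest commit first, so sort by version.
    return sorted((row["version"], row["operation"]) for row in rows)
```

`history_summary(spark, "products")` gives a compact list that can be used to pick the version number for a time-travel read.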

In [16]:

delta_table_path = 'Files/external_products'

# Get the current data
current_data = spark.read.format("delta").load(delta_table_path)
display(current_data)

# Get the version 0 data
original_data = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
display(original_data)


The two dataframes above show:
- **The data after the price reduction (current version).**
- **The data before the reduction, read using the versionAsOf option with version 0.**
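
Delta also accepts a `timestampAsOf` read option as an alternative to `versionAsOf`. The sketch below wraps both in one function; `read_delta` is a hypothetical helper and assumes the notebook's `spark` session.

```python
def read_delta(spark, path, version=None, timestamp=None):
    """Load a delta folder, optionally at an earlier version or timestamp."""
    if version is not None and timestamp is not None:
        raise ValueError("Pass either version or timestamp, not both")
    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    elif timestamp is not None:
        reader = reader.option("timestampAsOf", timestamp)
    # With neither option set, the latest version is returned.
    return reader.load(path)
```

`read_delta(spark, "Files/external_products", version=0)` is equivalent to the version 0 read in the cell above; passing a timestamp string instead selects the version that was current at that time.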


# Delta tables for streaming data

Delta Lake supports streaming data. Delta tables can be a sink or a source for data streams created using the Spark Structured Streaming API.

In [1]:
from notebookutils import mssparkutils
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a folder
inputPath = 'Files/data/'
mssparkutils.fs.mkdirs(inputPath)

# Create a stream that reads data from the folder, using a JSON schema
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write some event data to the folder
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''
mssparkutils.fs.put(inputPath + "data.txt", device_data, True)
print("Source stream created...")


Source stream created...


Check that the message "Source stream created..." is printed. The code you just executed creates a streaming data source based on a folder in which some data representing readings from imaginary IoT devices has been saved.
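
Before streaming the file, it can help to know what the sink should eventually contain. The following plain-Python sketch parses the same newline-delimited JSON payload and counts events per device and status; `count_events` is a hypothetical helper and does not touch Spark.

```python
import json
from collections import Counter

# Same payload as written to data.txt in the cell above.
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''


def count_events(ndjson_text):
    """Count (device, status) pairs in newline-delimited JSON text."""
    counts = Counter()
    for line in ndjson_text.splitlines():
        if line.strip():
            event = json.loads(line)
            counts[(event["device"], event["status"])] += 1
    return counts


counts = count_events(device_data)
print(sum(counts.values()), dict(counts))
```

The total of nine events is the row count to expect once the stream has processed this file.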

In [2]:
# Write the stream to a delta table
delta_stream_table_path = 'Tables/iotdevicedata'
checkpointpath = 'Files/delta/checkpoint'
deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
print("Streaming to delta sink...")


Streaming to delta sink...


This code saves the streaming device data in delta format to the iotdevicedata subdirectory. Because the path for the folder is in the Tables folder, a table will be created for it automatically.
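
Because the stream runs in the background, it is useful to inspect its state before querying the table. A minimal sketch, assuming a Structured Streaming query handle such as the `deltastream` object returned by `start()`; `stream_summary` is a hypothetical helper.

```python
def stream_summary(query):
    """Collect a few status fields from a StreamingQuery handle."""
    status = query.status  # dictionary describing the current state
    return {
        "id": str(query.id),
        "active": query.isActive,
        "message": status["message"],
        "data_available": status["isDataAvailable"],
    }
```

Calling `stream_summary(deltastream)` shows whether the query is still active and whether it is waiting for new files to arrive in the source folder.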

In [3]:
%%sql
-- Let's query the IotDeviceData table, which contains the device data from the streaming source

SELECT * FROM IotDeviceData;


<Spark SQL result set with 9 rows and 2 fields>

In [4]:
# Write more hypothetical device data to the source stream
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)


True

In [5]:
%%sql
-- Let's confirm that the extra data above has been added to the IotDeviceData table

SELECT * FROM IotDeviceData;

-- Great!!! It has been added.


<Spark SQL result set with 16 rows and 2 fields>

In [6]:
# To stop the stream, use
deltastream.stop()
