## Use Delta tables in Apache Spark

Learn how to work with delta tables in Microsoft Fabric.

Upload the file **products.csv** to the **Files/products** folder of the lakehouse, then load it into a Spark DataFrame.

In [2]:
df = spark.read.format("csv").option("header","true").load("abfss://dp_fabric@onelake.dfs.fabric.microsoft.com/LH_2.Lakehouse/Files/products/products.csv")
# df now is a Spark DataFrame containing CSV data from "abfss://dp_fabric@onelake.dfs.fabric.microsoft.com/LH_2.Lakehouse/Files/products/products.csv".
display(df)

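The load path above follows the pattern `abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/<relative path>`. As a minimal sketch, the path could be assembled from its parts instead of being typed out each time. The helper name `onelake_path` is hypothetical and not part of any Fabric library; the workspace and lakehouse names are the ones used in this notebook.

```python
def onelake_path(workspace: str, lakehouse: str, relative_path: str) -> str:
    """Build an abfss:// URI for a file or folder in a Fabric lakehouse.

    Hypothetical helper: it only formats a string using the pattern
    seen in this notebook and does not check that the path exists.
    """
    return (
        f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/"
        f"{lakehouse}.Lakehouse/{relative_path.lstrip('/')}"
    )


# Rebuild the CSV path used in the cell above
csv_path = onelake_path("dp_fabric", "LH_2", "Files/products/products.csv")
print(csv_path)
```

The resulting string can be passed to `load()` in place of the literal path.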

### Create delta tables

Save the DataFrame as a _managed_ delta table using the **saveAsTable** method.

In [3]:
df.write.format("delta").saveAsTable("managed_products")


You can also create _external_ tables, for which the schema metadata is defined in the lakehouse metastore but the data files are stored in an external location.

In [7]:
df.write.format("delta").saveAsTable("external_products", path="abfss://dp_fabric@onelake.dfs.fabric.microsoft.com/LH_2.Lakehouse/Files/external_products")


In [8]:
%%sql
DESCRIBE FORMATTED managed_products;


<Spark SQL result set with 12 rows and 3 fields>

In [9]:
%%sql

DESCRIBE FORMATTED external_products;


<Spark SQL result set with 12 rows and 3 fields>
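The managed/external distinction can also be checked from Python rather than by reading the `DESCRIBE FORMATTED` output. The sketch below relies on `spark.catalog.listTables()`, whose entries expose `name` and `tableType`. The helper name `table_types` is hypothetical, and it is an assumption that the lakehouse catalog reports `MANAGED` and `EXTERNAL` the same way open-source Spark does.

```python
def table_types(spark, names):
    """Return {table_name: table_type} for the requested tables.

    `spark` is the SparkSession that a Fabric notebook predefines.
    Assumption: the catalog reports tableType as MANAGED or EXTERNAL.
    """
    wanted = {n.lower() for n in names}
    return {
        t.name: t.tableType
        for t in spark.catalog.listTables()
        if t.name.lower() in wanted
    }


# In a notebook session:
# table_types(spark, ["managed_products", "external_products"])
```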

In [10]:
%%sql

DROP TABLE managed_products;
DROP TABLE external_products;


<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [11]:
%%sql

CREATE TABLE products
USING DELTA
LOCATION 'Files/external_products';


<Spark SQL result set with 0 rows and 0 fields>

In [13]:
%%sql

SELECT * FROM products LIMIT 10;


<Spark SQL result set with 10 rows and 4 fields>

### Explore table versioning

Transaction history for delta tables is stored in JSON files in the **_delta_log** folder.

In [14]:
%%sql

UPDATE products
SET ListPrice = ListPrice * 0.9
WHERE Category = 'Mountain Bikes';


<Spark SQL result set with 1 rows and 1 fields>

In [15]:
%%sql

DESCRIBE HISTORY products;


<Spark SQL result set with 2 rows and 15 fields>
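Each version listed by `DESCRIBE HISTORY` corresponds to one commit file in the table's transaction log folder. A commit file is named with the version number zero-padded to 20 digits, and holds one JSON action per line (for example `commitInfo`, `add`, `remove`). The sketch below shows how such a file could be summarized in plain Python. The helper names and the sample commit text are made up for illustration and are not taken from the actual `products` table.

```python
import json
from collections import Counter


def commit_file_name(version: int) -> str:
    """Name of the log file for a table version (20-digit, zero-padded)."""
    return f"{version:020d}.json"


def summarize_commit(commit_text: str) -> Counter:
    """Count the action types in one newline-delimited JSON commit file."""
    actions = Counter()
    for line in commit_text.splitlines():
        line = line.strip()
        if not line:
            continue
        # Each line is an object with a single top-level key naming the action
        actions.update(json.loads(line).keys())
    return actions


# Illustrative content only: a simplified commit for an UPDATE
sample_commit = "\n".join([
    json.dumps({"commitInfo": {"operation": "UPDATE"}}),
    json.dumps({"remove": {"path": "part-0000.parquet"}}),
    json.dumps({"add": {"path": "part-0001.parquet"}}),
])

print(commit_file_name(1))
print(dict(summarize_commit(sample_commit)))
```

An update typically appears as a `remove` of the old data file plus an `add` of the rewritten one, which is why earlier versions remain readable.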

In [16]:
delta_table_path = 'Files/external_products'

# Get the current data
current_data = spark.read.format("delta").load(delta_table_path)
display(current_data)

# Get the version 0 data
original_data = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
display(original_data)

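Besides `versionAsOf`, the Delta reader also accepts a `timestampAsOf` option, which selects the table state as of a point in time. A minimal sketch that wraps both options follows. The function name `read_delta_as_of` is hypothetical, `spark` is assumed to be the session the notebook provides, and a timestamp such as `"2024-01-01 00:00:00"` would only be a placeholder.

```python
def read_delta_as_of(spark, path, version=None, timestamp=None):
    """Load a delta table at a given version OR timestamp (not both).

    `spark` is the notebook's SparkSession; `path` is the table folder,
    for example 'Files/external_products'.
    """
    if (version is None) == (timestamp is None):
        raise ValueError("Pass exactly one of version or timestamp")

    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    else:
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(path)


# In a notebook session:
# original_data = read_delta_as_of(spark, "Files/external_products", version=0)
```

Requiring exactly one selector avoids an ambiguous request where a version and a timestamp point at different table states.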

### Use delta tables for streaming data

Delta Lake supports streaming data. Delta tables can be a sink or a source for data streams created using the Spark Structured Streaming API.

In [17]:
from notebookutils import mssparkutils
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a folder
inputPath = 'Files/data/'
mssparkutils.fs.mkdirs(inputPath)

# Create a stream that reads data from the folder, using a JSON schema
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write some event data to the folder
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''
mssparkutils.fs.put(inputPath + "data.txt", device_data, True)
print("Source stream created...")


Source stream created...


In [18]:
# Write the stream to a delta table
delta_stream_table_path = 'Tables/iotdevicedata'
checkpointpath = 'Files/delta/checkpoint'
deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
print("Streaming to delta sink...")


Streaming to delta sink...
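`start()` returns immediately and the stream runs in the background, so a query issued right afterwards may see a table that is still being filled. One option is to block until the stream has processed everything currently in the input folder. The sketch below uses the `isActive`, `processAllAvailable()` and `lastProgress` members of the `StreamingQuery` object returned by `start()`. The helper name `wait_until_caught_up` is hypothetical, and `processAllAvailable()` is intended for testing and exploration rather than production jobs.

```python
def wait_until_caught_up(query):
    """Block until a streaming query has processed all available input.

    `query` is the StreamingQuery returned by writeStream...start(),
    such as `deltastream` in the cell above.
    """
    if not query.isActive:
        raise RuntimeError("The streaming query is not running")

    # Returns once every file currently in the source has been written to the sink
    query.processAllAvailable()

    # Metrics for the most recent micro-batch (may be None before any batch ran)
    return query.lastProgress


# In a notebook session:
# wait_until_caught_up(deltastream)
```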


The next cell queries the IotDeviceData table, which contains the device data from the streaming source.

In [19]:
%%sql

SELECT * FROM IotDeviceData;


<Spark SQL result set with 9 rows and 2 fields>

In [20]:
# Add more data to the source stream
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)


True

In [21]:
%%sql

SELECT * FROM IotDeviceData;


<Spark SQL result set with 16 rows and 2 fields>

In [22]:
deltastream.stop()
