# Explore data in a dataframe

In [2]:
# Load the product catalog CSV from the lakehouse Files area into a Spark DataFrame.
# header=true takes column names from the first row. inferSchema=true types numeric columns
# (e.g. ListPrice, which a later cell updates arithmetically) as numbers instead of the
# all-string default of Spark's CSV reader.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("Files/data/products.csv")
)
display(df)

StatementMeta(, 73431cac-39af-4527-b2f4-f930c7456531, 4, Finished, Available)

SynapseWidget(Synapse.DataFrame, e804ddca-17c4-414e-a2f4-efb9c4a56391)

# Create a managed table

In [2]:
# Save the DataFrame as a managed Delta table (the lakehouse manages both metadata and data
# files). mode("overwrite") matches the external-table cell and lets this cell be re-run
# instead of failing because managed_products already exists.
df.write.format("delta").mode("overwrite").saveAsTable("managed_products")

StatementMeta(, 79471d39-fc1c-4759-a25c-211a62ea15c2, 4, Finished, Available)

In [3]:
# Read the managed table back through the catalog.
# NOTE(review): assumes the attached lakehouse is named MyLakehouse — adjust if it differs.
# The result gets its own name instead of overwriting `df`: the external-table cell below
# writes `df`, and it should persist the full loaded data, not this LIMIT-1000 preview.
managed_products_df = spark.sql("SELECT * FROM MyLakehouse.managed_products LIMIT 1000")
display(managed_products_df)

StatementMeta(, 79471d39-fc1c-4759-a25c-211a62ea15c2, 5, Finished, Available)

SynapseWidget(Synapse.DataFrame, 1cf72867-d8f9-420e-b4b2-a9c2ed0472a6)

# Create an external table

In [None]:
# Register an external Delta table: its data files live at the explicit OneLake path below
# (the Files/external_products folder of the lakehouse), so dropping the table later leaves
# the files in place and they can be re-registered under a new table name.
external_products_path = (
    "abfss://5a3d48d7-0be3-418b-bd4e-c28f727bec51@onelake.dfs.fabric.microsoft.com/"
    "4862f3b3-dd67-4a30-8b15-7d76d14f8f6e/Files/external_products"
)
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("external_products", path=external_products_path)
)

# Compare managed and external tables

In [5]:
%%sql

-- Detailed table metadata (EXTENDED is equivalent to FORMATTED); compare its Type and Location with external_products.
DESCRIBE TABLE EXTENDED managed_products;

StatementMeta(, 73431cac-39af-4527-b2f4-f930c7456531, 7, Finished, Available)

<Spark SQL result set with 12 rows and 3 fields>

In [6]:
%%sql

-- Detailed table metadata (EXTENDED is equivalent to FORMATTED); Location should point at the explicit OneLake path.
DESCRIBE TABLE EXTENDED external_products;

StatementMeta(, 73431cac-39af-4527-b2f4-f930c7456531, 8, Finished, Available)

<Spark SQL result set with 12 rows and 3 fields>

In [7]:
%%sql

-- IF EXISTS keeps this cell re-runnable once the tables are already gone.
-- Dropping the managed table removes its data; dropping the external table removes only the
-- catalog entry, leaving the Delta files under Files/external_products for the next cell.
DROP TABLE IF EXISTS managed_products;
DROP TABLE IF EXISTS external_products;

StatementMeta(, , -1, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [8]:
%%sql

-- Re-register the surviving external Delta files as a new table named products.
-- The schema is read from the existing Delta log at that location, so no column list is needed.
-- IF NOT EXISTS keeps the cell re-runnable.
CREATE TABLE IF NOT EXISTS products
USING DELTA
LOCATION 'Files/external_products';

StatementMeta(, 73431cac-39af-4527-b2f4-f930c7456531, 11, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [9]:
%%sql

-- Every row and column of the re-registered table.
SELECT p.*
FROM products AS p;

StatementMeta(, 73431cac-39af-4527-b2f4-f930c7456531, 12, Finished, Available)

<Spark SQL result set with 295 rows and 4 fields>

# Explore table versioning

In [10]:
%%sql

-- Apply a 10% price cut to one category; this commit becomes a new version in the Delta history.
UPDATE products AS p
SET ListPrice = p.ListPrice * 0.9
WHERE p.Category = 'Mountain Bikes';

StatementMeta(, 73431cac-39af-4527-b2f4-f930c7456531, 13, Finished, Available)

<Spark SQL result set with 1 rows and 1 fields>

In [11]:
%%sql

-- List the commits recorded in the table's Delta transaction log, one row per table version;
-- the versionAsOf read in the next cell refers to these version numbers.
DESCRIBE HISTORY products;

StatementMeta(, 73431cac-39af-4527-b2f4-f930c7456531, 14, Finished, Available)

<Spark SQL result set with 3 rows and 15 fields>

In [12]:
# Time travel: read the Delta files behind `products` twice — the latest snapshot and
# the snapshot at version 0 — and show them one after the other for comparison.
delta_table_path = 'Files/external_products'

current_data = (
    spark.read
    .format("delta")
    .load(delta_table_path)
)
original_data = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load(delta_table_path)
)

display(current_data)
display(original_data)

StatementMeta(, 73431cac-39af-4527-b2f4-f930c7456531, 15, Finished, Available)

SynapseWidget(Synapse.DataFrame, 24c980f3-e8fe-4e5c-9321-3322cb9fda98)

SynapseWidget(Synapse.DataFrame, ed89342a-1e46-4eef-b2ea-a6fb5c32ff45)

# 

# Use Delta tables for streaming data

In [30]:
from notebookutils import mssparkutils
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Folder that the streaming source watches; it must exist before the stream is defined.
inputPath = 'Files/streaming_data/'
mssparkutils.fs.mkdirs(inputPath)

# Two non-nullable string fields per event; each input line is one JSON object.
jsonSchema = (
    StructType()
    .add("device", StringType(), False)
    .add("status", StringType(), False)
)

# maxFilesPerTrigger=1: at most one new file is consumed per micro-batch.
iotstream = (
    spark.readStream
    .schema(jsonSchema)
    .option("maxFilesPerTrigger", 1)
    .json(inputPath)
)

# Seed the folder with a first file of device events (newline-separated JSON, no trailing newline).
device_data = "\n".join([
    '{"device":"Dev1","status":"ok"}',
    '{"device":"Dev1","status":"ok"}',
    '{"device":"Dev1","status":"ok"}',
    '{"device":"Dev2","status":"error"}',
    '{"device":"Dev1","status":"ok"}',
    '{"device":"Dev1","status":"error"}',
    '{"device":"Dev2","status":"ok"}',
    '{"device":"Dev2","status":"error"}',
    '{"device":"Dev1","status":"ok"}',
])
mssparkutils.fs.put(inputPath + "data.txt", device_data, True)
print("Source stream created...")

StatementMeta(, 73431cac-39af-4527-b2f4-f930c7456531, 33, Finished, Available)

Source stream created...


In [31]:
# Start the streaming query: parsed events go to a Delta table under the lakehouse Tables
# area, with progress tracked in the checkpoint folder so the query can resume.
delta_stream_table_path = 'Tables/iotdevicedata'
checkpointpath = 'Files/delta/checkpoint'
deltastream = (
    iotstream.writeStream
    .format("delta")
    .option("checkpointLocation", checkpointpath)
    .start(delta_stream_table_path)
)
print("Streaming to delta sink...")

StatementMeta(, 73431cac-39af-4527-b2f4-f930c7456531, 34, Finished, Available)

Streaming to delta sink...


In [32]:
# Drop a second file of device events into the watched folder; the running query is
# expected to pick it up in a later micro-batch.
more_data = "\n".join([
    '{"device":"Dev1","status":"ok"}',
    '{"device":"Dev1","status":"ok"}',
    '{"device":"Dev1","status":"ok"}',
    '{"device":"Dev1","status":"ok"}',
    '{"device":"Dev1","status":"error"}',
    '{"device":"Dev2","status":"error"}',
    '{"device":"Dev1","status":"ok"}',
])

# Last argument True overwrites the file if it already exists; the call's return value is the cell output.
mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)

StatementMeta(, 73431cac-39af-4527-b2f4-f930c7456531, 35, Finished, Available)

True

In [33]:
# Drain everything already in the watched folder (data.txt and more-data.txt) into the Delta
# sink before stopping, so the final table contents don't depend on how quickly this cell runs
# after the previous one. processAllAvailable() blocks until the data is committed and
# re-raises the query's exception if it terminated with an error.
deltastream.processAllAvailable()
deltastream.stop()

StatementMeta(, 73431cac-39af-4527-b2f4-f930c7456531, 36, Finished, Available)