# Delta Lake Tables
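Use this notebook to explore Delta Lake functionality: managed vs. external tables, table versioning and time travel, and writing streaming data to a Delta table.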

In [19]:
from pyspark.sql.types import StructType, IntegerType, StringType, DoubleType

# Explicit schema for the products CSV, so Spark does not have to infer column types.
schema = (
    StructType()
    .add("ProductID", IntegerType(), True)
    .add("ProductName", StringType(), True)
    .add("Category", StringType(), True)
    .add("ListPrice", DoubleType(), True)
)

# df is now a Spark DataFrame holding the rows of Files/products/products.csv
# (the first line of the file is treated as the header).
df = spark.read.csv("Files/products/products.csv", schema=schema, header=True)
display(df)

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 23, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2958561c-4c20-4069-8af3-5666aec4f05d)

In [20]:
# Save the products as a *managed* Delta table (no explicit path, so the catalog
# decides where the data files live).
# mode("overwrite") keeps the cell re-runnable: the default save mode raises an
# error when `managed_products` already exists.
df.write.format("delta").mode("overwrite").saveAsTable("managed_products")

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 24, Finished, Available, Finished)

In [21]:
# Save the same products as an *external* Delta table stored at an explicit location.
# Dropping an external table removes only the catalog entry, so a later cell can
# re-register these files as `products`.
# NOTE(review): presumably the two GUIDs are the OneLake workspace and lakehouse IDs.
# Confirm they match the notebook's default lakehouse, because later cells read this
# folder through the relative path 'Files/external_products'.
external_products_path = "abfss://1bf7ab2d-e1c2-4b79-826b-1b400c490062@onelake.dfs.fabric.microsoft.com/54eea3d3-9718-4394-b429-47ffb2992f40/Files/external_products"

# mode("overwrite") keeps the cell re-runnable. After the table is dropped, its Delta
# files remain at this location, and the default save mode would not replace them.
df.write.format("delta").mode("overwrite").saveAsTable("external_products", path=external_products_path)

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 25, Finished, Available, Finished)

In [22]:
%%sql
-- Show the detailed metadata of the managed table, including its type and storage location.
DESCRIBE TABLE FORMATTED managed_products;

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 26, Finished, Available, Finished)

<Spark SQL result set with 12 rows and 3 fields>

In [23]:
%%sql
-- Drop both tables. The external table's Delta files stay in
-- Files/external_products; the next cells re-register them as `products`.
-- IF EXISTS lets this cell be re-run without failing on an already-dropped table.
DROP TABLE IF EXISTS managed_products;
DROP TABLE IF EXISTS external_products;

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 28, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

In [30]:
%%sql
-- Register a table over the Delta files left behind by external_products.
-- No column list is given: the schema comes from the existing Delta table at LOCATION.
-- IF NOT EXISTS keeps the cell re-runnable; nothing in this notebook drops `products`.
CREATE TABLE IF NOT EXISTS products
USING DELTA
LOCATION 'Files/external_products';

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 35, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [32]:
%%sql
-- Read back every row of the re-registered table.
SELECT *
FROM products;

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 37, Finished, Available, Finished)

<Spark SQL result set with 295 rows and 4 fields>

In [33]:
%%sql
-- Apply a 10% discount to every product in the 'Mountain Bikes' category.
-- The update is committed as a new version of the Delta table.
-- NOTE(review): not idempotent. Each re-run multiplies these prices by 0.9 again.
UPDATE products
   SET ListPrice = ListPrice * 0.9
 WHERE Category = 'Mountain Bikes';

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 38, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 1 fields>

In [34]:
%%sql
-- List the commit history of the table: the initial write plus the UPDATE above.
DESCRIBE HISTORY products;

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 39, Finished, Available, Finished)

<Spark SQL result set with 2 rows and 15 fields>

In [36]:
delta_table_path = 'Files/external_products'

# Current state of the Delta table (includes the Mountain Bikes discount).
current_data = spark.read.load(delta_table_path, format="delta")
display(current_data)

# Time travel: version 0 is the data as first written, before the discount.
original_data = spark.read.load(delta_table_path, format="delta", versionAsOf=0)
display(original_data)

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 41, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 99607d8f-1b3e-49b5-8ae5-b8a82467ba2c)

SynapseWidget(Synapse.DataFrame, 2430941a-4eb4-46ed-9d57-2f7b8a33508f)

In [37]:
 %%sql
-- Per-category price summary, exposed as a temporary view so later cells
-- (both SQL and PySpark) can query it by name.
CREATE OR REPLACE TEMPORARY VIEW products_view AS
SELECT
    Category,
    COUNT(*)       AS NumProducts,
    MIN(ListPrice) AS MinPrice,
    MAX(ListPrice) AS MaxPrice,
    AVG(ListPrice) AS AvgPrice
FROM products
GROUP BY Category;

-- Display the summary sorted alphabetically by category.
SELECT *
FROM products_view
ORDER BY Category;

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 43, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 37 rows and 5 fields>

In [38]:
 %%sql
-- The ten categories with the most products.
SELECT
    Category,
    NumProducts
FROM products_view
ORDER BY NumProducts DESC
LIMIT 10;

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 44, Finished, Available, Finished)

<Spark SQL result set with 10 rows and 2 fields>

In [39]:
from pyspark.sql.functions import col, desc

# Price statistics per category from the temporary view, highest average price first.
df_products = (
    spark.table("products_view")
    .select("Category", "MinPrice", "MaxPrice", "AvgPrice")
    .orderBy(desc("AvgPrice"))
)
display(df_products)

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 45, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4020c7a0-58a2-4cbc-b86d-470c7a62d5c3)

# Use Delta Lake with streaming data (Spark Structured Streaming API)

In [40]:
from notebookutils import mssparkutils
from pyspark.sql.types import *
from pyspark.sql.functions import *
# NOTE(review): the wildcard import from pyspark.sql.functions can shadow Python
# built-ins of the same name (e.g. sum, min, max) in every later cell; consider
# importing only the names that are actually needed.

# Folder that the streaming source watches for incoming JSON files.
inputPath = 'Files/data/'
mssparkutils.fs.mkdirs(inputPath)

# Each event is a JSON object carrying a device name and a status string.
jsonSchema = StructType(
    [
        StructField("device", StringType(), False),
        StructField("status", StringType(), False),
    ]
)

# Streaming DataFrame over the folder; maxFilesPerTrigger=1 limits each
# micro-batch to at most one new file.
iotstream = (
    spark.readStream
    .schema(jsonSchema)
    .option("maxFilesPerTrigger", 1)
    .json(inputPath)
)

# Seed the folder with a first file of events, one JSON object per line.
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "data.txt", device_data, True)

print("Source stream created...")

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 46, Finished, Available, Finished)

Source stream created...


In [42]:
# Write the source stream to a Delta table.
# NOTE(review): presumably the lakehouse exposes the folder under Tables/ as the
# table that the SQL cell below queries as IotDeviceData. Confirm for this workspace.
delta_stream_table_path = 'Tables/iotdevicedata'
checkpointpath = 'Files/delta/checkpoint'
deltastream = (
    iotstream.writeStream
    .format("delta")
    .option("checkpointLocation", checkpointpath)
    .start(delta_stream_table_path)
)

# start() returns immediately while the query keeps running in the background.
# Block until every file already in the source folder has been committed, so the
# next cell sees the same rows on every run.
deltastream.processAllAvailable()
print("Streaming to delta sink...")

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 48, Finished, Available, Finished)

Streaming to delta sink...


In [45]:
%%sql
-- Query the Delta table that the streaming query is writing to.
SELECT * FROM iotdevicedata;

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 51, Finished, Available, Finished)

<Spark SQL result set with 32 rows and 2 fields>

In [44]:
 # Add more data to the source stream: a second file in the watched folder,
# which the running streaming query ingests as a new micro-batch.
# NOTE(review): this cell was executed (In [44]) before the SELECT cell above it
# (In [45]). Under "Run All" the SELECT runs first and will not include these rows.
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 50, Finished, Available, Finished)

True

In [46]:
# Drain any files that arrived after the query started (e.g. more-data.txt) so
# they are committed to the Delta table, then stop the streaming query.
# processAllAvailable() returns immediately if the query is no longer active.
deltastream.processAllAvailable()
deltastream.stop()

StatementMeta(, 52bba158-b46a-4800-84e3-572885e35af9, 52, Finished, Available, Finished)