#### Loading a CSV file into a dataframe

In [2]:
df = spark.read.format("csv").option("header","true").load("Files/products/products.csv")
# df is now a Spark DataFrame holding the rows of Files/products/products.csv (all columns are strings, since no schema is given).
display(df)

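Because the reader is given only the `header` option, Spark takes the column names from the first line of the file. Unless `inferSchema` is also set to `"true"` or an explicit schema is passed with `.schema(...)`, every column is typed as a string. Python's standard `csv` module behaves the same way, so it makes a quick local illustration. The two sample rows below are hypothetical. Only `Category` and `ListPrice` are column names confirmed by this notebook; the other column names are assumptions.

```python
import csv
import io

# Hypothetical two-row sample shaped like products.csv. Only Category and
# ListPrice are confirmed column names; ProductID and ProductName are assumed.
sample = """ProductID,ProductName,Category,ListPrice
1,Sample Bike A,Mountain Bikes,3399.99
2,Sample Frame B,Road Frames,1431.50
"""

# Like header=true: the first line supplies the column names.
rows = list(csv.DictReader(io.StringIO(sample)))

# Without a schema or inferSchema, every value is still a string.
for row in rows:
    print(row["Category"], repr(row["ListPrice"]), type(row["ListPrice"]).__name__)

# An explicit conversion, roughly what inferSchema or .schema(...) does in Spark.
prices = [float(row["ListPrice"]) for row in rows]
print(prices)
```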

#### Saving the Spark DataFrame as a managed Delta table

In [3]:
 df.write.format("delta").saveAsTable("managed_products")

#### Saving the Spark DataFrame as an external Delta table, using the ABFS path of a folder under Files

In [4]:
 df.write.format("delta").saveAsTable("external_products", path="abfss://learn_deltaTables_inSpark@onelake.dfs.fabric.microsoft.com/deltaTbl_learningLake.Lakehouse/Files/external_products")

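The ABFS path used for the external table follows the OneLake pattern `abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/Files/<folder>`. A small helper that assembles the path from its parts makes the workspace and lakehouse names easier to change. `onelake_files_path` is a hypothetical name, and the pattern is copied from the path in the cell above.

```python
def onelake_files_path(workspace: str, lakehouse: str, folder: str) -> str:
    """Hypothetical helper: build an ABFS URI for a folder under a lakehouse's Files area."""
    # Pattern taken from the path used in this notebook:
    # abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/Files/<folder>
    folder = folder.strip("/")
    return (
        f"abfss://{workspace}@onelake.dfs.fabric.microsoft.com/"
        f"{lakehouse}.Lakehouse/Files/{folder}"
    )


path = onelake_files_path("learn_deltaTables_inSpark", "deltaTbl_learningLake", "external_products")
print(path)
# In the notebook, the result could then be passed to saveAsTable, e.g.:
# df.write.format("delta").saveAsTable("external_products", path=path)
```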

#### Comparing the properties of the managed and external tables with the SQL ```DESCRIBE FORMATTED``` command
> After running the code, the output shows that the two tables are stored in different places. The managed table's data sits in the lakehouse's own **Tables** storage area, while the external table's location is the ABFS path passed to ```saveAsTable```.

In [5]:
 %%sql

 DESCRIBE FORMATTED managed_products;

<Spark SQL result set with 14 rows and 3 fields>

In [6]:
 %%sql

 DESCRIBE FORMATTED external_products;

<Spark SQL result set with 15 rows and 3 fields>
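`DESCRIBE FORMATTED` returns rows of (`col_name`, `data_type`, `comment`). The table-level details appear as extra rows whose `col_name` is the property name, and these typically include a `Location` row. The exact set of rows depends on the Spark and Delta versions: here the managed table has 14 rows and the external table has 15. In PySpark, only that row could be selected with `spark.sql("DESCRIBE FORMATTED managed_products").filter("col_name = 'Location'")`.

The sketch below shows the lookup step on hypothetical rows so that it runs without Spark. The location strings are placeholders, not output from this notebook.

```python
from typing import Optional, Sequence, Tuple

Row = Tuple[str, str, Optional[str]]  # (col_name, data_type, comment)

# Hypothetical DESCRIBE FORMATTED rows; the locations are placeholders.
managed_rows = [
    ("Category", "string", None),
    ("Location", "<lakehouse>/Tables/managed_products", ""),
]
external_rows = [
    ("Category", "string", None),
    ("Location", "abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/Files/external_products", ""),
]


def describe_value(rows: Sequence[Row], key: str) -> Optional[str]:
    """Return the data_type column of the first row whose col_name equals key."""
    for col_name, data_type, _comment in rows:
        if col_name == key:
            return data_type
    return None


print(describe_value(managed_rows, "Location"))
print(describe_value(external_rows, "Location"))
```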

#### Dropping both tables and observing the changes in the Lakehouse explorer
> After running the code, both Delta tables disappear from the **Tables node** of the Lakehouse explorer. However, the external table's data is still present under the **Files node** in the *external_products* folder. *Dropping the external table deleted only its metadata and left its files untouched, whereas dropping the managed table removed its data as well.*

In [7]:
 %%sql

 DROP TABLE managed_products;
 DROP TABLE external_products;

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>
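The difference comes down to who owns the files. The toy model below is not Spark or Fabric code. It keeps a dictionary as the metastore and a set of folder paths as storage. Dropping a managed table removes both its metadata entry and its folder, whereas dropping an external table removes only the metadata entry. The class and method names are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set, Tuple


@dataclass
class ToyLakehouse:
    """Toy model, not Spark: a metastore of table entries plus a set of stored folders."""
    # table name -> (storage location, is_managed)
    metastore: Dict[str, Tuple[str, bool]] = field(default_factory=dict)
    storage: Set[str] = field(default_factory=set)

    def save_as_table(self, name: str, path: Optional[str] = None) -> None:
        # No path: managed table stored under Tables/. With a path: external table.
        managed = path is None
        location = f"Tables/{name}" if managed else path
        self.metastore[name] = (location, managed)
        self.storage.add(location)

    def drop_table(self, name: str) -> None:
        # The metadata entry is always removed...
        location, managed = self.metastore.pop(name)
        # ...but the data folder is deleted only for managed tables.
        if managed:
            self.storage.discard(location)


lake = ToyLakehouse()
lake.save_as_table("managed_products")
lake.save_as_table("external_products", path="Files/external_products")
lake.drop_table("managed_products")
lake.drop_table("external_products")
print(lake.metastore)  # no tables left
print(lake.storage)    # only the external table's folder survives
```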

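#### Using SQL to create a table in the metastore over the Delta files left under the Files node
> A new Delta table, *products*, was created and can be seen under the **Tables node**. Its schema is read from the transaction log of the existing Delta files in ```Files/external_products``` (Parquet data files plus a ```_delta_log``` folder), so it matches the data written there earlier.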

In [8]:
%%sql

 CREATE TABLE products
 USING DELTA
 LOCATION 'Files/external_products';

<Spark SQL result set with 0 rows and 0 fields>
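This works because *Files/external_products* holds more than Parquet files. The earlier external-table write also left a `_delta_log` folder containing the transaction log, and `CREATE TABLE ... USING DELTA LOCATION` reads the table definition from it.

A quick local check for that folder could look like the sketch below. `looks_like_delta_table` is a hypothetical helper that works on an ordinary local directory. It only checks that the folder exists and does not validate the log. The empty files merely stand in for real data and commit files.

```python
import tempfile
from pathlib import Path


def looks_like_delta_table(folder: str) -> bool:
    """Hypothetical helper: True if the folder contains a _delta_log subdirectory."""
    return (Path(folder) / "_delta_log").is_dir()


with tempfile.TemporaryDirectory() as root:
    plain = Path(root) / "plain_parquet"
    delta = Path(root) / "external_products"
    plain.mkdir()
    (delta / "_delta_log").mkdir(parents=True)
    # Empty placeholder files standing in for Parquet data and the first commit.
    (plain / "part-00000.parquet").touch()
    (delta / "part-00000.parquet").touch()
    (delta / "_delta_log" / "00000000000000000000.json").touch()

    plain_result = looks_like_delta_table(str(plain))
    delta_result = looks_like_delta_table(str(delta))

print(plain_result, delta_result)
```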

#### Writing a SQL statement to inspect the table

In [9]:
 %%sql

SELECT * FROM products;

<Spark SQL result set with 295 rows and 4 fields>

#### Updating the Delta table to add a new version to the transaction log
*This code reduces the ```ListPrice``` of every product in the 'Mountain Bikes' category by 10%.*

In [10]:
 %%sql

 UPDATE products
 SET ListPrice = ListPrice * 0.9
 WHERE Category = 'Mountain Bikes';

<Spark SQL result set with 1 rows and 1 fields>
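The `UPDATE` multiplies `ListPrice` by 0.9 only for rows whose `Category` is 'Mountain Bikes'; every other row keeps its price. The plain-Python sketch below applies the same rule to hypothetical rows, using `Decimal` so the 10% cut is exact.

For reference, the Delta Lake Python API offers an equivalent call that also produces a new table version: `DeltaTable.forName(spark, "products").update(condition="Category = 'Mountain Bikes'", set={"ListPrice": "ListPrice * 0.9"})`.

```python
from decimal import Decimal

# Hypothetical rows; only Category and ListPrice matter for the update.
products = [
    {"ProductName": "Sample Bike A", "Category": "Mountain Bikes", "ListPrice": Decimal("3399.99")},
    {"ProductName": "Sample Frame B", "Category": "Road Frames", "ListPrice": Decimal("1431.50")},
]


def apply_discount(rows, category, factor):
    """Return new rows where ListPrice is multiplied by factor for the given category only."""
    return [
        {**row, "ListPrice": row["ListPrice"] * factor} if row["Category"] == category else dict(row)
        for row in rows
    ]


updated = apply_discount(products, "Mountain Bikes", Decimal("0.9"))
for before, after in zip(products, updated):
    print(before["ProductName"], before["ListPrice"], "->", after["ListPrice"])
```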

#### Checking the transaction history recorded for the updated table using the ```DESCRIBE HISTORY``` SQL command
> The history contains two rows. Version 0 is the original write of the data, and the ```UPDATE``` operation is recorded as version 1 (the second version) of the table.

In [11]:
 %%sql

 DESCRIBE HISTORY products;

<Spark SQL result set with 2 rows and 15 fields>
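Each version listed by `DESCRIBE HISTORY` corresponds to a commit file in the table's `_delta_log` folder, and each commit records which data files were added or removed. In the simple case, an `UPDATE` writes new files for the changed rows and marks the old files as removed. Because the old files stay on disk, earlier versions can still be reconstructed, which is what makes time travel possible.

The sketch below is a simplified model of that log, not the real Delta format. The file names are made up, and the operation label for version 0 is a placeholder.

```python
# Simplified model of a Delta transaction log: one commit per version,
# each listing the data files it adds and removes (file names are made up).
log = [
    {"version": 0, "operation": "<initial write>", "add": ["part-0000.parquet", "part-0001.parquet"], "remove": []},
    {"version": 1, "operation": "UPDATE", "add": ["part-0002.parquet"], "remove": ["part-0000.parquet"]},
]


def history(commits):
    """List (version, operation) newest first, the order DESCRIBE HISTORY uses."""
    ordered = sorted(commits, key=lambda c: c["version"], reverse=True)
    return [(c["version"], c["operation"]) for c in ordered]


def files_as_of(commits, version):
    """Replay commits 0..version to find the data files that make up that snapshot."""
    live = set()
    for c in sorted(commits, key=lambda c: c["version"]):
        if c["version"] > version:
            break
        live.difference_update(c["remove"])
        live.update(c["add"])
    return sorted(live)


print(history(log))
print(files_as_of(log, 0))
print(files_as_of(log, 1))
```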

#### Comparing the DataFrames for versions 0 and 1 of the table
*Version 0 is read with the ```.option("versionAsOf", 0)``` reader option.*

*Note that both DataFrames are loaded from the table's **file path** (```Files/external_products```), not from a **Tables** path or the table name.*

In [12]:
 delta_table_path = 'Files/external_products'

 # Get the current data
 current_data = spark.read.format("delta").load(delta_table_path)
 display(current_data)

 # Get the version 0 data
 original_data = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
 display(original_data)

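The two `display` calls show the full tables one after the other. To see only what the update changed, the DataFrames can be compared row by row: in PySpark, `current_data.exceptAll(original_data)` returns the rows present in the current version but not in version 0. The plain-Python sketch below mimics those duplicate-preserving semantics with `collections.Counter` on hypothetical rows, so it runs without Spark.

```python
from collections import Counter

# Hypothetical (Category, ListPrice) rows for version 0 and version 1 of the table.
version_0 = [("Mountain Bikes", 1000.0), ("Road Frames", 500.0), ("Mountain Bikes", 1000.0)]
version_1 = [("Mountain Bikes", 900.0), ("Road Frames", 500.0), ("Mountain Bikes", 900.0)]


def except_all(left, right):
    """Rows of left not matched in right, keeping duplicates (like DataFrame.exceptAll)."""
    remaining = Counter(left)
    remaining.subtract(Counter(right))
    # elements() skips rows whose remaining count is zero or negative.
    return sorted(remaining.elements())


print("new in version 1:", except_all(version_1, version_0))
print("gone since version 0:", except_all(version_0, version_1))
```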

#### Using Delta tables as a sink for streaming data
##### Writing code to simulate IoT device data

In [13]:
 from notebookutils import mssparkutils
 from pyspark.sql.types import *
 from pyspark.sql.functions import *

 # Create a folder
 inputPath = 'Files/data/'
 mssparkutils.fs.mkdirs(inputPath)

 # Create a stream that reads data from the folder, using a JSON schema
 jsonSchema = StructType([
     StructField("device", StringType(), False),
     StructField("status", StringType(), False)
 ])
 iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

 # Write some event data to the folder
 device_data = '''{"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev2","status":"error"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"error"}
 {"device":"Dev2","status":"ok"}
 {"device":"Dev2","status":"error"}
 {"device":"Dev1","status":"ok"}'''
 mssparkutils.fs.put(inputPath + "data.txt", device_data, True)
 print("Source stream created...")

Source stream created...
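A schema is passed to `readStream` because streaming file sources require one by default rather than inferring it. With a user-supplied schema, Spark's JSON reader keeps only the declared fields: a field missing from a line comes back as null, and an undeclared field is ignored.

The plain-Python sketch below imitates that projection. It uses one line from *data.txt* plus two hypothetical lines: a `Dev3` record without a status and a record with an extra `temp` field. It is an illustration, not Spark's parser.

```python
import json

SCHEMA_FIELDS = ("device", "status")  # mirrors jsonSchema in the cell above

lines = [
    '{"device":"Dev1","status":"ok"}',                # a line from data.txt
    '{"device":"Dev3"}',                              # hypothetical: status missing
    '{"device":"Dev2","status":"error","temp":41}',   # hypothetical: extra field
]


def project(line, fields=SCHEMA_FIELDS):
    """Keep only the schema fields; missing ones become None (Spark would show null)."""
    obj = json.loads(line)
    return tuple(obj.get(f) for f in fields)


rows = [project(line) for line in lines]
for row in rows:
    print(row)
```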


##### Writing the stream to a delta table

In [14]:
 # Write the stream to a delta table
 delta_stream_table_path = 'Tables/iotdevicedata'
 checkpointpath = 'Files/delta/checkpoint'
 deltastream = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)
 print("Streaming to delta sink...")

Streaming to delta sink...
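The `checkpointLocation` is where Spark records the stream's progress, including which source files have already been processed. This lets a restarted query resume where it stopped instead of re-reading old files.

The toy below models only that bookkeeping with a set of file names. It is not how Spark lays out the checkpoint on disk, and `run_micro_batch` is a hypothetical name. The toy also mirrors `maxFilesPerTrigger = 1` by taking at most one new file per batch, and picks files in name order for simplicity.

```python
def run_micro_batch(available_files, processed, max_files_per_trigger=1):
    """Pick up to max_files_per_trigger files not yet recorded in the checkpoint set."""
    new_files = sorted(f for f in available_files if f not in processed)[:max_files_per_trigger]
    processed.update(new_files)  # the "checkpoint" remembers them for later batches
    return new_files


checkpoint = set()
folder = ["data.txt"]
first = run_micro_batch(folder, checkpoint)    # picks up data.txt
second = run_micro_batch(folder, checkpoint)   # nothing new to process

folder.append("more-data.txt")
third = run_micro_batch(folder, checkpoint)    # only the newly added file
print(first, second, third)
```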


##### Querying the delta table containing data from the streaming source

In [15]:
 %%sql

 SELECT * FROM IotDeviceData;

<Spark SQL result set with 9 rows and 2 fields>

##### Adding more data to the source stream 

In [16]:
 # Add more data to the source stream
 more_data = '''{"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"ok"}
 {"device":"Dev1","status":"error"}
 {"device":"Dev2","status":"error"}
 {"device":"Dev1","status":"ok"}'''

 mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)

True

##### Checking the Delta table again
> The new file was picked up automatically, and its records were appended to the Delta table: the query now returns 16 rows instead of 9.

In [17]:
 %%sql

 SELECT * FROM IotDeviceData;

<Spark SQL result set with 16 rows and 2 fields>
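The row count grows from 9 to 16 because the seven records in *more-data.txt* were appended to the nine already in the table. A query such as `SELECT device, status, COUNT(*) FROM IotDeviceData GROUP BY device, status` would summarise them. The sketch below computes the same counts in plain Python from the two payloads written in this notebook, as a way to check what the Delta table should contain.

```python
import json
from collections import Counter

# The two payloads the notebook writes to Files/data/.
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

# Count (device, status) pairs across both files, like a GROUP BY over the table.
counts = Counter()
for payload in (device_data, more_data):
    for line in payload.splitlines():
        if line.strip():
            record = json.loads(line)
            counts[(record["device"], record["status"])] += 1

total = sum(counts.values())
for (device, status), n in sorted(counts.items()):
    print(device, status, n)
print("total rows:", total)
```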

##### Stopping the stream process

In [18]:
 deltastream.stop()
