# 1. Save the path to the directory where delta files will be stored.

In [11]:
# configure path variable
# Location of the Delta table files: later cells write the data to this path,
# register a SQL table on top of it, and read it back from it.
path = "/delta/table"

StatementMeta(cassharedspark, 21, 11, Finished, Available)



# 2. Read in data. Write to delta format and save to the directory above. 

In [12]:
# Read the source CSV: the first row is the header, column types are inferred,
# and the multiline option lets a quoted field span more than one line.
data = (
    spark.read.format('csv')
    .options(header='true', inferSchema='true', multiline='true')
    .load('abfss://public-data@statsconviddsinternal.dfs.core.windows.net/incoming/data_duplicate.csv')
)
data.show()

# Write the DataFrame to `path` in Delta format.
# mode("overwrite") keeps this cell re-runnable: with the default save mode the
# write raises an error once `path` already holds data from an earlier run.
# Overwriting a Delta table adds a new table version; the earlier versions
# remain available in the table history.
data.write.format("delta").mode("overwrite").save(path)

StatementMeta(cassharedspark, 21, 12, Finished, Available)

+---+-------------+----------+-----------+-------------+--------------------+----------------+--------------+------------+-----+-------------+---+------------+----------+---------------+----------+
| id|       rec_id|given_name|    surname|street_number|           address_1|       address_2|        suburb|    postcode|state|date_of_birth|age|phone_number|soc_sec_id|blocking_number|entity_id|
+---+-------------+----------+-----------+-------------+--------------------+----------------+--------------+------------+-----+-------------+---+------------+----------+---------------+----------+
|  1|rec-349-dup-0|     amber|     hiltoj|          158|      padbury hsreet|     tarwyn park|      waterman|            |  vic|     19529121|   | 04 33788556|   9212598|              9|  rec-349|
|  3|rec-414-dup-0|      rhzn|       reid|            8|       hovell street|                |     brighmgon|        6153|  qld|     19930211| 34| 07 06621319|   6647722|              4|  rec-414|
|  6|rec-3

# 3. Create an SQL Table Using Delta.

In [13]:
# Register the Delta files stored at `path` as the SQL table `example`.
# IF NOT EXISTS lets this cell be re-run without failing once the table has
# been created (the same form the `example2` cell uses).
spark.sql("CREATE TABLE IF NOT EXISTS example USING DELTA LOCATION '{0}'".format(path))

StatementMeta(cassharedspark, 21, 13, Finished, Available)

DataFrame[]

# 4. Now you can run queries on the delta table.

In [2]:
%%sql
-- Preview at most 100 rows of the `example` table.
SELECT *
FROM example
LIMIT 100

StatementMeta(cassharedspark, 24, 0, Finished, Available)

<Spark SQL result set with 100 rows and 16 fields>

Note: The current version of Delta Lake included in Synapse does not fully support SQL. **Some SELECT statements work**; however, other statements (notably UPDATE and the Time Travel feature) will result in an error. For those operations, use one of the **supported languages: PySpark, Scala, or .NET (C#).**

In [47]:
# conditional delete and update
# NOTE(review): this wildcard import rebinds Python builtins such as sum, min
# and max to Spark column functions, and nothing in this cell uses it; it is
# kept only in case other cells rely on the names it brings in.
from pyspark.sql.functions import *
from delta.tables import DeltaTable

# Handle to the Delta table stored at `path`.
delta_table = DeltaTable.forPath(spark, path)

# delete every row whose id is even
delta_table.delete("id % 2 == 0")

# set given_name to 'jade' for every row with id < 10
# (the inner single quotes make the new value a SQL string literal)
delta_table.update("id < 10", { "given_name": "'jade'" } )

StatementMeta(cassharedspark, 21, 47, Finished, Available)



In [39]:
%%sql
-- Look for rows with an even id in `example`.
SELECT *
FROM example
WHERE id % 2 = 0

StatementMeta(cassharedspark, 21, 39, Finished, Available)

<Spark SQL result set with 0 rows and 16 fields>

In [48]:
%%sql
-- Show id and given_name for the rows of `example` with id below 10.
SELECT
    id,
    given_name
FROM example
WHERE id < 10

StatementMeta(cassharedspark, 21, 48, Finished, Available)

<Spark SQL result set with 3 rows and 2 fields>

# 5. You can view the update history of a table.

In [49]:
# Print the table's history: up to 20 entries, cell text cut off at
# 1000 characters, in the regular (non-vertical) table layout.
delta_table.history().show(n=20, truncate=1000, vertical=False)

StatementMeta(cassharedspark, 21, 49, Finished, Available)

+-------+-------------------+------+--------+---------+------------------------------------------+----+--------+---------+-----------+--------------+-------------+---------------------------------------------------------------------------------------+
|version|          timestamp|userId|userName|operation|                       operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|                                                                       operationMetrics|
+-------+-------------------+------+--------+---------+------------------------------------------+----+--------+---------+-----------+--------------+-------------+---------------------------------------------------------------------------------------+
|      2|2021-06-14 18:00:32|  null|    null|   UPDATE|             [predicate -> (id#2266 < 10)]|null|    null|     null|          1|          null|        false|  [numRemovedFiles -> 1, numAddedFiles -> 1, numUpdatedRows -> 3, numCopiedRows -

# 6. You can use time travel to query previous versions of your data.

In [51]:
# Load the table as of version 0 by passing the versionAsOf
# read option, then print the first rows.
df = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load(path)
)
df.show()

StatementMeta(cassharedspark, 21, 51, Finished, Available)

+---+-------------+----------+-----------+-------------+--------------------+----------------+--------------+------------+-----+-------------+---+------------+----------+---------------+----------+
| id|       rec_id|given_name|    surname|street_number|           address_1|       address_2|        suburb|    postcode|state|date_of_birth|age|phone_number|soc_sec_id|blocking_number|entity_id|
+---+-------------+----------+-----------+-------------+--------------------+----------------+--------------+------------+-----+-------------+---+------------+----------+---------------+----------+
|  1|rec-349-dup-0|     amber|     hiltoj|          158|      padbury hsreet|     tarwyn park|      waterman|            |  vic|     19529121|   | 04 33788556|   9212598|              9|  rec-349|
|  3|rec-414-dup-0|      rhzn|       reid|            8|       hovell street|                |     brighmgon|        6153|  qld|     19930211| 34| 07 06621319|   6647722|              4|  rec-414|
|  6|rec-3

# 7. Changes to delta tables propagate between Azure services.
In the example below, we create a table from Delta files that were previously created in Databricks. We then run a SELECT statement on this table and see that it returns the most recent version of the data, including the modifications made to the original data within Databricks.

In [8]:
# Register the Delta files at the abfss location below as the SQL table
# `example2`, unless a table with that name already exists.
spark.sql(
    "CREATE TABLE IF NOT EXISTS example2 USING DELTA "
    "LOCATION 'abfss://public-data@statsconviddsinternal.dfs.core.windows.net/delta/'"
)

StatementMeta(cassharedspark, 25, 8, Finished, Available)

DataFrame[]

In [10]:
%%sql
-- Select the rows of `example2` whose data_quality_grade is 'B'.
SELECT *
FROM example2
WHERE data_quality_grade = 'B'

StatementMeta(cassharedspark, 25, 10, Finished, Available)

<Spark SQL result set with 1000 rows and 31 fields>