# Delta Demo

1.	Spark, scala delta, apis, sql magics - merge (see [this notebook](https://github.com/hfleitas/synapsedelta/blob/main/notebook/DeltaChanges3.json))
2.	Serverless, credentials, open query (see sql [script](https://github.com/hfleitas/synapsedelta/blob/main/sqlscript/DeltaServerless.json))
3.	Integration pipeline, copy activity with overwrite if exists (see [pipeline](https://github.com/hfleitas/synapsedelta/blob/main/pipeline/Delta-Import.json))
4.	Sql pools - merge (see sql [script](https://github.com/hfleitas/synapsedelta/blob/main/sqlscript/DeltaSQLPoolMerge.json))

In [16]:
spark.version

StatementMeta(threedot0, 7, 1, Finished, Available)

res5: String = 3.1.2.5.0-49430596


In [55]:
case class Data(key: String, value: String)

case class ChangeData(key: String, newValue: String, deleted: Boolean, time: Long) {
  assert(newValue != null ^ deleted)
}

StatementMeta(threedot0, 8, 4, Finished, Available)

defined class Data
defined class ChangeData


In [56]:
val target = Seq(
  Data("a", "0"),
  Data("b", "1"),
  Data("c", "2"),
  Data("d", "3")
).toDF()

StatementMeta(threedot0, 8, 5, Finished, Available)

target: org.apache.spark.sql.DataFrame = [key: string, value: string]


In [57]:
sql("drop table if exists target")
target.write.format("delta").mode("overwrite").saveAsTable("target")

StatementMeta(threedot0, 8, 6, Finished, Available)

res7: org.apache.spark.sql.DataFrame = []


In [58]:
// import com.microsoft.spark.sqlanalytics.utils.Constants
// import org.apache.spark.sql.SqlAnalyticsConnector._

val targetDF = spark.sqlContext.sql("select * from target")

StatementMeta(threedot0, 8, 7, Finished, Available)

targetDF: org.apache.spark.sql.DataFrame = [key: string, value: string]


In [59]:
%%sql
select * from target order by key

StatementMeta(threedot0, 8, 8, Finished, Available)

<Spark SQL result set with 4 rows and 2 fields>

In [60]:
%%sql
describe extended target

StatementMeta(threedot0, 8, 9, Finished, Available)

<Spark SQL result set with 11 rows and 3 fields>

# 🧂 Shake it up

In [61]:
val changeDataSource = Seq(
  ChangeData("a", "10", deleted = false, time = 0),
  ChangeData("a", null, deleted = true, time = 1),   // a was updated and then deleted
  ChangeData("b", null, deleted = true, time = 2),   // b was just deleted once
  ChangeData("c", null, deleted = true, time = 3),   // c was deleted and then updated twice
  ChangeData("c", "20", deleted = false, time = 4),
  ChangeData("c", "200", deleted = false, time = 5)
).toDF().createOrReplaceTempView("changes")

StatementMeta(threedot0, 8, 10, Finished, Available)

changeDataSource: Unit = ()


### SQL Example

In [62]:
%%sql 
MERGE INTO target t
USING (
  -- Find the latest change for each key based on the timestamp
  SELECT key, latest.newValue as newValue, latest.deleted as deleted FROM (    
    -- Note: For nested structs, max on struct is computed as 
    -- max on first struct field, if equal fall back to second fields, and so on.
    SELECT key, max(struct(time, newValue, deleted)) as latest FROM changes GROUP BY key
  )
) s
ON s.key = t.key
WHEN MATCHED AND s.deleted = true THEN DELETE
WHEN MATCHED THEN UPDATE SET key = s.key, value = s.newValue
WHEN NOT MATCHED AND s.deleted = false THEN INSERT (key, value) VALUES (key, newValue)

StatementMeta(threedot0, 8, 11, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [63]:
val changesDF = spark.sql("select * from changes")
changesDF.head()

StatementMeta(threedot0, 8, 12, Finished, Available)

changesDF: org.apache.spark.sql.DataFrame = [key: string, newValue: string ... 2 more fields]
res12: org.apache.spark.sql.Row = [a,10,false,0]


In [64]:
val latestChangeForEachKey = changesDF.
selectExpr("key", "struct(time, newValue, deleted) as otherCols").
groupBy("key").
agg(max("otherCols").
as("latest")).
selectExpr("key", "latest.*")

StatementMeta(threedot0, 8, 13, Finished, Available)

latestChangeForEachKey: org.apache.spark.sql.DataFrame = [key: string, time: bigint ... 2 more fields]


In [65]:
latestChangeForEachKey.show() // shows the latest change for each key

StatementMeta(threedot0, 8, 14, Finished, Available)

+---+----+--------+-------+
|key|time|newValue|deleted|
+---+----+--------+-------+
|  c|   5|     200|  false|
|  b|   2|    null|   true|
|  a|   1|    null|   true|
+---+----+--------+-------+



In [66]:
latestChangeForEachKey.createOrReplaceTempView("lastchanges")

StatementMeta(threedot0, 8, 15, Finished, Available)

In [67]:
%%sql
select * from lastchanges

StatementMeta(threedot0, 8, 16, Finished, Available)

<Spark SQL result set with 3 rows and 4 fields>

In [68]:
latestChangeForEachKey.write.format("delta").mode("overwrite").saveAsTable("lastchanges") 

StatementMeta(threedot0, 8, 17, Finished, Available)

### SCALA Merge example (Skipped)

In [16]:
// deltaTable.as("t")
//   .merge(
//     latestChangeForEachKey.as("s"),
//     "s.key = t.key")
//   .whenMatched("s.deleted = true")
//   .delete()
//   .whenMatched()
//   .updateExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
//   .whenNotMatched("s.deleted = false")
//   .insertExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
//   .execute()

In [70]:
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

val lastchangesDF = spark.sqlContext.sql("select * from lastchanges")

lastchangesDF.write.option(Constants.
SERVER, "wplushiramsynapse.sql.azuresynapse.net").
synapsesql("wplussynapsedw.dbo.lastchanges", Constants.INTERNAL)

StatementMeta(threedot0, 8, 19, Finished, Available)

import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._
lastchangesDF: org.apache.spark.sql.DataFrame = [key: string, time: bigint ... 2 more fields]


## Resources
* https://docs.databricks.com/_static/notebooks/merge-in-cdc.html
* https://techcommunity.microsoft.com/t5/azure-synapse-analytics/query-delta-lake-files-using-t-sql-language-in-azure-synapse/ba-p/2388398
* https://docs.microsoft.com/en-us/azure/synapse-analytics/sql/query-delta-lake-format
* https://databricks.com/blog/2019/03/19/efficient-upserts-into-data-lakes-databricks-delta.html
* https://docs.microsoft.com/en-us/sql/t-sql/statements/merge-transact-sql?view=azure-sqldw-latest
* https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html
* https://docs.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-develop-ctas
* https://www.mssqltips.com/sqlservertip/6282/azure-data-factory-multiple-file-load-example-part-2/