## Spark Delta Table Merge Experiment

* This notebook creates a new Spark database. Once the database exists, we can create managed tables inside it, read the data from the Parquet files in the data lake, and push it into a Delta table using the `MERGE` command. After the merge, the Delta table holds the most current version of the data.

In [None]:
print(spark)

* First, create the database in which Spark will store its tables. Once the database is created, a warehouse directory is created in ADLS under the path `datalake/synapse/workspaces/venkysyn1001/warehouse/spark_temperatures_db.db/`.

<img src="./images/img_017.png" />

In [None]:
%%sql
CREATE DATABASE spark_temperatures_db;

In [None]:
%%sql
SHOW DATABASES;

In [None]:
%%sql
CREATE TABLE temperatures_ext_delta (
    latitude float,
    longitude float,
    time string,
    temperature_2m float
) USING DELTA;

In [None]:
%%sql
SELECT * FROM temperatures_ext_delta;

* Even though the tables were created in the Synapse serverless SQL pool, their data lives in ADLS and can be referenced directly from the Spark notebook by giving the path to the ADLS directory. Note that the person running this notebook needs permissions on the referenced folder for this connection to work. This is very similar to how pass-through authentication works in the serverless pool case.
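
* The ADLS directory path follows the `abfss://<container>@<account>.dfs.core.windows.net/<path>` layout. The helper below is only a minimal sketch of how such a path can be assembled. The function name `abfss_uri` is hypothetical, and the container, account, and folder values are the ones used in this notebook.

```python
def abfss_uri(container: str, account: str, path: str = "") -> str:
    """Build an abfss:// URI for a directory in an ADLS Gen2 account."""
    # Normalise the folder so the result has exactly one trailing slash.
    clean = path.strip("/")
    base = f"abfss://{container}@{account}.dfs.core.windows.net/"
    return f"{base}{clean}/" if clean else base


# Values taken from this notebook's external table definition.
location = abfss_uri(
    "datalake",
    "venkydatalake1001",
    "temperatures/AirQualityIndexWithTemperatures_5",
)
print(location)
```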

In [None]:
%%sql
CREATE EXTERNAL TABLE temperatures_2018 
USING PARQUET 
LOCATION "abfss://datalake@venkydatalake1001.dfs.core.windows.net/temperatures/AirQualityIndexWithTemperatures_5/"

* Count the records in the external table we just created.

In [None]:
%%sql
SELECT COUNT(*) FROM temperatures_2018

* Merge the dataset in this external table into the Delta table that Spark manages, and see how the data moves from the immutable Parquet table to the managed Spark table. Note the use of the backtick symbol to escape `time`, which is a reserved word.
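
* As an illustration of the backtick escaping, the sketch below builds the join condition for the three key columns with every column name quoted. The helper names `quote_identifier` and `qualified` are hypothetical. Quoting every column rather than only `time` is an assumption made for simplicity. Spark SQL escapes a backtick inside a quoted identifier by doubling it.

```python
def quote_identifier(name: str) -> str:
    """Wrap a column name in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


def qualified(alias: str, column: str) -> str:
    """Return alias.`column` for use in a join or merge condition."""
    return f"{alias}.{quote_identifier(column)}"


# Key columns that identify one temperature reading in this notebook.
keys = ["latitude", "longitude", "time"]
on_clause = " AND ".join(
    f"{qualified('TARGET', k)} = {qualified('SOURCE', k)}" for k in keys
)
print(on_clause)
```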

In [None]:
%%sql
MERGE INTO temperatures_ext_delta AS TARGET
USING temperatures_2018 AS SOURCE
ON TARGET.latitude = SOURCE.latitude AND
TARGET.longitude = SOURCE.longitude AND 
TARGET.`time` = SOURCE.`time` 
WHEN MATCHED THEN 
UPDATE SET 
    TARGET.temperature_2m = SOURCE.temperature_2m  
WHEN NOT MATCHED THEN
INSERT 
    ( TARGET.latitude, TARGET.longitude, TARGET.`time`, TARGET.temperature_2m )
VALUES 
    ( SOURCE.latitude, SOURCE.longitude, SOURCE.`time`, SOURCE.temperature_2m )


* The output of the merge shows the number of rows that matched and the number of rows that were inserted. Source rows whose key (latitude, longitude, time) already exists in the Delta table are applied as updates; all other source rows are applied as inserts.
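
* The same upsert can also be written with the Delta Lake Python API instead of SQL. The following is a sketch only: it assumes the `delta` Python package is available in the Spark pool, the function name `merge_temperatures` is hypothetical, and it has not been run against this workspace. Returning the latest history entry is one possible way to inspect the row counts of the merge.

```python
# Join condition and column mappings mirroring the SQL MERGE statement.
MERGE_CONDITION = (
    "target.latitude = source.latitude AND "
    "target.longitude = source.longitude AND "
    "target.`time` = source.`time`"
)
UPDATE_SET = {"temperature_2m": "source.temperature_2m"}
INSERT_VALUES = {
    "latitude": "source.latitude",
    "longitude": "source.longitude",
    "time": "source.`time`",
    "temperature_2m": "source.temperature_2m",
}


def merge_temperatures(spark):
    """Upsert temperatures_2018 into temperatures_ext_delta."""
    # Imported lazily so the constants above can be used without Delta installed.
    from delta.tables import DeltaTable

    target = DeltaTable.forName(spark, "temperatures_ext_delta")
    source = spark.table("temperatures_2018")
    (
        target.alias("target")
        .merge(source.alias("source"), MERGE_CONDITION)
        .whenMatchedUpdate(set=UPDATE_SET)
        .whenNotMatchedInsert(values=INSERT_VALUES)
        .execute()
    )
    # Latest commit of the table; its operationMetrics column is expected
    # to describe the merge that just ran.
    return target.history(1)
```

* In a notebook cell this would be called as `merge_temperatures(spark)`, using the `spark` session printed at the top of the notebook.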

In [None]:
%%sql
SELECT COUNT(*) FROM temperatures_ext_delta;

* Since we started with an empty Delta table, the number of records in the Delta table equals the row count of the 2018 data we just merged into it.
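
* The bookkeeping behind these counts can be imitated in plain Python. This is a toy model, not how Delta Lake implements `MERGE`: it assumes the key (latitude, longitude, time) is unique within both tables, the function name `merge_rows` is hypothetical, and the sample readings are made-up values.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]
KEY_COLUMNS = ("latitude", "longitude", "time")


def merge_rows(target: List[Row], source: List[Row]) -> Tuple[int, int]:
    """Upsert source into target in place; return (updated, inserted)."""

    def key(row: Row) -> tuple:
        return tuple(row[c] for c in KEY_COLUMNS)

    # Index the target as it looked before the merge started.
    existing = {key(row): row for row in target}
    updated = inserted = 0
    for row in source:
        match = existing.get(key(row))
        if match is not None:
            # WHEN MATCHED: overwrite the temperature only.
            match["temperature_2m"] = row["temperature_2m"]
            updated += 1
        else:
            # WHEN NOT MATCHED: copy the whole row into the target.
            target.append(dict(row))
            inserted += 1
    return updated, inserted


# Made-up sample readings, for illustration only.
source = [
    {"latitude": 52.5, "longitude": 13.4, "time": "2018-01-01T00:00", "temperature_2m": 1.5},
    {"latitude": 52.5, "longitude": 13.4, "time": "2018-01-01T01:00", "temperature_2m": 1.0},
]
target: List[Row] = []

first = merge_rows(target, source)   # empty target: every row is an insert
source[0]["temperature_2m"] = 2.5    # simulate a corrected reading
second = merge_rows(target, source)  # same keys again: every row is an update
print(first, second, len(target))
```

* In this model the first merge into the empty target inserts every source row, so the target count equals the source count. Merging the same keys a second time only updates existing rows and leaves the count unchanged.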

<img src="./images/img_018.png" />