In [6]:
1+1

# Delta Lake and Other Topics

Let's continue from last time we met.  



## Configuration
Make sure you modify this as appropriate.

In [7]:
# variables, setup, and imports

# where can you write your data?  I call this a "sandbox"

#CHANGEME
whoami = mssparkutils.env.getUserName()
print(whoami)
sandboxRoot = "abfss://sandbox@davewsynapsedl.dfs.core.windows.net"

delta_table_path = "{0}/{1}/delta-testing".format(sandboxRoot,whoami)
print(delta_table_path)

**Double-check:  is this the sandbox path you want to use?**

Let's double-check that you can write out some sample data without issue.

In [8]:
dfData = spark.range(0,5)
display(dfData)


In [11]:
(dfData
    .write
    .format("delta")
    .mode("overwrite")
    .save(delta_table_path))

Now, let's go look at what was created in the datalake using the FileExplorer.  

Or, we can do this directly from code


In [12]:
mssparkutils.fs.ls(delta_table_path) 

You should see a series of `snappy.parquet` files.  
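Alongside the data files, Delta keeps its transaction log in a `_delta_log` folder. Here is a minimal sketch for summarizing a listing like the one above. It assumes each entry exposes `name` and `isDir` attributes, as the objects returned by `mssparkutils.fs.ls` do. The helper name `summarize_delta_folder` and the sample file names are made up for illustration.

```python
from types import SimpleNamespace

def summarize_delta_folder(entries):
    """Split a folder listing into parquet data files and the Delta log folder."""
    data_files = sorted(
        e.name for e in entries
        if not e.isDir and e.name.endswith(".parquet")
    )
    has_log = any(e.isDir and e.name.rstrip("/") == "_delta_log" for e in entries)
    return {"data_files": data_files, "has_delta_log": has_log}

# Stand-in listing with invented names, so the sketch runs without a Spark pool.
sample_listing = [
    SimpleNamespace(name="_delta_log", isDir=True),
    SimpleNamespace(name="part-00000.snappy.parquet", isDir=False),
]
summary = summarize_delta_folder(sample_listing)
print(summary)

# In the notebook you would pass the real listing instead:
# print(summarize_delta_folder(mssparkutils.fs.ls(delta_table_path)))
```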

Basically, at the start of every notebook you'll likely want to have a spot where you can write out "temp" data and we do that in the "sandbox".  

It's difficult to remember those long `abfss://` paths.  

## Using Mounts Might Be Easier

Let's check out how to do that.  

`/` should be mounted to your Primary datalake in your workspace.  

This helps with moving code from "dev" to "test" to "prod"

In [13]:

mssparkutils.fs.ls("/") 

This is the exact same thing, but using a magic command.

In [14]:
%fs ls /

What we really want to do is start to think about abstracting away the implementation details of the data lake and its structure.  Let's be honest...thinking about filepaths is way too much cognitive burden.  We just want to think about the data and solving business problems.  

## Maybe we can create a pattern that will make our lives easier.  

Over time you need access to multiple data lakes:
* the corporate data lake
* the marketing team's data lake
* your sandbox

And over time the locations of those data lakes will change **especially as your code is promoted from dev to test to prod**.  We need a better way.  

One pattern is to have a simple folder structure that everyone understands and can reference easily.  We use `mounts`.  

Here's the mount pattern we'll use:

* `/` : we start here
* `/lake`:  this will be the location we write out our datasets.  If our code ever moves from dev to test to prod or our datalake location changes in Azure, it's not a problem, **we just change the mount location**
* `/bronze`:  we'll map this to MY datalake so we can read data.  

So the pattern for the rest of the day will be:
* read the data from `/bronze` and write the data to `/lake`

Make sense?  

## In the real world...

* your "administrator" would do all the mounts for you.  This would be done automatically whenever your Spark pool fires up **or** as a `%run notebook` call at the top of every one of your notebooks.  
* always create your mounts under `/mnt`
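
One way to picture the administrator's side of this is a small lookup table: notebooks only ever see the mount points, and a per-environment table decides which storage location backs each one. This is only a sketch. The environment names, the storage account names, and the `mount_plan` helper are all placeholders invented for illustration.

```python
# Hypothetical per-environment mount tables. Only these values change when
# code is promoted; the mount points the notebooks use stay the same.
MOUNT_SOURCES = {
    "dev": {
        "/mnt/lake": "abfss://sandbox@exampledev.dfs.core.windows.net",
        "/mnt/bronze": "abfss://bronze@exampledev.dfs.core.windows.net",
    },
    "prod": {
        "/mnt/lake": "abfss://lake@exampleprod.dfs.core.windows.net",
        "/mnt/bronze": "abfss://bronze@exampleprod.dfs.core.windows.net",
    },
}

def mount_plan(environment):
    """Return (source, mount_point) pairs for one environment, sorted by mount point."""
    sources = MOUNT_SOURCES[environment]
    return [(sources[point], point) for point in sorted(sources)]

for source, point in mount_plan("dev"):
    print(point, "->", source)
    # In Synapse, the setup notebook would then call something like:
    # mssparkutils.fs.mount(source, point, {"LinkedService": LinkedService})
```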

In [15]:
# let's doublecheck our variables
print(sandboxRoot)
print(whoami)

lakepath = "{0}/{1}/".format(sandboxRoot,whoami)
print(lakepath)

In [16]:
## we need to know the name of the Synapse "LinkedService" to the datalake.  
## by doing this we won't need to hardcode secrets/passwords in our code
# CHANGEME
LinkedService = "davew-synapse-WorkspaceDefaultStorage"

In [17]:
# you might need to run this if you don't get it right the first time
# or...at the end of your notebook to "cleanup"
#mssparkutils.fs.unmount("/mnt/lake") 


In [18]:
# Step one:  mount /lake to lakepath var.  This will be YOUR sandbox
mssparkutils.fs.mount( 
    lakepath, 
    "/mnt/lake", 
    {"LinkedService":LinkedService} 
) 

In [19]:
mssparkutils.fs.ls(lakepath) 

In [20]:
# Let's make sure that worked.  Notice it didn't.  Why?  
mssparkutils.fs.ls("/") 

In [21]:
# it's due to "sessions"
# here's the syntax we should be using 
# mssparkutils.env.getJobId() gets our session
# the path is really synfs:/{session}/mnt/lake
synfsPath = "synfs:/{0}/mnt/lake/".format(mssparkutils.env.getJobId())
print(synfsPath)
mssparkutils.fs.ls(synfsPath) 

In [22]:
# we should see the "delta-testing" folder we created earlier.  Let's do an ls on that
mssparkutils.fs.ls(synfsPath + "delta-testing") 

Using mounts should be much easier.  

You will need to develop your "mount pattern" at your company.  

In [24]:
# now let's read that little df we built earlier

dfReadFromSynFS = (spark
    .read
    .format("delta")
    .load(synfsPath + "delta-testing") )
display(dfReadFromSynFS)

## Let's continue looking at Delta tables

In [25]:
columns = ["CustomerID", "FirstName", "LastName", "Balance"]
vals = [
     (1, "Tre", "Cool", 1887),
     (2, "Matt", "Cameron", 1920),
     (3, "Taylor", "Hawkins", 1892),
     (4, "Dave", "Grohl", 1893),
     (5, "Jimmy", "Chamberlin", 1901)
]
dfCustomers = spark.createDataFrame(vals, columns)
dfCustomers.printSchema()
display(dfCustomers)

In [27]:
#let's save it to your sandbox
(dfCustomers
    .write
    .format("delta")
    .mode("overwrite")
    .save(synfsPath + "customers"))

Remember, we can query a data lake directly from SQL

BUT...we either need to get the path first, or "marshal" the Python variable so SQL can use it.  

In [28]:
print(synfsPath)

spark.conf.set("nv.synfsPath", synfsPath)

In [30]:
%%sql
--you can use this syntax...
--SELECT * FROM delta.`synfs:/0/mnt/lake/customers`;
--or, this is probably better
SELECT * FROM delta.`${nv.synfsPath}customers`;
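
Another option is to build the SQL text in Python and hand it to `spark.sql`. The sketch below only builds the string, so it runs anywhere. The helper name `delta_sql_ref` is made up for illustration, and the `synfs:/0/mnt/lake/` value is just the example path from the comment above.

```python
def delta_sql_ref(base_path, table_folder):
    """Return a Spark SQL reference like delta.`<base>/<folder>` with one slash at the join."""
    full_path = base_path.rstrip("/") + "/" + table_folder.strip("/")
    return "delta.`{0}`".format(full_path)

query = "SELECT * FROM {0}".format(delta_sql_ref("synfs:/0/mnt/lake/", "customers"))
print(query)

# In the notebook, with the real synfsPath variable:
# display(spark.sql("SELECT * FROM " + delta_sql_ref(synfsPath, "customers")))
```

Because the helper trims slashes on both sides, it does not matter whether the base path ends in `/` or not.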

## Updates

You can update, delete, and merge (upsert) data into tables. 

In [31]:
%%sql

-- increase every balance by 10%
UPDATE delta.`${nv.synfsPath}customers` SET Balance = Balance * 1.10

In [32]:
%%sql

SELECT * FROM delta.`${nv.synfsPath}customers`;

In [33]:
%%sql

DELETE FROM delta.`${nv.synfsPath}customers` WHERE CustomerID = 3;

In [34]:
%%sql 

SELECT * FROM delta.`${nv.synfsPath}customers`;
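
The UPDATE and DELETE above can also be sketched with the Delta Lake Python API. This is a rough sketch, assuming `delta_table` is a `DeltaTable` that points at the same `customers` folder. The function name `apply_changes` is made up for illustration.

```python
def apply_changes(delta_table):
    """Mirror the two SQL statements above on a DeltaTable object."""
    # Same idea as: UPDATE ... SET Balance = Balance * 1.10
    delta_table.update(set={"Balance": "Balance * 1.10"})
    # Same idea as: DELETE FROM ... WHERE CustomerID = 3
    delta_table.delete("CustomerID = 3")

# In the notebook (needs a Spark session with Delta Lake available):
# from delta.tables import DeltaTable
# apply_changes(DeltaTable.forPath(spark, synfsPath + "customers"))
```

Note that running it against the real table would apply the 10% increase a second time, which is why the call is left commented out.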

## Read older versions of data using Time Travel

You can query previous snapshots of your Delta Lake table by using a feature called Time Travel. If you want to access the data as it was before the UPDATE and DELETE above, you can query an earlier snapshot of the table using the `versionAsOf` option (or `VERSION AS OF` in SQL).

Time Travel is an extremely powerful feature that uses the Delta Lake transaction log to access data that is no longer in the current version of the table. Leaving out the version (or specifying the latest one) shows the current data again. The cell below lists the table's history so you can see which versions exist. For more information, see [Query an older snapshot of a table (time travel)](https://docs.delta.io/latest/delta-batch.html#deltatimetravel).


In [35]:
%%sql

DESCRIBE HISTORY delta.`${nv.synfsPath}customers`;

We can of course do something similar in PySpark.  Something like this:

```python
from delta.tables import DeltaTable
DeltaTable.forPath(spark, synfsPath + "customers").history().show(20, 1000, False)
```

Here are some other methods that might be interesting:

In [36]:
%%sql

select * from delta.`${nv.synfsPath}customers` VERSION AS OF 1;
select * from delta.`${nv.synfsPath}customers` VERSION AS OF 2;
--you'll need to change the time accordingly
select * from delta.`${nv.synfsPath}customers` TIMESTAMP AS OF '2023-05-10 15:48:07';
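
The same time travel reads can be done from PySpark with the `versionAsOf` and `timestampAsOf` reader options. Here is a minimal sketch that just builds the options, so it runs without a Spark session. The helper name `time_travel_options` is made up for illustration.

```python
def time_travel_options(version=None, timestamp=None):
    """Build Delta reader options for exactly one of: a version number or a timestamp string."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        return {"versionAsOf": str(version)}
    return {"timestampAsOf": timestamp}

print(time_travel_options(version=1))

# In the notebook:
# dfV1 = (spark.read.format("delta")
#     .options(**time_travel_options(version=1))
#     .load(synfsPath + "customers"))
# display(dfV1)
```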

In [37]:
%%sql

--what do you think this code would do?  
select * from delta.`${nv.synfsPath}customers@v1`
EXCEPT ALL
select * from delta.`${nv.synfsPath}customers@v2`
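
If you want to check your answer, `EXCEPT ALL` is a multiset difference: every row on the right cancels at most one matching row on the left. Here is a plain Python sketch of that behavior. It assumes version 1 is the table after the UPDATE and version 2 is the table after the DELETE, and it uses just the `CustomerID` values as stand-ins for full rows.

```python
from collections import Counter

def except_all(left_rows, right_rows):
    """Multiset difference: each right row cancels at most one matching left row."""
    remaining = Counter(left_rows) - Counter(right_rows)
    return sorted(remaining.elements())

v1_ids = [1, 2, 3, 4, 5]  # assumed contents before the DELETE
v2_ids = [1, 2, 4, 5]     # assumed contents after deleting CustomerID 3
removed = except_all(v1_ids, v2_ids)
print(removed)
```

So comparing two versions this way is a quick check of which rows a change removed or modified.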



Let's assume you are doing a data load and you've just built a new df that has all of the latest data and now you want to `UPSERT` that data into the existing `customers` Delta table. 

Let's do that.  

Let's assume `dfCustomersFromLatestDataLoad` already has the latest data staged and pre-processed.  

In [38]:
%%sql
CREATE OR REPLACE TEMP VIEW dfCustomersFromLatestDataLoad 
AS 
SELECT 6 AS CustomerID, 'New' AS FirstName, 'Customer' AS LastName , 0 AS Balance
    UNION ALL 
SELECT 5, 'JimmyJimmy', 'Chamberlin', 0
;

SELECT * FROM dfCustomersFromLatestDataLoad
;

Let's UPSERT the data.  

In [41]:
%%sql

MERGE INTO delta.`${nv.synfsPath}customers` target
USING dfCustomersFromLatestDataLoad source
    ON source.CustomerID = target.CustomerID
WHEN MATCHED THEN
    UPDATE SET 
        target.FirstName = source.FirstName,
        target.LastName = source.LastName
WHEN NOT MATCHED THEN 
    INSERT (CustomerID, FirstName, LastName, Balance) VALUES (source.CustomerID, source.FirstName, source.LastName, source.Balance)


We can of course do all of this in PySpark using Python syntax, if desired.  
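
For example, the MERGE above might look something like this with the Delta Lake Python API. This is a sketch, not the only way to write it. It assumes `delta_table` is a `DeltaTable` for the `customers` folder and `df_source` is a DataFrame with the staged rows. The function name `upsert_customers` is made up for illustration.

```python
def upsert_customers(delta_table, df_source):
    """Update names for matching CustomerIDs and insert rows that are new."""
    (delta_table.alias("target")
        .merge(df_source.alias("source"), "source.CustomerID = target.CustomerID")
        .whenMatchedUpdate(set={
            "FirstName": "source.FirstName",
            "LastName": "source.LastName",
        })
        .whenNotMatchedInsert(values={
            "CustomerID": "source.CustomerID",
            "FirstName": "source.FirstName",
            "LastName": "source.LastName",
            "Balance": "source.Balance",
        })
        .execute())

# In the notebook (needs a Spark session with Delta Lake available):
# from delta.tables import DeltaTable
# target = DeltaTable.forPath(spark, synfsPath + "customers")
# upsert_customers(target, spark.table("dfCustomersFromLatestDataLoad"))
```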