This notebook references documentation at: 
https://learn.microsoft.com/en-us/azure/databricks/storage/azure-storage#--access-azure-data-lake-storage-gen2-or-blob-storage-using-oauth-20-with-an-azure-service-principal

###Account Key Access

In [None]:
%sql
--this will enable the change data feed for all tables
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;

key,value
spark.databricks.delta.properties.defaults.enableChangeDataFeed,True


In [None]:
storage_account = dbutils.widgets.get("storage_account_name")
container = dbutils.widgets.get("container_name")
folder = dbutils.widgets.get("folder_name")
# account key below, this would be replaced with a secret retrieval rather than hard code into the notebook
# your instructor will give you the account key prior to running this lab
# CHALLENGE: create a secret in the key vault and retrieve it from there
account_key = "<<ask your instructor for this key value>>"

spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    f"{account_key}"
    )

In [None]:
# list the subfolders in the given storage account, container and folder
# store the results in an array, pfolders

pfolders = dbutils.fs.ls(f"abfss://{container}@{storage_account}.dfs.core.windows.net/{folder}")
display(pfolders)

path,name,size,modificationTime
abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/address/,address/,0,1715808618000
abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/customer/,customer/,0,1715808615000
abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/customeraddress/,customeraddress/,0,1715808618000
abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/product/,product/,0,1715808616000
abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/productcategory/,productcategory/,0,1715808615000
abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/productdescription/,productdescription/,0,1715808617000
abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/productmodel/,productmodel/,0,1715808615000
abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/productmodelproductdescription/,productmodelproductdescription/,0,1715808618000
abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/salesorderdetail/,salesorderdetail/,0,1715808616000
abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/salesorderheader/,salesorderheader/,0,1715808617000


In [None]:
# copy the link of one of the folders above and list the contents, what kind of file is this?
dbutils.fs.ls('abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/address/')

[FileInfo(path='abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/address/SalesLT.Address.parquet', name='SalesLT.Address.parquet', size=35577, modificationTime=1715808618000)]

In [None]:
# create temporary views for each of the subfolders containing parquet files
for p in pfolders:
  table_name = p.name.replace('/','')
  print('reading ' + table_name + '...')
  df = spark.read.format("parquet").load(p.path)
  df.createOrReplaceTempView(table_name)  

print(' ')
print('all temporary views successfully created')

reading address...
reading customer...
reading customeraddress...
reading product...
reading productcategory...
reading productdescription...
reading productmodel...
reading productmodelproductdescription...
reading salesorderdetail...
reading salesorderheader...
 
all temporary views successfully created


In [None]:
%sql
--see all temporary views available
SHOW VIEWS

namespace,viewName,isTemporary,isMaterialized
,address,True,False
,customer,True,False
,customeraddress,True,False
,product,True,False
,productcategory,True,False
,productdescription,True,False
,productmodel,True,False
,productmodelproductdescription,True,False
,salesorderdetail,True,False
,salesorderheader,True,False


In [None]:
%sql
--select from a temporary view to examine the data
select * from salesorderdetail

SalesOrderID,SalesOrderDetailID,OrderQty,ProductID,UnitPrice,UnitPriceDiscount,LineTotal,rowguid,ModifiedDate
71774,110562,1,836,356.898,0.0,356.898,e3a1994c-7a68-4ce8-96a3-77fdd3bbd730,2008-06-01T00:00:00Z
71774,110563,1,822,356.898,0.0,356.898,5c77f557-fdb6-43ba-90b9-9a7aec55ca32,2008-06-01T00:00:00Z
71776,110567,1,907,63.9,0.0,63.9,6dbfe398-d15d-425e-aa58-88178fe360e5,2008-06-01T00:00:00Z
71780,110616,4,905,218.454,0.0,873.816,377246c9-4483-48ed-a5b9-e56f005364e0,2008-06-01T00:00:00Z
71780,110617,2,983,461.694,0.0,923.388,43a54bcd-536d-4a1b-8e69-24d083507a14,2008-06-01T00:00:00Z
71780,110618,6,988,112.998,0.4,406.7928,12706fab-f3a2-48c6-b7c7-1ccde4081f18,2008-06-01T00:00:00Z
71780,110619,2,748,818.7,0.0,1637.4,b12f0d3b-5b4e-4f1f-b2f0-f7cde99dd826,2008-06-01T00:00:00Z
71780,110620,1,990,323.994,0.0,323.994,f117a449-039d-44b8-a4b2-b12001dacc01,2008-06-01T00:00:00Z
71780,110621,1,926,149.874,0.0,149.874,92e5052b-72d0-4c91-9a8c-42591803667e,2008-06-01T00:00:00Z
71780,110622,1,743,809.76,0.0,809.76,8bd33bed-c4f6-4d44-84fb-a7d04afcd794,2008-06-01T00:00:00Z


###Write the data to a lakehouse database named dw_test

In [None]:
%sql
--create the database and set it to the current db
drop database if exists dw_test cascade;
create database dw_test;
use dw_test;

In [None]:
pfolders

[FileInfo(path='abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/address/', name='address/', size=0, modificationTime=1715808618000),
 FileInfo(path='abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/customer/', name='customer/', size=0, modificationTime=1715808615000),
 FileInfo(path='abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/customeraddress/', name='customeraddress/', size=0, modificationTime=1715808618000),
 FileInfo(path='abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/product/', name='product/', size=0, modificationTime=1715808616000),
 FileInfo(path='abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/productcategory/', name='productcategory/', size=0, modificationTime=1715808615000),
 FileInfo(path='abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/productdescription/', name='productdescription/', size=0, modificationTime=1715808617000),
 FileInfo(path='abfss://raw@jdobbinsidoc.dfs.core.windows.net/saleslt/productmodel/', name='productmodel/', si

In [None]:
for f in pfolders:
  print('creating ' + f.name.replace('/','') + ' table...')
  spark.sql(f"create table if not exists dw_test.{f.name.replace('/','')} as select * from {f.name.replace('/','')}")
print(' ')
print('all tables successfully created')

creating address table...
creating customer table...
creating customeraddress table...
creating product table...
creating productcategory table...
creating productdescription table...
creating productmodel table...
creating productmodelproductdescription table...
creating salesorderdetail table...
creating salesorderheader table...
 
all tables successfully created


In [None]:
%sql
--write a sql query to count the number of addresses by state in the United States, display the results as a map visual
select StateProvince, count(*) 
from address
where CountryRegion = 'United States'
group by StateProvince
order by 2 desc

StateProvince,count(1)
California,78
Washington,51
Texas,40
Oregon,18
Michigan,16
Illinois,15
Arizona,13
Missouri,11
Colorado,10
Utah,9


Databricks visualization. Run in Databricks to view.

In [None]:
# this cell will drop all temporary views in the session (our temporary views share the same name as our tables, so we need to drop the views as we don't have need for them anymore)
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

for v in spark.catalog.listTables():
    if v.isTemporary:
        spark.catalog.dropTempView(v.name)

###Write some SQL statements to update and delete records from a table you choose

In [None]:
%sql
--write an update statement
update customer set CompanyName = 'Some New Company' where CompanyName = 'Friendly Bike Shop'

num_affected_rows
4


In [None]:
%sql
-- delete some companies that no longer are our customers
delete from customer where CompanyName in ('Roving Sports','General Industries','Mail-Order Outlet');

num_affected_rows
6


In [None]:
%sql

--insert a new record
insert into customer
select * from customer where CustomerID = 1


num_affected_rows,num_inserted_rows
1,1


In [None]:
%sql
-- view the history of the customer table
describe history dw_test.customer

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2024-05-18T22:07:57Z,1182361662310501,vt-dbx-practice@outlook.com,WRITE,"Map(mode -> Append, statsOnLoad -> false, partitionBy -> [])",,List(531434962335228),0515-181843-d5ocglup,2.0,WriteSerializable,False,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 5000)",,Databricks-Runtime/13.3.x-scala2.12
2,2024-05-18T22:07:54Z,1182361662310501,vt-dbx-practice@outlook.com,DELETE,"Map(predicate -> [""CompanyName#7880 IN (Roving Sports,General Industries,Mail-Order Outlet)""])",,List(531434962335228),0515-181843-d5ocglup,1.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numRemovedBytes -> 90616, numCopiedRows -> 841, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 1, executionTimeMs -> 1667, numDeletedRows -> 6, scanTimeMs -> 856, numAddedFiles -> 1, numAddedBytes -> 90107, rewriteTimeMs -> 811)",,Databricks-Runtime/13.3.x-scala2.12
1,2024-05-18T22:07:50Z,1182361662310501,vt-dbx-practice@outlook.com,UPDATE,"Map(predicate -> [""(CompanyName#6899 = Friendly Bike Shop)""])",,List(531434962335228),0515-181843-d5ocglup,0.0,WriteSerializable,False,"Map(numRemovedFiles -> 1, numRemovedBytes -> 90387, numCopiedRows -> 843, numDeletionVectorsAdded -> 0, numDeletionVectorsRemoved -> 0, numAddedChangeFiles -> 1, executionTimeMs -> 2796, scanTimeMs -> 1344, numAddedFiles -> 1, numUpdatedRows -> 4, numAddedBytes -> 90616, rewriteTimeMs -> 1430)",,Databricks-Runtime/13.3.x-scala2.12
0,2024-05-18T22:07:11Z,1182361662310501,vt-dbx-practice@outlook.com,CREATE TABLE AS SELECT,"Map(partitionBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableChangeDataFeed"":""true""}, statsOnLoad -> false)",,List(531434962335228),0515-181843-d5ocglup,,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 847, numOutputBytes -> 90387)",,Databricks-Runtime/13.3.x-scala2.12


In [None]:
%sql
-- see how many records are in each version of customer
select count(*), 0 as versionnum from customer version as of 0
union all
select count(*), 3 as versionnum from customer version as of 3

count(1),versionnum
847,0
842,3


In [None]:
%sql
-- use timestamp as of to time travel back to version 2 (the timestamp below will need to change, use it from the describe history cell)
select * from customer timestamp as of '2024-05-16T21:49:36.000+00:00'

In [None]:
%sql
--view changes between versions 1 and 2...pay special attention to the _change_type column (scroll to the right)
-- this column stores the type of change (delete, write, pre-update and post-update values)
select * from table_changes('customer', 1, 2)

In [None]:
%sql
-- now we will restore the table to its original version
RESTORE TABLE dw_test.customer TO VERSION AS OF 0

In [None]:
%sql
-- view the versions, are they still there?
describe history customer

In [None]:
%sql

--see what the row counts are for each version, does this make sense knowing what table operations were performed?
select count(*), 4 as versionnum from customer version as of 4
union all
select count(*), 3 as versionnum from customer version as of 3
union all
select count(*), 2 as versionnum from customer version as of 2
union all
select count(*), 1 as versionnum from customer version as of 1
union all 
select count(*), 0 as versionnum from customer version as of 0