## Branch and Relations Manager Dimension Tables

#### Mount `datatechstorage` Data Lake Storage

In [0]:
# Mount the `users` container of the `datatechstorage` account into DBFS so
# that later cells can read it through an ordinary file-system path.
storage_account = "datatechstorage"
container = "users"

# Build the blob endpoint, the Hadoop config key that carries the account key,
# and the DBFS mount point from the account and container names.
config_key = f"fs.azure.account.key.{storage_account}.blob.core.windows.net"
blobEndpoint = f"wasbs://{container}@{storage_account}.blob.core.windows.net"
mount_point = f"/mnt/{storage_account}/{container}"

# dbutils.fs.mount raises when the mount point is already in use, so only
# mount when it is absent. This keeps the cell safe to re-run.
if not any(m.mountPoint == mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=blobEndpoint,
        mount_point=mount_point,
        # The storage account key is read from the secret scope, never inlined.
        extra_configs={
            config_key: dbutils.secrets.get(scope="datatechkeyvault2", key="ADLGS")
        },
    )
print(mount_point)

#### Configure access to the `datatechblob` blob storage account and the `datatechws` Azure Synapse Analytics endpoint from Azure Databricks.

In [0]:
# Provide the configuration to access the Azure Storage account from Azure Databricks.
# This temp directory holds temp files during data load using polybase
blobStorage = "datatechblob.blob.core.windows.net"
blobContainer = "stage-for-sql-warehouse"
blobAccessKey = dbutils.secrets.get(scope="datatechkeyvault2", key="Databricks-Blob-Storage-Key")

tempDir = "wasbs://" + blobContainer + "@" + blobStorage + "/tempDirs"
#tempDir = "abfss://users@datatechstorage.dfs.core.windows.net/temp/"

# Register the blob account key with the Hadoop configuration of this Spark
# context so Spark can read and write the staging container.
acntInfo = "fs.azure.account.key." + blobStorage
sc._jsc.hadoopConfiguration().set(acntInfo, blobAccessKey)

# Azure Synapse Analytics data warehouse related settings
dwHostname = "datatechws.sql.azuresynapse.net"  # this is the server name
dwPort = 1433  # default sql port
# Each credential is read from the secret whose key matches its role
# (the two keys were previously swapped).
# NOTE(review): assumes the Key Vault secrets named "username" and "password"
# hold the values their names suggest -- confirm against the vault.
dwUsername = dbutils.secrets.get(scope="datatechkeyvault2", key="username")
dwPassword = dbutils.secrets.get(scope="datatechkeyvault2", key="password")
dwDatabase = "edw"

# Credentials are deliberately not printed: secrets should never end up in
# notebook output.
#dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;"
sqlDwUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(dwHostname, dwPort, dwDatabase, dwUsername, dwPassword)

#### Read the `JSON` files from Data Lake Gen 2 storage

In [0]:
# Load every JSON file of each dimension from the mounted container.
mount_point = '/mnt/datatechstorage/users'

# Glob patterns for the two source folders under the mount point.
branch_json_glob = f"{mount_point}/data/branch/*.json"
relations_managers_json_glob = f"{mount_point}/data/relations_managers/*.json"

branch_data = sqlContext.read.json(branch_json_glob)
relations_managers_data = sqlContext.read.json(relations_managers_json_glob)

#### Prepare `Branch` Data

In [0]:
# Display the raw branch records for exploration.
# Observe that a branch name can be empty (e.g. branch_id `1654` in the displayed sample); that has to be cleaned up.
# In addition, date_created has to be cast to a proper date type and the columns renamed for reporting.
display(branch_data)

# Register the DataFrame as a temp view so it can be queried with SQL.
branch_data.createOrReplaceTempView('branch_data')

branch_id,branch_location,branch_name,date_created
1654,SYXGBYTPZG,,2011-02-20
1114,DTSDUCVRPFEGBD,ocnbnmajbwcmxx,2010-05-08
1025,FYPPUD,bqwvjydyjinvjm,2005-04-08
1759,XSXOQTRSDOJA,eyfbn,2014-05-04
1281,CQTJPLVYMCE,fiqexliefrj,2019-11-07
1250,QFZHFLBLCVQVLZT,emafcszcdbn,2018-02-03
1228,PC,ejhvlzqvqeyqjj,2020-11-29
1142,UY,ptpygczk,2015-05-20
1754,HGIENHHJESKBKD,tycyikr,2006-04-26
1104,ICTCMQINGC,cuvvevzswzptorq,2014-12-13


In [0]:
%sql
SELECT branch_id AS BranchID
	,branch_location AS BranchLocation
	,CASE 
		WHEN branch_name IS NULL OR length(trim(branch_name)) = 0
			THEN lower(branch_location)||'-'||branch_id
		ELSE branch_name
		END AS BranchName
	,cast(date_created AS DATE) AS DateCreated
FROM branch_data

-- Here we use SQL to clean the data and display it in the desired final format.
-- A missing branch name (NULL, empty or whitespace-only) is replaced with the
-- lower-cased branch_location followed by '-' and the branch_id.
-- date_created is cast to a proper DATE, and all columns are named according
-- to the business reporting requirements.

BranchID,BranchLocation,BranchName,DateCreated
1654,SYXGBYTPZG,syxgbytpzg-1654,2011-02-20
1114,DTSDUCVRPFEGBD,ocnbnmajbwcmxx,2010-05-08
1025,FYPPUD,bqwvjydyjinvjm,2005-04-08
1759,XSXOQTRSDOJA,eyfbn,2014-05-04
1281,CQTJPLVYMCE,fiqexliefrj,2019-11-07
1250,QFZHFLBLCVQVLZT,emafcszcdbn,2018-02-03
1228,PC,ejhvlzqvqeyqjj,2020-11-29
1142,UY,ptpygczk,2015-05-20
1754,HGIENHHJESKBKD,tycyikr,2006-04-26
1104,ICTCMQINGC,cuvvevzswzptorq,2014-12-13


In [0]:
# Build the cleaned branch dimension as a DataFrame.
# A missing branch name (NULL, empty or whitespace-only) is replaced with the
# lower-cased branch_location followed by '-' and the branch_id; date_created
# is cast to DATE and the columns are renamed for reporting.
branch = sql("""
SELECT branch_id AS BranchID
	,branch_location AS BranchLocation
	,CASE 
		WHEN branch_name IS NULL OR length(trim(branch_name)) = 0
			THEN lower(branch_location)||'-'||branch_id
		ELSE branch_name
		END AS BranchName
	,cast(date_created AS DATE) AS DateCreated
FROM branch_data
""")

#### Write branch data to the data warehouse as a table

In [0]:
# Observe that the branch table is written as a REPLICATE type.
# This is due to the small size and the need to be used as a dimension table.
#
# The JDBC URL comes from `sqlDwUrl`, which is assembled from secret-scope
# values earlier in the notebook. Credentials must never be hard-coded in the
# notebook or printed to its output.
# NOTE(review): any password that has ever appeared in plain text in a
# notebook or its history should be rotated.

branch.write \
    .format("com.databricks.spark.sqldw") \
    .option("url", sqlDwUrl) \
    .option("forwardSparkAzureStorageCredentials", "true") \
    .option("tableOptions", "CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = REPLICATE") \
    .option("tempdir", tempDir) \
    .option("dbtable", "dbo.branch") \
    .mode("overwrite") \
    .save()

#### Prepare `Relations Manager` Data

In [0]:
# Display the raw relations manager records for exploration.
# Observe that the first_names for the relationship_manager_ids `503478`, `579131` and `595647` are empty; that has to be cleaned up.
# In addition, last_updated has to be cast to a proper date type and the columns renamed for reporting.
display(relations_managers_data)

# Register the DataFrame as a temp view so it can be queried with SQL.
relations_managers_data.createOrReplaceTempView('relations_managers_data')

branch_id,first_name,last_name,last_updated,relationship_manager_id
1104,ldvfrvnucun,MVRLPCRNJAPJ,2012-12-28,588696
1654,hlhdhyvdw,NOZQLILC,2005-08-29,597080
1250,qrydj,UBMHTPYVZ,2007-05-19,571482
1250,q,J,2003-01-14,511395
1654,e,NDPLWKSTIHDVQ,2017-06-08,577397
1142,g,M,2002-10-28,555302
1228,bzz,IFCPDNMLBJUNECR,2011-05-07,504165
1228,lxizdbt,I,2019-12-22,503905
1759,bn,KOCCOANBENEE,2019-08-03,512280
1754,qdrgrzuiix,EIWGHFNOKP,2019-10-13,528657


In [0]:
%sql
SELECT branch_id AS BranchID
	,CASE 
		WHEN first_name IS NULL OR length(trim(first_name)) = 0
			THEN cast(relationship_manager_id AS STRING)
		ELSE first_name
		END AS RmFirstName
	,last_name AS RmLastName
	,cast(last_updated AS DATE) AS LastUpdateDate
	,relationship_manager_id AS RmID
FROM relations_managers_data

-- A missing first name (NULL, empty or whitespace-only) falls back to the
-- relationship_manager_id. The id is cast to STRING explicitly so both CASE
-- branches share one type instead of relying on implicit coercion.

BranchID,RmFirstName,RmLastName,LastUpdateDate,RmID
1104,ldvfrvnucun,MVRLPCRNJAPJ,2012-12-28,588696
1654,hlhdhyvdw,NOZQLILC,2005-08-29,597080
1250,qrydj,UBMHTPYVZ,2007-05-19,571482
1250,q,J,2003-01-14,511395
1654,e,NDPLWKSTIHDVQ,2017-06-08,577397
1142,g,M,2002-10-28,555302
1228,bzz,IFCPDNMLBJUNECR,2011-05-07,504165
1228,lxizdbt,I,2019-12-22,503905
1759,bn,KOCCOANBENEE,2019-08-03,512280
1754,qdrgrzuiix,EIWGHFNOKP,2019-10-13,528657


In [0]:
# Build the cleaned relations manager dimension as a DataFrame.
# A missing first name (NULL, empty or whitespace-only) falls back to the
# relationship_manager_id. The id is cast to STRING explicitly so both CASE
# branches share one type instead of relying on implicit coercion.
relations_manager = sql("""
SELECT branch_id AS BranchID
	,CASE 
		WHEN first_name IS NULL OR length(trim(first_name)) = 0
			THEN cast(relationship_manager_id AS STRING)
		ELSE first_name
		END AS RmFirstName
	,last_name AS RmLastName
	,cast(last_updated AS DATE) AS LastUpdateDate
	,relationship_manager_id AS RmID
FROM relations_managers_data
""")

#### Write relations manager data to the data warehouse as a table

In [0]:
# Each reload overwrites the whole dimension table.
# These are slowly changing dimensions, so a full overwrite is affordable.

(
    relations_manager.write
    .format("com.databricks.spark.sqldw")
    .mode("overwrite")
    .option("url", sqlDwUrl)
    .option("tempdir", tempDir)
    .option("forwardSparkAzureStorageCredentials", "true")
    .option("tableOptions", "CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = REPLICATE")
    .option("dbtable", "relations_manager")
    .save()
)