<img src ='https://assets.zuora.com/wp-content/uploads/2020/04/pluralsight-logo-2.png' alt='Pluralsight logo' height='150' width='300'>

## Process Branch and Relations Manager Dimension Tables

---

#### Mount `psdlsg2` Data Lake Storage

In [0]:
# the first piece of info needed is the container and storage account name.
_storage_account = "psdlg2"
_container = "rawdatastore"

# now form blobEnpoint
_config_key = f"fs.azure.account.key.{_storage_account}.blob.core.windows.net"
_blobEndpoint = f"wasbs://{_container}@{_storage_account}.blob.core.windows.net"
_mount_point = f"/mnt/{_storage_account}/{_container}"

dbutils.fs.mount(
    source = _blobEndpoint
    ,mount_point = _mount_point
    ,extra_configs = {_config_key:dbutils.secrets.get(scope = "pskv", key = "Databricks-Data-Lake-Storage-Key") }
  )
print(_mount_point)

#### Configure access to the `psbs` blob storage account and `psdlsg2` data lake storage from Azure Databricks.

In [0]:
# Provide the configuration to access the Azure Storage account from Azure Databricks.
# This temp directory holds temp files during data load using polybase
blobStorage = "psbs.blob.core.windows.net"
blobContainer = "temp-for-sql-warehouse"
blobAccessKey =  dbutils.secrets.get(scope = "pskv", key = "Databricks-Blob-Storage-Key") 

# Specify a temporary folder to use while moving data between Azure Databricks and Azure SQL Data Warehouse.
tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"

# Run the following snippet to store Azure Blob storage access keys in the configuration. This action ensures that you don't have to keep the access key in the notebook in plain text.
acntInfo = "fs.azure.account.key."+ blobStorage
sc._jsc.hadoopConfiguration().set(acntInfo, blobAccessKey)

# GlobomanticsDWH data warehouse related settings
dwHostname = "psasaw.sql.azuresynapse.net" # this is the server name
dwPort = 1433 # default sql port
dwUsername = dbutils.secrets.get(scope = "pskv", key = "GlobomanticsDWH-Username") 
dwPassword = dbutils.secrets.get(scope = "pskv", key = "GlobomanticsDWH-Password") 
dwDatabase = "GlobomanticsDWH"

dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;"
sqlDwUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4};dwJdbcExtraOptions={5}".format(dwHostname,dwPort,dwDatabase,dwUsername,dwPassword,dwJdbcExtraOptions)

#### Read `JSON` from the `psdlsg2` Data Lake Storage

In [0]:
_mount_point = '/mnt/psdlg2/rawdatastore'
branch_data = sqlContext.read.json(f"{_mount_point}/data/branch/*.json")
relations_managers_data = sqlContext.read.json(f"{_mount_point}/data/relations_managers/*.json")

#### Prepare `Branch` Data

In [0]:
# create a temp table for the branch data to allow sql access
branch_data.createOrReplaceTempView('branch_data')

# display and explore the data
display(branch_data)
# observe that the branch name for the branch_id `1652` is empty. We will have to clean that up
# In addition, we have to type cast the date_created to a proper date type and rename columns for reporting

In [0]:
%sql
SELECT branch_id as BranchID
	,branch_location as BranchLocation
	,CASE 
		WHEN length(branch_name) = 0
			THEN lower(branch_location)||'-'||branch_id
		ELSE branch_name
		END AS BranchName
	,cast(date_created as date) as DateCreated
FROM branch_data

-- here we use sql to clean our data and display the final format as desired.
-- Now, obserce the empty branch name is replaced with the branch_location and the branch_id.
-- Also, all column names have been name according to the business reporting requirements

In [0]:
branch = sql("""
SELECT branch_id as BranchID
	,branch_location as BranchLocation
	,CASE 
		WHEN length(branch_name) = 0
			THEN lower(branch_location)||'-'||branch_id
		ELSE branch_name
		END AS BranchName
	,cast(date_created as date) as DateCreated
FROM branch_data
""")

#### Write branch data to the data warehouse as a table

In [0]:
# Observe that the branch table is written as a REPLICATE type. 
# This is due to the small size and the need to be used as a dimension table

branch.write \
    .format("com.databricks.spark.sqldw") \
    .option("url", sqlDwUrl) \
    .option("forwardSparkAzureStorageCredentials", "true") \
    .option("dbtable", "branch") \
    .option("tableOptions", "CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = REPLICATE") \
    .mode("overwrite") \
    .option("tempdir", tempDir) \
    .save()

#### Prepare `Relations Manager` Data

In [0]:
relations_managers_data.createOrReplaceTempView('relations_managers_data')
display(relations_managers_data)
# observe that the first_names for the relationship_manager_ids `503478`, `579131` and `595647` are empty. We will have to clean that up
# In addition, we have to type cast the last_updated to a proper date type and rename columns for reporting

In [0]:
%sql
SELECT branch_id AS BranchID
	,CASE 
		WHEN length(first_name) = 0
			THEN relationship_manager_id
		ELSE first_name
		END AS RmFirstName
	,last_name AS RmLastName
	,cast(last_updated AS DATE) AS LastUpdateDate
	,relationship_manager_id AS RmID
FROM relations_managers_data

In [0]:
relations_manager = sql("""
SELECT branch_id AS BranchID
	,CASE 
		WHEN length(first_name) = 0
			THEN relationship_manager_id
		ELSE first_name
		END AS RmFirstName
	,last_name AS RmLastName
	,cast(last_updated AS DATE) AS LastUpdateDate
	,relationship_manager_id AS RmID
FROM relations_managers_data
""")

#### Write relations manager data to the data warehouse as a table

In [0]:
# Notice how we overwrite these dimensions whenever we reload the table. 
# These are slow changing dimensions. Hence, we can afford to do so.

relations_manager.write \
    .format("com.databricks.spark.sqldw") \
    .option("url", sqlDwUrl) \
    .option("forwardSparkAzureStorageCredentials", "true") \
    .option("dbtable", "relations_manager") \
    .option("tableOptions", "CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = REPLICATE") \
    .mode("overwrite") \
    .option("tempdir", tempDir) \
    .save()