Import Libraries

In [None]:
from pyspark.sql.functions import col, current_timestamp, expr
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

Synapse Dedicated Pool Connection

In [None]:
def _synapse_secret(secret_key):
    """Return one value from the "key-vault-secrets" secret scope."""
    return dbutils.secrets.get(scope="key-vault-secrets", key=secret_key)


# Connection details for the Synapse dedicated pool, all read from secrets.
hostname = _synapse_secret("synapse-dedicated-pool-server-name")
database_name = _synapse_secret("synapse-dedicated-pool-database-name")
username = _synapse_secret("synapse-dedicated-pool-user")
password = _synapse_secret("synapse-dedicated-pool-password")

# JDBC URL built from the server and database names (port 1433).
jdbc_url = f"jdbc:sqlserver://{hostname}.database.windows.net:1433;database={database_name}"

# Properties passed along with the JDBC URL: credentials plus the SQL Server driver class.
connection_properties = dict(
    user=username,
    password=password,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)

Capture Synapse Variables

In [None]:
# Read every run parameter from the notebook widgets (one widget per variable).
read_param = dbutils.widgets.get

# Date parts of the load being processed.
vDay = read_param("pDay")
vMonth = read_param("pMonth")
vYear = read_param("pYear")

# Storage location parts.
vContainer = read_param("pContainer")
vStorageAccountName = read_param("pStorageAccountName")

# Source identification.
vDatabase = read_param("pDatabase")
vDatabaseName = read_param("pDatabaseName")
vSchemaName = read_param("pSchemaName")
vTableName = read_param("pTableName")

# Target layer, key column(s) and load mode.
vLayer = read_param("pLayer")
vPrimaryKey = read_param("pPrimaryKey")
vProcessType = read_param("pProcessType")

In [None]:
# Sample values for the same variables that the widget cell populates.
# NOTE(review): presumably kept for manual/interactive runs; they are commented
# out, so they have no effect on execution. Confirm before removing.
# vDay                    = "07"
# vMonth                  = "11"
# vYear                   = "2024"
# vContainer              = "raw"
# vStorageAccountName     = "stgeusproddata01"
# vDatabase               = "sqlserver"
# vDatabaseName           = "FCSACESSO"
# vSchemaName             = "dbo"
# vTableName              = "TUSU"
# vLayer                  = "silver"
# vPrimaryKey             = "USNOMEUSU"
# vProcessType            = "INCR"

Verify that the bronze layer has data

In [None]:
# Count the rows currently stored in the bronze table.
# The aggregate query returns exactly one row that holds the count, so the
# value has to be read from that row. Calling DataFrame.count() on the result
# would count the result rows (always 1), and a `df_count == 0` check would
# never be true.
df = spark.sql(f"""SELECT count(*) FROM bronze_{vDatabaseName}.{vTableName}""")
df_count = df.first()[0]

Exit command if bronze layer is empty

In [None]:
# With no bronze rows there is nothing to load: end the notebook run here,
# returning "Succeeded" as the exit value.
bronze_is_empty = (df_count == 0)
if bronze_is_empty:
    dbutils.notebook.exit("Succeeded")

Mount ADLS

In [None]:
# Register the storage account key (read from the secret scope) in the Spark
# session configuration so abfss:// paths on this account can be accessed.
storage_account_key = dbutils.secrets.get(scope = "key-vault-secrets", key = "token-storage-datalake")
spark.conf.set(f"fs.azure.account.key.{vStorageAccountName}.dfs.core.windows.net", storage_account_key)

# Show the contents of the dated source folder for this table.
source_folder = f"abfss://{vContainer}@{vStorageAccountName}.dfs.core.windows.net/{vDatabase}/{vDatabaseName}/{vSchemaName}/{vTableName}/{vYear}/{vMonth}/{vDay}"
display(dbutils.fs.ls(source_folder))

Create the column names

In [None]:
# Build `columns_string`: the comma-separated column names of the bronze table,
# in ordinal order, without the SYS_CHANGE_*, COMMIT_TIME and PARTITION_TIME columns.
# `spark` is the session already used by the earlier cells, so it is not re-created here.

# Query to get column names
query = f"""
SELECT COLUMN_NAME
FROM information_schema.columns
WHERE 
    table_schema = lower('bronze_{vDatabaseName}') AND table_name = lower('{vTableName}') AND COLUMN_NAME not in ('SYS_CHANGE_VERSION', 'SYS_CHANGE_OPERATION', 'SYS_CHANGE_CREATION_VERSION', 'COMMIT_TIME', 'PARTITION_TIME')
ORDER BY ordinal_position
"""

# Execute the query and get the DataFrame
columns_df = spark.sql(query)

# Collect column names into a list
column_names = [row.COLUMN_NAME for row in columns_df.collect()]

# Fail fast: an empty list would otherwise be interpolated into the later
# SELECT/MERGE statements and surface as an unrelated SQL syntax error.
if not column_names:
    raise ValueError(
        f"No columns found in information_schema.columns for bronze_{vDatabaseName}.{vTableName}"
    )

# Concatenate column names into a single string
columns_string = ', '.join(column_names)

# Show the resulting column list
print(columns_string)

Create the command to insert values

In [None]:
# Get (or reuse) the Spark session under the same application name.
spark = SparkSession.builder.appName("Retrieve Columns").getOrCreate()

# Same column listing as the plain column-name query, but every name is
# prefixed with 'S.' (the alias given to the source side of the MERGE).
insert_values_sql = f"""
SELECT CONCAT('S.',COLUMN_NAME) as COLUMN_NAME
FROM information_schema.columns
WHERE 
    table_schema = lower('bronze_{vDatabaseName}') AND table_name = lower('{vTableName}') AND COLUMN_NAME not in ('SYS_CHANGE_VERSION', 'SYS_CHANGE_OPERATION', 'SYS_CHANGE_CREATION_VERSION', 'COMMIT_TIME', 'PARTITION_TIME')
ORDER BY ordinal_position
"""

# Run the query and join the prefixed names into one comma-separated string.
columns_string_insert_values = ', '.join(
    record.COLUMN_NAME for record in spark.sql(insert_values_sql).collect()
)

# Show the resulting VALUES list
print(columns_string_insert_values)

Create the command to update values

In [None]:
# Initialize Spark session (if needed)
spark = SparkSession.builder.appName("Retrieve Columns").getOrCreate()

# Query to get column names
query = f"""
SELECT CONCAT('T.',COLUMN_NAME, ' = ', 'S.',COLUMN_NAME ) as COLUMN_NAME
FROM information_schema.columns
WHERE 
    table_schema = lower('bronze_{vDatabaseName}') AND table_name = lower('{vTableName}') AND COLUMN_NAME not in ('SYS_CHANGE_VERSION', 'SYS_CHANGE_OPERATION', 'SYS_CHANGE_CREATION_VERSION', 'COMMIT_TIME', 'PARTITION_TIME') AND column_name not in ( '{vPrimaryKey}' )
ORDER BY ordinal_position 
"""

# Execute the query and get the DataFrame
columns_df = spark.sql(query)

# Collect column names into a list
column_names = [row.COLUMN_NAME for row in columns_df.collect()]

# Concatenate column names into a single string
columns_string_update_values = ', '.join(column_names)

# Now you can use the columns_string variable
print(columns_string_update_values)

Create the command to compare primary keys

In [None]:
# Split the primary key into a list without spaces
primary_keys = vPrimaryKey.split(',')

# Create the command string
command_primary_key = ' AND '.join([f'T.{key} = S.{key}' for key in primary_keys])

# Output the result
print(command_primary_key)

Create the full-load query for the silver layer (used when the process type is FULL)

In [None]:
# Full-load query: the columns listed in `columns_string`, restricted to the
# rows whose PARTITION_TIME equals the maximum PARTITION_TIME of the bronze table.
query_silver = (
    f"SELECT {columns_string} "
    f"FROM bronze_{vDatabaseName}.{vTableName} "
    f"where PARTITION_TIME = (SELECT MAX(PARTITION_TIME) FROM bronze_{vDatabaseName}.{vTableName})"
)

Create the merge command for the silver layer (used when the process type is incremental)

In [None]:
# Construct the final SQL query
#
# Overview of the MERGE statement assembled below:
#   * Target (T): silver_<vDatabaseName>.<vTableName>.
#   * Source (S): bronze rows whose PARTITION_TIME falls within the last 3 days.
#     Rows are numbered per primary key by commit_time descending and only the
#     row numbered 1 is kept. The alias is called `dense_rank`, but the value
#     comes from ROW_NUMBER().
#   * The source projects sys_change_operation plus the columns listed in
#     `columns_string`, de-duplicated with DISTINCT.
#   * Join condition: `command_primary_key` (T.key = S.key per key column).
#   * Matched rows are deleted when sys_change_operation is 'D' and updated
#     (using `columns_string_update_values`) when it is 'U'.
#   * Unmatched rows are inserted when sys_change_operation is 'I' or 'U'.
# Everything inside the triple-quoted string is SQL text sent to Spark as-is.
query_merge = f"""
MERGE INTO silver_{vDatabaseName}.{vTableName} T
USING (
  WITH TABLE_MERGE AS (
    SELECT
      ROW_NUMBER() OVER (PARTITION BY {vPrimaryKey} ORDER BY commit_time DESC) AS dense_rank,
      *
    FROM
      bronze_{vDatabaseName}.{vTableName}
    WHERE PARTITION_TIME >= DATEADD(DAY, -3, GETDATE())
  ),
  TABLE_FILTER AS (
    SELECT
      sys_change_operation,
      {columns_string}
    FROM
      TABLE_MERGE
    WHERE
      dense_rank = 1
  )
  SELECT
    DISTINCT *
  FROM
    TABLE_FILTER
) S
ON {command_primary_key}
WHEN MATCHED AND S.sys_change_operation = 'D' THEN 
  DELETE
WHEN MATCHED AND S.sys_change_operation = 'U' THEN
  UPDATE SET {columns_string_update_values}
WHEN NOT MATCHED AND S.sys_change_operation IN ('I','U') THEN
  INSERT ({columns_string})
  VALUES ({columns_string_insert_values});
"""

Execute load data to silver layer

In [None]:
# Produce `df`, the data written out by the following cells.
if vProcessType != 'FULL':
    # Any non-FULL process type: apply the MERGE to the silver table, stamp
    # DATA_CARGA on the table's rows (current time minus 3 hours), then read
    # the table back.
    spark.sql(query_merge)
    spark.sql(f"""UPDATE silver_{vDatabaseName}.{vTableName}  SET DATA_CARGA = DATEADD(HOUR, -3, GETDATE())""")
    df = spark.sql(f"""SELECT * FROM silver_{vDatabaseName}.{vTableName}""")
else:
    # FULL: run the full-load query and add DATA_CARGA (current time minus 3 hours).
    load_timestamp = F.current_timestamp() - F.expr("INTERVAL 3 HOURS")
    df = spark.sql(query_silver).withColumn("DATA_CARGA", load_timestamp)

Save the dataframe to the Delta table of the target layer

In [None]:
# Overwrite the <vLayer>_<vDatabaseName>.<vTableName> Delta table with `df`,
# with the mergeSchema option set to "true".
# Disabled variant that partitions by PARTITION_TIME:
#df.write.format("delta").partitionBy("PARTITION_TIME").mode("overwrite").option("mergeSchema", "true").saveAsTable(f"{vLayer}_{vDatabaseName}.{vTableName}")
target_table = f"{vLayer}_{vDatabaseName}.{vTableName}"
delta_writer = df.write.format("delta").mode("overwrite").option("mergeSchema", "true")
delta_writer.saveAsTable(target_table)

Display rows from the Delta table of the target layer

In [None]:
# Preview up to 10 rows of the table that was just written.
preview_df = spark.sql(f"SELECT * FROM {vLayer}_{vDatabaseName}.{vTableName} LIMIT 10")
display(preview_df)

Write dataframe to container silver inside storage

In [None]:
# Dated folder for this table inside the container named after `vLayer`.
local_path = f"abfss://{vLayer}@{vStorageAccountName}.dfs.core.windows.net/{vDatabase}/{vDatabaseName}/{vSchemaName}/{vTableName}/{vYear}/{vMonth}/{vDay}/"

# Write `df` there as parquet, replacing whatever the folder already holds.
parquet_writer = df.write.mode("overwrite")
parquet_writer.parquet(local_path)