In [0]:
dbutils.secrets.listScopes()

[SecretScope(name='sceretkey')]

In [0]:
dbutils.secrets.list('sceretkey')

[SecretMetadata(key='blobstorgekey'),
 SecretMetadata(key='datakey'),
 SecretMetadata(key='secret')]

In [0]:
client_id = dbutils.secrets.get("sceretkey", "blobstorgekey")
client_secret = dbutils.secrets.get("sceretkey", "datakey")
directory_id = dbutils.secrets.get("sceretkey", "secret")

In [0]:
configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": client_id,
          "fs.azure.account.oauth2.client.secret": client_secret,
          "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{directory_id}/oauth2/token"}
     

In [0]:
storage_account = "destdatalake"

container_raw = "silver"
container_processed = "gold"

source_raw = f"abfss://{container_raw}@{storage_account}.dfs.core.windows.net/"
mount_point_raw = f"/mnt/{storage_account}/{container_raw}"

source_processed = f"abfss://{container_processed}@{storage_account}.dfs.core.windows.net/"
mount_point_processed = f"/mnt/{storage_account}/{container_processed}"

In [0]:
mount_point = "/mnt/destdatalake/silver"

if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    # Unmount the existing mount point
    dbutils.fs.unmount(mount_point)
    print(f"Unmounted existing mount at {mount_point}")

try:
    dbutils.fs.mount(
        source="abfss://silver@destdatalake.dfs.core.windows.net/",
        mount_point=mount_point,
        extra_configs=configs
    )
    print(f"Mounted successfully at{mount_point}")
except Exception as e:
    print(f"Error mounting: {e}")

Error mounting: An error occurred while calling o440.mount.
: java.lang.NullPointerException
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.getTokenCall(AzureADAuthenticator.java:320)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.getTokenCall(AzureADAuthenticator.java:287)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.getTokenUsingClientCreds(AzureADAuthenticator.java:110)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.verifyAzureOAuth(DBUtilsCore.scala:1224)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.verifyAzureFileSystem(DBUtilsCore.scala:1236)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.createOrUpdateMount(DBUtilsCore.scala:1120)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.$anonfun$mount$1(DBUtilsCore.scala:1168)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:527)
	at com.databrick

In [0]:
mount_point = "/mnt/destdatalake/gold"

if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    # Unmount the existing mount point
    dbutils.fs.unmount(mount_point)
    print(f"Unmounted existing mount at {mount_point}")

try:
    dbutils.fs.mount(
        source="abfss://gold@destdatalake.dfs.core.windows.net/",
        mount_point=mount_point,
        extra_configs=configs
    )
    print(f"Mounted successfully at{mount_point}")
except Exception as e:
    print(f"Error mounting: {e}")

Error mounting: An error occurred while calling o440.mount.
: java.lang.NullPointerException
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.getTokenCall(AzureADAuthenticator.java:320)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.getTokenCall(AzureADAuthenticator.java:287)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.getTokenUsingClientCreds(AzureADAuthenticator.java:110)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.verifyAzureOAuth(DBUtilsCore.scala:1224)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.verifyAzureFileSystem(DBUtilsCore.scala:1236)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.createOrUpdateMount(DBUtilsCore.scala:1120)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.$anonfun$mount$1(DBUtilsCore.scala:1168)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:527)
	at com.databrick

In [0]:
storage_account_name = "destdatalake"


In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CustomerTotalBalance").getOrCreate()


In [0]:
# Step 1: Extract data from Silver container
silver_container_path = f"abfss://silver@{storage_account_name}.dfs.core.windows.net/"
accounts_path = f"{silver_container_path}/accountnew"
customers_path = f"{silver_container_path}/customernew"

In [0]:
accounts_df = spark.read.format("delta").load(accounts_path)
customers_df = spark.read.format("delta").load(customers_path)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum


#Step 2: Transform data - Calculate total balance for each customer
joined_df = accounts_df.join(customers_df, on="customer_id", how="inner")

# Calculate the total balance per customer
from pyspark.sql import functions as F

total_balance_df = joined_df.groupBy("customer_id", "first_name").agg(
    F.sum("balance").alias("total_balance")
)

# Join the calculated total_balance back with the original joined dataframe
# Assuming necessary imports are already done
from pyspark.sql import functions as F

# Join the calculated total_balance back with the original joined dataframe
# Rename first_name in total_balance_df to avoid ambiguity
total_balance_df = total_balance_df.withColumnRenamed("first_name", "total_first_name")

# Perform the join and select the necessary columns
final_df = joined_df.join(
    total_balance_df, 
    on="customer_id", 
    how="inner"
).select(
    "customer_id",
    "first_name",        # Keep first_name from joined_df
    "account_id",
    "balance",
    "total_balance",
    "total_first_name"   # Use renamed first_name from total_balance_df
)





In [0]:
# Step 3: Load data into Gold container
gold_container_path = f"abfss://gold@{storage_account_name}.dfs.core.windows.net/"
gold_output_path = f"{gold_container_path}/customer_total_balance"

final_df.write.format("delta").mode("overwrite").save(gold_output_path)

In [0]:
# Verify data in Gold container
gold_df = spark.read.format("delta").load(gold_output_path)
gold_df.show(3)

+-----------+----------+----------+-------+-------------+----------------+
|customer_id|first_name|account_id|balance|total_balance|total_first_name|
+-----------+----------+----------+-------+-------------+----------------+
|         15|        47|        38| 3900.5|      7930.25|              66|
|         75|        61|        59| 475.75|      2556.75|              26|
|          4|        34|        78| 7900.5|     11880.25|              89|
+-----------+----------+----------+-------+-------------+----------------+
only showing top 3 rows

