# Databricks notebook source
configs = {
  "fs.azure.account.auth.type": "CustomAccessToken",
  "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
}

# Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
  source = "abfss://sales-data@dataprojectstorage.dfs.core.windows.net/",
  mount_point = "/mnt/sales-data",
  extra_configs = configs)


In [0]:

# COMMAND ----------

dbutils.fs.ls("/mnt/sales-data/bronze/SalesLT/")

[FileInfo(path='dbfs:/mnt/sales-data/bronze/SalesLT/Address/', name='Address/', size=0, modificationTime=1728217724000),
 FileInfo(path='dbfs:/mnt/sales-data/bronze/SalesLT/Customer/', name='Customer/', size=0, modificationTime=1728217725000),
 FileInfo(path='dbfs:/mnt/sales-data/bronze/SalesLT/CustomerAddress/', name='CustomerAddress/', size=0, modificationTime=1728217723000),
 FileInfo(path='dbfs:/mnt/sales-data/bronze/SalesLT/Product/', name='Product/', size=0, modificationTime=1728217725000),
 FileInfo(path='dbfs:/mnt/sales-data/bronze/SalesLT/ProductCategory/', name='ProductCategory/', size=0, modificationTime=1728217728000),
 FileInfo(path='dbfs:/mnt/sales-data/bronze/SalesLT/ProductDescription/', name='ProductDescription/', size=0, modificationTime=1728217725000),
 FileInfo(path='dbfs:/mnt/sales-data/bronze/SalesLT/ProductModel/', name='ProductModel/', size=0, modificationTime=1728217725000),
 FileInfo(path='dbfs:/mnt/sales-data/bronze/SalesLT/ProductModelProductDescription/', n

In [0]:
dbutils.fs.ls("/mnt/sales-data/bronze")

[FileInfo(path='dbfs:/mnt/sales-data/bronze/SalesLT/', name='SalesLT/', size=0, modificationTime=1728217723000)]

In [0]:
table_name = []

for i in dbutils.fs.ls('mnt/sales-data/bronze/SalesLT/'):
  print(i.name)
  table_name.append(i.name.split('/')[0])

Address/
Customer/
CustomerAddress/
Product/
ProductCategory/
ProductDescription/
ProductModel/
ProductModelProductDescription/
SalesOrderDetail/
SalesOrderHeader/


In [0]:
table_name

['Address',
 'Customer',
 'CustomerAddress',
 'Product',
 'ProductCategory',
 'ProductDescription',
 'ProductModel',
 'ProductModelProductDescription',
 'SalesOrderDetail',
 'SalesOrderHeader']

In [0]:

from pyspark.sql.functions import from_utc_timestamp, date_format
from pyspark.sql.types import TimestampType

for i in table_name:
  path = '/mnt/sales-data/bronze/SalesLT/' + i + '/' + i + '.parquet'
  df = spark.read.format('parquet').load(path)
  column = df.columns

  for col in column:
    if "Date" in col or "date" in col:
      df = df.withColumn(col, date_format(from_utc_timestamp(df[col].cast(TimestampType()), "UTC"), "yyyy-MM-dd"))

  output_path = '/mnt/sales-data/silver/SalesLT/' +i +'/'
  df.write.format('delta').mode("overwrite").save(output_path)
