In [0]:
# Name of the landing file to process, supplied by the caller through the
# 'filename' notebook widget.
filename = dbutils.widgets.get('filename')

# Fail fast on a blank value: later cells interpolate `filename` into
# /mnt/sales/Landing/{filename}, so an empty name would address the whole
# Landing directory instead of a single file.
if not filename or not filename.strip():
    raise ValueError(
        "Widget 'filename' is empty; expected the name of a file under /mnt/sales/Landing"
    )

print(filename)

orders.csv


In [0]:
# Show the current DBFS mount points; as the last expression of the cell the
# returned list of MountInfo entries is rendered as the cell output.
dbutils.fs.mounts()

[MountInfo(mountPoint='/databricks-datasets', source='databricks-datasets', encryptionType=''),
 MountInfo(mountPoint='/Volumes', source='UnityCatalogVolumes', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-tracking', source='databricks/mlflow-tracking', encryptionType=''),
 MountInfo(mountPoint='/databricks-results', source='databricks-results', encryptionType=''),
 MountInfo(mountPoint='/databricks/mlflow-registry', source='databricks/mlflow-registry', encryptionType=''),
 MountInfo(mountPoint='/Volume', source='DbfsReserved', encryptionType=''),
 MountInfo(mountPoint='/mnt/sales', source='wasbs://sales@ordersproj1.blob.core.windows.net', encryptionType=''),
 MountInfo(mountPoint='/volumes', source='DbfsReserved', encryptionType=''),
 MountInfo(mountPoint='/', source='DatabricksRoot', encryptionType=''),
 MountInfo(mountPoint='/volume', source='DbfsReserved', encryptionType='')]

In [0]:
# --- Azure SQL connection settings -------------------------------------------
dbName = "orders_proj1"
dbServer = "orders-proj1-server.database.windows.net"
dbPort = "1433"
dbUser = "order-proj-username"
# Placeholder text only: a later cell rebinds dbPassword to the value fetched
# from the secret scope before any connection is built.
dbPassword = "sql-password"

# --- Databricks secret scope that holds the credentials ----------------------
databricksScope = "new_secret_scope"

In [0]:
# Storage-account access key, read from the Databricks secret scope so that it
# never appears in the notebook source.
storage_account_access_key = dbutils.secrets.get(
    scope=databricksScope,
    key='storage-account-access-key',
)

In [0]:
# Mount the 'sales' blob container at /mnt/sales unless a mount with that
# mount point is already registered.
alreadyMounted = any(
    mount.mountPoint == '/mnt/sales' for mount in dbutils.fs.mounts()
)

if alreadyMounted:
    print("Mounting was already done")
else:
    dbutils.fs.mount(
        source='wasbs://sales@ordersproj1.blob.core.windows.net',
        mount_point='/mnt/sales',
        # The account key authenticates the wasbs:// source.
        extra_configs={
            'fs.azure.account.key.ordersproj1.blob.core.windows.net': storage_account_access_key
        },
    )
    alreadyMounted = True
    print("Mounting Done Successfully")
    

Mounting was already done


True

In [0]:
# Load the landing file named by the widget: the first row is treated as the
# header and column types are inferred from the data.
order_df = spark.read.csv(
    f'/mnt/sales/Landing/{filename}',
    inferSchema=True,
    header=True,
)

In [0]:
import json

# Validation 1: an order_id may appear only once in the landing file.
# Deduplicating on order_id (instead of on the whole row) also catches rows
# that share an order_id but differ in another column, which is the condition
# the "OrderId is repeated" message describes.
order_df_distinct = order_df.dropDuplicates(['order_id'])
errorFlag = order_df.count() != order_df_distinct.count()

if errorFlag:
    # Name the destination file explicitly so the result does not depend on
    # whether /mnt/sales/Discarded already exists as a directory.
    dbutils.fs.mv(
        f'/mnt/sales/Landing/{filename}',
        f'/mnt/sales/Discarded/{filename}',
    )
    # notebook.exit takes a string; serialise the payload as JSON so the
    # calling pipeline can parse it.
    dbutils.notebook.exit(
        json.dumps({"errorFlag": "true", "errorMessage": "OrderId is repeated"})
    )

# Expose the orders to Spark SQL for the checks that follow.
order_df.createOrReplaceTempView('order_table')

In [0]:
# Fetch the SQL login password from the secret scope.
dbPassword = dbutils.secrets.get(scope=databricksScope, key='sql-password')

# The URL carries only server, database and TLS options. The login is passed
# through connectionProperties instead: a password containing ';' or other
# URL-significant characters cannot corrupt the connection string, and the
# secret does not end up in places where the URL is logged or displayed.
connectionUrl = (
    f'jdbc:sqlserver://{dbServer}:{dbPort};'
    f'database={dbName};'
    'encrypt=true;'
    'trustServerCertificate=false;'
    'hostNameInCertificate=*.database.windows.net;'
    'loginTimeout=30;'
)

# Properties handed to the JDBC driver by spark.read.jdbc / DataFrame.write.jdbc.
connectionProperties = {
    'user': f'{dbUser}@{dbServer}',
    'password': dbPassword,
    'driver': 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
}
              

In [0]:
# Reference list of accepted order statuses, read from the SQL database.
validStatusDf = spark.read.jdbc(
    url=connectionUrl,
    table='dbo.valid_order_status',
    properties=connectionProperties,
)

In [0]:
# Register the reference statuses as the temp view 'valid_status' so they can
# be queried from Spark SQL.
validStatusDf.createOrReplaceTempView('valid_status')

In [0]:
# Orders whose order_status does not appear in the valid_status view.
# NOTE(review): `select *` in the subquery presumably relies on valid_status
# exposing exactly one column — confirm against dbo.valid_order_status.
# NOTE(review): NOT IN selects no rows at all if the subquery yields a NULL,
# and never selects rows whose order_status is NULL — confirm that is intended.
invalidRowsDf = spark.sql(
    "select * from order_table where order_table.order_status not in (select * from valid_status)"
)

In [0]:
import json

# Validation 2: reject the file if any order carries a status that is not in
# the reference list. limit(1) is enough for an existence check and lets Spark
# stop as soon as one offending row is found.
if invalidRowsDf.limit(1).count() > 0:
    errorFlag = True

if errorFlag:
    # Destination file names are spelled out so the move does not depend on
    # the target directory already existing.
    dbutils.fs.mv(
        f'/mnt/sales/Landing/{filename}',
        f'/mnt/sales/Discarded/{filename}',
    )
    # notebook.exit takes a string, so the payload is serialised as JSON.
    # The duplicate-order exit in this notebook reports its text under
    # 'errorMessage'; the text is published under both spellings here so a
    # caller reading either key keeps working.
    dbutils.notebook.exit(
        json.dumps({
            'errorFlag': 'true',
            'errorMsg': 'Invalid order status detected',
            'errorMessage': 'Invalid order status detected',
        })
    )
else:
    # The staged file is re-read later from /mnt/sales/Staging/{filename}.
    dbutils.fs.mv(
        f'/mnt/sales/Landing/{filename}',
        f'/mnt/sales/Staging/{filename}',
    )
    


In [0]:
# Order line items: read the CSV (header row, inferred types) and expose it to
# Spark SQL as the temp view 'order_items'.
order_item_df = spark.read.csv(
    '/mnt/sales/Order_items/order_items_proj1.csv',
    inferSchema=True,
    header=True,
)
order_item_df.createOrReplaceTempView('order_items')

In [0]:
# Customer master data, read from the SQL database over JDBC.
customers_df = spark.read.jdbc(
    url=connectionUrl,
    table='dbo.customers',
    properties=connectionProperties,
)

In [0]:
# Register the customers as the temp view 'customers' for use in Spark SQL.
customers_df.createOrReplaceTempView('customers')

In [0]:
# Re-read the orders from the Staging folder (the file was moved out of
# Landing by the validation step) and replace the 'order_table' temp view.
order_df = spark.read.csv(
    f'/mnt/sales/Staging/{filename}',
    inferSchema=True,
    header=True,
)
order_df.createOrReplaceTempView('order_table')


In [0]:
# Per-customer sales report: number of orders and total amount spent.
# The CTE has one row per order *item* (orders joined to order_items), so the
# order count must be taken over distinct order_id values; a plain
# count(order_id) would report the number of item rows instead.
resultDf = spark.sql("""
          with cte as (select c.customer_id, c.customer_fname, c.customer_lname, c.customer_city, c.customer_state,o.order_id, oi.order_item_subtotal
            from customers c
            join order_table o on c.customer_id = o.order_customer_id
            join order_items oi on o.order_id = oi.order_item_order_id
            )

            select customer_id, customer_fname, customer_lname, customer_city, customer_state, count(distinct order_id) as total_orders,
            round(sum(order_item_subtotal),2) as total_spent
            from cte
            group by customer_id, customer_fname, customer_lname, customer_city, customer_state
            order by total_spent desc
        """)

In [0]:
# Persist the report to the SQL database; 'overwrite' mode replaces whatever
# dbo.report_sales_data currently holds.
resultDf.write.jdbc(
    url=connectionUrl,
    table='dbo.report_sales_data',
    mode='overwrite',
    properties=connectionProperties,
)