
#### Using Azure Key Vault to fetch the access key for the Azure Storage account

In [0]:
# Read the secret stored under key "mountingAccessKey" in the "MountingADLS"
# secret scope; the mounting cell uses it as the storage account key.
accesskey = dbutils.secrets.get(scope= "MountingADLS", key= "mountingAccessKey")


#### Mounting Storage account on Databricks

In [0]:
%python
# Mount the "sales" blob container at /mnt/sales unless a mount with that
# mount point is already reported by dbutils.fs.mounts().
alreadyMounted = any(
    mount.mountPoint == "/mnt/sales" for mount in dbutils.fs.mounts()
)

if alreadyMounted:
    print("This mountpoint is already there")
else:
    storage_account_key = accesskey
    storage_account_name = "ttstorageaccount12a"
    container_name = "sales"

    # The account key is passed through extra_configs under a configuration
    # key that embeds the storage account name.
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
        mount_point="/mnt/sales",
        extra_configs={
            "fs.azure.account.key.{0}.blob.core.windows.net".format(storage_account_name): storage_account_key
        },
    )

    alreadyMounted = True
    print("Mounting done successfully")

This mountpoint is already there



#### Fetching the file name from the Azure Data Factory storage event trigger parameter

In [0]:
# Read the value of the "IncomingFile" notebook widget. Later cells use it as
# the file name under dbfs:/mnt/sales/landing/.
file = dbutils.widgets.get("IncomingFile")
print(file)


#### Reading the fetched file

In [0]:
# Load the incoming CSV file from the landing folder, treating the first
# line as the header row.
orders_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("path", f"dbfs:/mnt/sales/landing/{file}")
    .load()
)

In [0]:
# Preview the rows read from the incoming file.
orders_df.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
| 1111111|2013-07-25 00:00:...|      11599|         CLOSED|
| 2222222|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
| 3333333|2013-07-25 00:00:...|      12111|       COMPLETE|
| 4444444|2013-07-25 00:00:...|       8827|         CLOSED|
+--------+--------------------+-----------+---------------+



In [0]:
# Set to True by the validation cells when the incoming file must be rejected.
errorFlag = False

# Number of rows in orders_df.
ordersCount = orders_df.count()

print(ordersCount)

4


In [0]:
# Number of distinct order_id values in orders_df; compared with ordersCount
# by the first validation check to detect repeated order ids.
distinctOrdersCount = orders_df.select("order_id").distinct().count()

print(distinctOrdersCount)

4



#### First validation check: every order_id in the file must be distinct

In [0]:
# First validation: the file is rejected when the total row count differs
# from the number of distinct order_id values (i.e. an order id is repeated).
if ordersCount != distinctOrdersCount:
    errorFlag = True

if errorFlag:
    # Name the destination file explicitly so the outcome does not depend on
    # whether the "discarded" folder already exists.
    dbutils.fs.mv(
        f"dbfs:/mnt/sales/landing/{file}",
        f"dbfs:/mnt/sales/discarded/{file}",
    )
    # Stop the notebook and hand a JSON status string back to the caller.
    dbutils.notebook.exit('{"errorFlag": "True", "errorMsg":"Orderid is repeated"}')



#### Now connecting to Azure SQL Database for the Second Validation check

In [0]:
# Connection settings for the SQL Server database read over JDBC below.
dbServer = "tt-capstone-sql-server.database.windows.net"
dbPort = "1433"
dbName = "tt-capstone-database"
dbUser = "lovepreetadmin"
# The password is read from the "MountingADLS" secret scope instead of being
# written into the notebook.
dbPassword = dbutils.secrets.get(scope= "MountingADLS", key= "SQL-password")

In [0]:
# JDBC URL built from the server, port, database name and user set above.
connectionUrl = 'jdbc:sqlserver://{0}:{1};database={2};user={3}'.format(
    dbServer,
    dbPort,
    dbName,
    dbUser,
)



In [0]:
%python
# Credentials and driver class handed to the Spark JDBC reader.
connectionProperties = dict(
    user=dbUser,
    password=dbPassword,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)

# Load the table of accepted order statuses from the database.
valid_status_df = spark.read.jdbc(
    connectionUrl,
    'dbo.valid_order_status',
    properties=connectionProperties,
)

In [0]:
# Preview the accepted order statuses loaded from dbo.valid_order_status.
valid_status_df.show()

+---------------+
|    status_name|
+---------------+
|        ON_HOLD|
| PAYMENT_REVIEW|
|     PROCESSING|
|         CLOSED|
|SUSPECTED_FRAUD|
|       COMPLETE|
|        PENDING|
|       CANCELED|
|PENDING_PAYMENT|
+---------------+



In [0]:
# Expose the accepted statuses to Spark SQL under the view name "valid_status".
valid_status_df.createOrReplaceTempView("valid_status")

In [0]:
from pyspark.sql.functions import *


#### Building logic for the second validation check

In [0]:
# Second validation: collect every row of the incoming file whose
# order_status is not listed in the "valid_status" view.
#
# The query reads the "orders" temp view, so register the landing-file
# DataFrame under that name first. Without this, a fresh top-to-bottom run
# fails here because no earlier cell creates the view.
# NOTE(review): NOT IN never matches rows whose order_status is NULL —
# confirm whether such rows should also be rejected.
orders_df.createOrReplaceTempView("orders")

invalidRowDf = spark.sql("""
          SELECT * FROM orders WHERE order_status NOT IN (SELECT status_name FROM valid_status)
          """)

In [0]:
# Show the rows that failed the order-status check (empty when all are valid).
invalidRowDf.show()

+--------+----------+-----------+------------+
|order_id|order_date|customer_id|order_status|
+--------+----------+-----------+------------+
+--------+----------+-----------+------------+




##### If the file passes both validations, it is moved to the staging folder in the Azure Storage account; otherwise it is moved to the discarded folder

In [0]:
# Second validation: any row with an unlisted order_status rejects the file.
if invalidRowDf.count() > 0:
    errorFlag = True

# The destination file name is spelled out in both moves so the result does
# not depend on whether the target folder already exists, and so the staged
# file lands exactly where the next cell reads it (staging/{file}).
if errorFlag:
    dbutils.fs.mv(
        f"dbfs:/mnt/sales/landing/{file}",
        f"dbfs:/mnt/sales/discarded/{file}",
    )
    # Stop the notebook and hand a JSON status string back to the caller.
    dbutils.notebook.exit('{"errorFlag": "True", "errorMsg":"Invalid order status found"}')

else:
    dbutils.fs.mv(
        f"dbfs:/mnt/sales/landing/{file}",
        f"dbfs:/mnt/sales/staging/{file}",
    )
    


#### Read the same orders file from staging folder

In [0]:
# Re-read the validated orders file from the staging folder, treating the
# first line as the header row.
orders_staging_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("path", f"dbfs:/mnt/sales/staging/{file}")
    .load()
)

In [0]:
# Register the staged orders as the temp view "orders" so the reporting query
# can reference it from Spark SQL.

orders_staging_df.createOrReplaceTempView("orders")


#### Reading the order items file, which is in another folder of the Azure Storage account

In [0]:
# Load the order items CSV from the order_items folder, treating the first
# line as the header row.
order_item_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("path", "dbfs:/mnt/sales/order_items/order_items1.csv")
    .load()
)

In [0]:
# Preview the first five order item rows.
order_item_df.show(5)

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
|            3|                  2|                  502|                  5|                250|                      50|
|            4|                  2|                  403|                  1|             129.99|                  129.99|
|            5|                  4|                  897|                  2|              49.98|                   24.99|
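+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+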

In [0]:
# Expose the order items to Spark SQL under the view name "order_item".
order_item_df.createOrReplaceTempView("order_item")


#### Reading the customers table from Azure SQL Database

In [0]:
# Connection settings for the SQL Server database that holds the customers
# table. These repeat the values assigned earlier in this notebook for the
# valid_order_status read.
dbServer = "tt-capstone-sql-server.database.windows.net"
dbPort = "1433"
dbName = "tt-capstone-database"
dbUser = "lovepreetadmin"
# The password is read from the "MountingADLS" secret scope instead of being
# written into the notebook.
dbPassword = dbutils.secrets.get(scope= "MountingADLS", key = "SQL-password")

In [0]:
# JDBC URL built from the server, port, database name and user set above.
connectionUrl = 'jdbc:sqlserver://{0}:{1};database={2};user={3}'.format(
    dbServer,
    dbPort,
    dbName,
    dbUser,
)

# Credentials and driver class handed to the Spark JDBC reader.
connectionProperties = dict(
    user=dbUser,
    password=dbPassword,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)

# Load the customers table from the database.
customers_df = spark.read.jdbc(
    connectionUrl,
    'dbo.customers',
    properties=connectionProperties,
)

In [0]:
# Preview the first five customer rows.
# NOTE(review): the preview includes name, address, username and password
# columns — confirm the saved output is safe to share before committing.
customers_df.show(5)

+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+
|customer_id|customer_fname|customer_lname| username| password|             address|       city|state|pincode|
+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+
|          1|       Richard|     Hernandez|XXXXXXXXX|XXXXXXXXX|  6303 Heather Plaza|Brownsville|   TX|  78521|
|          2|          Mary|       Barrett|XXXXXXXXX|XXXXXXXXX|9526 Noble Embers...|  Littleton|   CO|  80126|
|          3|           Ann|         Smith|XXXXXXXXX|XXXXXXXXX|3422 Blue Pioneer...|     Caguas|   PR|  00725|
|          4|          Mary|         Jones|XXXXXXXXX|XXXXXXXXX|  8324 Little Common| San Marcos|   CA|  92069|
|          5|        Robert|        Hudson|XXXXXXXXX|XXXXXXXXX|10 Crystal River ...|     Caguas|   PR|  00725|
+-----------+--------------+--------------+---------+---------+--------------------+-----------+-----+-------+
only showing top 5 rows

In [0]:
# Expose the customers table to Spark SQL under the view name "customers".
customers_df.createOrReplaceTempView("customers")


#### Setting the business logic

In [0]:
# Per-customer sales summary: number of orders placed and total amount spent,
# highest spenders first.
#
# Joining orders to order_item yields one row per order item, so the order
# count uses COUNT(DISTINCT o.order_id); a plain COUNT would count items and
# overstate num_orders_placed for any order with more than one item.
# Column references are qualified with their table alias to keep the
# three-way join unambiguous.
result_df = spark.sql("""
          SELECT c.customer_id, c.customer_fname, c.customer_lname, c.city, c.state, c.pincode,
          COUNT(DISTINCT o.order_id) AS num_orders_placed,
          ROUND(SUM(oi.order_item_subtotal), 2) AS total_amount
          FROM customers c
          JOIN orders o ON c.customer_id = o.customer_id
          JOIN order_item oi ON o.order_id = oi.order_item_order_id
          GROUP BY c.customer_id, c.customer_fname, c.customer_lname, c.city, c.state, c.pincode
          ORDER BY total_amount DESC
          """)

In [0]:
# Preview the first five summary rows without truncating long column values.
result_df.show(5, truncate= False)


#### Now writing this result back to Azure SQL DB so that any visualization tool can use this data

In [0]:
# Persist the summary to dbo.sales_reporting over JDBC; mode 'overwrite'
# replaces whatever the table currently holds.
result_df.write.jdbc(
    url=connectionUrl,
    table='dbo.sales_reporting',
    mode='overwrite',
    properties=connectionProperties,
)

In [0]:
# End the notebook run and return the string "Success" to the caller.
dbutils.notebook.exit("Success")