In [None]:
# Read the name of the file to process from the notebook's "filename" widget
# and print it.
filename = dbutils.widgets.get("filename")
print(filename)

In [None]:
# True when some entry in the current mount table already uses the
# '/mnt/sales' mount point; False otherwise (including an empty mount table).
alreadyMounted = any(
    mount.mountPoint == '/mnt/sales' for mount in dbutils.fs.mounts()
)
print(alreadyMounted)


In [None]:
# Mount the 'sales' blob container at /mnt/sales unless the earlier check
# found that mount point already present.
if alreadyMounted:
    print("already mounted")
else:
    # NOTE(review): the account key is passed inline as a literal placeholder;
    # consider reading it from a secret scope instead of editing it in here.
    dbutils.fs.mount(
        source='wasbs://sales@retaildbsales.blob.core.windows.net',
        mount_point='/mnt/sales',
        extra_configs={
            'fs.azure.account.key.retaildbsales.blob.core.windows.net': '<storage_acccount_accesskey',
        },
    )
    alreadyMounted = True
    print("mounting done successfully")


In [None]:
# List the contents of the mounted sales container.
display(dbutils.fs.ls('/mnt/sales'))

path,name,size,modificationTime
dbfs:/mnt/sales/discarded/,discarded/,0,0
dbfs:/mnt/sales/landing/,landing/,0,0
dbfs:/mnt/sales/staging/,staging/,0,0


In [None]:
# List the files currently waiting in the landing folder.
display(dbutils.fs.ls('/mnt/sales/landing'))


path,name,size,modificationTime
dbfs:/mnt/sales/landing/orders.csv,orders.csv,224,1724833484000


In [None]:
# Load the landed file named by `filename` as CSV: the first row supplies the
# column names and Spark infers the column types.
df = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("/mnt/sales/landing/{}".format(filename))
)

In [None]:
# Preview the loaded orders as a plain-text table.
df.show()

+--------+-------------------+-----------+---------------+
|order_id|         order_date|customer_id|   order_status|
+--------+-------------------+-----------+---------------+
| 1111111|2013-07-25 00:00:00|      11599|         CLOSED|
| 2222222|2013-07-25 00:00:00|        256|PENDING_PAYMENT|
| 3333333|2013-07-25 00:00:00|      12111|       COMPLETE|
| 4444444|2013-07-25 00:00:00|       8827|         CLOSED|
+--------+-------------------+-----------+---------------+



In [None]:
# Render the loaded orders with the notebook's rich table display.
display(df)

order_id,order_date,customer_id,order_status
1111111,2013-07-25T00:00:00Z,11599,CLOSED
2222222,2013-07-25T00:00:00Z,256,PENDING_PAYMENT
3333333,2013-07-25T00:00:00Z,12111,COMPLETE
4444444,2013-07-25T00:00:00Z,8827,CLOSED


In [None]:
# Validation 1: every order_id in the landed file must be unique.
orderscount = df.count()
distinct_orders = df.select("order_id").distinct().count()

# If the distinct count differs from the row count, at least one order_id
# appears more than once.
errorFlg = orderscount != distinct_orders

if errorFlg:
    # Reject the file: move it out of landing into the discarded folder and
    # stop the notebook, returning a JSON status string to the caller.
    # The destination is an absolute path naming the target file explicitly,
    # mirroring the absolute source path (the original destination lacked the
    # leading slash).
    dbutils.fs.mv(
        '/mnt/sales/landing/{}'.format(filename),
        '/mnt/sales/discarded/{}'.format(filename),
    )
    dbutils.notebook.exit('{"errorFlg": "true", "errorMsg":"Orderid is repeated"}')

# Expose the orders to Spark SQL for the status validation that follows.
df.createOrReplaceTempView("orders")

In [None]:
# Show the distinct order_id count, then the total row count, one per line.
print(distinct_orders, orderscount, sep="\n")

4
4


In [None]:
# Connection settings for the Azure SQL database used for lookups and reporting.
dbServer = 'retaildb-sales'
dbPort = '1433'
dbName = 'retaildb'
dbUser = 'prem'
databricksScope = 'salesprojectscope'

# The JDBC URL carries server, port, database and user; the password is
# deliberately kept out of the URL and supplied via the properties below.
connectionUrl = 'jdbc:sqlserver://{}.database.windows.net:{};database={};user={};'.format(dbServer, dbPort, dbName, dbUser)

# The password is only ever read from the Databricks secret scope. The earlier
# literal assignment to dbPassword was a dead store (overwritten before use)
# that read like a hardcoded credential, so it has been removed.
dbPassword = dbutils.secrets.get(scope=databricksScope, key='sqlpassword')

connectionProperties = {
    'password': dbPassword,
    'driver': 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
}

In [None]:
# Pull the table of permitted order statuses from the SQL database.
valid_status_df = spark.read.jdbc(
    url=connectionUrl,
    table='dbo.valid_order_status',
    properties=connectionProperties,
)

In [None]:
# Preview the permitted order statuses as a plain-text table.
valid_status_df.show()

+---------------+
|    status_name|
+---------------+
|        ON_HOLD|
| PAYMENT_REVIEW|
|     PROCESSING|
|         CLOSED|
|SUSPECTED_FRAUD|
|       COMPLETE|
|        PENDING|
|       CANCELED|
|PENDING_PAYMENT|
+---------------+



In [None]:
# Register the permitted-status lookup as the temp view 'valid_status' so it
# can be referenced from Spark SQL.
valid_status_df.createOrReplaceTempView('valid_status')

In [None]:
# Validation 2: collect every order whose status has no match in the lookup.
# A LEFT ANTI JOIN replaces the original NOT IN subquery because NOT IN
# yields no rows at all if the lookup contains a NULL, and never flags an
# order whose order_status is NULL. The anti join keeps exactly the orders
# without a matching status_name, including NULL statuses.
invalid_rows_df = spark.sql("""
    SELECT o.*
    FROM orders o
    LEFT ANTI JOIN valid_status v
        ON o.order_status = v.status_name
""")

In [None]:
# Preview any orders that failed the status validation.
invalid_rows_df.show()

+--------+----------+-----------+------------+
|order_id|order_date|customer_id|order_status|
+--------+----------+-----------+------------+
+--------+----------+-----------+------------+



In [None]:
# Route the landed file according to the status validation result.
if invalid_rows_df.count() > 0:
    errorFlg = True

if errorFlg:
    # Reject: move the file to the discarded folder and stop the notebook,
    # returning a JSON status string. The closing quote after the message was
    # missing in the original payload, which made it invalid JSON.
    dbutils.fs.mv(
        '/mnt/sales/landing/{}'.format(filename),
        '/mnt/sales/discarded/{}'.format(filename),
    )
    dbutils.notebook.exit('{"errorFlg": "true", "errorMsg":"invalid Order status found"}')
else:
    # Accept: move the file to staging under its own name, which is the path
    # the later staging read loads it from. Destinations are absolute, matching
    # the source path (the originals lacked the leading slash).
    dbutils.fs.mv(
        '/mnt/sales/landing/{}'.format(filename),
        '/mnt/sales/staging/{}'.format(filename),
    )
    

order_items

In [None]:
# Load the order items CSV (header row, inferred types), register it as the
# 'order_items' temp view for Spark SQL, and display it.
order_items_df = spark.read.csv(
    '/mnt/sales/order_items/order_items.csv',
    header=True,
    inferSchema=True,
)
order_items_df.createOrReplaceTempView("order_items")
display(order_items_df)

customers

In [None]:
# Read the customers table over JDBC, preview it, and register it as the
# 'customers' temp view for Spark SQL.
customers_df = spark.read.jdbc(
    url=connectionUrl,
    table='dbo.customers',
    properties=connectionProperties,
)
customers_df.show()
customers_df.createOrReplaceTempView("customers")

In [None]:
# Load the file from the staging folder (header row, inferred types) and
# (re)register it as the 'orders' temp view for the reporting query.
ordersdf = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("/mnt/sales/staging/{}".format(filename))
)
ordersdf.createOrReplaceTempView("orders")

In [None]:
# Per-customer sales summary: number of orders placed and total amount spent.
# Joining orders to order_items yields one row per order item, so the order
# count must be COUNT(DISTINCT order_id); a plain COUNT would count item rows
# and overstate num_orders_placed for any order with more than one item.
df1 = spark.sql("""
    SELECT
        customers.customer_id,
        customers.customer_fname,
        customers.customer_lname,
        customers.customer_city,
        customers.customer_state,
        customers.customer_zipcode,
        COUNT(DISTINCT orders.order_id) AS num_orders_placed,
        ROUND(SUM(order_items.order_item_subtotal), 2) AS total_amount
    FROM
        customers
        JOIN orders
            ON customers.customer_id = orders.customer_id
        JOIN order_items
            ON order_items.order_item_order_id = orders.order_id
    GROUP BY
        customers.customer_id,
        customers.customer_fname,
        customers.customer_lname,
        customers.customer_city,
        customers.customer_state,
        customers.customer_zipcode
    ORDER BY
        total_amount DESC
""")

In [None]:
# Persist the per-customer summary to dbo.sales_reporting over JDBC,
# using the 'overwrite' save mode.
df1.write.jdbc(
    url=connectionUrl,
    table='dbo.sales_reporting',
    mode='overwrite',
    properties=connectionProperties,
)