In [0]:
# Name of the incoming orders file, read from the "filename" notebook widget.
# For an ad-hoc interactive run, assign it directly instead, e.g. filename = 'orders.csv'.
filename = dbutils.widgets.get("filename")

In [0]:
# True when some existing mount already uses the /mnt/sales mount point.
alreadyMounted = any(
    mount.mountPoint == '/mnt/sales'
    for mount in dbutils.fs.mounts()
)


In [0]:
# Mount the "sales" blob container at /mnt/sales unless the earlier check found it mounted.
if alreadyMounted:
    print("Already Mounted")
else:
    dbutils.fs.mount(
        source='wasbs://sales@salesss.blob.core.windows.net',
        mount_point='/mnt/sales',
        # The storage account key is fetched from the secret scope, not embedded here.
        extra_configs={
            'fs.azure.account.key.salesss.blob.core.windows.net':
                dbutils.secrets.get(scope="salesProject", key="ADLS")
        },
    )
    alreadyMounted = True
    print("Mounted Successfully")

In [0]:
# List the landing folder to confirm the incoming file is visible through the mount.
# Uses an absolute path, like every other /mnt/sales reference in this notebook.
dbutils.fs.ls("/mnt/sales/landing")

In [0]:
# Load the incoming orders file from the landing folder; the first row is the header
# and column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/sales/landing/{}'.format(filename))
)

In [0]:
# Preview the freshly loaded orders.
display(df)

In [0]:
# Data-quality gate 1: order_id must be unique within the incoming file.
errorFlag = False

OrdersCount = df.count()
DistinctOrdersCount = df.select('order_id').distinct().count()
if OrdersCount != DistinctOrdersCount:
    print("ERROR: There are duplicate order ids in the orders file")
    errorFlag = True

if errorFlag:
    # Name the destination file explicitly. Moving to a bare '/mnt/sales/discarded'
    # only lands the file inside that folder when the folder already exists.
    dbutils.fs.mv(
        '/mnt/sales/landing/{}'.format(filename),
        '/mnt/sales/discarded/{}'.format(filename),
    )
    # Stop the notebook run and report the failure to the caller.
    dbutils.notebook.exit("FAILURE: Data quality check failed")

# Expose the orders to SQL cells under the name "orders".
df.createOrReplaceTempView("orders")

In [0]:
%sql
-- Inspect the rows registered under the "orders" temp view.
SELECT *
FROM orders

In [0]:
# Connection settings for the SQL Server database used by the JDBC reads and writes.
dbServer = 'mothership-server'   # host prefix; '.database.windows.net' is appended when the URL is built
dbName = 'capproject'
dbPort = '1433'
dbUser = 'pdeep'

# Secret scope and key under which the database password is stored.
dbScope = "salesProject"
dbKey = "database"

In [0]:
 # Build the JDBC URL and connection properties for the SQL Server database.
# Statements start at column 0 so the cell is valid Python even where leading
# indentation is not stripped automatically.
connection_url = 'jdbc:sqlserver://{}.database.windows.net:{};database={};user={};'.format(
    dbServer, dbPort, dbName, dbUser
)

# The password is read from the secret scope and passed as a connection property
# instead of being embedded in the URL.
dbPassword = dbutils.secrets.get(scope=dbScope, key=dbKey)

connection_properties = {
    "password": dbPassword,
    'driver': 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
}

In [0]:
# Read the dbo.valid_status table over JDBC.
valid_df = spark.read.jdbc(
    url=connection_url,
    table='dbo.valid_status',
    properties=connection_properties,
)

In [0]:
# Preview the rows loaded from dbo.valid_status.
display(valid_df)

In [0]:
# Register the valid-status rows as the "valid_status" temp view so SQL queries can reference them.
valid_df.createOrReplaceTempView("valid_status")

In [0]:
# Orders whose order_status has no match in valid_status.
# A LEFT ANTI JOIN is used instead of NOT IN (subquery). With NOT IN, a single NULL in
# valid_status.status makes the predicate UNKNOWN for every non-matching row, so nothing
# is flagged, and an order with a NULL order_status is never flagged either.
# The anti join reports both cases as invalid.
invalid_rows_df = spark.sql(
    'select orders.* from orders '
    'left anti join valid_status on orders.order_status = valid_status.status'
)

In [0]:
# Show the orders that failed the status check (empty when every status is valid).
display(invalid_rows_df)

In [0]:
# Data-quality gate 2: every order must carry a status listed in valid_status.
errorFlag = invalid_rows_df.count() > 0

# Destination files are named explicitly. Moving to a bare folder path only lands the
# file inside the folder when that folder already exists, and the staged file is read
# back later from '/mnt/sales/staging/<filename>'.
if errorFlag:
    dbutils.fs.mv(
        '/mnt/sales/landing/{}'.format(filename),
        '/mnt/sales/discarded/{}'.format(filename),
    )
    # Stop the notebook run and report the failure to the caller.
    dbutils.notebook.exit("FAILURE: Data quality check failed")
else:
    dbutils.fs.mv(
        '/mnt/sales/landing/{}'.format(filename),
        '/mnt/sales/staging/{}'.format(filename),
    )

In [0]:
 # Load the order line items, preview them, and expose them to SQL as "order_items".
# Statements start at column 0 so the cell is valid Python even where leading
# indentation is not stripped automatically.
orders_item_df = spark.read.csv(
    '/mnt/sales/orderItems/order_items.csv',
    header=True,
    inferSchema=True,
)
display(orders_item_df)
orders_item_df.createOrReplaceTempView('order_items')

In [0]:
# Read dbo.customers over JDBC, preview it, and expose it to SQL as "customers".
customer_df = spark.read.jdbc(
    url=connection_url,
    table='dbo.customers',
    properties=connection_properties,
)
display(customer_df)
customer_df.createOrReplaceTempView('customers')

In [0]:
# Re-read the orders file from its staging location and point the "orders" view at it.
# The earlier view was built on the landing path, which the file has since been moved from.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/sales/staging/{}'.format(filename))
)
df.createOrReplaceTempView('orders')
display(df)

In [0]:
# Per-customer sales summary: number of orders placed and rounded total amount,
# highest spenders first.
# The join to order_items yields one row per line item, so orders are counted with
# count(distinct orders.order_id). A plain count(order_id) would count line items and
# overstate num_orders_placed for any order with more than one item.
dis = spark.sql("""
    select
        customers.customer_id,
        customers.customer_fname,
        customers.customer_lname,
        customers.customer_city,
        customers.customer_state,
        customers.customer_zipcode,
        count(distinct orders.order_id) as num_orders_placed,
        round(sum(order_item_subtotal)) as total_amount
    from customers
    join orders
        on customers.customer_id = orders.customer_id
    join order_items
        on orders.order_id = order_items.order_item_order_id
    group by
        customers.customer_id,
        customers.customer_fname,
        customers.customer_lname,
        customers.customer_city,
        customers.customer_state,
        customers.customer_zipcode
    order by total_amount desc
""")

In [0]:
# Preview the per-customer summary before it is written to the database.
display(dis)

In [0]:
# Replace the contents of dbo.sales_reporting with the latest summary.
# With the JDBC "truncate" option, overwrite mode truncates the existing table
# rather than dropping and recreating it.
(
    dis.write
    .mode("overwrite")
    .option("truncate", "true")
    .jdbc(
        url=connection_url,
        table="dbo.sales_reporting",
        properties=connection_properties,
    )
)

In [0]:
# Release the /mnt/sales mount now that processing is finished.
# NOTE(review): runs that stop early via dbutils.notebook.exit never reach this cell,
# so the mount can persist between runs; the mount check at the top of the notebook covers that case.
dbutils.fs.unmount('/mnt/sales')