In [4]:
from pyspark.sql.functions import * 
from pyspark.sql.types  import * 
from delta.tables import * 




StatementMeta(, 44c32148-9305-4348-84ca-ac21dfcf14e4, 6, Finished, Available, Finished)

In [5]:
orders_schema = StructType([
    StructField('SalesOrderNumber' , StringType() , False) , 
    StructField('SalesOrderLineNumber' , IntegerType() , False) , 
    StructField('OrderDate' , DateType() , False) , 
    StructField('CustomerName' , StringType() , False) , 
    StructField('Email' , StringType() , False) , 
    StructField('Item' , StringType() , False) , 
    StructField('Quantity' , IntegerType() , False) , 
    StructField('UnitPrice' , FloatType() , False) , 
    StructField('Tax' , FloatType() , False) 
])
orders_data = spark.read.format('csv')\
                .option('header',False)\
                .schema(orders_schema)\
                .load('Files/bronze/*.csv')





orders_data = orders_data.withColumn('FileName' , input_file_name())
orders_data = orders_data.withColumn('IsFlagged' , when( col('OrderDate') > '2019-08-01' , True ).otherwise(False))


orders_data.select('IsFlagged').distinct().show()

orders_data.groupBy('IsFlagged').agg({'SalesOrderNumber':'count'}).show()


orders_data= orders_data.withColumn('CreatedTS' , date_format(from_utc_timestamp( current_timestamp() , 'Asia/Kolkata'), 'yyyy-MM-dd hh:mm:ss'))\
            .withColumn('ModifiedTS' , date_format(from_utc_timestamp( current_timestamp() , 'America/Chicago') , 'yyyy-MM-dd hh:mm:ss'))



StatementMeta(, 44c32148-9305-4348-84ca-ac21dfcf14e4, 7, Finished, Available, Finished)

+---------+
|IsFlagged|
+---------+
|     true|
|    false|
+---------+

+---------+-----------------------+
|IsFlagged|count(SalesOrderNumber)|
+---------+-----------------------+
|     true|                  32422|
|    false|                    296|
+---------+-----------------------+



In [6]:
DeltaTable.createIfNotExists(spark) \
   .tableName("sales.sales_silver") \
   .addColumn("SalesOrderNumber", StringType()) \
   .addColumn("SalesOrderLineNumber", IntegerType()) \
   .addColumn("OrderDate", DateType()) \
   .addColumn("CustomerName", StringType()) \
   .addColumn("Email", StringType()) \
   .addColumn("Item", StringType()) \
   .addColumn("Quantity", IntegerType()) \
   .addColumn("UnitPrice", FloatType()) \
   .addColumn("Tax", FloatType()) \
   .addColumn("FileName", StringType()) \
   .addColumn("IsFlagged", BooleanType()) \
   .addColumn("CreatedTS", TimestampType()) \
   .addColumn("ModifiedTS", TimestampType()) \
   .execute()



sales_silver_tbl = DeltaTable.forPath(spark , 'Tables/sales_silver')
update_dict ={
"SalesOrderLineNumber": "bronze.SalesOrderLineNumber", 
"Email": "bronze.Email", 
"Quantity": "bronze.Quantity", 
"UnitPrice": "bronze.UnitPrice", 
"Tax": "bronze.Tax", 
"FileName": "bronze.FileName", 
"IsFlagged": "bronze.IsFlagged", 
"CreatedTS": "bronze.CreatedTS", 
"ModifiedTS": "bronze.ModifiedTS"
}

insert_dict ={
    "SalesOrderNumber": "bronze.SalesOrderNumber",
"SalesOrderLineNumber": "bronze.SalesOrderLineNumber", 
"OrderDate": "bronze.OrderDate", 
"CustomerName": "bronze.CustomerName", 
"Email": "bronze.Email", 
"Item": "bronze.Item", 
"Quantity": "bronze.Quantity", 
"UnitPrice": "bronze.UnitPrice", 
"Tax": "bronze.Tax", 
"FileName": "bronze.FileName", 
"IsFlagged": "bronze.IsFlagged", 
"CreatedTS": "bronze.CreatedTS", 
"ModifiedTS": "bronze.ModifiedTS"

}

sales_silver_tbl.alias('silver')\
                .merge( orders_data.alias("bronze") , \
                "silver.SalesOrderNumber==bronze.SalesOrderNumber and\
                silver.OrderDate==bronze.OrderDate and \
                silver.CustomerName==bronze.CustomerName and \
                silver.Item==bronze.Item and ")\
                .whenNotMatchedInsert(values=insert_dict)\
                .whenMatchedUpdate(set=update_dict).execute()


StatementMeta(, 44c32148-9305-4348-84ca-ac21dfcf14e4, 8, Finished, Available, Finished)

In [7]:
rev_data = spark.sql("""
SELECT year(OrderDate) , cast(sum(Quantity * UnitPrice ) as Numeric(12,2 ))  as Revenue 
-- , cast(sum((Quantity * UnitPrice ) - Tax ) as Numeric(12,2 ))  as Tax_adjusted_Revenue 
from sales.sales_silver
group by year(OrderDate)
ORDER by year(OrderDate)
""")
display(rev_data)


rev_data_2020 = spark.sql("""

SELECT month(OrderDate) , cast(sum(Quantity * UnitPrice ) as Numeric(12,2 ))  as Revenue 
-- , cast(sum((Quantity * UnitPrice ) - Tax ) as Numeric(12,2 ))  as Tax_adjusted_Revenue 
from sales.sales_silver
where year(OrderDate)=2020
group by month(OrderDate)
ORDER by month(OrderDate)
""")
display(rev_data_2020)


top_cust = spark.sql("""
SELECT  CustomerName , cast(sum(Quantity * UnitPrice ) as Numeric(12,2 ))  as Revenue 
from sales.sales_silver
group by CustomerName
ORDER by Revenue desc  limit 100
""")
display(top_cust)

StatementMeta(, 44c32148-9305-4348-84ca-ac21dfcf14e4, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5429f326-2036-401b-a473-ea4696c6fe69)

SynapseWidget(Synapse.DataFrame, 0be63a56-a04b-49cd-b24e-01ba32e0f057)

SynapseWidget(Synapse.DataFrame, f0d4308b-5d8b-4f6e-a2e1-2c93cd88a69b)