In [1]:
# Loads bronze_sales into the gold_fact_sale Delta table (create table, stage rows, MERGE).
# PySpark SQL imports.
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

StatementMeta(, b6cee5d0-b6f1-4f74-85ab-d57a2ef70df3, 3, Finished, Available, Finished)

###### <mark>**Creating gold_fact_sale**</mark>

In [2]:
%%sql
-- Sales fact table in Delta format. IF NOT EXISTS makes this cell safe to
-- re-run: an already existing table is left untouched.
CREATE TABLE IF NOT EXISTS Ecommerce_project.gold_fact_sale (
    -- Order identifier and numeric measures
    Order_ID       STRING,
    Price          FLOAT,
    Quantity       FLOAT,
    Sales          FLOAT,
    Discount       FLOAT,
    Profit         FLOAT,
    Shipping_Cost  FLOAT,
    -- Order / shipping dates
    Order_Date     DATE,
    Shipping_Date  DATE,
    -- Identifiers of the related product, priority, ship mode and customer
    Product_ID     INT,
    Priority_ID    INT,
    ShipMode_ID    INT,
    Customer_ID    STRING,
    -- Partition columns (see PARTITIONED BY below)
    Order_Year     INT,
    Order_Month    INT,
    -- Row audit timestamps
    Created_TS     TIMESTAMP,
    Modified_TS    TIMESTAMP
)
USING DELTA
PARTITIONED BY (Order_Year, Order_Month)

StatementMeta(, b6cee5d0-b6f1-4f74-85ab-d57a2ef70df3, 4, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

###### **<mark>Creating a DataFrame with the required columns</mark>**

In [3]:
# Stage the rows for gold_fact_sale: take the measures and dates from
# bronze_sales and resolve Product_ID / Priority_ID / ShipMode_ID by joining
# the corresponding gold lookup tables on their natural keys.
#
# Price is the bronze Sales value; the output Sales is Sales * Quantity.
# Order_Year / Order_Month are derived from Order_Date and feed the fact
# table's partition columns.
#
# Every column reference is qualified with its table alias so the query keeps
# resolving even if a joined table later gains a column with the same name
# (the YEAR/MONTH arguments were previously unqualified).
#
# NOTE(review): INNER JOINs drop bronze rows that have no matching row in any
# one of the lookup tables -- confirm that silently skipping them is intended.
bronze_df = spark.sql("""
SELECT
    s.Order_ID,
    s.Sales                 AS Price,
    s.Quantity,
    s.Sales * s.Quantity    AS Sales,
    s.Discount,
    s.Profit,
    s.Shipping_Cost,
    s.Order_Date,
    s.Shipping_Date,
    p.Product_ID,
    op.Priority_ID,
    sm.ShipMode_ID,
    s.Customer_ID,
    YEAR(s.Order_Date)      AS Order_Year,
    MONTH(s.Order_Date)     AS Order_Month
FROM Ecommerce_project.bronze_sales AS s
INNER JOIN Ecommerce_project.gold_product AS p
    ON  s.Product = p.Product
    AND s.Product_Category = p.Product_Category
INNER JOIN Ecommerce_project.gold_orderpriority AS op
    ON  s.Order_Priority = op.Order_Priority
INNER JOIN Ecommerce_project.gold_shipmode AS sm
    ON  s.Ship_Mode = sm.Ship_Mode
""")

StatementMeta(, b6cee5d0-b6f1-4f74-85ab-d57a2ef70df3, 5, Finished, Available, Finished)

###### **<mark>Creating a temp view from the DataFrame</mark>**

In [4]:
# Expose the staged DataFrame to SQL cells under the name ViewfactSale; the
# MERGE cell below uses it as its source. Re-running this cell replaces any
# previously registered view of the same name.
bronze_df.createOrReplaceTempView("ViewfactSale")

StatementMeta(, b6cee5d0-b6f1-4f74-85ab-d57a2ef70df3, 6, Finished, Available, Finished)

###### **<mark>Upserting data into gold_fact_sale from the view using a MERGE statement</mark>**

In [5]:
%%sql
-- Upsert the staged rows (ViewfactSale) into gold_fact_sale.
-- A source row matches a target row when Order_ID is equal and both rows carry
-- the same Order_Year / Order_Month values.
-- NOTE(review): Delta MERGE raises an error when several source rows match the
-- same target row, so this presumably relies on Order_ID being unique per
-- year/month in ViewfactSale -- confirm against the bronze data.
-- NOTE(review): every matched row is rewritten and gets a new Modified_TS on
-- each run, whether or not any value changed -- confirm that is intended.
MERGE INTO Ecommerce_project.gold_fact_sale AS tgt
USING ViewfactSale AS src
    ON  tgt.Order_Year  = src.Order_Year
    AND tgt.Order_Month = src.Order_Month
    AND tgt.Order_ID    = src.Order_ID

-- Existing row: overwrite measures, dates and IDs; Created_TS and the
-- partition columns are left as they are, Modified_TS is set to now.
WHEN MATCHED THEN UPDATE SET
    tgt.Price         = src.Price,
    tgt.Quantity      = src.Quantity,
    tgt.Sales         = src.Sales,
    tgt.Discount      = src.Discount,
    tgt.Profit        = src.Profit,
    tgt.Shipping_Cost = src.Shipping_Cost,
    tgt.Order_Date    = src.Order_Date,
    tgt.Shipping_Date = src.Shipping_Date,
    tgt.Product_ID    = src.Product_ID,
    tgt.Priority_ID   = src.Priority_ID,
    tgt.ShipMode_ID   = src.ShipMode_ID,
    tgt.Customer_ID   = src.Customer_ID,
    tgt.Modified_TS   = CURRENT_TIMESTAMP()

-- New row: insert all columns and stamp both audit timestamps with the
-- current time.
WHEN NOT MATCHED THEN INSERT (
    Order_ID,
    Price,
    Quantity,
    Sales,
    Discount,
    Profit,
    Shipping_Cost,
    Order_Date,
    Shipping_Date,
    Product_ID,
    Priority_ID,
    ShipMode_ID,
    Customer_ID,
    Order_Year,
    Order_Month,
    Created_TS,
    Modified_TS
)
VALUES (
    src.Order_ID,
    src.Price,
    src.Quantity,
    src.Sales,
    src.Discount,
    src.Profit,
    src.Shipping_Cost,
    src.Order_Date,
    src.Shipping_Date,
    src.Product_ID,
    src.Priority_ID,
    src.ShipMode_ID,
    src.Customer_ID,
    src.Order_Year,
    src.Order_Month,
    CURRENT_TIMESTAMP(),
    CURRENT_TIMESTAMP()
)


StatementMeta(, b6cee5d0-b6f1-4f74-85ab-d57a2ef70df3, 7, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>