## Imports

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

## Reading datasets

In [0]:
file_path = "dbfs:/mnt/shopping_data_dbfs/Silver-layer/BG-Thushara/capstone-azure-training/raw/main/Data/New_data/inventory_data.parquet"

# Read the CSV file using Spark DataFrame
inventory = spark.read.parquet(file_path, header=True)  
display(inventory) 

Date,Product_ID,Inventory
2019-01-03,PROD039,158
2019-01-12,PROD019,80
2019-01-14,PROD007,73
2019-01-22,PROD036,352
2019-02-01,PROD005,872
2019-02-01,PROD016,423
2019-02-07,PROD035,403
2019-02-21,PROD039,456
2019-02-27,PROD002,438
2019-02-28,PROD033,502


In [0]:
file_path1 = "dbfs:/mnt/shopping_data_dbfs/Silver-layer/BG-Thushara/capstone-azure-training/raw/main/Data/New_data/sales_data.parquet"

# Read the CSV file using Spark DataFrame
sales = spark.read.parquet(file_path1, header=True)  
display(sales) 

sale_id,store_id,customer_id,product_id,sale_date,sale_amount
SALE0470,45,2249,PROD008,2020-04-14,746
SALE0026,44,1894,PROD035,2019-01-26,746
SALE0458,51,1049,PROD026,2020-04-02,746
SALE0705,27,1098,PROD011,2020-12-05,746
SALE0816,57,2105,PROD033,2021-03-26,746
SALE0301,49,2188,PROD009,2019-10-28,746
SALE0511,44,2060,PROD020,2020-05-25,746
SALE0016,33,1726,PROD037,2019-01-16,746
SALE0598,2,2203,PROD033,2020-08-20,746
SALE0232,52,2585,PROD012,2019-08-20,746


In [0]:
inventory.select('Date').agg(max('Date')).show()

+----------+
| max(Date)|
+----------+
|2021-09-27|
+----------+



In [0]:
sales.select('sale_date').agg(max('sale_date')).show()

+--------------+
|max(sale_date)|
+--------------+
|    2046-05-18|
+--------------+



- let us take the data realistic.
- let's put a filter of sales data till max date of inventory.

In [0]:
sales = sales.filter(col('sale_date')<='2021-09-27')

In [0]:
sales.select('sale_date').agg(max('sale_date')).show()

+--------------+
|max(sale_date)|
+--------------+
|    2021-09-27|
+--------------+



### Priliminary data quality checks are done while transitioning the data from bronze layer to the silver layer.
##### Now, we are futher are making the data ready for extracting insights for business problem statements.


In [0]:
## How many stores are there?
sales.select('store_id').distinct().count()

59

In [0]:
inventory.select('Product_ID').distinct().count()

40

In [0]:
sales.select('product_id').distinct().count()

40

- All the products avaliable in inventory are beling sold.

In [0]:
inventory.groupBy().agg(min(col('Inventory')),\
              max(col('Inventory'))).show()

+--------------+--------------+
|min(Inventory)|max(Inventory)|
+--------------+--------------+
|            50|          1000|
+--------------+--------------+



- No where the inventory went to 0, no records can be excluded in that scenario.


In [0]:
## checking if there are any returns tracked
sales.filter(col('sale_amount')<0).show()

+-------+--------+-----------+----------+---------+-----------+
|sale_id|store_id|customer_id|product_id|sale_date|sale_amount|
+-------+--------+-----------+----------+---------+-----------+
+-------+--------+-----------+----------+---------+-----------+



- Therfore no returns are tracked, need not further deal the data.

## We are able to conclude that the data has passed the business validations, now we can load the data into gold layer in the desired format.

### My approach:
- Since the inventory data alone donesent give much insights, let's now join the sales data and inventory data.
- But since we will end up having many to many join and much complex dataset, let's reduce sales data to product and date level.


In [0]:
sales.groupBy('product_id','sale_date').agg(sum('sale_amount').alias('sale_amount'),count('customer_id').alias('number_of_customers'),count('store_id').alias('number_of_stores')).display()

product_id,sale_date,sale_amount,number_of_customers,number_of_stores
PROD002,2019-07-30,199,1,1
PROD040,2019-07-19,1282,1,1
PROD003,2021-02-23,604,1,1
PROD005,2020-09-15,481,1,1
PROD026,2020-05-11,1189,1,1
PROD026,2020-01-13,582,1,1
PROD021,2019-10-03,228,1,1
PROD025,2020-04-27,1112,1,1
PROD034,2020-10-29,1030,1,1
PROD020,2020-05-18,574,1,1


- Number of stores and number of customers is not a meaningful data, as it seems a product on a day is sold at one store and bought by only one customer.
- Hense dropping of this number of customers and number stores. 

In [0]:
prod_lev_sales_data = sales.groupBy('product_id','sale_date').agg(sum('sale_amount').alias('sale_amount'))

In [0]:
join_df = prod_lev_sales_data.withColumnRenamed('product_id','Product_ID')\
                             .withColumnRenamed('sale_date','Date').join(inventory,['Product_ID','Date'],'outer')

In [0]:
for i in join_df.columns:
    if (join_df.filter(col(i).isNull()).count()>0):
        print(i, 'have nulls')
    print(i)

Product_ID
Date
sale_amount have nulls
sale_amount
Inventory


- Therefore, there are no nulls in data, the maping is exactly one to one

In [0]:
join_df.display()

Product_ID,Date,sale_amount,Inventory
PROD002,2019-07-30,199.0,869
PROD040,2019-07-19,1282.0,675
PROD025,2020-04-27,1112.0,930
PROD020,2020-05-18,574.0,746
PROD003,2021-02-23,604.0,635
PROD026,2020-05-11,1189.0,991
PROD005,2020-09-15,481.0,476
PROD021,2019-10-03,228.0,97
PROD026,2020-01-13,582.0,595
PROD034,2020-10-29,1030.0,819


#### Now let's figure out product level information.

In [0]:
product_data = inventory.groupBy('Product_ID').agg(sum('Inventory').alias('Inventory'))\
    .join(sales.groupBy('product_id').agg(sum('sale_amount').alias('sale_amount')\
        ,countDistinct('sale_id').alias('frq_of_product'))\
    .withColumnRenamed('product_id','Product_ID'),'Product_ID','inner')

In [0]:
product_data.display()

Product_ID,Inventory,sale_amount,frq_of_product
PROD011,528231,24756,34
PROD035,534614,17344,27
PROD003,529347,33244,39
PROD025,524151,12971,17
PROD034,501157,12517,15
PROD023,524786,16214,22
PROD036,525849,14863,22
PROD026,530282,19283,24
PROD037,526487,20564,31
PROD027,529457,20049,28


### we have all the final datasets handy.
Writing them to the gold layer.

In [0]:
inventory.coalesce(1).write.mode('overwrite').parquet('dbfs:/mnt/shopping_data_dbfs/Gold-layer/BG-Thushara/capstone-azure-training/raw/main/Data/New_data/inventory_data.parquet')

In [0]:
sales.coalesce(1).write.mode('overwrite').parquet('dbfs:/mnt/shopping_data_dbfs/Gold-layer/BG-Thushara/capstone-azure-training/raw/main/Data/New_data/sales_data.parquet')

In [0]:
join_df.coalesce(1).write.mode('overwrite').parquet('dbfs:/mnt/shopping_data_dbfs/Gold-layer/BG-Thushara/capstone-azure-training/raw/main/Data/New_data/product_level_sales_data.parquet')

In [0]:
product_data.coalesce(1).write.mode('overwrite').parquet('dbfs:/mnt/shopping_data_dbfs/Gold-layer/BG-Thushara/capstone-azure-training/raw/main/Data/New_data/product_data.parquet')