Install PySpark

In [0]:
!pip install pyspark py4j

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-c3bb51da-7d32-4201-8d18-10635bc988e7/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
spark.sparkContext.getConf().getAll()

Out[108]: [('spark.databricks.preemption.enabled', 'true'),
 ('spark.sql.hive.metastore.jars', '/databricks/databricks-hive/*'),
 ('spark.driver.tempDirectory', '/local_disk0/tmp'),
 ('spark.sql.warehouse.dir', 'dbfs:/user/hive/warehouse'),
 ('spark.databricks.managedCatalog.clientClassName',
  'com.databricks.managedcatalog.ManagedCatalogClientImpl'),
 ('spark.databricks.credential.scope.fs.gs.auth.access.tokenProviderClassName',
  'com.databricks.backend.daemon.driver.credentials.CredentialScopeGCPTokenProvider'),
 ('spark.hadoop.fs.fcfs-s3.impl.disable.cache', 'true'),
 ('spark.sql.streaming.checkpointFileManagerClass',
  'com.databricks.spark.sql.streaming.DatabricksCheckpointFileManager'),
 ('spark.databricks.service.dbutils.repl.backend',
  'com.databricks.dbconnect.ReplDBUtils'),
 ('spark.hadoop.databricks.s3.verifyBucketExists.enabled', 'false'),
 ('spark.databricks.clusterUsageTags.clusterOwnerUserId', '2171760166807434'),
 ('spark.streaming.driver.writeAheadLog.allowBatching'

In [0]:
# Importing necessary libraries and modules
from pyspark.sql import SparkSession
# Importing necessary libraries and modules
from pyspark.sql.functions import *
# Importing necessary libraries and modules
from pyspark.sql.types import *
# Importing necessary libraries and modules
from pyspark.sql import Window
# Initialize a Spark session for data processing
spark = SparkSession.builder.appName("E-Commerce Data Pipeline").config('spark.ui.port', '4050').getOrCreate()

Data preprocessing

The items dim table had null value for a column called 'unit'. Dropped the record using df.drop.na() method which drops a record if there is a null value for any of the columns. To specify a particular column use df.na.drop(subset=['col1','col2'])
We can also make use of items.filter(items.unit.isNull()).show() i.e isNull() & isNotNull() 

In [0]:
# Reading data from CSV files into a DataFrame
items=spark.read.format('csv').option('inferSchema',True).option('header',True).load("dbfs:/FileStore/shared_uploads/akagidwani@gmail.com/unprocessed_data/tbl_dim_items.csv")
items.show()
items.printSchema()
items=items.filter(items.unit.isNotNull())
# Applying transformations to columns in the DataFrame
items=items.withColumnRenamed('desc','category').withColumnRenamed('man_country','manufacturer')

+--------+--------------------+------------------+----------+-------------+--------------------+----+
|item_key|           item_name|              desc|unit_price|  man_country|            supplier|unit|
+--------+--------------------+------------------+----------+-------------+--------------------+----+
|  I00001|A&W Root Beer - 1...|a. Beverage - Soda|      11.5|  Netherlands|     Bolsius Boxmeer|cans|
|  I00002|A&W Root Beer Die...|a. Beverage - Soda|      6.75|       poland|  CHROMADURLIN S.A.S|cans|
|  I00003|Barq's Root Beer ...|a. Beverage - Soda|      6.75|   Bangladesh|        DENIMACH LTD|cans|
|  I00004|    Cherry Coke 12oz|a. Beverage - Soda|      6.75|  Netherlands|     Bolsius Boxmeer|cans|
|  I00005|Cherry Coke Zero ...|a. Beverage - Soda|      6.75|      Finland|         HARDFORD AB|cans|
|  I00006|Coke Classic 12 o...|a. Beverage - Soda|     16.25|    Lithuania|            BIGSO AB|cans|
|  I00007|Coke Classic 12 o...|a. Beverage - Soda|      6.75|        India|Indo Co

In [0]:
# Writing the final DataFrame to the target storage or table
items.write.mode('overwrite').saveAsTable("items_tbl")

In [0]:
# Reading data from CSV files into a DataFrame
payments=spark.read.format('csv').option('inferSchema',True).option('header',True).load("dbfs:/FileStore/shared_uploads/akagidwani@gmail.com/unprocessed_data/tbl_dim_payments.csv")
payments.printSchema()
payments.count()
payments=payments.filter(col('bank_name')!='None')
payments.count()

+-----------+----------+--------------------+
|payment_key|trans_type|           bank_name|
+-----------+----------+--------------------+
|       P001|      cash|                None|
|       P002|      card|     AB Bank Limited|
|       P003|      card|Bangladesh Commer...|
|       P004|      card|   Bank Asia Limited|
|       P005|      card|   BRAC Bank Limited|
|       P006|      card|Citizens Bank Lim...|
|       P007|      card|   City Bank Limited|
|       P008|      card|Community Bank Ba...|
|       P009|      card|  Dhaka Bank Limited|
|       P010|      card|Dutch-Bangla Bank...|
|       P011|      card|Eastern Bank Limited|
|       P012|      card|   IFIC Bank Limited|
|       P013|      card| Jamuna Bank Limited|
|       P014|      card| Meghna Bank Limited|
|       P015|      card|Mercantile Bank L...|
|       P016|      card|Midland Bank Limited|
|       P017|      card|Modhumoti Bank Li...|
|       P018|      card|Mutual Trust Bank...|
|       P019|      card|National B

In [0]:
# Writing the final DataFrame to the target storage or table
payments.write.mode('overwrite').saveAsTable("payments_tbl")

In [0]:
# Reading data from CSV files into a DataFrame
time=spark.read.format('csv').option('inferSchema',True).option('header',True).load("dbfs:/FileStore/shared_uploads/akagidwani@gmail.com/unprocessed_data/tbl_dim_time.csv")
time.show()
time.printSchema()
# Applying transformations to columns in the DataFrame
time=time.withColumn('timestamp',to_timestamp(col('date'), 'dd-MM-yyyy HH:mm')).drop('date')
time.printSchema()


+--------+----------------+----+---+--------+-----+-------+----+
|time_key|            date|hour|day|    week|month|quarter|year|
+--------+----------------+----+---+--------+-----+-------+----+
|  T00001|20-05-2017 14:56|  14| 20|3rd Week|    5|     Q2|2017|
|  T00002|30-01-2015 22:14|  22| 30|4th Week|    1|     Q1|2015|
|  T00003|14-03-2020 02:34|   2| 14|2nd Week|    3|     Q1|2020|
|  T00004|27-04-2018 12:19|  12| 27|4th Week|    4|     Q2|2018|
|  T00005|14-04-2018 10:43|  10| 14|2nd Week|    4|     Q2|2018|
|  T00006|10-02-2017 06:56|   6| 10|2nd Week|    2|     Q1|2017|
|  T00007|06-05-2015 17:52|  17|  6|1st Week|    5|     Q2|2015|
|  T00008|23-09-2014 13:59|  13| 23|4th Week|    9|     Q3|2014|
|  T00009|16-04-2016 06:45|   6| 16|3rd Week|    4|     Q2|2016|
|  T00010|26-10-2015 22:00|  22| 26|4th Week|   10|     Q4|2015|
|  T00011|02-05-2018 16:40|  16|  2|1st Week|    5|     Q2|2018|
|  T00012|29-11-2017 18:55|  18| 29|4th Week|   11|     Q4|2017|
|  T00013|17-09-2014 22:4

In [0]:
# Writing the final DataFrame to the target storage or table
time.write.mode('overwrite').saveAsTable('time_tbl')

In [0]:
# Reading data from CSV files into a DataFrame
merchants=spark.read.format('csv').option('inferSchema',True).option('header',True).load("dbfs:/FileStore/shared_uploads/akagidwani@gmail.com/unprocessed_data/tbl_dim_merchants.csv")
merchants.show()
merchants.printSchema()
# Applying transformations to columns in the DataFrame
merchants=merchants.withColumnRenamed("store_key","merchant_key").withColumnRenamed("upazila","city")

+---------+--------+-----------+-----------------+
|store_key|division|   district|          upazila|
+---------+--------+-----------+-----------------+
|    S0001|  SYLHET|   HABIGANJ|       AJMIRIGANJ|
|    S0002|  SYLHET|   HABIGANJ|          BAHUBAL|
|    S0003|  SYLHET|   HABIGANJ|       BANIACHONG|
|    S0004|  SYLHET|   HABIGANJ|      CHUNARUGHAT|
|    S0005|  SYLHET|   HABIGANJ|   HABIGANJ SADAR|
|    S0006|  SYLHET|   HABIGANJ|           LAKHAI|
|    S0007|  SYLHET|   HABIGANJ|        MADHABPUR|
|    S0008|  SYLHET|   HABIGANJ|         NABIGANJ|
|    S0009|  SYLHET|MAULVIBAZAR|         BARLEKHA|
|    S0010|  SYLHET|MAULVIBAZAR|             JURI|
|    S0011|  SYLHET|MAULVIBAZAR|        KAMALGANJ|
|    S0012|  SYLHET|MAULVIBAZAR|          KULAURA|
|    S0013|  SYLHET|MAULVIBAZAR|MAULVIBAZAR SADAR|
|    S0014|  SYLHET|MAULVIBAZAR|         RAJNAGAR|
|    S0015|  SYLHET|MAULVIBAZAR|       SREEMANGAL|
|    S0016|  SYLHET|  SUNAMGANJ|    BISHWAMBARPUR|
|    S0017|  SYLHET|  SUNAMGANJ

In [0]:
# Writing the final DataFrame to the target storage or table
merchants.write.mode('overwrite').saveAsTable('merchants_tbl')

In [0]:
# Reading data from CSV files into a DataFrame
customers=spark.read.format('csv').option('inferSchema',True).option('header',True).load("dbfs:/FileStore/shared_uploads/akagidwani@gmail.com/unprocessed_data/tbl_dim_customers.csv")
customers.show()
customers.printSchema()
# Applying transformations to columns in the DataFrame
customers=customers.withColumnRenamed("coustomer_key","customer_key")

+-------------+----------------+-------------+-------------+
|coustomer_key|            name|   contact_no|          nid|
+-------------+----------------+-------------+-------------+
|      C000001|           sumit|8801920345851|7505075708899|
|      C000002|        tammanne|8801817069329|1977731324842|
|      C000003|   kailash kumar|8801663795774|3769494056318|
|      C000004| bhagwati prasad|8801533627961|9378834712725|
|      C000005|            ajay|8801943715786|3540815556323|
|      C000006|        silender|8801586293092|8516471122484|
|      C000007|          deepak|8801839144857|6155559095495|
|      C000008|        akhilesh|8801721432538|3287330990302|
|      C000009|  dipendra kumar|8801690939578|3411433613839|
|      C000010|           nitin|8801515224771|9450023534903|
|      C000011|doodhnath pandit|8801738809307|6062101033058|
|      C000012|     aslam allam|8801748537389|9707236385043|
|      C000013|           rahul|8801737700905|5790562512938|
|      C000014|  jitende

In [0]:
# Writing the final DataFrame to the target storage or table
customers.write.mode('overwrite').saveAsTable('customer_tbl')

In [0]:
# Reading data from CSV files into a DataFrame
orders=spark.read.format('csv').option('inferSchema',True).option('header',True).load("dbfs:/FileStore/shared_uploads/akagidwani@gmail.com/unprocessed_data/tbl_fact_orders.csv")
orders.show()
orders.printSchema()
# Applying transformations to columns in the DataFrame
orders=orders.withColumnRenamed("store_key","merchant_key").withColumnRenamed("coustomer_key","customer_key").drop('unit','unit_price','total_price')


+-----------+-------------+--------+--------+---------+--------+-------+----------+-----------+
|payment_key|coustomer_key|time_key|item_key|store_key|quantity|   unit|unit_price|total_price|
+-----------+-------------+--------+--------+---------+--------+-------+----------+-----------+
|       P026|      C004510| T049189|  I00177|   S00307|       1|     ct|      35.0|       35.0|
|       P022|      C008967| T041209|  I00248|   S00595|       1|  rolls|      26.0|       26.0|
|       P030|      C007261|  T03633|  I00195|   S00496|       8|     ct|      12.5|      100.0|
|       P032|      C007048| T084631|  I00131|    S0086|       8|     ct|      14.0|      112.0|
|       P014|      C006430| T071276|  I00050|   S00488|       8|   cans|       8.0|       64.0|
|       P006|      C007574| T026950|  I00058|   S00328|       5|bottles|      22.0|      110.0|
|       P023|      C005695| T096704|  I00075|   S00196|       2|   cans|      15.5|       31.0|
|       P027|      C008728| T010756|  I0

Orders RCA

In [0]:
# orders.show()
# Performing a join operation to combine DataFrames
orders=orders.join(items,items.item_key==orders.item_key,'left').where(col("unit").isNotNull()).select(orders["*"])
orders.count()
#Null count: 3723
#Not Null count: 996277

Out[133]: 996277

In [0]:
orders.select(col('item_key')).distinct().count()

Out[134]: 263

In [0]:
# Performing a join operation to combine DataFrames
orders=orders.join(time,time.time_key==orders.time_key,"left")\
    .select(orders["*"],time["timestamp"]).orderBy(time["timestamp"])#,orders["customer_key"]
orders.show()

+-----------+------------+--------+--------+------------+--------+-------------------+
|payment_key|customer_key|time_key|item_key|merchant_key|quantity|          timestamp|
+-----------+------------+--------+--------+------------+--------+-------------------+
|       P014|     C005534| T017051|  I00054|       S0064|       2|2014-01-20 14:06:00|
|       P024|     C007996| T017051|  I00204|      S00187|       6|2014-01-20 14:06:00|
|       P017|     C004986| T017051|  I00130|       S0075|       2|2014-01-20 14:06:00|
|       P030|     C005284| T017051|  I00234|      S00123|       4|2014-01-20 14:06:00|
|       P024|     C004079| T017051|  I00149|      S00463|       6|2014-01-20 14:06:00|
|       P024|     C002449| T017051|  I00109|      S00226|       6|2014-01-20 14:06:00|
|       P014|     C001335| T017051|  I00214|       S0038|      11|2014-01-20 14:06:00|
|       P023|     C005337| T017051|  I00057|      S00143|       1|2014-01-20 14:06:00|
|       P018|     C002181| T017051|  I00135

In [0]:
orders.printSchema()

root
 |-- payment_key: string (nullable = true)
 |-- customer_key: string (nullable = true)
 |-- time_key: string (nullable = true)
 |-- item_key: string (nullable = true)
 |-- merchant_key: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [0]:
orders.count()

Out[137]: 996277

In [0]:
windowSpec=Window.partitionBy().orderBy('timestamp')
# Applying transformations to columns in the DataFrame
orders=orders.withColumn("order_key",row_number().over(windowSpec)).select("*")

In [0]:
orders.printSchema()

root
 |-- payment_key: string (nullable = true)
 |-- customer_key: string (nullable = true)
 |-- time_key: string (nullable = true)
 |-- item_key: string (nullable = true)
 |-- merchant_key: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- order_key: integer (nullable = false)



In [0]:
orders.select('order_key').distinct().count()

Out[142]: 996277

In [0]:
# Writing the final DataFrame to the target storage or table
orders.write.mode('overwrite').saveAsTable('orders_tbl')

In [0]:
orders = (
    orders.filter(year(col("timestamp")) == 2020)
    .drop("timestamp")
# Applying transformations to columns in the DataFrame
    .withColumn("order_status", lit("Delivered"))
# Applying transformations to columns in the DataFrame
    .withColumn("modification_time", current_timestamp())
)
# Writing the final DataFrame to the target storage or table
orders.write.mode("overwrite").saveAsTable("orders_stg_tbl")

In [0]:
orders.count()

Out[149]: 142126

In [0]:
denorm_df = (
# Performing a join operation to combine DataFrames
    orders.join(items, 'item_key', "left")
# Performing a join operation to combine DataFrames
    .join(payments, 'payment_key', "left")
# Performing a join operation to combine DataFrames
    .join(customers, 'customer_key', "left")
# Performing a join operation to combine DataFrames
    .join(time, 'time_key', "left")
# Performing a join operation to combine DataFrames
    .join(merchants, 'merchant_key', "left")
)
denorm_df.show(truncate=False)

+------------+--------+------------+-----------+--------+--------+---------+------------+----------------------+---------------------------------------+---------------------------+----------+-------------+------------------------------+-------+----------+---------------------------------------+--------------+-------------+-------------+----+---+--------+-----+-------+----+-------------------+----------+------------+-----------------+
|merchant_key|time_key|customer_key|payment_key|item_key|quantity|order_key|order_status|modification_time     |item_name                              |category                   |unit_price|manufacturer |supplier                      |unit   |trans_type|bank_name                              |name          |contact_no   |nid          |hour|day|week    |month|quarter|year|timestamp          |division  |district    |city             |
+------------+--------+------------+-----------+--------+--------+---------+------------+----------------------+------------

In [0]:
denorm_df.printSchema()

root
 |-- merchant_key: string (nullable = true)
 |-- time_key: string (nullable = true)
 |-- customer_key: string (nullable = true)
 |-- payment_key: string (nullable = true)
 |-- item_key: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- order_key: integer (nullable = false)
 |-- order_status: string (nullable = false)
 |-- modification_time: timestamp (nullable = false)
 |-- item_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- supplier: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- trans_type: string (nullable = true)
 |-- bank_name: string (nullable = true)
 |-- name: string (nullable = true)
 |-- contact_no: long (nullable = true)
 |-- nid: long (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: string (nullable = true)
 |-- month: integer (nullable = true)
 |-- quarter: string

In [0]:
denorm_df=denorm_df.select('order_key','item_key','item_name','category','unit_price','quantity',(col('unit_price')*col('quantity')).alias('total_price'),'order_status','merchant_key','manufacturer','supplier','payment_key','trans_type','customer_key','name','contact_no','time_key','timestamp')
denorm_df.show()

+---------+--------+--------------------+--------------------+----------+--------+-----------+------------+------------+-------------+--------------------+-----------+----------+------------+--------------+-------------+--------+-------------------+
|order_key|item_key|           item_name|            category|unit_price|quantity|total_price|order_status|merchant_key| manufacturer|            supplier|payment_key|trans_type|customer_key|          name|   contact_no|time_key|          timestamp|
+---------+--------+--------------------+--------------------+----------+--------+-----------+------------+------------+-------------+--------------------+-----------+----------+------------+--------------+-------------+--------+-------------------+
|   845855|  I00229|Kellogg's Cereals...|       Food - Sweets|      13.0|       7|       91.0|   Delivered|      S00198|      Germany|  Friedola 1888 GmbH|       P012|      card|     C006730|       shabnur|8801896338860|  T00470|2020-01-01 01:35:00|


In [0]:
denorm_df.printSchema()

root
 |-- order_key: integer (nullable = false)
 |-- item_key: string (nullable = true)
 |-- item_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_price: double (nullable = true)
 |-- order_status: string (nullable = false)
 |-- merchant_key: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- supplier: string (nullable = true)
 |-- payment_key: string (nullable = true)
 |-- trans_type: string (nullable = true)
 |-- customer_key: string (nullable = true)
 |-- name: string (nullable = true)
 |-- contact_no: long (nullable = true)
 |-- time_key: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [0]:
denorm_df.count()

Out[160]: 142126

##SCD Type 2 Operations

In [0]:
%sql
drop schema if exists temp_db CASCADE;
create database temp_db;

In [0]:
%sql
create or replace table temp_db.items_table_stg as
select * from items_tbl;

num_affected_rows,num_inserted_rows


In [0]:
%sql
select * from temp_db.items_table_stg;           

item_key,item_name,category,unit_price,manufacturer,supplier,unit
I00001,A&W Root Beer - 12 oz cans,a. Beverage - Soda,11.5,Netherlands,Bolsius Boxmeer,cans
I00002,A&W Root Beer Diet - 12 oz cans,a. Beverage - Soda,6.75,poland,CHROMADURLIN S.A.S,cans
I00003,Barq's Root Beer - 12 oz cans,a. Beverage - Soda,6.75,Bangladesh,DENIMACH LTD,cans
I00004,Cherry Coke 12oz,a. Beverage - Soda,6.75,Netherlands,Bolsius Boxmeer,cans
I00005,Cherry Coke Zero 12 pack,a. Beverage - Soda,6.75,Finland,HARDFORD AB,cans
I00006,Coke Classic 12 oz cans,a. Beverage - Soda,16.25,Lithuania,BIGSO AB,cans
I00007,Coke Classic 12 oz cans,a. Beverage - Soda,6.75,India,Indo Count Industries Ltd,cans
I00008,Coke Zero Sugar 12 oz cans,a. Beverage - Soda,16.25,India,Indo Count Industries Ltd,cans
I00009,Diet Coke - 12 oz cans,a. Beverage - Soda,16.25,Netherlands,Bolsius Boxmeer,cans
I00010,Diet Coke - 12 oz cans,a. Beverage - Soda,6.75,Lithuania,BIGSO AB,cans


In [0]:
%sql
describe table temp_db.items_table_stg  

col_name,data_type,comment
item_key,string,
item_name,string,
category,string,
unit_price,double,
manufacturer,string,
supplier,string,
unit,string,


In [0]:
%sql
-- delete from items_table_dim;
create or replace table items_table_dim(
  items_surrogate_key string,
  item_key string,
  item_name string,
  category string,
  unit_price double,
  manufacturer string,
  supplier string,
  unit string,
  start_date timestamp,
  end_date timestamp,
  is_valid string
)
using delta
location "/user/hive/warehouse/items_table/"

In [0]:
%sql
select * from items_table_dim order by item_key

items_surrogate_key,item_key,item_name,category,unit_price,manufacturer,supplier,unit,start_date,end_date,is_valid


In [0]:
%sql
DESCRIBE EXTENDED items_table_dim;

col_name,data_type,comment
items_surrogate_key,string,
item_key,string,
item_name,string,
category,string,
unit_price,double,
manufacturer,string,
supplier,string,
unit,string,
start_date,timestamp,
end_date,timestamp,


In [0]:
# spark.sql("select * from temp_db.items_table_stg").show()
spark.sql("select count(*) from temp_db.items_table_stg").show()

+--------+
|count(1)|
+--------+
|     263|
+--------+



In [0]:
# spark.sql("select * from items_table_dim").show()
spark.sql("select count(*) from items_table_dim").show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



In [0]:
%sql
select * from temp_db.items_table_stg where item_key='I00001'

item_key,item_name,category,unit_price,manufacturer,supplier,unit
I00001,A&W Root Beer - 12 oz cans,a. Beverage - Soda,11.5,Netherlands,Bolsius Boxmeer,cans


In [0]:
%sql
-- Describe table
DESCRIBE EXTENDED temp_db.items_table_stg;

col_name,data_type,comment
item_key,string,
item_name,string,
category,string,
unit_price,double,
manufacturer,string,
supplier,string,
unit,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
spark.sql("select * from items_table_dim order by item_key").show(truncate=False)

+-------------------+--------+---------+--------+----------+------------+--------+----+----------+--------+--------+
|items_surrogate_key|item_key|item_name|category|unit_price|manufacturer|supplier|unit|start_date|end_date|is_valid|
+-------------------+--------+---------+--------+----------+------------+--------+----+----------+--------+--------+
+-------------------+--------+---------+--------+----------+------------+--------+----+----------+--------+--------+



In [0]:
spark.sql("select * from items_table_dim where item_key='I00001'").show(truncate=False)

+-------------------+--------+---------+--------+----------+------------+--------+----+----------+--------+--------+
|items_surrogate_key|item_key|item_name|category|unit_price|manufacturer|supplier|unit|start_date|end_date|is_valid|
+-------------------+--------+---------+--------+----------+------------+--------+----+----------+--------+--------+
+-------------------+--------+---------+--------+----------+------------+--------+----+----------+--------+--------+



In [0]:
%sql
select count(*) FROM temp_db.items_table_stg

count(1)
263


In [0]:
%sql
update temp_db.items_table_stg
set unit_price=69.69
where item_key='I00001'

num_affected_rows
1


In [0]:
%sql
MERGE INTO items_table_dim a
USING
(
    -- Block 1: Existing records (for updates or no change)
    SELECT 
        a.item_key AS mergeKey, 
        NULL AS items_surrogate_key, 
        a.item_key,
        a.item_name, 
        a.category, 
        a.unit_price, 
        a.manufacturer, 
        a.supplier, 
        a.unit, 
        NULL AS start_date, 
        NULL AS end_date, 
        NULL AS is_valid
    FROM temp_db.items_table_stg a

    UNION 

    -- Block 2: Records requiring updates only (existing records with a price difference)
    SELECT 
        NULL AS mergeKey, 
        NULL AS items_surrogate_key, 
        a.item_key,
        a.item_name, 
        a.category, 
        a.unit_price, 
        a.manufacturer, 
        a.supplier, 
        a.unit, 
        current_timestamp() AS start_date, 
        '9999-12-31 00:00:00.000' AS end_date, 
        'True' AS is_valid
    FROM temp_db.items_table_stg a
    JOIN items_table_dim b
    ON a.item_key = b.item_key 
    WHERE b.unit_price != a.unit_price AND b.is_valid = 'True'

    UNION ALL

    SELECT b.item_key AS mergeKey, b.*
    FROM items_table_dim b
    LEFT JOIN temp_db.items_table_stg a
    ON a.item_key = b.item_key
    WHERE a.item_key IS NULL AND b.end_date = '9999-12-31 00:00:00.000' AND b.is_valid = 'True'

) b
ON a.item_key = b.mergeKey

-- Update matching records with price change (set end_date and is_valid)
WHEN MATCHED AND b.unit_price != a.unit_price AND a.end_date = '9999-12-31 00:00:00.000' THEN
    UPDATE SET 
        a.end_date = current_timestamp() - INTERVAL 1 SECOND, 
        a.is_valid = 'False'
WHEN MATCHED AND b.is_valid = 'True' AND a.is_valid = 'True' THEN
    UPDATE SET a.is_valid = 'Deleted', a.end_date = current_timestamp() - INTERVAL 1 SECOND
-- Insert new records and updated versions of existing records
WHEN NOT MATCHED THEN
    INSERT (items_surrogate_key,
            item_key,
            item_name,
            category,
            unit_price,
            manufacturer,
            supplier,
            unit,
            start_date,
            end_date,
            is_valid)
    VALUES (concat(item_key, "-", unix_timestamp(current_timestamp())),
            item_key,
            item_name,
            category,
            unit_price,
            manufacturer,
            supplier,
            unit,
            current_timestamp(), 
            '9999-12-31 00:00:00.000',
            'True'
            )


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
2,1,0,1


In [0]:
%sql
select * from items_table_dim order by start_date desc, item_key;
-- select count(*) from items_table_dim;

items_surrogate_key,item_key,item_name,category,unit_price,manufacturer,supplier,unit,start_date,end_date,is_valid
I00001-1728757917,I00001,A&W Root Beer - 12 oz cans,a. Beverage - Soda,69.69,Netherlands,Bolsius Boxmeer,cans,2024-10-12T18:31:57.097+0000,9999-12-31T00:00:00.000+0000,True
I00001-1728757905,I00001,A&W Root Beer - 12 oz cans,a. Beverage - Soda,11.5,Netherlands,Bolsius Boxmeer,cans,2024-10-12T18:31:45.736+0000,2024-10-12T18:31:56.097+0000,False
I00002-1728757905,I00002,A&W Root Beer Diet - 12 oz cans,a. Beverage - Soda,6.75,poland,CHROMADURLIN S.A.S,cans,2024-10-12T18:31:45.736+0000,9999-12-31T00:00:00.000+0000,True
I00003-1728757905,I00003,Barq's Root Beer - 12 oz cans,a. Beverage - Soda,6.75,Bangladesh,DENIMACH LTD,cans,2024-10-12T18:31:45.736+0000,9999-12-31T00:00:00.000+0000,True
I00004-1728757905,I00004,Cherry Coke 12oz,a. Beverage - Soda,6.75,Netherlands,Bolsius Boxmeer,cans,2024-10-12T18:31:45.736+0000,9999-12-31T00:00:00.000+0000,True
I00005-1728757905,I00005,Cherry Coke Zero 12 pack,a. Beverage - Soda,6.75,Finland,HARDFORD AB,cans,2024-10-12T18:31:45.736+0000,9999-12-31T00:00:00.000+0000,True
I00006-1728757905,I00006,Coke Classic 12 oz cans,a. Beverage - Soda,16.25,Lithuania,BIGSO AB,cans,2024-10-12T18:31:45.736+0000,9999-12-31T00:00:00.000+0000,True
I00007-1728757905,I00007,Coke Classic 12 oz cans,a. Beverage - Soda,6.75,India,Indo Count Industries Ltd,cans,2024-10-12T18:31:45.736+0000,9999-12-31T00:00:00.000+0000,True
I00008-1728757905,I00008,Coke Zero Sugar 12 oz cans,a. Beverage - Soda,16.25,India,Indo Count Industries Ltd,cans,2024-10-12T18:31:45.736+0000,9999-12-31T00:00:00.000+0000,True
I00009-1728757905,I00009,Diet Coke - 12 oz cans,a. Beverage - Soda,16.25,Netherlands,Bolsius Boxmeer,cans,2024-10-12T18:31:45.736+0000,9999-12-31T00:00:00.000+0000,True


##Incremental Load

In [0]:
%sql
create or replace table orders_delta_load
as select payment_key, customer_key, time_key, item_key, merchant_key, quantity, order_key, 'In transit' order_status, current_timestamp() modification_time from orders_tbl
where date(timestamp) = '2021-01-01';

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- select count(*) from orders_stg_tbl
select count(*) from orders_delta_load

count(1)
385


In [0]:
denorm_df.printSchema()

root
 |-- order_key: integer (nullable = false)
 |-- item_key: string (nullable = true)
 |-- item_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_price: double (nullable = true)
 |-- order_status: string (nullable = false)
 |-- merchant_key: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- supplier: string (nullable = true)
 |-- payment_key: string (nullable = true)
 |-- trans_type: string (nullable = true)
 |-- customer_key: string (nullable = true)
 |-- name: string (nullable = true)
 |-- contact_no: long (nullable = true)
 |-- time_key: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [0]:

%sql
create
or replace table denorm_delta_load as
select
  o.order_key,
  i.item_key,
  i.item_name,
  i.category,
  i.unit_price,
  o.quantity,
  o.quantity * i.unit_price as total_price,
  o.order_status,
  m.merchant_key,
  i.manufacturer,
  i.supplier,
  p.payment_key,
  p.trans_type,
  c.customer_key,
  c.name,
  c.contact_no,
  t.`timestamp`
# Importing necessary libraries and modules
from
  orders_delta_load o
# Performing a join operation to combine DataFrames
  left join items_table_dim i on o.item_key = i.item_key
  and i.is_valid = true
# Performing a join operation to combine DataFrames
  left join payments_tbl p on o.payment_key = p.payment_key
# Performing a join operation to combine DataFrames
  left join customer_tbl c on o.customer_key = c.customer_key
# Performing a join operation to combine DataFrames
  left join time_tbl t on o.time_key = t.time_key
# Performing a join operation to combine DataFrames
  left join merchants_tbl m on o.merchant_key = m.merchant_key

num_affected_rows,num_inserted_rows


In [0]:
%sql
select count(*) from denorm_delta_load

count(1)
385


In [0]:
%sql
select * from orders_stg_tbl;

payment_key,customer_key,time_key,item_key,merchant_key,quantity,order_key,order_status,modification_time
P012,C006730,T00470,I00229,S00198,7,845855,Delivered,2024-10-12T18:29:52.191+0000
P018,C005524,T00470,I00032,S00262,9,845856,Delivered,2024-10-12T18:29:52.191+0000
P018,C004732,T00470,I00133,S00346,8,845857,Delivered,2024-10-12T18:29:52.191+0000
P017,C003729,T00470,I00020,S00102,2,845858,Delivered,2024-10-12T18:29:52.191+0000
P015,C004828,T00470,I00191,S00242,8,845859,Delivered,2024-10-12T18:29:52.191+0000
P003,C009091,T00470,I00055,S00163,9,845860,Delivered,2024-10-12T18:29:52.191+0000
P026,C006493,T00470,I00130,S00412,8,845861,Delivered,2024-10-12T18:29:52.191+0000
P007,C007536,T00470,I00246,S00141,2,845862,Delivered,2024-10-12T18:29:52.191+0000
P003,C000696,T00470,I00150,S00518,5,845863,Delivered,2024-10-12T18:29:52.191+0000
P020,C003651,T00470,I00001,S00411,11,845864,Delivered,2024-10-12T18:29:52.191+0000


In [0]:
%sql
select count(*) from orders_stg_tbl;


count(1)
142126


In [0]:
%sql
describe table orders_tbl;

col_name,data_type,comment
payment_key,string,
customer_key,string,
time_key,string,
item_key,string,
merchant_key,string,
quantity,int,
timestamp,timestamp,
order_key,int,


In [0]:
%sql
describe table orders_stg_tbl;

col_name,data_type,comment
payment_key,string,
customer_key,string,
time_key,string,
item_key,string,
merchant_key,string,
quantity,int,
order_key,int,
order_status,string,
modification_time,timestamp,


In [0]:
%sql
select count(*) from orders_stg_tbl;

count(1)
142126


In [0]:
%sql
create or replace table temp_db.orders_final_tbl
as select * from orders_stg_tbl limit 0;

-- select * from temp_db.orders_final_tbl;

num_affected_rows,num_inserted_rows


In [0]:
%sql
select count(*) from temp_db.orders_final_tbl;

count(1)
0


In [0]:
%sql
select count(*) from temp_db.orders_final_tbl;

count(1)
0


In [0]:
%sql
update orders_stg_tbl 
set order_status='Canceled', modification_time = current_timestamp()
where order_key=845855

num_affected_rows
1


In [0]:
%sql
Merge into temp_db.orders_final_tbl a
using (
  select * from orders_stg_tbl where modification_time > (select ifnull(max(modification_time),'1900-01-01T00:00:00') from temp_db.orders_final_tbl)
) b
on a.order_key=b.order_key
when matched then 
  update set a.order_status = b.order_status, a.modification_time = current_timestamp()
when not matched then
  INSERT (
    payment_key,
    customer_key,
    time_key,
    item_key,
    merchant_key,
    quantity,
    order_key,
    order_status,
    modification_time
) VALUES (
    b.payment_key,
    b.customer_key,
    b.time_key,
    b.item_key,
    b.merchant_key,
    b.quantity,
    b.order_key,
    b.order_status,
    current_timestamp()
)


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1,1,0,0


In [0]:
%sql
select * from temp_db.orders_final_tbl
order by order_key;

payment_key,customer_key,time_key,item_key,merchant_key,quantity,order_key,order_status,modification_time
P012,C006730,T00470,I00229,S00198,7,845855,Canceled,2024-10-12T18:32:57.105+0000
P018,C005524,T00470,I00032,S00262,9,845856,Delivered,2024-10-12T18:32:40.720+0000
P018,C004732,T00470,I00133,S00346,8,845857,Delivered,2024-10-12T18:32:40.720+0000
P017,C003729,T00470,I00020,S00102,2,845858,Delivered,2024-10-12T18:32:40.720+0000
P015,C004828,T00470,I00191,S00242,8,845859,Delivered,2024-10-12T18:32:40.720+0000
P003,C009091,T00470,I00055,S00163,9,845860,Delivered,2024-10-12T18:32:40.720+0000
P026,C006493,T00470,I00130,S00412,8,845861,Delivered,2024-10-12T18:32:40.720+0000
P007,C007536,T00470,I00246,S00141,2,845862,Delivered,2024-10-12T18:32:40.720+0000
P003,C000696,T00470,I00150,S00518,5,845863,Delivered,2024-10-12T18:32:40.720+0000
P020,C003651,T00470,I00001,S00411,11,845864,Delivered,2024-10-12T18:32:40.720+0000


In [0]:
denorm_df.show()

+---------+--------+--------------------+--------------------+----------+--------+-----------+------------+------------+-------------+--------------------+-----------+----------+------------+--------------+-------------+--------+-------------------+
|order_key|item_key|           item_name|            category|unit_price|quantity|total_price|order_status|merchant_key| manufacturer|            supplier|payment_key|trans_type|customer_key|          name|   contact_no|time_key|          timestamp|
+---------+--------+--------------------+--------------------+----------+--------+-----------+------------+------------+-------------+--------------------+-----------+----------+------------+--------------+-------------+--------+-------------------+
|   845855|  I00229|Kellogg's Cereals...|       Food - Sweets|      13.0|       7|       91.0|   Delivered|      S00198|      Germany|  Friedola 1888 GmbH|       P012|      card|     C006730|       shabnur|8801896338860|  T00470|2020-01-01 01:35:00|


In [0]:
# # %rm -r "/dbfs:/user/hive/warehouse/denorm_table"
# dbutils.fs.rm('/dbfs:/user/hive/warehouse/denorm_table1')

Out[231]: False

In [0]:
# Writing the final DataFrame to the target storage or table
denorm_df.write.mode('overwrite').saveAsTable('denorm_table2')

In [0]:
%sql
create or replace table denorm_table as
select 
  o.order_key,  
  d.item_key,
  d.item_name,
  d.category,
  d.unit_price,
  d.quantity,
  d.quantity * d.unit_price as total_price,
  o.order_status,
  d.merchant_key,
  d.manufacturer,
  d.supplier,
  d.payment_key,
  d.trans_type,
  d.customer_key,
  d.name,
  d.contact_no,
# Performing a join operation to combine DataFrames
  d.`timestamp` from denorm_table2 d left join temp_db.orders_final_tbl o on d.order_key=o.order_key
-- where d.order_key=845855

num_affected_rows,num_inserted_rows


In [0]:
%sql
select * from denorm_table where order_key=845855

order_key,item_key,item_name,category,unit_price,quantity,total_price,order_status,merchant_key,manufacturer,supplier,payment_key,trans_type,customer_key,name,contact_no,timestamp
845855,I00229,Kellogg's Cereals Assortment Pack,Food - Sweets,13.0,7,91.0,Canceled,S00198,Germany,Friedola 1888 GmbH,P012,card,C006730,shabnur,8801896338860,2020-01-01T01:35:00.000+0000


In [0]:
%sql
insert into denorm_table 
select * from denorm_delta_load

num_affected_rows,num_inserted_rows
385,385


In [0]:
%sql
select count(*) from denorm_table

count(1)
142511


In [0]:
%sql
select * from denorm_table where unit_price=69.69

order_key,item_key,item_name,category,unit_price,quantity,total_price,order_status,merchant_key,manufacturer,supplier,payment_key,trans_type,customer_key,name,contact_no,timestamp
988178,I00001,A&W Root Beer - 12 oz cans,a. Beverage - Soda,69.69,11,766.5899999999999,In transit,S00222,Netherlands,Bolsius Boxmeer,P030,card,C005922,madhuri,8801796692296,2021-01-01T10:27:00.000+0000
988199,I00001,A&W Root Beer - 12 oz cans,a. Beverage - Soda,69.69,3,209.07,In transit,S00427,Netherlands,Bolsius Boxmeer,P023,card,C006115,komal,8801885896877,2021-01-01T10:59:00.000+0000
988234,I00001,A&W Root Beer - 12 oz cans,a. Beverage - Soda,69.69,10,696.9,In transit,S00116,Netherlands,Bolsius Boxmeer,P019,card,C006625,neha,8801968715473,2021-01-01T11:56:00.000+0000


#E-commerce data analysis

In [0]:
denorm_df=spark.read.table("denorm_table")
denorm_df.printSchema()
denorm_df.show()

root
 |-- order_key: integer (nullable = true)
 |-- item_key: string (nullable = true)
 |-- item_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_price: double (nullable = true)
 |-- order_status: string (nullable = true)
 |-- merchant_key: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- supplier: string (nullable = true)
 |-- payment_key: string (nullable = true)
 |-- trans_type: string (nullable = true)
 |-- customer_key: string (nullable = true)
 |-- name: string (nullable = true)
 |-- contact_no: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+---------+--------+--------------------+--------------------+----------+--------+-----------+------------+------------+-------------+--------------------+-----------+----------+------------+--------------+-------------+-------------------+
|order_key|item_key|           item_name|  

####Daily revenue

In [0]:
denorm_df.filter(to_date('timestamp')=='2020-12-31').select(sum(col("total_price"))).show()

+----------------+
|sum(total_price)|
+----------------+
|         45867.5|
+----------------+



####Seller with most sale

In [0]:
merchants.show()

+------------+--------+-----------+-----------------+
|merchant_key|division|   district|             city|
+------------+--------+-----------+-----------------+
|       S0001|  SYLHET|   HABIGANJ|       AJMIRIGANJ|
|       S0002|  SYLHET|   HABIGANJ|          BAHUBAL|
|       S0003|  SYLHET|   HABIGANJ|       BANIACHONG|
|       S0004|  SYLHET|   HABIGANJ|      CHUNARUGHAT|
|       S0005|  SYLHET|   HABIGANJ|   HABIGANJ SADAR|
|       S0006|  SYLHET|   HABIGANJ|           LAKHAI|
|       S0007|  SYLHET|   HABIGANJ|        MADHABPUR|
|       S0008|  SYLHET|   HABIGANJ|         NABIGANJ|
|       S0009|  SYLHET|MAULVIBAZAR|         BARLEKHA|
|       S0010|  SYLHET|MAULVIBAZAR|             JURI|
|       S0011|  SYLHET|MAULVIBAZAR|        KAMALGANJ|
|       S0012|  SYLHET|MAULVIBAZAR|          KULAURA|
|       S0013|  SYLHET|MAULVIBAZAR|MAULVIBAZAR SADAR|
|       S0014|  SYLHET|MAULVIBAZAR|         RAJNAGAR|
|       S0015|  SYLHET|MAULVIBAZAR|       SREEMANGAL|
|       S0016|  SYLHET|  SUN

In [0]:
denorm_df\
    .filter(to_date(col("timestamp"))=='2020-12-31')\
    .groupBy(denorm_df.merchant_key)\
    .agg(sum('total_price').alias('sale_by_merchant'))\
    .orderBy(col('sale_by_merchant').desc()).show(5)

+------------+----------------+
|merchant_key|sale_by_merchant|
+------------+----------------+
|      S00102|          938.25|
|       S0085|           792.0|
|      S00392|           637.0|
|      S00686|           605.0|
|      S00351|           567.0|
+------------+----------------+
only showing top 5 rows



####Most profitable category

In [0]:
items.groupBy('category').agg(count('category').alias('count')).orderBy(col('count').desc()).show()

+--------------------+-----+
|            category|count|
+--------------------+-----+
|  a. Beverage - Soda|   29|
|      Food - Healthy|   29|
|a. Beverage Spark...|   21|
|        Food - Chips|   20|
|    Kitchen Supplies|   19|
|    Food - Chocolate|   14|
|Beverage - Energy...|   13|
|       Food - Sweets|   11|
| Beverage - Gatorade|   10|
|      Beverage Water|    9|
|         Gum - Mints|    8|
|    Coffee Sweetener|    8|
|       Coffee K-Cups|    7|
|    Beverage - Juice|    7|
| Beverage - Iced Tea|    7|
| Dishware - Cups Hot|    6|
|         Food - Nuts|    6|
|   Dishware - Plates|    6|
|        Coffee Cream|    6|
|       Coffee Ground|    5|
+--------------------+-----+
only showing top 20 rows



In [0]:
denorm_df.groupBy('category').agg(sum('total_price').alias('category_wise_sale')).orderBy(col('category_wise_sale').desc()).show()

+--------------------+------------------+
|            category|category_wise_sale|
+--------------------+------------------+
|      Food - Healthy|         1482946.0|
|Beverage - Energy...|         1446523.0|
|    Kitchen Supplies|         1186897.5|
|       Coffee K-Cups|         1091843.0|
|        Food - Chips|         1068260.0|
|a. Beverage Spark...|         1066799.0|
|    Food - Chocolate|          967891.0|
|  a. Beverage - Soda|         931377.56|
| Beverage - Gatorade|          585599.0|
|    Coffee Sweetener|          519102.0|
|       Food - Sweets|          510732.0|
| Dishware - Cups Hot|          461189.5|
|    Beverage - Juice|          381408.0|
|      Beverage Water|          373376.0|
| Beverage - Iced Tea|          367639.5|
|         Gum - Mints|          353564.0|
|   Dishware - Plates|          342022.0|
|         Food - Nuts|          331205.5|
|       Coffee Ground|          288875.0|
|        Coffee Cream|          236812.0|
+--------------------+------------

####Payment method with most transactions

In [0]:
# Performing a join operation to combine DataFrames
denorm_df.join(payments,payments.payment_key==denorm_df.payment_key,'left').groupBy('bank_name').agg(sum('total_price').alias('transaction_sum'),count('bank_name').alias('transaction_count')).orderBy(col('transaction_sum').desc()).show(1000)                

+--------------------+---------------+-----------------+
|           bank_name|transaction_sum|transaction_count|
+--------------------+---------------+-----------------+
|Bangladesh Commer...|       405426.5|             3771|
|United Commercial...|       395945.0|             3702|
|               Nagad|       395422.0|             3745|
|              Rocket|       395220.5|             3729|
|Modhumoti Bank Li...|      394635.75|             3697|
|Standard Bank Lim...|      394476.25|             3726|
|National Bank Lim...|      393548.15|             3668|
|     AB Bank Limited|       392876.0|             3725|
|   City Bank Limited|       392390.5|             3691|
| Meghna Bank Limited|       391211.5|             3649|
|  Trust Bank Limited|      390671.25|             3679|
|   IFIC Bank Limited|      389933.25|             3675|
|Citizens Bank Lim...|      389320.75|             3630|
|    NRB Bank Limited|      388435.75|             3659|
|Dutch-Bangla Bank...|      386

####Hourly sales analysis

In [0]:
current_date=lit('2020-12-31')
denorm_df.\
    filter(to_date(col("timestamp")).between(date_sub(current_date,7),current_date))\
    .select(denorm_df["*"],to_date(denorm_df["timestamp"]).alias("date"),hour(denorm_df["timestamp"]).alias("hour"))\
    .groupBy("hour").count()\
    .orderBy(col("count").desc())\
    .show()

+----+-----+
|hour|count|
+----+-----+
|   5|  215|
|   8|  197|
|  18|  195|
|  11|  178|
|  13|  175|
|  12|  173|
|  16|  170|
|  10|  170|
|   2|  169|
|  23|  163|
|   1|  156|
|   6|  154|
|   0|  154|
|   3|  153|
|  22|  146|
|  19|  146|
|   7|  135|
|   9|  124|
|  14|  114|
|  15|  111|
+----+-----+
only showing top 20 rows



####Total orders per day/week/month

In [0]:
current_date=lit('2020-12-31')
denorm_df\
    .filter(to_date(col("timestamp")).between(date_sub(current_date,7),current_date))\
    .groupBy(to_date(col("timestamp"))).count()\
    .show()

+------------------+-----+
|to_date(timestamp)|count|
+------------------+-----+
|        2020-12-25|  486|
|        2020-12-30|  515|
|        2020-12-24|  395|
|        2020-12-31|  423|
|        2020-12-28|  425|
|        2020-12-26|  474|
|        2020-12-27|  371|
|        2020-12-29|  480|
+------------------+-----+



####Most selling product

In [0]:
denorm_df.filter(to_date(col("timestamp"))=='2020-12-31')\
    .select("item_key","item_name","total_price")\
    .groupBy(['item_key','item_name']).agg(count('item_key').alias('count'),sum('total_price').alias("total_sum"))\
    .orderBy(col('count').desc(),col('total_sum').desc())\
    .show(truncate=False)

+--------+-------------------------------------------+-----+---------+
|item_key|item_name                                  |count|total_sum|
+--------+-------------------------------------------+-----+---------+
|I00042  |Topo Chico 12 oz glass                     |6    |980.0    |
|I00074  |Gatorade Zero Variety 20 oz                |6    |620.0    |
|I00069  |Gatorade Frost Variety 12 oz               |5    |630.0    |
|I00207  |Nature Valley Sweet/Salty Almond Bars      |5    |574.0    |
|I00182  |Reese's Pieces Peanutbutter Candy          |5    |450.0    |
|I00139  |Foam Coffee Cups - 16 ounce                |4    |912.0    |
|I00217  |Unsalted Premium Blend Nuts                |4    |574.0    |
|I00071  |Gatorade Original Variety 12 oz            |4    |560.0    |
|I00166  |Pop Chips Variety bags                     |4    |476.0    |
|I00209  |Quaker Instant Oatmeal Maple Brn Sugar     |4    |416.0    |
|I00230  |Nabisco Classic Mix ccooki, cracker        |4    |390.0    |
|I0010

####Sales per customer

In [0]:
denorm_df\
    .filter(to_date(col("timestamp")).between(date_sub(current_date,30),current_date))\
    .groupBy(denorm_df.customer_key).agg(count(denorm_df.customer_key).alias('order_count'),sum('total_price').alias('total_sum'))\
    .orderBy(col('order_count').desc())\
    .show()


+------------+-----------+---------+
|customer_key|order_count|total_sum|
+------------+-----------+---------+
|     C002537|          8|    986.0|
|     C001350|          7|   458.25|
|     C002049|          7|    528.0|
|     C000418|          6|    385.0|
|     C004681|          6|    852.0|
|     C005130|          6|   859.25|
|     C004731|          6|    978.0|
|     C000979|          6|   1173.5|
|     C002784|          6|   500.25|
|     C006094|          6|    602.5|
|     C000500|          6|    541.0|
|     C002956|          6|    561.0|
|     C000898|          6|    280.5|
|     C004958|          6|    633.0|
|     C003986|          6|    713.0|
|     C002067|          6|    741.0|
|     C008138|          6|    505.0|
|     C007815|          6|    758.0|
|     C006591|          6|    491.5|
|     C002117|          6|    659.0|
+------------+-----------+---------+
only showing top 20 rows



####Average order value daily/weekly/monthly

In [0]:
current_date=lit('2020-04-30')
denorm_df\
    .filter(to_date(col("timestamp")).between(date_sub(current_date,7),current_date))\
    .groupBy(to_date(col("timestamp")).alias("date"))\
    .agg((sum("total_price")/count("*")).alias('average order price'))\
    .orderBy(col('date'))\
    .show()

+----------+-------------------+
|      date|average order price|
+----------+-------------------+
|2020-04-23|  109.8491124260355|
|2020-04-24|  107.1986111111111|
|2020-04-25|  104.1934250764526|
|2020-04-26| 101.67191601049869|
|2020-04-27| 106.81426886792453|
|2020-04-28| 107.36075949367088|
|2020-04-29| 102.82530120481928|
|2020-04-30| 107.11699507389163|
+----------+-------------------+



####Average order value per seller

In [0]:
current_date=lit('2020-04-30')
denorm_df\
    .filter(to_date(col("timestamp")).between(date_sub(current_date,7),current_date))\
    .groupBy(denorm_df.merchant_key).agg(count('*').alias('count'),sum('total_price').alias('total_price'),avg('total_price'))\
    .orderBy(col('count').desc(),col('total_price').desc())\
    .show()

+------------+-----+-----------+------------------+
|merchant_key|count|total_price|  avg(total_price)|
+------------+-----+-----------+------------------+
|      S00201|   11|    1364.25|124.02272727272727|
|      S00648|   11|    1103.75| 100.3409090909091|
|      S00325|   10|     1334.5|            133.45|
|      S00462|   10|    1230.75|           123.075|
|      S00430|   10|     930.75|            93.075|
|      S00403|   10|      863.0|              86.3|
|      S00280|   10|      859.5|             85.95|
|      S00505|    9|    1200.25|133.36111111111111|
|      S00319|    9|     1121.0|124.55555555555556|
|      S00532|    9|      884.5| 98.27777777777777|
|      S00194|    9|      865.5| 96.16666666666667|
|      S00707|    9|     794.75| 88.30555555555556|
|      S00673|    9|      748.5| 83.16666666666667|
|      S00315|    9|      634.5|              70.5|
|      S00665|    8|    1195.25|         149.40625|
|      S00661|    8|     1039.0|           129.875|
|       S004

####RPR Repeat Purchase Rate

In [0]:
current_date=lit('2021-01-01')
denorm_df\
    .filter(to_date(col("timestamp")).between(date_sub(current_date,364),current_date))\
    .groupBy(denorm_df.customer_key).agg(count('*').alias('count'))\
    .filter(col('count')>10)\
    .select(((count("*")/customers.count())*100).alias("RPR"))\
    .show()

+-----------------+
|              RPR|
+-----------------+
|89.78611959842864|
+-----------------+



####Customer Lifetime Value(CLV)

In [0]:
current_date=lit('2020-12-31')
denorm_df\
    .filter(to_date(col("timestamp")).between(date_sub(current_date,365),current_date))\
    .groupBy(to_date(col("timestamp"))).agg(count('*').alias('count'))\
    .orderBy(to_date(col("timestamp")))\
    .show(10000)

+------------------+-----+
|to_date(timestamp)|count|
+------------------+-----+
|        2020-01-01|  431|
|        2020-01-02|  378|
|        2020-01-03|  411|
|        2020-01-04|  350|
|        2020-01-05|  407|
|        2020-01-06|  343|
|        2020-01-07|  471|
|        2020-01-08|  348|
|        2020-01-09|  355|
|        2020-01-10|  439|
|        2020-01-11|  334|
|        2020-01-12|  273|
|        2020-01-13|  408|
|        2020-01-14|  464|
|        2020-01-15|  372|
|        2020-01-16|  448|
|        2020-01-17|  398|
|        2020-01-18|  426|
|        2020-01-19|  414|
|        2020-01-20|  363|
|        2020-01-21|  312|
|        2020-01-22|  440|
|        2020-01-23|  393|
|        2020-01-24|  406|
|        2020-01-25|  382|
|        2020-01-26|  413|
|        2020-01-27|  406|
|        2020-01-28|  415|
|        2020-01-29|  483|
|        2020-01-30|  396|
|        2020-01-31|  451|
|        2020-02-01|  297|
|        2020-02-02|  311|
|        2020-02-03|  486|
|

In [0]:
current_date=lit('2020-12-31')
# Average order value=total_sale/number of orders
aov=denorm_df\
    .filter(to_date(col("timestamp")).between(date_sub(current_date,365),current_date))\
    .select(sum('total_price')/count("*"))
# Average purchase frequency=number of orders/total number of customers
apf=denorm_df\
    .filter(to_date(col("timestamp")).between(date_sub(current_date,365),current_date))\
    .select(count("*")/customers.count())

print(type(aov))
aov_=aov.head()[0]
print(aov_)
print(type(aov_))

print(type(apf))
apf_=apf.head()[0]
print(apf_)
print(type(apf_))

# Customer Value=Average order value * Average purchase frequency
customer_value=aov_*apf_
print(customer_value)
print(type(customer_value))

# Performing a join operation to combine DataFrames
acl=4 # Average customer lifespan in years (here assumed as 4 as we don't have data for customer join date)

CLV=customer_value*acl
print(CLV)
print(type(CLV))

<class 'pyspark.sql.dataframe.DataFrame'>
105.42660209954548
<class 'float'>
<class 'pyspark.sql.dataframe.DataFrame'>
15.509166302924488
<class 'float'>
1635.0787047140986
<class 'float'>
6540.314818856395
<class 'float'>


####Purchase frequency aka Avg orders per customer

In [0]:
current_date=lit('2021-01-01')
denorm_df\
    .filter(to_date(col("timestamp")).between(date_sub(current_date,364),current_date))\
    .select(((count("*")/customers.count())).alias("Purchase Frequency"))\
    .show()

+------------------+
|Purchase Frequency|
+------------------+
|  15.4628982976866|
+------------------+

