-
Notifications
You must be signed in to change notification settings - Fork 232
/
Retail Sales.sql
59 lines (51 loc) · 2.66 KB
/
Retail Sales.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
-- Databricks notebook source
CREATE STREAMING LIVE TABLE customers
COMMENT "The customers buying finished products, ingested from /databricks-datasets."
TBLPROPERTIES ("myCompanyPipeline.quality" = "mapping")
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv");
-- COMMAND ----------
CREATE STREAMING LIVE TABLE sales_orders_raw
COMMENT "The raw sales orders, ingested from /databricks-datasets."
TBLPROPERTIES ("myCompanyPipeline.quality" = "bronze")
AS
SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json", map("cloudFiles.inferColumnTypes", "true"))
-- COMMAND ----------
CREATE STREAMING LIVE TABLE sales_orders_cleaned(
CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW
)
PARTITIONED BY (order_date)
COMMENT "The cleaned sales orders with valid order_number(s) and partitioned by order_datetime."
TBLPROPERTIES ("myCompanyPipeline.quality" = "silver")
AS
SELECT f.customer_id, f.customer_name, f.number_of_line_items,
TIMESTAMP(from_unixtime((cast(f.order_datetime as long)))) as order_datetime,
DATE(from_unixtime((cast(f.order_datetime as long)))) as order_date,
f.order_number, f.ordered_products, c.state, c.city, c.lon, c.lat, c.units_purchased, c.loyalty_segment
FROM STREAM(LIVE.sales_orders_raw) f
LEFT JOIN LIVE.customers c
ON c.customer_id = f.customer_id
AND c.customer_name = f.customer_name
-- COMMAND ----------
CREATE LIVE TABLE sales_order_in_la
COMMENT "Sales orders in LA."
TBLPROPERTIES ("myCompanyPipeline.quality" = "gold")
AS
SELECT city, order_date, customer_id, customer_name, ordered_products_explode.curr, SUM(ordered_products_explode.price) as sales, SUM(ordered_products_explode.qty) as quantity, COUNT(ordered_products_explode.id) as product_count
FROM (
SELECT city, order_date, customer_id, customer_name, EXPLODE(ordered_products) as ordered_products_explode
FROM LIVE.sales_orders_cleaned
WHERE city = 'Los Angeles'
)
GROUP BY order_date, city, customer_id, customer_name, ordered_products_explode.curr
-- COMMAND ----------
CREATE LIVE TABLE sales_order_in_chicago
COMMENT "Sales orders in Chicago."
TBLPROPERTIES ("myCompanyPipeline.quality" = "gold")
AS
SELECT city, order_date, customer_id, customer_name, ordered_products_explode.curr, SUM(ordered_products_explode.price) as sales, SUM(ordered_products_explode.qty) as quantity, COUNT(ordered_products_explode.id) as product_count
FROM (
SELECT city, order_date, customer_id, customer_name, EXPLODE(ordered_products) as ordered_products_explode
FROM LIVE.sales_orders_cleaned
WHERE city = 'Chicago'
)
GROUP BY order_date, city, customer_id, customer_name, ordered_products_explode.curr