In [None]:
-- Customer report datamart.
-- Rebuilt from scratch: any existing table is dropped before it is created.
-- The load query in this file fills it with order aggregates grouped by
-- customer and report_period.
DROP TABLE IF EXISTS dwh.customer_report_datamart;
CREATE TABLE IF NOT EXISTS dwh.customer_report_datamart (
    id                          bigint GENERATED ALWAYS AS IDENTITY NOT NULL, -- surrogate key
    -- customer attributes copied from dwh.d_customer
    customer_id                 bigint         NOT NULL,
    customer_name               varchar        NOT NULL,
    customer_address            varchar        NOT NULL,
    customer_birthday           date           NOT NULL,
    customer_email              varchar        NOT NULL,
    -- money metrics
    customer_money              numeric(15, 2) NOT NULL, -- sum of product prices of the customer's orders
    platform_money              numeric(15, 2) NOT NULL, -- 10% of customer_money
    count_order                 bigint         NOT NULL, -- number of orders
    avg_price_order             numeric(10, 2) NOT NULL, -- average product price per order
    -- nullable: median of (order_completion_date - order_created_date)
    median_time_order_completed numeric(10, 1) NULL,
    top_product_type            varchar        NOT NULL, -- product type the customer ordered most often
    top_craftsman_id            bigint         NOT NULL, -- craftsman the customer ordered from most often
    -- order counts by order_status
    count_order_created         bigint         NOT NULL,
    count_order_in_progress     bigint         NOT NULL,
    count_order_delivery        bigint         NOT NULL,
    count_order_done            bigint         NOT NULL,
    count_order_not_done        bigint         NOT NULL, -- every status other than 'done'
    report_period               varchar        NOT NULL, -- order creation month formatted as 'yyyy-mm'
    CONSTRAINT customer_report_datamart_pk PRIMARY KEY (id)
);


In [None]:
-- Load log of the customer report datamart.
-- The datamart load query appends one row per run and reads MAX(load_dttm)
-- from here as the watermark for the next run.
DROP TABLE IF EXISTS dwh.load_dates_customer_report_datamart;

CREATE TABLE IF NOT EXISTS dwh.load_dates_customer_report_datamart (
    id        BIGINT GENERATED ALWAYS AS IDENTITY NOT NULL,
    load_dttm DATE   NOT NULL, -- stored with day precision (column type is date)
    CONSTRAINT load_dates_customer_report_datamart_pk PRIMARY KEY (id)
);

/*
 * Incremental load of dwh.customer_report_datamart.
 *
 * The datamart holds one row per (customer_id, report_period). Rows of
 * dwh.f_order (or of their craftsman / customer / product dimensions) loaded
 * after the last recorded load date form the delta. Delta rows whose
 * (customer, period) is not in the datamart yet are aggregated and inserted;
 * for (customer, period) pairs that already exist the metrics are recomputed
 * from the full order history of that pair and the existing row is updated.
 * Finally the newest load timestamp seen in the delta is appended to
 * dwh.load_dates_customer_report_datamart.
 *
 * All data-modifying CTEs run against the same snapshot, so the INSERT and the
 * UPDATE do not see each other's changes.
 */
WITH
-- Watermark: the latest recorded load date, or 1900-01-01 before the first load.
load_date AS (
    SELECT COALESCE(MAX(load_dttm), '1900-01-01') AS last_date
    FROM dwh.load_dates_customer_report_datamart
),

-- Craftsman each customer ordered from most often (over the whole order history).
-- craftsman_id is a tie-breaker so that equal counts resolve deterministically.
top_craftsman AS (
    SELECT
        ranked.customer_id AS customer_id,
        ranked.craftsman_id AS craftsman_id
    FROM (
        SELECT
            fo.customer_id,
            fo.craftsman_id,
            ROW_NUMBER() OVER (
                PARTITION BY fo.customer_id
                ORDER BY COUNT(*) DESC, fo.craftsman_id
            ) AS row_rank
        FROM dwh.f_order fo
        GROUP BY fo.customer_id, fo.craftsman_id
    ) ranked
    WHERE ranked.row_rank = 1
),

-- Product type each customer ordered most often (over the whole order history).
-- product_type is a tie-breaker so that equal counts resolve deterministically.
top_product_type AS (
    SELECT
        ranked.customer_id AS customer_id,
        ranked.product_type AS product_type
    FROM (
        SELECT
            fo.customer_id AS customer_id,
            prod.product_type AS product_type,
            ROW_NUMBER() OVER (
                PARTITION BY fo.customer_id
                ORDER BY COUNT(*) DESC, prod.product_type
            ) AS row_rank
        FROM dwh.f_order fo
        INNER JOIN dwh.d_product prod ON fo.product_id = prod.product_id
        GROUP BY fo.customer_id, prod.product_type
    ) ranked
    WHERE ranked.row_rank = 1
),

-- Orders touched since the watermark, one row per order.
-- exist_customer_id is non-NULL only when the datamart already has a row for
-- this customer AND this report period; matching on the period as well keeps
-- the join from multiplying rows and routes new months of a known customer to
-- the insert path instead of the update path.
dwh_delta AS (
    SELECT
        cu.customer_id AS customer_id,
        cu.customer_name AS customer_name,
        cu.customer_address AS customer_address,
        cu.customer_birthday AS customer_birthday,
        cu.customer_email AS customer_email,
        crd.customer_id AS exist_customer_id,
        fo.order_id AS order_id,
        prod.product_id AS product_id,
        prod.product_price AS product_price,
        prod.product_type AS product_type,
        fo.order_completion_date - fo.order_created_date AS diff_order_date,
        fo.order_status AS order_status,
        TO_CHAR(fo.order_created_date, 'yyyy-mm') AS report_period,
        fo.load_dttm AS order_load_dttm,
        cr.load_dttm AS craftsman_load_dttm,
        cu.load_dttm AS customer_load_dttm,
        prod.load_dttm AS product_load_dttm
    FROM dwh.f_order fo
    INNER JOIN dwh.d_craftsman cr ON fo.craftsman_id = cr.craftsman_id
    INNER JOIN dwh.d_customer cu ON fo.customer_id = cu.customer_id
    INNER JOIN dwh.d_product prod ON fo.product_id = prod.product_id
    LEFT JOIN dwh.customer_report_datamart crd
        ON fo.customer_id = crd.customer_id
        AND crd.report_period = TO_CHAR(fo.order_created_date, 'yyyy-mm')
    CROSS JOIN load_date ld -- always exactly one row
    WHERE fo.load_dttm > ld.last_date
       OR cr.load_dttm > ld.last_date
       OR cu.load_dttm > ld.last_date
       OR prod.load_dttm > ld.last_date
),

-- (customer, period) pairs that already exist in the datamart and must be recomputed.
dwh_update_delta AS (
    SELECT DISTINCT
        exist_customer_id AS customer_id,
        report_period AS report_period
    FROM dwh_delta
    WHERE exist_customer_id IS NOT NULL
),

-- Metrics for (customer, period) pairs that are new to the datamart,
-- aggregated from the delta rows.
dwh_delta_insert_result AS (
    SELECT
        del.customer_id AS customer_id,
        del.customer_name AS customer_name,
        del.customer_address AS customer_address,
        del.customer_birthday AS customer_birthday,
        del.customer_email AS customer_email,
        SUM(del.product_price) AS customer_money,
        SUM(del.product_price) * 0.1 AS platform_money, -- platform share is 10%
        COUNT(del.order_id) AS count_order,
        AVG(del.product_price) AS avg_price_order,
        top_product.product_type AS top_product_type,
        top_craft.craftsman_id AS top_craftsman_id,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY del.diff_order_date) AS median_time_order_completed,
        SUM(CASE WHEN del.order_status = 'created' THEN 1 ELSE 0 END) AS count_order_created,
        SUM(CASE WHEN del.order_status = 'in progress' THEN 1 ELSE 0 END) AS count_order_in_progress,
        SUM(CASE WHEN del.order_status = 'delivery' THEN 1 ELSE 0 END) AS count_order_delivery,
        SUM(CASE WHEN del.order_status = 'done' THEN 1 ELSE 0 END) AS count_order_done,
        SUM(CASE WHEN del.order_status != 'done' THEN 1 ELSE 0 END) AS count_order_not_done,
        del.report_period AS report_period
    FROM dwh_delta del
    INNER JOIN top_craftsman top_craft ON del.customer_id = top_craft.customer_id
    INNER JOIN top_product_type top_product ON del.customer_id = top_product.customer_id
    WHERE del.exist_customer_id IS NULL
    GROUP BY
        del.customer_id,
        del.customer_name,
        del.customer_address,
        del.customer_birthday,
        del.customer_email,
        top_product.product_type,
        top_craft.craftsman_id,
        del.report_period
),

-- Metrics for (customer, period) pairs that already exist, recomputed from all
-- orders of that pair in dwh.f_order (not only from the delta rows).
dwh_delta_update_result AS (
    SELECT
        cu.customer_id AS customer_id,
        cu.customer_name AS customer_name,
        cu.customer_address AS customer_address,
        cu.customer_birthday AS customer_birthday,
        cu.customer_email AS customer_email,
        SUM(prod.product_price) AS customer_money,
        SUM(prod.product_price) * 0.1 AS platform_money, -- platform share is 10%
        COUNT(fo.order_id) AS count_order,
        AVG(prod.product_price) AS avg_price_order,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY (fo.order_completion_date - fo.order_created_date)) AS median_time_order_completed,
        top_product.product_type AS top_product_type,
        top_craft.craftsman_id AS top_craftsman_id,
        SUM(CASE WHEN fo.order_status = 'created' THEN 1 ELSE 0 END) AS count_order_created,
        SUM(CASE WHEN fo.order_status = 'in progress' THEN 1 ELSE 0 END) AS count_order_in_progress,
        SUM(CASE WHEN fo.order_status = 'delivery' THEN 1 ELSE 0 END) AS count_order_delivery,
        SUM(CASE WHEN fo.order_status = 'done' THEN 1 ELSE 0 END) AS count_order_done,
        SUM(CASE WHEN fo.order_status != 'done' THEN 1 ELSE 0 END) AS count_order_not_done,
        upd.report_period AS report_period
    FROM dwh.f_order fo
    INNER JOIN dwh.d_customer cu ON fo.customer_id = cu.customer_id
    -- restrict to the affected customer AND month, so each result row maps to
    -- exactly one datamart row
    INNER JOIN dwh_update_delta upd
        ON fo.customer_id = upd.customer_id
        AND TO_CHAR(fo.order_created_date, 'yyyy-mm') = upd.report_period
    INNER JOIN dwh.d_product prod ON fo.product_id = prod.product_id
    INNER JOIN top_craftsman top_craft ON fo.customer_id = top_craft.customer_id
    INNER JOIN top_product_type top_product ON fo.customer_id = top_product.customer_id
    GROUP BY
        cu.customer_id,
        cu.customer_name,
        cu.customer_address,
        cu.customer_birthday,
        cu.customer_email,
        top_product.product_type,
        top_craft.craftsman_id,
        upd.report_period
),

-- Insert the new (customer, period) rows.
insert_delta AS (
    INSERT INTO dwh.customer_report_datamart (
        customer_id,
        customer_name,
        customer_address,
        customer_birthday,
        customer_email,
        customer_money,
        platform_money,
        count_order,
        avg_price_order,
        median_time_order_completed,
        top_product_type,
        top_craftsman_id,
        count_order_created,
        count_order_in_progress,
        count_order_delivery,
        count_order_done,
        count_order_not_done,
        report_period
    )
    SELECT
        customer_id,
        customer_name,
        customer_address,
        customer_birthday,
        customer_email,
        customer_money,
        platform_money,
        count_order,
        avg_price_order,
        median_time_order_completed,
        top_product_type,
        top_craftsman_id,
        count_order_created,
        count_order_in_progress,
        count_order_delivery,
        count_order_done,
        count_order_not_done,
        report_period
    FROM dwh_delta_insert_result
),

-- Refresh the existing rows. The match is on (customer_id, report_period):
-- matching on customer_id alone would pair every row of a customer with
-- several source rows, and UPDATE ... FROM then applies an unpredictable one.
update_delta AS (
    UPDATE dwh.customer_report_datamart crd SET
        customer_name = upd.customer_name,
        customer_address = upd.customer_address,
        customer_birthday = upd.customer_birthday,
        customer_email = upd.customer_email,
        customer_money = upd.customer_money,
        platform_money = upd.platform_money,
        count_order = upd.count_order,
        avg_price_order = upd.avg_price_order,
        median_time_order_completed = upd.median_time_order_completed,
        top_product_type = upd.top_product_type,
        top_craftsman_id = upd.top_craftsman_id,
        count_order_created = upd.count_order_created,
        count_order_in_progress = upd.count_order_in_progress,
        count_order_delivery = upd.count_order_delivery,
        count_order_done = upd.count_order_done,
        count_order_not_done = upd.count_order_not_done
    FROM dwh_delta_update_result upd
    WHERE crd.customer_id = upd.customer_id
      AND crd.report_period = upd.report_period
),

-- Record the watermark for the next run: the newest load timestamp among the
-- processed orders and their dimensions, or the current time when the delta is
-- empty. The order's own load_dttm is included because the delta filter uses it.
insert_load_date AS (
    INSERT INTO dwh.load_dates_customer_report_datamart (load_dttm)
    SELECT
        GREATEST(
            COALESCE(MAX(order_load_dttm), NOW()),
            COALESCE(MAX(craftsman_load_dttm), NOW()),
            COALESCE(MAX(customer_load_dttm), NOW()),
            COALESCE(MAX(product_load_dttm), NOW())
        )
    FROM dwh_delta
)

SELECT 'updated datamart dwh.customer_report_datamart';


In [None]:
/* Build the staging table dwh.tmp_sources with order data from all sources. */
-- Every branch yields the same 19 columns in the same order. UNION (not
-- UNION ALL) removes rows that are identical across or within sources.
DROP TABLE IF EXISTS dwh.tmp_sources;
CREATE TABLE dwh.tmp_sources AS
-- source1: a single wide table that already holds every attribute
SELECT
    wide.order_id,
    wide.order_created_date,
    wide.order_completion_date,
    wide.order_status,
    wide.craftsman_id,
    wide.craftsman_name,
    wide.craftsman_address,
    wide.craftsman_birthday,
    wide.craftsman_email,
    wide.product_id,
    wide.product_name,
    wide.product_description,
    wide.product_type,
    wide.product_price,
    wide.customer_id,
    wide.customer_name,
    wide.customer_address,
    wide.customer_birthday,
    wide.customer_email
FROM source1.craft_market_wide AS wide

UNION

-- source2: craftsmen + products in one table, orders + customers in another
SELECT
    oc.order_id,
    oc.order_created_date,
    oc.order_completion_date,
    oc.order_status,
    mp.craftsman_id,
    mp.craftsman_name,
    mp.craftsman_address,
    mp.craftsman_birthday,
    mp.craftsman_email,
    mp.product_id,
    mp.product_name,
    mp.product_description,
    mp.product_type,
    mp.product_price,
    oc.customer_id,
    oc.customer_name,
    oc.customer_address,
    oc.customer_birthday,
    oc.customer_email
FROM source2.craft_market_masters_products AS mp
INNER JOIN source2.craft_market_orders_customers AS oc
    ON oc.product_id = mp.product_id
    AND mp.craftsman_id = oc.craftsman_id

UNION

-- source3: orders (with product attributes), craftsmen and customers in separate tables
SELECT
    ord.order_id,
    ord.order_created_date,
    ord.order_completion_date,
    ord.order_status,
    crf.craftsman_id,
    crf.craftsman_name,
    crf.craftsman_address,
    crf.craftsman_birthday,
    crf.craftsman_email,
    ord.product_id,
    ord.product_name,
    ord.product_description,
    ord.product_type,
    ord.product_price,
    cst.customer_id,
    cst.customer_name,
    cst.customer_address,
    cst.customer_birthday,
    cst.customer_email
FROM source3.craft_market_orders AS ord
INNER JOIN source3.craft_market_craftsmans AS crf
    ON ord.craftsman_id = crf.craftsman_id
INNER JOIN source3.craft_market_customers AS cst
    ON ord.customer_id = cst.customer_id

UNION

-- external source: orders with craftsman and product attributes, customers in a separate table
SELECT
    cpo.order_id,
    cpo.order_created_date,
    cpo.order_completion_date,
    cpo.order_status,
    cpo.craftsman_id,
    cpo.craftsman_name,
    cpo.craftsman_address,
    cpo.craftsman_birthday,
    cpo.craftsman_email,
    cpo.product_id,
    cpo.product_name,
    cpo.product_description,
    cpo.product_type,
    cpo.product_price,
    cpo.customer_id,
    ext_cst.customer_name,
    ext_cst.customer_address,
    ext_cst.customer_birthday,
    ext_cst.customer_email
FROM external_source.craft_products_orders AS cpo
INNER JOIN external_source.customers AS ext_cst
    ON cpo.customer_id = ext_cst.customer_id;

In [None]:
/* Update existing rows in dwh.d_craftsman and add new ones. */
-- A craftsman is matched on (craftsman_name, craftsman_email).
-- Matched rows are rewritten only when address or birthday actually differ:
-- load_dttm is what the datamart load compares against its last load date, so
-- bumping it on unchanged rows would mark every craftsman as changed on every run.
MERGE INTO dwh.d_craftsman AS d
USING (
    SELECT DISTINCT
        craftsman_name,
        craftsman_address,
        craftsman_birthday,
        craftsman_email
    FROM dwh.tmp_sources
) AS t
    ON d.craftsman_name = t.craftsman_name
    AND d.craftsman_email = t.craftsman_email
WHEN MATCHED AND (
        d.craftsman_address IS DISTINCT FROM t.craftsman_address
        OR d.craftsman_birthday IS DISTINCT FROM t.craftsman_birthday
    ) THEN
    UPDATE SET
        craftsman_address = t.craftsman_address,
        craftsman_birthday = t.craftsman_birthday,
        load_dttm = current_timestamp
WHEN NOT MATCHED THEN
    INSERT (craftsman_name, craftsman_address, craftsman_birthday, craftsman_email, load_dttm)
    VALUES (t.craftsman_name, t.craftsman_address, t.craftsman_birthday, t.craftsman_email, current_timestamp);

/* Update existing rows in dwh.d_product and add new ones. */
-- A product is matched on (product_name, product_description, product_price).
-- A matched row is rewritten only when its product_type actually differs:
-- load_dttm is what the datamart load compares against its last load date, so
-- bumping it on unchanged rows would mark every product as changed on every run.
MERGE INTO dwh.d_product AS d
USING (
    SELECT DISTINCT
        product_name,
        product_description,
        product_type,
        product_price
    FROM dwh.tmp_sources
) AS t
    ON d.product_name = t.product_name
    AND d.product_description = t.product_description
    AND d.product_price = t.product_price
WHEN MATCHED AND d.product_type IS DISTINCT FROM t.product_type THEN
    UPDATE SET
        product_type = t.product_type,
        load_dttm = current_timestamp
WHEN NOT MATCHED THEN
    INSERT (product_name, product_description, product_type, product_price, load_dttm)
    VALUES (t.product_name, t.product_description, t.product_type, t.product_price, current_timestamp);

/* Update existing rows in dwh.d_customer and add new ones. */
-- A customer is matched on (customer_name, customer_email).
-- Matched rows are rewritten only when address or birthday actually differ:
-- load_dttm is what the datamart load compares against its last load date, so
-- bumping it on unchanged rows would mark every customer as changed on every run.
MERGE INTO dwh.d_customer AS d
USING (
    SELECT DISTINCT
        customer_name,
        customer_address,
        customer_birthday,
        customer_email
    FROM dwh.tmp_sources
) AS t
    ON d.customer_name = t.customer_name
    AND d.customer_email = t.customer_email
WHEN MATCHED AND (
        d.customer_address IS DISTINCT FROM t.customer_address
        OR d.customer_birthday IS DISTINCT FROM t.customer_birthday
    ) THEN
    UPDATE SET
        customer_address = t.customer_address,
        customer_birthday = t.customer_birthday,
        load_dttm = current_timestamp
WHEN NOT MATCHED THEN
    INSERT (customer_name, customer_address, customer_birthday, customer_email, load_dttm)
    VALUES (t.customer_name, t.customer_address, t.customer_birthday, t.customer_email, current_timestamp);

In [None]:
-- Stage the fact rows: translate every staged source order into the keys of
-- the DWH dimensions by joining on the same natural keys the dimension MERGE
-- statements match on.
DROP TABLE IF EXISTS dwh.tmp_sources_fact;
CREATE TABLE dwh.tmp_sources_fact AS
SELECT
    product_dim.product_id,
    craftsman_dim.craftsman_id,
    customer_dim.customer_id,
    staged.order_created_date,
    staged.order_completion_date,
    staged.order_status,
    -- left unaliased on purpose to keep the table shape; the f_order MERGE
    -- does not read this column
    current_timestamp
FROM dwh.tmp_sources AS staged
INNER JOIN dwh.d_craftsman AS craftsman_dim
    ON craftsman_dim.craftsman_name = staged.craftsman_name
    AND craftsman_dim.craftsman_email = staged.craftsman_email
INNER JOIN dwh.d_customer AS customer_dim
    ON customer_dim.customer_name = staged.customer_name
    AND customer_dim.customer_email = staged.customer_email
INNER JOIN dwh.d_product AS product_dim
    ON product_dim.product_name = staged.product_name
    AND product_dim.product_description = staged.product_description
    AND product_dim.product_price = staged.product_price;

/* Update existing rows in dwh.f_order and add new ones. */
-- An order is matched on (product_id, craftsman_id, customer_id, order_created_date).
-- A matched row is rewritten only when its completion date or status actually
-- differs: load_dttm is what the datamart load compares against its last load
-- date, so bumping it on unchanged rows would mark every order as changed on
-- every run.
MERGE INTO dwh.f_order AS f
USING dwh.tmp_sources_fact AS t
    ON f.product_id = t.product_id
    AND f.craftsman_id = t.craftsman_id
    AND f.customer_id = t.customer_id
    AND f.order_created_date = t.order_created_date
WHEN MATCHED AND (
        f.order_completion_date IS DISTINCT FROM t.order_completion_date
        OR f.order_status IS DISTINCT FROM t.order_status
    ) THEN
    UPDATE SET
        order_completion_date = t.order_completion_date,
        order_status = t.order_status,
        load_dttm = current_timestamp
WHEN NOT MATCHED THEN
    INSERT (product_id, craftsman_id, customer_id, order_created_date, order_completion_date, order_status, load_dttm)
    VALUES (t.product_id, t.craftsman_id, t.customer_id, t.order_created_date, t.order_completion_date, t.order_status, current_timestamp);

  -- Clean up both staging tables once the dimension and fact loads are done.
  DROP TABLE IF EXISTS dwh.tmp_sources, dwh.tmp_sources_fact;