Skip to content

Commit

Permalink
Rebuild entire (small) DWH in one go
Browse files Browse the repository at this point in the history
  • Loading branch information
gtoonstra committed Jun 26, 2017
1 parent ca9ae65 commit 747ada8
Show file tree
Hide file tree
Showing 23 changed files with 106 additions and 412 deletions.
132 changes: 18 additions & 114 deletions examples/hive-example/dags/process_hive_dwh.py
Expand Up @@ -16,15 +16,15 @@
import airflow
from datetime import datetime, timedelta
from airflow.operators.hive_operator import HiveOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.models import Variable


args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(7),
'provide_context': True,
'depends_on_past': True
'depends_on_past': False
}

tmpl_search_path = Variable.get("hive_sql_path")
Expand All @@ -47,7 +47,7 @@
dagrun_timeout=timedelta(minutes=60),
template_searchpath=tmpl_search_path,
default_args=args,
max_active_runs=1)
max_active_runs=10)

customer_step_1 = HiveOperator(
hql='customer/step_1.hql',
Expand All @@ -65,62 +65,6 @@
task_id='customer_step_2',
dag=dag)

# Customer dimension tasks, steps 3 through 9.
# Each task runs a single HQL script (customer/step_N.hql) through the
# 'hive_staging' Hive CLI connection against the 'default' schema and is
# registered on `dag`.
# hiveconf_jinja_translate=True: per the Airflow HiveOperator documentation,
# hiveconf-style ${var} placeholders in the script are translated into Jinja
# {{ var }} templates before rendering.
customer_step_3 = HiveOperator(
hql='customer/step_3.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='customer_step_3',
dag=dag)

customer_step_4 = HiveOperator(
hql='customer/step_4.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='customer_step_4',
dag=dag)

customer_step_5 = HiveOperator(
hql='customer/step_5.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='customer_step_5',
dag=dag)

customer_step_6 = HiveOperator(
hql='customer/step_6.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='customer_step_6',
dag=dag)

customer_step_7 = HiveOperator(
hql='customer/step_7.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='customer_step_7',
dag=dag)

customer_step_8 = HiveOperator(
hql='customer/step_8.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='customer_step_8',
dag=dag)

customer_step_9 = HiveOperator(
hql='customer/step_9.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='customer_step_9',
dag=dag)


product_step_1 = HiveOperator(
hql='product/step_1.hql',
Expand All @@ -138,76 +82,36 @@
task_id='product_step_2',
dag=dag)

product_step_3 = HiveOperator(
hql='product/step_3.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='product_step_3',
latest_only = LatestOnlyOperator(
task_id='latest_only',
dag=dag)

product_step_4 = HiveOperator(
hql='product/step_4.hql',
fact_order_step_1 = HiveOperator(
hql='order/step_1.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='product_step_4',
task_id='fact_order_step_1',
dag=dag)

product_step_5 = HiveOperator(
hql='product/step_5.hql',
fact_order_step_2 = HiveOperator(
hql='order/step_2.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='product_step_5',
task_id='fact_order_step_2',
dag=dag)

# Product dimension tasks, steps 6 through 9.
# Each task runs a single HQL script (product/step_N.hql) through the
# 'hive_staging' Hive CLI connection against the 'default' schema and is
# registered on `dag`.
# hiveconf_jinja_translate=True: per the Airflow HiveOperator documentation,
# hiveconf-style ${var} placeholders in the script are translated into Jinja
# {{ var }} templates before rendering.
product_step_6 = HiveOperator(
hql='product/step_6.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='product_step_6',
dag=dag)

product_step_7 = HiveOperator(
hql='product/step_7.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='product_step_7',
dag=dag)

product_step_8 = HiveOperator(
hql='product/step_8.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='product_step_8',
dag=dag)

product_step_9 = HiveOperator(
hql='product/step_9.hql',
hive_cli_conn_id='hive_staging',
schema='default',
hiveconf_jinja_translate=True,
task_id='product_step_9',
dag=dag)

# Placeholder task with no work of its own; customer_step_9 and
# product_step_9 are both wired into it, so it marks the point where the two
# dimension chains have finished.
dummy = DummyOperator(
task_id='dummy',
dag=dag)
latest_only >> customer_step_1
latest_only >> product_step_1

customer_step_1 >> customer_step_2 >> customer_step_3 >> customer_step_4
customer_step_4 >> customer_step_5 >> customer_step_6 >> customer_step_7
customer_step_7 >> customer_step_8 >> customer_step_9
customer_step_1 >> customer_step_2
product_step_1 >> product_step_2

product_step_1 >> product_step_2 >> product_step_3 >> product_step_4
product_step_4 >> product_step_5 >> product_step_6 >> product_step_7
product_step_7 >> product_step_8 >> product_step_9
customer_step_2 >> fact_order_step_1
product_step_2 >> fact_order_step_1

customer_step_9 >> dummy
product_step_9 >> dummy
fact_order_step_1 >> fact_order_step_2


if __name__ == "__main__":
Expand Down
8 changes: 4 additions & 4 deletions examples/hive-example/dags/staging-oltp.py
Expand Up @@ -41,7 +41,7 @@
hive_table='customer_staging',
postgres_conn_id='postgres_oltp',
hive_cli_conn_id='hive_staging',
partition={"change_date": "{{ ds_nodash }}"},
partition={"change_date": "{{ ds }}"},
parameters={"window_start_date": "{{ ds }}", "window_end_date": "{{ tomorrow_ds }}"},
task_id='stage_customer',
dag=dag)
Expand All @@ -51,7 +51,7 @@
hive_table='order_info_staging',
postgres_conn_id='postgres_oltp',
hive_cli_conn_id='hive_staging',
partition={"change_date": "{{ ds_nodash }}"},
partition={"change_date": "{{ ds }}"},
parameters={"window_start_date": "{{ ds }}", "window_end_date": "{{ tomorrow_ds }}"},
task_id='stage_orderinfo',
dag=dag)
Expand All @@ -61,7 +61,7 @@
hive_table='orderline_staging',
postgres_conn_id='postgres_oltp',
hive_cli_conn_id='hive_staging',
partition={"change_date": "{{ ds_nodash }}"},
partition={"change_date": "{{ ds }}"},
parameters={"window_start_date": "{{ ds }}", "window_end_date": "{{ tomorrow_ds }}"},
task_id='stage_orderline',
dag=dag)
Expand All @@ -71,7 +71,7 @@
hive_table='product_staging',
postgres_conn_id='postgres_oltp',
hive_cli_conn_id='hive_staging',
partition={"change_date": "{{ ds_nodash }}"},
partition={"change_date": "{{ ds }}"},
parameters={"window_start_date": "{{ ds }}", "window_end_date": "{{ tomorrow_ds }}"},
task_id='stage_product',
dag=dag)
Expand Down
4 changes: 3 additions & 1 deletion examples/hive-example/hql/customer/step_1.hql
@@ -1,5 +1,7 @@
-- Make sure a dimension table of the right type exists

DROP TABLE IF EXISTS dim_customer;

CREATE TABLE IF NOT EXISTS dim_customer (
dim_customer_key BIGINT
, customer_id STRING
Expand All @@ -9,8 +11,8 @@ CREATE TABLE IF NOT EXISTS dim_customer (
, scd_version INT -- historical version of the record (1 is the oldest)
, scd_start_date DATE -- start date
, scd_end_date DATE -- end date and time (9999-12-31 by default)
, scd_active BOOLEAN -- whether it's the latest version or not
)
PARTITIONED BY (scd_active STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
STORED AS ORC;
24 changes: 19 additions & 5 deletions examples/hive-example/hql/customer/step_2.hql
@@ -1,7 +1,21 @@
-- Start by creating a new dimension table, which we use to copy over the production one.
-- Regenerate entire dimension from scratch.

DROP TABLE IF EXISTS dim_customer_new;
FROM (SELECT
row_number() OVER () AS dim_customer_key
, customer_id
, LEAD(customer_id) OVER (PARTITION BY customer_id ORDER BY change_date) as id_lead
, LAST_VALUE(cust_name) OVER (PARTITION BY customer_id ORDER BY change_date RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as cust_name
, row_number() OVER (PARTITION BY customer_id ORDER BY change_date) AS scd_version
, street
, city
, change_date
, LEAD(change_date) OVER (PARTITION BY customer_id ORDER BY change_date) as end_date
FROM
customer_staging) a
INSERT OVERWRITE TABLE dim_customer PARTITION(scd_active='T')
SELECT a.dim_customer_key, a.customer_id, a.cust_name, a.street, a.city, scd_version, TO_DATE(a.change_date) as scd_start_date, TO_DATE('9999-12-31') as scd_end_date
WHERE a.id_lead IS NULL

CREATE TABLE dim_customer_new
LIKE dim_customer
STORED AS ORC;
INSERT OVERWRITE TABLE dim_customer PARTITION(scd_active='F')
SELECT a.dim_customer_key, a.customer_id, a.cust_name, a.street, a.city, scd_version, TO_DATE(a.change_date) as scd_start_date, TO_DATE(a.end_date) as scd_end_date
WHERE a.id_lead = a.customer_id;
11 changes: 0 additions & 11 deletions examples/hive-example/hql/customer/step_3.hql

This file was deleted.

19 changes: 0 additions & 19 deletions examples/hive-example/hql/customer/step_4.hql

This file was deleted.

21 changes: 0 additions & 21 deletions examples/hive-example/hql/customer/step_5.hql

This file was deleted.

21 changes: 0 additions & 21 deletions examples/hive-example/hql/customer/step_6.hql

This file was deleted.

33 changes: 0 additions & 33 deletions examples/hive-example/hql/customer/step_7.hql

This file was deleted.

0 comments on commit 747ada8

Please sign in to comment.