# BQ dbt mart source data extracts
Creates the simulated source data extract files in a Google Cloud Storage (GCS) bucket, using BigQuery `EXPORT DATA` statements.

Data extracts are based on the GCP public data set `bigquery-public-data.thelook_ecommerce` for a defined controlled
time window - e.g. 14 days.  

A week is taken to be Mon - Sun to mimic the 'start-of-business' Monday scenario at the end of the defined time window.

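Included data sets:
  - `bigquery-public-data.thelook_ecommerce.users`
  - `bigquery-public-data.thelook_ecommerce.distribution_centers`
  - `bigquery-public-data.thelook_ecommerce.products`
  - `bigquery-public-data.thelook_ecommerce.orders`
  - `bigquery-public-data.thelook_ecommerce.order_items`
  - `bigquery-public-data.thelook_ecommerce.inventory_items`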

---
FYI - The base SQL extract statements were initially created using the dbt analysis SQL in `./bq-dbt-mart/sample_bq_dbt_mart/analyses`.

The SQL statements are rendered in the `<dbt-project-dir>/target/compiled/analysis/...` directory.

In [60]:
# project imports
from google.cloud import bigquery
import pendulum
from structlog import get_logger


In [61]:
# load required notebook extensions
%load_ext google.cloud.bigquery


The google.cloud.bigquery extension is already loaded. To reload it, use:
  %reload_ext google.cloud.bigquery


In [62]:
# setup the BigQuery client connection
# (no arguments are passed, so project and credentials come from the client's defaults)
bq_client = bigquery.Client()

# setup a formatted logger
# (structlog logger; the cells below attach values to each event as keyword arguments)
logger = get_logger()

In [63]:
# batch run parameters
# first day of the batch window
batch_start_dt = pendulum.parse('2023-12-04').date()
# last day of the batch window: start + 13 days, i.e. a 14-day window
batch_end_dt = batch_start_dt.add(days=13)

# bucket name passed to the extract SQL builders as the `gs://` target
raw_data_bucket = 'data-pipeline-tools-raw-data'

# log both ends of the window so the run can be checked against the output
logger.info('Batch interval', batch_start_dt=batch_start_dt.to_date_string())
logger.info('Batch interval', batch_end_dt=batch_end_dt.to_date_string())

[2m2024-01-07 18:11:00[0m [[32m[1minfo     [0m] [1mBatch interval                [0m [36mbatch_start_dt[0m=[35m2023-12-04[0m
[2m2024-01-07 18:11:00[0m [[32m[1minfo     [0m] [1mBatch interval                [0m [36mbatch_end_dt[0m=[35m2023-12-17[0m


In [104]:
def full_refresh_extract_sql(src_table: str, batch_dt: pendulum.date, extract_bucket: str, extract_prefix: str) -> str:
    """Build the EXPORT DATA statement for a daily full-refresh extract.

    The statement exports, as CSV with a header row, every row of
    `bigquery-public-data.thelook_ecommerce.<src_table>` whose `created_at`
    is earlier than the end of the batch day (Australia/Melbourne).

    Args:
        src_table: source table name; the query filters on its `created_at` column.
        batch_dt: batch date; drives the partition path and the cutoff timestamp.
        extract_bucket: name of the target bucket (used in the `gs://` URI).
        extract_prefix: first path segment below the bucket.

    Returns:
        The SQL text only; nothing is executed here.
    """
    day = batch_dt.to_date_string()
    # exclusive upper bound: Melbourne midnight at the start of the batch day plus one day
    end_of_day = (
        f"CAST('{day} 00:00:00' AS TIMESTAMP FORMAT 'YYYY-MM-DD HH24:MI:SS' "
        "AT TIME ZONE 'Australia/Melbourne') + INTERVAL '1' DAY"
    )
    partition = f"p_year={batch_dt.year:04d}/p_month={batch_dt.month:02d}/p_day={batch_dt.day:02d}"
    target_uri = f"gs://{extract_bucket}/{extract_prefix}/full-refresh/{src_table}/{partition}/*.csv"

    return f"""
    export data options (
        uri='{target_uri}',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT *
    FROM `bigquery-public-data`.`thelook_ecommerce`.`{src_table}`
    WHERE
        created_at < {end_of_day};
    """

def full_table_extract_sql(src_table: str, batch_dt: pendulum.date, extract_bucket: str, extract_prefix: str) -> str:
    """Build the EXPORT DATA statement that dumps a whole table, unfiltered.

    Every row of `bigquery-public-data.thelook_ecommerce.<src_table>` is
    exported as CSV with a header row. `batch_dt` only selects the target
    partition path; the files are written under the `full-refresh/` path
    segment.

    Args:
        src_table: source table name.
        batch_dt: batch date; used for the `p_year=/p_month=/p_day=` path only.
        extract_bucket: name of the target bucket (used in the `gs://` URI).
        extract_prefix: first path segment below the bucket.

    Returns:
        The SQL text only; nothing is executed here.
    """
    partition = f"p_year={batch_dt.year:04d}/p_month={batch_dt.month:02d}/p_day={batch_dt.day:02d}"
    target_uri = f"gs://{extract_bucket}/{extract_prefix}/full-refresh/{src_table}/{partition}/*.csv"

    return f"""
    export data options (
        uri='{target_uri}',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT *
    FROM `bigquery-public-data`.`thelook_ecommerce`.`{src_table}`
    """

# queries to extract order_items and orders records based on daily changes
#   expects the following columns to be present in the source table:
#     - created_at
#     - shipped_at
#     - delivered_at
#     - returned_at
# timestamp values later than the end of the batch day are replaced with NULL
def orders_initial_extract_sql(src_table: str, batch_dt: pendulum.date, extract_bucket: str, extract_prefix: str) -> str:
    """Build the EXPORT DATA statement for the first (initial) orders extract.

    Rows are selected when any of `created_at`, `shipped_at`, `delivered_at`
    or `returned_at` is earlier than the end of the batch day
    (Australia/Melbourne). In the output each of those four columns is
    replaced with NULL unless its value is earlier than that cutoff.

    Args:
        src_table: source table name (expected to have the orders columns).
        batch_dt: batch date; drives the partition path and the cutoff timestamp.
        extract_bucket: name of the target bucket (used in the `gs://` URI).
        extract_prefix: first path segment below the bucket.

    Returns:
        The SQL text only; nothing is executed here.
    """
    day = batch_dt.to_date_string()
    # exclusive upper bound: Melbourne midnight at the start of the batch day plus one day
    cutoff = (
        f"CAST('{day} 00:00:00' AS TIMESTAMP FORMAT 'YYYY-MM-DD HH24:MI:SS' "
        "AT TIME ZONE 'Australia/Melbourne') + INTERVAL '1' DAY"
    )
    partition = f"p_year={batch_dt.year:04d}/p_month={batch_dt.month:02d}/p_day={batch_dt.day:02d}"
    target_uri = f"gs://{extract_bucket}/{extract_prefix}/eod-delta/{src_table}/{partition}/*.csv"

    # one CASE expression per timestamp column, in SELECT-list order
    masked_columns = "\n".join(
        f"        , case\n"
        f"            when {col} < {cutoff}\n"
        f"                then {col}\n"
        f"            else null\n"
        f"            end as {col}"
        for col in ("created_at", "returned_at", "shipped_at", "delivered_at")
    )
    # a row qualifies when any timestamp column falls before the cutoff
    row_filter = "\n        OR\n".join(
        f"        {col} < {cutoff}"
        for col in ("created_at", "shipped_at", "delivered_at", "returned_at")
    )

    return f"""
    export data options (
        uri='{target_uri}',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT
        order_id
        , user_id
        , status
        , gender
{masked_columns}
        , num_of_item
    FROM `bigquery-public-data`.`thelook_ecommerce`.`{src_table}`
    WHERE
{row_filter}
        ;
    """

def order_items_initial_extract_sql(src_table: str, batch_dt: pendulum.date, extract_bucket: str, extract_prefix: str) -> str:
    """Build the EXPORT DATA statement for the first (initial) order_items extract.

    Rows are selected when any of `created_at`, `shipped_at`, `delivered_at`
    or `returned_at` is earlier than the end of the batch day
    (Australia/Melbourne). In the output each of those four columns is
    replaced with NULL unless its value is earlier than that cutoff.

    Args:
        src_table: source table name (expected to have the order_items columns).
        batch_dt: batch date; drives the partition path and the cutoff timestamp.
        extract_bucket: name of the target bucket (used in the `gs://` URI).
        extract_prefix: first path segment below the bucket.

    Returns:
        The SQL text only; nothing is executed here.
    """
    day = batch_dt.to_date_string()
    # exclusive upper bound: Melbourne midnight at the start of the batch day plus one day
    cutoff = (
        f"CAST('{day} 00:00:00' AS TIMESTAMP FORMAT 'YYYY-MM-DD HH24:MI:SS' "
        "AT TIME ZONE 'Australia/Melbourne') + INTERVAL '1' DAY"
    )
    partition = f"p_year={batch_dt.year:04d}/p_month={batch_dt.month:02d}/p_day={batch_dt.day:02d}"
    target_uri = f"gs://{extract_bucket}/{extract_prefix}/eod-delta/{src_table}/{partition}/*.csv"

    # one CASE expression per timestamp column, in SELECT-list order
    masked_columns = "\n".join(
        f"        , case\n"
        f"            when {col} < {cutoff}\n"
        f"                then {col}\n"
        f"            else null\n"
        f"            end as {col}"
        for col in ("created_at", "returned_at", "shipped_at", "delivered_at")
    )
    # a row qualifies when any timestamp column falls before the cutoff
    row_filter = "\n        OR\n".join(
        f"        {col} < {cutoff}"
        for col in ("created_at", "shipped_at", "delivered_at", "returned_at")
    )

    return f"""
    export data options (
        uri='{target_uri}',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT
        id
        , order_id
        , user_id
        , product_id
        , inventory_item_id
        , status
{masked_columns}
        , sale_price
    FROM `bigquery-public-data`.`thelook_ecommerce`.`{src_table}`
    WHERE
{row_filter}
        ;
    """

def inventory_items_initial_extract_sql(src_table: str, batch_dt: pendulum.date, extract_bucket: str, extract_prefix: str) -> str:
    """Build the EXPORT DATA statement for the first (initial) inventory_items extract.

    Rows are selected when `created_at` or `sold_at` is earlier than the end
    of the batch day (Australia/Melbourne). In the output each of those two
    columns is replaced with NULL unless its value is earlier than that cutoff.

    Args:
        src_table: source table name (expected to have the inventory_items columns).
        batch_dt: batch date; drives the partition path and the cutoff timestamp.
        extract_bucket: name of the target bucket (used in the `gs://` URI).
        extract_prefix: first path segment below the bucket.

    Returns:
        The SQL text only; nothing is executed here.
    """
    day = batch_dt.to_date_string()
    # exclusive upper bound: Melbourne midnight at the start of the batch day plus one day
    cutoff = (
        f"CAST('{day} 00:00:00' AS TIMESTAMP FORMAT 'YYYY-MM-DD HH24:MI:SS' "
        "AT TIME ZONE 'Australia/Melbourne') + INTERVAL '1' DAY"
    )
    partition = f"p_year={batch_dt.year:04d}/p_month={batch_dt.month:02d}/p_day={batch_dt.day:02d}"
    target_uri = f"gs://{extract_bucket}/{extract_prefix}/eod-delta/{src_table}/{partition}/*.csv"

    timestamp_columns = ("created_at", "sold_at")
    # one CASE expression per timestamp column
    masked_columns = "\n".join(
        f"        , case\n"
        f"            when {col} < {cutoff}\n"
        f"                then {col}\n"
        f"            else null\n"
        f"            end as {col}"
        for col in timestamp_columns
    )
    # a row qualifies when either timestamp column falls before the cutoff
    row_filter = "\n        OR\n".join(f"        {col} < {cutoff}" for col in timestamp_columns)

    return f"""
    export data options (
        uri='{target_uri}',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT
        id
        , product_id
{masked_columns}
        , cost
        , product_category
        , product_name
        , product_brand
        , product_retail_price
        , product_department
        , product_sku
        , product_distribution_center_id
    FROM `bigquery-public-data`.`thelook_ecommerce`.`{src_table}`
    WHERE
{row_filter}
        ;
    """

def orders_eod_delta_extract_sql(src_table: str, batch_dt: pendulum.date, extract_bucket: str, extract_prefix: str) -> str:
    """Build the EXPORT DATA statement for one end-of-day orders delta.

    Rows are selected when any of `created_at`, `shipped_at`, `delivered_at`
    or `returned_at` falls inside the batch day, i.e. on or after midnight
    (Australia/Melbourne) and before that midnight plus one day. In the
    output each of those four columns is replaced with NULL unless its value
    is earlier than the end of the batch day.

    Args:
        src_table: source table name (expected to have the orders columns).
        batch_dt: batch date; drives the partition path and the day window.
        extract_bucket: name of the target bucket (used in the `gs://` URI).
        extract_prefix: first path segment below the bucket.

    Returns:
        The SQL text only; nothing is executed here.
    """
    day = batch_dt.to_date_string()
    # half-open day window [day_start, day_end) expressed as SQL fragments
    day_start = (
        f"CAST('{day} 00:00:00' AS TIMESTAMP FORMAT 'YYYY-MM-DD HH24:MI:SS' "
        "AT TIME ZONE 'Australia/Melbourne')"
    )
    day_end = f"{day_start} + INTERVAL '1' DAY"
    partition = f"p_year={batch_dt.year:04d}/p_month={batch_dt.month:02d}/p_day={batch_dt.day:02d}"
    target_uri = f"gs://{extract_bucket}/{extract_prefix}/eod-delta/{src_table}/{partition}/*.csv"

    # one CASE expression per timestamp column, in SELECT-list order
    masked_columns = "\n".join(
        f"        , case\n"
        f"            when {col} < {day_end}\n"
        f"                then {col}\n"
        f"            else null\n"
        f"            end as {col}"
        for col in ("created_at", "returned_at", "shipped_at", "delivered_at")
    )
    # a row qualifies when any timestamp column falls inside the day window
    row_filter = "\n        OR\n".join(
        f"        ({col} >= {day_start} AND\n"
        f"        {col} < {day_end})"
        for col in ("created_at", "shipped_at", "delivered_at", "returned_at")
    )

    return f"""
    export data options (
        uri='{target_uri}',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT
        order_id
        , user_id
        , status
        , gender
{masked_columns}
        , num_of_item
    FROM `bigquery-public-data`.`thelook_ecommerce`.`{src_table}`
    WHERE
{row_filter}
        ;
    """

def order_items_eod_delta_extract_sql(src_table: str, batch_dt: pendulum.date, extract_bucket: str, extract_prefix: str) -> str:
    """Build the EXPORT DATA statement for one end-of-day order_items delta.

    Rows are selected when any of `created_at`, `shipped_at`, `delivered_at`
    or `returned_at` falls inside the batch day, i.e. on or after midnight
    (Australia/Melbourne) and before that midnight plus one day. In the
    output each of those four columns is replaced with NULL unless its value
    is earlier than the end of the batch day.

    Args:
        src_table: source table name (expected to have the order_items columns).
        batch_dt: batch date; drives the partition path and the day window.
        extract_bucket: name of the target bucket (used in the `gs://` URI).
        extract_prefix: first path segment below the bucket.

    Returns:
        The SQL text only; nothing is executed here.
    """
    day = batch_dt.to_date_string()
    # half-open day window [day_start, day_end) expressed as SQL fragments
    day_start = (
        f"CAST('{day} 00:00:00' AS TIMESTAMP FORMAT 'YYYY-MM-DD HH24:MI:SS' "
        "AT TIME ZONE 'Australia/Melbourne')"
    )
    day_end = f"{day_start} + INTERVAL '1' DAY"
    partition = f"p_year={batch_dt.year:04d}/p_month={batch_dt.month:02d}/p_day={batch_dt.day:02d}"
    target_uri = f"gs://{extract_bucket}/{extract_prefix}/eod-delta/{src_table}/{partition}/*.csv"

    # one CASE expression per timestamp column, in SELECT-list order
    masked_columns = "\n".join(
        f"        , case\n"
        f"            when {col} < {day_end}\n"
        f"                then {col}\n"
        f"            else null\n"
        f"            end as {col}"
        for col in ("created_at", "returned_at", "shipped_at", "delivered_at")
    )
    # a row qualifies when any timestamp column falls inside the day window
    row_filter = "\n        OR\n".join(
        f"        ({col} >= {day_start} AND\n"
        f"        {col} < {day_end})"
        for col in ("created_at", "shipped_at", "delivered_at", "returned_at")
    )

    return f"""
    export data options (
        uri='{target_uri}',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT
        id
        , order_id
        , user_id
        , product_id
        , inventory_item_id
        , status
{masked_columns}
        , sale_price
    FROM `bigquery-public-data`.`thelook_ecommerce`.`{src_table}`
    WHERE
{row_filter}
        ;
    """

# inventory_items - eod delta extract
def inventory_items_eod_delta_extract_sql(src_table: str, batch_dt: pendulum.date, extract_bucket: str, extract_prefix: str) -> str:
    """Build the EXPORT DATA statement for one end-of-day inventory_items delta.

    Rows are selected when `created_at` or `sold_at` falls inside the batch
    day, i.e. on or after midnight (Australia/Melbourne) and before that
    midnight plus one day. In the output each of those two columns is
    replaced with NULL unless its value is earlier than the end of the batch day.

    Args:
        src_table: source table name (expected to have the inventory_items columns).
        batch_dt: batch date; drives the partition path and the day window.
        extract_bucket: name of the target bucket (used in the `gs://` URI).
        extract_prefix: first path segment below the bucket.

    Returns:
        The SQL text only; nothing is executed here.
    """
    day = batch_dt.to_date_string()
    # half-open day window [day_start, day_end) expressed as SQL fragments
    day_start = (
        f"CAST('{day} 00:00:00' AS TIMESTAMP FORMAT 'YYYY-MM-DD HH24:MI:SS' "
        "AT TIME ZONE 'Australia/Melbourne')"
    )
    day_end = f"{day_start} + INTERVAL '1' DAY"
    partition = f"p_year={batch_dt.year:04d}/p_month={batch_dt.month:02d}/p_day={batch_dt.day:02d}"
    target_uri = f"gs://{extract_bucket}/{extract_prefix}/eod-delta/{src_table}/{partition}/*.csv"

    timestamp_columns = ("created_at", "sold_at")
    # one CASE expression per timestamp column
    masked_columns = "\n".join(
        f"        , case\n"
        f"            when {col} < {day_end}\n"
        f"                then {col}\n"
        f"            else null\n"
        f"            end as {col}"
        for col in timestamp_columns
    )
    # a row qualifies when either timestamp column falls inside the day window
    row_filter = "\n        OR\n".join(
        f"        ({col} >= {day_start} AND\n"
        f"        {col} < {day_end})"
        for col in timestamp_columns
    )

    return f"""
    export data options (
        uri='{target_uri}',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT
        id
        , product_id
{masked_columns}
        , cost
        , product_category
        , product_name
        , product_brand
        , product_retail_price
        , product_department
        , product_sku
        , product_distribution_center_id
    FROM `bigquery-public-data`.`thelook_ecommerce`.`{src_table}`
    WHERE
{row_filter}
        ;
    """


In [59]:
# users - daily full refresh
# One export per batch day: each day's partition receives every user whose
# created_at is before the end of that day (see full_refresh_extract_sql).
source_table = 'users'
target_source_name = 'thelook'

for dt in pendulum.interval(batch_start_dt, batch_end_dt).range('days'):
    logger.info('batch date', dt=dt.to_date_string())

    extract_sql = full_refresh_extract_sql(source_table, dt, raw_data_bucket, target_source_name)
    logger.debug('extract SQL', sql=extract_sql)
    query_job = bq_client.query(extract_sql)
    # wait for the export job to finish before reading its final state
    results = query_job.result()

    # event name aligned with the other extract cells (was 'result result')
    logger.info('result', status=query_job.state, errors=query_job.errors)



[2m2024-01-04 15:51:54[0m [[32m[1minfo     [0m] [1mbatch date                    [0m [36mdt[0m=[35m2023-12-04[0m
[2m2024-01-04 15:51:54[0m [[32m[1mdebug    [0m] [1mextract SQL                   [0m [36msql[0m=[35m
    export data options (
        uri='gs://data-pipeline-tools-raw-data/thelook/full-refresh/users/p_year=2023/p_month=12/p_day=04/*.csv',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT *
    FROM `bigquery-public-data`.`thelook_ecommerce`.`users`
    WHERE
        created_at < CAST('2023-12-04 00:00:00' AS TIMESTAMP FORMAT 'YYYY-MM-DD HH24:MI:SS' AT TIME ZONE 'Australia/Melbourne') + INTERVAL '1' DAY;
    [0m
[2m2024-01-04 15:52:02[0m [[32m[1minfo     [0m] [1mresult result                 [0m [36merrors[0m=[35mNone[0m [36mstatus[0m=[35mDONE[0m
[2m2024-01-04 15:52:02[0m [[32m[1minfo     [0m] [1mbatch date                    [0m [36mdt[0m=[35m2023-12-05[0m
[2m2024-01-04 15:52:02[0m [[32m

In [68]:
# distribution_centers - full table dump
# No timestamp columns available, so the same unfiltered table is exported
# into every batch-day partition.
source_table = 'distribution_centers'
# fixed typo: this was assigned to `targt_source_name`, while the loop below
# reads `target_source_name`, so the cell only worked when an earlier cell
# had already defined that name
target_source_name = 'thelook'

for dt in pendulum.interval(batch_start_dt, batch_end_dt).range('days'):
    logger.info('batch date', dt=dt.to_date_string())

    extract_sql = full_table_extract_sql(source_table, dt, raw_data_bucket, target_source_name)
    logger.debug('extract SQL', sql=extract_sql)
    query_job = bq_client.query(extract_sql)
    # wait for the export job to finish before reading its final state
    results = query_job.result()

    logger.info('result', status=query_job.state, errors=query_job.errors)

[2m2024-01-07 18:28:57[0m [[32m[1minfo     [0m] [1mbatch date                    [0m [36mdt[0m=[35m2023-12-04[0m
[2m2024-01-07 18:28:57[0m [[32m[1mdebug    [0m] [1mextract SQL                   [0m [36msql[0m=[35m
    export data options (
        uri='gs://data-pipeline-tools-raw-data/thelook/full-refresh/distribution_centers/p_year=2023/p_month=12/p_day=04/*.csv',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT *
    FROM `bigquery-public-data`.`thelook_ecommerce`.`distribution_centers`
    [0m
[2m2024-01-07 18:29:00[0m [[32m[1minfo     [0m] [1mresult result                 [0m [36merrors[0m=[35mNone[0m [36mstatus[0m=[35mDONE[0m
[2m2024-01-07 18:29:00[0m [[32m[1minfo     [0m] [1mbatch date                    [0m [36mdt[0m=[35m2023-12-05[0m
[2m2024-01-07 18:29:00[0m [[32m[1mdebug    [0m] [1mextract SQL                   [0m [36msql[0m=[35m
    export data options (
        uri='gs://data-pi

In [69]:
# products - full table dump
# No timestamp columns available, so the same unfiltered table is exported
# into every batch-day partition.
source_table = 'products'
# fixed typo: this was assigned to `targt_source_name`, while the loop below
# reads `target_source_name`, so the cell only worked when an earlier cell
# had already defined that name
target_source_name = 'thelook'

for dt in pendulum.interval(batch_start_dt, batch_end_dt).range('days'):
    logger.info('batch date', dt=dt.to_date_string())

    extract_sql = full_table_extract_sql(source_table, dt, raw_data_bucket, target_source_name)
    logger.debug('extract SQL', sql=extract_sql)
    query_job = bq_client.query(extract_sql)
    # wait for the export job to finish before reading its final state
    results = query_job.result()

    logger.info('result', status=query_job.state, errors=query_job.errors)

[2m2024-01-07 18:38:13[0m [[32m[1minfo     [0m] [1mbatch date                    [0m [36mdt[0m=[35m2023-12-04[0m
[2m2024-01-07 18:38:13[0m [[32m[1mdebug    [0m] [1mextract SQL                   [0m [36msql[0m=[35m
    export data options (
        uri='gs://data-pipeline-tools-raw-data/thelook/full-refresh/products/p_year=2023/p_month=12/p_day=04/*.csv',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT *
    FROM `bigquery-public-data`.`thelook_ecommerce`.`products`
    [0m
[2m2024-01-07 18:38:18[0m [[32m[1minfo     [0m] [1mresult result                 [0m [36merrors[0m=[35mNone[0m [36mstatus[0m=[35mDONE[0m
[2m2024-01-07 18:38:18[0m [[32m[1minfo     [0m] [1mbatch date                    [0m [36mdt[0m=[35m2023-12-05[0m
[2m2024-01-07 18:38:18[0m [[32m[1mdebug    [0m] [1mextract SQL                   [0m [36msql[0m=[35m
    export data options (
        uri='gs://data-pipeline-tools-raw-data/th

In [91]:
# orders - daily eod delta - timestamp activity during the batch day (Australia/Melbourne)
source_table = 'orders'
target_source_name = 'thelook'

# First batch day: initial extract of everything up to the end of that day.
# The SQL is bound to `initial_extract_sql` (as in the order_items and
# inventory_items cells). Binding it to `orders_initial_extract_sql` replaced
# the function with a string, so re-running this cell failed with
# "'str' object is not callable".
initial_extract_sql = orders_initial_extract_sql(source_table, batch_start_dt, raw_data_bucket, target_source_name)
query_job = bq_client.query(initial_extract_sql)
# wait for the export job to finish before reading its final state
results = query_job.result()
logger.info('result', status=query_job.state, errors=query_job.errors)

# Remaining batch days: one delta extract per day, starting the day after batch_start_dt
for dt in pendulum.interval(batch_start_dt.add(days=1), batch_end_dt).range('days'):
    logger.info('batch date', dt=dt.to_date_string())

    extract_sql = orders_eod_delta_extract_sql(source_table, dt, raw_data_bucket, target_source_name)
    logger.debug('extract SQL', sql=extract_sql)
    query_job = bq_client.query(extract_sql)
    results = query_job.result()

    logger.info('result', status=query_job.state, errors=query_job.errors)


[2m2024-01-07 20:13:17[0m [[32m[1minfo     [0m] [1mresult                        [0m [36merrors[0m=[35mNone[0m [36mstatus[0m=[35mDONE[0m
[2m2024-01-07 20:13:17[0m [[32m[1minfo     [0m] [1mbatch date                    [0m [36mdt[0m=[35m2023-12-05[0m
[2m2024-01-07 20:13:17[0m [[32m[1mdebug    [0m] [1mextract SQL                   [0m [36msql[0m=[35m
    export data options (
        uri='gs://data-pipeline-tools-raw-data/thelook/eod-delta/orders/p_year=2023/p_month=12/p_day=05/*.csv',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT
        order_id
        , user_id
        , status
        , gender
        , case
            when created_at < CAST('2023-12-05 00:00:00' AS TIMESTAMP FORMAT 'YYYY-MM-DD HH24:MI:SS' AT TIME ZONE 'Australia/Melbourne') + INTERVAL '1' DAY
                then created_at
            else null
            end as created_at
        , case
            when returned_at < CAST('2023-12-05 00

In [93]:
# order_items - daily eod delta - timestamp activity during the batch day (Australia/Melbourne)
source_table = 'order_items'
target_source_name = 'thelook'

# First batch day: initial extract of everything up to the end of that day
initial_extract_sql = order_items_initial_extract_sql(source_table, batch_start_dt, raw_data_bucket, target_source_name)
query_job = bq_client.query(initial_extract_sql)
# wait for the export job to finish before reading its final state
results = query_job.result()
logger.info('result', status=query_job.state, errors=query_job.errors)

# Remaining batch days: one delta extract per day, starting the day after batch_start_dt
for dt in pendulum.interval(batch_start_dt.add(days=1), batch_end_dt).range('days'):
    logger.info('batch date', dt=dt.to_date_string())

    extract_sql = order_items_eod_delta_extract_sql(source_table, dt, raw_data_bucket, target_source_name)
    logger.debug('extract SQL', sql=extract_sql)
    query_job = bq_client.query(extract_sql)
    # wait for the export job to finish before reading its final state
    results = query_job.result()

    logger.info('result', status=query_job.state, errors=query_job.errors)


[2m2024-01-08 11:52:48[0m [[32m[1minfo     [0m] [1mresult                        [0m [36merrors[0m=[35mNone[0m [36mstatus[0m=[35mDONE[0m
[2m2024-01-08 11:52:48[0m [[32m[1minfo     [0m] [1mbatch date                    [0m [36mdt[0m=[35m2023-12-05[0m
[2m2024-01-08 11:52:48[0m [[32m[1mdebug    [0m] [1mextract SQL                   [0m [36msql[0m=[35m
    export data options (
        uri='gs://data-pipeline-tools-raw-data/thelook/eod-delta/order_items/p_year=2023/p_month=12/p_day=05/*.csv',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT
        id
        , order_id
        , user_id
        , product_id
        , inventory_item_id
        , status
        , case
            when created_at < CAST('2023-12-05 00:00:00' AS TIMESTAMP FORMAT 'YYYY-MM-DD HH24:MI:SS' AT TIME ZONE 'Australia/Melbourne') + INTERVAL '1' DAY
                then created_at
            else null
            end as created_at
        , case


In [105]:
# inventory_items - daily eod delta - timestamp activity during the batch day (Australia/Melbourne)
source_table = 'inventory_items'
target_source_name = 'thelook'

# First batch day: initial extract of everything up to the end of that day
initial_extract_sql = inventory_items_initial_extract_sql(source_table, batch_start_dt, raw_data_bucket, target_source_name)
logger.debug('extract SQL', sql=initial_extract_sql)
query_job = bq_client.query(initial_extract_sql)
# wait for the export job to finish before reading its final state
results = query_job.result()
logger.info('result', status=query_job.state, errors=query_job.errors)

# Remaining batch days: one delta extract per day, starting the day after batch_start_dt
for dt in pendulum.interval(batch_start_dt.add(days=1), batch_end_dt).range('days'):
    logger.info('batch date', dt=dt.to_date_string())

    extract_sql =  inventory_items_eod_delta_extract_sql(source_table, dt, raw_data_bucket, target_source_name)
    logger.debug('extract SQL', sql=extract_sql)
    query_job = bq_client.query(extract_sql)
    # wait for the export job to finish before reading its final state
    results = query_job.result()

    logger.info('result', status=query_job.state, errors=query_job.errors)

[2m2024-01-08 17:49:57[0m [[32m[1mdebug    [0m] [1mextract SQL                   [0m [36msql[0m=[35m
    export data options (
        uri='gs://data-pipeline-tools-raw-data/thelook/eod-delta/inventory_items/p_year=2023/p_month=12/p_day=04/*.csv',
        format='CSV',
        overwrite=true,
        header=true
    ) AS
    SELECT
        id
        , product_id
        , case
            when created_at < CAST('2023-12-04 00:00:00' AS TIMESTAMP FORMAT 'YYYY-MM-DD HH24:MI:SS' AT TIME ZONE 'Australia/Melbourne') + INTERVAL '1' DAY
                then created_at
            else null
            end as created_at
        , case
            when sold_at < CAST('2023-12-04 00:00:00' AS TIMESTAMP FORMAT 'YYYY-MM-DD HH24:MI:SS' AT TIME ZONE 'Australia/Melbourne') + INTERVAL '1' DAY
                then sold_at
            else null
            end as sold_at
        , cost
        , product_category
        , product_name
        , product_brand
        , product_retail_price
   