In [5]:
%%sql

-- Remove any previous fact_inventory table so the CREATE TABLE ... AS cell can rebuild it.
DROP TABLE IF EXISTS fact_inventory

StatementMeta(, bde2154d-c9f0-4134-aca5-2a0b486cca9f, 6, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [8]:
%%sql 
-- Builds fact_inventory: one row per store / product / calendar day.
--
-- Pipeline:
--   1. starting_inventory  : opening-balance rows from starting_inventory_records.
--   2. filtered_movements  : movements aggregated per store / product / day, restricted
--                            to movements on or after an opening-balance date.
--   3. combined_data       : opening-balance rows (priority 0) UNION ALL movement rows (priority 1).
--   4. inventory_calc      : opening quantity and running movement total per store / product.
--   5. final_inventory     : start-of-day / end-of-day quantity per event row.
--   6. days_held           : number of days each event row stays in effect.
--   7. final SELECT        : expands every event row into one row per day it is in effect.
CREATE TABLE fact_inventory AS
WITH starting_inventory AS (
    -- The join to dimstores keeps only records whose TCLAVE matches a dimstores.src_store_id.
    -- NOTE(review): if dimstores can hold several rows per src_store_id, opening-balance
    -- rows are duplicated here -- confirm src_store_id is unique.
    SELECT
        TCLAVE                   AS store_id_from,
        ICLAVE                   AS product_id,
        ATMFECHA                 AS full_date,
        cantidad                 AS daily_qty_start_of_day,
        movements_sum            AS movements,
        movements_sum_costo      AS movement_cost,
        end_of_day               AS daily_qty_end_of_day,
        inventario_inicial_costo AS daily_cost_start_of_day,
        inventario_final_costo   AS daily_cost_end_of_day
    FROM starting_inventory_records AS i
    INNER JOIN dimstores AS s
        ON s.src_store_id = i.TCLAVE
),
filtered_movements AS (
    SELECT
        f.movement_date AS full_date,
        f.store_id_from,
        f.product_id,
        -- Sign is flipped so that outflows become negative
        -- (presumably qty is stored positive for outflows -- confirm against the source).
        SUM(-1 * f.qty) AS daily_qty,
        SUM(f.cost_amt) AS daily_cost_amt,
        -- Average absolute cost per unit for the day; NULL when no units moved.
        SUM(ABS(f.cost_amt)) / NULLIF(SUM(ABS(f.qty)), 0) AS cost_per_product,
        -- cogs / units_sold only count movement_id 'VT' and 'DC', plus 'CA' rows whose
        -- ReferenceDoc starts with 'DC' (presumably the sales-related movement types -- confirm).
        SUM(
            CASE
                WHEN f.movement_id IN ('VT', 'DC')
                    OR (f.movement_id = 'CA' AND f.ReferenceDoc LIKE 'DC%')
                THEN f.cost_amt
                ELSE 0
            END
        ) AS cogs,
        SUM(
            CASE
                WHEN f.movement_id IN ('VT', 'DC')
                    OR (f.movement_id = 'CA' AND f.ReferenceDoc LIKE 'DC%')
                THEN f.qty * -1
                ELSE 0
            END
        ) AS units_sold
    FROM fmovarti_intermediate_silver AS f
    -- Semi join: the opening balance is only an existence filter here. An inner join
    -- would multiply every SUM above whenever more than one opening-balance row matched.
    LEFT SEMI JOIN starting_inventory AS c
        ON f.store_id_from = c.store_id_from
        AND f.product_id = c.product_id
        AND f.movement_date >= c.full_date
    GROUP BY
        f.movement_date,
        f.store_id_from,
        f.product_id
),
combined_data AS (
    -- Opening-balance rows: no movement of their own, record_priority 0.
    SELECT
        store_id_from,
        product_id,
        full_date,
        daily_qty_start_of_day,
        movements,
        movement_cost,
        daily_qty_end_of_day,
        -- NULLIF keeps a zero end-of-day quantity from raising under ANSI mode.
        daily_cost_end_of_day / NULLIF(daily_qty_end_of_day, 0) AS unit_cost,
        daily_cost_start_of_day,
        daily_cost_end_of_day,
        0   AS daily_qty,
        0.0 AS daily_cost_amt,
        0.0 AS cogs,
        0.0 AS units_sold,
        0   AS record_priority
    FROM starting_inventory

    UNION ALL

    -- Movement rows: balance columns are NULL, record_priority 1.
    -- Column names of a UNION come from the first branch; aliases here simply mirror them.
    SELECT
        store_id_from,
        product_id,
        full_date,
        NULL AS daily_qty_start_of_day,
        NULL AS movements,
        NULL AS movement_cost,
        NULL AS daily_qty_end_of_day,
        cost_per_product AS unit_cost,
        NULL AS daily_cost_start_of_day,
        NULL AS daily_cost_end_of_day,
        daily_qty,
        daily_cost_amt,
        cogs,
        units_sold,
        1 AS record_priority
    FROM filtered_movements
),
inventory_calc AS (
    SELECT
        full_date,
        store_id_from,
        product_id,
        record_priority,
        -- Start-of-day quantity of the first row per store / product; the opening-balance
        -- row sorts first on a shared date because of record_priority.
        FIRST_VALUE(daily_qty_start_of_day) OVER (
            PARTITION BY store_id_from, product_id
            ORDER BY full_date, record_priority
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS initial_qty,
        -- Running net movement. record_priority breaks the tie when an opening-balance
        -- row and a movement row share a date; without it the ROWS frame is order-dependent.
        SUM(daily_qty) OVER (
            PARTITION BY store_id_from, product_id
            ORDER BY full_date, record_priority
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS running_qty,
        unit_cost,
        daily_cost_amt,
        cogs,
        units_sold
    FROM combined_data
),
final_inventory AS (
    SELECT
        store_id_from,
        product_id,
        full_date,
        -- Start of day = previous row's end of day; the first row defaults to initial_qty.
        LAG(initial_qty + running_qty, 1, initial_qty) OVER (
            PARTITION BY store_id_from, product_id
            ORDER BY full_date, record_priority
        ) AS daily_qty_start_of_day,
        initial_qty + running_qty AS daily_qty_end_of_day,
        running_qty,
        unit_cost,
        daily_cost_amt,
        -- Date of the next row. An opening-balance row followed by a same-day movement row
        -- gets its own date here, so it ends up with days_held = 0 and is superseded.
        LEAD(full_date) OVER (
            PARTITION BY store_id_from, product_id
            ORDER BY full_date, record_priority
        ) AS next_date,
        cogs,
        units_sold
    FROM inventory_calc
),
days_held AS (
    SELECT
        store_id_from,
        product_id,
        full_date,
        daily_qty_start_of_day,
        daily_qty_end_of_day - daily_qty_start_of_day AS movement_qty,
        daily_qty_end_of_day,
        unit_cost * daily_qty_start_of_day AS daily_cost_amt_start_of_day,
        daily_cost_amt,
        unit_cost * daily_qty_end_of_day AS daily_cost_amt_end_of_day,
        -- Days until the next row, or until today for the last row.
        -- NOTE(review): a last row dated today yields 0 and is dropped by the final join.
        COALESCE(DATEDIFF(day, full_date, COALESCE(next_date, CURRENT_DATE)), 0) AS days_held,
        cogs,
        units_sold,
        unit_cost
    FROM final_inventory
),
tally AS (
    -- Day offsets 0..719. NOTE(review): rows held longer than 720 days are only
    -- expanded for their first 720 days.
    SELECT CAST(id AS int) AS n
    FROM range(0, 720)
)
-- Expand each event row into one row per day it is in effect (offsets 0 .. days_held - 1).
-- Offset 0 is the event day itself and carries the movement figures; later offsets are
-- carry-forward days with no movement, so they start and end on the event's end-of-day balance.
SELECT
    DATE_ADD(b.full_date, t.n) AS full_date,
    b.store_id_from,
    b.product_id,
    CASE
        WHEN t.n = 0 THEN b.daily_qty_start_of_day
        ELSE b.daily_qty_end_of_day
    END AS daily_qty_start_of_day,
    CASE
        WHEN t.n = 0 THEN b.movement_qty
        ELSE 0
    END AS movement_qty,
    b.daily_qty_end_of_day,
    CASE
        WHEN t.n = 0 THEN b.daily_cost_amt_start_of_day
        ELSE b.daily_cost_amt_end_of_day
    END AS daily_cost_amt_start_of_day,
    CASE
        WHEN t.n = 0 THEN b.daily_cost_amt
        ELSE 0
    END AS daily_cost_amt,
    b.daily_cost_amt_end_of_day,
    CASE WHEN t.n = 0 THEN b.cogs ELSE 0 END AS cost_of_goods_sold,
    CASE WHEN t.n = 0 THEN b.units_sold ELSE 0 END AS units_sold,
    b.unit_cost,
    b.days_held
FROM days_held AS b
INNER JOIN tally AS t
    ON t.n < b.days_held



StatementMeta(, bde2154d-c9f0-4134-aca5-2a0b486cca9f, 9, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>