In [5]:
from os import path
import pandas as pd
import duckdb
from src.utils import store_df_as_hyper


base_path = "/Users/aprajita/Desktop/Aprajita_Obeta/obeta_project/data"
curation_path = path.join(base_path, "curation")
datamart_path = path.join(base_path, "data_mart_new")
common_aggs = [
    "product_group",
    "origin",
    "warehouse_section",
]

In [6]:
d_date = pd.read_parquet(path.join(curation_path, "d_date.parquet"))
pick_df = pd.read_parquet(path.join(curation_path, "f_order_picks.parquet"))
product_df = pd.read_parquet(path=path.join(curation_path,"d_product_details.parquet"))
pick_df = pick_df.merge(
    product_df,
    on="product_id",
    how="left"
)
error_df = pd.read_parquet(path=path.join(base_path, "curation", "f_pick_errors.parquet"))
d_warehouse_section = pd.read_parquet(path=path.join(base_path, "curation", "d_warehouse_section.parquet"))

In [7]:
error_df = error_df.merge(
    product_df,
    on="product_id",
    how="left"
)

In [10]:
import os
def process_sql(input_sql_script, datamart_name, prefix = ""):
    location = path.join(datamart_path, prefix)
    if not path.exists(location):
        os.mkdir(location)
    agg_df = duckdb.sql(input_sql_script).df()
    agg_df.to_parquet(
        path.join(location, datamart_name + ".parquet"),
    )
    store_df_as_hyper(
        df=agg_df,
        table_name=datamart_name,
        location=location,
    )
    return agg_df



# Total Pick Volume

In [13]:
sql_script = """
            WITH agg_cte AS (
                SELECT 
                    pick_date,
                    sum(pick_volume) AS pick_volume
                FROM
                    pick_df
                GROUP BY pick_df.pick_date
            )
            SELECT
                d_date.date,
                coalesce(agg_cte.pick_volume, 0) as pick_volume,
                d_date.week,
                d_date.month,
                d_date.quarter,
                d_date.year_half,
                d_date.year
            FROM
                agg_cte
            LEFT JOIN
                d_date
            ON
                d_date.date == agg_cte.pick_date
            ORDER BY d_date.date
"""
df = process_sql(
    sql_script,
    "total_pick_volume",
    prefix="total_pick_volume"
)
df.head()

Unnamed: 0,date,pick_volume,week,month,quarter,year_half,year
0,2011-06-23,868210.0,2011_25,2011_06,2011_Q2,2011_H1,2011
1,2011-06-24,756518.0,2011_25,2011_06,2011_Q2,2011_H1,2011
2,2011-06-26,60575.0,2011_25,2011_06,2011_Q2,2011_H1,2011
3,2011-06-27,788479.0,2011_26,2011_06,2011_Q2,2011_H1,2011
4,2011-06-28,788617.0,2011_26,2011_06,2011_Q2,2011_H1,2011


In [16]:
for agg_col in common_aggs:
    sql_script = f"""
                WITH agg_cte AS (
                    SELECT 
                        pick_date,
                        {agg_col},
                        sum(pick_volume) AS pick_volume
                    FROM
                        pick_df
                    GROUP BY 1, 2
                )
                SELECT
                    d_date.date,
                    {agg_col},
                    coalesce(agg_cte.pick_volume, 0) as pick_volume,
                    d_date.week,
                    d_date.month,
                    d_date.quarter,
                    d_date.year_half,
                    d_date.year
                FROM
                    agg_cte
                LEFT JOIN
                    d_date
                ON
                    d_date.date == agg_cte.pick_date
                ORDER BY d_date.date
    """
    process_sql(
        sql_script,
        f"total_pick_volume_per_{agg_col}",
        prefix="total_pick_volume"
    )

# Total Orders Processed

In [17]:
sql_script = """
            WITH agg_cte AS (
                SELECT 
                    pick_date,
                    COUNT(DISTINCT(sk_order_id)) AS order_volume
                FROM
                    pick_df
                GROUP BY pick_df.pick_date
            )
            SELECT
                d_date.date,
                coalesce(agg_cte.order_volume, 0) as order_volume,
                d_date.week,
                d_date.month,
                d_date.quarter,
                d_date.year_half,
                d_date.year
            FROM
                d_date
            LEFT JOIN
                agg_cte
            ON
                d_date.date == agg_cte.pick_date
            ORDER BY d_date.date

        """
df = process_sql(
    sql_script,
    "total_orders_processed",
    prefix="total_orders_processed"
)
df.head()

Unnamed: 0,date,order_volume,week,month,quarter,year_half,year
0,2011-06-01,0,2011_22,2011_06,2011_Q2,2011_H1,2011
1,2011-06-02,0,2011_22,2011_06,2011_Q2,2011_H1,2011
2,2011-06-03,0,2011_22,2011_06,2011_Q2,2011_H1,2011
3,2011-06-04,0,2011_22,2011_06,2011_Q2,2011_H1,2011
4,2011-06-05,0,2011_22,2011_06,2011_Q2,2011_H1,2011


In [18]:
for agg_col in common_aggs:
    sql_script = f"""
            WITH agg_cte AS (
                SELECT 
                    pick_date,
                    {agg_col},
                    COUNT(DISTINCT(sk_order_id)) AS order_volume
                FROM
                    pick_df
                GROUP BY pick_df.pick_date, {agg_col}
            )
            SELECT
                d_date.date,
                {agg_col},
                coalesce(agg_cte.order_volume, 0) as order_volume,
                d_date.week,
                d_date.month,
                d_date.quarter,
                d_date.year_half,
                d_date.year
            FROM
                d_date
            LEFT JOIN
                agg_cte
            ON
                d_date.date == agg_cte.pick_date
            ORDER BY d_date.date

        """
    process_sql(
        sql_script,
        f"total_orders_processed_{agg_col}",
        prefix="total_orders_processed"
    )

# Pick Errors

In [19]:
sql_script = f"""
    with total_errors_cte as (
        select 
            d_date.date,
            d_date.week,
            d_date.month,
            count(*) as total_errors
        from error_df as pe
        left join d_date
            on pe.pick_date = d_date.date
        group by 1, 2, 3
    ),
    total_picks_cte as (
        select 
            d_date.date,
            d_date.week,
            d_date.month,
            count(*) as total_picks
        from pick_df as op
        left join d_date
            on op.pick_date = d_date.date
        group by 1, 2, 3
    )
    select 
        total_errors_cte.*,
        total_picks_cte.total_picks 
    from total_errors_cte
    left join total_picks_cte
    on total_errors_cte.date = total_picks_cte.date
    and total_errors_cte.week = total_picks_cte.week
    and total_errors_cte.month = total_picks_cte.month;
    
    """
process_sql(
    sql_script,
    "pick_errors",
    prefix="pick_errors"
)

Unnamed: 0,date,week,month,total_errors,total_picks
0,2019-03-06,2019_09,2019_03,11,19651
1,2019-04-16,2019_15,2019_04,5,18895
2,2019-04-17,2019_15,2019_04,14,19522
3,2019-04-24,2019_16,2019_04,4,17598
4,2019-05-23,2019_20,2019_05,17,19191
...,...,...,...,...,...
2699,2020-03-18,2020_11,2020_03,13,24400
2700,2015-08-14,2015_32,2015_08,63,8759
2701,2017-12-19,2017_51,2017_12,3,18468
2702,2018-02-05,2018_06,2018_02,5,18256


In [20]:
for agg_col in common_aggs:
    sql_script = f"""
    with total_errors_cte as (
        select 
            d_date.date,
            d_date.week,
            d_date.month,
            pe.{agg_col},
            count(*) as total_errors
        from error_df as pe
        left join d_date
            on pe.pick_date = d_date.date
        group by 1, 2, 3, 4
    ),
    total_picks_cte as (
        select 
            d_date.date,
            d_date.week,
            d_date.month,
            op.{agg_col},
            count(*) as total_picks
        from pick_df as op
        left join d_date
            on op.pick_date = d_date.date
        group by 1, 2, 3, 4
    )
    select 
        total_errors_cte.*,
        total_picks_cte.total_picks 
    from total_errors_cte
    left join total_picks_cte
    on total_errors_cte.date = total_picks_cte.date
    and total_errors_cte.week = total_picks_cte.week
    and total_errors_cte.month = total_picks_cte.month
    and total_errors_cte.{agg_col} = total_picks_cte.{agg_col};
    
    """
    process_sql(
        sql_script,
        f"pick_errors_{agg_col}",
        prefix="pick_errors"
    )

# Top n Products - Weekly

In [21]:
n = 10
sql_script = f"""
    with total_picks_cte as (
        select 
            d_date.week,
            product_id,
            count(*) as total_picks
        from pick_df
        left join d_date
            on pick_df.pick_date = d_date.date
        group by 1, 2
    ),
    ranked_picks_cte as (
        select 
            total_picks_cte.*,
            row_number() over (partition by week order by total_picks desc) as rank
        from total_picks_cte
    )
    select 
        week,
        product_id,
        total_picks
    from ranked_picks_cte
    where rank <= {n}
    """
process_sql(
    sql_script,
    f"top_{n}_products_weekly",
    prefix=f"top_{n}_products_weekly"
)

Unnamed: 0,week,product_id,total_picks
0,2013_35,109910,398
1,2013_35,104131,240
2,2013_35,109956,163
3,2013_35,109905,151
4,2013_35,189919,140
...,...,...,...
4765,2015_08,104251,137
4766,2015_08,231906,122
4767,2015_08,189920,117
4768,2015_08,104270,115


In [22]:
n = 10
for agg_col in common_aggs:
        
    sql_script = f"""
        with total_picks_cte as (
            select 
                d_date.week,
                product_id,
                {agg_col},
                count(*) as total_picks
            from pick_df
            left join d_date
                on pick_df.pick_date = d_date.date
            group by 1, 2, 3
        ),
        ranked_picks_cte as (
            select 
                total_picks_cte.*,
                row_number() over (partition by week, {agg_col} order by total_picks desc) as rank
            from total_picks_cte
        )
        select 
            week,
            product_id,
            {agg_col},
            total_picks
        from ranked_picks_cte
        where rank <= {n}
        """
    process_sql(
        sql_script,
        f"top_{n}_products_weekly_per_{agg_col}",
        prefix=f"top_{n}_products_weekly"
    )

# Average Products Picked Per Order

In [23]:
sql_script = f"""
    with order_distribution_cte as (
        select 
            sk_order_id,
            min(pick_date) as order_date,
            count(distinct product_id) as unique_product_count
        from pick_df
        group by 1
    ),
    daily_distribution as (
        select 
            d_date.week,
            avg(unique_product_count) as avg_products_picked_per_order
        from order_distribution_cte
        left join d_date
            on order_distribution_cte.order_date = d_date.date
        group by 1
        order by 1 asc
    )
    select 
        *
    from daily_distribution
    """
process_sql(
    sql_script,
    f"avg_products_picked_per_order",
    prefix=f"avg_products_picked_per_order"
)

Unnamed: 0,week,avg_products_picked_per_order
0,2011_25,3.370189
1,2011_26,3.240904
2,2011_27,3.366953
3,2011_28,3.353689
4,2011_29,3.359104
...,...,...
472,2020_24,3.584272
473,2020_25,3.627221
474,2020_26,3.535945
475,2020_27,3.632066


# Order Mix: order_per_origin

In [24]:
sql_script = f"""
            WITH cte_48 AS (
                SELECT 
                    d_date.week,
                    COUNT(DISTINCT(sk_order_id)) AS order_volume
                FROM
                    pick_df
                left join d_date
                    on pick_df.pick_date = d_date.date
                where
                    pick_df.origin = '48'
                GROUP BY 1
            ),
            cte_46 AS (
                SELECT 
                    d_date.week,
                    COUNT(DISTINCT(sk_order_id)) AS order_volume
                FROM
                    pick_df
                left join d_date
                    on pick_df.pick_date = d_date.date
                where
                    pick_df.origin = '46'
                GROUP BY 1
            ),
            all_orders as ( 
                SELECT
                    COALESCE(cte_48.week, cte_46.week) as week,
                    COALESCE(cte_48.order_volume, 0) as order_volume_48,
                    COALESCE(cte_46.order_volume, 0) as order_volume_46,
                    cte_48.order_volume + cte_46.order_volume as total_order_volume,
                    round(cte_48.order_volume / (cte_48.order_volume + cte_46.order_volume), 4) * 100 as order_percentage_48,
                    round(cte_46.order_volume / (cte_48.order_volume + cte_46.order_volume), 4) * 100 as order_percentage_46
                FROM
                    cte_48
                FULL OUTER JOIN
                    cte_46
                ON
                    cte_48.week = cte_46.week
            )
            select
                all_orders.*,
                case when order_volume_48 > 0 then round(order_volume_46 / order_volume_48, 2) else 0 end as ratio_46_48,
                case when order_volume_46 > 0 then round(order_volume_48 / order_volume_46, 2) else 0 end as ratio_48_46
            from
                all_orders
            
        """
process_sql(
    sql_script,
    f"order_per_origin",
    prefix=f"order_per_origin"
)


Unnamed: 0,week,order_volume_48,order_volume_46,total_order_volume,order_percentage_48,order_percentage_46,ratio_46_48,ratio_48_46
0,2017_42,22946,1814,24760,92.67,7.33,0.08,12.65
1,2018_12,21603,1814,23417,92.25,7.75,0.08,11.91
2,2018_33,19891,1750,21641,91.91,8.09,0.09,11.37
3,2018_37,21873,1730,23603,92.67,7.33,0.08,12.64
4,2018_46,25742,1842,27584,93.32,6.68,0.07,13.98
...,...,...,...,...,...,...,...,...
472,2017_06,19654,2428,22082,89.00,11.00,0.12,8.09
473,2011_34,14342,2496,16838,85.18,14.82,0.17,5.75
474,2012_52,1840,897,2737,67.23,32.77,0.49,2.05
475,2013_36,15896,2459,18355,86.60,13.40,0.15,6.46


# Warehouse utilization per section

In [25]:
sql_script = f"""
            WITH section_agg AS (
                SELECT 
                    d_date.week,
                    warehouse_section,
                    sum(pick_volume) AS pick_volume
                FROM
                    pick_df
                LEFT JOIN
                    d_date
                on pick_df.pick_date = d_date.date
                GROUP BY 1, 2
            ),
            total_agg AS (
                SELECT 
                    d_date.week,
                    sum(pick_volume) AS pick_volume
                FROM
                    pick_df
                LEFT JOIN
                    d_date
                on pick_df.pick_date = d_date.date
                GROUP BY 1
            )
            select 
                total_agg.week,
                section_agg.warehouse_section,
                (round(
                    coalesce(section_agg.pick_volume, 0) / total_agg.pick_volume,
                    4 
                ) * 100) as section_utilization
            from total_agg
            left join section_agg
            on total_agg.week = section_agg.week
            order by 1, 2 desc
"""
process_sql(
    sql_script,
    f"warehouse_section_utilization",
    prefix="warehouse_section_utilization"
)
# duckdb.sql(sql_script).df()

Unnamed: 0,week,warehouse_section,section_utilization
0,2011_25,Manuell,8.12
1,2011_25,Kabellager,6.02
2,2011_25,HRL,31.37
3,2011_25,AKL,54.50
4,2011_26,Manuell,8.60
...,...,...,...
2158,2020_28,SHL,52.76
2159,2020_28,Manuell,3.64
2160,2020_28,Kabellager,30.45
2161,2020_28,HRL,11.10


# Pick Throughput

In [26]:
sql_script = f"""
            WITH hourly_agg_cte AS (
                SELECT 
                    pick_date,
                    hour(pick_df.pick_timestamp) as pick_hour,
                    sum(pick_volume) AS pick_volume
                FROM
                    pick_df
                GROUP BY 1, 2
            ), 
            weekly_avg as (
                select 
                    d_date.week,
                    round(avg(pick_volume), 2) as weekly_pick_throughput_avg
                from hourly_agg_cte
                left join d_date
                    on hourly_agg_cte.pick_date = d_date.date
                group by 1
            )
            select * from weekly_avg
"""
process_sql(
    sql_script,
    f"pick_throughput",
    prefix="pick_throughput"
)
duckdb.sql(sql_script).df()

Unnamed: 0,week,weekly_pick_throughput_avg
0,2017_26,45333.53
1,2018_32,41144.46
2,2018_24,50534.97
3,2019_08,46583.42
4,2018_45,53691.51
...,...,...
472,2013_31,26013.34
473,2012_38,31046.80
474,2012_06,27322.38
475,2016_30,41243.74


In [27]:
for agg_col in common_aggs:
    sql_script = f"""
                WITH hourly_agg_cte AS (
                    SELECT 
                        pick_date,
                        hour(pick_df.pick_timestamp) as pick_hour,
                        {agg_col},
                        sum(pick_volume) AS pick_volume
                    FROM
                        pick_df
                    GROUP BY 1, 2, 3
                ), 
                weekly_avg as (
                    select 
                        d_date.week,
                        {agg_col},
                        round(avg(pick_volume), 2) as weekly_pick_throughput_avg
                    from hourly_agg_cte
                    left join d_date
                        on hourly_agg_cte.pick_date = d_date.date
                    group by 1, 2
                )
                select * from weekly_avg
    """
    process_sql(
        sql_script,
        f"pick_throughput_{agg_col}",
        prefix="pick_throughput"
    )