# Data Extraction Module for Pricing & Offers System


## Imports


In [1]:
import os
import numpy as np
import pandas as pd
import snowflake.connector
import setup_environment_2
import slack
import pytz
from common_functions import upload_dataframe_to_snowflake,send_text_slack

# Cairo timezone for consistent timestamps
CAIRO_TZ = pytz.timezone('Africa/Cairo')


  warn_incompatible_dep(


## Variables


In [2]:
# Region
REGION = "Egypt"

# Snowflake Warehouse
WAREHOUSE = "COMPUTE_WH"

# Date Variables
from datetime import datetime, timedelta
TODAY = datetime.now(CAIRO_TZ).date()
YESTERDAY = TODAY - timedelta(days=1)


## Initialize Environment


In [3]:
setup_environment_2.initialize_env()


/home/ec2-user/.Renviron
/home/ec2-user/service_account_key.json


## Warehouse & Cohort Mapping


In [4]:
# Warehouse Mapping: (region, warehouse_name, warehouse_id, cohort_id)
WAREHOUSE_MAPPING = [
    ('Cairo', 'Mostorod', 1, 700),
    ('Giza', 'Barageel', 236, 701),
    ('Giza', 'Sakkarah', 962, 701),
    ('Delta West', 'El-Mahala', 337, 703),
    ('Delta West', 'Tanta', 8, 703),
    ('Delta East', 'Mansoura FC', 339, 704),
    ('Delta East', 'Sharqya', 170, 704),
    ('Upper Egypt', 'Assiut FC', 501, 1124),
    ('Upper Egypt', 'Bani sweif', 401, 1126),
    ('Upper Egypt', 'Menya Samalot', 703, 1123),
    ('Upper Egypt', 'Sohag', 632, 1125),
    ('Alexandria', 'Khorshed Alex', 797, 702),
]



# All Cohort IDs
COHORT_IDS = [700, 701, 702, 703, 704, 1123, 1124, 1125, 1126]


## Snowflake Query Function


In [5]:
def query_snowflake(query):
    """Execute a query on Snowflake and return results as DataFrame."""
    con = snowflake.connector.connect(
        user=os.environ["SNOWFLAKE_USERNAME"],
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        database=os.environ["SNOWFLAKE_DATABASE"]
    )
    try:
        cur = con.cursor()
        cur.execute("USE WAREHOUSE COMPUTE_WH")
        cur.execute(query)
        data = cur.fetchall()
        columns = [desc[0].lower() for desc in cur.description]  # Get column names from cursor
        return pd.DataFrame(data, columns=columns)
    except Exception as e:
        print(f"Snowflake Error: {e}")
        return pd.DataFrame()
    finally:
        cur.close()
        con.close()


In [6]:
def get_snowflake_timezone():
    """Get the current timezone from Snowflake."""
    query = "SHOW PARAMETERS LIKE 'TIMEZONE'"
    result = query_snowflake(query)
    return result.value[0] if len(result) > 0 else "UTC"


## Helper Functions


In [7]:
def get_warehouse_df():
    """Get warehouse mapping as DataFrame."""
    return pd.DataFrame(
        WAREHOUSE_MAPPING,
        columns=['region', 'warehouse', 'warehouse_id', 'cohort_id'])
    


def convert_to_numeric(df):
    """Convert DataFrame columns to numeric where possible."""
    df.columns = df.columns.str.lower()
    for col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='ignore')
    return df


## Get Snowflake Timezone


In [8]:
TIMEZONE = get_snowflake_timezone()
print(f"Snowflake timezone: {TIMEZONE}")


Snowflake timezone: America/Los_Angeles


In [9]:
# =============================================================================
# 8. ALL-TIME HIGH MARGIN QUERY (P80 margin weighted by gross profit)
# This calculates the top 80% margin based on days with best gross profit
# Gross Profit = NMV × margin, so high-margin + high-sales days rank higher
# =============================================================================
ALL_TIME_HIGH_MARGIN_QUERY = f'''
WITH daily_margin_data AS (
    SELECT 
        pso.warehouse_id,
        pso.product_id,
        so.created_at::DATE AS sale_date,
        SUM(pso.total_price) AS daily_nmv,
        SUM(COALESCE(f.wac_p, 0) * pso.purchased_item_count * pso.basic_unit_count) AS daily_cogs,
        CASE 
            WHEN SUM(pso.total_price) > 0 
            THEN (SUM(pso.total_price) - SUM(COALESCE(f.wac_p, 0) * pso.purchased_item_count * pso.basic_unit_count)) / SUM(pso.total_price)
            ELSE 0 
        END AS daily_margin
    FROM product_sales_order pso
    JOIN sales_orders so ON so.id = pso.sales_order_id
    JOIN finance.all_cogs f ON f.product_id = pso.product_id
        AND f.from_date::DATE <= so.created_at::DATE 
        AND f.to_date::DATE > so.created_at::DATE
    WHERE so.created_at::DATE >= CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 240
        AND so.created_at::DATE < CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE
        AND so.sales_order_status_id NOT IN (7, 12)
        AND so.channel IN ('telesales', 'retailer')
        AND pso.purchased_item_count <> 0
    GROUP BY pso.warehouse_id, pso.product_id, so.created_at::DATE
),

-- Calculate gross profit and rank days by it
ranked_by_gross_profit AS (
    SELECT 
        warehouse_id,
        product_id,
        sale_date,
        daily_nmv,
        daily_margin,
        daily_nmv * daily_margin AS gross_profit,
        -- Rank by gross profit (1 = highest gross profit day)
        ROW_NUMBER() OVER (
            PARTITION BY warehouse_id, product_id 
            ORDER BY daily_nmv * daily_margin DESC
        ) AS gp_rank,
        COUNT(*) OVER (PARTITION BY warehouse_id, product_id) AS total_days
    FROM daily_margin_data
    WHERE daily_nmv > 0 AND daily_margin > 0
)

-- Take P80 of margins from TOP-ranked days by gross profit
-- P80 = take the 80th percentile of margins from the best-performing days
SELECT 
    warehouse_id,
    product_id,
    -- P80 margin from days ranked by gross profit
    PERCENTILE_CONT(0.8) WITHIN GROUP (ORDER BY daily_margin) AS all_time_high_margin,
    -- Also provide some context stats
    MAX(daily_margin) AS max_daily_margin,
    AVG(daily_margin) AS avg_daily_margin,
    COUNT(*) AS days_with_profit
FROM ranked_by_gross_profit
-- Include only top 80% of days by gross profit rank
WHERE gp_rank <= GREATEST(1, CEIL(total_days * 0.8))
GROUP BY warehouse_id, product_id
'''

print("All-time high margin query defined (P80 margin weighted by gross profit)")


All-time high margin query defined (P80 margin weighted by gross profit)


## Market Prices Extraction Queries
Queries for external market price data:
1. **Ben Soliman Prices** - Competitor reference prices
2. **Marketplace Prices** - Min, Max, Mod prices from marketplace
3. **Scrapped Data** - Competitor prices from scraping


In [10]:
## Additional Data Queries (Sales, Groups, WAC)


In [11]:
# =============================================================================
# 4. PRODUCT BASE DATA QUERY (product_id, sku, brand, cat, wac1, wac_p, current_price)
# =============================================================================
PRODUCT_BASE_QUERY = f'''
WITH skus_prices AS (
    WITH local_prices AS (
        SELECT  
            CASE 
                WHEN cpu.cohort_id IN (700, 695) THEN 'Cairo'
                WHEN cpu.cohort_id IN (701) THEN 'Giza'
                WHEN cpu.cohort_id IN (704, 698) THEN 'Delta East'
                WHEN cpu.cohort_id IN (703, 697) THEN 'Delta West'
                WHEN cpu.cohort_id IN (696, 1123, 1124, 1125, 1126) THEN 'Upper Egypt'
                WHEN cpu.cohort_id IN (702, 699) THEN 'Alexandria'
            END AS region,
            cohort_id,
            pu.product_id,
            pu.packing_unit_id,
            pu.basic_unit_count,
            AVG(cpu.price) AS price
        FROM cohort_product_packing_units cpu
        JOIN PACKING_UNIT_PRODUCTS pu ON pu.id = cpu.product_packing_unit_id
        WHERE cpu.cohort_id IN (700,701,702,703,704,695,696,697,698,699,1123,1124,1125,1126)
            AND cpu.created_at::date <> '2023-07-31'
            AND cpu.is_customized = TRUE
        GROUP BY ALL
    ),
    
    live_prices AS (
        SELECT 
            region, cohort_id, product_id, 
            pu_id AS packing_unit_id, 
            buc AS basic_unit_count, 
            NEW_PRICE AS price
        FROM materialized_views.DBDP_PRICES
        WHERE created_at = CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::date
            AND DATE_PART('hour', CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::time) 
                BETWEEN SPLIT_PART(time_slot, '-', 1)::int AND (SPLIT_PART(time_slot, '-', 1)::int) + 1
            AND cohort_id IN (700,701,702,703,704,695,696,697,698,699,1123,1124,1125,1126)
    ),
    
    prices AS (
        SELECT *
        FROM (
            SELECT *, 1 AS priority FROM live_prices
            UNION ALL
            SELECT *, 2 AS priority FROM local_prices
        )
        QUALIFY ROW_NUMBER() OVER (PARTITION BY region, cohort_id, product_id, packing_unit_id ORDER BY priority) = 1
    )
    
    SELECT region, cohort_id, product_id, price
    FROM prices
    WHERE basic_unit_count = 1
        AND ((product_id = 1309 AND packing_unit_id = 2) OR (product_id <> 1309))
)

SELECT distinct
    region, cohort_id, p.product_id,
    CONCAT(products.name_ar, ' ', products.size, ' ', product_units.name_ar) AS sku,
    b.name_ar AS brand,
    cat.name_ar AS cat,
    wac1, wac_p, p.price as current_price
FROM skus_prices p
JOIN finance.all_cogs c ON c.product_id = p.product_id 
    AND CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP()) BETWEEN c.from_date AND c.to_date
JOIN products ON products.id = p.product_id
JOIN categories cat ON cat.id = products.category_id
JOIN brands b ON b.id = products.brand_id
JOIN product_units ON product_units.id = products.unit_id
WHERE wac1 > 0 AND wac_p > 0
GROUP BY ALL
'''

# =============================================================================
# 5. SALES DATA QUERY (120-day NMV by cohort/product)
# =============================================================================
SALES_QUERY = f'''
SELECT DISTINCT cpc.cohort_id, pso.product_id,
    CONCAT(products.name_ar,' ',products.size,' ',product_units.name_ar) as sku,
    brands.name_ar as brand, categories.name_ar as cat,
    sum(pso.total_price) as nmv
FROM product_sales_order pso
JOIN sales_orders so ON so.id = pso.sales_order_id
JOIN COHORT_PRICING_CHANGES cpc ON cpc.id = pso.COHORT_PRICING_CHANGE_id
JOIN products ON products.id = pso.product_id
JOIN brands ON products.brand_id = brands.id 
JOIN categories ON products.category_id = categories.id
JOIN product_units ON product_units.id = products.unit_id 
WHERE so.created_at::date BETWEEN CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::date - 120 
    AND CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::date - 1 
    AND so.sales_order_status_id NOT IN (7, 12)
    AND so.channel IN ('telesales', 'retailer')
    AND pso.purchased_item_count <> 0
    AND cpc.cohort_id IN (700,701,702,703,704,1123,1124,1125,1126)
GROUP BY ALL
'''

# =============================================================================
# 6. MARGIN STATS QUERY (STD and average margins)  
# =============================================================================
MARGIN_STATS_QUERY = f'''
select product_id, cohort_id, 
    (0.6*product_std) + (0.3*brand_std) + (0.1*cat_std) as std, 
    avg_margin
from (
    select product_id, cohort_id, 
        stddev(product_margin) as product_std, 
        stddev(brand_margin) as brand_std,
        stddev(cat_margin) as cat_std, 
        avg(product_margin) as avg_margin
    from (
        select distinct product_id, order_date, cohort_id,
            (nmv-cogs_p)/nmv as product_margin, 
            (brand_nmv-brand_cogs)/brand_nmv as brand_margin,
            (cat_nmv-cat_cogs)/cat_nmv as cat_margin
        from (
            SELECT DISTINCT so.created_at::date as order_date, cpc.cohort_id, pso.product_id,
                brands.name_ar as brand, categories.name_ar as cat,
                sum(COALESCE(f.wac_p,0) * pso.purchased_item_count * pso.basic_unit_count) as cogs_p,
                sum(pso.total_price) as nmv,
                sum(nmv) over(partition by order_date, cat, brand) as brand_nmv,
                sum(cogs_p) over(partition by order_date, cat, brand) as brand_cogs,
                sum(nmv) over(partition by order_date, cat) as cat_nmv,
                sum(cogs_p) over(partition by order_date, cat) as cat_cogs
            FROM product_sales_order pso
            JOIN sales_orders so ON so.id = pso.sales_order_id   
            JOIN COHORT_PRICING_CHANGES cpc on cpc.id = pso.cohort_pricing_change_id
            JOIN products on products.id = pso.product_id
            JOIN brands on products.brand_id = brands.id 
            JOIN categories ON products.category_id = categories.id
            JOIN finance.all_cogs f ON f.product_id = pso.product_id
                AND f.from_date::date <= so.created_at::date AND f.to_date::date > so.created_at::date
            WHERE so.created_at::date between 
                date_trunc('month', CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::date - 120) 
                and CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::date
                AND so.sales_order_status_id not in (7,12)
                AND so.channel IN ('telesales','retailer')
                AND pso.purchased_item_count <> 0
            GROUP BY ALL
        )
    ) group by all 
)
'''

# =============================================================================
# 7. TARGET MARGINS QUERY
# =============================================================================
TARGET_MARGINS_QUERY = f'''
WITH cat_brand_target as (
    SELECT DISTINCT cat, brand, margin as target_bm
    FROM performance.commercial_targets cplan
    QUALIFY CASE 
        WHEN DATE_TRUNC('month', MAX(DATE) OVER()) = DATE_TRUNC('month', CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::date) 
        THEN DATE_TRUNC('month', CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::date)
        ELSE DATE_TRUNC('month', CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::date - INTERVAL '1 month') 
    END = DATE_TRUNC('month', date)
),
cat_target as (
    select cat, sum(target_bm * (target_nmv/cat_total)) as cat_target_margin
    from (
        select *, sum(target_nmv) over(partition by cat) as cat_total
        from (
            select cat, brand, avg(target_bm) as target_bm, sum(target_nmv) as target_nmv
            from (
                SELECT DISTINCT date, city as region, cat, brand, margin as target_bm, nmv as target_nmv
                FROM performance.commercial_targets cplan
                QUALIFY CASE 
                    WHEN DATE_TRUNC('month', MAX(DATE) OVER()) = DATE_TRUNC('month', CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::date) 
                    THEN DATE_TRUNC('month', CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::date)
                    ELSE DATE_TRUNC('month', CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::date - INTERVAL '1 month') 
                END = DATE_TRUNC('month', date)
            ) group by all
        )
    ) group by all 
)
SELECT DISTINCT cbt.cat, cbt.brand, cbt.target_bm, ct.cat_target_margin
FROM cat_brand_target cbt
LEFT JOIN cat_target ct ON ct.cat = cbt.cat
'''


In [None]:
## Execute All Queries


In [12]:
# =============================================================================
# Execute all queries
# =============================================================================
print("Loading data from Snowflake...")

# NOTE: Ben Soliman, Marketplace, and Scrapped prices are now fetched via
# market_data_module.ipynb get_market_data() function - no need to load here

# 1. Product Base Data (product_id, sku, brand, cat, wac1, wac_p, current_price)
print("  1. Loading product base data...")
df_product_base = query_snowflake(PRODUCT_BASE_QUERY)
df_product_base = convert_to_numeric(df_product_base)
print(f"     Loaded {len(df_product_base)} product base records")

# 2. Sales Data
print("  2. Loading sales data...")
df_sales = query_snowflake(SALES_QUERY)
df_sales = convert_to_numeric(df_sales)
print(f"     Loaded {len(df_sales)} sales records")

# 3. Margin Stats
print("  3. Loading margin stats...")
df_margin_stats = query_snowflake(MARGIN_STATS_QUERY)
df_margin_stats = convert_to_numeric(df_margin_stats)
print(f"     Loaded {len(df_margin_stats)} margin stat records")

# 4. Target Margins
print("  4. Loading target margins...")
df_targets = query_snowflake(TARGET_MARGINS_QUERY)
df_targets = convert_to_numeric(df_targets)
print(f"     Loaded {len(df_targets)} target margin records")

# 5. Product Groups (from PostgreSQL)
print("  5. Loading product groups...")
df_groups = query_snowflake(
    '''SELECT * FROM materialized_views.sku_commercial_groups'''
)
df_groups.columns = df_groups.columns.str.lower()
df_groups = convert_to_numeric(df_groups)
print(f"     Loaded {len(df_groups)} group records")

# 6. All-Time High Margin (P80 margin weighted by gross profit)
print("  6. Loading all-time high margin data...")
df_all_time_high_margin = query_snowflake(ALL_TIME_HIGH_MARGIN_QUERY)
df_all_time_high_margin = convert_to_numeric(df_all_time_high_margin)
print(f"     Loaded {len(df_all_time_high_margin)} all-time high margin records")

print("\nBase queries completed!")
print("NOTE: Market data (Ben Soliman, Marketplace, Scrapped) will be fetched via market_data_module")
print(f"\n{'='*60}")
print("df_product_base DataFrame available with columns:")
print("  - region, cohort_id, product_id, sku, brand, cat, wac1, wac_p, current_price")
print(f"{'='*60}")


Loading data from Snowflake...
  1. Loading product base data...


  df[col] = pd.to_numeric(df[col], errors='ignore')


     Loaded 102592 product base records
  2. Loading sales data...


  df[col] = pd.to_numeric(df[col], errors='ignore')


     Loaded 20916 sales records
  3. Loading margin stats...


  df[col] = pd.to_numeric(df[col], errors='ignore')


     Loaded 29000 margin stat records
  4. Loading target margins...


  df[col] = pd.to_numeric(df[col], errors='ignore')


     Loaded 469 target margin records
  5. Loading product groups...


  df[col] = pd.to_numeric(df[col], errors='ignore')


     Loaded 1580 group records
  6. Loading all-time high margin data...
     Loaded 36788 all-time high margin records

Base queries completed!
NOTE: Market data (Ben Soliman, Marketplace, Scrapped) will be fetched via market_data_module

df_product_base DataFrame available with columns:
  - region, cohort_id, product_id, sku, brand, cat, wac1, wac_p, current_price


  df[col] = pd.to_numeric(df[col], errors='ignore')


In [13]:
# =============================================================================
# PART A: Get market_data from market_data_module
# =============================================================================
# Instead of duplicating the market data processing logic here, 
# we use the centralized market_data_module which handles:
# - Ben Soliman prices
# - Marketplace prices  
# - Scrapped prices
# - Group-level price aggregation
# - Price coverage filtering
# - Price percentile calculation
# - Margin tier conversion
# =============================================================================

print("Loading market_data from market_data_module...")
print("(This fetches Ben Soliman, Marketplace, and Scrapped prices from Snowflake)")
print()

# Run market_data_module to get access to get_market_data() function
%run modules/market_data_module.ipynb

# Get fresh market data using the module (no input required)
market_data = get_market_data()

# The market_data now contains:
# - cohort_id, product_id, region
# - Raw prices: ben_soliman_price, final_min_price, final_max_price, etc.
# - Percentiles: minimum, percentile_25, percentile_50, percentile_75, maximum
# - Margins: below_market, market_min, market_25, market_50, market_75, market_max, above_market

print(f"\n{'='*60}")
print(f"MARKET DATA LOADED FROM MODULE")
print(f"{'='*60}")
print(f"Total records: {len(market_data)}")
print(f"  - With marketplace prices: {(~market_data['final_min_price'].isna()).sum()}")
print(f"  - With scrapped prices: {(~market_data['min_scrapped'].isna()).sum()}")
print(f"  - With Ben Soliman prices: {(~market_data['ben_soliman_price'].isna()).sum()}")


Loading market_data from market_data_module...
(This fetches Ben Soliman, Marketplace, and Scrapped prices from Snowflake)

/home/ec2-user/.Renviron
/home/ec2-user/service_account_key.json
Market Data Module loaded at 2026-02-10 14:17:09 Cairo time
Snowflake timezone: America/Los_Angeles
All queries defined ✓
Helper functions defined ✓
get_market_data() function defined ✓
get_margin_tiers() function defined ✓

MARKET DATA MODULE READY

Available functions (NO INPUT REQUIRED):
  - get_market_data()   : Fetch and process all market prices
  - get_margin_tiers()  : Fetch and calculate margin tiers

Usage:
  %run market_data_module.ipynb
  df_market = get_market_data()
  df_tiers = get_margin_tiers()

FETCHING MARKET DATA
Timestamp: 2026-02-10 14:17:09 Cairo time

Step 1: Fetching raw price data...
  1.1 Ben Soliman prices...
      Loaded 1542 records
  1.2 Marketplace prices...
      Loaded 10742 records
  1.3 Scrapped prices...
      Loaded 3085 records
  1.4 Product groups...
      Load

  .apply(lambda g: pd.Series({


    Records after group processing: 24383

Step 5: Adding WAC and margin data...
    Records with WAC: 23987

Step 6: Filtering by price coverage...
    Records after price coverage filter: 12075

Step 7: Calculating price percentiles...
    Records after price analysis: 11556

Step 8: Converting prices to margins...

MARKET DATA COMPLETE
Total records: 11556
  - With marketplace prices: 11361
  - With scrapped prices: 3774
  - With Ben Soliman prices: 8010

MARKET DATA LOADED FROM MODULE
Total records: 11556
  - With marketplace prices: 11361
  - With scrapped prices: 3774
  - With Ben Soliman prices: 8010


## PART A: Build Main pricing_data DataFrame
Start with df_product_base (all our SKUs) and LEFT JOIN the processed market_data


In [14]:
# =============================================================================
# PART B: Build Main pricing_data DataFrame from df_product_base
# =============================================================================
print("Building main pricing_data DataFrame...")

# =============================================================================
# Step 1: Start with df_product_base as the MAIN dataframe (all our SKUs)
# =============================================================================
print("  Step 1: Starting with product base (all SKUs)...")
pricing_data = df_product_base.copy()
print(f"     Product base: {len(pricing_data)} records")

# =============================================================================
# Step 2: Add warehouse mapping (warehouse_id and warehouse name)
# =============================================================================
print("  Step 2: Adding warehouse mapping...")
warehouse_df = get_warehouse_df()
pricing_data = pricing_data.merge(
    warehouse_df[['cohort_id', 'warehouse_id', 'warehouse']], 
    on='cohort_id'
)
print(f"     After warehouse mapping: {len(pricing_data)} records")

# =============================================================================
# Step 3: LEFT JOIN processed market_data
# =============================================================================
print("  Step 3: Left joining processed market data...")
pricing_data = pricing_data.merge(
    market_data, 
    on=['cohort_id', 'product_id','region'], 
    how='left'
)
print(f"     After market data join: {len(pricing_data)} records")

# =============================================================================
# Step 4: LEFT JOIN supporting data (sales, margins, targets, groups)
# =============================================================================
print("  Step 4: Left joining supporting data...")

# Merge sales data (nmv only)
pricing_data = pricing_data.merge(
    df_sales[['cohort_id', 'product_id', 'nmv']], 
    on=['cohort_id', 'product_id'], 
    how='left'
)
pricing_data['nmv'] = pricing_data['nmv'].fillna(0)

# Merge margin statistics (by cohort_id + product_id)
pricing_data = pricing_data.merge(df_margin_stats, on=['cohort_id', 'product_id'], how='left')

# Merge target margins (by brand + cat)
pricing_data = pricing_data.merge(df_targets, on=['brand', 'cat'], how='left')
pricing_data['target_margin'] = pricing_data['target_bm'].fillna(pricing_data['cat_target_margin']).fillna(0)
pricing_data = pricing_data.drop(columns=['target_bm', 'cat_target_margin'], errors='ignore')

# Fill NaN values with defaults
pricing_data['std'] = pricing_data['std'].fillna(0.01)
pricing_data['avg_margin'] = pricing_data['avg_margin'].fillna(0)

# Merge product groups
pricing_data = pricing_data.merge(df_groups, on='product_id', how='left')

# =============================================================================
# Step 5: Calculate current margin
# =============================================================================
pricing_data['current_margin'] = (pricing_data['current_price'] - pricing_data['wac_p']) / pricing_data['current_price']

# Remove duplicates
pricing_data = pricing_data.drop_duplicates(subset=['cohort_id', 'product_id','warehouse_id'])

# =============================================================================
# Reorder columns
# =============================================================================
final_columns = [
    # Product Base Info
    'cohort_id', 'product_id', 'region', 'warehouse_id', 'warehouse', 'sku', 'brand', 'cat',
    # Cost & Price
    'wac1', 'wac_p', 'current_price', 'current_margin',
    # Sales
    'nmv',
    # Market Prices (raw)
    'ben_soliman_price', 
    'final_min_price', 'final_max_price', 'final_mod_price', 'final_true_min', 'final_true_max',
    'min_scrapped', 'scrapped25', 'scrapped50', 'scrapped75', 'max_scrapped',
    # Price Percentiles
    'minimum', 'percentile_25', 'percentile_50', 'percentile_75', 'maximum',
    # Margin Tiers
    'below_market', 'market_min', 'market_25', 'market_50', 'market_75', 'market_max', 'above_market',
    # Supporting Data
    'std', 'avg_margin', 'target_margin', 'group'
]
pricing_data = pricing_data[[c for c in final_columns if c in pricing_data.columns]]

print(f"\n{'='*60}")
print(f"PRICING DATA COMPLETE")
print(f"{'='*60}")
print(f"Total records: {len(pricing_data)}")
print(f"\nRecords with market data: {len(pricing_data[~pricing_data['minimum'].isna()])}")
print(f"Records without market data: {len(pricing_data[pricing_data['minimum'].isna()])}")
print(f"\nRecords with sales (nmv > 0): {len(pricing_data[pricing_data['nmv'] > 0])}")
print(f"Records without sales (nmv = 0): {len(pricing_data[pricing_data['nmv'] == 0])}")
print(f"\nSample data:")
pricing_data.head()


Building main pricing_data DataFrame...
  Step 1: Starting with product base (all SKUs)...
     Product base: 102592 records
  Step 2: Adding warehouse mapping...
     After warehouse mapping: 86720 records
  Step 3: Left joining processed market data...
     After market data join: 86720 records
  Step 4: Left joining supporting data...

PRICING DATA COMPLETE
Total records: 86712

Records with market data: 14901
Records without market data: 71811

Records with sales (nmv > 0): 29636
Records without sales (nmv = 0): 57076

Sample data:


Unnamed: 0,cohort_id,product_id,region,warehouse_id,warehouse,sku,brand,cat,wac1,wac_p,...,below_market,market_min,market_25,market_50,market_75,market_max,above_market,std,avg_margin,target_margin
0,703,10124,Delta West,337,El-Mahala,مسحوق ليدر عادى لافندر - 2 كجم,ليدر,منظفات,270.736154,265.785817,...,,,,,,,,0.031044,0.065331,0.082573
1,703,10124,Delta West,8,Tanta,مسحوق ليدر عادى لافندر - 2 كجم,ليدر,منظفات,270.736154,265.785817,...,,,,,,,,0.031044,0.065331,0.082573
2,1126,10124,Upper Egypt,401,Bani sweif,مسحوق ليدر عادى لافندر - 2 كجم,ليدر,منظفات,270.736154,265.785817,...,,,,,,,,0.034143,0.078555,0.082573
3,701,8915,Giza,236,Barageel,بيبسى - 1.47 لتر,بيبسي,حاجه ساقعه,179.632398,177.297894,...,0.020454,0.036424,0.049341,0.066853,0.126611,0.139331,0.151685,0.021007,0.048939,0.05
4,701,8915,Giza,962,Sakkarah,بيبسى - 1.47 لتر,بيبسي,حاجه ساقعه,179.632398,177.297894,...,0.020454,0.036424,0.049341,0.066853,0.126611,0.139331,0.151685,0.021007,0.048939,0.05


## Discount Analysis - Price & Margin After Discount


In [16]:
# =============================================================================
# Discount Query - Get discount percentage by warehouse and product
# =============================================================================
DISCOUNT_QUERY = f'''
SELECT warehouse_id, product_id, total_discount/total_nmv AS discount_perc
FROM (
    SELECT  
        pso.warehouse_id,
        pso.product_id,
        SUM(pso.total_price) AS total_nmv,
        SUM((ITEM_QUANTITY_DISCOUNT_VALUE * pso.purchased_item_count) + 
            (ITEM_DISCOUNT_VALUE * pso.purchased_item_count)) AS total_discount
    FROM product_sales_order pso 
    JOIN sales_orders so ON so.id = pso.sales_order_id
    WHERE so.created_at::DATE >= CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 1 
        AND so.sales_order_status_id NOT IN (7, 12)
        AND so.channel IN ('telesales', 'retailer')
        AND pso.purchased_item_count <> 0
    GROUP BY ALL
)
WHERE total_nmv > 0
'''

# Execute discount query
print("Loading discount data...")
df_discount = query_snowflake(DISCOUNT_QUERY)
df_discount = convert_to_numeric(df_discount)
print(f"Loaded {len(df_discount)} discount records")


Loading discount data...
Loaded 12431 discount records


  df[col] = pd.to_numeric(df[col], errors='ignore')


In [17]:
# =============================================================================
# Create pricing_with_discount DataFrame
# =============================================================================
print("Creating pricing_with_discount DataFrame...")

# Copy pricing_data
pricing_with_discount = pricing_data.copy()

# Merge discount data (by warehouse_id + product_id)
pricing_with_discount = pricing_with_discount.merge(
    df_discount[['warehouse_id', 'product_id', 'discount_perc']], 
    on=['warehouse_id', 'product_id'], 
    how='left'
)

# Fill missing discount_perc with 0 (no discount)
pricing_with_discount['discount_perc'] = pricing_with_discount['discount_perc'].fillna(0)

# Merge all-time high margin data (P80 margin weighted by gross profit)
pricing_with_discount = pricing_with_discount.merge(
    df_all_time_high_margin[['warehouse_id', 'product_id', 'all_time_high_margin']], 
    on=['warehouse_id', 'product_id'], 
    how='left'
)

# Fill missing all_time_high_margin with target_margin (fallback)
pricing_with_discount['all_time_high_margin'] = pricing_with_discount['all_time_high_margin'].fillna(
    pricing_with_discount['target_margin']
)

# =============================================================================
# Calculate price and margin after discount
# =============================================================================
# Price after discount = current_price * (1 - discount_perc)
pricing_with_discount['price_after_discount'] = (
    pricing_with_discount['current_price'] * (1 - pricing_with_discount['discount_perc'])
)

# Margin after discount = (price_after_discount - wac_p) / price_after_discount
pricing_with_discount['margin_after_discount'] = (
    (pricing_with_discount['price_after_discount'] - pricing_with_discount['wac_p']) / 
    pricing_with_discount['price_after_discount']
)

print(f"\n{'='*60}")
print(f"PRICING WITH DISCOUNT DATA COMPLETE")
print(f"{'='*60}")
print(f"Total records: {len(pricing_with_discount)}")
print(f"Records with discount (discount_perc > 0): {len(pricing_with_discount[pricing_with_discount['discount_perc'] > 0])}")
print(f"Records without discount: {len(pricing_with_discount[pricing_with_discount['discount_perc'] == 0])}")
print(f"\nNew columns added:")
print("  - discount_perc: discount percentage from sales")
print("  - price_after_discount: current_price * (1 - discount_perc)")
print("  - margin_after_discount: (price_after_discount - wac_p) / price_after_discount")
print(f"\nSample data with discounts:")
pricing_with_discount[pricing_with_discount['discount_perc'] > 0][
    ['product_id', 'warehouse_id', 'current_price', 'current_margin', 
     'discount_perc', 'price_after_discount', 'margin_after_discount']
].head(10)


Creating pricing_with_discount DataFrame...

PRICING WITH DISCOUNT DATA COMPLETE
Total records: 86712
Records with discount (discount_perc > 0): 3842
Records without discount: 82870

New columns added:
  - discount_perc: discount percentage from sales
  - price_after_discount: current_price * (1 - discount_perc)
  - margin_after_discount: (price_after_discount - wac_p) / price_after_discount

Sample data with discounts:


Unnamed: 0,product_id,warehouse_id,current_price,current_margin,discount_perc,price_after_discount,margin_after_discount
3,8915,236,186.5,0.049341,0.006524,185.283335,0.043099
8,3456,8,132.25,0.034296,0.003819,131.744923,0.030593
11,12902,1,94.5,0.043549,0.000663,94.437304,0.042914
13,2327,337,211.0,0.113751,0.002666,210.437409,0.111381
19,9186,1,340.5,0.046656,0.004985,338.802564,0.04188
20,11772,501,153.0,0.115429,0.009608,151.529991,0.106848
27,24567,501,92.25,0.068818,0.004458,91.838769,0.064649
33,2776,236,211.0,0.055903,0.004375,210.076833,0.051754
34,2776,962,211.0,0.055903,0.00876,209.151609,0.047559
40,11772,797,137.5,0.015714,0.0061,136.66125,0.009673


In [18]:
# =============================================================================
# Price Position - Determine where price_after_discount falls in market tiers
# =============================================================================

def get_price_position(row):
    """Determine the price position relative to market price tiers."""
    price = row['price_after_discount']
    wac = row['wac_p']
    
    # Check if we have market data (minimum price exists)
    if pd.isna(row['minimum']) or pd.isna(price):
        return "No Market Data"
    
    # Get price tiers
    min_price = row['minimum']
    p25 = row['percentile_25']
    p50 = row['percentile_50']
    p75 = row['percentile_75']
    max_price = row['maximum']
    
    # Calculate below_market and above_market prices from margins
    # margin = (price - wac) / price  =>  price = wac / (1 - margin)
    below_market_margin = row['below_market']
    above_market_margin = row['above_market']
    
    below_market_price = wac / (1 - below_market_margin) if below_market_margin < 1 else min_price
    above_market_price = wac / (1 - above_market_margin) if above_market_margin < 1 else max_price
    
    # Determine position based on price tiers
    if price < below_market_price:
        return "Below Market"
    elif price < min_price:
        return "Below Min"
    elif price < p25:
        return "At Min"
    elif price < p50:
        return "At 25th"
    elif price < p75:
        return "At 50th"
    elif price < max_price:
        return "At 75th"
    elif price < above_market_price:
        return "At Max"
    else:
        return "Above Market"

# Apply price position function
pricing_with_discount['price_position'] = pricing_with_discount.apply(get_price_position, axis=1)

# Summary of price positions
print(f"\n{'='*60}")
print(f"PRICE POSITION ANALYSIS")
print(f"{'='*60}")
print("\nPrice Position Distribution:")
print(pricing_with_discount['price_position'].value_counts().to_string())
print(f"\nPrice Position Percentages:")
print((pricing_with_discount['price_position'].value_counts(normalize=True) * 100).round(2).astype(str) + '%')

# Sample data showing price position
print(f"\nSample data with price position:")
pricing_with_discount[
    ['product_id', 'warehouse_id', 'sku', 'current_price', 'discount_perc', 
     'price_after_discount', 'minimum', 'maximum', 'price_position']
].head(15)



PRICE POSITION ANALYSIS

Price Position Distribution:
price_position
No Market Data    71811
Below Market       5008
At Min             2567
At 25th            1936
At 50th            1755
Above Market       1378
Below Min           931
At 75th             867
At Max              459

Price Position Percentages:
price_position
No Market Data    82.82%
Below Market       5.78%
At Min             2.96%
At 25th            2.23%
At 50th            2.02%
Above Market       1.59%
Below Min          1.07%
At 75th             1.0%
At Max             0.53%
Name: proportion, dtype: object

Sample data with price position:


Unnamed: 0,product_id,warehouse_id,sku,current_price,discount_perc,price_after_discount,minimum,maximum,price_position
0,10124,337,مسحوق ليدر عادى لافندر - 2 كجم,270.25,0.0,270.25,,,No Market Data
1,10124,8,مسحوق ليدر عادى لافندر - 2 كجم,270.25,0.0,270.25,,,No Market Data
2,10124,401,مسحوق ليدر عادى لافندر - 2 كجم,261.75,0.0,261.75,,,No Market Data
3,8915,236,بيبسى - 1.47 لتر,186.5,0.006524,185.283335,184.0,206.0,At Min
4,8915,962,بيبسى - 1.47 لتر,186.5,0.0,186.5,184.0,206.0,At 25th
5,23127,703,ويفر فينجرز بكريمة شوكولاتة البندق 6 عبوة - 33 جم,26.0,0.0,26.0,,,No Market Data
6,22286,401,مكرونة كايرو مرمرية - 1 كجم,228.0,0.0,228.0,,,No Market Data
7,3456,337,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,132.25,0.0,132.25,136.4,145.0,Below Market
8,3456,8,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,132.25,0.003819,131.744923,136.4,145.0,Below Market
9,23548,703,ريف بسكويت ويفر بحشو كريمة البندق - 12 قطعه,118.25,0.0,118.25,,,No Market Data


In [19]:
# =============================================================================
# Stock Query - Get available stock by warehouse and product
# =============================================================================
STOCK_QUERY = '''
SELECT 
    pw.warehouse_id,
    pw.product_id,
    pw.available_stock::INTEGER AS stocks
FROM product_warehouse pw
WHERE pw.warehouse_id NOT IN (6, 9, 10)
    AND pw.is_basic_unit = 1
'''

# Execute stock query
print("Loading stock data...")
df_stocks = query_snowflake(STOCK_QUERY)
df_stocks = convert_to_numeric(df_stocks)
print(f"Loaded {len(df_stocks)} stock records")

# Merge stock data with pricing_with_discount
pricing_with_discount = pricing_with_discount.merge(
    df_stocks[['warehouse_id', 'product_id', 'stocks']], 
    on=['warehouse_id', 'product_id'], 
    how='left'
)

# Fill missing stocks with 0
pricing_with_discount['stocks'] = pricing_with_discount['stocks'].fillna(0).astype(int)

print(f"\nStock data merged!")
print(f"Records with stock (stocks > 0): {len(pricing_with_discount[pricing_with_discount['stocks'] > 0])}")
print(f"Records without stock (stocks = 0): {len(pricing_with_discount[pricing_with_discount['stocks'] == 0])}")
print(f"\nSample data with stocks:")
pricing_with_discount[
    ['product_id', 'warehouse_id', 'sku', 'stocks', 'price_after_discount', 'price_position']
].head(10)


Loading stock data...


  df[col] = pd.to_numeric(df[col], errors='ignore')


Loaded 1866178 stock records

Stock data merged!
Records with stock (stocks > 0): 19393
Records without stock (stocks = 0): 67319

Sample data with stocks:


Unnamed: 0,product_id,warehouse_id,sku,stocks,price_after_discount,price_position
0,10124,337,مسحوق ليدر عادى لافندر - 2 كجم,27,270.25,No Market Data
1,10124,8,مسحوق ليدر عادى لافندر - 2 كجم,22,270.25,No Market Data
2,10124,401,مسحوق ليدر عادى لافندر - 2 كجم,12,261.75,No Market Data
3,8915,236,بيبسى - 1.47 لتر,215,185.283335,At Min
4,8915,962,بيبسى - 1.47 لتر,128,186.5,At 25th
5,23127,703,ويفر فينجرز بكريمة شوكولاتة البندق 6 عبوة - 33 جم,0,26.0,No Market Data
6,22286,401,مكرونة كايرو مرمرية - 1 كجم,16,228.0,No Market Data
7,3456,337,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,17,132.25,Below Market
8,3456,8,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,52,131.744923,Below Market
9,23548,703,ريف بسكويت ويفر بحشو كريمة البندق - 12 قطعه,0,118.25,No Market Data


In [20]:
# =============================================================================
# Zero Demand Query - Identify SKUs with zero/low demand
# =============================================================================
ZERO_DEMAND_QUERY = f'''
WITH last_oss AS (
    SELECT product_id, warehouse_id, TIMESTAMP AS last_in_stock_day
    FROM (
        SELECT *, ROW_NUMBER() OVER(PARTITION BY product_id, warehouse_id ORDER BY TIMESTAMP DESC) AS rnk 
        FROM materialized_views.STOCK_DAY_CLOSE
        WHERE AVAILABLE_STOCK = 0 
            AND TIMESTAMP >= CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 120
        QUALIFY rnk = 1 
    )
),

current_stocks AS (
    SELECT product_id, warehouse_id, AVAILABLE_STOCK, activation
    FROM PRODUCT_WAREHOUSE
    WHERE IS_BASIC_UNIT = 1
        AND CASE WHEN product_id = 1309 THEN packing_unit_id <> 23 ELSE TRUE END
),

prs AS (
    SELECT DISTINCT 
        product_purchased_receipts.product_id,
        purchased_receipts.warehouse_id,
        purchased_receipts.date::DATE AS date,
        product_purchased_receipts.purchased_item_count * product_purchased_receipts.basic_unit_count AS purchase_min_count
    FROM product_purchased_receipts
    JOIN purchased_receipts ON purchased_receipts.id = product_purchased_receipts.purchased_receipt_id
    JOIN last_oss lo ON product_purchased_receipts.product_id = lo.product_id 
        AND lo.warehouse_id = purchased_receipts.warehouse_id 
        AND purchased_receipts.date > lo.last_in_stock_day 
    WHERE product_purchased_receipts.purchased_item_count <> 0
        AND purchased_receipts.purchased_receipt_status_id IN (4, 5, 7)
        AND purchased_receipts.date::DATE >= CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 120
),

main AS (
    SELECT 
        prs.product_id, 
        prs.warehouse_id, 
        MIN(date) AS first_order_date, 
        SUM(purchase_min_count) AS total_recieved, 
        cs.AVAILABLE_STOCK, 
        cs.activation
    FROM prs 
    JOIN current_stocks cs ON cs.product_id = prs.product_id AND prs.warehouse_id = cs.warehouse_id
    GROUP BY prs.product_id, prs.warehouse_id, cs.AVAILABLE_STOCK, cs.activation
),

sold_days AS (
    SELECT product_id, warehouse_id, COUNT(DISTINCT o_date) AS sales_days
    FROM (
        SELECT DISTINCT
            so.created_at::DATE AS o_date,
            pso.warehouse_id,
            pso.product_id,
            SUM(pso.purchased_item_count * basic_unit_count) AS daily_qty
        FROM product_sales_order pso
        JOIN sales_orders so ON so.id = pso.sales_order_id
        JOIN main m ON m.product_id = pso.product_id 
            AND m.warehouse_id = pso.warehouse_id 
            AND so.created_at::DATE >= m.first_order_date
        WHERE so.created_at::DATE BETWEEN CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 120 
            AND CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE
            AND so.sales_order_status_id NOT IN (7, 12)
            AND so.channel IN ('telesales', 'retailer')
            AND pso.purchased_item_count <> 0
        GROUP BY o_date, pso.warehouse_id, pso.product_id
    )
    GROUP BY product_id, warehouse_id
)

SELECT DISTINCT warehouse_id, product_id
FROM (
    SELECT m.product_id, m.warehouse_id, m.first_order_date, m.activation,
        COALESCE(sd.sales_days, 0) AS sales_days,
        COALESCE(sd.sales_days, 0)::FLOAT / NULLIF((CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 1) - m.first_order_date, 0) AS perc_days
    FROM main m 
    LEFT JOIN sold_days sd ON sd.product_id = m.product_id AND sd.warehouse_id = m.warehouse_id
    WHERE m.first_order_date < CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 10
)
WHERE perc_days <= 0.3
    AND activation = 'true'
'''

# Execute zero demand query
print("Loading zero demand SKUs...")
df_zero_demand = query_snowflake(ZERO_DEMAND_QUERY)
df_zero_demand = convert_to_numeric(df_zero_demand)
print(f"Loaded {len(df_zero_demand)} zero demand SKU records")


Loading zero demand SKUs...
Loaded 3737 zero demand SKU records


  df[col] = pd.to_numeric(df[col], errors='ignore')


In [21]:
# =============================================================================
# Add Zero Demand Flag to pricing_with_discount
# =============================================================================

# Add a marker column to identify zero demand SKUs
df_zero_demand['zero_demand'] = 1

# Merge with pricing_with_discount
pricing_with_discount = pricing_with_discount.merge(
    df_zero_demand[['warehouse_id', 'product_id', 'zero_demand']], 
    on=['warehouse_id', 'product_id'], 
    how='left'
)

# Fill missing values with 0 (not zero demand)
pricing_with_discount['zero_demand'] = pricing_with_discount['zero_demand'].fillna(0).astype(int)

print(f"Zero demand flag added!")
print(f"SKUs flagged as zero demand: {len(pricing_with_discount[pricing_with_discount['zero_demand'] == 1])}")
print(f"SKUs with normal demand: {len(pricing_with_discount[pricing_with_discount['zero_demand'] == 0])}")


Zero demand flag added!
SKUs flagged as zero demand: 3598
SKUs with normal demand: 83114


In [22]:
# =============================================================================
# OOS Yesterday Query - Identify SKUs out of stock yesterday
# =============================================================================
OOS_YESTERDAY_QUERY = f'''
SELECT DISTINCT product_id, warehouse_id,
    CASE WHEN opening_stocks = 0 AND closing_stocks = 0 THEN 1
         ELSE 0 
    END AS oos_yesterday
FROM (
    SELECT 
        timestamp,
        product_id,
        warehouse_id, 
        AVAILABLE_STOCK AS closing_stocks,
        LAG(AVAILABLE_STOCK) OVER (PARTITION BY product_id, warehouse_id ORDER BY TIMESTAMP) AS opening_stocks
    FROM materialized_views.stock_day_close
    WHERE timestamp::DATE >= CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 2
    QUALIFY opening_stocks IS NOT NULL
)
WHERE oos_yesterday = 1
'''

# Execute OOS yesterday query
print("Loading OOS yesterday data...")
df_oos_yesterday = query_snowflake(OOS_YESTERDAY_QUERY)
df_oos_yesterday = convert_to_numeric(df_oos_yesterday)
print(f"Loaded {len(df_oos_yesterday)} OOS yesterday records")


Loading OOS yesterday data...
Loaded 1917793 OOS yesterday records


  df[col] = pd.to_numeric(df[col], errors='ignore')


In [23]:
# =============================================================================
# Add OOS Yesterday Flag to pricing_with_discount
# =============================================================================

# Merge with pricing_with_discount
pricing_with_discount = pricing_with_discount.merge(
    df_oos_yesterday[['warehouse_id', 'product_id', 'oos_yesterday']], 
    on=['warehouse_id', 'product_id'], 
    how='left'
)

# Fill missing values with 0 (not OOS yesterday)
pricing_with_discount['oos_yesterday'] = pricing_with_discount['oos_yesterday'].fillna(0).astype(int)

print(f"OOS yesterday flag added!")
print(f"SKUs out of stock yesterday: {len(pricing_with_discount[pricing_with_discount['oos_yesterday'] == 1])}")
print(f"SKUs in stock yesterday: {len(pricing_with_discount[pricing_with_discount['oos_yesterday'] == 0])}")


OOS yesterday flag added!
SKUs out of stock yesterday: 66388
SKUs in stock yesterday: 20324


In [24]:
# =============================================================================
# Running Rate Query - Get in-stock running rate by warehouse and product
# =============================================================================
RUNNING_RATE_QUERY = f'''
WITH params AS (
    SELECT
        CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE AS run_date,
        DATEADD(month, -3, CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE) AS history_start
),

-- Daily sales aggregation
sales_base AS (
    SELECT
        pso.product_id,
        pso.warehouse_id,
        DATE_TRUNC('day', pso.created_at)::DATE AS date,
        SUM(pso.purchased_item_count * pso.basic_unit_count) AS sold_units,
        SUM(pso.purchased_item_count * pso.basic_unit_count * pso.item_price)
            / NULLIF(SUM(pso.purchased_item_count * pso.basic_unit_count), 0) AS avg_selling_price,
        COUNT(DISTINCT so.retailer_id) AS retailer_count
    FROM product_sales_order pso
    JOIN sales_orders so ON pso.sales_order_id = so.id
    WHERE DATE_TRUNC('day', pso.created_at)::DATE >= (SELECT history_start FROM params)
    GROUP BY 1, 2, 3
),

-- Stock daily metrics
stock_daily AS (
    SELECT
        product_id,
        warehouse_id,
        DATE_TRUNC('day', TIMESTAMP)::DATE AS date,
        MAX_BY(available_stock, TIMESTAMP) AS stock_closing,
        24 * SUM(CASE WHEN activation = FALSE OR available_stock = 0 THEN 1 ELSE 0 END)::FLOAT 
            / NULLIF(COUNT(*), 0) AS oos_hours,
        MAX(CASE WHEN activation = TRUE AND available_stock > 0 THEN 1 ELSE 0 END) AS in_stock_flag
    FROM materialized_views.STOCK_SNAP_SHOTS_RECENT
    WHERE product_id IS NOT NULL
    GROUP BY product_id, warehouse_id, date
),

-- Join sales + stock + WAC (only in-stock days)
base_data AS (
    SELECT
        sb.product_id,
        sb.warehouse_id,
        sb.date,
        sb.sold_units,
        sb.avg_selling_price,
        sb.retailer_count,
        sd.oos_hours,
        sd.in_stock_flag,
        ac.wac_p AS wac,
        CASE WHEN DAYOFWEEKISO(sb.date) IN (5, 6) THEN 1 ELSE 0 END AS is_weekend
    FROM sales_base sb
    LEFT JOIN stock_daily sd ON sb.product_id = sd.product_id 
        AND sb.warehouse_id = sd.warehouse_id AND sb.date = sd.date
    LEFT JOIN finance.ALL_COGS ac ON sb.product_id = ac.product_id 
        AND sb.date BETWEEN ac.from_date AND ac.to_date
    WHERE sd.in_stock_flag = 1
),

-- Stats per SKU x Warehouse
sku_wh_stats AS (
    SELECT
        product_id, warehouse_id,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY sold_units) AS med_units,
        PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY sold_units) AS pct95_units,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY retailer_count) AS med_retailers,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY 
            CASE WHEN avg_selling_price IS NULL OR avg_selling_price = 0 THEN 0 
            ELSE (avg_selling_price - COALESCE(wac, 0)) / NULLIF(avg_selling_price, 0) END
        ) AS med_margin
    FROM base_data
    GROUP BY product_id, warehouse_id
),

-- Cap outliers and adjust for retailer spikes
adjusted AS (
    SELECT
        b.product_id, b.warehouse_id, b.date, b.in_stock_flag, b.oos_hours, b.is_weekend,
        b.avg_selling_price, b.wac, s.med_margin,
        CASE 
            WHEN b.retailer_count > GREATEST(2, s.med_retailers * 2) 
                AND b.retailer_count > 0 AND s.med_retailers IS NOT NULL
            THEN ROUND(LEAST(b.sold_units, s.pct95_units) * (s.med_retailers::FLOAT / NULLIF(b.retailer_count::FLOAT, 0)), 0)
            ELSE LEAST(b.sold_units, s.pct95_units)
        END AS units_adjusted
    FROM base_data b
    LEFT JOIN sku_wh_stats s ON b.product_id = s.product_id AND b.warehouse_id = s.warehouse_id
),

-- Apply weights (recency, stock availability, weekend, margin)
weighted AS (
    SELECT
        product_id, warehouse_id, date, units_adjusted,
        (
            -- Recency weight
            CASE WHEN date >= DATEADD(day, -21, (SELECT run_date FROM params)) THEN 1.5
                 WHEN date >= DATEADD(day, -90, (SELECT run_date FROM params)) THEN 1.0
                 ELSE 0.5 END
            -- In-stock weight
            * CASE WHEN in_stock_flag = 1 AND COALESCE(oos_hours, 0) < 12 THEN 1.4
                   WHEN in_stock_flag = 1 AND COALESCE(oos_hours, 0) >= 12 THEN 0.9
                   ELSE 0.6 END
            -- Weekend weight
            * CASE WHEN is_weekend = 1 THEN 0.7 ELSE 1.0 END
            -- Margin weight
            * CASE WHEN avg_selling_price IS NULL OR avg_selling_price = 0 OR med_margin IS NULL THEN 1.0
                   WHEN ((avg_selling_price - COALESCE(wac, 0)) / NULLIF(avg_selling_price, 0)) < med_margin
                   THEN 1.0 + LEAST((med_margin - ((avg_selling_price - COALESCE(wac, 0)) / NULLIF(avg_selling_price, 0))) * 2.0, 0.6)
                   WHEN ((avg_selling_price - COALESCE(wac, 0)) / NULLIF(avg_selling_price, 0)) > med_margin
                   THEN 1.0 - LEAST((((avg_selling_price - COALESCE(wac, 0)) / NULLIF(avg_selling_price, 0)) - med_margin) * 2.0, 0.4)
                   ELSE 1.0 END
        ) AS final_weight
    FROM adjusted
    WHERE units_adjusted IS NOT NULL
),

-- Weighted average forecast
forecast_base AS (
    SELECT
        product_id, warehouse_id,
        SUM(units_adjusted * final_weight) / NULLIF(SUM(final_weight), 0) AS weighted_avg_units
    FROM weighted
    GROUP BY product_id, warehouse_id
),

-- Zero-sales last 4 days (with stock) exclusion flag
last4_flag AS (
    SELECT product_id, warehouse_id,
        CASE WHEN COUNT(*) = 4 
             AND SUM(CASE WHEN COALESCE(sold_units, 0) = 0 AND in_stock_flag = 1 THEN 1 ELSE 0 END) = 4
        THEN 1 ELSE 0 END AS exclude_flag
    FROM base_data
    WHERE date >= DATEADD(day, -4, (SELECT run_date FROM params)) 
        AND date < (SELECT run_date FROM params)
    GROUP BY product_id, warehouse_id
),

-- Zero sales excluded (in stock but no sales)
zero_sales_excluded AS (
    SELECT DISTINCT s.warehouse_id, s.product_id
    FROM (
        SELECT pw.warehouse_id, pw.product_id, SUM(pw.available_stock)::INT AS stocks
        FROM product_warehouse pw
        WHERE pw.warehouse_id NOT IN (6, 9, 10) AND pw.is_basic_unit = 1 AND pw.available_stock > 0
        GROUP BY pw.warehouse_id, pw.product_id
    ) s
    LEFT JOIN (
        SELECT pso.product_id, pso.warehouse_id, SUM(pso.total_price) AS nmv
        FROM product_sales_order pso
        JOIN sales_orders so ON so.id = pso.sales_order_id
        WHERE so.created_at::date BETWEEN CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 5 
            AND CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 1
            AND so.sales_order_status_id NOT IN (7, 12) AND so.channel IN ('telesales', 'retailer')
            AND pso.purchased_item_count <> 0
        GROUP BY pso.product_id, pso.warehouse_id
    ) md ON md.product_id = s.product_id AND md.warehouse_id = s.warehouse_id
    LEFT JOIN finance.all_cogs f ON f.product_id = s.product_id
        AND f.from_date::date <= CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE
        AND f.to_date::date > CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE
    LEFT JOIN (
        SELECT pr.warehouse_id, ppr.product_id, SUM(ppr.final_price) AS total_prs
        FROM product_purchased_receipts ppr
        JOIN purchased_receipts pr ON pr.id = ppr.purchased_receipt_id
        WHERE pr.date::date >= CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 4
            AND pr.is_actual = 'true' AND pr.purchased_receipt_status_id IN (4, 5, 7)
            AND ppr.purchased_item_count <> 0
        GROUP BY pr.warehouse_id, ppr.product_id
    ) prs ON prs.product_id = s.product_id AND prs.warehouse_id = s.warehouse_id
    WHERE COALESCE(md.nmv, 0) = 0 
        AND COALESCE(prs.total_prs, 0) < 0.7 * (COALESCE(f.wac_p, 0) * s.stocks)
),

-- First sale date for new products
first_sale AS (
    SELECT product_id, warehouse_id, MIN(date) AS first_sale_date
    FROM base_data WHERE sold_units > 0
    GROUP BY product_id, warehouse_id
)

-- Final output: running rate per warehouse/product
SELECT
    fb.warehouse_id,
    fb.product_id,
    CASE
        WHEN l4.exclude_flag = 1 THEN 0
        WHEN fs.first_sale_date >= DATEADD(day, -2, (SELECT run_date FROM params))
        THEN GREATEST(CEIL(fb.weighted_avg_units), 1)
        ELSE CEIL(fb.weighted_avg_units)
    END AS In_stock_rr
FROM forecast_base fb
LEFT JOIN last4_flag l4 ON fb.product_id = l4.product_id AND fb.warehouse_id = l4.warehouse_id
LEFT JOIN first_sale fs ON fb.product_id = fs.product_id AND fb.warehouse_id = fs.warehouse_id
LEFT JOIN zero_sales_excluded zse ON fb.product_id = zse.product_id AND fb.warehouse_id = zse.warehouse_id
WHERE zse.product_id IS NULL
'''

# Execute running rate query
print("Loading running rate data (this may take a moment)...")
df_running_rate = query_snowflake(RUNNING_RATE_QUERY)
df_running_rate = convert_to_numeric(df_running_rate)
print(f"Loaded {len(df_running_rate)} running rate records")


Loading running rate data (this may take a moment)...
Loaded 23116 running rate records


  df[col] = pd.to_numeric(df[col], errors='ignore')


In [25]:
# =============================================================================
# Merge Running Rate and Calculate DOH (Days on Hand)
# =============================================================================

# Merge running rate data with pricing_with_discount
pricing_with_discount = pricing_with_discount.merge(
    df_running_rate[['warehouse_id', 'product_id', 'in_stock_rr']], 
    on=['warehouse_id', 'product_id'], 
    how='left'
)

# Fill missing running rate with 0
pricing_with_discount['in_stock_rr'] = pricing_with_discount['in_stock_rr'].fillna(0)

# Calculate DOH (Days on Hand) = stocks / in_stock_rr
# Handle division by zero - if running rate is 0, DOH is infinite (use 999)
pricing_with_discount['doh'] = np.select(
    [
        (pricing_with_discount['in_stock_rr'] > 0) & (pricing_with_discount['stocks'] > 0),
        pricing_with_discount['stocks'] == 0
    ],
    [
        pricing_with_discount['stocks'] / pricing_with_discount['in_stock_rr'],
        0
    ],
    default=999
)


In [26]:
# =============================================================================
# Product Classification Query - ABC Classification based on order contribution
# =============================================================================
PRODUCT_CLASSIFICATION_QUERY = f'''
WITH order_counts AS (
    SELECT 
        pso.warehouse_id,
        pso.product_id,
        COUNT(DISTINCT pso.sales_order_id) AS order_count
    FROM product_sales_order pso
    JOIN sales_orders so ON so.id = pso.sales_order_id
    WHERE so.created_at::DATE >= CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 90
        AND so.sales_order_status_id NOT IN (7, 12)
        AND so.channel IN ('telesales', 'retailer')
        AND pso.purchased_item_count <> 0
    GROUP BY pso.warehouse_id, pso.product_id
),

warehouse_totals AS (
    SELECT 
        warehouse_id,
        SUM(order_count) AS total_orders
    FROM order_counts
    GROUP BY warehouse_id
),

ranked_products AS (
    SELECT 
        oc.warehouse_id,
        oc.product_id,
        oc.order_count,
        wt.total_orders,
        oc.order_count::FLOAT / NULLIF(wt.total_orders, 0) AS contribution,
        SUM(oc.order_count::FLOAT / NULLIF(wt.total_orders, 0)) 
            OVER (PARTITION BY oc.warehouse_id ORDER BY oc.order_count DESC 
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_contribution
    FROM order_counts oc
    JOIN warehouse_totals wt ON oc.warehouse_id = wt.warehouse_id
)

SELECT 
    warehouse_id,
    product_id,
    order_count,
    contribution,
    cumulative_contribution,
    CASE 
        WHEN cumulative_contribution <= 0.3 THEN 'A'
        WHEN cumulative_contribution <= 0.75 THEN 'B'
        ELSE 'C'
    END AS abc_class
FROM ranked_products
'''

# Execute product classification query
print("Loading product classification data...")
df_classification = query_snowflake(PRODUCT_CLASSIFICATION_QUERY)
df_classification = convert_to_numeric(df_classification)
print(f"Loaded {len(df_classification)} product classification records")
print(f"\nClassification distribution:")
print(df_classification['abc_class'].value_counts().to_string())


Loading product classification data...
Loaded 27805 product classification records

Classification distribution:
abc_class
C    22354
B     4732
A      719


  df[col] = pd.to_numeric(df[col], errors='ignore')


In [27]:
# =============================================================================
# Add ABC Classification to pricing_with_discount
# =============================================================================

# Merge classification data with pricing_with_discount
pricing_with_discount = pricing_with_discount.merge(
    df_classification[['warehouse_id', 'product_id', 'order_count', 'contribution', 'abc_class']], 
    on=['warehouse_id', 'product_id'], 
    how='left'
)

# Fill missing values - products without orders in last 3 months get class 'C'
pricing_with_discount['order_count'] = pricing_with_discount['order_count'].fillna(0).astype(int)
pricing_with_discount['contribution'] = pricing_with_discount['contribution'].fillna(0)
pricing_with_discount['abc_class'] = pricing_with_discount['abc_class'].fillna('C')

print(f"ABC Classification added!")
print(f"\nClassification in pricing_with_discount:")
print(pricing_with_discount['abc_class'].value_counts().to_string())
print(f"\nSample data with classification:")
pricing_with_discount[
    ['product_id', 'warehouse_id', 'sku', 'order_count', 'contribution', 'abc_class', 'stocks', 'doh']
].head(15)


ABC Classification added!

Classification in pricing_with_discount:
abc_class
C    81529
B     4505
A      678

Sample data with classification:


Unnamed: 0,product_id,warehouse_id,sku,order_count,contribution,abc_class,stocks,doh
0,10124,337,مسحوق ليدر عادى لافندر - 2 كجم,113,0.000531,B,27,3.857143
1,10124,8,مسحوق ليدر عادى لافندر - 2 كجم,80,0.000271,C,22,7.333333
2,10124,401,مسحوق ليدر عادى لافندر - 2 كجم,9,0.000104,C,12,6.0
3,8915,236,بيبسى - 1.47 لتر,2271,0.004205,A,215,1.174863
4,8915,962,بيبسى - 1.47 لتر,2131,0.00419,A,128,0.731429
5,23127,703,ويفر فينجرز بكريمة شوكولاتة البندق 6 عبوة - 33 جم,0,0.0,C,0,0.0
6,22286,401,مكرونة كايرو مرمرية - 1 كجم,3,3.5e-05,C,16,5.333333
7,3456,337,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,61,0.000286,C,17,2.833333
8,3456,8,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,121,0.00041,C,52,5.777778
9,23548,703,ريف بسكويت ويفر بحشو كريمة البندق - 12 قطعه,0,0.0,C,0,0.0


In [28]:
# =============================================================================
# PO (Purchase Order) Data Query - Last PO status and rejection count
# =============================================================================
PO_DATA_QUERY = '''
WITH last_data AS (
    SELECT product_id, warehouse_id, confirmation_status, PO_date::DATE AS last_po_date, ordered_qty
    FROM (
        SELECT 
            product_id,
            Target_WAREHOUSE_ID AS warehouse_id,
            confirmation_status,
            created_at AS PO_date,
            MIN_QUANTITY AS ordered_qty,
            reason,
            MAX(created_at) OVER (PARTITION BY product_id, Target_WAREHOUSE_ID) AS last_po
        FROM retool.PO_INITIAL_PLAN
        WHERE created_at::DATE >= CURRENT_DATE - 15 
    ) x
    WHERE last_po = PO_date
),

last_15_data AS (
    SELECT 
        product_id,
        target_WAREHOUSE_ID AS warehouse_id,
        COUNT(DISTINCT CASE WHEN confirmation_status <> 'yes' THEN created_at END) AS no_last_15
    FROM retool.PO_INITIAL_PLAN
    WHERE created_at::DATE >= CURRENT_DATE - 15 
    GROUP BY 1, 2
)

SELECT 
    ld.product_id,
    ld.warehouse_id,
    ld.confirmation_status,
    ld.last_po_date,
    ld.ordered_qty,
    COALESCE(lfd.no_last_15, 0) AS no_last_15
FROM last_data ld 
LEFT JOIN last_15_data lfd 
    ON lfd.product_id = ld.product_id 
    AND lfd.warehouse_id = ld.warehouse_id
'''

# Execute PO data query using dwh_pg_query
print("Loading PO data...")
df_po_data = setup_environment_2.dwh_pg_query(
    PO_DATA_QUERY, 
    columns=['product_id', 'warehouse_id', 'confirmation_status', 'last_po_date', 'ordered_qty', 'no_last_15']
)
df_po_data.columns = df_po_data.columns.str.lower()
df_po_data = convert_to_numeric(df_po_data)
print(f"Loaded {len(df_po_data)} PO records")
print(f"\nConfirmation status distribution:")
print(df_po_data['confirmation_status'].value_counts().to_string())


Loading PO data...
Loaded 15444 PO records

Confirmation status distribution:
confirmation_status
yes    11231
no      2565


  df[col] = pd.to_numeric(df[col], errors='ignore')


In [29]:
# =============================================================================
# Add PO Data to pricing_with_discount
# =============================================================================

# Merge PO data with pricing_with_discount
pricing_with_discount = pricing_with_discount.merge(
    df_po_data[['warehouse_id', 'product_id', 'confirmation_status', 'last_po_date', 'ordered_qty', 'no_last_15']], 
    on=['warehouse_id', 'product_id'], 
    how='left'
)

# Fill missing values
pricing_with_discount['ordered_qty'] = pricing_with_discount['ordered_qty'].fillna(0)
pricing_with_discount['no_last_15'] = pricing_with_discount['no_last_15'].fillna(0).astype(int)

print(f"PO data added!")
print(f"\nRecords with PO data: {len(pricing_with_discount[~pricing_with_discount['confirmation_status'].isna()])}")
print(f"Records without PO data: {len(pricing_with_discount[pricing_with_discount['confirmation_status'].isna()])}")
print(f"\nSample data with PO info:")
pricing_with_discount[
    ['product_id', 'warehouse_id', 'sku', 'confirmation_status', 'last_po_date', 'ordered_qty', 'no_last_15']
].dropna(subset=['confirmation_status']).head(15)


PO data added!

Records with PO data: 13278
Records without PO data: 73434

Sample data with PO info:


Unnamed: 0,product_id,warehouse_id,sku,confirmation_status,last_po_date,ordered_qty,no_last_15
0,10124,337,مسحوق ليدر عادى لافندر - 2 كجم,yes,2026-02-09,21.0,0
1,10124,8,مسحوق ليدر عادى لافندر - 2 كجم,yes,2026-02-07,13.0,0
7,3456,337,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,yes,2026-02-10,36.0,2
8,3456,8,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,yes,2026-02-10,24.0,2
10,61,797,نسكافيه 3*1 - 18 جم,yes,2026-02-10,60.0,2
11,12902,1,فن داى كب كيك بنكهة الفانيلا وقطع الشوكولاتة ...,yes,2026-02-10,12.0,0
12,2703,632,نسكافيه 3*1 ريتش - 21 جم,yes,2026-02-07,24.0,2
19,9186,1,مناديل زينة كلاسيك سحب 2 طبقة الوان عرض خاص 3 ...,yes,2026-02-09,16.0,1
20,11772,501,اوكسى يدوى - 250 جم,yes,2026-02-10,16.0,0
21,974,797,كتاكيتو ويفر مغطي بالشوكولاته- 20 ج,yes,2026-02-09,12.0,0


In [30]:
# =============================================================================
# Leadtime Query - Supplier leadtime by brand, category, and warehouse
# =============================================================================
LEADTIME_QUERY = '''
SELECT brand, cat, warehouse_id, leadtime
FROM (
    SELECT a.*, b.name_ar AS brand, c.name_ar AS cat
    FROM (
        SELECT DISTINCT 
            sl.supplier_id, 
            warehouse_id, 
            category_id, 
            brand_id, 
            sl.updated_at, 
            leadtime,
            MAX(sl.updated_at) OVER (PARTITION BY sl.supplier_id, warehouse_id) AS last_update
        FROM retool.SUPPLIER_MOQ sl 
        JOIN retool.PO_SUPPLIER_MAPPING sm ON sl.supplier_id = sm.supplier_id 
    ) a
    JOIN brands b ON b.id = a.brand_id 
    JOIN categories c ON c.id = a.category_id
    WHERE a.updated_at = last_update
) d
'''

# Execute leadtime query using dwh_pg_query
print("Loading leadtime data...")
df_leadtime = setup_environment_2.dwh_pg_query(
    LEADTIME_QUERY, 
    columns=['brand', 'cat', 'warehouse_id', 'leadtime']
)
df_leadtime.columns = df_leadtime.columns.str.lower()
df_leadtime = convert_to_numeric(df_leadtime)
print(f"Loaded {len(df_leadtime)} leadtime records")


Loading leadtime data...
Loaded 14960 leadtime records


  df[col] = pd.to_numeric(df[col], errors='ignore')


In [31]:
# =============================================================================
# Add Leadtime to pricing_with_discount
# =============================================================================

# Merge leadtime data with pricing_with_discount (by brand, cat, warehouse_id)
pricing_with_discount = pricing_with_discount.merge(
    df_leadtime[['brand', 'cat', 'warehouse_id', 'leadtime']], 
    on=['brand', 'cat', 'warehouse_id'], 
    how='left'
)

# Fill missing leadtime with 0 or a default value
pricing_with_discount['leadtime'] = pricing_with_discount['leadtime'].fillna(72)


print(f"Leadtime data added!")
print(f"\nRecords with leadtime: {len(pricing_with_discount[pricing_with_discount['leadtime'] > 0])}")
print(f"Records without leadtime: {len(pricing_with_discount[pricing_with_discount['leadtime'] == 0])}")
print(f"\nLeadtime distribution:")
print(pricing_with_discount['leadtime'].describe())

# =============================================================================
# Calculate Expected Receiving Day
# If confirmation_status is 'no': add 2 extra days (48 hours) before adding leadtime
# expected_receiving_day = last_po_date + ((2 + leadtime) / 24) if not confirmed
# expected_receiving_day = last_po_date + (leadtime / 24) if confirmed
# =============================================================================

# Convert last_po_date to datetime if not already
pricing_with_discount['last_po_date'] = pd.to_datetime(pricing_with_discount['last_po_date'], errors='coerce')

# Calculate adjusted leadtime: add 48 hours (2 days) if confirmation_status is 'no'
pricing_with_discount['adjusted_leadtime'] = np.where(
    pricing_with_discount['confirmation_status'].str.lower() == 'no',
    pricing_with_discount['leadtime'] + 48,  # Add 2 days (48 hours) if not confirmed
    pricing_with_discount['leadtime']
)

# Calculate expected receiving day (leadtime is in hours, divide by 24 for days)
pricing_with_discount['expected_receiving_day'] = pricing_with_discount['last_po_date'] + pd.to_timedelta(
    pricing_with_discount['adjusted_leadtime'] / 24, unit='D'
)

# Set expected_receiving_day to empty if it's in the past (smaller than today)
pricing_with_discount['expected_receiving_day'] = np.where(
    pricing_with_discount['expected_receiving_day'] < pd.Timestamp(TODAY),
    pd.NaT,
    pricing_with_discount['expected_receiving_day']
)
# Convert back to datetime (np.where returns object type)
pricing_with_discount['expected_receiving_day'] = pd.to_datetime(pricing_with_discount['expected_receiving_day'])

print(f"\nExpected receiving day calculated!")
print(f"Records with expected receiving day (future dates only): {len(pricing_with_discount[~pricing_with_discount['expected_receiving_day'].isna()])}")
print(f"Records with past expected dates (set to empty): {len(pricing_with_discount[pricing_with_discount['expected_receiving_day'].isna() & pricing_with_discount['last_po_date'].notna()])}")
print(f"Records with confirmation_status='no' (added 2 extra days): {len(pricing_with_discount[pricing_with_discount['confirmation_status'].str.lower() == 'no'])}")
print(f"\nSample data with expected receiving day:")
pricing_with_discount[~pricing_with_discount['last_po_date'].isna()][
    ['product_id', 'warehouse_id', 'sku', 'confirmation_status', 'last_po_date', 'leadtime', 'adjusted_leadtime', 'expected_receiving_day', 'doh']
].head(15)


Leadtime data added!

Records with leadtime: 91842
Records without leadtime: 0

Leadtime distribution:
count    91842.000000
mean        55.100804
std         30.642846
min         24.000000
25%         48.000000
50%         48.000000
75%         72.000000
max        168.000000
Name: leadtime, dtype: float64

Expected receiving day calculated!
Records with expected receiving day (future dates only): 12620
Records with past expected dates (set to empty): 2926
Records with confirmation_status='no' (added 2 extra days): 2537

Sample data with expected receiving day:


Unnamed: 0,product_id,warehouse_id,sku,confirmation_status,last_po_date,leadtime,adjusted_leadtime,expected_receiving_day,doh
0,10124,337,مسحوق ليدر عادى لافندر - 2 كجم,yes,2026-02-09,48.0,48.0,2026-02-11,3.857143
1,10124,8,مسحوق ليدر عادى لافندر - 2 كجم,yes,2026-02-07,48.0,48.0,NaT,7.333333
3,8915,236,بيبسى - 1.47 لتر,,2026-02-10,24.0,24.0,2026-02-11,1.174863
4,8915,962,بيبسى - 1.47 لتر,,2026-02-10,24.0,24.0,2026-02-11,0.731429
7,3456,337,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,yes,2026-02-10,48.0,48.0,2026-02-12,2.833333
8,3456,8,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,yes,2026-02-10,48.0,48.0,2026-02-12,5.777778
11,61,797,نسكافيه 3*1 - 18 جم,yes,2026-02-10,48.0,48.0,2026-02-12,7.652174
12,12902,1,فن داى كب كيك بنكهة الفانيلا وقطع الشوكولاتة ...,yes,2026-02-10,48.0,48.0,2026-02-12,3.5
13,2703,632,نسكافيه 3*1 ريتش - 21 جم,yes,2026-02-07,48.0,48.0,NaT,8.0
14,2327,337,شويبس جولد اناناس - 1.75 لتر,,2026-02-10,24.0,24.0,2026-02-11,2.196429


In [32]:
# =============================================================================
# SKIP: Margin Boundaries Query - Now fetched via market_data_module.get_margin_tiers()
# =============================================================================
# The margin boundaries and tier calculation is now centralized in market_data_module.
# We'll use get_margin_tiers() to get pre-calculated margin tiers.

print("Loading margin tiers from market_data_module...")
df_margin_tiers = get_margin_tiers()
print(f"Loaded {len(df_margin_tiers)} margin tier records from module")


Loading margin tiers from market_data_module...

FETCHING MARGIN TIERS
Timestamp: 2026-02-10 14:26:22 Cairo time

Step 1: Fetching margin boundaries from PRODUCT_STATISTICS...
    Loaded 18067 records

Step 2: Adding cohort IDs...
    Records with cohorts: 24928

Step 3: Calculating margin tiers...

MARGIN TIERS COMPLETE
Total records: 24928

Margin Tier Structure:
  margin_tier_below:   effective_min - step (1 below)
  margin_tier_1:       effective_min_margin
  margin_tier_2:       effective_min + 1*step
  margin_tier_3:       effective_min + 2*step
  margin_tier_4:       effective_min + 3*step
  margin_tier_5:       max_boundary
  margin_tier_above_1: max_boundary + 1*step
  margin_tier_above_2: max_boundary + 2*step
Loaded 24928 margin tier records from module


In [33]:
# =============================================================================
# Add Margin Tiers from market_data_module (Pre-calculated)
# =============================================================================
# The margin tiers are now calculated in market_data_module.get_margin_tiers()
# We just need to merge them with pricing_with_discount

# Merge pre-calculated margin tiers
pricing_with_discount = pricing_with_discount.merge(
    df_margin_tiers[[
        'product_id', 'region', 'cohort_id',
        'optimal_bm', 'min_boundary', 'max_boundary', 'median_bm',
        'effective_min_margin', 'margin_step',
        'margin_tier_below', 'margin_tier_1', 'margin_tier_2', 'margin_tier_3',
        'margin_tier_4', 'margin_tier_5', 'margin_tier_above_1', 'margin_tier_above_2'
    ]], 
    on=['product_id', 'region','cohort_id'],
    how='left'
)

print(f"Margin tiers merged from module!")
print(f"\nRecords with margin tiers: {len(pricing_with_discount[~pricing_with_discount['max_boundary'].isna()])}")
print(f"Records without margin tiers: {len(pricing_with_discount[pricing_with_discount['max_boundary'].isna()])}")

print(f"\nMargin Tier Structure (from market_data_module):")
print(f"  margin_tier_below:   effective_min - step (1 below)")
print(f"  margin_tier_1:       effective_min_margin")
print(f"  margin_tier_2:       effective_min + 1*step")
print(f"  margin_tier_3:       effective_min + 2*step")
print(f"  margin_tier_4:       effective_min + 3*step")
print(f"  margin_tier_5:       max_boundary")
print(f"  margin_tier_above_1: max_boundary + 1*step")
print(f"  margin_tier_above_2: max_boundary + 2*step")

print(f"\nSample margin tiers:")
pricing_with_discount[~pricing_with_discount['max_boundary'].isna()][
    ['product_id', 'sku', 'effective_min_margin', 'max_boundary', 'margin_step',
     'margin_tier_below', 'margin_tier_1', 'margin_tier_3', 'margin_tier_5', 
     'margin_tier_above_1', 'margin_tier_above_2']
].head(10)


Margin tiers merged from module!

Records with margin tiers: 34935
Records without margin tiers: 56907

Margin Tier Structure (from market_data_module):
  margin_tier_below:   effective_min - step (1 below)
  margin_tier_1:       effective_min_margin
  margin_tier_2:       effective_min + 1*step
  margin_tier_3:       effective_min + 2*step
  margin_tier_4:       effective_min + 3*step
  margin_tier_5:       max_boundary
  margin_tier_above_1: max_boundary + 1*step
  margin_tier_above_2: max_boundary + 2*step

Sample margin tiers:


Unnamed: 0,product_id,sku,effective_min_margin,max_boundary,margin_step,margin_tier_below,margin_tier_1,margin_tier_3,margin_tier_5,margin_tier_above_1,margin_tier_above_2
0,10124,مسحوق ليدر عادى لافندر - 2 كجم,0.039697,0.123859,0.021041,0.018656,0.039697,0.081778,0.123859,0.1449,0.16594
1,10124,مسحوق ليدر عادى لافندر - 2 كجم,0.039697,0.123859,0.021041,0.018656,0.039697,0.081778,0.123859,0.1449,0.16594
2,10124,مسحوق ليدر عادى لافندر - 2 كجم,0.031806,0.123859,0.023013,0.008793,0.031806,0.077833,0.123859,0.146872,0.169885
3,8915,بيبسى - 1.47 لتر,0.029099,0.075,0.011475,0.017624,0.029099,0.05205,0.075,0.086475,0.09795
4,8915,بيبسى - 1.47 لتر,0.029099,0.075,0.011475,0.017624,0.029099,0.05205,0.075,0.086475,0.09795
6,22286,مكرونة كايرو مرمرية - 1 كجم,0.019815,0.075,0.013796,0.006019,0.019815,0.047407,0.075,0.088796,0.102593
7,3456,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,0.036596,0.09,0.013351,0.023245,0.036596,0.063298,0.09,0.103351,0.116702
8,3456,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,0.036596,0.09,0.013351,0.023245,0.036596,0.063298,0.09,0.103351,0.116702
11,61,نسكافيه 3*1 - 18 جم,0.042,0.076543,0.008636,0.033364,0.042,0.059272,0.076543,0.085179,0.093815
12,12902,فن داى كب كيك بنكهة الفانيلا وقطع الشوكولاتة ...,0.051732,0.087613,0.00897,0.042761,0.051732,0.069673,0.087613,0.096584,0.105554


In [34]:
# =============================================================================
# Minimum Selling Quantity Query - Get min selling qty per product
# =============================================================================
MIN_SELLING_QTY_QUERY = f'''
SELECT product_id, min_selling_qty
FROM (
    SELECT *, MIN(basic_unit_count) OVER (PARTITION BY product_id) AS min_selling_qty
    FROM (
        SELECT DISTINCT
            pso.product_id,
            pso.PACKING_UNIT_ID,
            pup.basic_unit_count,
            SUM(pso.total_price) AS nmv,
            SUM(pso.total_price) / SUM(nmv) OVER (PARTITION BY pso.product_id) AS cntrb
        FROM product_sales_order pso
        JOIN PACKING_UNIT_PRODUCTS pup ON pup.product_id = pso.product_id 
            AND pup.PACKING_UNIT_ID = pso.PACKING_UNIT_ID
        JOIN sales_orders so ON so.id = pso.sales_order_id
        WHERE so.created_at::DATE >= CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 120
            AND so.sales_order_status_id NOT IN (7, 12)
            AND so.channel IN ('telesales', 'retailer')
            AND pso.purchased_item_count <> 0
        GROUP BY ALL
        QUALIFY cntrb > 0.05
    )
    QUALIFY basic_unit_count = min_selling_qty
)
'''

# Execute min selling qty query
print("Loading minimum selling quantity data...")
df_min_selling_qty = query_snowflake(MIN_SELLING_QTY_QUERY)
df_min_selling_qty = convert_to_numeric(df_min_selling_qty)
print(f"Loaded {len(df_min_selling_qty)} min selling qty records")


Loading minimum selling quantity data...
Loaded 3876 min selling qty records


  df[col] = pd.to_numeric(df[col], errors='ignore')


In [35]:
# =============================================================================
# Add Min Selling Qty and Below Min Stock Flag to pricing_with_discount
# =============================================================================

# Merge min selling qty with pricing_with_discount (by product_id)
pricing_with_discount = pricing_with_discount.merge(
    df_min_selling_qty[['product_id', 'min_selling_qty']], 
    on='product_id', 
    how='left'
)

# Fill missing min_selling_qty with 1 (default)
pricing_with_discount['min_selling_qty'] = pricing_with_discount['min_selling_qty'].fillna(1).astype(int)

# Create flag: below_min_stock_flag = 1 if (RR = 0 AND stocks > 0 AND stocks < min_selling_qty)
pricing_with_discount['below_min_stock_flag'] = np.where(
    (pricing_with_discount['in_stock_rr'] == 0) & 
    (pricing_with_discount['stocks'] > 0) &
    (pricing_with_discount['stocks'] < pricing_with_discount['min_selling_qty']),
    1, 0
)

print(f"Min selling qty and below_min_stock_flag added!")
print(f"\nSKUs flagged (zero RR & stocks < min_selling_qty): {len(pricing_with_discount[pricing_with_discount['below_min_stock_flag'] == 1])}")
print(f"SKUs not flagged: {len(pricing_with_discount[pricing_with_discount['below_min_stock_flag'] == 0])}")
print(f"\nSample flagged SKUs:")
pricing_with_discount[pricing_with_discount['below_min_stock_flag'] == 1][
    ['product_id', 'warehouse_id', 'sku', 'stocks', 'min_selling_qty', 'in_stock_rr', 'below_min_stock_flag']
].head(15)


Min selling qty and below_min_stock_flag added!

SKUs flagged (zero RR & stocks < min_selling_qty): 49
SKUs not flagged: 91793

Sample flagged SKUs:


Unnamed: 0,product_id,warehouse_id,sku,stocks,min_selling_qty,in_stock_rr,below_min_stock_flag
3045,10048,401,مولبد الترا رفيعة فاليو طويل جدا + 2 فوطة مجان...,1,2,0.0,1
3154,2213,401,اولويز الترا رفيعة طويل جدا 3*1 بالاعشاب - 7 فوطة,3,4,0.0,1
10012,4673,170,التمساح عسل اسود - 330 جم,5,6,0.0,1
10013,4673,170,التمساح عسل اسود - 330 جم,5,6,0.0,1
17533,70,401,عسل البوادى اسود - 680 جم,5,6,0.0,1
20147,9122,337,مولبد ماكسى مضغوطة حماية ضد البكتيريا طويل جدا...,3,4,0.0,1
20281,948,632,حلواني حلاوة طحينية سادة - 280 جم,1,5,0.0,1
20423,426,962,أمريكانا فول مدمس ساده- 400 جم,3,6,0.0,1
23037,12471,632,مولبد الترا طويل 34 + 6 فوطة مجانا ميجا توفير ...,2,4,0.0,1
24507,9122,170,مولبد ماكسى مضغوطة حماية ضد البكتيريا طويل جدا...,1,4,0.0,1


In [36]:
# =============================================================================
# Yesterday's Discount Analysis Query
# Gets: SKU discount, Quantity discount, Tier 1/2/3 NMV breakdown and contributions
# =============================================================================
YESTERDAY_DISCOUNT_QUERY = f'''
WITH qd_det AS (
    -- Map dynamic tags to warehouse IDs using name matching
    SELECT DISTINCT 
        dt.id AS tag_id, 
        dt.name AS tag_name,
        REPLACE(w.name, ' ', '') AS warehouse_name,
        w.id AS warehouse_id,
        warehouse_name ILIKE '%' || CASE 
            WHEN SPLIT_PART(tag_name, '_', 1) = 'El' THEN SPLIT_PART(tag_name, '_', 2) 
            ELSE SPLIT_PART(tag_name, '_', 1) 
        END || '%' AS contains_flag
    FROM dynamic_tags dt
    JOIN dynamic_taggables dta ON dt.id = dta.dynamic_tag_id 
    CROSS JOIN warehouses w 
    WHERE dt.id > 3000
        AND dt.name LIKE '%QD_rets%'
        AND w.id IN (1, 236, 337, 8, 339, 170, 501, 401, 703, 632, 797, 962)
        AND contains_flag = 'true'
),

qd_config AS (
    SELECT * 
    FROM (
        SELECT 
            product_id,
            start_at,
            end_at,
            packing_unit_id,
            id AS qd_id,
            qd.warehouse_id,
            MAX(CASE WHEN tier = 1 THEN quantity END) AS tier_1_qty,
            MAX(CASE WHEN tier = 1 THEN discount_percentage END) AS tier_1_discount_pct,
            MAX(CASE WHEN tier = 2 THEN quantity END) AS tier_2_qty,
            MAX(CASE WHEN tier = 2 THEN discount_percentage END) AS tier_2_discount_pct,
            MAX(CASE WHEN tier = 3 THEN quantity END) AS tier_3_qty,
            MAX(CASE WHEN tier = 3 THEN discount_percentage END) AS tier_3_discount_pct
        FROM (
            SELECT 
                qd.id,
                qdv.product_id,
                qdv.packing_unit_id,
                qdv.quantity,
                qdv.discount_percentage,
                qd.dynamic_tag_id,
                qd.start_at,
                qd.end_at,
                ROW_NUMBER() OVER (
                    PARTITION BY qdv.product_id, qdv.packing_unit_id, qd.id 
                    ORDER BY qdv.quantity
                ) AS tier
            FROM quantity_discounts qd 
            JOIN quantity_discount_values qdv ON qd.id = qdv.quantity_discount_id 
            WHERE CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 1 
                  BETWEEN qd.start_at::DATE AND qd.end_at::DATE
                AND qd.start_at::DATE >= CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 5
        ) qd_tiers
        JOIN qd_det qd ON qd.tag_id = qd_tiers.dynamic_tag_id
        GROUP BY ALL
    )
    QUALIFY ROW_NUMBER() OVER (PARTITION BY product_id, packing_unit_id, warehouse_id ORDER BY start_at DESC) = 1
),

-- Get all sales from yesterday
yesterday_sales AS (
    SELECT 
        pso.warehouse_id,
        pso.product_id,
        so.retailer_id,
        pso.packing_unit_id,
        pso.purchased_item_count AS qty,
        pso.total_price AS nmv,
        pso.item_price / pso.basic_unit_count AS unit_price,
        pso.ITEM_DISCOUNT_VALUE AS sku_discount_per_unit,
        pso.ITEM_QUANTITY_DISCOUNT_VALUE AS qty_discount_per_unit,
        pso.ITEM_DISCOUNT_VALUE * pso.purchased_item_count AS sku_discount_total,
        pso.ITEM_QUANTITY_DISCOUNT_VALUE * pso.purchased_item_count AS qty_discount_total,
        qd.tier_1_qty,
        qd.tier_2_qty,
        qd.tier_3_qty,
        qd.tier_1_discount_pct,
        qd.tier_2_discount_pct,
        qd.tier_3_discount_pct,
        -- Determine tier used
        CASE 
            WHEN pso.ITEM_QUANTITY_DISCOUNT_VALUE = 0 OR qd.tier_1_qty IS NULL THEN 'Base'
            WHEN qd.tier_3_qty IS NOT NULL AND pso.purchased_item_count >= qd.tier_3_qty THEN 'Tier 3'
            WHEN qd.tier_2_qty IS NOT NULL AND pso.purchased_item_count >= qd.tier_2_qty THEN 'Tier 2'
            WHEN qd.tier_1_qty IS NOT NULL AND pso.purchased_item_count >= qd.tier_1_qty THEN 'Tier 1'
            ELSE 'Base'
        END AS tier_used
    FROM product_sales_order pso
    JOIN sales_orders so ON so.id = pso.sales_order_id
    LEFT JOIN qd_config qd 
        ON qd.product_id = pso.product_id 
        AND qd.packing_unit_id = pso.packing_unit_id
        AND qd.warehouse_id = so.warehouse_id
    WHERE so.created_at::DATE = CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 1
        AND so.sales_order_status_id NOT IN (7, 12)
        AND so.channel IN ('telesales', 'retailer')
        AND pso.purchased_item_count <> 0
)

SELECT 
    warehouse_id,
    product_id,
    SUM(nmv) AS total_nmv,
    SUM(CASE WHEN sku_discount_per_unit > 0 THEN nmv ELSE 0 END) AS sku_discount_nmv,
    SUM(CASE WHEN qty_discount_per_unit > 0 THEN nmv ELSE 0 END) AS qty_discount_nmv,
    SUM(CASE WHEN tier_used = 'Tier 1' THEN nmv ELSE 0 END) AS tier1_nmv,
    SUM(CASE WHEN tier_used = 'Tier 2' THEN nmv ELSE 0 END) AS tier2_nmv,
    SUM(CASE WHEN tier_used = 'Tier 3' THEN nmv ELSE 0 END) AS tier3_nmv,
    -- Tier quantities and discount percentages (from the active QD config)
    MAX(tier_1_qty) AS tier_1_qty,
    MAX(tier_1_discount_pct) AS tier_1_discount_pct,
    MAX(tier_2_qty) AS tier_2_qty,
    MAX(tier_2_discount_pct) AS tier_2_discount_pct,
    MAX(tier_3_qty) AS tier_3_qty,
    MAX(tier_3_discount_pct) AS tier_3_discount_pct
FROM yesterday_sales
GROUP BY warehouse_id, product_id
HAVING SUM(nmv) > 0
ORDER BY total_nmv DESC
'''

# Execute yesterday discount query
print("Loading yesterday's discount analysis data...")
df_yesterday_discount = query_snowflake(YESTERDAY_DISCOUNT_QUERY)
df_yesterday_discount = convert_to_numeric(df_yesterday_discount)
print(f"Loaded {len(df_yesterday_discount)} SKU discount records from yesterday")

# Calculate contributions in Python
df_yesterday_discount['sku_discount_nmv_cntrb'] = (
    df_yesterday_discount['sku_discount_nmv'] / df_yesterday_discount['total_nmv'] * 100
).round(2)
df_yesterday_discount['qty_discount_nmv_cntrb'] = (
    df_yesterday_discount['qty_discount_nmv'] / df_yesterday_discount['total_nmv'] * 100
).round(2)
df_yesterday_discount['tier1_nmv_cntrb'] = (
    df_yesterday_discount['tier1_nmv'] / df_yesterday_discount['total_nmv'] * 100
).round(2)
df_yesterday_discount['tier2_nmv_cntrb'] = (
    df_yesterday_discount['tier2_nmv'] / df_yesterday_discount['total_nmv'] * 100
).round(2)
df_yesterday_discount['tier3_nmv_cntrb'] = (
    df_yesterday_discount['tier3_nmv'] / df_yesterday_discount['total_nmv'] * 100
).round(2)

# Summary
print(f"\n{'='*60}")
print(f"YESTERDAY'S DISCOUNT ANALYSIS SUMMARY")
print(f"{'='*60}")
print(f"\nTotal NMV yesterday: {df_yesterday_discount['total_nmv'].sum():,.0f}")
print(f"SKU Discount NMV: {df_yesterday_discount['sku_discount_nmv'].sum():,.0f}")
print(f"Quantity Discount NMV: {df_yesterday_discount['qty_discount_nmv'].sum():,.0f}")
print(f"\nNMV by Tier:")
print(f"  Tier 1: {df_yesterday_discount['tier1_nmv'].sum():,.0f}")
print(f"  Tier 2: {df_yesterday_discount['tier2_nmv'].sum():,.0f}")
print(f"  Tier 3: {df_yesterday_discount['tier3_nmv'].sum():,.0f}")

df_yesterday_discount.head(10)


Loading yesterday's discount analysis data...
Loaded 10804 SKU discount records from yesterday

YESTERDAY'S DISCOUNT ANALYSIS SUMMARY

Total NMV yesterday: 23,934,583
SKU Discount NMV: 4,939,885
Quantity Discount NMV: 508,951

NMV by Tier:
  Tier 1: 193,640
  Tier 2: 255,148
  Tier 3: 31,388


  df[col] = pd.to_numeric(df[col], errors='ignore')


Unnamed: 0,warehouse_id,product_id,total_nmv,sku_discount_nmv,qty_discount_nmv,tier1_nmv,tier2_nmv,tier3_nmv,tier_1_qty,tier_1_discount_pct,tier_2_qty,tier_2_discount_pct,tier_3_qty,tier_3_discount_pct,sku_discount_nmv_cntrb,qty_discount_nmv_cntrb,tier1_nmv_cntrb,tier2_nmv_cntrb,tier3_nmv_cntrb
0,962,7630,212560.25,0.0,0.0,0.0,0.0,0.0,,,,,,,0.0,0.0,0.0,0.0,0.0
1,797,2327,107171.0,0.0,0.0,0.0,0.0,0.0,,,,,,,0.0,0.0,0.0,0.0,0.0
2,962,8089,88758.5,68585.75,0.0,0.0,0.0,0.0,,,,,,,77.27,0.0,0.0,0.0,0.0
3,1,5151,82838.0,0.0,25525.5,0.0,25525.5,0.0,4.0,0.47,7.0,1.18,27.0,1.48,0.0,30.81,0.0,30.81,0.0
4,236,130,79852.75,51170.0,0.0,0.0,0.0,0.0,,,,,,,64.08,0.0,0.0,0.0,0.0
5,236,5151,78047.0,0.0,43227.0,6003.75,33621.0,0.0,4.0,0.62,7.0,2.33,,,0.0,55.39,7.69,43.08,0.0
6,1,143,72715.0,44167.75,0.0,0.0,0.0,0.0,4.0,0.83,7.0,1.6,,,60.74,0.0,0.0,0.0,0.0
7,501,2778,64524.0,56070.0,0.0,0.0,0.0,0.0,,,,,,,86.9,0.0,0.0,0.0,0.0
8,1,1492,63065.75,31480.0,0.0,0.0,0.0,0.0,4.0,0.92,7.0,1.82,,,49.92,0.0,0.0,0.0,0.0
9,236,13259,54790.0,0.0,0.0,0.0,0.0,0.0,,,,,,,0.0,0.0,0.0,0.0,0.0


In [37]:
# =============================================================================
# Add Yesterday's Discount Analysis to pricing_with_discount (Contributions Only)
# =============================================================================

# Merge yesterday discount data with pricing_with_discount - contributions + tier config
pricing_with_discount = pricing_with_discount.merge(
    df_yesterday_discount[[
        'warehouse_id', 'product_id', 
        'sku_discount_nmv_cntrb', 'qty_discount_nmv_cntrb',
        'tier1_nmv_cntrb', 'tier2_nmv_cntrb', 'tier3_nmv_cntrb',
        'tier_1_qty', 'tier_1_discount_pct',
        'tier_2_qty', 'tier_2_discount_pct',
        'tier_3_qty', 'tier_3_discount_pct'
    ]].rename(columns={
        'sku_discount_nmv_cntrb': 'yesterday_sku_disc_cntrb',
        'qty_discount_nmv_cntrb': 'yesterday_qty_disc_cntrb',
        'tier1_nmv_cntrb': 'yesterday_t1_cntrb',
        'tier2_nmv_cntrb': 'yesterday_t2_cntrb',
        'tier3_nmv_cntrb': 'yesterday_t3_cntrb',
        'tier_1_qty': 'qd_tier_1_qty',
        'tier_1_discount_pct': 'qd_tier_1_disc_pct',
        'tier_2_qty': 'qd_tier_2_qty',
        'tier_2_discount_pct': 'qd_tier_2_disc_pct',
        'tier_3_qty': 'qd_tier_3_qty',
        'tier_3_discount_pct': 'qd_tier_3_disc_pct'
    }), 
    on=['warehouse_id', 'product_id'], 
    how='left'
)

# Fill NaN for SKUs that had no sales yesterday
contrib_cols = [
    'yesterday_sku_disc_cntrb', 'yesterday_qty_disc_cntrb',
    'yesterday_t1_cntrb', 'yesterday_t2_cntrb', 'yesterday_t3_cntrb'
]
for col in contrib_cols:
    if col in pricing_with_discount.columns:
        pricing_with_discount[col] = pricing_with_discount[col].fillna(0)

# Fill NaN for QD tier config (0 means no tier configured)
qd_config_cols = [
    'qd_tier_1_qty', 'qd_tier_1_disc_pct',
    'qd_tier_2_qty', 'qd_tier_2_disc_pct',
    'qd_tier_3_qty', 'qd_tier_3_disc_pct'
]
for col in qd_config_cols:
    if col in pricing_with_discount.columns:
        pricing_with_discount[col] = pricing_with_discount[col].fillna(0)

print(f"Yesterday's discount contributions and QD tier config added!")
print(f"\nSKUs with discount data: {len(pricing_with_discount[pricing_with_discount['yesterday_sku_disc_cntrb'] > 0]) + len(pricing_with_discount[pricing_with_discount['yesterday_qty_disc_cntrb'] > 0])}")
print(f"SKUs with QD tier config: {len(pricing_with_discount[pricing_with_discount['qd_tier_1_qty'] > 0])}")
print(f"\nSample data with yesterday's discount contributions and QD tiers:")
pricing_with_discount[pricing_with_discount['qd_tier_1_qty'] > 0][
    ['product_id', 'warehouse_id', 'sku', 
     'yesterday_sku_disc_cntrb', 'yesterday_qty_disc_cntrb',
     'qd_tier_1_qty', 'qd_tier_1_disc_pct', 'qd_tier_2_qty', 'qd_tier_2_disc_pct', 'qd_tier_3_qty', 'qd_tier_3_disc_pct']
].head(15)


Yesterday's discount contributions and QD tier config added!

SKUs with discount data: 3359
SKUs with QD tier config: 1068

Sample data with yesterday's discount contributions and QD tiers:


Unnamed: 0,product_id,warehouse_id,sku,yesterday_sku_disc_cntrb,yesterday_qty_disc_cntrb,qd_tier_1_qty,qd_tier_1_disc_pct,qd_tier_2_qty,qd_tier_2_disc_pct,qd_tier_3_qty,qd_tier_3_disc_pct
55,10450,962,15% هارفست فول بزيت الزيتون خصم - 400 جم,0.0,33.33,6.0,0.7,10.0,1.51,0.0,0.0
124,12344,170,زيووو كرانشى الشطة الحار - 5 جنية,0.0,0.0,4.0,2.43,7.0,4.67,0.0,0.0
126,1490,962,زيت حبوبة خليط - 700 مل,27.74,0.0,4.0,0.41,7.0,1.07,57.0,1.35
127,1163,236,لبان كلورتس نعناع - 1.5 ج,0.0,38.5,4.0,0.59,7.0,1.18,394.0,1.36
128,1163,962,لبان كلورتس نعناع - 1.5 ج,0.0,19.18,4.0,0.59,7.0,1.18,394.0,1.36
129,1243,236,ارز حبوبة عريض الحبة - 1 كجم,0.0,17.5,4.0,1.07,7.0,2.07,0.0,0.0
145,214,339,البوادي حلاوة- 255 جم,23.69,0.0,9.0,0.92,15.0,1.82,0.0,0.0
146,214,170,البوادي حلاوة- 255 جم,13.64,20.46,9.0,1.03,15.0,1.92,0.0,0.0
150,346,962,صلصة هارفست ظرف - 30 جم,0.0,0.0,4.0,1.04,7.0,2.01,0.0,0.0
152,1504,8,كوكا كولا - 1.45 لتر,19.92,0.0,4.0,0.92,7.0,2.18,0.0,0.0


In [38]:
# =============================================================================
# Performance Benchmark Query
# Gets: Yesterday qty, Recent 7d qty, MTD qty, and P80 benchmarks (240 days)
# Uses materialized_views.stock_day_close for in-stock determination
# =============================================================================
PERFORMANCE_BENCHMARK_QUERY = f'''
WITH params AS (
    SELECT
        CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE AS today,
        CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 1 AS yesterday,
        CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 240 AS history_start,
        DATE_TRUNC('month', CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE) AS current_month_start,
        DAY(CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE) AS current_day_of_month
),

-- Daily sales aggregation (240 days) - includes qty and retailer count
daily_sales AS (
    SELECT
        pso.warehouse_id,
        pso.product_id,
        so.created_at::DATE AS sale_date,
        SUM(pso.purchased_item_count * pso.basic_unit_count) AS daily_qty,
        COUNT(DISTINCT so.retailer_id) AS daily_retailers
    FROM product_sales_order pso
    JOIN sales_orders so ON so.id = pso.sales_order_id
    CROSS JOIN params p
    WHERE so.created_at::DATE >= p.history_start
        AND so.created_at::DATE < p.today
        AND so.sales_order_status_id NOT IN (7, 12)
        AND so.channel IN ('telesales', 'retailer')
        AND pso.purchased_item_count <> 0
    GROUP BY pso.warehouse_id, pso.product_id, so.created_at::DATE
),

-- Daily stock status using stock_day_close
-- In-stock = opening (prev day close) > 0 AND closing > 0
daily_stock AS (
    SELECT
        sdc.warehouse_id,
        sdc.product_id,
        sdc.TIMESTAMP::DATE AS stock_date,
        sdc.available_stock,
        LAG(sdc.available_stock, 1) OVER (
            PARTITION BY sdc.warehouse_id, sdc.product_id 
            ORDER BY sdc.TIMESTAMP::DATE
        ) AS opening_stock,
        CASE 
            WHEN LAG(sdc.available_stock, 1) OVER (
                    PARTITION BY sdc.warehouse_id, sdc.product_id ORDER BY sdc.TIMESTAMP::DATE
                 ) > 0 
                 AND sdc.available_stock > 0 
            THEN 1 
            ELSE 0 
        END AS in_stock_flag
    FROM materialized_views.stock_day_close sdc
    CROSS JOIN params p
    WHERE sdc.TIMESTAMP::DATE >= p.history_start - 1  -- Need one extra day for LAG
        AND sdc.TIMESTAMP::DATE < p.today
),

-- Combine sales with stock status
daily_with_stock AS (
    SELECT
        COALESCE(ds.warehouse_id, st.warehouse_id) AS warehouse_id,
        COALESCE(ds.product_id, st.product_id) AS product_id,
        COALESCE(ds.sale_date, st.stock_date) AS the_date,
        COALESCE(ds.daily_qty, 0) AS daily_qty,
        COALESCE(ds.daily_retailers, 0) AS daily_retailers,
        COALESCE(st.in_stock_flag, 0) AS in_stock_flag
    FROM daily_sales ds
    FULL OUTER JOIN daily_stock st 
        ON ds.warehouse_id = st.warehouse_id 
        AND ds.product_id = st.product_id 
        AND ds.sale_date = st.stock_date
    WHERE COALESCE(ds.sale_date, st.stock_date) >= (SELECT history_start FROM params)
),

-- Calculate P80 benchmark (in-stock days only, 240 days, EXCLUDING last 7 days)
p80_daily_benchmark AS (
    SELECT
        warehouse_id,
        product_id,
        PERCENTILE_CONT(0.8) WITHIN GROUP (ORDER BY daily_qty) AS p80_daily_240d,
        AVG(daily_qty) AS avg_daily_240d,
        STDDEV(daily_qty) AS std_daily_240d,
        COUNT(*) AS in_stock_days_240d
    FROM daily_with_stock
    CROSS JOIN params p
    WHERE in_stock_flag = 1
        AND the_date >= p.history_start
        AND the_date < p.today - 7  -- Exclude last 7 days from benchmark
    GROUP BY warehouse_id, product_id
),

-- Calculate P70 retailer benchmark (in-stock days only, 240 days, EXCLUDING last 7 days)
p70_retailer_benchmark AS (
    SELECT
        warehouse_id,
        product_id,
        PERCENTILE_CONT(0.7) WITHIN GROUP (ORDER BY daily_retailers) AS p70_daily_retailers_240d,
        AVG(daily_retailers) AS avg_daily_retailers_240d,
        STDDEV(daily_retailers) AS std_daily_retailers_240d
    FROM daily_with_stock
    CROSS JOIN params p
    WHERE in_stock_flag = 1
        AND the_date >= p.history_start
        AND the_date < p.today - 7  -- Exclude last 7 days from benchmark
    GROUP BY warehouse_id, product_id
),

-- Calculate 7-day rolling SUM for P80 recent benchmark
rolling_7d AS (
    SELECT
        warehouse_id,
        product_id,
        the_date,
        SUM(daily_qty) OVER (
            PARTITION BY warehouse_id, product_id 
            ORDER BY the_date 
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) AS rolling_7d_sum,
        SUM(in_stock_flag) OVER (
            PARTITION BY warehouse_id, product_id 
            ORDER BY the_date 
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) AS in_stock_days_7d
    FROM daily_with_stock
),

p80_7d_benchmark AS (
    SELECT
        warehouse_id,
        product_id,
        PERCENTILE_CONT(0.8) WITHIN GROUP (ORDER BY rolling_7d_sum) AS p80_7d_rolling_240d
    FROM rolling_7d
    CROSS JOIN params p
    WHERE the_date >= p.history_start + 7  -- Need 7 days for rolling
        AND the_date < p.today - 7  -- Exclude last 7 days from benchmark
        AND in_stock_days_7d >= 4  -- At least 4 of 7 days in stock
    GROUP BY warehouse_id, product_id
),

-- MTD benchmark: P80 of same MTD period totals (last 12 months)
-- Sum all sales from day 1 to current day of month for each historical month
mtd_historical AS (
    SELECT
        dws.warehouse_id,
        dws.product_id,
        DATE_TRUNC('month', dws.the_date) AS period_month_start,
        SUM(dws.daily_qty) AS mtd_total_qty  -- Sum of all days from 1 to current_day_of_month
    FROM daily_with_stock dws
    CROSS JOIN params p
    WHERE DAY(dws.the_date) <= p.current_day_of_month  -- Only days up to current day of month
    GROUP BY dws.warehouse_id, dws.product_id, DATE_TRUNC('month', dws.the_date)
),

mtd_by_period AS (
    SELECT
        mh.warehouse_id,
        mh.product_id,
        mh.period_month_start,
        mh.mtd_total_qty AS mtd_qty_at_day  -- Total MTD qty for that month
    FROM mtd_historical mh
    CROSS JOIN params p
    WHERE mh.period_month_start >= DATEADD(month, -12, p.current_month_start)
        AND mh.period_month_start < p.current_month_start
),

p80_mtd_benchmark AS (
    SELECT
        warehouse_id,
        product_id,
        PERCENTILE_CONT(0.8) WITHIN GROUP (ORDER BY mtd_qty_at_day) AS p80_mtd_12mo,
        AVG(mtd_qty_at_day) AS avg_mtd_12mo
    FROM mtd_by_period
    GROUP BY warehouse_id, product_id
    HAVING COUNT(*) >= 3  -- At least 3 months of data
),

-- Current period quantities
current_metrics AS (
    SELECT
        warehouse_id,
        product_id,
        -- Yesterday
        SUM(CASE WHEN the_date = (SELECT yesterday FROM params) THEN daily_qty ELSE 0 END) AS yesterday_qty,
        SUM(CASE WHEN the_date = (SELECT yesterday FROM params) THEN daily_retailers ELSE 0 END) AS yesterday_retailers,
        -- Yesterday in-stock flag (1 if in stock yesterday, 0 otherwise)
        MAX(CASE WHEN the_date = (SELECT yesterday FROM params) THEN in_stock_flag ELSE 0 END) AS yesterday_in_stock,
        -- Recent 7 days
        SUM(CASE WHEN the_date >= (SELECT today FROM params) - 7 AND the_date < (SELECT today FROM params) THEN daily_qty ELSE 0 END) AS recent_7d_qty,
        SUM(CASE WHEN the_date >= (SELECT today FROM params) - 7 AND the_date < (SELECT today FROM params) AND in_stock_flag = 1 THEN 1 ELSE 0 END) AS recent_7d_in_stock_days,
        -- MTD
        SUM(CASE WHEN the_date >= (SELECT current_month_start FROM params) AND the_date < (SELECT today FROM params) THEN daily_qty ELSE 0 END) AS mtd_qty,
        SUM(CASE WHEN the_date >= (SELECT current_month_start FROM params) AND the_date < (SELECT today FROM params) AND in_stock_flag = 1 THEN 1 ELSE 0 END) AS mtd_in_stock_days
    FROM daily_with_stock
    GROUP BY warehouse_id, product_id
),

-- Combined performance ratio calculation with dynamic weights
combined_ratio_calc AS (
    SELECT
        cm.warehouse_id,
        cm.product_id,
        cm.yesterday_qty,
        cm.yesterday_retailers,
        cm.yesterday_in_stock,
        cm.recent_7d_qty,
        cm.recent_7d_in_stock_days,
        cm.mtd_qty,
        cm.mtd_in_stock_days,
        
        -- Benchmark values
        COALESCE(pb.p80_daily_240d, 1) AS p80_daily_240d,
        COALESCE(pb.avg_daily_240d, 0) AS avg_daily_240d,
        COALESCE(pb.std_daily_240d, 0) AS std_daily_240d,
        COALESCE(pb.in_stock_days_240d, 0) AS in_stock_days_240d,
        COALESCE(p7.p80_7d_rolling_240d, pb.p80_daily_240d * 7, 1) AS p80_7d_sum_240d,
        COALESCE(pm.p80_mtd_12mo, pb.p80_daily_240d * (SELECT current_day_of_month FROM params), 1) AS p80_mtd_12mo,
        
        -- Retailer benchmarks
        COALESCE(pr.p70_daily_retailers_240d, 1) AS p70_daily_retailers_240d,
        COALESCE(pr.avg_daily_retailers_240d, 0) AS avg_daily_retailers_240d,
        COALESCE(pr.std_daily_retailers_240d, 0) AS std_daily_retailers_240d,
        
        -- Calculate base ratios (capped at 3)
        LEAST(cm.yesterday_qty / NULLIF(COALESCE(pb.p80_daily_240d, 1), 0), 3) AS yesterday_ratio_capped,
        LEAST(cm.recent_7d_qty / NULLIF(COALESCE(p7.p80_7d_rolling_240d, pb.p80_daily_240d * 7, 1), 0), 3) AS recent_ratio_capped,
        LEAST(cm.mtd_qty / NULLIF(COALESCE(pm.p80_mtd_12mo, pb.p80_daily_240d * (SELECT current_day_of_month FROM params), 1), 0), 3) AS mtd_ratio_capped,
        
        -- In-stock percentages for each period
        cm.yesterday_in_stock AS yesterday_in_stock_pct,
        cm.recent_7d_in_stock_days / 7.0 AS recent_7d_in_stock_pct,
        cm.mtd_in_stock_days / NULLIF((SELECT current_day_of_month FROM params) - 1, 0) AS mtd_in_stock_pct,
        
        -- Raw weights (base: 20% yesterday, 40% recent 7d, 40% MTD) scaled by in-stock percentage
        0.2 * cm.yesterday_in_stock AS yesterday_raw_weight,
        0.4 * (cm.recent_7d_in_stock_days / 7.0) AS recent_7d_raw_weight,
        CASE 
            WHEN (SELECT current_day_of_month FROM params) >= 3 
            THEN 0.4 * COALESCE(cm.mtd_in_stock_days / NULLIF((SELECT current_day_of_month FROM params) - 1, 0), 0)
            ELSE 0 
        END AS mtd_raw_weight
        
    FROM current_metrics cm
    LEFT JOIN p80_daily_benchmark pb ON cm.warehouse_id = pb.warehouse_id AND cm.product_id = pb.product_id
    LEFT JOIN p80_7d_benchmark p7 ON cm.warehouse_id = p7.warehouse_id AND cm.product_id = p7.product_id
    LEFT JOIN p80_mtd_benchmark pm ON cm.warehouse_id = pm.warehouse_id AND cm.product_id = pm.product_id
    LEFT JOIN p70_retailer_benchmark pr ON cm.warehouse_id = pr.warehouse_id AND cm.product_id = pr.product_id
),

-- Pre-calculate combined ratio to avoid repetition
final_with_combined AS (
    SELECT
        crc.*,
        -- Calculate combined performance ratio once
        CASE WHEN (crc.yesterday_raw_weight + crc.recent_7d_raw_weight + crc.mtd_raw_weight) > 0
        THEN (
            (crc.yesterday_raw_weight / (crc.yesterday_raw_weight + crc.recent_7d_raw_weight + crc.mtd_raw_weight)) * crc.yesterday_ratio_capped +
            (crc.recent_7d_raw_weight / (crc.yesterday_raw_weight + crc.recent_7d_raw_weight + crc.mtd_raw_weight)) * crc.recent_ratio_capped +
            (crc.mtd_raw_weight / (crc.yesterday_raw_weight + crc.recent_7d_raw_weight + crc.mtd_raw_weight)) * crc.mtd_ratio_capped
        )
        ELSE 0 END AS combined_perf_ratio
    FROM combined_ratio_calc crc
)

-- Final output (same columns as original, values adjusted when over-achieving)
SELECT
    f.warehouse_id,
    f.product_id,
    
    -- Current period quantities
    f.yesterday_qty,
    f.yesterday_retailers,
    f.recent_7d_qty,
    f.recent_7d_in_stock_days,
    f.mtd_qty,
    f.mtd_in_stock_days,
    
    -- Quantity Benchmarks (P80) - adjusted when over-achieving
    CASE WHEN f.combined_perf_ratio > 1.1 
         THEN ROUND(f.p80_daily_240d + 0.5 * f.std_daily_240d, 2)
         ELSE f.p80_daily_240d 
    END AS p80_daily_240d,
    f.avg_daily_240d,
    f.std_daily_240d,
    f.in_stock_days_240d,
    CASE WHEN f.combined_perf_ratio > 1.1 
         THEN ROUND(f.p80_7d_sum_240d + 0.5 * f.std_daily_240d * 7, 2)
         ELSE f.p80_7d_sum_240d 
    END AS p80_7d_sum_240d,
    CASE WHEN f.combined_perf_ratio > 1.1 
         THEN ROUND(f.p80_mtd_12mo + 0.5 * f.std_daily_240d * (SELECT current_day_of_month FROM params), 2)
         ELSE f.p80_mtd_12mo 
    END AS p80_mtd_12mo,
    
    -- Retailer Benchmarks (P70) - adjusted when over-achieving
    CASE WHEN f.combined_perf_ratio > 1.1 
         THEN ROUND(f.p70_daily_retailers_240d + 0.5 * f.std_daily_retailers_240d, 2)
         ELSE f.p70_daily_retailers_240d 
    END AS p70_daily_retailers_240d,
    f.avg_daily_retailers_240d,
    f.std_daily_retailers_240d,
    
    -- Performance ratios (adjusted when over-achieving)
    ROUND(f.yesterday_qty / NULLIF(
        CASE WHEN f.combined_perf_ratio > 1.1 
             THEN f.p80_daily_240d + 0.5 * f.std_daily_240d
             ELSE f.p80_daily_240d 
        END, 0), 2) AS yesterday_ratio,
    ROUND(f.recent_7d_qty / NULLIF(
        CASE WHEN f.combined_perf_ratio > 1.1 
             THEN f.p80_7d_sum_240d + 0.5 * f.std_daily_240d * 7
             ELSE f.p80_7d_sum_240d 
        END, 0), 2) AS recent_ratio,
    ROUND(f.mtd_qty / NULLIF(
        CASE WHEN f.combined_perf_ratio > 1.1 
             THEN f.p80_mtd_12mo + 0.5 * f.std_daily_240d * (SELECT current_day_of_month FROM params)
             ELSE f.p80_mtd_12mo 
        END, 0), 2) AS mtd_ratio,
    ROUND(f.yesterday_retailers / NULLIF(
        CASE WHEN f.combined_perf_ratio > 1.1 
             THEN f.p70_daily_retailers_240d + 0.5 * f.std_daily_retailers_240d
             ELSE f.p70_daily_retailers_240d 
        END, 0), 2) AS yesterday_retailer_ratio,
    
    -- Additional columns for visibility
    ROUND(f.combined_perf_ratio, 2) AS combined_perf_ratio,
    CASE WHEN f.combined_perf_ratio > 1.1 THEN 1 ELSE 0 END AS is_over_achiever

FROM final_with_combined f
WHERE f.warehouse_id IN (1, 236, 337, 8, 339, 170, 501, 401, 703, 632, 797, 962)
'''

# Execute benchmark query
print("Loading performance benchmark data (this may take a moment due to 240-day history)...")
df_benchmarks = query_snowflake(PERFORMANCE_BENCHMARK_QUERY)
df_benchmarks = convert_to_numeric(df_benchmarks)
print(f"Loaded {len(df_benchmarks)} benchmark records")

# =============================================================================
# Apply Minimum Thresholds to Benchmark Values
# - Daily quantity benchmarks should not be below 5
# - Daily retailers benchmarks should not be less than 2
# This ensures performance calculations don't use unrealistic benchmarks
# =============================================================================
MIN_DAILY_QTY_BENCHMARK = 5
MIN_DAILY_RETAILERS_BENCHMARK = 5

# Apply minimums to daily benchmarks
df_benchmarks['p80_daily_240d'] = df_benchmarks['p80_daily_240d'].clip(lower=MIN_DAILY_QTY_BENCHMARK)
df_benchmarks['p70_daily_retailers_240d'] = df_benchmarks['p70_daily_retailers_240d'].clip(lower=MIN_DAILY_RETAILERS_BENCHMARK)

# Apply proportional minimums to interval-based benchmarks
df_benchmarks['p80_7d_sum_240d'] = df_benchmarks['p80_7d_sum_240d'].clip(lower=MIN_DAILY_QTY_BENCHMARK * 7)  # 35

# For MTD, calculate dynamic minimum based on days in current period
# mtd_in_stock_days represents how many days of data we have in current month
df_benchmarks['p80_mtd_12mo'] = df_benchmarks.apply(
    lambda row: max(row['p80_mtd_12mo'], MIN_DAILY_QTY_BENCHMARK * max(row['mtd_in_stock_days'], 1)),
    axis=1
)

print(f"Applied minimum thresholds: qty >= {MIN_DAILY_QTY_BENCHMARK}/day, retailers >= {MIN_DAILY_RETAILERS_BENCHMARK}/day")

# Preview
df_benchmarks


Loading performance benchmark data (this may take a moment due to 240-day history)...


  df[col] = pd.to_numeric(df[col], errors='ignore')


Loaded 298633 benchmark records
Applied minimum thresholds: qty >= 5/day, retailers >= 5/day


Unnamed: 0,warehouse_id,product_id,yesterday_qty,yesterday_retailers,recent_7d_qty,recent_7d_in_stock_days,mtd_qty,mtd_in_stock_days,p80_daily_240d,avg_daily_240d,...,p80_mtd_12mo,p70_daily_retailers_240d,avg_daily_retailers_240d,std_daily_retailers_240d,yesterday_ratio,recent_ratio,mtd_ratio,yesterday_retailer_ratio,combined_perf_ratio,is_over_achiever
0,632,11111,0,0,0,0,0,0,5.0,0.000,...,5.0,5.0,0.0,0.000000,0.0,0.00,,0.0,0.0,0
1,962,7314,0,0,0,0,0,0,5.0,0.000,...,5.0,5.0,0.0,0.000000,0.0,0.00,,0.0,0.0,0
2,339,2003,0,0,0,0,0,0,5.0,0.000,...,5.0,5.0,0.0,0.000000,0.0,0.00,,0.0,0.0,0
3,501,5503,0,0,0,0,0,0,5.0,0.000,...,5.0,5.0,0.0,0.000000,0.0,0.00,,0.0,0.0,0
4,962,15725,0,0,0,0,0,0,5.0,0.000,...,5.0,5.0,0.0,0.000000,,,,,0.0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
298628,962,19064,0,0,0,0,0,0,5.0,0.000,...,5.0,5.0,0.0,0.000000,0.0,0.00,,0.0,0.0,0
298629,236,1913,0,0,0,0,0,0,5.0,0.000,...,5.0,5.0,0.0,0.000000,0.0,0.00,,0.0,0.0,0
298630,170,18181,0,0,0,0,0,0,5.0,0.000,...,5.0,5.0,0.0,0.000000,0.0,0.00,,0.0,0.0,0
298631,501,12510,0,0,0,0,0,0,5.0,0.000,...,5.0,5.0,0.0,0.000000,0.0,0.00,,0.0,0.0,0


In [39]:
# =============================================================================
# Add Performance Benchmarks and Tags to pricing_with_discount
# =============================================================================

# Merge benchmark data with pricing_with_discount
pricing_with_discount = pricing_with_discount.merge(
    df_benchmarks[[
        'warehouse_id', 'product_id',
        'yesterday_qty', 'yesterday_retailers', 'recent_7d_qty', 'recent_7d_in_stock_days', 'mtd_qty', 'mtd_in_stock_days',
        'p80_daily_240d', 'avg_daily_240d','std_daily_240d', 'in_stock_days_240d',
        'p80_7d_sum_240d', 'p80_mtd_12mo',
        'p70_daily_retailers_240d', 'avg_daily_retailers_240d', 'std_daily_retailers_240d',
        'yesterday_ratio', 'recent_ratio', 'mtd_ratio', 'yesterday_retailer_ratio'
    ]], 
    on=['warehouse_id', 'product_id'], 
    how='left'
)

# Fill NaN values
qty_cols = ['yesterday_qty', 'yesterday_retailers', 'recent_7d_qty', 'recent_7d_in_stock_days', 'mtd_qty', 'mtd_in_stock_days']
for col in qty_cols:
    pricing_with_discount[col] = pricing_with_discount[col].fillna(0)

benchmark_cols = ['p80_daily_240d', 'p80_7d_sum_240d', 'p80_mtd_12mo', 'p70_daily_retailers_240d']
for col in benchmark_cols:
    pricing_with_discount[col] = pricing_with_discount[col].fillna(5)  # Default to 1 to avoid division issues

ratio_cols = ['yesterday_ratio', 'recent_ratio', 'mtd_ratio', 'yesterday_retailer_ratio']
for col in ratio_cols:
    pricing_with_discount[col] = pricing_with_discount[col].fillna(0)

pricing_with_discount['avg_daily_240d'] = pricing_with_discount['avg_daily_240d'].fillna(0)
pricing_with_discount['in_stock_days_240d'] = pricing_with_discount['in_stock_days_240d'].fillna(0)
pricing_with_discount['avg_daily_retailers_240d'] = pricing_with_discount['avg_daily_retailers_240d'].fillna(0)
pricing_with_discount['std_daily_retailers_240d'] = pricing_with_discount['std_daily_retailers_240d'].fillna(0)

# =============================================================================
# Performance Tags - Classify each ratio
# =============================================================================
def get_performance_tag(ratio):
    """
    Classify performance based on ratio to benchmark
    On Track: 90% to 115% of benchmark
    Upper tiers: start from 115%
    Lower tiers: start from 90%
    """
    if pd.isna(ratio) or ratio == 0:
        return 'No Data'
    elif ratio >= 1.75:
        return 'Star Performer'      # 🌟 75%+ above benchmark
    elif ratio > 1.15:
        return 'Over Achiever'       # 🔥 15%+ above benchmark  
    elif ratio >= 0.90:
        return 'On Track'            # ✅ Meeting benchmark (90%-115%)
    elif ratio >= 0.70:
        return 'Underperforming'     # ⚠️ 10%-30% below benchmark
    elif ratio >= 0.40:
        return 'Struggling'          # 🔻 30%-60% below benchmark
    else:
        return 'Critical'            # 🚨 60%+ below benchmark

# Apply tags to each timeframe
pricing_with_discount['yesterday_status'] = pricing_with_discount['yesterday_ratio'].apply(get_performance_tag)
pricing_with_discount['recent_status'] = pricing_with_discount['recent_ratio'].apply(get_performance_tag)
pricing_with_discount['mtd_status'] = pricing_with_discount['mtd_ratio'].apply(get_performance_tag)

# =============================================================================
# Combined Performance Score (weighted average of ratios)
# Approach 2: Scale Weights by In-Stock Percentage
# =============================================================================

# Calculate days in month so far (excluding today)
days_in_month_so_far = max(TODAY.day - 1, 1)  # At least 1 to avoid division by zero

# Calculate in-stock percentages for each period
pricing_with_discount['yesterday_in_stock_pct'] = 1 - pricing_with_discount['oos_yesterday']
pricing_with_discount['recent_7d_in_stock_pct'] = pricing_with_discount['recent_7d_in_stock_days'] / 7
pricing_with_discount['mtd_in_stock_pct'] = pricing_with_discount['mtd_in_stock_days'] / days_in_month_so_far

# Base weights: Yesterday 20%, Recent 7d 40%, MTD 40%
# Scale by in-stock percentage
# NOTE: MTD weight = 0 for first 3 days of month (unreliable data)
MTD_RELIABLE_DAY = 3  # Only use MTD when day >= 3
pricing_with_discount['yesterday_raw_weight'] = 0.2 * pricing_with_discount['yesterday_in_stock_pct']
pricing_with_discount['recent_7d_raw_weight'] = 0.4 * pricing_with_discount['recent_7d_in_stock_pct']
pricing_with_discount['mtd_raw_weight'] = np.where(
    TODAY.day >= MTD_RELIABLE_DAY,
    0.4 * pricing_with_discount['mtd_in_stock_pct'],
    0  # Set MTD weight to 0 at start of month
)

# Total raw weight for normalization
pricing_with_discount['total_raw_weight'] = (
    pricing_with_discount['yesterday_raw_weight'] + 
    pricing_with_discount['recent_7d_raw_weight'] + 
    pricing_with_discount['mtd_raw_weight']
)

# Normalized weights (sum to 1)
pricing_with_discount['yesterday_norm_weight'] = np.where(
    pricing_with_discount['total_raw_weight'] > 0,
    pricing_with_discount['yesterday_raw_weight'] / pricing_with_discount['total_raw_weight'],
    0
)
pricing_with_discount['recent_7d_norm_weight'] = np.where(
    pricing_with_discount['total_raw_weight'] > 0,
    pricing_with_discount['recent_7d_raw_weight'] / pricing_with_discount['total_raw_weight'],
    0
)
pricing_with_discount['mtd_norm_weight'] = np.where(
    pricing_with_discount['total_raw_weight'] > 0,
    pricing_with_discount['mtd_raw_weight'] / pricing_with_discount['total_raw_weight'],
    0
)

# Combined performance ratio with dynamic weights based on in-stock days
pricing_with_discount['combined_perf_ratio'] = (
    pricing_with_discount['yesterday_norm_weight'] * pricing_with_discount['yesterday_ratio'].clip(upper=3) +
    pricing_with_discount['recent_7d_norm_weight'] * pricing_with_discount['recent_ratio'].clip(upper=3) +
    pricing_with_discount['mtd_norm_weight'] * pricing_with_discount['mtd_ratio'].clip(upper=3)
)

# Handle cases where total_raw_weight = 0 (completely OOS in all periods)
pricing_with_discount['combined_perf_ratio'] = pricing_with_discount['combined_perf_ratio'].fillna(0)

# Clean up intermediate columns (optional - keep for debugging)
weight_debug_cols = ['yesterday_in_stock_pct', 'recent_7d_in_stock_pct', 'mtd_in_stock_pct',
                     'yesterday_raw_weight', 'recent_7d_raw_weight', 'mtd_raw_weight', 'total_raw_weight',
                     'yesterday_norm_weight', 'recent_7d_norm_weight', 'mtd_norm_weight']
# Uncomment to drop: pricing_with_discount.drop(columns=weight_debug_cols, inplace=True)

print(f"\nDynamic weight calculation complete!")
print(f"Days in month so far: {days_in_month_so_far}")
print(f"\nSample of weight distributions:")
print(pricing_with_discount[pricing_with_discount['total_raw_weight'] > 0][
    ['product_id', 'warehouse_id', 'oos_yesterday', 'recent_7d_in_stock_days', 'mtd_in_stock_days',
     'yesterday_norm_weight', 'recent_7d_norm_weight', 'mtd_norm_weight', 'combined_perf_ratio']
].head(10))

pricing_with_discount['combined_status'] = pricing_with_discount['combined_perf_ratio'].apply(get_performance_tag)

# =============================================================================
# High Performer Flag (for immediate action consideration)
# =============================================================================
# Flag SKUs that are significantly over-achieving and may need action (price increase, etc.)
pricing_with_discount['high_performer_flag'] = np.where(
    (pricing_with_discount['yesterday_ratio'] >= 1.5) & 
    (pricing_with_discount['recent_ratio'] >= 1.3) &
    (pricing_with_discount['mtd_ratio'] >= 1.2),
    1, 0
)

# Star performer flag (exceptional - all metrics 2x+ benchmark)
pricing_with_discount['star_performer_flag'] = np.where(
    (pricing_with_discount['yesterday_ratio'] >= 2.0) & 
    (pricing_with_discount['recent_ratio'] >= 1.5) &
    (pricing_with_discount['mtd_ratio'] >= 1.5),
    1, 0
)

# =============================================================================
# Summary
# =============================================================================
print(f"Performance benchmarks added!")
print(f"\n{'='*60}")
print(f"PERFORMANCE STATUS DISTRIBUTION")
print(f"{'='*60}")

print(f"\nYesterday Status:")
print(pricing_with_discount['yesterday_status'].value_counts().to_string())

print(f"\nRecent 7d Status:")
print(pricing_with_discount['recent_status'].value_counts().to_string())

print(f"\nMTD Status:")
print(pricing_with_discount['mtd_status'].value_counts().to_string())

print(f"\nCombined Status:")
print(pricing_with_discount['combined_status'].value_counts().to_string())

print(f"\n{'='*60}")
print(f"HIGH PERFORMERS (Action Candidates)")
print(f"{'='*60}")
print(f"High Performers (flag=1): {len(pricing_with_discount[pricing_with_discount['high_performer_flag'] == 1])}")
print(f"Star Performers (flag=1): {len(pricing_with_discount[pricing_with_discount['star_performer_flag'] == 1])}")

# Show top performers
print(f"\nTop 15 Star Performers:")
pricing_with_discount[pricing_with_discount['star_performer_flag'] == 1].nlargest(15, 'combined_perf_ratio')[
    ['product_id', 'warehouse_id', 'sku', 
     'yesterday_ratio', 'recent_ratio', 'mtd_ratio', 'combined_perf_ratio',
     'yesterday_status', 'combined_status']
]



Dynamic weight calculation complete!
Days in month so far: 9

Sample of weight distributions:
    product_id  warehouse_id  oos_yesterday  recent_7d_in_stock_days  \
0        10124           337              0                        7   
1        10124             8              0                        7   
2        10124           401              0                        7   
3         8915           236              0                        7   
4         8915           962              0                        4   
6        22286           401              0                        7   
7         3456           337              0                        4   
8         3456             8              0                        5   
11          61           797              0                        4   
12       12902             1              0                        7   

    mtd_in_stock_days  yesterday_norm_weight  recent_7d_norm_weight  \
0                   9               0.200

Unnamed: 0,product_id,warehouse_id,sku,yesterday_ratio,recent_ratio,mtd_ratio,combined_perf_ratio,yesterday_status,combined_status
23089,24244,703,كيك كمارا تورنيدو كريمة الفانيليا و مربى الفرا...,12.0,46.0,50.0,3.0,Star Performer,Star Performer
25934,5852,170,مكرونة كايرو لسان عصفور - 400 جم,3.4,3.0,4.66,3.0,Star Performer,Star Performer
41589,10175,339,مسحوق باهى لافندر اوتوماتيك لافندر 4 كم + 500 ...,5.32,7.2,5.89,3.0,Star Performer,Star Performer
45886,12925,703,زيت ثمرات خليط - 550 مل,16.37,4.06,7.22,3.0,Star Performer,Star Performer
55892,475,1,هانم سمنة بطعم الزبدة الصفراء- 1.5 كجم,4.48,3.76,3.47,3.0,Star Performer,Star Performer
55893,475,1,هانم سمنة بطعم الزبدة الصفراء- 1.5 كجم,4.48,3.76,3.47,3.0,Star Performer,Star Performer
65915,12925,501,زيت ثمرات خليط - 550 مل,3.69,3.17,3.08,3.0,Star Performer,Star Performer
82012,2879,797,زيت كريستال عباد الشمس 4 زجاجة - 2.2 لتر,3.46,4.39,3.29,3.0,Star Performer,Star Performer
14627,12574,962,بيك رولز طعم الجبنه الناتشو حجم عائلي - 15 جنية,24.0,6.14,4.13,3.0,Star Performer,Star Performer
30372,12559,236,بيك رولز حلو وحار - 10 جنية,8.0,4.57,8.0,3.0,Star Performer,Star Performer


In [40]:
# =============================================================================
# No NMV in Last 4 Months Flag
# Identifies SKUs that have not generated any NMV in the past 4 months (120 days)
# =============================================================================
NO_NMV_4M_QUERY = f'''
WITH nmv_last_4m AS (
    SELECT 
        pso.warehouse_id,
        pso.product_id,
        SUM(pso.total_price) AS total_nmv_4m
    FROM product_sales_order pso
    JOIN sales_orders so ON so.id = pso.sales_order_id
    WHERE so.created_at::DATE >= CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 120
        AND so.created_at::DATE < CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE
        AND so.sales_order_status_id NOT IN (7, 12)
        AND so.channel IN ('telesales', 'retailer')
        AND pso.purchased_item_count <> 0
    GROUP BY pso.warehouse_id, pso.product_id
    HAVING SUM(pso.total_price) > 0
)
SELECT 
    warehouse_id,
    product_id,
    total_nmv_4m
FROM nmv_last_4m
'''

# Execute query
print("Loading SKUs with NMV in last 4 months...")
df_nmv_4m = query_snowflake(NO_NMV_4M_QUERY)
df_nmv_4m = convert_to_numeric(df_nmv_4m)
print(f"Found {len(df_nmv_4m)} SKU-warehouse combinations with NMV in last 4 months")

# Merge and create no_nmv_4m flag
pricing_with_discount = pricing_with_discount.merge(
    df_nmv_4m[['warehouse_id', 'product_id', 'total_nmv_4m']],
    on=['warehouse_id', 'product_id'],
    how='left'
)

# Flag SKUs with no NMV in last 4 months
# 1 = No NMV (should potentially be filtered), 0 = Has NMV
pricing_with_discount['no_nmv_4m'] = np.where(
    pricing_with_discount['total_nmv_4m'].isna() | (pricing_with_discount['total_nmv_4m'] == 0),
    1, 0
)

# Fill NaN for total_nmv_4m
pricing_with_discount['total_nmv_4m'] = pricing_with_discount['total_nmv_4m'].fillna(0)

print(f"\n{'='*60}")
print(f"NO NMV IN LAST 4 MONTHS ANALYSIS")
print(f"{'='*60}")
print(f"Total records: {len(pricing_with_discount)}")
print(f"SKUs with NO NMV in 4 months (no_nmv_4m=1): {len(pricing_with_discount[pricing_with_discount['no_nmv_4m'] == 1])}")
print(f"SKUs with NMV in 4 months (no_nmv_4m=0): {len(pricing_with_discount[pricing_with_discount['no_nmv_4m'] == 0])}")

# Show sample of SKUs with no NMV
print(f"\nSample SKUs with no NMV in last 4 months:")
pricing_with_discount[pricing_with_discount['no_nmv_4m'] == 1][
    ['product_id', 'warehouse_id', 'sku', 'stocks', 'in_stock_rr', 'zero_demand', 'no_nmv_4m']
].head(15)


Loading SKUs with NMV in last 4 months...


  df[col] = pd.to_numeric(df[col], errors='ignore')


Found 28962 SKU-warehouse combinations with NMV in last 4 months

NO NMV IN LAST 4 MONTHS ANALYSIS
Total records: 91842
SKUs with NO NMV in 4 months (no_nmv_4m=1): 62709
SKUs with NMV in 4 months (no_nmv_4m=0): 29133

Sample SKUs with no NMV in last 4 months:


Unnamed: 0,product_id,warehouse_id,sku,stocks,in_stock_rr,zero_demand,no_nmv_4m
5,23127,703,ويفر فينجرز بكريمة شوكولاتة البندق 6 عبوة - 33 جم,0,0.0,0,1
9,23548,703,ريف بسكويت ويفر بحشو كريمة البندق - 12 قطعه,0,0.0,0,1
10,23548,703,ريف بسكويت ويفر بحشو كريمة البندق - 12 قطعه,0,0.0,0,1
28,13880,632,بالانس ساور كريم و أنيون - 15 جنية,0,0.0,0,1
39,6222,339,دجاج كوكى كرانشى بارد + بطاطس - 12 قطعه,0,0.0,0,1
40,6222,170,دجاج كوكى كرانشى بارد + بطاطس - 12 قطعه,0,0.0,0,1
58,12235,501,تيكا فن بيض صغير أزرق اولاد - 15 جرام 10 جنية,0,0.0,0,1
59,22694,703,ديدوز كوكيز بالفواكه و الشوفان محشو بالتمر و ا...,0,0.0,0,1
60,10484,632,برجر دجاج كوكى - 8 قطعه,0,0.0,0,1
63,23282,401,ماكسيللو كيك الذهبي والمحشو بجيلي طعم الفراولة...,0,0.0,0,1


In [42]:
# =============================================================================
# Normal Refill Query - Avg qty & stddev for frequent retailers (last 120 days)
# Frequent retailer definition based on ABC classification (from existing dataframe):
#   - Class A: bought 4+ times
#   - Class B: bought 3+ times
#   - Class C: bought 2+ times
# =============================================================================
NORMAL_REFILL_QUERY = f'''
WITH params AS (
    SELECT 
        CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE AS today,
        CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - 120 AS history_start
),

-- Get retailer order counts per product-warehouse (last 120 days)
retailer_orders AS (
    SELECT 
        pso.warehouse_id,
        pso.product_id,
        so.retailer_id,
        COUNT(DISTINCT so.id) AS order_count
    FROM product_sales_order pso
    JOIN sales_orders so ON so.id = pso.sales_order_id
    CROSS JOIN params p
    WHERE so.created_at::DATE >= p.history_start
        AND so.created_at::DATE < p.today
        AND so.sales_order_status_id NOT IN (7, 12)
        AND so.channel IN ('telesales', 'retailer')
        AND pso.purchased_item_count <> 0
    GROUP BY pso.warehouse_id, pso.product_id, so.retailer_id
),

-- Get individual order quantities per retailer
order_quantities AS (
    SELECT 
        pso.warehouse_id,
        pso.product_id,
        so.retailer_id,
        so.id AS order_id,
        SUM(pso.purchased_item_count * pso.basic_unit_count) AS order_qty
    FROM product_sales_order pso
    JOIN sales_orders so ON so.id = pso.sales_order_id
    CROSS JOIN params p
    WHERE so.created_at::DATE >= p.history_start
        AND so.created_at::DATE < p.today
        AND so.sales_order_status_id NOT IN (7, 12)
        AND so.channel IN ('telesales', 'retailer')
        AND pso.purchased_item_count <> 0
    GROUP BY pso.warehouse_id, pso.product_id, so.retailer_id, so.id
)

-- Return retailer-level data with order counts for Python filtering
SELECT 
    oq.warehouse_id,
    oq.product_id,
    oq.retailer_id,
    ro.order_count,
    oq.order_id,
    oq.order_qty
FROM order_quantities oq
JOIN retailer_orders ro 
    ON ro.warehouse_id = oq.warehouse_id 
    AND ro.product_id = oq.product_id 
    AND ro.retailer_id = oq.retailer_id
'''

# Execute normal refill query
print("Loading retailer order data for normal refill calculation (last 120 days)...")
df_retailer_orders = query_snowflake(NORMAL_REFILL_QUERY)
df_retailer_orders = convert_to_numeric(df_retailer_orders)
print(f"Loaded {len(df_retailer_orders)} retailer order records")

# Get ABC classification from existing dataframe
abc_mapping = pricing_with_discount[['warehouse_id', 'product_id', 'abc_class']].drop_duplicates()
print(f"ABC classification mapping: {len(abc_mapping)} product-warehouse combinations")

# Merge ABC classification into retailer orders
df_retailer_orders = df_retailer_orders.merge(
    abc_mapping,
    on=['warehouse_id', 'product_id'],
    how='inner'
)
print(f"Records after ABC merge: {len(df_retailer_orders)}")

# Filter frequent retailers based on ABC class thresholds
# Class A: 4+ orders, Class B: 3+ orders, Class C: 2+ orders
df_frequent = df_retailer_orders[
    ((df_retailer_orders['abc_class'] == 'A') & (df_retailer_orders['order_count'] >= 4)) |
    ((df_retailer_orders['abc_class'] == 'B') & (df_retailer_orders['order_count'] >= 3)) |
    ((df_retailer_orders['abc_class'] == 'C') & (df_retailer_orders['order_count'] >= 2))
].copy()
print(f"Records from frequent retailers: {len(df_frequent)}")

# Calculate normal_refill (avg qty) and refill_stddev per product-warehouse
df_normal_refill = df_frequent.groupby(['warehouse_id', 'product_id']).agg(
    frequent_retailer_count=('retailer_id', 'nunique'),
    frequent_order_count=('order_id', 'nunique'),
    normal_refill=('order_qty', 'mean'),
    refill_stddev=('order_qty', 'std')
).reset_index()

# Round values and fill NaN stddev (when only 1 order)
df_normal_refill['normal_refill'] = df_normal_refill['normal_refill'].round(2)
df_normal_refill['refill_stddev'] = df_normal_refill['refill_stddev'].fillna(0).round(2)

# Filter to products with at least 2 orders for meaningful stats
df_normal_refill = df_normal_refill[df_normal_refill['frequent_order_count'] >= 2]
print(f"Final normal refill records (min 2 orders): {len(df_normal_refill)}")

# Merge with pricing_with_discount
pricing_with_discount = pricing_with_discount.merge(
    df_normal_refill[['warehouse_id', 'product_id', 'frequent_retailer_count', 
                      'frequent_order_count', 'normal_refill', 'refill_stddev']],
    on=['warehouse_id', 'product_id'],
    how='left'
)

# Fill NaN values
pricing_with_discount['frequent_retailer_count'] = pricing_with_discount['frequent_retailer_count'].fillna(0)
pricing_with_discount['frequent_order_count'] = pricing_with_discount['frequent_order_count'].fillna(0)
pricing_with_discount['normal_refill'] = pricing_with_discount['normal_refill'].fillna(0)
pricing_with_discount['refill_stddev'] = pricing_with_discount['refill_stddev'].fillna(0)

print(f"\n{'='*60}")
print(f"NORMAL REFILL ANALYSIS (Frequent Retailers - 120 days)")
print(f"{'='*60}")
print(f"Records with normal_refill data: {len(pricing_with_discount[pricing_with_discount['normal_refill'] > 0])}")
print(f"Records without normal_refill data: {len(pricing_with_discount[pricing_with_discount['normal_refill'] == 0])}")
print(f"\nNormal refill distribution:")
print(pricing_with_discount[pricing_with_discount['normal_refill'] > 0]['normal_refill'].describe())
print(f"\nSample data:")
pricing_with_discount[pricing_with_discount['normal_refill'] > 0][
    ['product_id', 'warehouse_id', 'sku', 'abc_class', 'frequent_retailer_count', 
     'frequent_order_count', 'normal_refill', 'refill_stddev', 'in_stock_rr']
].head(15)


Loading retailer order data for normal refill calculation (last 120 days)...


  df[col] = pd.to_numeric(df[col], errors='ignore')


Loaded 4877249 retailer order records
ABC classification mapping: 86712 product-warehouse combinations
Records after ABC merge: 4643854
Records from frequent retailers: 1633070
Final normal refill records (min 2 orders): 22110

NORMAL REFILL ANALYSIS (Frequent Retailers - 120 days)
Records with normal_refill data: 23283
Records without normal_refill data: 68559

Normal refill distribution:
count    23283.000000
mean         3.175506
std         32.905029
min          1.000000
25%          1.250000
50%          1.740000
75%          2.920000
max       4534.000000
Name: normal_refill, dtype: float64

Sample data:


Unnamed: 0,product_id,warehouse_id,sku,abc_class,frequent_retailer_count,frequent_order_count,normal_refill,refill_stddev,in_stock_rr
0,10124,337,مسحوق ليدر عادى لافندر - 2 كجم,B,17.0,77.0,3.7,5.14,7.0
1,10124,8,مسحوق ليدر عادى لافندر - 2 كجم,C,18.0,48.0,1.9,1.34,3.0
2,10124,401,مسحوق ليدر عادى لافندر - 2 كجم,C,5.0,11.0,1.18,0.6,2.0
3,8915,236,بيبسى - 1.47 لتر,A,234.0,1446.0,3.73,7.11,183.0
4,8915,962,بيبسى - 1.47 لتر,A,219.0,1275.0,3.73,3.58,175.0
7,3456,337,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,C,10.0,25.0,1.72,1.14,6.0
8,3456,8,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,C,31.0,70.0,2.57,3.07,9.0
11,61,797,نسكافيه 3*1 - 18 جم,B,4.0,14.0,2.29,1.59,23.0
12,12902,1,فن داى كب كيك بنكهة الفانيلا وقطع الشوكولاتة ...,C,12.0,25.0,1.96,2.26,4.0
13,2703,632,نسكافيه 3*1 ريتش - 21 جم,C,10.0,21.0,3.1,3.45,3.0


In [43]:
# =============================================================================
# Live Cart Rules Query - Get current cart rules from the system
# Merges on product_id and cohort_id
# =============================================================================
LIVE_CART_RULES_QUERY = f'''
SELECT 
    cppu.cohort_id,
    pup.product_id,
    pup.packing_unit_id,
    pup.basic_unit_count,
    COALESCE(cppu.MAX_PER_SALES_ORDER, cppu2.MAX_PER_SALES_ORDER) AS current_cart_rule
FROM COHORT_PRODUCT_PACKING_UNITS cppu 
JOIN PACKING_UNIT_PRODUCTS pup ON cppu.PRODUCT_PACKING_UNIT_ID = pup.id 
JOIN cohorts c ON c.id = cppu.cohort_id
LEFT JOIN COHORT_PRODUCT_PACKING_UNITS cppu2 
    ON cppu.PRODUCT_PACKING_UNIT_ID = cppu2.PRODUCT_PACKING_UNIT_ID 
    AND cppu2.cohort_id = c.FALLBACK_COHORT_ID
WHERE cppu.cohort_id IN ({','.join(map(str, COHORT_IDS))})
'''

# Execute live cart rules query
print("Loading live cart rules...")
df_cart_rules = query_snowflake(LIVE_CART_RULES_QUERY)
df_cart_rules = convert_to_numeric(df_cart_rules)
print(f"Loaded {len(df_cart_rules)} cart rule records")

# Aggregate to product-cohort level (take the cart rule for basic unit, or min if multiple)
# Filter to basic unit (packing_unit_id where basic_unit_count = 1) for simpler merging
df_cart_rules_basic = df_cart_rules[df_cart_rules['basic_unit_count'] == 1].copy()
print(f"Basic unit cart rules: {len(df_cart_rules_basic)}")

# If no basic unit, take the minimum cart rule per product-cohort
df_cart_rules_agg = df_cart_rules.groupby(['cohort_id', 'product_id']).agg(
    current_cart_rule=('current_cart_rule', 'min')
).reset_index()

# Prefer basic unit cart rule, fallback to aggregated
df_cart_rules_final = df_cart_rules_basic[['cohort_id', 'product_id', 'current_cart_rule']].drop_duplicates()
df_cart_rules_final = df_cart_rules_final.merge(
    df_cart_rules_agg[['cohort_id', 'product_id', 'current_cart_rule']].rename(columns={'current_cart_rule': 'cart_rule_agg'}),
    on=['cohort_id', 'product_id'],
    how='outer'
)
df_cart_rules_final['current_cart_rule'] = df_cart_rules_final['current_cart_rule'].fillna(df_cart_rules_final['cart_rule_agg'])
df_cart_rules_final = df_cart_rules_final[['cohort_id', 'product_id', 'current_cart_rule']].drop_duplicates()
print(f"Final cart rules (product-cohort level): {len(df_cart_rules_final)}")

# Merge with pricing_with_discount
pricing_with_discount = pricing_with_discount.merge(
    df_cart_rules_final,
    on=['cohort_id', 'product_id'],
    how='left'
)

# Fill NaN cart rules with 0 (no cart rule set)
pricing_with_discount['current_cart_rule'] = pricing_with_discount['current_cart_rule'].fillna(0)

print(f"\n{'='*60}")
print(f"LIVE CART RULES ANALYSIS")
print(f"{'='*60}")
print(f"Records with cart rule > 0: {len(pricing_with_discount[pricing_with_discount['current_cart_rule'] > 0])}")
print(f"Records without cart rule: {len(pricing_with_discount[pricing_with_discount['current_cart_rule'] == 0])}")
print(f"\nCart rule distribution:")
print(pricing_with_discount[pricing_with_discount['current_cart_rule'] > 0]['current_cart_rule'].describe())
print(f"\nSample data with cart rules:")
pricing_with_discount[pricing_with_discount['current_cart_rule'] > 0][
    ['product_id', 'cohort_id', 'warehouse_id', 'sku', 'current_price', 'current_cart_rule', 'in_stock_rr']
].head(15)


Loading live cart rules...


  df[col] = pd.to_numeric(df[col], errors='ignore')


Loaded 110893 cart rule records
Basic unit cart rules: 73147
Final cart rules (product-cohort level): 73142

LIVE CART RULES ANALYSIS
Records with cart rule > 0: 91874
Records without cart rule: 0

Cart rule distribution:
count    91874.000000
mean        50.865511
std        566.221086
min          2.000000
25%          8.000000
50%         14.000000
75%         25.000000
max      10000.000000
Name: current_cart_rule, dtype: float64

Sample data with cart rules:


Unnamed: 0,product_id,cohort_id,warehouse_id,sku,current_price,current_cart_rule,in_stock_rr
0,10124,703,337,مسحوق ليدر عادى لافندر - 2 كجم,270.25,15,7.0
1,10124,703,8,مسحوق ليدر عادى لافندر - 2 كجم,270.25,15,3.0
2,10124,1126,401,مسحوق ليدر عادى لافندر - 2 كجم,261.75,7,2.0
3,8915,701,236,بيبسى - 1.47 لتر,186.5,25,183.0
4,8915,701,962,بيبسى - 1.47 لتر,186.5,25,175.0
5,23127,1123,703,ويفر فينجرز بكريمة شوكولاتة البندق 6 عبوة - 33 جم,26.0,10,0.0
6,22286,1126,401,مكرونة كايرو مرمرية - 1 كجم,228.0,25,3.0
7,3456,703,337,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,132.25,10,6.0
8,3456,703,8,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,132.25,10,9.0
9,23548,1123,703,ريف بسكويت ويفر بحشو كريمة البندق - 12 قطعه,118.25,10,0.0


In [44]:
# =============================================================================
# Commercial Constraint Minimum Price Query
# Gets the minimum price constraints from finance.minimum_prices
# =============================================================================
COMMERCIAL_MIN_PRICE_QUERY = f'''
WITH to_remove AS (
    SELECT 
        check_date AS start_date,
        (check_date + INTERVAL '1 month') + 6 AS end_date 
    FROM (
        SELECT 
            CASE 
                WHEN DATE_PART('day', CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE) < 7 
                THEN DATE_TRUNC('month', CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE - INTERVAL '1 month') 
                ELSE DATE_FROM_PARTS(
                    YEAR(CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE), 
                    MONTH(CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE), 
                    1
                )  
            END AS check_date
    )
),
region_mapping as(
select *
from (
values
('Greater Cairo', 'Cairo'),
('Greater Cairo', 'Giza')

)x(region,main_region)

)

SELECT  
    sku_id AS product_id,
    sku,
    brand AS comm_brand,
    cat AS comm_cat,
    region,
    created_at AS comm_created_at,
    min_price AS commercial_min_price
FROM (
    SELECT 
        product_id AS sku_id,
        product_name AS sku,
        brand,
        category AS cat,
        coalesce(rm.main_region,mp.region) as region,
        min_price,
        created_at,
        MAX(created_at) OVER (PARTITION BY product_id, mp.region) AS latest_date
    FROM finance.minimum_prices mp
	left join region_mapping rm on mp.region = rm.region
    WHERE is_deleted = 'false'
        AND created_at BETWEEN (SELECT start_date FROM to_remove) AND (SELECT end_date FROM to_remove)
) comm
WHERE created_at = latest_date
'''

# Execute commercial min price query
print("Loading commercial minimum price constraints...")
df_commercial_min = query_snowflake(COMMERCIAL_MIN_PRICE_QUERY)
df_commercial_min = convert_to_numeric(df_commercial_min)
print(f"Loaded {len(df_commercial_min)} commercial min price records")

# Merge with pricing_with_discount on product_id and region
pricing_with_discount = pricing_with_discount.merge(
    df_commercial_min[['product_id', 'region', 'commercial_min_price']],
    on=['product_id', 'region'],
    how='left'
)

# Fill NaN with 0 (no commercial constraint)
pricing_with_discount['commercial_min_price'] = pricing_with_discount['commercial_min_price'].fillna(0)

print(f"\n{'='*60}")
print(f"COMMERCIAL MINIMUM PRICE CONSTRAINTS")
print(f"{'='*60}")
print(f"Records with commercial min price: {len(pricing_with_discount[pricing_with_discount['commercial_min_price'] > 0])}")
print(f"Records without commercial min price: {len(pricing_with_discount[pricing_with_discount['commercial_min_price'] == 0])}")
print(f"\nCommercial min price distribution:")
print(pricing_with_discount[pricing_with_discount['commercial_min_price'] > 0]['commercial_min_price'].describe())
print(f"\nSample data with commercial constraints:")
pricing_with_discount[pricing_with_discount['commercial_min_price'] > 0][
    ['product_id', 'region', 'warehouse_id', 'sku', 'current_price', 'commercial_min_price', 'price_after_discount']
].head(15)


Loading commercial minimum price constraints...
Loaded 724 commercial min price records

COMMERCIAL MINIMUM PRICE CONSTRAINTS
Records with commercial min price: 1588
Records without commercial min price: 90286

Commercial min price distribution:
count    1588.000000
mean      226.513550
std       161.007447
min        11.750000
25%        97.000000
50%       210.999999
75%       286.000000
max       939.000000
Name: commercial_min_price, dtype: float64

Sample data with commercial constraints:


  df[col] = pd.to_numeric(df[col], errors='ignore')


Unnamed: 0,product_id,region,warehouse_id,sku,current_price,commercial_min_price,price_after_discount
86,13024,Delta East,339,بسكويت تاو تاو فانتوم فراولة - 6 قطعه,24.75,24.5,24.75
87,13024,Delta East,170,بسكويت تاو تاو فانتوم فراولة - 6 قطعه,24.75,24.5,24.75
139,24243,Upper Egypt,703,كيك تاوتاو تربو محشو كريم الفانيليا ومربى الفر...,25.5,24.5,25.5
215,24244,Upper Egypt,501,كيك كمارا تورنيدو كريمة الفانيليا و مربى الفرا...,24.5,24.5,23.5567
258,13038,Cairo,1,ﺷوﻜﻮﻻﺗﺔ ﺗﺎﻭﺗﺎﻭ ﺣﻠﻴﺐ - 5 جنية,50.75,49.5,50.75
302,13968,Upper Egypt,401,بيتي عصير برتقال عرض خاص 27ج - 1 لتر,283.5,286.0,283.5
330,2669,Delta East,339,كلوركس جل نعناع - 750 مل,509.0,508.990411,505.2334
331,2669,Delta East,170,كلوركس جل نعناع - 750 مل,509.0,508.990411,503.2483
559,3389,Upper Egypt,401,كلوركس الوان عادى - 475 مل,260.25,265.0,260.25
560,3397,Alexandria,797,كلوركس 5*1 صنوبر - 700 مل,503.25,504.121312,503.25


In [45]:
# =============================================================================
# Active SKU Discount Query - Get current SKU discount percentage per warehouse
# =============================================================================
ACTIVE_SKU_DISCOUNT_QUERY = f'''
WITH active_sku_discount AS ( 
    SELECT 
        x.id AS sku_discount_id,
        retailer_id,
        product_id,
        packing_unit_id,
        DISCOUNT_PERCENTAGE,
        start_at,
        end_at 
    FROM (
        SELECT 
            sd.*,
            f.value::INT AS retailer_id 
        FROM SKU_DISCOUNTS sd,
        LATERAL FLATTEN(
            input => SPLIT(
                REPLACE(REPLACE(REPLACE(sd.retailer_ids, '{{', ''), '}}', ''), '"', ''), 
                ','
            )
        ) f
        WHERE start_at::DATE <= CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE
            AND end_at::DATE >= CONVERT_TIMEZONE('{TIMEZONE}', 'Africa/Cairo', CURRENT_TIMESTAMP())::DATE
    ) x 
    JOIN SKU_DISCOUNT_VALUES sdv ON x.id = sdv.sku_discount_id
    WHERE name_en = 'Special Discounts'
    QUALIFY MAX(start_at) OVER (PARTITION BY retailer_id, product_id, packing_unit_id) = start_at 
)

SELECT 
    product_id, 
    warehouse_id,
    AVG(DISCOUNT_PERCENTAGE) AS active_sku_disc_pct 
FROM (
    SELECT 
        asd.*,
        warehouse_id 
    FROM active_sku_discount asd 
    JOIN materialized_views.retailer_polygon rp ON rp.retailer_id = asd.retailer_id
    JOIN WAREHOUSE_DISPATCHING_RULES wdr ON wdr.product_id = asd.product_id
    JOIN DISPATCHING_POLYGONS dp ON dp.id = wdr.DISPATCHING_POLYGON_ID AND dp.district_id = rp.district_id
)
GROUP BY ALL
'''

# Execute active SKU discount query
print("Loading active SKU discount data...")
df_active_sku_disc = query_snowflake(ACTIVE_SKU_DISCOUNT_QUERY)
df_active_sku_disc = convert_to_numeric(df_active_sku_disc)
print(f"Loaded {len(df_active_sku_disc)} active SKU discount records")

# Merge with pricing_with_discount
pricing_with_discount = pricing_with_discount.merge(
    df_active_sku_disc[['product_id', 'warehouse_id', 'active_sku_disc_pct']],
    on=['product_id', 'warehouse_id'],
    how='left'
)

# Fill NaN with 0 (no active SKU discount)
pricing_with_discount['active_sku_disc_pct'] = pricing_with_discount['active_sku_disc_pct'].fillna(0)

print(f"\n{'='*60}")
print(f"ACTIVE SKU DISCOUNT ANALYSIS")
print(f"{'='*60}")
print(f"Records with active SKU discount: {len(pricing_with_discount[pricing_with_discount['active_sku_disc_pct'] > 0])}")
print(f"Records without active SKU discount: {len(pricing_with_discount[pricing_with_discount['active_sku_disc_pct'] == 0])}")
print(f"\nActive SKU discount distribution:")
print(pricing_with_discount[pricing_with_discount['active_sku_disc_pct'] > 0]['active_sku_disc_pct'].describe())
print(f"\nSample data with active SKU discounts:")
pricing_with_discount[pricing_with_discount['active_sku_disc_pct'] > 0][
    ['product_id', 'warehouse_id', 'sku', 'current_price', 'active_sku_disc_pct', 'discount_perc']
].head(15)


Loading active SKU discount data...
Loaded 8418 active SKU discount records

ACTIVE SKU DISCOUNT ANALYSIS
Records with active SKU discount: 8993
Records without active SKU discount: 82881

Active SKU discount distribution:
count    8993.000000
mean        1.582507
std         1.089419
min         0.250000
25%         0.792846
50%         1.260000
75%         2.027959
max         5.000000
Name: active_sku_disc_pct, dtype: float64

Sample data with active SKU discounts:


  df[col] = pd.to_numeric(df[col], errors='ignore')


Unnamed: 0,product_id,warehouse_id,sku,current_price,active_sku_disc_pct,discount_perc
3,8915,236,بيبسى - 1.47 لتر,186.5,1.142228,0.006524
4,8915,962,بيبسى - 1.47 لتر,186.5,1.17,0.0
8,3456,8,نسكافيه 2*1 بدون سكر 24 ظرف - 10 جم,132.25,1.346811,0.003819
11,61,797,نسكافيه 3*1 - 18 جم,163.25,5.0,0.0
12,12902,1,فن داى كب كيك بنكهة الفانيلا وقطع الشوكولاتة ...,94.5,1.025759,0.000663
13,2703,632,نسكافيه 3*1 ريتش - 21 جم,104.0,0.55,0.0
14,2327,337,شويبس جولد اناناس - 1.75 لتر,211.0,0.81,0.002666
17,9877,632,تونة دولفين جولد فصوص حار - 170 جم,63.25,0.86,0.0
18,8990,236,حفاضات بى بم جامبو بانتس مقاس 3 - 56 حفاضة,268.5,0.93,0.0
19,8990,962,حفاضات بى بم جامبو بانتس مقاس 3 - 56 حفاضة,268.5,0.93,0.0


In [46]:
final_df = pricing_with_discount[(pricing_with_discount['no_nmv_4m']==0)|(pricing_with_discount['stocks']>0)]

In [48]:
# Drop duplicates before saving
final_df = final_df.drop_duplicates(subset=['product_id', 'warehouse_id'], keep='first')
#final_df.to_excel('pricing_with_discount.xlsx', index=False)
print(f"Exported {len(final_df)} records (duplicates removed)")

Exported 28352 records (duplicates removed)


In [None]:
final_df['created_at'] = TODAY
final_df['created_at'] =pd.to_datetime(final_df['created_at']).dt.date

In [None]:
status = upload_dataframe_to_snowflake("Egypt", final_df, "MATERIALIZED_VIEWS", "Pricing_data_extraction", "append", auto_create_table=True, conn=None)

# Send Slack notification
if status:
    slack_message = f"""✅ *Data Extraction Script Completed Successfully*
    
📅 Date: {TODAY}
📊 Records uploaded: {len(final_df):,}
🗄️ Table: MATERIALIZED_VIEWS.Pricing_data_extraction
⏰ Completed at: {datetime.now(CAIRO_TZ).strftime('%Y-%m-%d %H:%M:%S')} Cairo time"""
    
    send_text_slack('new-pricing-logic',slack_message)
    print("✅ Slack notification sent!")
else:
    error_message = f"""❌ *Data Extraction Script Failed*
    
📅 Date: {TODAY}
⏰ Failed at: {datetime.now(CAIRO_TZ).strftime('%Y-%m-%d %H:%M:%S')} Cairo time
⚠️ Upload to Snowflake failed - please check logs"""
    
    send_text_slack('new-pricing-logic',error_message)
    print("❌ Error notification sent to Slack!")