In [1]:
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm

import os
import sys

# Resolve the parent of the current working directory and put it on the import
# path exactly once (presumably so project-local modules can be imported from
# this notebook — nothing in this file imports one yet).
project_path = os.path.abspath(os.pardir)
if sys.path.count(project_path) == 0:
    sys.path.append(project_path)

## Generate AGG_HYB_FCST table

In [2]:
# Synthetic AGG_HYB_FCST table: one row per (product, location, customer,
# channel) key, each carrying a roughly one-month forecast period.
N_ROWS = 2000
SEED = 42  # fixed seed so Restart & Run All reproduces the same random columns
rng = np.random.default_rng(SEED)

AGG_HYB_FCST = pd.DataFrame(
    {
        'PRODUCT_LVL_ID6': range(600001, 600001 + N_ROWS),
        'LOCATION_LVL_ID8': range(800001, 800001 + N_ROWS),
        'CUSTOMER_LVL_ID6': range(600001, 600001 + N_ROWS),
        'DISTR_CHANNEL_LVL_ID6': range(600001, 600001 + N_ROWS),
        # Periods start on the 2nd of a month and end on the 1st of the next
        # one, so they never line up with calendar weeks or months.
        'PERIOD_DT': pd.date_range(start='2015-01-01', periods=N_ROWS, freq='MS') + pd.Timedelta(days=1),
        'PERIOD_END_DT': pd.date_range(start='2015-02-01', periods=N_ROWS, freq='MS'),
        'SEGMENT_NAME': ['name1'] * N_ROWS,
        'VF_FORECAST_VALUE': rng.uniform(0, 100, N_ROWS),
        'DEMAND_TYPE': rng.integers(0, 2, N_ROWS),
        'ASSORTMENT_TYPE': rng.choice(['new', 'old'], N_ROWS),
        'ML_FORECAST_VALUE': rng.uniform(0, 100, N_ROWS),
        'HYBRID_FORECAST_VALUE': rng.uniform(0, 100, N_ROWS),
    }
)

In [3]:
# Preview the first rows of the generated table.
AGG_HYB_FCST.head()

Unnamed: 0,PRODUCT_LVL_ID6,LOCATION_LVL_ID8,CUSTOMER_LVL_ID6,DISTR_CHANNEL_LVL_ID6,PERIOD_DT,PERIOD_END_DT,SEGMENT_NAME,VF_FORECAST_VALUE,DEMAND_TYPE,ASSORTMENT_TYPE,ML_FORECAST_VALUE,HYBRID_FORECAST_VALUE
0,600001,800001,600001,600001,2015-01-02,2015-02-01,name1,9.190933,0,old,2.833737,27.708736
1,600002,800002,600002,600002,2015-02-02,2015-03-01,name1,74.199686,1,new,44.95859,45.786556
2,600003,800003,600003,600003,2015-03-02,2015-04-01,name1,90.215905,1,new,40.587812,67.570161
3,600004,800004,600004,600004,2015-04-02,2015-05-01,name1,4.658254,0,old,14.551054,37.1682
4,600005,800005,600005,600005,2015-05-02,2015-06-01,name1,13.006199,1,old,0.367381,49.406356


In [4]:
class Disaccumulation:
    """Split aggregated forecast rows into finer output periods.

    Each input row covers the inclusive date range ``PERIOD_DT`` ..
    ``PERIOD_END_DT``. When a row spans more than one output period (day, week
    or month, selected by ``out_time_lvl``) it is cut at the period boundaries
    and its forecast values are shared between the pieces proportionally to
    the number of days each piece covers.

    Supported ``out_time_lvl`` values:

    * ``'D'``   - daily.
    * ``'W'``   - weekly, weeks starting on Sunday.
    * ``'W.n'`` - weekly, with the week start derived from ``n`` as
      ``(n - 2) % 7`` in ``weekday()`` numbering, i.e. ``W.1`` = Sunday,
      ``W.2`` = Monday, ... ``W.7`` = Saturday.
    * ``'M'``   - calendar month.

    Any other value is not rejected: every date is then its own period, which
    makes the level behave exactly like ``'D'``.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain ``PERIOD_DT`` and ``PERIOD_END_DT`` (datetime-like) and,
        for splitting, the columns listed in ``FORECAST_VALUE_COLS``.
        The frame is copied; the caller's object is never modified.
    out_time_lvl : str
        Target granularity, see above.
    """

    # Forecast measures that share_forecast() distributes over the split periods.
    FORECAST_VALUE_COLS = ('VF_FORECAST_VALUE', 'ML_FORECAST_VALUE', 'HYBRID_FORECAST_VALUE')

    def __init__(self, data, out_time_lvl):
        self.data = data.copy()
        self.out_time_lvl = out_time_lvl
        # Lowered to False by check_granularity() when at least one row needs splitting.
        self.FINAL_GRANULARITY_DELIVERED = True

    @staticmethod
    def _week_start_weekday(time_lvl):
        """Return the ``weekday()`` number (Monday=0) on which weeks of ``time_lvl`` start."""
        if '.' in time_lvl:
            # 'W.n': n=1 maps to Sunday (6), n=2 to Monday (0), and so on.
            return (int(time_lvl.split('.')[1]) - 2) % 7
        return 6  # bare 'W': Sunday

    def _get_period_start(self, dt, time_lvl):
        """Return the first date of the ``time_lvl`` period containing ``dt``."""
        if time_lvl.startswith('W'):
            days_back = (dt.weekday() - self._week_start_weekday(time_lvl)) % 7
            return dt - pd.Timedelta(days=days_back)
        if time_lvl == 'M':
            return dt.replace(day=1)
        # 'D' and any unrecognised level: the date is its own period.
        return dt

    def _get_period_end(self, dt, time_lvl):
        """Return the last date (inclusive) of the ``time_lvl`` period containing ``dt``."""
        if time_lvl.startswith('W'):
            return self._get_period_start(dt, time_lvl) + pd.Timedelta(days=6)
        if time_lvl == 'M':
            if dt.month == 12:
                next_month_start = dt.replace(year=dt.year + 1, month=1, day=1)
            else:
                next_month_start = dt.replace(month=dt.month + 1, day=1)
            return next_month_start - pd.Timedelta(days=1)
        # 'D' and any unrecognised level: the date is its own period.
        return dt

    def check_granularity(self):
        """Check whether every row already fits inside a single output period.

        A row fits when ``PERIOD_DT`` and ``PERIOD_END_DT`` fall into the same
        ``out_time_lvl`` period. If any row does not, the
        ``FINAL_GRANULARITY_DELIVERED`` flag is set to ``False``; the flag is
        never raised back to ``True`` by this method.

        Returns
        -------
        bool
            Current value of ``FINAL_GRANULARITY_DELIVERED``.
        """
        level = self.out_time_lvl
        start_bucket = self.data['PERIOD_DT'].apply(lambda x: self._get_period_start(x, level))
        end_bucket = self.data['PERIOD_END_DT'].apply(lambda x: self._get_period_start(x, level))

        if (start_bucket != end_bucket).any():
            self.FINAL_GRANULARITY_DELIVERED = False

        return self.FINAL_GRANULARITY_DELIVERED

    def check_granulatiry(self):
        """Backward-compatible alias (original, misspelled name) of ``check_granularity``."""
        return self.check_granularity()

    def _split_period(self, period_dt, period_end_dt):
        """Cut one inclusive date range at output-period boundaries.

        Returns a list of ``(start, end)`` tuples in chronological order. The
        list is empty when ``period_dt`` is after ``period_end_dt`` (or either
        is ``NaT``), because the loop condition is then false from the start.
        """
        pieces = []
        current = period_dt

        while current <= period_end_dt:
            bucket_start = self._get_period_start(current, self.out_time_lvl)
            bucket_end = self._get_period_end(current, self.out_time_lvl)

            # Clip the output period to the row's own date range.
            piece_start = max(period_dt, bucket_start)
            piece_end = min(period_end_dt, bucket_end)
            if piece_start <= piece_end:
                pieces.append((piece_start, piece_end))

            if bucket_end >= period_end_dt:
                break
            current = bucket_end + pd.Timedelta(days=1)

        return pieces

    def change_granularity(self):
        """Expand every row of ``self.data`` into one row per output period.

        The result keeps all original columns (values and dtypes repeated
        unchanged) and adds ``OUT_PERIOD_DT`` / ``OUT_PERIOD_END_DT`` with the
        clipped sub-period. Rows whose range yields no sub-period are dropped.
        The frame is stored in ``self.data_filled`` and returned.
        """
        positions = []
        out_starts = []
        out_ends = []

        date_pairs = zip(self.data['PERIOD_DT'], self.data['PERIOD_END_DT'])
        for pos, (period_dt, period_end_dt) in enumerate(tqdm(date_pairs, total=self.data.shape[0])):
            for out_pd, out_ped in self._split_period(period_dt, period_end_dt):
                positions.append(pos)
                out_starts.append(out_pd)
                out_ends.append(out_ped)

        # Positional selection repeats each source row once per sub-period
        # without the per-row copies and dtype re-inference of iterrows().
        filled = self.data.iloc[np.asarray(positions, dtype=np.intp)].reset_index(drop=True)
        # to_datetime keeps the columns datetime-typed even when no row survives.
        filled['OUT_PERIOD_DT'] = pd.to_datetime(out_starts)
        filled['OUT_PERIOD_END_DT'] = pd.to_datetime(out_ends)

        self.data_filled = filled
        return self.data_filled

    def share_forecast(self):
        """Distribute forecast values over the sub-periods by day count.

        Every column in ``FORECAST_VALUE_COLS`` becomes
        ``value * out_days / orig_days`` where both day counts are inclusive.
        The original period columns are then replaced by the ``OUT_*`` ones
        and the frame is sorted by the key columns and ``PERIOD_DT``.

        Must be called after ``change_granularity``; it consumes the ``OUT_*``
        columns, so a second call on the same ``data_filled`` raises
        ``KeyError``.
        """
        filled = self.data_filled

        orig_days = (pd.to_datetime(filled['PERIOD_END_DT']) - pd.to_datetime(filled['PERIOD_DT'])).dt.days + 1
        out_days = (filled['OUT_PERIOD_END_DT'] - filled['OUT_PERIOD_DT']).dt.days + 1

        # Multiply before dividing to keep the same floating-point result as
        # the row-wise formula value * out_days / orig_days.
        shared = {col: filled[col] * out_days / orig_days for col in self.FORECAST_VALUE_COLS}

        filled = (
            filled.assign(**shared)
            .drop(['PERIOD_DT', 'PERIOD_END_DT'], axis=1)
            .rename(columns={'OUT_PERIOD_DT': 'PERIOD_DT', 'OUT_PERIOD_END_DT': 'PERIOD_END_DT'})
        )

        id_cols = [col for col in filled.columns if '_ID' in col or col in ['SEGMENT_NAME', 'DEMAND_TYPE', 'ASSORTMENT_TYPE']]
        sort_cols = id_cols + ['PERIOD_DT']
        self.data_filled = filled.sort_values(sort_cols).reset_index(drop=True)

        return self.data_filled

    def provide_product_life_cycle(self, data):
        """Placeholder hook: currently returns ``data`` unchanged."""
        return data

    def provide_location_life_cycle(self, data):
        """Placeholder hook: currently returns ``data`` unchanged."""
        return data

    def provide_customer_life_cycle(self, data):
        """Placeholder hook: currently returns ``data`` unchanged."""
        return data

    def split_forecasts(self):
        """Run the whole pipeline and return the resulting frame.

        If the granularity check finds rows spanning several output periods,
        the data is split and the forecast values are shared; otherwise the
        stored copy of the input is used as is. The result is then passed
        through the three life-cycle hooks.
        """
        # Call through the original name so subclasses overriding it keep working.
        self.check_granulatiry()
        if not self.FINAL_GRANULARITY_DELIVERED:
            self.change_granularity()
            self.share_forecast()
            result = self.data_filled
        else:
            result = self.data

        result = self.provide_product_life_cycle(result)
        result = self.provide_location_life_cycle(result)
        result = self.provide_customer_life_cycle(result)

        return result
    
    

In [5]:
# Weekly output; 'W.2' makes weeks start on Monday (see Disaccumulation._get_period_start).
Dis = Disaccumulation(AGG_HYB_FCST, 'W.2')

In [6]:
# Run the pipeline: granularity check, then (if needed) period splitting and
# day-proportional sharing of the forecast values.
ACC_AGG_HYBRID_FORECAST = Dis.split_forecasts()

  0%|          | 0/2000 [00:00<?, ?it/s]

In [7]:
# Preview the split rows; PERIOD_DT / PERIOD_END_DT now hold the sub-period bounds.
ACC_AGG_HYBRID_FORECAST.head()

Unnamed: 0,PRODUCT_LVL_ID6,LOCATION_LVL_ID8,CUSTOMER_LVL_ID6,DISTR_CHANNEL_LVL_ID6,SEGMENT_NAME,VF_FORECAST_VALUE,DEMAND_TYPE,ASSORTMENT_TYPE,ML_FORECAST_VALUE,HYBRID_FORECAST_VALUE,PERIOD_DT,PERIOD_END_DT
0,600001,800001,600001,600001,name1,0.889445,0,old,0.274233,2.681491,2015-01-02,2015-01-04
1,600001,800001,600001,600001,name1,2.075372,0,old,0.639876,6.256811,2015-01-05,2015-01-11
2,600001,800001,600001,600001,name1,2.075372,0,old,0.639876,6.256811,2015-01-12,2015-01-18
3,600001,800001,600001,600001,name1,2.075372,0,old,0.639876,6.256811,2015-01-19,2015-01-25
4,600001,800001,600001,600001,name1,2.075372,0,old,0.639876,6.256811,2015-01-26,2015-02-01


In [8]:
# Row count after splitting (each input row expands into several weekly rows).
ACC_AGG_HYBRID_FORECAST.shape

(10409, 12)

### Disaccumulation testing

In [9]:
def test_disaccumulation():
    """Smoke-test ``Disaccumulation`` on a three-row monthly fixture.

    Checks, in order: the granularity check flags the fixture for splitting,
    splitting produces more rows than the input, the result schema and date
    columns are valid, forecast sums per key are preserved, data already at
    daily granularity passes through unchanged, and ``'W.2'`` weeks start on
    Monday. Progress is printed; any failed check raises ``AssertionError``.

    Returns
    -------
    pandas.DataFrame
        The weekly-split result for the monthly fixture.
    """
    key_cols = ['PRODUCT_LVL_ID6', 'LOCATION_LVL_ID8', 'CUSTOMER_LVL_ID6', 'DISTR_CHANNEL_LVL_ID6']
    value_cols = ['VF_FORECAST_VALUE', 'ML_FORECAST_VALUE', 'HYBRID_FORECAST_VALUE']

    print("\n1. Creating test data")
    test_data = pd.DataFrame({
        'PRODUCT_LVL_ID6': [600001, 600002, 600003],
        'LOCATION_LVL_ID8': [800001, 800002, 800003],
        'CUSTOMER_LVL_ID6': [600001, 600002, 600003],
        'DISTR_CHANNEL_LVL_ID6': [600001, 600002, 600003],
        'PERIOD_DT': pd.to_datetime(['2015-01-02', '2015-02-02', '2015-03-02']),
        'PERIOD_END_DT': pd.to_datetime(['2015-02-01', '2015-03-01', '2015-04-01']),
        'SEGMENT_NAME': ['seg1', 'seg2', 'seg3'],
        'VF_FORECAST_VALUE': [100.0, 200.0, 300.0],
        'DEMAND_TYPE': [0, 1, 0],
        'ASSORTMENT_TYPE': ['new', 'old', 'new'],
        'ML_FORECAST_VALUE': [50.0, 150.0, 250.0],
        'HYBRID_FORECAST_VALUE': [75.0, 175.0, 275.0]
    })
    print(f"   Created {len(test_data)} test rows")

    print("\n2. Testing granularity check")
    dis = Disaccumulation(test_data, 'W.2')
    is_final = dis.check_granulatiry()
    assert not is_final, "Should detect that monthly periods need splitting"
    print("   Granularity check works (detected need for splitting)")

    print("\n3. Testing period splitting (monthly -> weekly)")
    result = dis.split_forecasts()
    print(f"   Split {len(test_data)} rows into {len(result)} rows")

    assert len(result) > len(test_data), "Should have more rows after splitting"
    print("   Period splitting successful")

    print("\n4. Validating results")

    required_cols = key_cols + ['PERIOD_DT', 'PERIOD_END_DT'] + value_cols + \
        ['SEGMENT_NAME', 'DEMAND_TYPE', 'ASSORTMENT_TYPE']
    missing_cols = [col for col in required_cols if col not in result.columns]
    assert len(missing_cols) == 0, f"Missing columns: {missing_cols}"
    print("   All required columns present")

    # Accept any datetime64 resolution instead of pinning the literal 'datetime64[ns]'.
    assert pd.api.types.is_datetime64_any_dtype(result['PERIOD_DT']), "PERIOD_DT should be datetime"
    assert pd.api.types.is_datetime64_any_dtype(result['PERIOD_END_DT']), "PERIOD_END_DT should be datetime"
    print("   Date columns are datetime type")

    assert (result['PERIOD_DT'] <= result['PERIOD_END_DT']).all(), "PERIOD_DT should be <= PERIOD_END_DT"
    print("   Date ranges are valid")

    print("\n5. Testing forecast value distribution")
    for _, orig_row in test_data.iterrows():
        # Select all split rows that belong to this original key combination.
        same_key = pd.Series(True, index=result.index)
        for col in key_cols:
            same_key &= result[col] == orig_row[col]
        split_rows = result[same_key]

        for col in value_cols:
            total = split_rows[col].sum()
            assert np.isclose(total, orig_row[col], rtol=1e-5), \
                f"{col} not preserved: {total} vs {orig_row[col]}"

    print("   Forecast values correctly distributed (sums match original)")

    print("\n6. Testing already-correct granularity")
    daily_data = pd.DataFrame({
        'PRODUCT_LVL_ID6': [600001],
        'LOCATION_LVL_ID8': [800001],
        'CUSTOMER_LVL_ID6': [600001],
        'DISTR_CHANNEL_LVL_ID6': [600001],
        'PERIOD_DT': pd.to_datetime(['2015-01-05']),
        'PERIOD_END_DT': pd.to_datetime(['2015-01-05']),
        'SEGMENT_NAME': ['seg1'],
        'VF_FORECAST_VALUE': [100.0],
        'DEMAND_TYPE': [0],
        'ASSORTMENT_TYPE': ['new'],
        'ML_FORECAST_VALUE': [50.0],
        'HYBRID_FORECAST_VALUE': [75.0]
    })

    dis_daily = Disaccumulation(daily_data, 'D')
    is_final_daily = dis_daily.check_granulatiry()
    assert is_final_daily, "Daily data with daily granularity should be final"
    result_daily = dis_daily.split_forecasts()
    assert len(result_daily) == len(daily_data), "Should not split when already correct"
    print("   Handles already-correct granularity")

    print("\n7. Testing week calculation")
    test_date = pd.to_datetime('2015-01-07')  # a Wednesday
    week_start = dis._get_period_start(test_date, 'W.2')
    assert week_start.weekday() == 0, "W.2 should start on Monday"
    assert week_start == pd.Timestamp('2015-01-05'), "W.2 week of 2015-01-07 should start on 2015-01-05"
    print("   Week calculation correct (W.2 = Monday start)")

    print("\n8. Results summary:")
    print(f"   Original rows: {len(test_data)}")
    print(f"   Result rows: {len(result)}")
    print(f"   Expansion factor: {len(result) / len(test_data):.2f}x")
    print(f"   Date range: {result['PERIOD_DT'].min()} to {result['PERIOD_END_DT'].max()}")

    print("\n" + "-/" * 35)
    print("ALL TESTS PASSED!")
    print("-/" * 35)

    return result

In [10]:
# Run the smoke test; the returned weekly-split frame is kept for inspection.
result = test_disaccumulation()


1. Creating test data
   Created 3 test rows

2. Testing granularity check
   Granularity check works (detected need for splitting)

3. Testing period splitting (monthly -> weekly)


  0%|          | 0/3 [00:00<?, ?it/s]

   Split 3 rows into 14 rows
   Period splitting successful

4. Validating results
   All required columns present
   Date columns are datetime type
   Date ranges are valid

5. Testing forecast value distribution
   Forecast values correctly distributed (sums match original)

6. Testing already-correct granularity
   Handles already-correct granularity

7. Testing week calculation
   Week calculation correct (W.2 = Monday start)

8. Results summary:
   Original rows: 3
   Result rows: 14
   Expansion factor: 4.67x
   Date range: 2015-01-02 00:00:00 to 2015-04-01 00:00:00

-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/
ALL TESTS PASSED!
-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/-/
