In [2]:
import pandas as pd
import numpy as np
import pickle
from tqdm import tqdm
from xgboost import XGBRegressor
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error
import logging
import sys
import boto3
from io import BytesIO
from pymongo import MongoClient

# Get Lambda's root logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Add console handler only if running locally (not in AWS Lambda)
if not any(isinstance(h, logging.StreamHandler) for h in logger.handlers):
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)



In [3]:
def get_transaction_data(project_id: str, planning_scenario_id: str, new_filename_base: str, s3_client=None, bucket_name="dev-ai-analytics-private") -> pd.DataFrame:
    """
    Fetch transaction data from S3 parquet file.

    Args:
        project_id (str): Project identifier
        planning_scenario_id (str): Planning scenario identifier
        new_filename_base (str): Base name of parquet file
        s3_client: Optional boto3 S3 client
        bucket_name (str): S3 bucket name (default: dev-ai-analytics-private)

    Returns:
        pd.DataFrame: Transaction data
    """
    try:
        # --- Step 1: Build S3 key ---
        s3_key = f"fpa/transactional_data/{project_id}/{planning_scenario_id}/{new_filename_base}.parquet"
        logger.info("▶ Fetching from S3: s3://%s/%s", bucket_name, s3_key)

        if s3_client is None:
            s3_client = boto3.client("s3")

        # --- Step 2: Read parquet from S3 ---
        response = s3_client.get_object(Bucket=bucket_name, Key=s3_key)

        df = pd.read_parquet(BytesIO(response["Body"].read()))
        logger.info("✅ Loaded parquet with %d rows, %d columns", df.shape[0], df.shape[1])
        return df

    except Exception as e:
        logger.error("❌ Error fetching transaction data: %s", str(e), exc_info=True)
        return pd.DataFrame()
    
project_id= "01K5DMJ4KE5WSK9BE3KM3M0931"
planning_scenario_id="01K1TAZPVGNR5KK6BBRHZFYWQ7"
transaction_filename= "trans"

df = get_transaction_data(project_id, planning_scenario_id, new_filename_base= transaction_filename)
 


2025-10-01 12:40:40,652 - INFO - ▶ Fetching from S3: s3://dev-ai-analytics-private/fpa/transactional_data/01K5DMJ4KE5WSK9BE3KM3M0931/01K1TAZPVGNR5KK6BBRHZFYWQ7/trans.parquet
2025-10-01 12:40:40,668 - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-10-01 12:40:42,435 - INFO - ✅ Loaded parquet with 65611 rows, 8 columns


In [5]:
def get_account_and_generic_mappings(mongo_uri, db_name, project_id, planning_scenario_id):
    """
    Fetch Account and Generic table names with their mappings 
    for a given project_id and planning_scenario_id.
    
    Date type is skipped.

    Returns
    -------
    dict
        Example:
        {
            "Account": {
                "table_name": "glaccount",
                "mapping": "gl_account_id"
            },
            "Generic": [
                {"table_name": "cost_center", "mapping": "cost_center_id"},
                {"table_name": "profit_center", "mapping": "profit_center_id"}
            ]
        }
    """
    client = MongoClient(mongo_uri)
    db = client[db_name]
    collection = db["table_metadata"]
    mapping_collection = db["mapping_results"]

    # Default structure
    result = {
        "Account": {"table_name": None, "mapping": None},
        "Generic": []
    }

    # --- Step 1: get table names from scenarios.tables ---
    doc_tables = collection.find_one({"project_id": project_id}, {"scenarios": 1, "_id": 0})
    if doc_tables:
        for scenario in doc_tables.get("scenarios", []):
            if scenario.get("planning_scenario_id") == planning_scenario_id:
                for table in scenario.get("tables", []):
                    ttype = table.get("Type")
                    tname = table.get("table_name")

                    if ttype == "Account":
                        result["Account"]["table_name"] = tname
                    elif ttype == "Generic":  # allow multiple generic types
                        result["Generic"].append({"table_name": tname, "mapping": None})

    # --- Step 2: get field mappings from planning_scenarios.result ---
    doc_mappings = mapping_collection.find_one({"project_id": project_id}, {"planning_scenarios": 1, "_id": 0})
    if doc_mappings:
        scenario = doc_mappings.get("planning_scenarios", {}).get(planning_scenario_id, {})
        result_map = scenario.get("result", {})

        # Account mapping
        if result["Account"]["table_name"]:
            result["Account"]["mapping"] = result_map.get(result["Account"]["table_name"])

        # Generic mappings
        for gen in result["Generic"]:
            if gen["table_name"] in result_map:
                gen["mapping"] = result_map[gen["table_name"]]

    client.close()
    return result

mongo_uri= "mongodb://devfpauser:Ok74c3YE7GLN@13.202.247.111:27017/devfpadb"
db_name= "devfpadb"
acc_gen_mapping  = get_account_and_generic_mappings(mongo_uri, db_name, project_id, planning_scenario_id)
print(acc_gen_mapping)

{'Account': {'table_name': 'glaccount', 'mapping': 'gl_account_id'}, 'Generic': [{'table_name': 'center_c', 'mapping': 'cost_center_id'}, {'table_name': 'companycode', 'mapping': 'company_code_id'}, {'table_name': 'profitcenter', 'mapping': 'profit_center_id'}]}


In [6]:
def get_measure_and_date_dimension(mongo_uri, db_name, project_id, planning_scenario_id):
    """
    Fetch 'measure' and 'date_dimension' from the columns field 
    for a given project_id and planning_scenario_id.
    """
    # Connect to MongoDB
    client = MongoClient(mongo_uri)
    db = client[db_name]
    collection = db["recommendeddimensions"]

    # Query filter
    query = {
        "project_id": project_id,
        "planning_scenario_id": planning_scenario_id
    }

    # Projection (only fetch the required fields)
    projection = {
        "columns.measure": 1,
        "columns.date_dimension": 1,
        "_id": 0
    }

    # Fetch document
    result = collection.find_one(query, projection)

    client.close()

    if result and "columns" in result:
        return {
            "measure": result["columns"].get("measure"),
            "date_dimension": result["columns"].get("date_dimension")
        }
    return None


measure_date = get_measure_and_date_dimension(mongo_uri, db_name, project_id, planning_scenario_id)
print(measure_date)

{'measure': 'amount', 'date_dimension': 'posting_date'}


In [7]:
def fallback_with_weights(series, n_periods):
    # Base value = last value or mean
    base = series.iloc[-1] if len(series) > 0 else 0
    
    # Example weights (can be tuned or learned from history)
    # Adds variation across months
    month_weights = [1.00, 1.02, 0.98, 1.05, 1.01, 0.97, 
                     1.03, 1.04, 0.96, 1.02, 1.00, 0.99]
    
    preds = []
    for i in range(n_periods):
        w = month_weights[i % 12]   # cycle over 12 months
        preds.append(base * w)
    
    idx = pd.date_range(series.index[-1] + pd.offsets.MonthBegin(),
                        periods=n_periods, freq='MS')
    
    return pd.Series(preds, index=idx)


In [8]:
import pandas as pd

def forecast_moving_average(series, n_periods, window=3):
    """
    Simple Moving Average Forecast
    """
    if len(series) < window:
        # fallback: use mean of available history
        avg = series.mean() if len(series) > 0 else 0
        idx = pd.date_range(series.index[-1] + pd.offsets.MonthBegin(), periods=n_periods, freq='MS')
        return pd.Series([avg]*n_periods, index=idx), None
    
    avg = series[-window:].mean()
    idx = pd.date_range(series.index[-1] + pd.offsets.MonthBegin(), periods=n_periods, freq='MS')
    forecast = pd.Series([avg]*n_periods, index=idx)
    return forecast, None


Auto forecast

In [12]:
# auto forecast
date = "posting_date"
measure = "amount"
account_id_name = "gl_account_id"

# --- Step 0: Preprocess ---
df = df.copy()
df[date] = pd.to_datetime(df[date])
df[measure] = df[measure].astype(float)
df['date'] = df[date].dt.to_period('M').dt.to_timestamp()
df

Unnamed: 0,gl_account_id,profit_center_id,company_code_id,cost_center_id,posting_date,amount,quantity,unit_price,month,Date,date
0,5000013,PCENT002,CCODE006,CCTR002,2025-02-06,64492.28,48,1447.64,2025-02-01,2025-02-01,2025-02-01
1,5000003,PCENT004,CCODE006,CCTR007,2024-10-30,18414.46,14,1240.04,2024-10-01,2024-10-01,2024-10-01
2,5000004,PCENT006,CCODE001,CCTR002,2024-11-01,92533.98,45,2170.81,2024-11-01,2024-11-01,2024-11-01
3,5000051,PCENT002,CCODE004,CCTR002,2024-09-03,76199.86,17,4054.93,2024-09-01,2024-09-01,2024-09-01
4,5000031,PCENT004,CCODE007,CCTR002,2024-09-24,11228.79,3,3340.19,2024-09-01,2024-09-01,2024-09-01
...,...,...,...,...,...,...,...,...,...,...,...
65606,5000069,PCENT004,CCODE002,CCTR002,2025-04-15,1428.08,6,274.92,2025-04-01,2025-04-01,2025-04-01
65607,5000061,PCENT007,CCODE004,CCTR002,2024-06-26,57315.38,17,3385.54,2024-06-01,2024-06-01,2024-06-01
65608,5000069,PCENT007,CCODE004,CCTR008,2024-09-15,93935.27,34,3057.24,2024-09-01,2024-09-01,2024-09-01
65609,5000041,PCENT002,CCODE005,CCTR008,2024-11-24,37257.52,14,2945.56,2024-11-01,2024-11-01,2024-11-01


In [14]:
# --- Step 1: Aggregate GL × Month ---
gl_monthly = df.groupby([account_id_name, 'date'])[measure].sum().reset_index()
gl_monthly

Unnamed: 0,gl_account_id,date,amount
0,5000002,2024-06-01,2014197.96
1,5000002,2024-07-01,3864472.60
2,5000002,2024-08-01,3935441.30
3,5000002,2024-09-01,3014157.30
4,5000002,2024-10-01,3958473.62
...,...,...,...
814,5000072,2025-02-01,8466476.55
815,5000072,2025-03-01,8635013.40
816,5000072,2025-04-01,7632626.05
817,5000072,2025-05-01,9495330.34


In [16]:
forecast_periods=12
min_history=3
fallback_window=3
max_lag=12

# --- Helper functions for SARIMA/XGB forecasting ---
def forecast_sarima(series, n_periods):
    if len(series) < min_history:
        avg = series[-fallback_window:].mean() if len(series) > 0 else 0
        idx = pd.date_range(series.index[-1] + pd.offsets.MonthBegin(), periods=n_periods, freq='MS')
        return pd.Series([avg]*n_periods, index=idx), None
    model = SARIMAX(series, order=(1,1,1), seasonal_order=(1,1,1,12),
                    enforce_stationarity=False, enforce_invertibility=False)
    results = model.fit(disp=False)
    fc = results.forecast(steps=n_periods)
    return fc, results  # return model object

def forecast_xgb(series, n_periods):
    if len(series) < min_history:
        if len(series) > 1:
            growth = (series.iloc[-1] - series.iloc[0]) / max(1, len(series)-1)
        else:
            growth = 0
        preds = [series.iloc[-1] + (i+1)*growth for i in range(n_periods)]
        idx = pd.date_range(series.index[-1] + pd.offsets.MonthBegin(), periods=n_periods, freq='MS')
        return pd.Series(preds, index=idx), None

    df_feat = pd.DataFrame({measure: series})
    for lag in range(1, max_lag+1):
        df_feat[f'lag_{lag}'] = df_feat[measure].shift(lag)
    df_feat.dropna(inplace=True)
    X = df_feat.drop(columns=[measure])
    y = df_feat[measure]
    model = XGBRegressor(n_estimators=200, learning_rate=0.1, max_depth=5, random_state=42)
    model.fit(X, y)

    preds = []
    history = list(series.values)
    for step in range(n_periods):
        lags = history[-max_lag:]
        if len(lags) < max_lag:
            lags = [0]*(max_lag - len(lags)) + lags
        yhat = model.predict(np.array(lags).reshape(1,-1))[0]
        preds.append(yhat)
        history.append(yhat)
    idx = pd.date_range(series.index[-1] + pd.offsets.MonthBegin(), periods=n_periods, freq='MS')
    return pd.Series(preds, index=idx), model  # return model object


In [None]:
# --- Step 2: Evaluate models & store trained objects ---
results_summary = []
forecasts_list = []
trained_models = {}  # <-- dictionary to hold SARIMA/XGB objects per GL


for gl_id in tqdm(gl_monthly[account_id_name].unique(), desc="Evaluating Models"):
    gl_series = gl_monthly[gl_monthly[account_id_name]==gl_id].set_index('date')[measure].asfreq('MS').fillna(0)
    print("Gl series: ",gl_series)

    if len(gl_series) < min_history + forecast_periods:
        # ✅ New weighted fallback
        final_fc = fallback_with_weights(gl_series, forecast_periods)

        results_summary.append({
            "gl_account_id": gl_id, "sarima_rmse": None, "xgb_rmse": None,
            "sarima_mape": None, "xgb_mape": None, "best_model": "Fallback"
        })
        trained_models[gl_id] = {"SARIMA": None, "XGB": None}

    else:
        train = gl_series.iloc[:-forecast_periods]
        test = gl_series.iloc[-forecast_periods:]

        sarima_fc, sarima_model = forecast_sarima(train, forecast_periods)
        sarima_rmse = np.sqrt(mean_squared_error(test, sarima_fc))
        sarima_mape = mean_absolute_percentage_error(test, sarima_fc)

        xgb_fc, xgb_model = forecast_xgb(train, forecast_periods)
        xgb_rmse = np.sqrt(mean_squared_error(test, xgb_fc))
        xgb_mape = mean_absolute_percentage_error(test, xgb_fc)

        if sarima_rmse < xgb_rmse:
            best_model = "SARIMA"
            final_fc, _ = forecast_sarima(gl_series, forecast_periods)
        else:
            best_model = "XGB"
            final_fc, _ = forecast_xgb(gl_series, forecast_periods)

        results_summary.append({
            "gl_account_id": gl_id, "sarima_rmse": sarima_rmse, "xgb_rmse": xgb_rmse,
            "sarima_mape": sarima_mape, "xgb_mape": xgb_mape, "best_model": best_model
        })

        trained_models[gl_id] = {"SARIMA": sarima_model, "XGB": xgb_model}

    fc_df = final_fc.reset_index()
    fc_df.columns = ['date', 'forecast_amount']
    fc_df[account_id_name] = gl_id
    forecasts_list.append(fc_df)

forecast_df = pd.concat(forecasts_list, ignore_index=True)

forecast_df

Evaluating Models: 100%|██████████| 63/63 [00:00<00:00, 1003.16it/s]


Unnamed: 0,date,forecast_amount,gl_account_id
0,2025-07-01,2.274649e+06,5000002
1,2025-08-01,2.320142e+06,5000002
2,2025-09-01,2.229156e+06,5000002
3,2025-10-01,2.388381e+06,5000002
4,2025-11-01,2.297395e+06,5000002
...,...,...,...
751,2026-02-01,6.473481e+06,5000072
752,2026-03-01,5.975521e+06,5000072
753,2026-04-01,6.348991e+06,5000072
754,2026-05-01,6.224501e+06,5000072
