In [1]:
import os
import warnings
from typing import Any

import numpy as np
import pandas as pd
import polars as pl

# Visualization
# import matplotlib.pyplot as plt

# NumPy settings
np.set_printoptions(precision=4)

# Pandas settings
pd.options.display.max_rows = 1_000
pd.options.display.max_columns = 1_000
pd.options.display.max_colwidth = 600

# Polars settings
pl.Config.set_fmt_str_lengths(1_000)
pl.Config.set_tbl_cols(n=1_000)
pl.Config.set_tbl_rows(n=200)

warnings.filterwarnings("ignore")

# Black code formatter (Optional)
%load_ext lab_black

# auto reload imports
%load_ext autoreload
%autoreload 2

In [2]:
from rich.console import Console
from rich.theme import Theme

custom_theme = Theme(
    {
        "white": "#FFFFFF",  # Bright white
        "info": "#00FF00",  # Bright green
        "warning": "#FFD700",  # Bright gold
        "error": "#FF1493",  # Deep pink
        "success": "#00FFFF",  # Cyan
        "highlight": "#FF4500",  # Orange-red
    }
)
console = Console(theme=custom_theme)


def go_up_from_current_directory(*, go_up: int = 1) -> None:
    """This is used to up a number of directories.

    Params:
    -------
    go_up: int, default=1
        This indicates the number of times to go back up from the current directory.

    Returns:
    --------
    None
    """
    import os
    import sys

    CONST: str = "../"
    NUM: str = CONST * go_up

    # Goto the previous directory
    prev_directory = os.path.join(os.path.dirname(__name__), NUM)
    # Get the 'absolute path' of the previous directory
    abs_path_prev_directory = os.path.abspath(prev_directory)

    # Add the path to the System paths
    sys.path.insert(0, abs_path_prev_directory)
    print(abs_path_prev_directory)

In [3]:
go_up_from_current_directory(go_up=1)

/Users/mac/Desktop/Projects/End-to-end-Sale-Forecasting


In [4]:
import httpx

url: str = "https://jsonplaceholder.typicode.com/posts"

response = httpx.get(url, timeout=10)
response.raise_for_status()  # Raise an error for bad responses
console.print(response.json()[:3], style="info")

In [5]:
from include.config import app_settings
from include.utilities.data_gen import RealisticSalesDataGenerator

gen_data = RealisticSalesDataGenerator(
    start_date="2025-08-29", end_date="2025-09-04", seed=123
)
file_paths: dict[str, Any] = gen_data.generate_sales_data(
    output_dir="./data/sales_data"
)
file_paths

2025-09-04 23:36:04 - include.utilities.data_gen - [INFO] - Generating data for 2025-08-29
2025-09-04 23:36:04 - include.utilities.data_gen - [INFO] - Generating data for 2025-08-30
2025-09-04 23:36:04 - include.utilities.data_gen - [INFO] - Generating data for 2025-08-31
2025-09-04 23:36:04 - include.utilities.data_gen - [INFO] - Generating data for 2025-09-01
2025-09-04 23:36:04 - include.utilities.data_gen - [INFO] - Generating data for 2025-09-02
2025-09-04 23:36:04 - include.utilities.data_gen - [INFO] - Generating data for 2025-09-03
2025-09-04 23:36:04 - include.utilities.data_gen - [INFO] - Generating data for 2025-09-04
2025-09-04 23:36:04 - include.utilities.data_gen - [INFO] - Generated 15 files
2025-09-04 23:36:04 - include.utilities.data_gen - [INFO] - Sales files: 5
2025-09-04 23:36:04 - include.utilities.data_gen - [INFO] - Output directory: ./data/sales_data


{'sales': ['./data/sales_data/sales/year=2025/month=08/day=30/sales_2025-08-30.parquet',
  './data/sales_data/sales/year=2025/month=08/day=31/sales_2025-08-31.parquet',
  './data/sales_data/sales/year=2025/month=09/day=01/sales_2025-09-01.parquet',
  './data/sales_data/sales/year=2025/month=09/day=02/sales_2025-09-02.parquet',
  './data/sales_data/sales/year=2025/month=09/day=03/sales_2025-09-03.parquet'],
 'inventory': ['./data/sales_data/inventory/year=2025/week=35/inventory_2025-08-31.parquet'],
 'customer_traffic': ['./data/sales_data/customer_traffic/year=2025/month=08/day=29/traffic_2025-08-29.parquet',
  './data/sales_data/customer_traffic/year=2025/month=08/day=30/traffic_2025-08-30.parquet',
  './data/sales_data/customer_traffic/year=2025/month=08/day=31/traffic_2025-08-31.parquet',
  './data/sales_data/customer_traffic/year=2025/month=09/day=01/traffic_2025-09-01.parquet',
  './data/sales_data/customer_traffic/year=2025/month=09/day=02/traffic_2025-09-02.parquet',
  './data/s

In [6]:
total_files = sum(len(paths) for paths in file_paths.values())
total_files

15

In [7]:
from include.ml.trainer import ModelTrainer

print("Loading sales data from multiple files...")
sales_dfs: list[pl.DataFrame] = []
max_files: int = 5
skipped_sales: int = 0

for i, sales_file in enumerate(file_paths["sales"][:max_files]):
    try:
        df = pd.read_parquet(sales_file, engine="pyarrow")
        sales_dfs.append(df)
    except Exception as e:
        skipped_sales += 1
        print(f"  Skipping unreadable sales file {sales_file}: {e}")
        continue
    if (i + 1) % 10 == 0:
        print(f"  Loaded {i + 1} files...")
if not sales_dfs:
    raise ValueError("No readable sales parquet files were loaded; aborting training")

sales_df = pd.concat(sales_dfs, ignore_index=True)
print(f"Combined sales data shape: {sales_df.shape}")
daily_sales = (
    sales_df.groupby(["date", "store_id", "product_id", "category"])
    .agg(
        {
            "quantity_sold": "sum",
            "revenue": "sum",
            "cost": "sum",
            "profit": "sum",
            "discount_percent": "mean",
            "unit_price": "mean",
        }
    )
    .reset_index()
)
daily_sales = daily_sales.rename(columns={"revenue": "sales"})

if file_paths.get("promotions"):
    try:
        promo_df = pd.read_parquet(file_paths["promotions"][0], engine="pyarrow")
        promo_summary = (
            promo_df.groupby(["date", "product_id"])["discount_percent"]
            .max()
            .reset_index()
        )
        promo_summary["has_promotion"] = 1
        daily_sales = daily_sales.merge(
            promo_summary[["date", "product_id", "has_promotion"]],
            on=["date", "product_id"],
            how="left",
        )
        daily_sales["has_promotion"] = daily_sales["has_promotion"].fillna(0)
    except Exception as e:
        print(f"Skipping promotions merge due to error: {e}")

if file_paths.get("customer_traffic"):
    traffic_dfs = []
    skipped_traffic = 0
    for traffic_file in file_paths["customer_traffic"][:10]:
        try:
            traffic_dfs.append(pd.read_parquet(traffic_file, engine="pyarrow"))
        except Exception as e:
            skipped_traffic += 1
            print(f"  Skipping unreadable traffic file {traffic_file}: {e}")
    if traffic_dfs:
        traffic_df = pd.concat(traffic_dfs, ignore_index=True)
        traffic_summary = (
            traffic_df.groupby(["date", "store_id"])
            .agg({"customer_traffic": "sum", "is_holiday": "max"})
            .reset_index()
        )
        daily_sales = daily_sales.merge(
            traffic_summary, on=["date", "store_id"], how="left"
        )
    else:
        print("No readable traffic files; skipping merge")
print(f"Final training data shape: {daily_sales.shape}")
print(f"Columns: {daily_sales.columns.tolist()}")
# trainer = ModelTrainer()
store_daily_sales = (
    daily_sales.groupby(["date", "store_id"])
    .agg(
        {
            "sales": "sum",
            "quantity_sold": "sum",
            "profit": "sum",
            "has_promotion": "mean",
            "customer_traffic": "first",
            "is_holiday": "first",
        }
    )
    .reset_index()
)
store_daily_sales["date"] = pd.to_datetime(store_daily_sales["date"])
store_daily_sales_pl = pl.from_pandas(store_daily_sales)
# train_df, val_df, test_df = trainer.prepare_data(
#     store_daily_sales_pl,
#     target_col="sales",
#     group_cols=["store_id"],
#     categorical_cols=["store_id"],
# )
# print(
#     f"Train shape: {train_df.shape}, Val shape: {val_df.shape}, Test shape: {test_df.shape}"
# )

Loading sales data from multiple files...
Combined sales data shape: (14, 10)
Final training data shape: (14, 13)
Columns: ['date', 'store_id', 'product_id', 'category', 'quantity_sold', 'sales', 'cost', 'profit', 'discount_percent', 'unit_price', 'has_promotion', 'customer_traffic', 'is_holiday']


In [8]:
# Convert to Polars

from polars.dataframe.frame import DataFrame

print("Loading sales data from multiple files...")
sales_dfs: list[pl.DataFrame] = []
max_files: int = 5
skipped_sales: int = 0

for i, sales_file in enumerate(file_paths["sales"][:max_files]):
    try:
        df = pl.read_parquet(sales_file)
        sales_dfs.append(df)
    except Exception as e:
        skipped_sales += 1
        print(f"  Skipping unreadable sales file {sales_file}: {e}")
        continue
    if (i + 1) % 10 == 0:
        print(f"  Loaded {i + 1} files...")
if not sales_dfs:
    raise ValueError("No readable sales parquet files were loaded; aborting training")

sales_df = pl.concat(sales_dfs)
print(f"Combined sales data shape: {sales_df.shape}")
daily_sales: DataFrame = (
    sales_df.group_by(["date", "store_id", "product_id", "category"])
    .agg(
        pl.col("quantity_sold").sum(),
        pl.col("revenue").sum().alias("sales"),
        pl.col("cost").sum(),
        pl.col("profit").sum(),
        pl.col("discount_percent").mean(),
        pl.col("unit_price").mean(),
    )
    .sort("date", "store_id")
)

if file_paths.get("promotions"):
    try:
        promo_df = pl.read_parquet(file_paths["promotions"][0])
        promo_summary = (
            promo_df.group_by(["date", "product_id"])
            .agg(pl.col("discount_percent").max())
            .with_columns(pl.lit(1).cast(pl.Int8).alias("has_promotion"))
        )
        daily_sales = daily_sales.join(
            promo_summary.select(["date", "product_id", "has_promotion"]),
            on=["date", "product_id"],
            how="left",
        ).with_columns(pl.col("has_promotion").fill_null(0))
    except Exception as e:
        print(f"Skipping promotions merge due to error: {e}")

if file_paths.get("customer_traffic"):
    traffic_dfs: list[pl.DataFrame] = []
    skipped_traffic: int = 0

    for traffic_file in file_paths["customer_traffic"][:10]:
        try:
            traffic_dfs.append(pl.read_parquet(traffic_file))
        except Exception as e:
            skipped_traffic += 1
            print(f"  Skipping unreadable traffic file {traffic_file}: {e}")

    if traffic_dfs:
        traffic_df = pl.concat(traffic_dfs)
        traffic_summary = traffic_df.group_by(["date", "store_id"]).agg(
            pl.col("customer_traffic").sum(), pl.col("is_holiday").max()
        )
        daily_sales = daily_sales.join(
            traffic_summary,
            on=["date", "store_id"],
            how="left",
        )
    else:
        print("No readable traffic files; skipping merge")
print(f"Final training data shape: {daily_sales.shape}")
print(f"Columns: {daily_sales.columns}")

trainer = ModelTrainer()
store_daily_sales: DataFrame = (
    daily_sales.group_by(["date", "store_id"])
    .agg(
        pl.col("sales").sum(),
        pl.col("quantity_sold").sum(),
        pl.col("profit").sum(),
        pl.col("has_promotion").mean(),
        pl.col("customer_traffic").first(),
        pl.col("is_holiday").first(),
    )
    .with_columns(pl.col("date").cast(pl.Date))
)
train_df, val_df, test_df = trainer.prepare_data(
    store_daily_sales_pl,
    target_col="sales",
    group_cols=["store_id"],
    categorical_cols=["store_id"],
)
print(
    f"Train shape: {train_df.shape}, Val shape: {val_df.shape}, Test shape: {test_df.shape}"
)

Loading sales data from multiple files...
Combined sales data shape: (14, 10)
Final training data shape: (14, 13)
Columns: ['date', 'store_id', 'product_id', 'category', 'quantity_sold', 'sales', 'cost', 'profit', 'discount_percent', 'unit_price', 'has_promotion', 'customer_traffic', 'is_holiday']


2025/09/04 23:36:20 INFO mlflow.tracking.fluent: Experiment with name 'sales_forecasting' does not exist. Creating a new experiment.


2025-09-04 23:36:20 - include.utilities.feature_engineering - [INFO] - Starting feature engineering pipeline
2025-09-04 23:36:20 - include.utilities.feature_engineering - [INFO] - Created 7 lag features
2025-09-04 23:36:20 - include.utilities.feature_engineering - [INFO] - Feature engineering pipeline completed. 41 total features.
2025-09-04 23:36:20 - include.ml.trainer - [INFO] - Data split - {"train_size": 6, "validation_size": 0, "test_size": 3}
Train shape: (6, 41), Val shape: (0, 41), Test shape: (3, 41)


In [9]:
display(daily_sales.head())
store_daily_sales.head()

date,store_id,product_id,category,quantity_sold,sales,cost,profit,discount_percent,unit_price,has_promotion,customer_traffic,is_holiday
datetime[ns],str,str,str,i64,f64,f64,f64,f64,f64,i8,i64,bool
2025-08-30 00:00:00,"""store_001""","""HOME_005""","""Home""",1,39.0,23.4,15.6,0.0,39.0,0,1443,False
2025-08-30 00:00:00,"""store_001""","""SPRT_005""","""Sports""",1,89.0,57.85,31.15,0.0,89.0,0,1443,False
2025-08-30 00:00:00,"""store_002""","""CLTH_001""","""Clothing""",1,29.0,14.5,14.5,0.0,29.0,0,1195,False
2025-08-31 00:00:00,"""store_009""","""SPRT_001""","""Sports""",1,29.0,13.05,15.95,0.0,29.0,0,1221,False
2025-08-31 00:00:00,"""store_009""","""CLTH_001""","""Clothing""",1,29.0,14.5,14.5,0.0,29.0,0,1221,False


date,store_id,sales,quantity_sold,profit,has_promotion,customer_traffic,is_holiday
date,str,f64,i64,f64,f64,i64,bool
2025-08-30,"""store_002""",29.0,1,14.5,0.0,1195,False
2025-09-02,"""store_009""",24.65,1,11.6,1.0,703,False
2025-09-03,"""store_002""",24.65,1,11.6,1.0,676,False
2025-08-30,"""store_001""",128.0,2,46.75,0.0,1443,False
2025-09-03,"""store_001""",24.65,1,11.6,1.0,922,False


In [10]:
display(daily_sales.head())
store_daily_sales.head()

date,store_id,product_id,category,quantity_sold,sales,cost,profit,discount_percent,unit_price,has_promotion,customer_traffic,is_holiday
datetime[ns],str,str,str,i64,f64,f64,f64,f64,f64,i8,i64,bool
2025-08-30 00:00:00,"""store_001""","""HOME_005""","""Home""",1,39.0,23.4,15.6,0.0,39.0,0,1443,False
2025-08-30 00:00:00,"""store_001""","""SPRT_005""","""Sports""",1,89.0,57.85,31.15,0.0,89.0,0,1443,False
2025-08-30 00:00:00,"""store_002""","""CLTH_001""","""Clothing""",1,29.0,14.5,14.5,0.0,29.0,0,1195,False
2025-08-31 00:00:00,"""store_009""","""SPRT_001""","""Sports""",1,29.0,13.05,15.95,0.0,29.0,0,1221,False
2025-08-31 00:00:00,"""store_009""","""CLTH_001""","""Clothing""",1,29.0,14.5,14.5,0.0,29.0,0,1221,False


date,store_id,sales,quantity_sold,profit,has_promotion,customer_traffic,is_holiday
date,str,f64,i64,f64,f64,i64,bool
2025-08-30,"""store_002""",29.0,1,14.5,0.0,1195,False
2025-09-02,"""store_009""",24.65,1,11.6,1.0,703,False
2025-09-03,"""store_002""",24.65,1,11.6,1.0,676,False
2025-08-30,"""store_001""",128.0,2,46.75,0.0,1443,False
2025-09-03,"""store_001""",24.65,1,11.6,1.0,922,False


In [None]:
df: pl.DataFrame = pl.DataFrame(
    data={
        "id": [1, 2, 3, 4],
        "name": ["Alice", "Bob", "Charlie", "Bob"],
        "role": ["Engineer", "Manager", "Engineer", "Manager"],
        "skill": ["Python", "Leadership", "Python", "Management"],
        "experience": [5, 2, 3, 3],
        "age": [30, 40, 35, 34],
        "target": [1, 0, 1, 1],
    }
)

df

In [None]:
counts = df["name"].value_counts()
mean_target = df.group_by("name").agg(pl.col("target").mean())
display(mean_target)
display(counts["name"])
for row in counts["name"]:
    print(counts.filter(pl.col("name").eq(row))["count"].item())

counts.filter(pl.col("name").eq("Alice"))["count"].item()

In [None]:
temp_df

### Connect To MLFlow

- Set the `tracking URI` to the MLflow server.
    - Tracking URI requires the MLflow `server address`, `port`, `S3 endpoint URL`, and `S3 credentials`.
    - S3 credentials include `access key`, `secret key`, and `bucket name`.
    - `MinIO` is used as a local S3-compatible storage service.

- Verify the connection by listing experiments.

In [None]:
# Force localhost configuration and debug
RUNNING_IN_DOCKER = False
DEFAULT_MINIO_HOST = app_settings.AWS_S3_HOST if RUNNING_IN_DOCKER else "minio"
DEFAULT_MINIO_PORT = app_settings.AWS_S3_PORT
MINIO_ENDPOINT = app_settings.mlflow_s3_endpoint_url
# This connects to the MLflow server with PostgreSQL backend
MLFLOW_URI = app_settings.mlflow_tracking_uri
AWS_KEY = app_settings.AWS_ACCESS_KEY_ID
AWS_SECRET = app_settings.AWS_SECRET_ACCESS_KEY.get_secret_value()
AWS_REGION = app_settings.AWS_DEFAULT_REGION
BUCKET = app_settings.AWS_S3_BUCKET

# Set environment variables
os.environ["AWS_ACCESS_KEY_ID"] = app_settings.AWS_ACCESS_KEY_ID
os.environ["AWS_SECRET_ACCESS_KEY"] = AWS_SECRET
os.environ["AWS_DEFAULT_REGION"] = AWS_REGION
os.environ["MLFLOW_S3_ENDPOINT_URL"] = MINIO_ENDPOINT

print("=== CONFIGURATION DEBUG ===")
print(f"RUNNING_IN_DOCKER: {RUNNING_IN_DOCKER}")
print(f"DEFAULT_MINIO_HOST: {DEFAULT_MINIO_HOST}")
print(f"MINIO_ENDPOINT: {MINIO_ENDPOINT}")
print(f"MLFLOW_URI: {MLFLOW_URI}")
print(f"AWS_ACCESS_KEY_ID: {AWS_KEY}")
print(f"BUCKET: {BUCKET}")
print(f"Environment MLFLOW_S3_ENDPOINT_URL: {MINIO_ENDPOINT}")
print("=== END CONFIGURATION DEBUG ===\n")

In [None]:
# Test MLflow server connection and S3 storage
import tempfile
import traceback

import boto3
import mlflow
from botocore.exceptions import ClientError

# 1) Test S3/MinIO connection
print("Testing S3/MinIO connection...")
s3 = boto3.client(
    "s3",
    endpoint_url=MINIO_ENDPOINT,
    aws_access_key_id=AWS_KEY,
    aws_secret_access_key=AWS_SECRET,
    region_name=AWS_REGION,
)

try:
    s3.head_bucket(Bucket=BUCKET)
    print(f"✅ Bucket '{BUCKET}' is reachable")
except ClientError as e:
    print(f"❌ S3/MinIO connection failed: {e}")

# 2) Test MLflow server connection
print(f"\nTesting MLflow server connection to {MLFLOW_URI}...")
mlflow.set_tracking_uri(MLFLOW_URI)
print(f"✅ MLflow tracking URI set to: {mlflow.get_tracking_uri()}")

# 3) Test that MLflow uses PostgreSQL backend (not local files)
try:
    # This should connect to the MLflow server which uses PostgreSQL
    experiments = mlflow.search_experiments()
    print(f"✅ Connected to MLflow server. Found {len(experiments)} experiments.")
    print("✅ This confirms MLflow is using the PostgreSQL backend, not local files.")
except Exception as e:
    print(f"❌ Failed to connect to MLflow server: {e}")

print("\n" + "=" * 50)
print("IMPORTANT: If MLflow server is using PostgreSQL correctly,")
print("experiments and runs will be stored in the database,")
print("and artifacts will be stored in MinIO/S3.")
print("Local 'mlruns' folders should NOT be created.")
print("=" * 50)

In [None]:
import mlflow
import mlflow.sklearn
from botocore.exceptions import ClientError
from sklearn import datasets
from sklearn.linear_model import ElasticNet

try:
    mlflow.set_experiment("notebook_quick_test")
    X, y = datasets.load_diabetes(return_X_y=True)
    model = ElasticNet(alpha=0.1, l1_ratio=0.5, random_state=42)
    model.fit(X, y)

    with mlflow.start_run() as run:
        mlflow.log_param("alpha", 0.1)
        mlflow.log_param("l1_ratio", 0.5)
        mlflow.log_metric("dummy_score", model.score(X, y))

        # Create a small artifact file and upload
        with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
            tmp.write("mlflow artifact test")
            tmp_path = tmp.name

        mlflow.log_artifact(tmp_path, artifact_path="test_artifacts")
        mlflow.sklearn.log_model(model, "model", input_example=X[:2].tolist())

        # Remove temp file after logging
        os.remove(tmp_path)

        print("✅ Logged run id:", run.info.run_id)
        print("✅ Experiment id:", run.info.experiment_id)

    print("✅ MLflow logging complete — check the UI and MinIO for artifact/model.")
    print("✅ Data stored in PostgreSQL database, artifacts in MinIO S3")

except ClientError as e:
    # boto3 ClientError can surface during artifact upload
    print("❌ Boto3 ClientError during MLflow operations:", e)
    print(traceback.format_exc())
    raise
except Exception:
    print("❌ Unexpected error during MLflow logging:")
    print(traceback.format_exc())
    raise

In [None]:
def create_cyclical_features(df: pl.DataFrame, date_col: str = "date") -> pl.DataFrame:
    df = df.clone()

    return df.with_columns(
        # month (convert 1-12 to 0-11 for proper cyclical encoding)
        pl.col(date_col).dt.month().map_elements(lambda x: np.sin(2 * np.pi * (x - 1) / 12)).alias("month_sin"),
        pl.col(date_col).dt.month().map_elements(lambda x: np.cos(2 * np.pi * (x - 1) / 12)).alias("month_cos"),
        # day (Retain original values; 1-31)
        pl.col(date_col).dt.day().map_elements(lambda x: np.sin(2 * np.pi * x / 31)).alias("day_sin"),
        pl.col(date_col).dt.day().map_elements(lambda x: np.cos(2 * np.pi * x / 31)).alias("day_cos"),
        # day of week (convert 1-7 to 0-6 for proper cyclical encoding)
        pl.col(date_col).dt.weekday().map_elements(lambda x: np.sin(2 * np.pi * (x - 1) / 7)).alias("day_of_week_sin"),
        pl.col(date_col).dt.weekday().map_elements(lambda x: np.cos(2 * np.pi * (x - 1) / 7)).alias("day_of_week_cos"),
    )


create_cyclical_features(temp_df, date_col="date")

In [None]:
# Let's examine the cyclical features to see what's wrong
cyclical_result = create_cyclical_features(temp_df, date_col="date")

# Check the cyclical features
cyclical_sample = cyclical_result.select(
    [
        "date",
        "day_of_week",
        "day_of_week_sin",
        "day_of_week_cos",
        "month_sin",
        "month_cos",
    ]
).unique()

print("Cyclical features sample:")
print(cyclical_sample)

print("\nLet's check the day_of_week values and corresponding sin/cos:")
day_check = (
    cyclical_result.select(["date", "day_of_week", "day_of_week_sin", "day_of_week_cos"]).unique().sort("day_of_week")
)
print(day_check)

print("\nIssue Analysis:")
print("day_of_week ranges from 1-7 in Polars (Monday=1, Sunday=7)")
print("But for cyclical encoding, we want values from 0 to 2π")
print("Current formula: sin(2π × day_of_week / 7)")
print("This means day 7 gives: sin(2π × 7 / 7) = sin(2π) = 0")
print("And day 1 gives: sin(2π × 1 / 7) = sin(2π/7)")
print("This creates a discontinuity between Sunday (7) and Monday (1)!")

In [None]:
import os
import sys
from datetime import datetime, timedelta

import pandas as pd

from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator

# Add include path
sys.path.append("/usr/local/airflow/include")

from ml_models.train_models import ModelTrainer
from utils.mlflow_utils import MLflowManager

default_args = {
    "owner": "data-team",
    "depends_on_past": False,
    "start_date": datetime(2025, 7, 22),
    "email_on_failure": True,
    "email_on_retry": False,
    "email": ["admin@example.com"],
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


@dag(
    schedule="@weekly",
    start_date=datetime(2025, 7, 22),
    catchup=False,
    default_args=default_args,
    description="Train sales forecasting models",
    tags=["ml", "training", "sales"],
)
def sales_forecast_training():
    @task()
    def extract_data_task():
        from utils.data_generator import RealisticSalesDataGenerator

        data_output_dir = "/tmp/sales_data"
        generator = RealisticSalesDataGenerator(start_date="2021-01-01", end_date="2021-12-31")
        print("Generating realistic sales data...")
        file_paths = generator.generate_sales_data(output_dir=data_output_dir)
        total_files = sum(len(paths) for paths in file_paths.values())
        print(f"Generated {total_files} files:")
        for data_type, paths in file_paths.items():
            print(f"  - {data_type}: {len(paths)} files")
        return {
            "data_output_dir": data_output_dir,
            "file_paths": file_paths,
            "total_files": total_files,
        }

    @task()
    def validate_data_task(extract_result):
        file_paths = extract_result["file_paths"]
        total_rows = 0
        issues_found = []
        print(f"Validating {len(file_paths['sales'])} sales files...")
        for i, sales_file in enumerate(file_paths["sales"][:10]):
            df = pd.read_parquet(sales_file)
            if i == 0:
                print(f"Sales data columns: {df.columns.tolist()}")
            if df.empty:
                issues_found.append(f"Empty file: {sales_file}")
                continue
            required_cols = [
                "date",
                "store_id",
                "product_id",
                "quantity_sold",
                "revenue",
            ]
            missing_cols = set(required_cols) - set(df.columns)
            if missing_cols:
                issues_found.append(f"Missing columns in {sales_file}: {missing_cols}")
            total_rows += len(df)
            if df["quantity_sold"].min() < 0:
                issues_found.append(f"Negative quantities in {sales_file}")
            if df["revenue"].min() < 0:
                issues_found.append(f"Negative revenue in {sales_file}")
        for data_type in ["promotions", "store_events", "customer_traffic"]:
            if data_type in file_paths and file_paths[data_type]:
                sample_file = file_paths[data_type][0]
                df = pd.read_parquet(sample_file)
                print(f"{data_type} data shape: {df.shape}")
                print(f"{data_type} columns: {df.columns.tolist()}")
        validation_summary = {
            "total_files_validated": len(file_paths["sales"][:10]),
            "total_rows": total_rows,
            "issues_found": len(issues_found),
            "issues": issues_found[:5],
        }
        if issues_found:
            print(f"Validation completed with {len(issues_found)} issues:")
            for issue in issues_found[:5]:
                print(f"  - {issue}")
        else:
            print(f"Validation passed! Total rows: {total_rows}")
        return validation_summary

    @task()
    def train_models_task(extract_result, validation_summary):
        file_paths = extract_result["file_paths"]
        print("Loading sales data from multiple files...")
        sales_dfs = []
        max_files = 50
        for i, sales_file in enumerate(file_paths["sales"][:max_files]):
            df = pd.read_parquet(sales_file)
            sales_dfs.append(df)
            if (i + 1) % 10 == 0:
                print(f"  Loaded {i + 1} files...")
        sales_df = pd.concat(sales_dfs, ignore_index=True)
        print(f"Combined sales data shape: {sales_df.shape}")
        daily_sales = (
            sales_df.groupby(["date", "store_id", "product_id", "category"])
            .agg(
                {
                    "quantity_sold": "sum",
                    "revenue": "sum",
                    "cost": "sum",
                    "profit": "sum",
                    "discount_percent": "mean",
                    "unit_price": "mean",
                }
            )
            .reset_index()
        )
        daily_sales = daily_sales.rename(columns={"revenue": "sales"})
        if file_paths.get("promotions"):
            promo_df = pd.read_parquet(file_paths["promotions"][0])
            promo_summary = promo_df.groupby(["date", "product_id"])["discount_percent"].max().reset_index()
            promo_summary["has_promotion"] = 1
            daily_sales = daily_sales.merge(
                promo_summary[["date", "product_id", "has_promotion"]],
                on=["date", "product_id"],
                how="left",
            )
            daily_sales["has_promotion"] = daily_sales["has_promotion"].fillna(0)
        if file_paths.get("customer_traffic"):
            traffic_dfs = []
            for traffic_file in file_paths["customer_traffic"][:10]:
                traffic_dfs.append(pd.read_parquet(traffic_file))
            traffic_df = pd.concat(traffic_dfs, ignore_index=True)
            traffic_summary = (
                traffic_df.groupby(["date", "store_id"]).agg({"customer_traffic": "sum", "is_holiday": "max"}).reset_index()
            )
            daily_sales = daily_sales.merge(traffic_summary, on=["date", "store_id"], how="left")
        print(f"Final training data shape: {daily_sales.shape}")
        print(f"Columns: {daily_sales.columns.tolist()}")
        trainer = ModelTrainer()
        store_daily_sales = (
            daily_sales.groupby(["date", "store_id"])
            .agg(
                {
                    "sales": "sum",
                    "quantity_sold": "sum",
                    "profit": "sum",
                    "has_promotion": "mean",
                    "customer_traffic": "first",
                    "is_holiday": "first",
                }
            )
            .reset_index()
        )
        store_daily_sales["date"] = pd.to_datetime(store_daily_sales["date"])
        train_df, val_df, test_df = trainer.prepare_data(
            store_daily_sales,
            target_col="sales",
            date_col="date",
            group_cols=["store_id"],
            categorical_cols=["store_id"],
        )
        print(f"Train shape: {train_df.shape}, Val shape: {val_df.shape}, Test shape: {test_df.shape}")
        results = trainer.train_all_models(train_df, val_df, test_df, target_col="sales", use_optuna=True)
        for model_name, model_results in results.items():
            if "metrics" in model_results:
                print(f"\n{model_name} metrics:")
                for metric, value in model_results["metrics"].items():
                    print(f"  {metric}: {value:.4f}")
        print("\nVisualization charts have been generated and saved to MLflow/MinIO")
        print("Charts include:")
        print("  - Model metrics comparison")
        print("  - Predictions vs actual values")
        print("  - Residuals analysis")
        print("  - Error distribution")
        print("  - Feature importance comparison")
        serializable_results = {}
        for model_name, model_results in results.items():
            serializable_results[model_name] = {"metrics": model_results.get("metrics", {})}
        import mlflow

        current_run_id = mlflow.active_run().info.run_id if mlflow.active_run() else None
        return {
            "training_results": serializable_results,
            "mlflow_run_id": current_run_id,
        }

    @task()
    def evaluate_models_task(training_result):
        results = training_result["training_results"]
        mlflow_manager = MLflowManager()
        best_model_name = None
        best_rmse = float("inf")
        for model_name, model_results in results.items():
            if "metrics" in model_results and "rmse" in model_results["metrics"]:
                if model_results["metrics"]["rmse"] < best_rmse:
                    best_rmse = model_results["metrics"]["rmse"]
                    best_model_name = model_name
        print(f"Best model: {best_model_name} with RMSE: {best_rmse:.4f}")
        best_run = mlflow_manager.get_best_model(metric="rmse", ascending=True)
        return {"best_model": best_model_name, "best_run_id": best_run["run_id"]}

    @task()
    def register_best_model_task(evaluation_result):
        evaluation_result["best_model"]
        run_id = evaluation_result["best_run_id"]
        mlflow_manager = MLflowManager()
        model_versions = {}
        for model_name in ["xgboost", "lightgbm"]:
            version = mlflow_manager.register_model(run_id, model_name, model_name)
            model_versions[model_name] = version
            print(f"Registered {model_name} version: {version}")
        return model_versions

    @task()
    def transition_to_production_task(model_versions):
        mlflow_manager = MLflowManager()
        for model_name, version in model_versions.items():
            mlflow_manager.transition_model_stage(model_name, version, "Production")
            print(f"Transitioned {model_name} v{version} to Production")
        return "Models transitioned to production"

    @task()
    def generate_performance_report_task(training_result, validation_summary):
        results = training_result["training_results"]
        report = {
            "timestamp": datetime.now().isoformat(),
            "data_summary": {
                "total_rows": (validation_summary.get("total_rows", 0) if validation_summary else 0),
                "files_validated": (validation_summary.get("total_files_validated", 0) if validation_summary else 0),
                "issues_found": (validation_summary.get("issues_found", 0) if validation_summary else 0),
                "issues": (validation_summary.get("issues", []) if validation_summary else []),
            },
            "model_performance": {},
        }
        if results:
            for model_name, model_results in results.items():
                if "metrics" in model_results:
                    report["model_performance"][model_name] = model_results["metrics"]
        import json

        with open("/tmp/performance_report.json", "w") as f:
            json.dump(report, f, indent=2)
        print("Performance report generated")
        print(f"Models trained: {list(report['model_performance'].keys())}")
        return report

    # Task dependencies using function calls
    extract_result = extract_data_task()
    validation_summary = validate_data_task(extract_result)
    training_result = train_models_task(extract_result, validation_summary)
    evaluation_result = evaluate_models_task(training_result)
    model_versions = register_best_model_task(evaluation_result)
    transition = transition_to_production_task(model_versions)
    report = generate_performance_report_task(training_result, validation_summary)
    cleanup = BashOperator(
        task_id="cleanup",
        bash_command="rm -rf /tmp/sales_data /tmp/performance_report.json || true",
    )
    report >> cleanup


sales_forecast_training_dag = sales_forecast_training()

In [None]:
tree .
.
├── airflow
│   ├── dags
│   │   ├── demo_dag.py
│   │   ├── ml_dag.py
│   │   └── simple_dag.py
├── artifacts
├── dags
├── data
│   ├── customer_traffic
│   │   └── year=2021
│   │       ├── month=01
│   │       │   ├── day=01
│   │       │   │   └── traffic_2021-01-01.parquet
├── docker
│   ├── Dockerfile.airflow
│   ├── Dockerfile.mlflow
│   └── requirements.txt
├── docker-compose.yaml
├── include
│   ├── __init__.py
│   ├── config
│   │   ├── __init__.py
│   │   ├── config.py
│   │   ├── config.yaml
│   │   └── settings.py
│   ├── ml
│   │   ├── __init__.py
│   │   ├── diagnostics.py
│   │   ├── ensemble_model.py
│   │   ├── trainer.py
│   │   └── visualization.py
│   ├── schemas
│   │   ├── __init__.py
│   │   └── input_schema.py
│   └── utilities
│       ├── __init__.py
│       ├── data_gen.py
│       ├── feature_engineering.py
│       ├── mlflow_s3_utils.py
│       ├── mlflow_utils.py
│       ├── s3_verification.py
│       └── service_discovery.py
├── makefile
├── note.py
├── note.txt
├── notebooks
│   ├── 01-lab.ipynb
├── plugins
├── pyproject.toml
├── README.md
├── setup_airflow.sh
└── uv.lock

## Docker Container Import Testing

When working with the Airflow containers, imports work correctly when you run Python from the right directory.

### ✅ Correct way to import in Airflow containers:

```bash
# Start container shell from the correct directory
docker compose exec airflow-worker bash

# You'll be in /opt/airflow - this is the correct working directory
pwd  # Should show: /opt/airflow

# Now run Python and import
python
```

```python
# These imports will work correctly:
import pandas as pd
from include.config import app_settings
from include.utilities.data_gen import RealisticSalesDataGenerator

# Test the imports
print("All imports successful!")
print("MLFLOW_HOST:", app_settings.MLFLOW_HOST)
gen = RealisticSalesDataGenerator(start_date="2025-09-01", end_date="2025-09-02", seed=42)
print("Data generator created:", type(gen))
```

### ❌ Common mistake - don't do this:

```bash
# Don't cd into the include directory first
cd include  # This breaks imports!
python      # Imports will fail from here
```

### Why this happens:

1. Our `PYTHONPATH` is set to `/opt/airflow/include`
2. When you run `python` from `/opt/airflow/include`, Python adds `.` (current directory) to sys.path
3. This creates a conflict where Python tries to import `include` from within itself
4. The solution: always run Python from `/opt/airflow` directory

In [None]:
# Test imports in Docker container (run this to verify everything works)
import subprocess
import json


def test_docker_imports():
    """Test that imports work correctly in the Airflow container."""

    # Test command to run in the container
    test_script = """
import sys
import pandas as pd
from include.config import app_settings
from include.utilities.data_gen import RealisticSalesDataGenerator

# Test results
results = {
    "python_path_includes_include": "/opt/airflow/include" in sys.path,
    "current_working_directory": __import__("os").getcwd(),
    "pandas_version": pd.__version__,
    "mlflow_host": app_settings.MLFLOW_HOST,
    "data_generator_created": str(type(RealisticSalesDataGenerator(start_date="2025-09-01", end_date="2025-09-02", seed=42)))
}

import json
print(json.dumps(results, indent=2))
"""

    try:
        # Run the test in the container
        cmd = [
            "docker",
            "compose",
            "exec",
            "-T",
            "airflow-worker",
            "python",
            "-c",
            test_script,
        ]

        result = subprocess.run(cmd, capture_output=True, text=True, cwd="../")

        if result.returncode == 0:
            test_results = json.loads(result.stdout.strip())
            print("✅ Docker container import test PASSED!")
            print("\nTest Results:")
            for key, value in test_results.items():
                print(f"  {key}: {value}")
            return True
        else:
            print("❌ Docker container import test FAILED!")
            print("Error output:", result.stderr)
            return False

    except Exception as e:
        print(f"❌ Failed to run Docker test: {e}")
        return False


# Run the test
test_docker_imports()