In [None]:
# Config (from original-code/config.py)
from pathlib import Path
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import warnings

# Project root and output directory (make paths robust to working directory)
markdown
markdown
intro-1
# BMW Sales Forecast — Combined Notebook

This notebook combines the standalone modules from `original-code/` into a single, runnable document. Run cells in order (1 → last). Outputs are written to the repository `outputs/` directory.

Notes:
- The notebook mirrors the original pipeline: config → utils → data → analysis → forecasting → visualization → alerts → reporting → aggregation → run.
- If you run the notebook from a different working directory, set `PROJECT_ROOT` accordingly in the first code cell.
code
python
cell-1-config
# Config (from original-code/config.py)
from pathlib import Path
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import warnings

# Project root and output directory (make paths robust to working directory)
# If running the notebook, assume project root is the parent of this deployment folder
# NOTE: Path('..') is resolved against the current working directory at the
# moment this cell runs, so PROJECT_ROOT depends on where the kernel was
# started. Override PROJECT_ROOT here when running from another directory.
PROJECT_ROOT = Path('..').resolve()
OUTPUT_DIR = PROJECT_ROOT / 'outputs'
# Create the output directory up front (no error if it already exists) so
# later cells can write into it without checking for it first.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

def out_path(name: str) -> str:
    """Return the location of ``name`` inside ``OUTPUT_DIR`` as a plain string."""
    destination = OUTPUT_DIR.joinpath(name)
    return str(destination)

# Matplotlib / pandas configuration
# NOTE(review): with no category or module filter this silences every warning
# for the rest of the session, not only noisy library warnings.
warnings.filterwarnings('ignore')
# Use the non-interactive Agg backend: the plotting helpers in this file write
# figures with plt.savefig() and then plt.close() them instead of showing them.
matplotlib.use('Agg')
# NOTE(review): the 'seaborn-v0_8-*' style names presumably require a recent
# matplotlib release — confirm the minimum supported version.
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette('husl')
# Print every column, and up to 100 rows, when pandas objects are displayed.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

# Data URLs and filenames
# download_required_files() fetches DATA_CSV_URL into DATA_CSV_FILE; that
# helper does not download the HOWTO file.
DATA_CSV_URL = 'https://raw.githubusercontent.com/StephenEastham/bmw-sales-forecast/refs/heads/main/BMW-sales-data-2010-2024.csv'
HOWTO_URL = 'https://raw.githubusercontent.com/StephenEastham/bmw-sales-forecast/refs/heads/main/how-to-test.md'
DATA_CSV_FILE = 'BMW-sales-data-2010-2024.csv'
HOWTO_FILE = 'how-to-test.md'

# Forecasting params
# ARIMA_ORDER is passed as `order=` to the ARIMA models in forecast_with_arima().
ARIMA_ORDER = (1, 1, 1)
# Number of periods forecast beyond the last observed year.
FORECAST_STEPS = 3
# Fraction of the series used for training: train_size = int(len(ts_data) * TRAIN_TEST_SPLIT).
TRAIN_TEST_SPLIT = 0.8

# Alert multipliers
# NOTE(review): the code that consumes these multipliers is not visible in this
# section; presumably each one scales a historical average into an alert
# threshold — verify against the run cell.
OVERALL_THRESHOLD_MULTIPLIER = 0.8
MODEL_THRESHOLD_MULTIPLIER = 0.8
REGION_THRESHOLD_MULTIPLIER = 0.8
# NOTE(review): SalesAlertSystem.check_declining_trend() defaults to 0.1 when no
# threshold is passed; confirm callers pass DECLINE_THRESHOLD explicitly.
DECLINE_THRESHOLD = 0.15

# Test mode (set to True to exercise alerting scenarios)
# NOTE(review): the individual TEST_* switches are True while TEST_MODE is
# False; presumably they only take effect when TEST_MODE is True — confirm.
TEST_MODE = False
TEST_OVERALL_FORECAST_LOW = True
TEST_MODEL_UNDERPERFORMANCE = True
TEST_REGION_DECLINE = True
TEST_DECLINING_TREND = True

print('Config loaded. OUTPUT_DIR =', OUTPUT_DIR)
code
python
cell-2-utils
# Utils (from original-code/utils.py)
import logging

def setup_logger(log_file='sales_alerts.log'):
    """Configure root logging to the console and, when possible, to a file.

    The file handler targets ``out_path(log_file)``. Opening it is best-effort:
    if it cannot be created, logging still works on the console and a warning
    describing the failure is logged instead of being silently discarded.

    Args:
        log_file: File name of the log inside the outputs directory.

    Returns:
        The ``logging.Logger`` for this module.
    """
    handlers = [logging.StreamHandler()]
    file_error = None
    try:
        # The file handler goes first so it precedes the console handler.
        handlers.insert(0, logging.FileHandler(out_path(log_file)))
    except Exception as exc:
        # Deliberately broad: logging setup must never abort the pipeline.
        # Keep the error so it can be reported once logging is configured.
        file_error = exc

    # force=True replaces handlers left over from an earlier call, so calling
    # this function repeatedly does not duplicate log output.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=handlers,
        force=True
    )
    logger = logging.getLogger(__name__)
    if file_error is not None:
        logger.warning(
            'File logging to %s disabled (%s); logging to console only',
            log_file,
            file_error,
        )
    return logger

def print_section(title):
    """Print ``title`` as a banner framed by two 80-character ``=`` rules.

    A blank line is emitted before the top rule so that consecutive sections
    are visually separated in the notebook output.

    Args:
        title: Text shown between the two rules.
    """
    rule = '=' * 80
    # The newline is written as an escape so the string literal stays on a
    # single source line.
    print('\n' + rule)
    print(title)
    print(rule)






























































































































































































































































,,,,,: [: ,: : {,: ,,,,,,    report = f    timestamp = datetime.now()def generate_monthly_report(alerts, forecast_data, df_clean, average_sales, future_values, ts_data, future_years, ALERT_THRESHOLD_OVERALL):from datetime import datetimeimport pandas as pd# Reporting & export (from original-code/reporting.py)cell-8-reportingpythoncodeprint('Alert system loaded')            print('-', a.get('message'))        for a in self.alerts:        print(f'
Total Alerts: {len(self.alerts)}
')            return            print('
✅ No alerts triggered!')        if not self.alerts:        print('='*80)        print('SALES ALERT REPORT')        print('
' + '='*80)    def generate_alert_report(self):        return alerts            self.logger.warning(alert['message'])            alerts.append(alert)            alert = {'type': 'DECLINING_TREND', 'severity': 'MEDIUM', 'item': item_name, 'message': f'ALERT: {item_name} showing {decline_rate*100:.1f}% decline', 'decline_rate': decline_rate}        if decline_rate > decline_threshold:        decline_rate = (sales_history[-2] - sales_history[-1]) / sales_history[-2]            return alerts        if len(sales_history) < 2:        alerts = []    def check_declining_trend(self, sales_history, item_name, decline_threshold=0.1):        return alerts            self.logger.warning(alert['message'])            alerts.append(alert)            alert = {'type': 'MODEL_UNDERPERFORMANCE', 'severity': 'MEDIUM', 'model': model_name, 'message': f'ALERT: Model {model_name} recent sales ({recent_sales:,.0f}) below threshold ({threshold:,.0f})', 'recent_sales': recent_sales, 'threshold': threshold, 'gap': threshold - recent_sales}        if recent_sales < threshold:        recent_sales = model_data['historical'][-1] if len(model_data.get('historical', [])) > 0 else 0        alerts = []    def check_model_performance(self, model_data, model_name, threshold):        return alerts                self.logger.warning(alert['message'])                alerts.append(alert)                alert = {'type': 'OVERALL_SALES', 'severity': 'HIGH', 'message': f'ALERT: Forecasted sales for year {i+1} ({value:,.0f}) falls below threshold ({threshold:,.0f})', 'forecast_value': value, 'threshold': threshold, 'gap': threshold - value}            if value < threshold:        for i, value in enumerate(forecast_values):        alerts = []    def check_overall_forecast(self, forecast_values, threshold):        self.logger = setup_logger()        self.alerts = []        self.region_thresholds = region_thresholds or {}        self.model_thresholds = model_thresholds or {}        self.threshold = threshold    
def __init__(self, threshold, model_thresholds=None, region_thresholds=None):class SalesAlertSystem:from utils import setup_logger# Alerts (from original-code/alerts.py)cell-7-alertspythoncodeprint('Visualization utilities loaded')    plt.close()    print(f'✅ Saved: {p}')    plt.savefig(p, dpi=300, bbox_inches='tight')    plt.tight_layout()    p = out_path('03_arima_forecast.png')    ax.plot(future_years, future_values, marker='^', linestyle=':', label='Future Forecast', color='#2ca02c')        ax.plot(test_years, forecast_test_values, marker='s', linestyle='--', label='Test Forecast', color='#ff7f0e')        test_years = ts_years[train_size:]    if forecast_test_values is not None:    ax.plot(ts_years, ts_data, marker='o', linewidth=2.5, markersize=8, label='Historical Sales', color='#1f77b4')    fig, ax = plt.subplots(figsize=(14, 6))def visualize_forecast(ts_data, ts_years, train_size, forecast_test_values, forecast_test_ci, future_values, future_years, future_ci):    plt.close()    print(f'✅ Saved: {p}')    plt.savefig(p, dpi=300, bbox_inches='tight')    p = out_path('02_model_region_heatmap.png')    sns.heatmap(heatmap_data, annot=True, fmt='.0f', cmap='YlOrRd')    plt.figure(figsize=(12, 10))    heatmap_data = heatmap_data.loc[heatmap_data.sum(axis=1).nlargest(15).index]    heatmap_data = df_clean.pivot_table(values='Sales_Volume', index='Model', columns='Region', aggfunc='sum', fill_value=0)def create_heatmap(df_clean):    plt.close()    print(f'✅ Saved: {p}')    plt.savefig(p, dpi=300, bbox_inches='tight')    plt.tight_layout()    p = out_path('01_sales_overview.png')    ax1.plot(df_yearly['Year'], df_yearly['Total_Sales'], marker='o', linewidth=2.5, markersize=8, color='#1f77b4')    ax1 = axes[0, 0]    fig.suptitle('BMW Sales Overview (2010-2024)', fontsize=16, fontweight='bold')    fig, axes = plt.subplots(2, 2, figsize=(16, 10))def create_overview_visualizations(df_yearly, df_clean):from plotly.subplots import make_subplotsimport plotly.graph_objects as 
goimport seaborn as snsimport matplotlib.pyplot as plt# Visualization (from original-code/visualization.py)cell-6-visualizationpythoncodeprint('Forecasting utilities loaded')    return train_size, forecast_test_values, forecast_test_ci, future_values, future_years, future_ci            future_ci = None            forecast_test_ci = None            forecast_test_values = None            future_years = np.array([ts_years[-1] + i for i in range(1, FORECAST_STEPS + 1)])            future_values = np.repeat(ts_data[-1], FORECAST_STEPS)            print(f'Fallback error: {e2}')        except Exception as e2:            future_ci = None            forecast_test_ci = None            future_years = np.array([ts_years[-1] + i for i in range(1, FORECAST_STEPS + 1)])            future_values = full_results.forecast(steps=FORECAST_STEPS)            full_results = full_model.fit()            full_model = ExponentialSmoothing(ts_data, trend='add', seasonal=None)            mae = mean_absolute_error(test_data, forecast_test_values)            rmse = np.sqrt(mean_squared_error(test_data, forecast_test_values))            forecast_test_values = results.forecast(steps=len(test_data))            results = model.fit()            model = ExponentialSmoothing(train_data, trend='add', seasonal=None)        try:        print(f'ARIMA error: {e}; falling back to ExponentialSmoothing')    except Exception as e:        future_years = np.array([ts_years[-1] + i for i in range(1, FORECAST_STEPS + 1)])        future_ci = future_forecast.conf_int()        future_values = future_forecast.predicted_mean.values        future_forecast = full_results.get_forecast(steps=FORECAST_STEPS)        full_results = full_model.fit()        full_model = ARIMA(ts_data, order=ARIMA_ORDER)        mae = mean_absolute_error(test_data, forecast_test_values)        rmse = np.sqrt(mean_squared_error(test_data, forecast_test_values))        forecast_test_ci = forecast_test.conf_int()        forecast_test_values = 
forecast_test.predicted_mean.values        forecast_test = arima_results.get_forecast(steps=len(test_data))        arima_results = arima_model.fit()        arima_model = ARIMA(train_data, order=ARIMA_ORDER)    try:    test_data = ts_data[train_size:]    train_data = ts_data[:train_size]    train_size = int(len(ts_data) * TRAIN_TEST_SPLIT)    print_section('0001F9D6 ARIMA TIME SERIES FORECASTING')def forecast_with_arima(ts_data, ts_years):from statsmodels.tsa.holtwinters import ExponentialSmoothingfrom statsmodels.tsa.arima.model import ARIMAfrom sklearn.metrics import mean_absolute_error, mean_squared_errorimport numpy as np# Forecasting (from original-code/forecasting.py)cell-5-forecastingpythoncodeprint('Analysis utilities loaded')    return df_yearly, ts_data, ts_years, df_model_yearly, df_region_yearly    df_yearly['YoY_Growth'] = df_yearly['Total_Sales'].pct_change() * 100    df_region_yearly = df_clean.groupby(['Year', 'Region'])['Sales_Volume'].sum().reset_index()    df_model_yearly = df_clean.groupby(['Year', 'Model'])['Sales_Volume'].sum().reset_index()    ts_years = df_yearly['Year'].values    ts_data = df_yearly['Total_Sales'].values    df_yearly.columns = ['Year', 'Total_Sales']    df_yearly = df_clean.groupby('Year')['Sales_Volume'].sum().reset_index().sort_values('Year')    print_section('0001F4C8 TIME SERIES AGGREGATION')def aggregate_time_series(df_clean):    return model_sales    print(model_sales.head(10))    model_sales = df_clean.groupby('Model')['Sales_Volume'].sum().sort_values(ascending=False)    print('
Top 10 models by sales:')    print_section('0001F4CA EXPLORATORY DATA ANALYSIS')def exploratory_data_analysis(df_clean):import numpy as np# Analysis (from original-code/analysis.py)cell-4-analysispythoncodeprint('Data utilities loaded')    return df_clean    print('
✅ Data preprocessing complete. Shape:', df_clean.shape)    df_clean.columns = df_clean.columns.str.strip()    df_clean = df.copy()    print_section('0001F4CB COLUMN ANALYSIS')def preprocess_data(df):    return df    print(df.dtypes)    print('
Data types:
')    display(df.head(10))    print('Shape:', df.shape)    print('
✅ Data loaded successfully!')    df = pd.read_csv(csv_path)    print_section('0001F4CA DATASET OVERVIEW')def load_and_explore_data(csv_path):    # HOWTO file is optional in notebook context    download_data_file(DATA_CSV_FILE, DATA_CSV_URL)def download_required_files():        print(f'✅ {file_name} already exists.')    else:            print(f'❌ Failed to download {file_name}: {e}')        except requests.exceptions.RequestException as e:            print(f'✅ {file_name} downloaded successfully!')                f.write(response.content)            with open(file_name, 'wb') as f:            response.raise_for_status()            response = requests.get(data_url)            print(f'Attempting to download {file_name}...')        try:    if not os.path.exists(file_name):def download_data_file(file_name, data_url):import pandas as pdimport requestsimport os# Data loading & preprocessing (from original-code/data.py)cell-3-datapythoncodeprint('Utils loaded')    print('='*80)    print(title)' + '='*80)