In [2]:
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Any
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict

In [3]:
@dataclass
class M5DatasetConfig:
    """Configuration specific to M5 Walmart dataset."""
    # Data paths
    sales_train_path: str = "data/raw/sales_train_evaluation.csv"
    prices_path: str = "data/raw/sell_prices.csv"
    calendar_path: str = "data/raw/calendar.csv"
    
    # Schema definitions
    sales_schema: Dict = None
    prices_schema: Dict = None
    calendar_schema: Dict = None
    
    # Processing parameters
    chunk_size: int = 10000
    max_memory_gb: float = 8.0
    
    # Validation thresholds
    min_sales_per_item: int = 100  
    max_zero_days_pct: float = 0.95  
    
    # Output configuration
    save_interim: bool = True
    save_features: bool = True
    compression: str = "snappy"
    load_fraction: float =0.01
    max_rows: int = None


In [4]:
import psutil
from contextlib import contextmanager
import time
import gc
import logging

logger = logging.getLogger(__name__)

class MemoryManager:
    """memory management for large dataset processing."""
    
    def __init__(self, max_memory_gb: float = 8.0):
        self.max_memory_gb = max_memory_gb
        self.max_memory_bytes = max_memory_gb * 1024**3
        
    def get_memory_usage(self) -> Dict[str, float]:
        """Get current memory usage statistics."""
        process = psutil.Process()
        memory_info = process.memory_info()
        
        return {
            'rss_gb': memory_info.rss / 1024**3,
            'vms_gb': memory_info.vms / 1024**3,
            'percent': process.memory_percent(),
            'available_gb': psutil.virtual_memory().available / 1024**3
        }
    
    def check_memory_usage(self):
        """Check if memory usage is within limits."""
        memory_stats = self.get_memory_usage()
        
        if memory_stats['rss_gb'] > self.max_memory_gb:
            logger.warning(f"Memory usage ({memory_stats['rss_gb']:.2f} GB) exceeds limit ({self.max_memory_gb} GB)")
            gc.collect()  # Force garbage collection
            
        logger.debug(f"Memory usage: {memory_stats['rss_gb']:.2f} GB ({memory_stats['percent']:.1f}%)")
    
    @contextmanager
    def memory_monitor(self, operation_name: str):
        """Context manager to monitor memory usage during operations."""
        start_memory = self.get_memory_usage()
        start_time = time.time()
        
        logger.info(f"Starting {operation_name} - Memory: {start_memory['rss_gb']:.2f} GB")
        
        try:
            yield
        finally:
            end_memory = self.get_memory_usage()
            duration = time.time() - start_time
            memory_delta = end_memory['rss_gb'] - start_memory['rss_gb']
            
            logger.info(f"Completed {operation_name} in {duration:.2f}s - "
                       f"Memory: {end_memory['rss_gb']:.2f} GB ({memory_delta:+.2f} GB)")
            
            self.check_memory_usage()

In [5]:
@dataclass
class DataQualityMetrics:
    """Comprehensive data quality metrics for M5 dataset."""
    total_time_series: int
    valid_time_series: int
    total_observations: int
    missing_sales_pct: float
    zero_sales_pct: float
    negative_sales_count: int
    price_coverage_pct: float
    calendar_coverage_pct: float
    data_completeness_score: float
    temporal_consistency_score: float
    hierarchical_consistency_score: float

In [6]:
def downcast(df: pd.DataFrame) -> pd.DataFrame:
    for col in df.columns:
        col_type = df[col].dtype

        if pd.api.types.is_integer_dtype(col_type):
            df[col] = pd.to_numeric(df[col], downcast='integer')

        elif pd.api.types.is_float_dtype(col_type):
            df[col] = pd.to_numeric(df[col], downcast='float')

        elif pd.api.types.is_object_dtype(col_type):
            if col == 'date':
                df[col] = pd.to_datetime(df[col], format='%Y-%m-%d', errors='coerce')
            else:
                # Check if column is numeric-like
                converted = pd.to_numeric(df[col], errors='coerce')
                num_missing = converted.isna().sum()
                total = len(df[col])

                if num_missing / total < 0.05:  # less than 5% non-convertible treated as numeric
                    df[col] = converted
                else:
                    df[col] = df[col].astype('category')
                    print(f"Column '{col}' converted to category")

    return df


def downcast_with_stats(df: pd.DataFrame, name: str = "DataFrame") -> pd.DataFrame:
    start_mem = df.memory_usage(deep=True).sum() / 1024**2
    df = downcast(df)
    end_mem = df.memory_usage(deep=True).sum() / 1024**2
    print(f"🔧 {name} memory usage: {start_mem:.2f} MB → {end_mem:.2f} MB ({100 * (start_mem - end_mem) / start_mem:.1f}% reduction)")
    return df


In [7]:
import json 
class NumpyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        return super().default(obj)

In [None]:
import json
import duckdb
import pyarrow as pa
import pyarrow.parquet as pq
from contextlib import contextmanager
import gc
import time
from functools import wraps
from pathlib import Path


class M5DatasetProcessor:
    """Processor for M5 Walmart dataset."""
    
    def __init__(self, config: M5DatasetConfig):
        self.config = config
        self.memory_manager = MemoryManager(config.max_memory_gb)
        self.quality_metrics = None
        
        # Setup output directories
        self.setup_directories()
        
        logger.info("M5DatasetProcessor initialized for production processing")
    
    def setup_directories(self):
        """Create required directory structure."""
        dirs = [
            "data/interim/m5",
            "data/processed/m5", 
            "data/features/m5",
            "data/quality/m5",
            "data/monitoring/m5"
        ]
        
        for dir_path in dirs:
            Path(dir_path).mkdir(parents=True, exist_ok=True)
    
    def validate_file_integrity(self) -> bool:
        """Validate that all required M5 files exist and are readable."""
        logger.info("Validating M5 dataset file integrity...")
        
        required_files = [
            self.config.sales_train_path,
            self.config.prices_path,
            self.config.calendar_path
        ]
        
        for file_path in required_files:
            path = Path(file_path)
            if not path.exists():
                logger.error(f"Required file missing: {file_path}")
                return False
            
            # Check file size (M5 files should be substantial)
            file_size_mb = path.stat().st_size / (1024 * 1024)
            logger.info(f"File {path.name}: {file_size_mb:.1f} MB")
            
            if file_size_mb < 1:  # Basic sanity check
                logger.warning(f"File {file_path} seems too small ({file_size_mb:.1f} MB)")
        
        logger.info("File integrity validation passed")
        return True
    
    def load_calendar_data(self) -> pd.DataFrame:
        """Load and validate calendar data with comprehensive error handling."""
        logger.info("Loading M5 calendar data...")
        
        with self.memory_manager.memory_monitor("calendar_loading"):
            try:
                calendar_dtypes = {
                    'date': 'str',
                    'wm_yr_wk': 'int32',
                    'weekday': 'str',  # keep as str for mapping
                    'd': 'str',
                    'event_name_1': 'str',
                    'event_type_1': 'str', 
                    'event_name_2': 'str',
                    'event_type_2': 'str',
                    'snap_CA': 'int8',
                    'snap_TX': 'int8',
                    'snap_WI': 'int8'
                }
                
                calendar = pd.read_csv(self.config.calendar_path, dtype=calendar_dtypes)
                calendar = downcast_with_stats(calendar, name="Calendar")

                # Convert date column once
                calendar['date'] = pd.to_datetime(calendar['date'], errors='coerce')

                # Map weekday strings to numbers BEFORE any numeric conversion
                weekday_map = {
                    'Sunday': 7,
                    'Monday': 1,
                    'Tuesday': 2,
                    'Wednesday': 3,
                    'Thursday': 4,
                    'Friday': 5,
                    'Saturday': 6
                }
                calendar['weekday'] = calendar['weekday'].map(weekday_map).astype('int8')

                # Validate calendar length
                expected_days = (pd.to_datetime('2016-05-22') - pd.to_datetime('2011-01-29')).days + 1
                if len(calendar) != expected_days:
                    logger.warning(f"Expected {expected_days} calendar days, found {len(calendar)}")
                
                # Date features
                calendar['year'] = calendar['date'].dt.year
                calendar['month'] = calendar['date'].dt.month  
                calendar['day'] = calendar['date'].dt.day
                calendar['quarter'] = calendar['date'].dt.quarter
                calendar['week_of_year'] = calendar['date'].dt.isocalendar().week
                
                # Weekend flag (assuming Sat=6, Sun=7 as weekend)
                calendar['is_weekend'] = calendar['weekday'].isin([6,7]).astype('int8')
                
                # Event processing
                calendar['has_event'] = (
                    calendar['event_name_1'].notna() | calendar['event_name_2'].notna()
                ).astype('int8')
                
                # SNAP benefits
                calendar['snap_any'] = (
                    (calendar['snap_CA'] == 1) |
                    (calendar['snap_TX'] == 1) |
                    (calendar['snap_WI'] == 1)
                ).astype('int8')
                
                logger.info(f"Calendar loaded: {len(calendar)} days, {calendar['date'].min()} to {calendar['date'].max()}")
                
                return calendar
            
            except Exception as e:
                logger.error(f"Failed to load calendar data: {e}")
                raise

        
    def load_prices_data(self) -> pd.DataFrame:
        """Load and validate pricing data."""
        logger.info("Loading M5 pricing data...")
        
        with self.memory_manager.memory_monitor("prices_loading"):
            try:
                # Load with memory-efficient dtypes
                prices_dtypes = {
                    'store_id': 'str',
                    'item_id': 'str', 
                    'wm_yr_wk': 'int32',
                    'sell_price': 'float32'
                }
                
                prices = pd.read_csv(self.config.prices_path, dtype=prices_dtypes)
                prices = downcast_with_stats(prices, name="Prices")

                if self.config.load_fraction:
                    n_rows = int(len(prices) * self.config.load_fraction)
                    prices = prices.iloc[:n_rows]  # Top rows
                    logger.info(f"Using only {n_rows} rows ({self.config.load_fraction:.0%}) of prices data.")
                    
                # Data quality checks
                null_prices = prices['sell_price'].isna().sum()
                if null_prices > 0:
                    logger.warning(f"Found {null_prices} null prices ({null_prices/len(prices)*100:.2f}%)")
                
                negative_prices = (prices['sell_price'] < 0).sum()
                if negative_prices > 0:
                    logger.error(f"Found {negative_prices} negative prices - this is critical!")
                    # In production, you might want to raise an exception or alert
                
                zero_prices = (prices['sell_price'] == 0).sum()
                if zero_prices > 0:
                    logger.warning(f"Found {zero_prices} zero prices ({zero_prices/len(prices)*100:.2f}%)")
                
                # Price statistics
                price_stats = prices['sell_price'].describe()
                logger.info(f"Price statistics: min=${price_stats['min']:.2f}, "
                        f"max=${price_stats['max']:.2f}, mean=${price_stats['mean']:.2f}")
                
                logger.info(f"Prices loaded: {len(prices)} price points for {prices['item_id'].nunique()} items")
                
                return prices
                
            except Exception as e:
                logger.error(f"Failed to load prices data: {e}")
                raise

    def load_sales_data_chunked(self) -> pd.DataFrame:
        """Load sales data in chunks to handle memory efficiently."""
        logger.info("Loading M5 sales data (chunked processing)...")
        
        with self.memory_manager.memory_monitor("sales_loading"):
            try:
                # First, get the column structure
                sample_df = pd.read_csv(self.config.sales_train_path, nrows=1)
                
                # Identify day columns (d_1, d_2, etc.)
                day_columns = [col for col in sample_df.columns if col.startswith('d_')]
                id_columns = ['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id']

                total_rows = sum(1 for _ in open(self.config.sales_train_path)) - 1  # exclude header
                
                # Determine target rows based on load_fraction and optional max_rows
                if hasattr(self.config, 'max_rows') and self.config.max_rows is not None:
                    target_rows = min(int(total_rows * self.config.load_fraction), self.config.max_rows)
                else:
                    target_rows = int(total_rows * self.config.load_fraction)

                chunk_size = self.config.chunk_size

                total_chunks = (target_rows + chunk_size - 1) // chunk_size
                logger.info(f"Total rows in sales: {total_rows}")
                logger.info(f"Loading {self.config.load_fraction:.0%} of dataset → {target_rows} rows in ~{total_chunks} chunks (chunk size={chunk_size})")                
                # Load in chunks for memory efficiency
                chunks = []
                rows_loaded = 0
                chunk_iterator = pd.read_csv(
                    self.config.sales_train_path,
                    chunksize=self.config.chunk_size,
                    dtype={col: 'str' for col in id_columns}
                )
                
                for i, chunk in enumerate(chunk_iterator, start=1):
                    if rows_loaded >= target_rows:
                        logger.info(f"Reached target rows ({target_rows}), stopping further loading")
                        break

                    if rows_loaded + len(chunk) > target_rows:
                        chunk = chunk.iloc[:target_rows - rows_loaded]

                    for col in day_columns:
                        chunk[col] = pd.to_numeric(chunk[col], downcast='integer')

                    chunks.append(chunk)
                    rows_loaded += len(chunk)

                    logger.info(f"Loaded chunk {i}/{total_chunks}, rows loaded so far: {rows_loaded}")

                    self.memory_manager.check_memory_usage()

                sales_wide = pd.concat(chunks, ignore_index=True)
                del chunks
                gc.collect()

                logger.info(f"Completed loading sales data: {len(sales_wide)} items × {len(day_columns)} days")

                return sales_wide, day_columns

            except Exception as e:
                logger.error(f"Failed to load sales data: {e}")
                raise

    
    def reshape_sales_data(self, sales_wide: pd.DataFrame, day_columns: List[str]) -> pd.DataFrame:
        logger.info("Reshaping sales data from wide to long format...")
        with self.memory_manager.memory_monitor("sales_reshaping"):
            try:
                id_columns = ['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id']

                # If a 'sales' column exists, rename it to avoid conflict
                if 'sold' in sales_wide.columns:
                    logger.warning("'sales' column exists in sales_wide, renaming to 'sales_old'")
                    sales_wide = sales_wide.rename(columns={'sold': 'sold_old'})



                logger.info(f"Columns before melt: {sales_wide.columns.tolist()}")

                sales_long = sales_wide.melt(
                    id_vars=id_columns,
                    value_vars=day_columns,
                    var_name='d',
                    value_name='sold'  
                )

                sales_long['sold'] = pd.to_numeric(sales_long['sold'], downcast='integer')
                sales_long = sales_long.sort_values(['item_id', 'store_id', 'd'])

                logger.info(f"Reshaped to long format: {len(sales_long)} observations")

                return sales_long

            except Exception as e:
                logger.error(f"Failed to reshape sales data: {e}")
                logger.error(f"Columns at error: {sales_wide.columns.tolist()}")
                raise
              
    def create_master_dataset(
        self,
        sales_wide: pd.DataFrame,
        calendar: pd.DataFrame,
        prices: pd.DataFrame,
        chunk_size: int = 1000000
    ) -> Path:
        """
        Chunked version of master dataset creation with debug and fix for merge key mismatches.
        """
        import gc

        logger.info("Creating master dataset in chunks with enhanced key alignment and debug...")

        output_dir = Path("data/processed/m5/master_chunks")
        output_dir.mkdir(parents=True, exist_ok=True)

        try:
            max_chunks = 3
            total_rows = len(sales_wide)
            total_chunks = (total_rows + chunk_size - 1) // chunk_size
            logger.info(f"Total rows: {total_rows}, Chunk size: {chunk_size}, Total chunks: {total_chunks}")

            chunk_paths = []

            for i, start in enumerate(range(0, total_rows, chunk_size), 1):
                if i > max_chunks:
                    logger.info(f"Reached max chunk limit ({max_chunks}), stopping further processing")
                    break

                end = min(start + chunk_size, total_rows)
                logger.info(f"Processing chunk {i}/{total_chunks}: rows {start} to {end}")
                chunk = sales_wide.iloc[start:end].copy()

                # Melt the chunk
                id_vars = ['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id']

                sales_long = chunk.melt(
                    id_vars=id_vars,
                    var_name='d',
                    value_name='sales'
                )

                # Step 1: Align calendar 'd' categories with sales_long 'd'
                sales_long['d'] = sales_long['d'].astype('category')
                calendar['d'] = calendar['d'].astype('category')
                calendar['d'] = calendar['d'].cat.set_categories(sales_long['d'].cat.categories)


                # Step 2: Merge with calendar
                df = sales_long.merge(calendar, on='d', how='left')

                # Step 5: Merge with prices
                df = df.merge(
                    prices[['store_id', 'item_id', 'wm_yr_wk', 'sell_price']],
                    on=['store_id', 'item_id', 'wm_yr_wk'],
                    how='left'
                )


                # Compute revenue
                df['sales'] = pd.to_numeric(df['sales'], errors='coerce').fillna(0).astype('float32')
                # df['sell_price'] = df['sell_price_y']
                # df.drop(['sell_price_x'], axis=1, inplace=True)
                df['revenue'] = (df['sales'] * df['sell_price']).astype('float32')

                # Optional: downcast to reduce memory footprint (assuming you have this function)
                df = downcast_with_stats(df, name=f"Master Chunk [{start}-{end}]")

                # Save chunk to disk
                out_path = output_dir / f"master_chunk_{start}_{end}.parquet"
                df.to_parquet(out_path, index=False)
                chunk_paths.append(out_path)

                logger.info(f"Saved chunk: {out_path} ({df.shape[0]} rows)")

                del chunk, sales_long, df
                gc.collect()

            logger.info(f"All master chunks saved to {output_dir}")
            return output_dir

        except Exception as e:
            logger.error(f"Failed during chunked master dataset creation: {e}")
            raise

    def load_master_dataset(self, master_data_dir: Path) -> pd.DataFrame:
       import glob
       parquet_files = glob.glob(str(master_data_dir / "*.parquet"))
       dfs = [pd.read_parquet(f) for f in parquet_files]
       return pd.concat(dfs, ignore_index=True)

    def validate_data_quality(self, df: pd.DataFrame) -> DataQualityMetrics:
        """Comprehensive data quality validation for M5 dataset."""
        logger.info("Performing comprehensive data quality validation...")
        
        with self.memory_manager.memory_monitor("data_quality_validation"):
            try:
                # Basic counts
                total_obs = len(df)
                unique_series = df.groupby(['store_id', 'item_id'],observed=False).ngroups
                
                # Sales data quality
                missing_sales = df['sales'].isna().sum()
                missing_sales_pct = (missing_sales / total_obs) * 100
                
                zero_sales = (df['sales'] == 0).sum()
                zero_sales_pct = (zero_sales / total_obs) * 100
                
                negative_sales = (df['sales'] < 0).sum()
                
                # Price coverage
                missing_prices = df['sell_price'].isna().sum()
                price_coverage_pct = ((total_obs - missing_prices) / total_obs) * 100
                
                # Calendar coverage (should be 100% after join)
                missing_dates = df['date'].isna().sum()
                calendar_coverage_pct = ((total_obs - missing_dates) / total_obs) * 100
                
                # Data completeness score (weighted average of key metrics)
                completeness_score = (
                    (100 - missing_sales_pct) * 0.4 +  # Sales most important
                    price_coverage_pct * 0.3 +         # Prices important for revenue
                    calendar_coverage_pct * 0.3        # Calendar features important
                ) / 100
                
                # Temporal consistency (check for gaps in time series)
                temporal_gaps = 0
                sample_series = df.groupby(['store_id', 'item_id'],observed=False).head(1000)  # Sample for efficiency
                for (store, item), group in sample_series.groupby(['store_id', 'item_id'],observed=False):
                    date_diff = group['date'].diff().dt.days
                    gaps = (date_diff > 1).sum()
                    temporal_gaps += gaps
                
                temporal_consistency_score = max(0, 1 - (temporal_gaps / unique_series))
                
                # Hierarchical consistency (validate item-dept-cat relationships)
                hierarchy_issues = 0
                item_dept = df.groupby('item_id')['dept_id'].nunique()
                hierarchy_issues += (item_dept > 1).sum()  # Items should have single dept
                
                dept_cat = df.groupby('dept_id')['cat_id'].nunique()
                hierarchy_issues += (dept_cat > 1).sum()  # Depts should have single category
                
                hierarchical_consistency_score = max(0, 1 - (hierarchy_issues / (df['item_id'].nunique() + df['dept_id'].nunique())))
                
                # Filter valid time series (meet minimum requirements)
                series_sales_counts = df.groupby(['store_id', 'item_id'],observed=False)['sales'].count()
                valid_series = (series_sales_counts >= self.config.min_sales_per_item).sum()
                
                series_zero_pcts = df.groupby(['store_id', 'item_id'],observed=False)['sales'].apply(lambda x: (x == 0).mean())
                valid_series_zero = (series_zero_pcts <= self.config.max_zero_days_pct).sum()
                
                valid_time_series = min(valid_series, valid_series_zero)
                
                # Create quality metrics
                quality_metrics = DataQualityMetrics(
                    total_time_series=unique_series,
                    valid_time_series=valid_time_series,
                    total_observations=total_obs,
                    missing_sales_pct=missing_sales_pct,
                    zero_sales_pct=zero_sales_pct,
                    negative_sales_count=negative_sales,
                    price_coverage_pct=price_coverage_pct,
                    calendar_coverage_pct=calendar_coverage_pct,
                    data_completeness_score=completeness_score,
                    temporal_consistency_score=temporal_consistency_score,
                    hierarchical_consistency_score=hierarchical_consistency_score
                )
                
                self.quality_metrics = quality_metrics
                
                # Log quality summary
                logger.info("=" * 60)
                logger.info("DATA QUALITY SUMMARY")
                logger.info("=" * 60)
                logger.info(f"Total time series: {unique_series:,}")
                logger.info(f"Valid time series: {valid_time_series:,} ({valid_time_series/unique_series*100:.1f}%)")
                logger.info(f"Total observations: {total_obs:,}")
                logger.info(f"Missing sales: {missing_sales_pct:.2f}%")
                logger.info(f"Zero sales: {zero_sales_pct:.1f}%")
                logger.info(f"Negative sales: {negative_sales:,}")
                logger.info(f"Price coverage: {price_coverage_pct:.1f}%")

                return quality_metrics
            
            except Exception as e:
                logger.error(f"Data quality validation failed: {e}")
                raise


    def save_quality_report(self, metrics: DataQualityMetrics):
        """Save comprehensive data quality report."""
        report_path = Path("data/quality/m5/quality_report.json")
        report_path.parent.mkdir(parents=True, exist_ok=True)  # Ensure the folder exists

        report = {
            'timestamp': datetime.utcnow().isoformat(),
            'dataset': 'M5_Walmart',
            'quality_metrics': asdict(metrics),
            'validation_config': {
                'min_sales_per_item': self.config.min_sales_per_item,
                'max_zero_days_pct': self.config.max_zero_days_pct
            }
        }

        try:
            with open(report_path, 'w') as f:
                json.dump(report, f, indent=2, cls=NumpyEncoder)

            logger.info(f"✓ Quality report saved to {report_path}")
        except Exception as e:
            logger.error(f"Failed to save quality report: {e}", exc_info=True)

    
    def save_processed_data(self, df: pd.DataFrame, filename: str = "processed"):
        """Save processed data with optimal compression and metadata."""
        logger.info(f"Saving processed M5 data: {filename}")
        
        with self.memory_manager.memory_monitor("data_saving"):
            try:
                output_path = Path(f"data/processed/m5/{filename}.parquet")
                
                # Add processing metadata
                metadata = {
                    'processed_at': datetime.utcnow().isoformat(),
                    'shape': f"{df.shape[0]}x{df.shape[1]}",
                    'date_range_start': df['date'].min().isoformat(),
                    'date_range_end': df['date'].max().isoformat(),
                    'unique_time_series': int(df.groupby(['store_id', 'item_id'],observed=False).ngroups),
                    'data_quality_score': float(self.quality_metrics.data_completeness_score) if self.quality_metrics else None
                }
                
                # Convert to PyArrow table with metadata
                table = pa.Table.from_pandas(df)
                table = table.replace_schema_metadata({
                    key: str(value) for key, value in metadata.items()
                })
                
                # Save with optimal compression
                pq.write_table(
                    table, 
                    output_path,
                    compression=self.config.compression,
                    row_group_size=50000,  # Optimize for read performance
                    use_dictionary=True     # Compress categorical columns
                )
                
                file_size_mb = output_path.stat().st_size / (1024 * 1024)
                logger.info(f"Saved processed data: {output_path} ({file_size_mb:.1f} MB)")
                
                
                return output_path
                
            except Exception as e:
                logger.error(f"Failed to save processed data: {e}")
                raise
    
    def run_full_pipeline(self) -> Tuple[pd.DataFrame, DataQualityMetrics]:
        """Run the complete M5 dataset processing pipeline."""
        logger.info("Starting FAANG-level M5 dataset processing pipeline...")
        
        start_time = time.time()
        
        try:
            # Step 1: Validate file integrity
            if not self.validate_file_integrity():
                raise ValueError("File integrity validation failed")
            
            # Step 2: Load calendar data
            calendar = self.load_calendar_data()
            
            # Step 3: Load pricing data
            prices = self.load_prices_data()
            
            # Step 4: Load and reshape sales data
            sales_wide, day_columns = self.load_sales_data_chunked()
            
            # Step 5: Create master dataset
            master_data_dir = self.create_master_dataset(sales_wide, calendar, prices)

            master_df = self.load_master_dataset(master_data_dir)

            # Free memory
            del sales_wide, calendar, prices
            gc.collect()
            
            # Step 6: Validate data quality
            quality_metrics = self.validate_data_quality(master_df)

            self.save_quality_report(quality_metrics)
            
            # Step 7: Save processed data
            if self.config.save_interim:
                self.save_processed_data(master_df, "master")
            
            duration = time.time() - start_time
            
            logger.info("=" * 80)
            logger.info("🎉 M5 DATASET PROCESSING COMPLETED SUCCESSFULLY")
            logger.info("=" * 80)
            logger.info(f"Processing time: {duration/60:.1f} minutes")
            logger.info(f"Final dataset shape: {master_df.shape}")
            logger.info(f"Memory usage: {self.memory_manager.get_memory_usage()['rss_gb']:.2f} GB")
            logger.info(f"Data quality score: {quality_metrics.data_completeness_score:.3f}")
            logger.info("=" * 80)
            
            return master_df, quality_metrics
            
        except Exception as e:
            logger.error(f"Pipeline failed: {e}")
            raise
        finally:
            # Cleanup
            self.db.close()




In [218]:
config = M5DatasetConfig()
processor = M5DatasetProcessor(config)

In [179]:
calendar = processor.load_calendar_data()

Expected 1941 calendar days, found 1969


Column 'weekday' converted to category
Column 'd' converted to category
Column 'event_name_1' converted to category
Column 'event_type_1' converted to category
Column 'event_name_2' converted to category
Column 'event_type_2' converted to category
🔧 Calendar memory usage: 0.67 MB → 0.23 MB (65.8% reduction)


In [180]:
calendar

Unnamed: 0,date,wm_yr_wk,weekday,wday,month,year,d,event_name_1,event_type_1,event_name_2,event_type_2,snap_CA,snap_TX,snap_WI,day,quarter,week_of_year,is_weekend,has_event,snap_any
0,2011-01-29,11101,6,1,1,2011,d_1,,,,,0,0,0,29,1,4,1,0,0
1,2011-01-30,11101,7,2,1,2011,d_2,,,,,0,0,0,30,1,4,1,0,0
2,2011-01-31,11101,1,3,1,2011,d_3,,,,,0,0,0,31,1,5,0,0,0
3,2011-02-01,11101,2,4,2,2011,d_4,,,,,1,1,0,1,1,5,0,0,1
4,2011-02-02,11101,3,5,2,2011,d_5,,,,,1,0,1,2,1,5,0,0,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1964,2016-06-15,11620,3,5,6,2016,d_1965,,,,,0,1,1,15,2,24,0,0,1
1965,2016-06-16,11620,4,6,6,2016,d_1966,,,,,0,0,0,16,2,24,0,0,0
1966,2016-06-17,11620,5,7,6,2016,d_1967,,,,,0,0,0,17,2,24,0,0,0
1967,2016-06-18,11621,6,1,6,2016,d_1968,,,,,0,0,0,18,2,24,1,0,0


In [181]:
df = pd.read_csv(r"C:\Users\Administrator\OneDrive\Desktop\SalesAI\backend\data\raw\calendar.csv")

In [182]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1969 entries, 0 to 1968
Data columns (total 14 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   date          1969 non-null   object
 1   wm_yr_wk      1969 non-null   int64 
 2   weekday       1969 non-null   object
 3   wday          1969 non-null   int64 
 4   month         1969 non-null   int64 
 5   year          1969 non-null   int64 
 6   d             1969 non-null   object
 7   event_name_1  162 non-null    object
 8   event_type_1  162 non-null    object
 9   event_name_2  5 non-null      object
 10  event_type_2  5 non-null      object
 11  snap_CA       1969 non-null   int64 
 12  snap_TX       1969 non-null   int64 
 13  snap_WI       1969 non-null   int64 
dtypes: int64(7), object(7)
memory usage: 215.5+ KB


In [183]:
calendar.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1969 entries, 0 to 1968
Data columns (total 20 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   date          1969 non-null   datetime64[ns]
 1   wm_yr_wk      1969 non-null   int16         
 2   weekday       1969 non-null   int8          
 3   wday          1969 non-null   int8          
 4   month         1969 non-null   int32         
 5   year          1969 non-null   int32         
 6   d             1969 non-null   category      
 7   event_name_1  162 non-null    category      
 8   event_type_1  162 non-null    category      
 9   event_name_2  5 non-null      category      
 10  event_type_2  5 non-null      category      
 11  snap_CA       1969 non-null   int8          
 12  snap_TX       1969 non-null   int8          
 13  snap_WI       1969 non-null   int8          
 14  day           1969 non-null   int32         
 15  quarter       1969 non-null   int32   

In [184]:
calendar.isnull().mean()*100

date             0.000000
wm_yr_wk         0.000000
weekday          0.000000
wday             0.000000
month            0.000000
year             0.000000
d                0.000000
event_name_1    91.772473
event_type_1    91.772473
event_name_2    99.746064
event_type_2    99.746064
snap_CA          0.000000
snap_TX          0.000000
snap_WI          0.000000
day              0.000000
quarter          0.000000
week_of_year     0.000000
is_weekend       0.000000
has_event        0.000000
snap_any         0.000000
dtype: float64

In [185]:
df.isnull().mean()*100

date             0.000000
wm_yr_wk         0.000000
weekday          0.000000
wday             0.000000
month            0.000000
year             0.000000
d                0.000000
event_name_1    91.772473
event_type_1    91.772473
event_name_2    99.746064
event_type_2    99.746064
snap_CA          0.000000
snap_TX          0.000000
snap_WI          0.000000
dtype: float64

In [186]:
import os
os.chdir(r"C:\Users\Administrator\OneDrive\Desktop\SalesAI\backend")

In [187]:
prices = processor.load_prices_data()

Column 'store_id' converted to category
Column 'item_id' converted to category
🔧 Prices memory usage: 905.33 MB → 58.98 MB (93.5% reduction)


In [188]:
prices.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 68411 entries, 0 to 68410
Data columns (total 4 columns):
 #   Column      Non-Null Count  Dtype   
---  ------      --------------  -----   
 0   store_id    68411 non-null  category
 1   item_id     68411 non-null  category
 2   wm_yr_wk    68411 non-null  int16   
 3   sell_price  68411 non-null  float32 
dtypes: category(2), float32(1), int16(1)
memory usage: 690.1 KB


In [189]:
prices_df = pd.read_csv(r"C:\Users\Administrator\OneDrive\Desktop\SalesAI\backend\data\raw\sell_prices.csv")

In [190]:
prices.shape, prices_df.shape

((68411, 4), (6841121, 4))

In [191]:
prices.isnull().mean()*100

store_id      0.0
item_id       0.0
wm_yr_wk      0.0
sell_price    0.0
dtype: float64

In [192]:
prices_df.isnull().mean()*100

store_id      0.0
item_id       0.0
wm_yr_wk      0.0
sell_price    0.0
dtype: float64

In [193]:
sales_wide, day_columns = processor.load_sales_data_chunked()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  chunk[col] = pd.to_numeric(chunk[col], downcast='integer')


In [194]:
sales_wide.columns

Index(['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id', 'd_1',
       'd_2', 'd_3', 'd_4',
       ...
       'd_1932', 'd_1933', 'd_1934', 'd_1935', 'd_1936', 'd_1937', 'd_1938',
       'd_1939', 'd_1940', 'd_1941'],
      dtype='object', length=1947)

In [195]:
sales_wide.shape

(304, 1947)

In [196]:
sales_wide.isnull().sum()

id          0
item_id     0
dept_id     0
cat_id      0
store_id    0
           ..
d_1937      0
d_1938      0
d_1939      0
d_1940      0
d_1941      0
Length: 1947, dtype: int64

In [197]:
len(day_columns)

1941

In [198]:
gc.collect()

0

In [199]:
calendar['wm_yr_wk'].isnull().sum()

0

In [200]:
print("Creating master dataset in chunks...")
output_dir = Path("data/processed/m5/master_chunks")
output_dir.mkdir(parents=True, exist_ok=True)

Creating master dataset in chunks...


In [201]:
chunk_size: int = 1000000
max_chunks = 3
total_rows = len(sales_wide)
total_chunks = (total_rows + chunk_size - 1) // chunk_size  # ceil division
print(f"Total rows: {total_rows}, Chunk size: {chunk_size}, Total chunks to process: {total_chunks}")


Total rows: 304, Chunk size: 1000000, Total chunks to process: 1


In [116]:
chunk_paths = []

for i, start in enumerate(range(0, total_rows, chunk_size), 1):
    if i > max_chunks:
        logger.info(f"Reached max chunk limit ({max_chunks}), stopping further processing")
        break

    end = min(start + chunk_size, total_rows)
    logger.info(f"Processing chunk {i}/{total_chunks}: rows {start} to {end}")
    chunk = sales_wide.iloc[start:end].copy()

In [117]:
chunk.isnull().sum()

id          0
item_id     0
dept_id     0
cat_id      0
store_id    0
           ..
d_1937      0
d_1938      0
d_1939      0
d_1940      0
d_1941      0
Length: 1947, dtype: int64

In [118]:
chunk.shape

(304, 1947)

In [119]:
# Melt the chunk
sales_long = chunk.melt(
                        id_vars=['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id'],
                        var_name='d',
                        value_name='sales'
                    )

In [120]:
sales_long.shape

(590064, 8)

In [121]:
sales_long.isnull().sum()

id          0
item_id     0
dept_id     0
cat_id      0
store_id    0
state_id    0
d           0
sales       0
dtype: int64

In [122]:
print(sales_long['sales'].isna().sum())          # Count NaNs
print(np.isinf(sales_long['sales']).sum())  

0
0


In [123]:
sales_long['d'] = sales_long['d'].astype('category')

In [124]:
sales_long['d'].dtype

CategoricalDtype(categories=['d_1', 'd_10', 'd_100', 'd_1000', 'd_1001', 'd_1002',
                  'd_1003', 'd_1004', 'd_1005', 'd_1006',
                  ...
                  'd_990', 'd_991', 'd_992', 'd_993', 'd_994', 'd_995',
                  'd_996', 'd_997', 'd_998', 'd_999'],
, ordered=False, categories_dtype=object)

In [125]:
calendar['d'].dtype

CategoricalDtype(categories=['d_1', 'd_10', 'd_100', 'd_1000', 'd_1001', 'd_1002',
                  'd_1003', 'd_1004', 'd_1005', 'd_1006',
                  ...
                  'd_990', 'd_991', 'd_992', 'd_993', 'd_994', 'd_995',
                  'd_996', 'd_997', 'd_998', 'd_999'],
, ordered=False, categories_dtype=object)

In [126]:
calendar['d'] = calendar['d'].astype('category')

In [127]:
calendar['d'].dtype

CategoricalDtype(categories=['d_1', 'd_10', 'd_100', 'd_1000', 'd_1001', 'd_1002',
                  'd_1003', 'd_1004', 'd_1005', 'd_1006',
                  ...
                  'd_990', 'd_991', 'd_992', 'd_993', 'd_994', 'd_995',
                  'd_996', 'd_997', 'd_998', 'd_999'],
, ordered=False, categories_dtype=object)

In [128]:
calendar['d'] = calendar['d'].cat.set_categories(sales_long['d'].cat.categories)


In [129]:
calendar['d'].isnull().sum()

28

In [131]:
df = sales_long.merge(calendar, on='d',how='left')

In [130]:
df.shape, calendar.shape,sales_long.shape

((1969, 14), (1969, 20), (590064, 8))

In [132]:
df.isnull().sum()

id                   0
item_id              0
dept_id              0
cat_id               0
store_id             0
state_id             0
d                    0
sales                0
date                 0
wm_yr_wk             0
weekday              0
wday                 0
month                0
year                 0
event_name_1    542032
event_type_1    542032
event_name_2    588848
event_type_2    588848
snap_CA              0
snap_TX              0
snap_WI              0
day                  0
quarter              0
week_of_year         0
is_weekend           0
has_event            0
snap_any             0
dtype: int64

In [85]:
print(f"Unique 'd' in sales_long: {sales_long['d'].nunique()}")
print(f"Unique 'd' in calendar: {calendar['d'].nunique()}")

missing_days = set(sales_long['d'].unique()) - set(calendar['d'].unique())
print(f"Number of 'd' values in sales_long not in calendar: {len(missing_days)}")
print(f"Some missing 'd' values: {list(missing_days)[:10]}")


Unique 'd' in sales_long: 1941
Unique 'd' in calendar: 19
Number of 'd' values in sales_long not in calendar: 1922
Some missing 'd' values: ['d_1365', 'd_485', 'd_1741', 'd_68', 'd_1421', 'd_144', 'd_1860', 'd_1495', 'd_1787', 'd_1908']


In [86]:
print(sales_long['d'].dtype)
print(calendar['d'].dtype)


category
category


In [135]:
prices

Unnamed: 0,store_id,item_id,wm_yr_wk,sell_price
0,CA_1,HOBBIES_1_001,11325,9.58
1,CA_1,HOBBIES_1_001,11326,9.58
2,CA_1,HOBBIES_1_001,11327,8.26
3,CA_1,HOBBIES_1_001,11328,8.26
4,CA_1,HOBBIES_1_001,11329,8.26
...,...,...,...,...
68406,CA_1,HOBBIES_1_304,11317,3.98
68407,CA_1,HOBBIES_1_304,11318,3.98
68408,CA_1,HOBBIES_1_304,11319,3.98
68409,CA_1,HOBBIES_1_304,11320,3.98


In [134]:
df.shape,prices.shape

((590064, 27), (68411, 4))

In [139]:
# Merge with prices
df = df.merge(
            prices[['store_id', 'item_id', 'wm_yr_wk', 'sell_price']],
            on=['store_id', 'item_id', 'wm_yr_wk'],
            how='left'
                    )

In [140]:
df.shape

(469142, 29)

In [141]:
df.isnull().sum()

id                   0
item_id              0
dept_id              0
cat_id               0
store_id             0
state_id             0
d                    0
sales                0
date                 0
wm_yr_wk             0
weekday              0
wday                 0
month                0
year                 0
event_name_1    431106
event_type_1    431106
event_name_2    468193
event_type_2    468193
snap_CA              0
snap_TX              0
snap_WI              0
day                  0
quarter              0
week_of_year         0
is_weekend           0
has_event            0
snap_any             0
sell_price_x         0
sell_price_y         0
dtype: int64

In [150]:
df['sales'].isnull().sum()

0

In [203]:
print('sell_price' in df.columns)  # Should be True



False


In [160]:
df['revenue'] = (df['sales'] * df['sell_price']).astype('float32')

In [161]:
df['revenue'].isnull().sum()

0

In [163]:
df = downcast_with_stats(df, name=f"Master Chunk [{start}-{end}]")

Column 'id' converted to category
Column 'item_id' converted to category
Column 'dept_id' converted to category
Column 'cat_id' converted to category
Column 'store_id' converted to category
Column 'state_id' converted to category
🔧 Master Chunk [0-304] memory usage: 207.33 MB → 23.06 MB (88.9% reduction)


In [164]:
out_path = output_dir / f"master_chunk_{start}_{end}.parquet"

In [165]:
df.to_parquet(out_path, index=False)
chunk_paths.append(out_path)

In [166]:
print(f"Saved chunk: {out_path} ({df.shape[0]} rows)")

Saved chunk: data\processed\m5\master_chunks\master_chunk_0_304.parquet (469142 rows)


In [168]:
del chunk, sales_long, df
gc.collect()

2908

In [167]:
output_dir

WindowsPath('data/processed/m5/master_chunks')

In [210]:
master_data = processor.create_master_dataset(sales_wide,calendar,prices)

Column 'id' converted to category
Column 'item_id' converted to category
Column 'dept_id' converted to category
Column 'cat_id' converted to category
Column 'store_id' converted to category
Column 'state_id' converted to category
🔧 Master Chunk [0-304] memory usage: 260.73 MB → 28.94 MB (88.9% reduction)


In [209]:
master_data

WindowsPath('data/processed/m5/master_chunks')

In [212]:
ds = pd.read_parquet(r"C:\Users\Administrator\OneDrive\Desktop\SalesAI\backend\data\processed\m5\master_chunks\master_chunk_0_304.parquet")

In [220]:
master_df = processor.load_master_dataset(master_data)

In [221]:
master_df.isnull().sum()

id                   0
item_id              0
dept_id              0
cat_id               0
store_id             0
state_id             0
d                    0
sales                0
date                 0
wm_yr_wk             0
weekday              0
wday                 0
month                0
year                 0
event_name_1    542032
event_type_1    542032
event_name_2    588848
event_type_2    588848
snap_CA              0
snap_TX              0
snap_WI              0
day                  0
quarter              0
week_of_year         0
is_weekend           0
has_event            0
snap_any             0
sell_price      120922
revenue         120922
dtype: int64

In [223]:
 quality_metrics = processor.validate_data_quality(master_df)

  unique_series = df.groupby(['store_id', 'item_id']).ngroups
  sample_series = df.groupby(['store_id', 'item_id']).head(1000)  # Sample for efficiency
  for (store, item), group in sample_series.groupby(['store_id', 'item_id']):
  item_dept = df.groupby('item_id')['dept_id'].nunique()
  dept_cat = df.groupby('dept_id')['cat_id'].nunique()
  series_sales_counts = df.groupby(['store_id', 'item_id'])['sales'].count()
  series_zero_pcts = df.groupby(['store_id', 'item_id'])['sales'].apply(lambda x: (x == 0).mean())


In [224]:
processor.save_quality_report(quality_metrics)

In [225]:
if config.save_interim:
   processor.save_processed_data(master_df, "master")

  'unique_time_series': int(df.groupby(['store_id', 'item_id']).ngroups),


In [226]:
final_df = pd.read_parquet(r"C:\Users\Administrator\OneDrive\Desktop\SalesAI\backend\data\processed\m5\master.parquet")

In [229]:
final_df.isnull().sum()

id                   0
item_id              0
dept_id              0
cat_id               0
store_id             0
state_id             0
d                    0
sales                0
date                 0
wm_yr_wk             0
weekday              0
wday                 0
month                0
year                 0
event_name_1    542032
event_type_1    542032
event_name_2    588848
event_type_2    588848
snap_CA              0
snap_TX              0
snap_WI              0
day                  0
quarter              0
week_of_year         0
is_weekend           0
has_event            0
snap_any             0
sell_price      120922
revenue         120922
dtype: int64