In [1]:
import os

In [2]:
%pwd

'c:\\Users\\Orçamento\\Desktop\\Bike Sales\\VeloAnalytics\\notebooks'

In [3]:
os.chdir('../')

In [4]:
%pwd

'c:\\Users\\Orçamento\\Desktop\\Bike Sales\\VeloAnalytics'

In [None]:
from dataclasses import dataclass
from pathlib import Path
from src.logger_config import logger
import pandas as pd

In [6]:
# --- Data Transformation Configuration Entity ---
# This defines the structure for the data transformation configuration.
@dataclass(frozen=True)
class DataTransformationConfig:
    root_dir: Path
    data_path: Path
    output_path: Path

In [7]:
from src.utils import read_yaml, create_directories

In [None]:
class ConfigurationManager:
    def __init__(
        self, 
        config_filepath = Path("config.yaml")):
        """
        Initializes the ConfigurationManager by reading the main config file.
        It also creates the main artifacts directory.
        """
        self.config = read_yaml(config_filepath)
        create_directories([Path(self.config.artifacts_root)])
        
    def get_data_transformation_config(self) -> DataTransformationConfig:
        """
        Extracts the data transformation configuration from the main config file.
        """
        config = self.config.data_transformation
        create_directories([Path(config.root_dir), Path(config.output_path)])

        data_transformation_config = DataTransformationConfig(
            root_dir=Path(config.root_dir),
            data_path=Path(config.data_path),
            output_path=Path(config.output_path)
        )
        return data_transformation_config

In [None]:
import os
import pandas as pd
from pathlib import Path
from src.logger_config import logger
from src.utils import read_yaml

class DataTransformation:
    def __init__(self, config: DataTransformationConfig):
        """
        Initializes the DataTransformation component with its configuration
        and loads the data schema.
        """
        self.config = config
        self.schema = read_yaml(Path("schema.yaml"))

    def _clean_and_transform(self, df: pd.DataFrame, file_schema: dict) -> pd.DataFrame:
        """
        Private helper method to apply cleaning and transformations to a dataframe.
        """
        # --- 1. Enforce Data Types based on schema.yaml ---
        for col, dtype in file_schema.items():
            if col in df.columns:
                if 'date' in col.lower() or 'at' in col.lower():
                    df[col] = pd.to_datetime(df[col], format='%Y%m%d', errors='coerce')
                else:
                    # Use astype for other types, ignoring errors for robustness
                    df[col] = df[col].astype(dtype, errors='ignore')
        
        # --- 2. Handle Missing Values ---
        # Fill numeric columns with 0 and object (text) columns with 'N/A'
        for col in df.select_dtypes(include=['number']).columns:
            df[col] = df[col].fillna(0)
        for col in df.select_dtypes(include=['object', 'string']).columns:
            df[col] = df[col].fillna('N/A')
            
        return df

    def validate_and_transform_data(self):
        """
        Reads all raw CSV files, validates them against the defined schema,
        applies transformations, and saves them as processed Parquet files.
        """
        try:
            raw_data_path = self.config.data_path
            processed_data_path = self.config.output_path
            all_schemas = self.schema.COLUMNS

            all_csv_files = [f for f in os.listdir(raw_data_path) if f.endswith('.csv')]
            logger.info(f"Found {len(all_csv_files)} CSV files to transform.")

            for csv_file in all_csv_files:
                file_name = Path(csv_file).stem
                
                if file_name not in all_schemas:
                    logger.warning(f"Schema not defined for {csv_file}. Skipping.")
                    continue

                logger.info(f"Processing and validating file: {csv_file}")
                
                file_schema = all_schemas[file_name]
                # --- FIX: Added encoding='latin1' to handle special characters ---
                df = pd.read_csv(os.path.join(raw_data_path, csv_file), encoding='latin1')

                # --- Schema Column Validation ---
                schema_cols = set(file_schema.keys())
                df_cols = set(df.columns)
                
                if not schema_cols.issubset(df_cols):
                    missing_cols = schema_cols - df_cols
                    logger.error(f"Schema validation failed for {csv_file}. Missing columns: {missing_cols}")
                    continue
                
                # --- Apply Cleaning and Transformations ---
                df_transformed = self._clean_and_transform(df, file_schema)

                # --- Save the processed dataframe as a parquet file ---
                output_file_path = os.path.join(processed_data_path, f"{file_name}.parquet")
                df_transformed.to_parquet(output_file_path, index=False)
                logger.info(f"Successfully transformed and saved {csv_file} to {output_file_path}")

        except Exception as e:
            logger.exception(f"An error occurred during data transformation: {e}")
            raise e


In [13]:
# --- STAGE 3: DATA TRANSFORMATION ---
STAGE_NAME = "Data Transformation stage"
try:
    logger.info(f">>>>>> Stage '{STAGE_NAME}' started <<<<<<")
            
    # Initialize the configuration manager
    config = ConfigurationManager()
            
    # Get the specific configuration for data transformation
    data_transformation_config = config.get_data_transformation_config()
            
    # Initialize the data transformation component
    data_transformation = DataTransformation(config=data_transformation_config)
            
    # Run the transformation process
    data_transformation.validate_and_transform_data()
            
    logger.info(f">>>>>> Stage '{STAGE_NAME}' completed successfully <<<<<<\n\nx==========x")
except Exception as e:
    logger.exception(e)
    raise e

[2025-08-28 10:47:14,865: INFO: 159385861: >>>>>> Stage 'Data Transformation stage' started <<<<<<]
[2025-08-28 10:47:14,870: INFO: utils: YAML file loaded successfully: config.yaml]
[2025-08-28 10:47:14,872: INFO: utils: Directory created or already exists: artifacts]
[2025-08-28 10:47:14,874: INFO: utils: Directory created or already exists: artifacts\data_transformation]
[2025-08-28 10:47:14,875: INFO: utils: Directory created or already exists: data\02_processed]
[2025-08-28 10:47:14,885: INFO: utils: YAML file loaded successfully: schema.yaml]
[2025-08-28 10:47:14,886: INFO: 1008841359: Found 9 CSV files to transform.]
[2025-08-28 10:47:14,886: INFO: 1008841359: Processing and validating file: Addresses.csv]
[2025-08-28 10:47:14,890: ERROR: 1008841359: Schema validation failed for Addresses.csv. Missing columns: {'ADDRESSID'}]
[2025-08-28 10:47:14,892: INFO: 1008841359: Processing and validating file: BusinessPartners.csv]
[2025-08-28 10:47:14,913: INFO: 1008841359: Successfully t

                # (Optional) Here you would add more transformation logic:
                # - Enforce data types from schema
                # - Handle missing values
                # - Create new features