# 1. Setup and Configuration

In [12]:
import os
import sys
import warnings
warnings.filterwarnings('ignore')

# Display current working directory
print(f"Current directory: {os.getcwd()}")

# Navigate to the project root directory if we're in the research folder
if os.path.basename(os.getcwd()) == "research":
    os.chdir("../")
    print(f"Changed to project root: {os.getcwd()}")

# Add the src directory to the Python path for imports
src_path = os.path.join(os.getcwd(), "src")
if src_path not in sys.path:
    sys.path.append(src_path)
    print(f"Added {src_path} to Python path")

Current directory: c:\end-to-end-wine-quality-project
Added c:\end-to-end-wine-quality-project\src to Python path


In [13]:
# Test importing the project modules
try:
    import mlProject
    print(f"mlProject package found in: {mlProject.__path__}")
    
    # Import core modules
    from mlProject.constants import *
    from mlProject.utils.common import read_yaml, create_directories, get_size
    from mlProject import logger
    print("All imports successful!")
except ModuleNotFoundError as e:
    print(f"Module import failed: {e}")
    print("Try installing the package in development mode: pip install -e .")
    raise
except Exception as e:
    print(f"Unexpected error: {e}")
    raise

mlProject package found in: ['C:\\end-to-end-wine-quality-project\\src\\mlProject']
All imports successful!


# 2. Data Configuration Classes
Define a dataclass to hold configuration for data ingestion:

In [14]:
from dataclasses import dataclass
from pathlib import Path
import urllib.request as request
import zipfile
import os
import shutil

@dataclass(frozen=True)
class DataIngestionConfig:
    """Configuration class for data ingestion parameters."""
    root_dir: Path
    source_URL: str
    local_data_file: Path
    unzip_dir: Path

# 3. Implementing the Configuration Manager
The Configuration Manager loads YAML configurations and creates component-specific configurations.

In [15]:
class ConfigurationManager:
    """
    Handles loading configuration, parameters, and schema from YAML files
    and provides component-specific configurations.
    """
    def __init__(
        self,
        config_filepath = CONFIG_FILE_PATH,
        params_filepath = PARAMS_FILE_PATH,
        schema_filepath = SCHEMA_FILE_PATH):
        
        # Load configuration files
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)
        
        # Create root artifacts directory
        create_directories([self.config.artifacts_root])
        
    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """
        Returns DataIngestionConfig object with parameters from config.yaml
        """
        config = self.config.data_ingestion
        
        # Create data ingestion directory
        create_directories([config.root_dir])
        
        # Create and return the data ingestion configuration
        data_ingestion_config = DataIngestionConfig(
            root_dir=config.root_dir,
            source_URL=config.source_URL,
            local_data_file=config.local_data_file,
            unzip_dir=config.unzip_dir
        )
        
        return data_ingestion_config

# 4. Data Ingestion Component
Let's implement a robust data ingestion component with multiple download methods and error handling:

In [16]:
class DataIngestion:
    """
    Handles downloading and extracting data files from various sources.
    Supports direct URL downloads, Kaggle datasets, and GitHub repositories.
    """
    def __init__(self, config: DataIngestionConfig):
        self.config = config
    
    def _is_kaggle_url(self):
        """Check if the URL is from Kaggle."""
        return "kaggle" in self.config.source_URL.lower()
    
    def _is_github_url(self):
        """Check if the URL is from GitHub."""
        return "github" in self.config.source_URL.lower()
    
    def _download_using_kaggle_api(self):
        """Download dataset using Kaggle API if available."""
        try:
            import kaggle
            # Extract dataset name from URL or config
            if "/" in self.config.source_URL:
                dataset_name = self.config.source_URL.split("/")[-1]
            else:
                dataset_name = self.config.source_URL
                
            # Create directory for the download
            os.makedirs(os.path.dirname(self.config.local_data_file), exist_ok=True)
            
            # Download dataset
            kaggle.api.dataset_download_files(
                dataset_name,
                path=os.path.dirname(self.config.local_data_file),
                unzip=False
            )
            logger.info(f"Downloaded dataset using Kaggle API: {dataset_name}")
            return True
        except Exception as e:
            logger.warning(f"Kaggle API download failed: {e}")
            logger.info("Falling back to direct URL download")
            return False
    
    def download_file(self):
        """Download the data file from the source URL to the local path."""
        if os.path.exists(self.config.local_data_file):
            file_size = get_size(Path(self.config.local_data_file))
            logger.info(f"File already exists of size: {file_size}")
            
            # Verify it's a valid file (not an HTML error page)
            if self._is_valid_file():
                return
            else:
                logger.warning("Existing file appears to be invalid. Re-downloading...")
                os.remove(self.config.local_data_file)
        
        # Create directory for the download
        os.makedirs(os.path.dirname(self.config.local_data_file), exist_ok=True)
        
        # Try Kaggle API first if it's a Kaggle URL
        if self._is_kaggle_url() and self._download_using_kaggle_api():
            return
            
        # Otherwise, use direct URL download
        try:
            logger.info(f"Downloading file from {self.config.source_URL}")
            
            # GitHub URLs may need to be adjusted for raw content
            download_url = self.config.source_URL
            if self._is_github_url() and "/blob/" in download_url:
                download_url = download_url.replace("/blob/", "/raw/")
                
            filename, headers = request.urlretrieve(
                url=download_url,
                filename=self.config.local_data_file
            )
            logger.info(f"Downloaded {filename} with headers: \n{headers}")
            
            # Verify the downloaded file is valid
            if not self._is_valid_file():
                raise ValueError("Downloaded file is not valid. Check the URL and access permissions.")
                
        except Exception as e:
            logger.error(f"Error downloading file: {e}")
            raise
    
    def _is_valid_file(self):
        """
        Check if the downloaded file is valid by examining its content.
        Returns True if valid, False otherwise.
        """
        if not os.path.exists(self.config.local_data_file):
            return False
            
        try:
            # Check if it's a ZIP file
            with open(self.config.local_data_file, 'rb') as f:
                magic_number = f.read(4)
                # ZIP file signature is PK\x03\x04
                is_zip = magic_number.startswith(b'PK\x03\x04')
                
                # If it's supposed to be a ZIP file, validate it
                if self.config.local_data_file.suffix.lower() == '.zip' and not is_zip:
                    logger.warning(f"File has .zip extension but isn't a valid ZIP file")
                    return False
                    
                # Check if it's an HTML error page
                f.seek(0)
                content_start = f.read(1000).lower()
                if b'<!doctype html>' in content_start or b'<html' in content_start:
                    logger.warning("Downloaded file appears to be an HTML page, not data")
                    return False
                    
            return True
                
        except Exception as e:
            logger.error(f"Error validating file: {e}")
            return False
    
    def extract_zip_file(self):
        """Extract the downloaded ZIP file to the specified directory."""
        if not os.path.exists(self.config.local_data_file):
            raise FileNotFoundError(f"Zip file not found: {self.config.local_data_file}")
            
        try:
            # First verify it's a valid ZIP file
            try:
                with zipfile.ZipFile(self.config.local_data_file, 'r') as test_zip:
                    file_list = test_zip.namelist()
                    logger.info(f"ZIP file contains {len(file_list)} files")
            except zipfile.BadZipFile:
                # If the file extension is .csv, it might be a direct CSV file, not a ZIP
                if self.config.local_data_file.suffix.lower() == '.csv':
                    logger.info("The file is a CSV, not a ZIP file. Copying it to the unzip directory...")
                    
                    # Create unzip directory
                    os.makedirs(self.config.unzip_dir, exist_ok=True)
                    
                    # Copy the CSV file to the unzip directory
                    dest_file = os.path.join(self.config.unzip_dir, os.path.basename(self.config.local_data_file))
                    shutil.copy2(self.config.local_data_file, dest_file)
                    logger.info(f"Copied CSV file to {dest_file}")
                    return
                else:
                    # If it's not a CSV, re-raise the error
                    raise
                    
            # Create unzip directory
            unzip_path = self.config.unzip_dir
            os.makedirs(unzip_path, exist_ok=True)
            
            # Extract files
            with zipfile.ZipFile(self.config.local_data_file, 'r') as zip_ref:
                zip_ref.extractall(unzip_path)
            
            logger.info(f"Extracted ZIP file to {unzip_path}")
            
            # List the extracted files
            extracted_files = os.listdir(unzip_path)
            logger.info(f"Extracted files: {extracted_files}")
            
        except zipfile.BadZipFile as e:
            logger.error(f"Bad ZIP file: {e}")
            # If the file exists but is not a valid ZIP, examine its content
            with open(self.config.local_data_file, 'rb') as f:
                content_start = f.read(100)
                logger.error(f"File starts with: {content_start}")
            raise
        except Exception as e:
            logger.error(f"Error extracting ZIP file: {e}")
            raise
            
    def initiate_data_ingestion(self):
        """
        Execute the complete data ingestion process:
        1. Download the data file
        2. Extract it (if it's a ZIP file)
        3. Return the path to the extracted data
        """
        logger.info("Starting data ingestion process")
        try:
            self.download_file()
            self.extract_zip_file()
            logger.info("Data ingestion completed successfully")
            return self.config.unzip_dir
        except Exception as e:
            logger.error(f"Data ingestion failed: {e}")
            raise

# 5. Pipeline Execution
Now let's execute the data ingestion pipeline with robust error handling:

In [17]:
def execute_data_ingestion():
    """Execute the complete data ingestion pipeline with error handling."""
    try:
        logger.info(">>> Data ingestion pipeline started <<<")
        
        # Initialize configuration
        config = ConfigurationManager()
        data_ingestion_config = config.get_data_ingestion_config()
        
        # Execute data ingestion
        data_ingestion = DataIngestion(config=data_ingestion_config)
        data_path = data_ingestion.initiate_data_ingestion()
        
        logger.info(f">>> Data ingestion completed. Data available at: {data_path} <<<")
        return data_path
        
    except Exception as e:
        logger.exception("Data ingestion failed")
        raise e

# Execute the pipeline
try:
    data_directory = execute_data_ingestion()
    print(f"Data ingestion successful! Data directory: {data_directory}")
except Exception as e:
    print(f"Data ingestion failed: {str(e)}")
    
    # Suggest possible fixes
    print("\nPossible solutions:")
    print("1. Check if the URL in config.yaml is correct and accessible")
    print("2. For Kaggle datasets, ensure you have the Kaggle API installed and configured")
    print("3. Verify your internet connection")
    print("4. If the file isn't a ZIP, modify the data_ingestion component to handle the file type")

[2025-05-12 13:01:18,531: INFO: 1890961549: >>> Data ingestion pipeline started <<<]
[2025-05-12 13:01:18,542: INFO: common: yaml file: config\config.yaml loaded successfully]
[2025-05-12 13:01:18,557: INFO: common: yaml file: params.yaml loaded successfully]
[2025-05-12 13:01:18,574: INFO: common: yaml file: schema.yaml loaded successfully]
[2025-05-12 13:01:18,580: INFO: common: created directory at: artifacts]
[2025-05-12 13:01:18,585: INFO: common: created directory at: artifacts/data_ingestion]
[2025-05-12 13:01:18,586: INFO: 900315346: Starting data ingestion process]
[2025-05-12 13:01:18,593: INFO: 900315346: File already exists of size: ~ 5 KB]
[2025-05-12 13:01:18,597: ERROR: 900315346: Error validating file: 'str' object has no attribute 'suffix']
[2025-05-12 13:01:18,616: INFO: 900315346: Falling back to direct URL download]
[2025-05-12 13:01:18,619: INFO: 900315346: Downloading file from https://www.kaggle.com/datasets/uciml/red-wine-quality-cortez-et-al-2009/download]
[202