In [1]:
import os

In [2]:
# Show the kernel's current working directory before any chdir happens.
%pwd

'/Users/a/Documents/DataScience_World/ML10_end_to_end/dsproject/CompleteDSproject/research'

In [3]:
# Move the kernel's working directory up one level so that the relative paths
# used later (e.g. config/config.yaml, artifacts/) resolve.
# NOTE(review): this relative chdir is not idempotent -- every re-run of this
# cell climbs one more directory. Restart the kernel before re-running.
os.chdir("../")
%pwd


'/Users/a/Documents/DataScience_World/ML10_end_to_end/dsproject/CompleteDSproject'

In [4]:
from dataclasses import dataclass
from pathlib import Path

@dataclass
class DataIngestionConfig:
    """Settings consumed by the data-ingestion step."""
    # Directory that holds the ingestion artifacts.
    root_dir: Path
    # Remote location the CSV is fetched from.
    source_URL: str
    # Local path the downloaded CSV is stored at and read back from.
    local_data_file: Path

In [5]:
from src.datascience.constants import *
from src.datascience.utils.common import read_yaml, create_directories

In [6]:
from dataclasses import dataclass
from pathlib import Path

# NOTE(review): this is an exact re-declaration of the DataIngestionConfig
# dataclass from cell In [4]; running this cell rebinds the name to a new class
# object. Keep a single definition to avoid the two drifting apart.
@dataclass
class DataIngestionConfig:
    """Settings consumed by the data-ingestion step."""
    # Directory that holds the ingestion artifacts.
    root_dir: Path
    # Remote location the CSV is fetched from.
    source_URL: str
    # Local path the downloaded CSV is stored at and read back from.
    local_data_file: Path

class ConfigurationManager:
    """Load the project's YAML files and build per-stage configuration objects.

    On construction the config, params and schema YAML files are read (in that
    order) and the directory named by ``config.artifacts_root`` is created.
    """

    def __init__(self,
                 config_filepath=CONFIG_FILE_PATH,
                 params_filepath=PARAMS_FILE_PATH,
                 schema_filepath=SCHEMA_FILE_PATH):
        """Read the three YAML files and create the artifacts root directory.

        Args:
            config_filepath: Path of the main configuration YAML.
            params_filepath: Path of the parameters YAML.
            schema_filepath: Path of the schema YAML.
        """
        yaml_sources = (
            ("config", config_filepath),
            ("params", params_filepath),
            ("schema", schema_filepath),
        )
        # Same read order as the attribute order: config, params, schema.
        for attribute, yaml_path in yaml_sources:
            setattr(self, attribute, read_yaml(yaml_path))

        create_directories([self.config.artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Return the ingestion settings, creating the ingestion directory first.

        Returns:
            A DataIngestionConfig populated from the ``data_ingestion`` section
            of the loaded configuration.
        """
        section = self.config.data_ingestion

        create_directories([section.root_dir])

        return DataIngestionConfig(
            root_dir=section.root_dir,
            source_URL=section.source_URL,
            local_data_file=section.local_data_file,
        )

In [7]:
import os
import logging
import urllib.request as request
import urllib.error
from os.path import getsize
import pandas as pd
from pathlib import Path
from typing import Tuple

class DataIngestionError(Exception):
    """Raised when a data-ingestion step (config check, download, CSV read) fails."""

class DataIngestion:
    """Download a CSV file from a configured URL and load it into pandas.

    The configuration object must expose ``source_URL`` (an http/https URL)
    and ``local_data_file`` (a ``.csv`` path given as ``str`` or ``Path``).
    Every failure is reported as a ``DataIngestionError``.
    """

    # Seconds to wait for the server while probing the source URL, so an
    # unresponsive host cannot hang the notebook indefinitely.
    URL_TIMEOUT_SECONDS = 30

    def __init__(self, config: "DataIngestionConfig"):
        """Store ``config`` and validate it immediately.

        Raises:
            DataIngestionError: If the configuration is malformed.
        """
        self.config = config
        self._validate_config()

    def _validate_config(self):
        """Check the URL scheme and the type/extension of the local file path.

        Raises:
            DataIngestionError: If ``source_URL`` is not an http(s) string, or
                ``local_data_file`` is not a ``str``/``Path`` ending in ``.csv``.
        """
        source_url = self.config.source_URL
        # A non-string URL must be reported as a config error, not surface as
        # an AttributeError from ``.startswith``.
        if not isinstance(source_url, str) or not source_url.startswith(('http://', 'https://')):
            raise DataIngestionError(f"Invalid URL format: {self.config.source_URL}")

        # Validate paths
        if not isinstance(self.config.local_data_file, (str, Path)):
            raise DataIngestionError("local_data_file must be a string or Path object")

        # Validate file extension
        if not str(self.config.local_data_file).endswith('.csv'):
            raise DataIngestionError("local_data_file must have .csv extension")

    def _validate_url(self) -> Tuple[bool, str]:
        """Probe the source URL and report whether it answers with HTTP 200.

        Returns:
            ``(True, message)`` when the URL responds with status 200,
            otherwise ``(False, reason)``. Never raises.
        """
        try:
            # Open the URL (bounded by a timeout) without reading the body.
            with request.urlopen(self.config.source_URL,
                                 timeout=self.URL_TIMEOUT_SECONDS) as response:
                if response.code == 200:
                    return True, "URL is accessible"
        except urllib.error.HTTPError as e:
            return False, f"HTTP Error: {e.code} - {e.reason}"
        except urllib.error.URLError as e:
            return False, f"URL Error: {e.reason}"
        except Exception as e:
            return False, f"Error validating URL: {str(e)}"
        return False, "Unknown error occurred while validating URL"

    @staticmethod
    def _discard_partial_download(path):
        """Best-effort removal of a file left behind by a failed download."""
        try:
            os.remove(path)
        except FileNotFoundError:
            # Nothing was written before the failure; nothing to clean up.
            pass
        except OSError as cleanup_error:
            logging.warning(f"Could not remove incomplete download {path}: {cleanup_error}")

    def download_file(self):
        """Download the CSV from the source URL unless it already exists locally.

        Returns:
            The path of the local CSV file, both when it was just downloaded
            and when it was already present.

        Raises:
            DataIngestionError: If the URL is not reachable, the download
                fails, or the downloaded file is not a readable CSV.
        """
        target = self.config.local_data_file

        if os.path.exists(target):
            file_size = getsize(target)
            logging.info(f"File already exists of size: {file_size} bytes")
            # Return the path here too so callers get a consistent result.
            return target

        # Create the parent directory if there is one; a bare filename has an
        # empty dirname and os.makedirs("") would raise.
        parent_dir = os.path.dirname(target)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # Validate URL before attempting download
        is_valid, message = self._validate_url()
        if not is_valid:
            raise DataIngestionError(f"URL validation failed: {message}")

        try:
            filename, headers = request.urlretrieve(
                url=self.config.source_URL,
                filename=target
            )
            logging.info(f"File downloaded successfully to {filename}")
            logging.debug(f"Download headers: {headers}")

            # Validate that the downloaded file is a valid CSV
            self._validate_csv_file(filename)

            return filename
        except Exception as e:
            # The file did not exist before this call, so anything on disk now
            # is a partial or invalid download. Remove it; otherwise the next
            # run would hit the "already exists" shortcut and never retry.
            self._discard_partial_download(target)
            raise DataIngestionError(f"Error downloading file: {str(e)}") from e

    def _validate_csv_file(self, file_path):
        """Parse the first rows of ``file_path`` to confirm it is readable CSV.

        Raises:
            DataIngestionError: If pandas cannot parse the file.
        """
        try:
            # Reading only a few rows keeps the check cheap for large files.
            df = pd.read_csv(file_path, nrows=5)
            logging.info(f"CSV file validated successfully. Shape: {df.shape}")
        except Exception as e:
            raise DataIngestionError(f"Invalid CSV file: {str(e)}") from e

    def get_data_frame(self):
        """Read the local CSV file and return it as a pandas DataFrame.

        Raises:
            DataIngestionError: If the file is missing or cannot be parsed.
        """
        if not os.path.exists(self.config.local_data_file):
            raise DataIngestionError("CSV file not found. Please download the file first.")

        try:
            df = pd.read_csv(self.config.local_data_file)
            logging.info(f"Data loaded successfully. Shape: {df.shape}")
            return df
        except Exception as e:
            raise DataIngestionError(f"Error reading CSV file: {str(e)}") from e

    def run(self):
        """Run the complete ingestion: download (if needed), then load.

        Returns:
            The loaded pandas DataFrame.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            logging.info("Starting data ingestion process...")
            self.download_file()
            df = self.get_data_frame()
            logging.info("Data ingestion completed successfully")
            return df
        except Exception as e:
            logging.error(f"Data ingestion failed: {str(e)}")
            raise

In [8]:
# Build the ingestion settings from the project YAML files, then download the
# dataset (if it is not already on disk) and load it into a DataFrame.
# No try/except here: catching an exception only to `raise e` changes nothing
# except adding a frame to the traceback.
config_mgr = ConfigurationManager()
data_ingestion_config = config_mgr.get_data_ingestion_config()
data_ingestion = DataIngestion(data_ingestion_config)
df = data_ingestion.run()

[2025-01-06 09:09:12,948: INFO: common: yaml file: config/config.yaml loaded successfully]
[2025-01-06 09:09:12,953: INFO: common: yaml file: params.yaml loaded successfully]
[2025-01-06 09:09:12,955: INFO: common: yaml file: schema.yaml loaded successfully]
[2025-01-06 09:09:12,957: INFO: common: created directory at: artifacts]
[2025-01-06 09:09:12,958: INFO: common: created directory at: artifacts/data_ingestion]
[2025-01-06 09:09:12,959: INFO: 2730692500: Starting data ingestion process...]
[2025-01-06 09:09:14,365: INFO: 2730692500: File downloaded successfully to artifacts/data_ingestion/data.csv]
[2025-01-06 09:09:14,384: INFO: 2730692500: CSV file validated successfully. Shape: (5, 10)]
[2025-01-06 09:09:14,394: INFO: 2730692500: Data loaded successfully. Shape: (1000, 10)]
[2025-01-06 09:09:14,397: INFO: 2730692500: Data ingestion completed successfully]


In [9]:
# Run the ingestion end to end and print a short summary of the loaded data.
try:
    # Initialize configuration
    config = ConfigurationManager()
    data_ingestion_config = config.get_data_ingestion_config()

    # Create data ingestion object and run the process
    data_ingestion = DataIngestion(config=data_ingestion_config)
    df = data_ingestion.run()

    # Print basic information about the loaded data
    print("\nData ingestion completed successfully!")
    print(f"DataFrame shape: {df.shape}")
    print("\nFirst few rows of the data:")
    print(df.head())

except Exception as e:
    print(f"Error during data ingestion: {str(e)}")
    # Bare `raise` re-raises the active exception with its original traceback.
    raise

[2025-01-06 09:09:14,413: INFO: common: yaml file: config/config.yaml loaded successfully]
[2025-01-06 09:09:14,418: INFO: common: yaml file: params.yaml loaded successfully]
[2025-01-06 09:09:14,419: INFO: common: yaml file: schema.yaml loaded successfully]
[2025-01-06 09:09:14,421: INFO: common: created directory at: artifacts]
[2025-01-06 09:09:14,423: INFO: common: created directory at: artifacts/data_ingestion]
[2025-01-06 09:09:14,425: INFO: 2730692500: Starting data ingestion process...]
[2025-01-06 09:09:14,426: INFO: 2730692500: File already exists of size: 95369 bytes]
[2025-01-06 09:09:14,431: INFO: 2730692500: Data loaded successfully. Shape: (1000, 10)]
[2025-01-06 09:09:14,432: INFO: 2730692500: Data ingestion completed successfully]

Data ingestion completed successfully!
DataFrame shape: (1000, 10)

First few rows of the data:
   adulteration_id product_name   brand   category             adulterant  \
0                1       Butter  BrandB       Meat  Artificial sweet