In [1]:
import os 

In [None]:
# Show the current working directory (the next cell changes it with os.chdir).
%pwd

In [2]:
# Step up to the parent directory only when needed. A bare relative chdir is
# not idempotent: every re-run of the cell climbs one more level and breaks
# all relative paths used later in the notebook.
# NOTE(review): assumes the project root is the directory that contains
# config/config.yaml (the run log in this notebook shows that file being
# loaded by relative path) — confirm the marker file for your layout.
if not os.path.isfile(os.path.join("config", "config.yaml")):
    os.chdir("../")

In [None]:
# Show the working directory again to confirm the result of the chdir cell.
%pwd

In [3]:
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class DataIngestionConfig:
    """Immutable bundle of settings consumed by the data-ingestion step.

    NOTE(review): ConfigurationManager.get_data_ingestion_config passes the
    values through from the loaded YAML without converting them, so the
    ``Path``-annotated fields may actually hold plain strings — confirm.
    """

    # Directory for this stage; created by get_data_ingestion_config.
    root_dir: Path
    # Dataset URL; DataIngestion uses its last two '/'-separated segments
    # as the Kaggle "<owner>/<name>" handle.
    source_URL: str
    # Location of the dataset zip archive on disk.
    local_data_file: Path
    # Directory the archive is extracted into.
    unzip_dir: Path

In [12]:
from cnnClassifier.constants import *
from cnnClassifier.utils.common import read_yaml, create_directories

In [6]:
class ConfigurationManager:
    """Load the project's YAML settings and hand out per-stage config objects."""

    def __init__(
        self,
        config_filepath = CONFIG_FILE_PATH,
        params_filepath = PARAMS_FILE_PATH,
    ):
        """Read both YAML files and make sure the artifacts root exists.

        Args:
            config_filepath: YAML file holding ``artifacts_root`` and the
                ``data_ingestion`` section.
            params_filepath: YAML file with parameters; stored on
                ``self.params`` and not otherwise used in this class.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)

        artifacts_root = self.config.artifacts_root
        create_directories([artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Build the data-ingestion settings, creating the stage directory first.

        Returns:
            DataIngestionConfig populated from the ``data_ingestion`` section
            of the loaded config; values are passed through unconverted.
        """
        ingestion_section = self.config.data_ingestion
        create_directories([ingestion_section.root_dir])

        # Copy each setting across under the same name it has in the YAML.
        field_names = ("root_dir", "source_URL", "local_data_file", "unzip_dir")
        return DataIngestionConfig(
            **{name: getattr(ingestion_section, name) for name in field_names}
        )
      

In [8]:
!pip install kagglehub



In [29]:
import kagglehub
import kagglehub
import os
import zipfile
from pathlib import Path
from cnnClassifier import logger



class DataIngestion:
    """Fetch the configured Kaggle dataset as a local zip archive and unpack it.

    Uses ``source_URL``, ``local_data_file`` and ``unzip_dir`` from the
    supplied config object.
    """

    def __init__(self, config: DataIngestionConfig):
        self.config = config

    def download_file(self):
        """Download the dataset to ``config.local_data_file`` if it is not there yet.

        The dataset handle ("<owner>/<name>") is taken from the last two path
        segments of ``config.source_URL``. If kagglehub returns a directory,
        its files are zipped into the target archive; if it returns a single
        file, that file is moved into place.

        The archive is built under a temporary ``.part`` name and renamed only
        once complete, so an interrupted run never leaves a truncated file that
        a later run would mistake for a finished download.

        Raises:
            Exception: any failure is logged and re-raised unchanged.
        """
        target = self.config.local_data_file
        if os.path.exists(target):
            logger.info(f"File already exists ")
            return

        # Sibling of the target, so the final os.replace stays on one filesystem.
        staging = f"{target}.part"
        try:
            logger.info(f"Downloading dataset from Kaggle: {self.config.source_URL}")

            # Extract dataset owner and name from URL
            # Example URL: "https://www.kaggle.com/datasets/techsash/waste-classification-data"
            # Drop any "?query" / "#fragment" first so it cannot leak into the name.
            url_path = self.config.source_URL.split('#')[0].split('?')[0]
            url_parts = url_path.strip('/').split('/')
            dataset_owner = url_parts[-2]
            dataset_name = url_parts[-1]

            logger.info(f"Downloading dataset: {dataset_owner}/{dataset_name}")

            # Download dataset using kagglehub
            path = kagglehub.dataset_download(f"{dataset_owner}/{dataset_name}")

            # Make sure the archive's directory exists even when this class is
            # used without ConfigurationManager having created it.
            parent_dir = os.path.dirname(target)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            if os.path.isdir(path):
                # Zip the downloaded directory, storing paths relative to it.
                with zipfile.ZipFile(staging, 'w', zipfile.ZIP_DEFLATED) as zipf:
                    for root, _dirs, files in os.walk(path):
                        for file in files:
                            file_path = os.path.join(root, file)
                            arcname = os.path.relpath(file_path, path)
                            zipf.write(file_path, arcname)
                os.replace(staging, target)
                logger.info(f"Downloaded dataset zipped to: {self.config.local_data_file}")
            else:
                # Already a single file: move it to the desired location.
                import shutil
                shutil.move(path, staging)
                os.replace(staging, target)
                logger.info(f"Downloaded file moved to: {self.config.local_data_file}")

            logger.info(f"Dataset downloaded successfully!")

        except Exception as e:
            logger.error(f"Error downloading dataset from Kaggle: {e}")
            raise
        finally:
            # Only present if something failed before the final rename.
            if os.path.exists(staging):
                os.remove(staging)

    def extract_zip_file(self):
        """Extract ``config.local_data_file`` into ``config.unzip_dir``.

        The destination directory is created if it does not exist.

        Returns:
            None
        """
        unzip_path = self.config.unzip_dir
        os.makedirs(unzip_path, exist_ok=True)

        logger.info(f"Extracting zip file: {self.config.local_data_file}")
        logger.info(f"Extracting to: {unzip_path}")

        with zipfile.ZipFile(self.config.local_data_file, 'r') as zip_ref:
            zip_ref.extractall(unzip_path)

        logger.info(f"Extraction completed to: {unzip_path}")

In [30]:
# Run the ingestion stage end to end: load config, download, then extract.
# No try/except here: catching an exception only to re-raise it adds nothing,
# and any failure still surfaces with its full traceback.
config = ConfigurationManager()
data_ingestion_config = config.get_data_ingestion_config()
data_ingestion = DataIngestion(config=data_ingestion_config)
data_ingestion.download_file()
data_ingestion.extract_zip_file()

[2025-09-20 04:45:48,044: INFO: common: yaml file: config\config.yaml loaded successfully]
[2025-09-20 04:45:48,045: INFO: common: yaml file: params.yaml loaded successfully]
[2025-09-20 04:45:48,047: INFO: common: created directory at: artifacts]
[2025-09-20 04:45:48,048: INFO: common: created directory at: artifacts/data_ingestion]
[2025-09-20 04:45:48,049: INFO: 3987774324: File already exists ]
[2025-09-20 04:45:48,050: INFO: 3987774324: Extracting zip file: artifacts/data_ingestion/data.zip]
[2025-09-20 04:45:48,050: INFO: 3987774324: Extracting to: artifacts/data_ingestion]
[2025-09-20 04:45:50,171: INFO: 3987774324: Extraction completed to: artifacts/data_ingestion]
