In [1]:
import os
import sys

In [2]:
# Move the working directory one level up (presumably to the project root),
# but only when the current directory does not already contain `src` -- the
# folder this notebook later adds to sys.path. Without the guard, every re-run
# of this cell would climb one more directory level.
if not os.path.isdir('src'):
    os.chdir('../')

In [3]:
# Make `<cwd>/src` importable. The membership check keeps the cell idempotent:
# re-running it no longer appends duplicate entries to sys.path.
_src_dir = os.path.join(os.getcwd(), 'src')
if _src_dir not in sys.path:
    sys.path.append(_src_dir)

In [4]:
from dataclasses import dataclass, field
from pathlib import Path

from NeuroScan.utils.helpers import *
from NeuroScan.constants.paths import *

In [5]:
@dataclass
class DataIngestionConfig:
    """Settings consumed by the data-ingestion step.

    The API key is excluded from the generated ``repr`` so that displaying or
    logging a config instance (for example as a notebook cell output) does not
    expose the secret. Field order and constructor signature are unchanged.
    """

    # Root directory of the ingestion step (not read by the code in this notebook view).
    root_dir: Path
    # Dataset identifier passed as `dataset=` to the Kaggle download call.
    download_url: str
    # Directory the archive is downloaded into and unpacked in.
    raw_data_dir: Path
    # Directory whose existing content makes the download step a no-op.
    extracted_data_dir: Path
    # Kaggle account name; taken from the environment by the configuration manager.
    kaggle_username: str
    # Kaggle API key; hidden from repr() to avoid leaking it in outputs.
    kaggle_api_key: str = field(repr=False)

In [6]:
from dotenv import load_dotenv

class DataConfigurationManager:
    """Reads the project YAML files and builds per-stage config objects.

    Args:
        config_file: Path of the main configuration YAML (defaults to ``CONFIG_PATH``).
        params_file: Path of the parameters YAML (defaults to ``PARAMS_PATH``).
    """

    def __init__(self, config_file=CONFIG_PATH, params_file=PARAMS_PATH):
        self.config = read_yaml(config_file)
        self.params = read_yaml(params_file)

        # Pull variables from a .env file (if any) into the process environment
        # so that the os.getenv lookups below can see them.
        load_dotenv()

        artifacts_root = self.config.artifacts_root
        create_directories([artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Build the ingestion settings from the ``ingestion`` config section.

        Returns:
            A ``DataIngestionConfig`` whose paths come from the YAML section and
            whose Kaggle credentials come from the ``kaggle_username`` and
            ``kaggle_api_key`` environment variables (``None`` when unset).
        """
        section = self.config.ingestion

        root_dir = Path(section.root_dir)
        dataset_ref = section.download_url
        raw_dir = Path(section.raw_data_dir)
        extracted_dir = Path(section.extracted_data_dir)
        username = os.getenv("kaggle_username")
        api_key = os.getenv("kaggle_api_key")

        return DataIngestionConfig(
            root_dir=root_dir,
            download_url=dataset_ref,
            raw_data_dir=raw_dir,
            extracted_data_dir=extracted_dir,
            kaggle_username=username,
            kaggle_api_key=api_key,
        )

In [7]:
from kaggle.api.kaggle_api_extended import KaggleApi
import zipfile
import os
from pathlib import Path
import glob

class DataIngestion:
    """Download a Kaggle dataset archive and unpack it into the raw-data directory.

    Args:
        config: Ingestion settings. This class reads ``download_url``,
            ``raw_data_dir``, ``extracted_data_dir``, ``kaggle_username`` and
            ``kaggle_api_key`` from it.
    """

    def __init__(self, config: DataIngestionConfig):
        self.config = config

    def set_kaggle_credentials(self):
        """Export the configured Kaggle credentials as environment variables.

        ``KAGGLE_USERNAME`` and ``KAGGLE_KEY`` are written only when both values
        are present. Assigning ``None`` to ``os.environ`` raises ``TypeError``,
        so when a value is missing the environment is left untouched and a
        warning is logged; any credentials that are already configured outside
        this class remain in effect.
        """
        username = self.config.kaggle_username
        api_key = self.config.kaggle_api_key

        if not username or not api_key:
            logger.warning(
                "Kaggle username/API key missing from config; "
                "leaving KAGGLE_USERNAME/KAGGLE_KEY unchanged."
            )
            return

        os.environ["KAGGLE_USERNAME"] = username
        os.environ["KAGGLE_KEY"] = api_key
        logger.info("Kaggle credentials set for API access.")

    def download_file(self):
        """Download and extract the dataset unless it is already present.

        The step is skipped when ``extracted_data_dir`` exists and is not empty.
        Otherwise the archive is downloaded into ``raw_data_dir``, extracted
        there, and the archive file is deleted afterwards.

        Raises:
            FileNotFoundError: If no ``.zip`` file is found in ``raw_data_dir``
                after the download call returns.
        """
        self.set_kaggle_credentials()

        extracted_dir = self.config.extracted_data_dir
        if os.path.exists(extracted_dir) and os.listdir(extracted_dir):
            logger.info(f"Dataset already exists at: {extracted_dir}")
            return

        raw_dir = self.config.raw_data_dir
        os.makedirs(raw_dir, exist_ok=True)
        logger.info("Downloading dataset from Kaggle API...")

        api = KaggleApi()
        api.authenticate()

        api.dataset_download_files(
            dataset=self.config.download_url,
            path=str(raw_dir),
            unzip=False,
            quiet=False
        )

        zip_files = glob.glob(os.path.join(str(raw_dir), "*.zip"))
        if not zip_files:
            raise FileNotFoundError("No .zip file found in raw_data_dir after download.")

        # glob order is arbitrary; take the newest archive so that a stale zip
        # left behind by an earlier interrupted run is not extracted instead.
        zip_path = max(zip_files, key=os.path.getmtime)
        logger.info(f"Dataset downloaded as: {zip_path}")

        # NOTE(review): the archive is unpacked into raw_data_dir, while the
        # "already exists" check above looks at extracted_data_dir -- confirm
        # that extracted_data_dir is a folder the archive creates under
        # raw_data_dir, otherwise every run downloads again.
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(raw_dir)
        logger.info(f"Dataset extracted to: {raw_dir}")

        os.remove(zip_path)


In [None]:
# Build the configuration manager: reads the YAML files, loads .env and
# creates the artifacts root directory.
config = DataConfigurationManager()
# Assemble the ingestion settings, including the Kaggle credentials read from
# the environment.
ingestion_config = config.get_data_ingestion_config()
# Run the ingestion step: downloads and extracts the dataset unless
# extracted_data_dir already has content.
data_ingestor = DataIngestion(config=ingestion_config)
data_ingestor.download_file()