In [1]:
import os

In [2]:
pwd

'c:\\Users\\Robin Aluma\\Desktop\\Stroke_Detection\\Resources'

In [3]:
# Move the working directory up one level (to the parent of the folder shown by `pwd` above);
# presumably so the project-relative imports and paths used below resolve — confirm.
# NOTE(review): not idempotent — each re-run of this cell climbs one more directory level.
os.chdir('../')

In [4]:
from dataclasses import dataclass
from pathlib import Path
import yaml


@dataclass
class DataIngestionConfig:
    """Settings consumed by the data-ingestion stage.

    The field order defines the generated constructor's positional
    signature, so it must stay as declared here.
    """

    # Directory that is created before ingestion starts.
    root_url: Path
    # URL the dataset archive is downloaded from.
    source_url: str
    # Local file the downloaded archive is written to.
    local_data_path: Path
    # Directory the archive is extracted into.
    unzip_data: Path
    # Not read by the ingestion code in this notebook; presumably the
    # location of the test split — TODO confirm.
    test_data: Path
    # Not read by the ingestion code in this notebook; presumably the
    # location of the train split — TODO confirm.
    train_data: Path

In [5]:
from src.constants import *
from src.stroke_disease_detection.utils.common import read_yaml,create_directories

In [6]:
import urllib.request as request
import zipfile
import pandas as pd
from sklearn.model_selection import train_test_split
from Exceptions import CustomException
import sys
from logger import my_logger

In [7]:
class ConfigurationManager:
    """Read the project's YAML files and build per-stage config objects."""

    def __init__(self,
                 config_file_path = CONFIG_FILE_PATH,
                 params_file_path = PARAMS_FILE_PATH):
        """Load both YAML files and create the artifacts root directory.

        Args:
            config_file_path: Path of the main configuration YAML.
            params_file_path: Path of the parameters YAML.
        """
        self.config = read_yaml(config_file_path)
        self.params = read_yaml(params_file_path)

        # The top-level artifacts folder must exist before any stage writes to it.
        artifacts_root = self.config.Artifacts_root
        create_directories([artifacts_root])

    def data_ingestion_config(self)->DataIngestionConfig:
        """Build the ingestion settings from the ``data_ingestion`` section.

        The stage's root directory is created as a side effect.

        Returns:
            A ``DataIngestionConfig`` populated from the loaded configuration.
        """
        ingestion_section = self.config.data_ingestion

        create_directories([ingestion_section.root_url])

        return DataIngestionConfig(
            root_url=ingestion_section.root_url,
            source_url=ingestion_section.source_url,
            local_data_path=ingestion_section.local_data_path,
            unzip_data=ingestion_section.unzip_data,
            test_data=ingestion_section.test_data,
            train_data=ingestion_section.train_data,
        )

In [8]:
class DataIngestion():
    """Download the dataset archive and unpack it on the local disk.

    The source URL and all paths come from the ``DataIngestionConfig``
    passed to the constructor.
    """

    def __init__(self,config:DataIngestionConfig):
        """Keep the ingestion settings for the download/extract steps."""
        self.config = config

    def load_data(self):
        """Download the archive to ``local_data_path`` unless it already exists.

        The download is written to a sibling ``.part`` file and renamed onto
        the final path only after it completes. An interrupted or failed
        download therefore never leaves a truncated archive at
        ``local_data_path``, which the existence check would otherwise treat
        as a finished download on every later run.

        Raises:
            Whatever ``urllib.request.urlretrieve`` or ``os.replace`` raise
            (e.g. ``URLError``, ``ContentTooShortError``, ``OSError``).
        """
        local_data_path = self.config.local_data_path
        if os.path.exists(local_data_path):
            my_logger.info(f"File already exists: {local_data_path}; skipping download")
            return

        partial_path = f"{local_data_path}.part"
        try:
            request.urlretrieve(
                url=self.config.source_url,
                filename=partial_path
            )
            # Rename within the same directory; replaces any existing target.
            os.replace(partial_path, local_data_path)
        finally:
            # After a successful rename the partial file is gone, so this only
            # cleans up after a failed or interrupted download.
            if os.path.exists(partial_path):
                os.remove(partial_path)
        my_logger.info(f"Data downloaded from: {self.config.source_url} into: {local_data_path}")

    def extract_data(self):
        """Extract the downloaded zip archive into ``unzip_data``.

        Raises:
            Whatever ``zipfile.ZipFile`` raises for a missing or invalid
            archive (e.g. ``FileNotFoundError``, ``zipfile.BadZipFile``).
        """
        local_data_path = self.config.local_data_path
        unzip_data = self.config.unzip_data
        if unzip_data !="":
            os.makedirs(unzip_data,exist_ok=True)
        else:
            # Empty target: there is no directory to create; members are
            # extracted relative to the current working directory.
            my_logger.info("Folder already exists")
        with zipfile.ZipFile(local_data_path) as zipref:
            zipref.extractall(unzip_data)

        my_logger.info(f"Extracted the zipfile: {local_data_path} into: {unzip_data}")
        
    
            
            
        
            

In [9]:
# Ingestion pipeline
class DataIngestionPipeline:
    """Runs the ingestion stage end to end: configure, download, extract."""

    def __init__(self):
        """Nothing to set up; the pipeline keeps no state."""

    def main(self):
        """Execute the download and extraction steps in order.

        Raises:
            CustomException: Wraps any ``Exception`` raised while building the
                configuration or running either step.
        """
        try:
            ingestion_settings = ConfigurationManager().data_ingestion_config()
            ingestion_stage = DataIngestion(config=ingestion_settings)
            ingestion_stage.load_data()
            ingestion_stage.extract_data()
        except Exception as e:
            raise CustomException(e,sys)

In [10]:
# Run the ingestion pipeline when executed as the main module. Inside a
# notebook kernel __name__ is '__main__', so this cell always runs it.
if __name__=='__main__':
    obj = DataIngestionPipeline()
    obj.main()

[ 2025-01-26 13:28:52,035:INFO:1458254983:Extracted the zipfile: artifacts/data_ingestion/data.zip into: artifacts/data_ingestion]
