In [None]:
#Import necessary libraries
import os
from dataclasses import dataclass
from pathlib import Path
import urllib.request as request
import zipfile
from src.utils.common import read_yaml, create_directories
from src.constants import *
from src.utils.common import get_size
from src.logging import logger

In [None]:
#Show the kernel's current working directory (IPython %pwd magic) before it is changed
%pwd

In [None]:
#Change directory to project root
#NOTE(review): "../" is relative to the current working directory, so this assumes the
#kernel started exactly one level below the project root — confirm.
#Re-running this cell moves up one more level each time; run it once per kernel session.
os.chdir("../")
#Display the new working directory to confirm the move
%pwd

In [None]:
#Settings bundle consumed by the data ingestion step
@dataclass(frozen=True)  #frozen=True: assigning to a field after construction raises an error
class DataIngestionConfig:
    """Read-only container for the values the data ingestion step needs.

    Instances are built by ``ConfigurationManager.get_data_ingestion_config``
    and handed to ``DataIngestion``.
    """

    #Directory created for this stage before the config object is built
    root_dir: Path
    #URL the archive is downloaded from
    source_URL: str
    #Path the downloaded archive is saved to and later opened from as a zip file
    local_data_file: Path
    #Directory the archive contents are extracted into
    unzip_dir: Path

In [None]:
#Configuration Manager class to handle configurations
class ConfigurationManager:
    """Load the project's YAML files and build per-stage configuration objects.

    Constructing an instance reads the config, params and schema files with
    ``read_yaml`` and creates the directory named by ``config.artifacts_root``.
    The loaded objects are kept on ``self.config``, ``self.params`` and
    ``self.schema``; their values are read with attribute access.
    """

    #Initialization method to read config, params, and schema files
    def __init__(
        self,
        config_file_path = CONFIG_FILE_PATH,
        params_file_path = PARAMS_FILE_PATH,
        schema_file_path = SCHEMA_FILE_PATH):
        """Read the three YAML files and create the artifacts root directory.

        Args:
            config_file_path: location of the main config file
                (defaults to ``CONFIG_FILE_PATH``).
            params_file_path: location of the params file
                (defaults to ``PARAMS_FILE_PATH``).
            schema_file_path: location of the schema file
                (defaults to ``SCHEMA_FILE_PATH``).
        """
        self.config = read_yaml(config_file_path)
        self.params = read_yaml(params_file_path)
        self.schema = read_yaml(schema_file_path)

        create_directories([self.config.artifacts_root])


    #Method to get data ingestion configuration
    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Build the ``DataIngestionConfig`` from the ``data_ingestion`` config section.

        The stage's ``root_dir`` is created as a side effect. Filesystem
        locations are wrapped in ``Path`` so the returned object actually
        carries the types its fields are annotated with; ``source_URL`` is
        passed through unchanged.

        Returns:
            DataIngestionConfig: immutable settings for the ingestion step.
        """
        section = self.config.data_ingestion

        #The raw config value is handed to the helper exactly as before
        create_directories([section.root_dir])

        return DataIngestionConfig(
            root_dir = Path(section.root_dir),
            source_URL = section.source_URL,
            local_data_file = Path(section.local_data_file),
            unzip_dir = Path(section.unzip_dir)
        )

In [None]:
#Data Ingestion class to handle data downloading and extraction (Data Ingestion component)
class DataIngestion:
    """Download the source archive and unpack it as described by a DataIngestionConfig."""

    #Initialization method to set up configuration
    def __init__(self, config: DataIngestionConfig):
        """Keep the ingestion settings (source URL, archive path, extraction directory)."""
        self.config = config

    #Method to download file from source URL
    def download_file(self):
        """Fetch ``config.source_URL`` into ``config.local_data_file`` unless it already exists.

        The download is written to a sibling ``<name>.part`` file and moved
        into place only after it completes. An interrupted or failed download
        therefore never leaves a truncated archive at the final path, which a
        later run would otherwise mistake for a finished download.

        Raises:
            Whatever ``urllib.request.urlretrieve`` or the filesystem calls
            raise; the partial file is removed before the error propagates.
        """
        target = Path(self.config.local_data_file)

        if os.path.exists(target):
            logger.info(f"File already exists of size: {get_size(Path(self.config.local_data_file))}")
            return

        #Make sure the destination directory is there so the download cannot fail on it
        os.makedirs(target.parent, exist_ok= True)

        #Same directory as the target, so the final move stays on one filesystem
        partial = target.with_name(target.name + ".part")
        try:
            _, headers = request.urlretrieve(
                url = self.config.source_URL,
                filename = partial
            )
            os.replace(partial, target)  #Publish the file only once it is complete
        finally:
            #After a successful move the partial file is gone and this is a no-op
            if os.path.exists(partial):
                os.remove(partial)

        logger.info(f"File: {self.config.local_data_file} downloaded with following info: \n{headers}")

    #Method to extract the downloaded zip file
    def extract_zip_file(self):
        """Extract every member of ``config.local_data_file`` into ``config.unzip_dir``.

        The destination directory is created when missing.

        Raises:
            Whatever ``zipfile.ZipFile`` raises when the archive is missing
            or is not a valid zip file.
        """
        unzip_path = self.config.unzip_dir
        os.makedirs(unzip_path, exist_ok= True)  #Create directory if it doesn't exist

        #Extract the zip file
        with zipfile.ZipFile(self.config.local_data_file, 'r') as archive:
            archive.extractall(unzip_path)  #Extract all contents to the specified directory
        logger.info(f"File extracted to: {unzip_path}")




In [None]:
#Run the data ingestion stage end to end: load settings, download the archive, unpack it
try:
    #Read the YAML files and create the artifacts root
    config = ConfigurationManager()

    #Build the settings object for this stage
    data_ingestion_config = config.get_data_ingestion_config()

    #Create the component from those settings
    data_ingestion = DataIngestion(data_ingestion_config)

    #Fetch the archive (skipped when the file is already present), then unpack it
    data_ingestion.download_file()
    data_ingestion.extract_zip_file()
except Exception as err:
    #Failures are logged with their traceback and not re-raised
    logger.exception(err)