# Data Ingestion Pipeline Research

In [1]:
import os

# check the current working dir
%pwd

# changing the dir to main repo
working_dir = "G:/ML_DL_Projects/MLOPS_Indian_Flight_Price_Prediction"
os.chdir(working_dir)

print(f"Current working directory is {os.getcwd()} ")

Current working directory is G:\ML_DL_Projects\MLOPS_Indian_Flight_Price_Prediction 


In [None]:
from dataclasses import dataclass
from pathlib import Path

# Target module: src/<project_name>/entity/config_entity.py
# Workflow step 1 for data ingestion: declare the config entity.
@dataclass
class DataIngestionConfig:
    """Settings consumed by the data-ingestion stage (download + unzip)."""

    # Directory belonging to the ingestion stage.
    root_dir: Path
    # Remote location the dataset archive is fetched from.
    source_URL: str
    # kaggle_dataset: str  (not in use; the Kaggle-based download is disabled)
    # Local path the downloaded archive is stored at.
    local_data_file: Path
    # Directory the archive contents are extracted into.
    unzip_dir: Path

In [None]:
# step 2 and 3 not needed now
# step 4 later on
# step 5 in workflow for data ingestion

# for src/<project_name>/config/configuration.py
from src.indian_flight_price_prediction.constants import *
from src.indian_flight_price_prediction.utils.common import read_yaml, create_directories


In [None]:
# for src/<project_name>/config/configuration.py
class ConfigurationManager:
    """Loads the project's YAML settings and builds per-stage config objects."""

    def __init__(self,
                config_file_path = CONFIG_FILE_PATH,
                params_file_path = PARAMS_FILE_PATH,
                schema_file_path = SCHEMA_FILE_PATH):
        """Read the config, params and schema YAML files and create the artifacts root.

        Args:
            config_file_path: Path of the main configuration YAML.
            params_file_path: Path of the parameters YAML.
            schema_file_path: Path of the schema YAML.
        """
        # Files are read in the same order as before: config, params, schema.
        self.config, self.params, self.schema = (
            read_yaml(yaml_path)
            for yaml_path in (config_file_path, params_file_path, schema_file_path)
        )

        # The loaded config is accessed by attribute; `artifacts_root` names the
        # top-level output directory.
        create_directories([self.config.artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Create the ingestion root directory and return the stage's settings.

        Returns:
            A DataIngestionConfig populated from the `data_ingestion` section
            of the loaded configuration.
        """
        ingestion_section = self.config.data_ingestion
        create_directories([ingestion_section.root_dir])

        return DataIngestionConfig(
            root_dir = ingestion_section.root_dir,
            source_URL = ingestion_section.source_URL,
            local_data_file = ingestion_section.local_data_file,
            unzip_dir = ingestion_section.unzip_dir,
        )

In [None]:
# for src/<project_name>/components/data_ingestion.py
import os
import urllib.request as request
from src.indian_flight_price_prediction.logger import logger
# from kaggle.api.kaggle_api_extended import KaggleApi
import zipfile

In [None]:
# component - Data Ingestion
# for src/<project_name>/components/data_ingestion.py
class DataIngestion:
    """Downloads the dataset archive and extracts it, driven by a DataIngestionConfig.

    Attributes:
        config: The DataIngestionConfig supplying `source_URL`, `local_data_file`
            and `unzip_dir`.
    """

    def __init__(self, config : DataIngestionConfig):
        """Store the ingestion settings; no I/O happens here."""
        self.config = config
        # TODO: optional Kaggle-based download is not wired in yet. It would need
        # KaggleApi().authenticate() (wrapped so an authentication failure raises a
        # RuntimeError) and a `kaggle_dataset` entry in the config.

    # Downloading the zip file
    def download_file(self):
        """Download `config.source_URL` to `config.local_data_file` if it is not there yet.

        The archive is first written to a sibling ``.part`` file and only moved to
        its final name once the transfer has finished. An interrupted or failed
        download therefore never leaves a truncated file behind that a later run
        would mistake for a complete one.

        Raises:
            Whatever `urllib.request.urlretrieve` raises (e.g. URLError,
            ContentTooShortError) or OSError from the file operations.
        """
        target_path = self.config.local_data_file

        if os.path.exists(target_path):
            logger.info("Dataset file already exists locally.")
            return

        # Make sure the destination directory exists even when this component is
        # used without ConfigurationManager having created it beforehand.
        parent_dir = os.path.dirname(target_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok = True)

        partial_path = f"{target_path}.part"
        try:
            _, headers = request.urlretrieve(
                url = self.config.source_URL,
                filename = partial_path
            )
            # Promote the finished download to its final name in a single step.
            os.replace(partial_path, target_path)
        except BaseException:
            # BaseException so a kernel interrupt also cleans up the partial file.
            if os.path.exists(partial_path):
                os.remove(partial_path)
            raise

        logger.info(f"{target_path} downloading with following info :\n {headers}!!")

    # extract zip file
    def extract_zip_file(self):
        """Extract the archive at `config.local_data_file` into `config.unzip_dir`.

        The target directory is created when missing.

        Raises:
            FileNotFoundError: If the archive has not been downloaded.
            zipfile.BadZipFile: If the file is not a valid zip archive.
        """
        unzip_path = self.config.unzip_dir
        os.makedirs(unzip_path, exist_ok = True)
        zip_file_path = self.config.local_data_file

        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(unzip_path)

        # Logged after the archive is closed, i.e. once extraction has fully finished.
        logger.info(f"Data extracted to {unzip_path}")

In [42]:
# Run the ingestion stage end to end: load the configuration, download the
# archive, then extract it.
try:
    config = ConfigurationManager()
    data_ingestion_config = config.get_data_ingestion_config()
    data_ingestion = DataIngestion(config = data_ingestion_config)
    data_ingestion.download_file()
    data_ingestion.extract_zip_file()
except Exception:
    # Record the failure (with traceback) through the project logger, then
    # re-raise with a bare `raise` so the original traceback is kept as-is.
    logger.exception("Data ingestion stage failed")
    raise

[2025-04-21 00:03:02,825 : INFO : common : yaml file : config\config.yaml loaded successfully.]
[2025-04-21 00:03:02,847 : INFO : common : yaml file : params.yaml loaded successfully.]
[2025-04-21 00:03:02,860 : INFO : common : yaml file : schema.yaml loaded successfully.]
[2025-04-21 00:03:02,873 : INFO : common : Created directory at artifacts]
[2025-04-21 00:03:02,883 : INFO : common : Created directory at artifacts/data_ingestion]


[2025-04-21 00:03:03,723 : INFO : 887415457 : artifacts/data_ingestion/data.zip downloading with following info :
 Connection: close
Content-Length: 1514069
Cache-Control: max-age=300
Content-Security-Policy: default-src 'none'; style-src 'unsafe-inline'; sandbox
Content-Type: application/zip
ETag: "7bd18bccdb8b763b69796ea3f2f2c0926724b52a50f37715916a2b61b79007e5"
Strict-Transport-Security: max-age=31536000
X-Content-Type-Options: nosniff
X-Frame-Options: deny
X-XSS-Protection: 1; mode=block
X-GitHub-Request-Id: 1EDC:1442AC:30EE6D:8BC66A:68053DDB
Accept-Ranges: bytes
Date: Sun, 20 Apr 2025 18:33:04 GMT
Via: 1.1 varnish
X-Served-By: cache-bom4750-BOM
X-Cache: MISS
X-Cache-Hits: 0
X-Timer: S1745173984.767185,VS0,VE570
Vary: Authorization,Accept-Encoding,Origin
Access-Control-Allow-Origin: *
Cross-Origin-Resource-Policy: cross-origin
X-Fastly-Request-ID: 392875e9d86117f09b817a927233b77aea17660f
Expires: Sun, 20 Apr 2025 18:38:04 GMT
Source-Age: 0

!!]
[2025-04-21 00:03:04,467 : INFO : 887