In [4]:
import os
# Switch the working directory to the parent of the current one.
# NOTE(review): presumably done so that the `src.*` package imports below
# resolve from the project root — confirm. The path is relative, so every
# re-run of this cell moves one more level up.
os.chdir("../")

In [2]:
!pwd

/c/Users/DELL/OneDrive/Desktop/sensor_fault/src


In [5]:
import sys
import os
import numpy as np
import pandas as pd
from pymongo import MongoClient
from dataclasses import dataclass
from zipfile import Path


from src.exception import CustomException
from src.logger import logging
from src.utils.main_utils import MainUtils

from src.constant import *
# =========================
# Config
# =========================
@dataclass
class DataIngestionConfig:
    """Settings consumed by the data-ingestion step."""

    # Folder that receives the exported CSV. ARTIFACT_FOLDER is not defined in
    # this file; presumably it comes from the `src.constant` star import.
    # os.path.join() with a single argument adds no further path components.
    artifact_folder: str = os.path.join(ARTIFACT_FOLDER)


# =========================
# Data Ingestion
# =========================
class DataIngestion:
    """Export a MongoDB collection to a CSV file inside the artifact folder.

    Attributes:
        data_ingestion_config: ``DataIngestionConfig`` providing the
            artifact folder the CSV is written to.
        utils: ``MainUtils`` instance created at construction time; none of
            the methods in this class use it directly.
    """

    def __init__(self):
        self.data_ingestion_config = DataIngestionConfig()
        self.utils = MainUtils()

    def export_collection_as_dataframe(
        self, collection_name, db_name
    ) -> pd.DataFrame:
        """Read a whole MongoDB collection into a pandas DataFrame.

        The ``_id`` column is dropped when present, and every cell equal to
        the string ``"na"`` is replaced with ``np.nan``.

        Args:
            collection_name: Name of the collection to read.
            db_name: Name of the database that holds the collection.

        Returns:
            A DataFrame with one row per document in the collection.

        Raises:
            CustomException: Wraps any error raised while connecting,
                querying or building the DataFrame.
        """
        try:
            # Use the client as a context manager so its connections are
            # released even when the query or DataFrame construction fails.
            # The cursor is fully materialised before the client is closed.
            with MongoClient(MONGO_DB_URL) as mongo_client:
                collection = mongo_client[db_name][collection_name]
                df = pd.DataFrame(list(collection.find()))

            if "_id" in df.columns:
                df = df.drop(columns=["_id"])

            df.replace({"na": np.nan}, inplace=True)

            return df

        except Exception as e:
            raise CustomException(e, sys) from e

    def export_data_into_feature_store_file_path(self) -> str:
        """Export the configured MongoDB collection to ``wafer_fault.csv``.

        The artifact folder is created if it does not exist yet. The
        collection and database are taken from ``MONGO_COLLECTION_NAME`` and
        ``MONGO_DATABASE_NAME``.

        Returns:
            Path of the CSV file that was written.

        Raises:
            CustomException: Wraps any error raised while exporting or
                writing the file.
        """
        try:
            logging.info("Exporting data from MongoDB")

            raw_file_path = self.data_ingestion_config.artifact_folder
            os.makedirs(raw_file_path, exist_ok=True)

            sensor_data = self.export_collection_as_dataframe(
                collection_name=MONGO_COLLECTION_NAME,
                db_name=MONGO_DATABASE_NAME,
            )

            logging.info(
                f"Saving exported data into feature store path: {raw_file_path}"
            )

            feature_store_file_path = os.path.join(
                raw_file_path, "wafer_fault.csv"
            )

            sensor_data.to_csv(feature_store_file_path, index=False)

            return feature_store_file_path

        except Exception as e:
            raise CustomException(e, sys) from e

    def initiate_data_ingestion(self) -> str:
        """Run the ingestion step and return the path of the exported CSV.

        Returns:
            Path of the CSV file produced by
            ``export_data_into_feature_store_file_path``.

        Raises:
            CustomException: Wraps any error raised during the export.
        """
        logging.info(
            "Entered initiate_data_ingestion method of DataIngestion class"
        )

        try:
            feature_store_file_path = (
                self.export_data_into_feature_store_file_path()
            )

            logging.info("Got the data from MongoDB")
            logging.info(
                "Exited initiate_data_ingestion method of DataIngestion class"
            )

            return feature_store_file_path

        except Exception as e:
            raise CustomException(e, sys) from e
