In [1]:
import os
# Step one directory up from the kernel's starting directory.
# NOTE(review): the target is relative, so re-running this cell moves up one
# more level each time -- run it exactly once per kernel session.
os.chdir("../")
# Display the resulting working directory.
%pwd

'/Users/tapankheni/Data_Science/Data Science Projects/Credit_Card_Fault_Prediction'

In [2]:

from dataclasses import dataclass
from pathlib import Path
from typing import List

@dataclass
class DataIngestionConfig:
    """Settings consumed by the data-ingestion stage."""
    # Directory that holds the stage's output files.
    root_dir: Path
    # Destination file paths for the exported data, one entry per source
    # collection (matched by position).
    local_data_path: List[str]


In [3]:
from CreditCardFraudDetection.constants import (
    PARAMS_YAML_FILE_PATH, CONFIG_YAML_FILE_PATH, SCHEMA_YAML_FILE_PATH,
    MONGO_DB_URL, MONGO_DB_DATABASE_NAME, MONGO_DB_COLLECTION_NAME)
from CreditCardFraudDetection.utils.common import read_yaml, create_directories

In [4]:
class ConfigurationManager:
    """Read the project's YAML settings and build per-stage config objects.

    Constructing an instance loads the params, config and schema YAML files
    (in that order) and passes ``config.artifacts_root`` to
    ``create_directories``.
    """

    def __init__(
        self,
        params_yaml_file_path: Path = PARAMS_YAML_FILE_PATH,
        config_yaml_file_path: Path = CONFIG_YAML_FILE_PATH,
        schema_yaml_file_path: Path = SCHEMA_YAML_FILE_PATH,
    ):
        """Load the three YAML files and prepare the artifacts root.

        Args:
            params_yaml_file_path: Location of the params YAML file.
            config_yaml_file_path: Location of the main config YAML file.
            schema_yaml_file_path: Location of the schema YAML file.
        """
        self.params = read_yaml(params_yaml_file_path)
        self.config = read_yaml(config_yaml_file_path)
        self.schema = read_yaml(schema_yaml_file_path)

        artifacts_root = self.config.artifacts_root
        create_directories([artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Build the data-ingestion settings from the ``data_ingestion`` section.

        The section's ``root_dir`` is passed to ``create_directories`` before
        the config object is returned.

        Returns:
            A ``DataIngestionConfig`` with ``root_dir`` wrapped in ``Path`` and
            ``local_data_path`` taken from the config section unchanged.
        """
        ingestion_section = self.config.data_ingestion
        create_directories([ingestion_section.root_dir])

        ingestion_config = DataIngestionConfig(
            root_dir=Path(ingestion_section.root_dir),
            local_data_path=ingestion_section.local_data_path,
        )
        return ingestion_config

In [5]:
import pandas as pd
import numpy as np
from pymongo import MongoClient
from CreditCardFraudDetection import logger
from CreditCardFraudDetection.utils.common import get_size

In [6]:
class DataIngestion:
    """Copy MongoDB collections into local CSV files.

    Each name in ``MONGO_DB_COLLECTION_NAME`` is paired by position with an
    entry of ``config.local_data_path``; a collection is only downloaded when
    its target file does not exist yet.
    """

    def __init__(self, config: DataIngestionConfig):
        """Store the ingestion settings.

        Args:
            config: Provides ``root_dir`` and ``local_data_path``.
        """
        self.config = config

    def export_collection_as_dataframe(self, collection_name: str) -> pd.DataFrame:
        """Load every document of one MongoDB collection into a DataFrame.

        Args:
            collection_name: Collection inside ``MONGO_DB_DATABASE_NAME``.

        Returns:
            The documents as a DataFrame with the ``_id`` column removed, or
            an empty DataFrame when the server cannot be reached or the fetch
            fails (the error is logged, not raised).
        """
        logger.info("Exporting MongoDB collection to DataFrame")
        client = MongoClient(MONGO_DB_URL)

        # try/finally guarantees the client is closed on every exit path,
        # including the early returns and unexpected exceptions below.
        try:
            collection = client[MONGO_DB_DATABASE_NAME][collection_name]

            try:
                # Cheap round-trip that fails fast if the server is unreachable.
                client.server_info()
                logger.info("Connected to MongoDB")
            except Exception as e:
                logger.error(f"Connection failed: {e}")
                return pd.DataFrame()

            number_of_documents = collection.count_documents({})
            logger.info(f"Number of documents in collection {collection_name}: {number_of_documents}")

            try:
                data = list(collection.find())
                # Log only the count: raw documents may contain sensitive
                # transaction data and do not belong in the log.
                logger.info(f"Retrieved {len(data)} documents from collection {collection_name}")
            except Exception as e:
                logger.error(f"Failed to retrieve data from collection: {e}")
                return pd.DataFrame()
        finally:
            client.close()

        df = pd.DataFrame(data)

        logger.info(f"DataFrame columns: {df.columns.to_list()}")

        # MongoDB's ``_id`` key is storage metadata, not part of the dataset.
        if "_id" in df.columns:
            df = df.drop(columns=["_id"])

        logger.info(f"DataFrame columns: {df.columns.to_list()}")

        logger.info(f"DataFrame shape: {df.shape}")
        logger.info("Data retrieved successfully")

        return df

    def export_data_to_file_path(self):
        """Write each configured collection to its CSV file if not already present.

        Raises:
            IndexError: If fewer target paths than collections are configured.
        """
        logger.info("Exporting data from mongoDB to store it into the desired artifacts.")

        collection_names = list(MONGO_DB_COLLECTION_NAME)
        target_paths = [Path(p) for p in self.config.local_data_path]

        # Detect a configuration mismatch before any download starts instead of
        # failing part-way through the loop.
        if len(target_paths) < len(collection_names):
            raise IndexError(
                f"{len(collection_names)} collections configured but only "
                f"{len(target_paths)} local data paths provided"
            )

        for collection_name, file_path in zip(collection_names, target_paths):
            # Check the target first so an existing export does not trigger a
            # full (and discarded) download of the collection.
            if file_path.exists():
                logger.info(f"file already exists at {file_path} of size {get_size(file_path)}")
                continue

            logger.info(f"Exporting data from collection {collection_name}")
            df = self.export_collection_as_dataframe(collection_name)

            # An empty frame means the fetch failed or returned nothing.
            # Writing it would create an empty CSV that makes every later run
            # report "file already exists" and never retry.
            if df.empty:
                logger.error(f"No data retrieved from collection {collection_name}; skipping export to {file_path}")
                continue

            logger.info(f"Exporting data to {self.config.root_dir}")
            df.to_csv(file_path, index=False)
            logger.info(f"exported data from mongoDB to {file_path}")

    def initiate_data_ingestion(self):
        """Run the ingestion stage, logging its start and end."""
        logger.info("data ingestion process initiated.")

        self.export_data_to_file_path()

        logger.info("data ingestion process completed.")

In [7]:
# Run the data-ingestion stage end to end: load configuration, build the
# stage object and execute it.
try:
    config_manager = ConfigurationManager()
    data_ingestion_config = config_manager.get_data_ingestion_config()
    data_ingestion = DataIngestion(config=data_ingestion_config)
    data_ingestion.initiate_data_ingestion()
except Exception as e:
    # logger.exception records the full traceback alongside the message.
    logger.exception(f"Error in data ingestion: {e}")
    # Re-raise so the cell visibly fails and later cells do not run on
    # missing data.
    raise

[2024-07-05 13:49:35,964: INFO: common: yaml file: params.yaml loaded successfully]
[2024-07-05 13:49:35,968: INFO: common: yaml file: config/config.yaml loaded successfully]
[2024-07-05 13:49:35,972: INFO: common: yaml file: schema.yaml loaded successfully]
[2024-07-05 13:49:35,972: INFO: common: created directory at: artifacts]
[2024-07-05 13:49:35,973: INFO: common: created directory at: artifacts/data_ingestion]
[2024-07-05 13:49:35,973: INFO: 4041267636: data ingestion process initiated.]
[2024-07-05 13:49:35,973: INFO: 4041267636: Exporting data from mongoDB to store it into the desired artifacts.]
[2024-07-05 13:49:35,974: INFO: 4041267636: Exporting data from collection fraudulent_transactions_data]
[2024-07-05 13:49:35,974: INFO: 4041267636: Exporting MongoDB collection to DataFrame]
[2024-07-05 13:49:37,057: INFO: 4041267636: Connected to MongoDB]
[2024-07-05 13:49:37,348: INFO: 4041267636: Number of documents in collection fraudulent_transactions_data: 568630]
[2024-07-05 13