In [1]:
import os 

In [2]:
%pwd

'd:\\ML_Deployment\\Automadata\\research'

In [3]:
os.chdir('../')
%pwd

'd:\\ML_Deployment\\Automadata'

In [4]:
import os
from pymongo.mongo_client import MongoClient
from src.utils.common import get_size,  read_yaml, create_directories
from src.config.configuration import DataIngestionConfig
from src.constants import *

In [5]:
from src.logging import logger

In [6]:
import pandas as pd

In [7]:
from dataclasses import dataclass

In [8]:
@dataclass(frozen=True)
class DataIngestionConfig:
    """Immutable bundle of settings for the data-ingestion stage."""

    # Working directory of the ingestion stage.
    root_dir: Path

    # Database connection string read from the config file
    # (presumably a MongoDB URI -- confirm against config.yaml).
    db_uri: str

    # Directory into which the exported raw data file is written.
    raw_data_dir: Path


In [9]:
class ConfigManager:
    """Reads the project's YAML files and builds per-stage config objects."""

    def __init__(
        self,
        config_filepath=CONFIG_FILE_PATH,
        params_filepath=PARAMS_FILE_PATH,
        schema_filepath=SCHEMA_FILE_PATH,
    ):
        """Load the config, params and schema YAML files.

        Args:
            config_filepath: Path of the main config YAML.
            params_filepath: Path of the params YAML.
            schema_filepath: Path of the schema YAML.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)

        # Make sure the artifacts root named in the config exists up front.
        create_directories([self.config.artifacts_root])

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Build the ingestion settings from the ``data_ingestion`` config section.

        Creates the section's ``root_dir`` as a side effect.

        Returns:
            A DataIngestionConfig populated with ``root_dir``, ``db_uri`` and
            ``raw_data_dir`` from the loaded config.
        """
        section = self.config.data_ingestion
        create_directories([section.root_dir])

        return DataIngestionConfig(
            root_dir=section.root_dir,
            db_uri=section.db_uri,
            raw_data_dir=section.raw_data_dir,
        )

In [10]:
class DataIngestion:
    """Exports a MongoDB collection to a CSV file in the configured raw-data directory."""

    def __init__(self, config: "DataIngestionConfig"):
        """Keep the ingestion settings for later use.

        Args:
            config: Ingestion settings; ``raw_data_dir`` is read when exporting.
                The annotation is a string so that defining this class does not
                depend on the config class already being defined.
        """
        self.config = config

    def export_collection_as_df(self, collection_name, db_name):
        """Read every document of a MongoDB collection into a DataFrame.

        Args:
            collection_name: Name of the collection to read.
            db_name: Name of the database that holds the collection.

        Returns:
            A DataFrame with one row per document and the ``_id`` column
            removed when present. An empty collection yields an empty frame.

        Raises:
            Any exception raised by the MongoDB client or pandas propagates
            unchanged.
        """
        # NOTE(review): the connection uses the MONGO_DB_URL constant, while
        # self.config.db_uri is never read -- confirm which one is intended.
        # The client is closed when the `with` block exits, so the cursor is
        # fully materialised into a list while the connection is still open.
        with MongoClient(MONGO_DB_URL) as client:
            documents = list(client[db_name][collection_name].find())

        df = pd.DataFrame(documents)

        # Drop the "_id" column that MongoDB attaches to each document.
        if '_id' in df.columns:
            df = df.drop(columns=['_id'])
        return df

    def export_data_into_feature_store_file_path(self):
        """Export the configured collection and save it as ``nyc_taxi_data.csv``.

        The file is written inside ``self.config.raw_data_dir``, which is
        created if it does not exist.

        Returns:
            The path of the CSV file that was written.

        Raises:
            Any exception from the export or from writing the file propagates
            unchanged.
        """
        logger.info("Exporting data from mongodb")
        raw_file_path = self.config.raw_data_dir
        os.makedirs(raw_file_path, exist_ok=True)

        taxi_data = self.export_collection_as_df(
            collection_name=MONGO_COLLECTION_NAME,
            db_name=MONGO_DATABASE_NAME,
        )

        logger.info(f"Saving exported data into feature store file path: {raw_file_path}")

        feature_store_file_path = os.path.join(raw_file_path, 'nyc_taxi_data.csv')

        # Only the parent directory is created (above). Calling os.makedirs on
        # the file path itself would create a *directory* named
        # nyc_taxi_data.csv and make the write below fail.
        if os.path.isdir(feature_store_file_path):
            # Clear a directory left behind at the file path by such a call;
            # os.rmdir only removes empty directories.
            os.rmdir(feature_store_file_path)

        taxi_data.to_csv(feature_store_file_path, index=False)
        return feature_store_file_path

In [11]:
# Run the ingestion stage end to end: load the YAML config, build the stage,
# and export the collection to CSV. Exceptions propagate unchanged; the earlier
# try/except around these lines only re-raised them.
config = ConfigManager()
data_ingestion_config = config.get_data_ingestion_config()
data_ingestion = DataIngestion(config=data_ingestion_config)

# Left as the last expression so the cell displays the path of the written CSV.
data_ingestion.export_data_into_feature_store_file_path()


[2024-01-13 22:37:56,850: INFO: common: yaml file: config\config.yaml loaded successfully]
[2024-01-13 22:37:56,852: INFO: common: yaml file: params.yaml loaded successfully]
[2024-01-13 22:37:56,855: INFO: common: yaml file: schema.yaml loaded successfully]
[2024-01-13 22:37:56,856: INFO: common: created directory at: artifacts]
[2024-01-13 22:37:56,857: INFO: common: created directory at: artifacts/data_ingestion]
[2024-01-13 22:37:56,857: INFO: 3285830496: Exporting data from mongodb]
[2024-01-13 22:38:05,619: INFO: 3285830496: Saving exported data into feature store file path: artifacts/data_ingestion]
