---

#### Importing necessary libraries and packages


In [1]:
import os
import pandas as pd
import json
from pymongo import MongoClient
from dotenv import load_dotenv
import psycopg2
from psycopg2 import sql
from datetime import datetime, timedelta, timezone

In [2]:
class MongoToPostgresELT:

    # --------------------------------------------------------
    # Initialize Environment Variables and Database Connection
    def __init__(self):
        """
        Initializes the MongoToPostgresETL class.
        Loads environment variables and sets up MongoDB and PostgreSQL connection parameters.
        """

        # Load environment variables from .env file
        load_dotenv()

        self.mongo_url = os.getenv("MONGO_DB_URL")
        self.mongo_db_database_name = os.getenv("MONGODB_DB_NAME")
        self.mongo_client = MongoClient(self.mongo_url)
        self.db = self.mongo_client[
            self.mongo_db_database_name
        ]  # Initialize the database here

        # PostgreSQL connection parameters from .env
        self.pg_host = os.getenv("PG_HOST")
        self.pg_database = os.getenv("PG_DATABASE")
        self.pg_user = os.getenv("PG_USER")
        self.pg_password = os.getenv("PG_PASSWORD")
        self.pg_port = os.getenv("PG_PORT")
        self.pg_connection = None

        self.unique_id_mapping = {
            "customers": {
                "unique_id_key_col": "customer_id",
                "table_name": "tbl_customers",
            },
            "loan_types": {
                "unique_id_key_col": "loan_type_id",
                "table_name": "tbl_loan_types",
            },
            "loan_applications": {
                "unique_id_key_col": "loan_id",
                "table_name": "tbl_loan_applications",
            },
            "loan_repayments": {
                "unique_id_key_col": "repayment_id",
                "table_name": "tbl_loan_repayments",
            },
            "loan_history": {
                "unique_id_key_col": "history_id",
                "table_name": "tbl_loan_history",
            },
            "loan_collateral": {
                "unique_id_key_col": "collateral_id",
                "table_name": "tbl_loan_collateral",
            },
            "loan_restructuring": {
                "unique_id_key_col": "restructuring_id",
                "table_name": "tbl_loan_restructuring",
            },
            "loan_disbursements": {
                "unique_id_key_col": "disbursement_id",
                "table_name": "tbl_loan_disbursements",
            },
        }

    def get_mongo_client(self) -> MongoClient:
        """
        Establishes and returns a MongoDB client connection.

        Returns:
        MongoClient: MongoDB client instance."""

        if self.mongo_client is None:
            self.mongo_client = MongoClient(self.mongo_url)
        return self.mongo_client

    def connect_to_postgres(self):
        """
        Establish a connection to the PostgreSQL database.

        Returns:
        connection: A connection object to the PostgreSQL database."""

        try:
            connection = psycopg2.connect(
                dbname=os.getenv("PG_DATABASE"),
                user=os.getenv("PG_USER"),
                password=os.getenv("PG_PASSWORD"),
                host=os.getenv("PG_HOST"),
                port=os.getenv("PG_PORT"),
            )
            print("Connection to PostgreSQL established successfully.")
            return connection
        except Exception as e:
            print(f"Failed to connect to PostgreSQL: {e}")
            return None

    # ---------------------------------------------------------
    # load callection as dataframe to work on them as required.

    def load_collection_as_dataframe(self, collection_name: str) -> pd.DataFrame:
        """
        Converts a MongoDB collection into a pandas DataFrame.

        Parameters:
        collection_name (str): The name of the MongoDB collection.
        db_name (str): The MongoDB database name.

        Returns:
        pd.DataFrame: A DataFrame containing the MongoDB collection data."""

        client = self.get_mongo_client()
        db = client[self.mongo_db_database_name]
        collection = db[collection_name]
        data = list(collection.find())

        # Convert the data to a DataFrame
        df = pd.DataFrame(data)

        # Check if '_id' column exists and drop it if it does
        if "_id" in df.columns:
            df = df.drop(columns=["_id"])

        return df

    def load_dataframe_to_postgres(
        self, df: pd.DataFrame, table_name: str, json_columns: list = []
    ):
        """
        Writes a pandas DataFrame to a PostgreSQL table, with support for JSON fields.

        Parameters:
        df (pd.DataFrame): The DataFrame to be written to PostgreSQL.
        table_name (str): The name of the PostgreSQL table where data will be inserted.
        json_columns (list): List of column names in the DataFrame that contain JSON data.
                            These columns will be converted to JSON strings and cast to 'jsonb' in PostgreSQL.
        """
        # Establish connection for this operation
        connection = self.connect_to_postgres()  # Always establish a new connection
        cursor = connection.cursor()

        # Handle conversion of specified columns to JSON strings
        for column in json_columns:
            df[column] = df[column].apply(json.dumps)

        # Prepare the columns and placeholders for SQL query
        columns = df.columns.tolist()

        # Create SQL query for insertion, casting JSON columns to jsonb
        insert_query = sql.SQL(
            """
            INSERT INTO {} ({}) 
            VALUES ({})"""
        ).format(
            sql.Identifier(table_name),
            sql.SQL(", ").join(map(sql.Identifier, columns)),
            sql.SQL(", ").join(
                sql.SQL("%s::jsonb") if col in json_columns else sql.Placeholder()
                for col in columns
            ),
        )

        # Convert DataFrame rows to tuples
        data_tuples = [tuple(row) for row in df.itertuples(index=False)]

        try:
            # Execute the insertion query
            cursor.executemany(insert_query, data_tuples)
            connection.commit()
            print(f"Data successfully loaded into {table_name} table.")
        except Exception as error:
            print(f"Error inserting data into PostgreSQL: {error}")
            connection.rollback()
        finally:
            cursor.close()
            connection.close()

    # --------------------------------------------------
    # function to perfrom full load and incremental load

    def perform_full_load(self, dataframes: dict):
        """
        Loads all DataFrames from the provided dictionary into corresponding PostgreSQL tables.

        Parameters:
        dataframes (dict): A dictionary where keys are DataFrame names (e.g., 'customers__df')
                           and values are the corresponding pandas DataFrames."""

        for df_name, df in dataframes.items():
            table_name = df_name.replace(
                "__df", ""
            )  # Remove '__df' to get the table name

            # Determine if any JSON columns need special handling
            json_columns = (
                []
            )  # Define any specific columns that are JSON formatted if needed

            # Check for JSON columns in the DataFrame
            if "new_loan_terms" in df.columns or "restructure_terms" in df.columns:
                json_columns = [
                    "new_loan_terms",
                    "restructure_terms",
                ]  # Adjust based on your DataFrame structure

            try:
                self.load_dataframe_to_postgres(
                    df, f"tbl_{table_name}", json_columns=json_columns
                )  # Call the loading function
                print(f"Successfully loaded data into table: tbl_{table_name}")

            except Exception as e:
                print(f"Error loading data into table {table_name}: {e}")

    def incremental_load(self):
        """
        - Performs an incremental load for all MongoDB collections into corresponding PostgreSQL tables,
        based on the defined unique_id_mapping.
        - This function handles both new document insertions and updates for each collection, synchronizing changes with PostgreSQL tables.
        """
        # Iterate over each collection in the unique_id_mapping
        for collection_name, config in self.unique_id_mapping.items():
            collection_unique_id_col = config["unique_id_key_col"]
            table_name = config["table_name"]
            table_unique_id_col = config["unique_id_key_col"]

            # Step 1: Fetch the latest 'added_at' and 'modified_at' from the PostgreSQL table
            last_added_at, last_modified_at = self.get_latest_timestamps(table_name)

            # Step 2: Load MongoDB collection as DataFrame for filtering
            df = self.load_collection_as_dataframe(collection_name)

            # Step 3: Filter for new and updated records in MongoDB
            new_records = df[df["added_at"] > last_added_at]
            updated_records = df[
                (df["modified_at"] > last_modified_at)
                & (df["added_at"] <= last_added_at)
            ]

            # Step 4: Process new documents
            if not new_records.empty:
                for _, record in new_records.iterrows():
                    # Check for document presence in MongoDB and insert if absent
                    if not self.document_exists_in_mongo(
                        collection_name,
                        collection_unique_id_col,
                        record[collection_unique_id_col],
                    ):
                        self.insert_document(collection_name, record.to_dict())
                        # Insert new row into PostgreSQL
                        self.load_dataframe_to_postgres(
                            pd.DataFrame([record]), table_name
                        )
                    else:
                        print(
                            f"Document with {collection_unique_id_col} {record[collection_unique_id_col]} already exists in MongoDB."
                        )

            # Step 5: Process updated documents
            if not updated_records.empty:
                for _, record in updated_records.iterrows():
                    # Update document in MongoDB
                    self.update_document_in_mongo(
                        collection_name, record.to_dict(), collection_unique_id_col
                    )
                    # Update row in PostgreSQL
                    self.update_record_in_postgres(
                        record, table_name, table_unique_id_col
                    )

        print("Incremental load completed for all collections.")

    # -------------------------------------------------------------------
    # Function to Normalize tables and load into their repspective tables

    def normalize_nested_fields(
        self, df: pd.DataFrame, table_name: str, nested_fields: dict
    ):
        """
        Normalizes nested fields in a DataFrame and inserts data into PostgreSQL.

        Parameters:
        df (pd.DataFrame): The DataFrame containing data with nested fields.
        table_name (str): The name of the main PostgreSQL table.
        nested_fields (dict): A dictionary where keys are nested fields in df,
                            and values are the corresponding table names in PostgreSQL.
        """
        # Iterate over each nested field
        for nested_field, nested_table_name in nested_fields.items():
            if nested_field in df.columns:
                # Normalize the nested field into a separate DataFrame
                nested_df = pd.json_normalize(df[nested_field])
                nested_df.columns = [
                    col.replace(".", "_") for col in nested_df.columns
                ]  # Flatten column names

                # Add unique IDs for the nested records
                nested_df[f"{nested_table_name}_id"] = range(1, len(nested_df) + 1)

                # Merge the IDs back to the main DataFrame
                df = df.join(nested_df[f"{nested_table_name}_id"])

                # Insert nested data into its respective table
                self.load_dataframe_to_postgres(nested_df, nested_table_name)

                # Drop original nested column from main DataFrame
                df = df.drop(columns=[nested_field])

        # Insert the main table with foreign keys for the nested tables
        self.load_dataframe_to_postgres(df, table_name)

    # -----------------------------------------------------------
    # Helper Function to for insertion and updation into MongoDB

    def insert_document(self, collection_name: str, document: dict):
        """
        Inserts a document into the specified MongoDB collection only if
        a document with the same unique identifier does not already exist.

        Parameters:
        collection_name (str): The name of the MongoDB collection.
        document (dict): The document to insert into the collection.
        """

        # Retrieve unique identifier configuration for the collection
        config = self.unique_id_mapping.get(collection_name)
        if config is None:
            print(
                f"No configuration found for collection '{collection_name}'. Document not inserted."
            )
            return

        # Extract the unique identifier column for the collection
        unique_id_field = config["unique_id_key_col"]

        # Connect to collection
        collection = self.db[collection_name]

        # Check for existence of the document by unique identifier
        if unique_id_field in document:
            existing_doc = collection.find_one(
                {unique_id_field: document[unique_id_field]}
            )
            if existing_doc:
                print(
                    f"Document with {unique_id_field} = {document[unique_id_field]} already exists in collection '{collection_name}'."
                )
                return

        # Set added_at and modified_at to the current date and time as per Indian Standard Time (IST)
        current_time_ist = datetime.now(timezone.utc)
        document["added_at"] = current_time_ist
        document["modified_at"] = current_time_ist

        # Insert the document if unique identifier check passes
        result = collection.insert_one(document)
        print(f"Document inserted with ID: {result.inserted_id}")

    def update_document(self, collection_name: str, query: dict, update: dict):
        """
        Updates an existing document in the specified MongoDB collection.

        Parameters:
        collection_name (str): The name of the MongoDB collection.
        query (dict): The query to identify the document to update.
        update (dict): The updates to apply to the document."""

        collection = self.db[collection_name]

        # Set modified_at to the current date and time as per Indian Standard Time (IST)
        current_time_ist = datetime.now(timezone.utc)
        update["modified_at"] = current_time_ist

        result = collection.update_one(query, {"$set": update})
        if result.matched_count > 0:
            print(f"Document updated: {result.modified_count} document(s) modified.")
        else:
            print("No document found matching the query.")

    # -----------------------------------------------------------
    # Helper Function to for insertion and updation into PostgreSQL

    def update_record_in_postgres(
        self, record: pd.Series, table_name: str, unique_id_col: str
    ):
        """
        Updates an existing record in the PostgreSQL table based on a unique identifier.

        Parameters:
        record (pd.Series): The record to update.
        table_name (str): The name of the PostgreSQL table to update.
        unique_id_col (str): The name of the unique identifier column used for the update.
        """

        # Establish connection for this operation
        connection = self.connect_to_postgres()
        cursor = connection.cursor()

        # Prepare the SQL UPDATE statement
        set_clause = ", ".join(
            [f"{col} = %s" for col in record.index if col != unique_id_col]
        )  # Exclude unique identifier
        update_query = f"""
            UPDATE {table_name} 
            SET {set_clause} 
            WHERE {unique_id_col} = %s
        """

        # Values to be updated
        values = tuple(record[col] for col in record.index if col != unique_id_col) + (
            record[unique_id_col],
        )

        try:
            # Execute the update query
            cursor.execute(update_query, values)
            connection.commit()
            print(
                f"Record with {unique_id_col} {record[unique_id_col]} updated successfully."
            )
        except Exception as error:
            print(f"Error updating record in PostgreSQL: {error}")
            connection.rollback()
        finally:
            cursor.close()
            connection.close()

    # -----------------------------------------------------------
    # Helper Function to extract latest added_at and modified_at from PostgreSQL
    def get_latest_timestamps(self, table_name):
        """
        - Retrieves the latest added_at and modified_at timestamps from the specified PostgreSQL table.
        - incremental_load() method is dependent on the data extracted by this method to perform as expected.
        """
        query = f"""
        SELECT MAX(added_at) AS last_added_at, MAX(modified_at) AS last_modified_at
        FROM {table_name};
        """
        connection = self.connect_to_postgres()
        cursor = connection.cursor()
        cursor.execute(query)
        result = cursor.fetchone()
        cursor.close()
        connection.close()
        return result[0], result[1]

In [3]:
elt = MongoToPostgresELT()

In [4]:
# List of MongoDB collections
collections = [
    "customers",
    "loan_types",
    "loan_applications",
    "loan_repayments",
    "loan_history",
    "loan_collateral",
    "loan_restructuring",
    "loan_disbursements",
]

# Dictionary to hold DataFrames
collections_dict = {}

# Loop through each collection name and load it into a DataFrame
for collection in collections:
    df_name = f"{collection}"  # Create a dynamic variable name
    collections_dict[df_name] = elt.load_collection_as_dataframe(
        collection
    )  # Load DataFrame

In [5]:
# # -----------------------------------------------------------------------------
# collections_dict["customers"]["joined_date"] = collections_dict["customers"][
#     "joined_date"
# ].replace({pd.NaT: None})


# # -----------------------------------------------------------------------------
# collections_dict["loan_applications"]["application_date"] = collections_dict[
#     "loan_applications"
# ]["application_date"].replace({pd.NaT: None})

# collections_dict["loan_applications"]["approval_date"] = collections_dict[
#     "loan_applications"
# ]["approval_date"].replace({pd.NaT: None})


# # -----------------------------------------------------------------------------
# collections_dict["loan_disbursements"]["disbursement_date"] = collections_dict[
#     "loan_disbursements"
# ]["disbursement_date"].replace({pd.NaT: None})

# collections_dict["loan_disbursements"]["application_date"] = collections_dict[
#     "loan_disbursements"
# ]["application_date"].replace({pd.NaT: None})


# # -----------------------------------------------------------------------------
# collections_dict["loan_history"]["loan_disbursed_date"] = collections_dict[
#     "loan_history"
# ]["loan_disbursed_date"].replace({pd.NaT: None})

# collections_dict["loan_history"]["loan_repaid_date"] = collections_dict["loan_history"][
#     "loan_repaid_date"
# ].replace({pd.NaT: None})


# # -----------------------------------------------------------------------------
# collections_dict["loan_repayments"]["repayment_date"] = collections_dict[
#     "loan_repayments"
# ]["repayment_date"].replace({pd.NaT: None})

In [6]:
def replace_nat_with_none(collections_dict, columns_to_replace):
    """
    Replaces pd.NaT with None in the specified columns of each collection in collections_dict.

    Parameters:
    collections_dict (dict): Dictionary where keys are collection names and values are DataFrames.
    columns_to_replace (dict): Dictionary where keys are collection names and values are lists of columns
                               in which to replace pd.NaT with None.
    """
    for collection_name, columns in columns_to_replace.items():
        for column in columns:
            if column in collections_dict[collection_name].columns:
                collections_dict[collection_name][column] = collections_dict[
                    collection_name
                ][column].replace({pd.NaT: None})


# Define the columns to replace pd.NaT with None for each collection
columns_to_replace = {
    "customers": ["joined_date"],
    "loan_applications": ["application_date", "approval_date"],
    "loan_disbursements": ["disbursement_date", "application_date"],
    "loan_history": ["loan_disbursed_date", "loan_repaid_date"],
    "loan_repayments": ["repayment_date"],
}

# Replace pd.NaT with None in the specified columns of each collection
replace_nat_with_none(collections_dict, columns_to_replace)

In [7]:
# Perfrom Full Load

elt.perform_full_load(collections_dict)

Connection to PostgreSQL established successfully.
Data successfully loaded into tbl_customers table.
Successfully loaded data into table: tbl_customers
Connection to PostgreSQL established successfully.
Data successfully loaded into tbl_loan_types table.
Successfully loaded data into table: tbl_loan_types
Connection to PostgreSQL established successfully.
Data successfully loaded into tbl_loan_applications table.
Successfully loaded data into table: tbl_loan_applications
Connection to PostgreSQL established successfully.
Data successfully loaded into tbl_loan_repayments table.
Successfully loaded data into table: tbl_loan_repayments
Connection to PostgreSQL established successfully.
Data successfully loaded into tbl_loan_history table.
Successfully loaded data into table: tbl_loan_history
Connection to PostgreSQL established successfully.
Data successfully loaded into tbl_loan_collateral table.
Successfully loaded data into table: tbl_loan_collateral
Connection to PostgreSQL establish

In [10]:
# Add 1 test into mongodb

customer = {
    "customer_id": 381617992991,
    "first_name": "Laurens",
    "last_name": "Dana",
    "gender": "Male",
    "age": 24,
    "employment_status": "employed",
    "income_level": "low",
    "location": "Phoenix",
    "joined_date": "2020-04-07T05:30:00.000Z",
    "added_at": datetime.now(timezone.utc),
    "modified_at": datetime.now(timezone.utc),
}

In [11]:
elt.insert_document("customers", customer)

Document inserted with ID: 672210af27e6422f0142dfb9


In [11]:
elt.incremental_load()

Connection to PostgreSQL established successfully.


TypeError: Invalid comparison between dtype=datetime64[ns] and date