In [1]:
# Installing required libraries (Run this only once in Jupyter Notebook)
!pip install pymongo pandas psycopg2 sqlalchemy

import logging
import os

import pandas as pd
import psycopg2
import pymongo
from sqlalchemy import create_engine
from sqlalchemy import text

# Setup Logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# SECURITY NOTE(review): the fallback URLs below embed live-looking credentials
# that were already committed in this file. Rotate them and supply the
# connection strings through the MONGO_URI / POSTGRES_URL environment
# variables; the fallbacks are kept only so the cell keeps running unchanged.
MONGO_URI = os.environ.get(
    "MONGO_URI",
    "mongodb://etl_user:aP8fwfgftempRhkgGa9@3.251.75.195:27017/sales",
)
POSTGRES_URL = os.environ.get(
    "POSTGRES_URL",
    "postgresql://yousef:yousef@rds-module.cnc6gugkeq4f.eu-west-1.rds.amazonaws.com:5432/sales_db3",
)

MONGO_DATABASE = "sales"
MONGO_COLLECTION = "transactions"
TARGET_TABLE = "transactions"

# Only these fields are read from MongoDB; they mirror the target table columns.
PROJECTION = {"_id": 1, "customer_id": 1, "total_amount": 1, "transaction_date": 1}

# Rows per INSERT statement when writing with method="multi".
INSERT_CHUNK_SIZE = 1000

CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS transactions (
    _id VARCHAR(100) PRIMARY KEY,
    customer_id INT,
    total_amount DECIMAL(12,2),
    transaction_date TIMESTAMP
);
"""


def documents_to_frame(documents: list) -> pd.DataFrame:
    """Build a DataFrame from MongoDB documents, rendering ``_id`` as text.

    Args:
        documents: Non-empty list of dicts as returned by ``collection.find``.

    Returns:
        A DataFrame with one row per document. When an ``_id`` column is
        present its values are converted with ``astype(str)`` so they fit the
        ``VARCHAR`` primary key of the target table.
    """
    df = pd.DataFrame(documents)
    if "_id" in df.columns:
        df["_id"] = df["_id"].astype(str)
    return df


def load_transactions(df: pd.DataFrame, pg_engine) -> None:
    """Create the target table if needed and append ``df`` to it.

    Both statements run on one connection inside a single
    ``pg_engine.begin()`` transaction, so a failed insert rolls everything
    back instead of leaving a partial load.

    Args:
        df: Rows to insert; column names must match the table columns.
        pg_engine: SQLAlchemy engine for the PostgreSQL database.

    Raises:
        Exception: Whatever SQLAlchemy / pandas raise is propagated to the
            caller. NOTE(review): because rows are appended into a table with
            a primary key on ``_id``, re-running over already-loaded documents
            is expected to fail with a uniqueness error -- confirm whether an
            upsert is wanted.
    """
    with pg_engine.begin() as pg_conn:
        logging.info("Creating table if not exists...")
        # text() is required by SQLAlchemy 2.x; raw strings are rejected there.
        pg_conn.execute(text(CREATE_TABLE_SQL))

        logging.info("Inserting data into PostgreSQL...")
        # Write through the same connection so the insert sees the table
        # created above and shares its transaction.
        df.to_sql(
            TARGET_TABLE,
            pg_conn,
            if_exists="append",
            index=False,
            method="multi",
            chunksize=INSERT_CHUNK_SIZE,
        )


def migrate_transactions() -> None:
    """Copy the ``transactions`` collection from MongoDB into PostgreSQL.

    Any failure is logged with its traceback and swallowed, matching the
    original best-effort behavior of this notebook cell. Connections that
    were actually opened are always released.
    """
    # Pre-bind so the cleanup below is safe even if construction fails.
    mongo_client = None
    pg_engine = None
    try:
        # Step 1: Connect to MongoDB
        logging.info("Connecting to MongoDB...")
        mongo_client = pymongo.MongoClient(MONGO_URI)
        collection = mongo_client[MONGO_DATABASE][MONGO_COLLECTION]

        # Step 2: Fetch data & convert to a DataFrame
        logging.info("Fetching data from MongoDB...")
        documents = list(collection.find({}, PROJECTION))
        if not documents:
            logging.warning("No data found in MongoDB! Exiting...")
            # Return instead of exit(): exit() would shut down a notebook kernel.
            return

        # Step 3: Convert `_id` from ObjectId to string
        df = documents_to_frame(documents)

        # Step 4 & 5: Connect to PostgreSQL, create the table, insert the data
        logging.info("Connecting to PostgreSQL...")
        pg_engine = create_engine(POSTGRES_URL)
        load_transactions(df, pg_engine)

        logging.info("Data successfully migrated to PostgreSQL!")
    except Exception as exc:
        # Top-level boundary: keep the run from crashing but keep the traceback.
        logging.exception(" Error occurred: %s", exc)
    finally:
        if pg_engine is not None:
            pg_engine.dispose()
        if mongo_client is not None:
            mongo_client.close()
            logging.info(" MongoDB connection closed.")


if __name__ == "__main__":
    migrate_transactions()




2025-03-25 08:32:53,841 - INFO - Connecting to MongoDB...
2025-03-25 08:32:53,870 - INFO - Fetching data from MongoDB...
2025-03-25 08:32:54,920 - INFO - Connecting to PostgreSQL...
2025-03-25 08:32:57,692 - INFO - Creating table if not exists...
2025-03-25 08:32:58,211 - INFO - Inserting data into PostgreSQL...
2025-03-25 08:33:00,056 - INFO - Data successfully migrated to PostgreSQL!
2025-03-25 08:33:00,229 - INFO -  MongoDB connection closed.
