In [47]:

import os
from datetime import date, timedelta
from typing import Dict, Any
from urllib.parse import urlsplit, urlunsplit

import pandas as pd
from pymongo import MongoClient
from sqlalchemy import create_engine, text
# Connection strings; each can be overridden through an environment variable so
# credentials do not have to live in the notebook.
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://localhost:27017/")
# FIXME: the fallback below embeds a plaintext password. Set POSTGRES_URI in the
# environment and drop the fallback before sharing/committing this notebook.
# Note the special character '@' in the password is URL-encoded as '%40'
PostgreSQL_URI = os.environ.get(
    "POSTGRES_URI",
    'postgresql+psycopg2://postgres:%40PGAdmin2025@localhost:5433/commercepulse',
)


In [19]:
def _redact_uri(uri: str) -> str:
    """Return ``uri`` with any embedded password masked as ``***`` so it is safe to print."""
    parts = urlsplit(uri)
    if not parts.password:
        return uri
    netloc = parts.netloc.replace(f":{parts.password}@", ":***@", 1)
    return urlunsplit(parts._replace(netloc=netloc))


def get_mongo_client(uri: str | None = None) -> MongoClient:
    """Create a MongoClient and confirm the server answers before returning it.

    Args:
        uri: MongoDB connection string; defaults to ``MONGO_URI``.
    Returns:
        A ``MongoClient`` whose server has responded to a ``ping``.
    Raises:
        Whatever ``MongoClient(...)`` or the ``ping`` command raises (e.g. a
        server-selection timeout when the server is down); the error is printed
        first and the half-built client is closed.
    """
    uri = MONGO_URI if uri is None else uri
    print("Connecting to MongoDB...")
    # Mask credentials so they never end up in saved cell output.
    print(f"MONGO_URI: {_redact_uri(uri)}")
    client = None
    try:
        client = MongoClient(uri)
        # MongoClient connects lazily; without an explicit round-trip an unreachable
        # server would only surface on the first query in some later cell.
        client.admin.command("ping")
        return client
    except Exception as e:
        print(f"Error connecting to MongoDB: {e}")
        if client is not None:
            client.close()
        raise


mongo_client = get_mongo_client()

Connecting to MongoDB...
MONGO_URI: mongodb://localhost:27017/


In [20]:
# Handles to the `commercepulse` database and its `events_raw` collection.
# Nothing is queried here; documents are read later via load_from_mongoDB.
db = mongo_client.get_database('commercepulse')
event_raw_collection = db.get_collection('events_raw')

In [21]:
def load_from_mongoDB(query: Dict[str, Any] = {}, batch_size: int = 1000) -> pd.DataFrame:
    """
    Load events from MongoDB based on a query.

    Args:
        query: MongoDB query dictionary
        batch_size: Number of documents to fetch per batch
    Returns:
        DataFrame containing the events
    """

    cursor = event_raw_collection.find(query).batch_size(batch_size)
    return pd.DataFrame(list(cursor))

In [125]:
# Calendar-date window for the event_time filter; both ends are inclusive.
query_start_date = "2026-01-01"
query_end_date = "2026-01-31"

# Entire events_raw collection (no filter).
data_df = load_from_mongoDB()

all_event_types = [
    "payment_succeeded",
    "shipment_updated",
    "order_created",
    "order_updated",
    "refund_issued",
    "historical_order",
    "historical_payment",
    "historical_refund",
    "historical_shipment",
]
focus_event_type = all_event_types[0]  # "payment_succeeded"

# event_time appears to be stored as an ISO-8601 string (sample output shows
# "2026-01-19T01:06:08Z"), so MongoDB compares it lexicographically. A bare
# "$lte": "2026-01-31" would drop every event on Jan 31 after midnight, because
# "2026-01-31T..." sorts after "2026-01-31". Use an exclusive bound on the next day
# so the whole end date is included.
# NOTE(review): assumes every document uses this string format — verify per vendor.
query_end_exclusive = (date.fromisoformat(query_end_date) + timedelta(days=1)).isoformat()
events_between_dates = load_from_mongoDB(
    {"event_time": {"$gte": query_start_date, "$lt": query_end_exclusive}}
)

# Query once; derive both the one-row sample and its payload from the same result.
event_type_sample = load_from_mongoDB({"event_type": focus_event_type}).head(1)
assert not event_type_sample.empty, f"No '{focus_event_type}' events found in events_raw"
event_sample_payload = event_type_sample["payload"].iloc[0]
event_type_sample

Unnamed: 0,_id,event_id,_bootstrapped,event_time,event_type,ingested_at,payload,vendor
0,696e6c745ef0718126353ea9,de2b2455381d,False,2026-01-19T01:06:08Z,payment_succeeded,2026-01-19T02:52:08Z,"{'order': 'ORD-260119-00194', 'timestamp': 176...",vendor_c


In [None]:
def make_sqlalchemy_db_connection():
    """Build a SQLAlchemy engine from ``PostgreSQL_URI``.

    Best-effort: any error raised while creating the engine is printed and
    ``None`` is returned instead of raising, so callers must check the result.
    Creating the engine does not by itself open a database connection
    (SQLAlchemy connects lazily), so success here does not prove the server
    is reachable.
    """
    try:
        built_engine = create_engine(PostgreSQL_URI)
    except Exception as e:
        print(f"Error: {e}")
        return None
    print("SQLAlchemy engine created successfully")
    return built_engine


connection_engine = make_sqlalchemy_db_connection()

def execute_postgre_query(query: str, params: Dict[str, Any] | None = None) -> pd.DataFrame | None:
    """Execute one SQL statement against ``connection_engine`` in its own transaction.

    Args:
        query: SQL text. Pass values through ``:name`` placeholders plus ``params``
            rather than formatting them into the string.
        params: Optional values for the ``:name`` placeholders in ``query``.
    Returns:
        A DataFrame of the result rows when the statement returns rows (``SELECT``,
        ``WITH ... SELECT``, ``... RETURNING``, ``SHOW``, ...); otherwise ``None``.
    Raises:
        ValueError: if ``connection_engine`` was not created.
    """
    if not connection_engine:
        raise ValueError("Database connection engine is not initialized.")

    # engine.begin() commits on successful exit and rolls back on error, so
    # row-returning writes (INSERT ... RETURNING) are persisted too. Whether rows
    # come back is taken from the result itself rather than guessed from the first
    # word of the query, which misclassified CTEs, leading comments, SHOW/EXPLAIN, etc.
    with connection_engine.begin() as connection:
        result = connection.execute(text(query), params)
        if result.returns_rows:
            executed = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
        else:
            executed = None
    print("Query executed successfully.")
    return executed