In [8]:
import logging
import time
from datetime import datetime
import pandas as pd
from pymongo import MongoClient, UpdateOne
from bson import ObjectId
import importlib
import pricing
importlib.reload(pricing)  # Ensure latest version is used
from pricing import attach_prices

# -------------------------------
# LOGGING SETUP
# -------------------------------
logging.basicConfig(
    filename="etl_run.log",  # log file will be created in same folder
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# -------------------------------
# DB CONNECTIONS
# -------------------------------
LIVE_CONN_STR = "mongodb+srv://FFL-User:OoesBMAcjYp4pJGf@fflcluster.2mt2rev.mongodb.net/?retryWrites=true&w=majority&appName=FFLCluster&tls=true"
REPORTING_CONN_STR = "mongodb://localhost:27017"

live_client = MongoClient(LIVE_CONN_STR)
reporting_client = MongoClient(REPORTING_CONN_STR)

live_db = live_client["ffl"]
reporting_db = reporting_client["staging_db"]

# -------------------------------
# HELPERS
# -------------------------------
def to_objectid_safe(x):
    if pd.isnull(x):
        return None
    if isinstance(x, ObjectId):
        return x
    if isinstance(x, str):
        try:
            return ObjectId(x)
        except Exception:
            return None
    return None 

def sanitize_datetimes(df):
    for col in df.columns:
        if pd.api.types.is_datetime64_any_dtype(df[col]):
            df[col] = df[col].apply(
                lambda x: (
                    x.to_pydatetime() if isinstance(x, pd.Timestamp) and pd.notna(x)
                    else (x if isinstance(x, datetime) else None)
                )
            )
    return df

def get_last_run(collection_name):
    meta = reporting_db["etl_metadata"].find_one({"_id": collection_name})
    if meta and "last_run" in meta:
        ts = meta["last_run"]
        if isinstance(ts, pd.Timestamp):
            return ts.to_pydatetime()
        return ts
    return None

def update_last_run(timestamp, collection_name):
    if pd.isna(timestamp):
        safe_ts = None
    elif isinstance(timestamp, pd.Timestamp):
        safe_ts = timestamp.to_pydatetime()
    elif isinstance(timestamp, datetime):
        safe_ts = timestamp
    else:
        safe_ts = None

    reporting_db["etl_metadata"].update_one(
        {"_id": collection_name},
        {"$set": {"last_run": safe_ts}},
        upsert=True
    )

def extract_incremental_purchases(db, collection_name, last_run):
    if last_run:
        query = {"created_at": {"$gte": last_run}}
    else:
        query = {}

    fields = {
        "_id": 1,
        "supplier_id": 1,
        "supplier_type_id": 1,
        "mcc_id": 1,
        "area_office_id": 1,
        "gross_volume": 1,
        "ts_volume": 1,
        "opening_balance": 1,
        "type": 1,
        "created_by": 1,
        "created_at": 1,
        "booked_at": 1, 
        "time": 1,
        "serial_number" : 1, 
        "is_planned": 1, 
        "is_exceptional_release": 1,
        "tests": 1,
        "plant_id": 1,
        "price": 1,
        "cp_id": 1
    }
    cursor = db[collection_name].find(query, fields)
    df = pd.DataFrame(list(cursor))

    for col in ["mcc_id", "supplier_id", "supplier_type_id", "area_office_id", "plant_id", "cp_id"]:
        if col not in df.columns:
            df[col] = None
    return df

def transform_purchases(purchases_df):
    if purchases_df.empty:
        return purchases_df
    
    purchases_df["created_at"] = pd.to_datetime(purchases_df["created_at"], errors="coerce")
    purchases_df["booked_at"] = pd.to_datetime(purchases_df["booked_at"], errors="ignore")
    purchases_df["time"] = pd.to_datetime(purchases_df["time"], errors="coerce")

    null_mask = purchases_df["booked_at"].isna()
    if null_mask.any():
        logging.info(f"Replacing {int(null_mask.sum())} null booked_at values with time")
        purchases_df.loc[null_mask, "booked_at"] = purchases_df.loc[null_mask, "time"]

    suppliers_df = pd.DataFrame(list(live_db["suppliers"].find({})))
    collection_points_df = pd.DataFrame(list(live_db["collection_points"].find({}, {
        "_id": 1, "name": 1, "area_office_id": 1, "status": 1,
        "is_mcc": 1, "latitude": 1, "longitude": 1, "address": 1
    })))
    area_offices_df = pd.DataFrame(list(live_db["area_offices"].find({}, {"_id": 1, "name": 1})))
    supplier_types_df = pd.DataFrame(list(live_db["supplier_types"].find({}, {"_id": 1, "name": 1, "description": 1})))

    # Convert IDs
    purchases_df["_id"] = purchases_df["_id"].apply(to_objectid_safe)
    suppliers_df["_id"] = suppliers_df["_id"].apply(to_objectid_safe)
    collection_points_df["_id"] = collection_points_df["_id"].apply(to_objectid_safe)
    collection_points_df["area_office_id"] = collection_points_df["area_office_id"].apply(to_objectid_safe)
    area_offices_df["_id"] = area_offices_df["_id"].apply(to_objectid_safe)
    supplier_types_df["_id"] = supplier_types_df["_id"].apply(to_objectid_safe)

    suppliers_df["supplier_type_id"] = suppliers_df["supplier_type_id"].apply(to_objectid_safe)
    purchases_df["supplier_id"] = purchases_df["supplier_id"].apply(to_objectid_safe)
    purchases_df["mcc_id"]=purchases_df["mcc_id"].fillna(purchases_df["cp_id"])
    purchases_df["mcc_id"] = purchases_df["mcc_id"].apply(to_objectid_safe)
    purchases_df["supplier_type_id"] = purchases_df["supplier_type_id"].apply(to_objectid_safe)

    # Select relevant columns
    suppliers_df  = suppliers_df[["_id", "name", "supplier_type_id", "source", "area_office", "code"]]
    collection_points_df = collection_points_df[["_id", "name", "area_office_id", "is_mcc", "latitude", "longitude"]]
    area_offices_df = area_offices_df[["_id", "name"]]
    supplier_types_df = supplier_types_df[["_id", "name", "description"]]

    # Joins
    purchases_df = purchases_df.merge(
        suppliers_df.rename(columns={"_id": "supplier_id", "name": "supplier_name"}),
        on="supplier_id", how="left", suffixes=("", "_sup")
    )
    purchases_df = purchases_df.merge(
        collection_points_df.rename(columns={"_id": "mcc_id", "name": "collection_point_name"}),
        on="mcc_id", how="left", suffixes=("", "_mcc")
    )
    purchases_df = purchases_df.merge(
        area_offices_df.rename(columns={"_id": "area_office_id", "name": "area_office_name"}),
        left_on="area_office_id_mcc",right_on="area_office_id", how="left", suffixes=("", "_ao")
    )
    purchases_df = purchases_df.merge(
        supplier_types_df.rename(columns={"_id": "supplier_type_id", "name": "supplier_type_name"}),
        on="supplier_type_id", how="left"
    )
    purchases_df["area_office_id_ao"]=purchases_df["area_office_id_ao"].fillna(purchases_df["area_office_id"])

    return purchases_df

def load_to_reporting(df, collection_name):
    if df.empty:
        logging.info("⚠️ Nothing to load")
        return

    df = sanitize_datetimes(df)
    df = df.where(pd.notnull(df), None)

    df = df[df["_id"].notnull()]
    ops = []
    for _, row in df.iterrows():
        doc = row.to_dict()
        _id = doc.pop("_id")
        ops.append(UpdateOne({"_id": _id}, {"$set": doc}, upsert=True))

    result = reporting_db[collection_name].bulk_write(ops, ordered=False)
    logging.info(f"✅ Load completed: matched={result.matched_count}, modified={result.modified_count}, upserted={len(result.upserted_ids)}")

# -------------------------------
# MAIN ETL RUN
# -------------------------------
def run_etl(collection_name):
    start_time = time.time()
    logging.info(f"ETL started for {collection_name}")

    last_run = get_last_run(collection_name)
    logging.info(f"Last run checkpoint: {last_run}")

    try:
        # 1. Extract
        if collection_name == "milk_purchases":
            relevant_df = extract_incremental_purchases(live_db, collection_name, last_run)
            logging.info(f"Extracted {len(relevant_df)} new/updated purchases")
        else:
            logging.warning(f"No extraction defined for collection: {collection_name}")
            relevant_df = pd.DataFrame()

        # 2. Transform
        if collection_name == "milk_purchases":
            transformed_df = transform_purchases(relevant_df)
            logging.info(f"Transformed {len(transformed_df)} rows")
            print(transformed_df.head())
            prices_df = pd.DataFrame(list(live_db["prices"].find({"status": 1})))
            print(prices_df.head())
            archived_prices_df = pd.DataFrame(list(live_db["archieved_prices"].find({"status": 1})))

            for df in [prices_df, archived_prices_df]:
                for col in ["plant", "source_type", "area_office", "supplier", "collection_point"]:
                    if col in df.columns:
                        df[col] = df[col].apply(to_objectid_safe)

            transformed_df = attach_prices(transformed_df, prices_df, archived_prices_df)
            logging.info("💰 Prices attached")

        # 3. Load
        load_to_reporting(transformed_df, "fact_"+collection_name)

        # 4. Update checkpoint
        if not relevant_df.empty:
            new_last_run = relevant_df["created_at"].max()
            update_last_run(new_last_run, collection_name)
            logging.info(f"Updated last_run checkpoint to {new_last_run}")
        else:
            logging.info("No new data to process")

        elapsed = time.time() - start_time
        logging.info(f"ETL finished for {collection_name} in {elapsed:.2f} seconds")

    except Exception as e:
        elapsed = time.time() - start_time
        logging.error(f"ETL failed for {collection_name} after {elapsed:.2f} seconds. Error: {e}", exc_info=True)
        raise

if __name__ == "__main__":
    run_etl("milk_purchases")


  purchases_df["booked_at"] = pd.to_datetime(purchases_df["booked_at"], errors="ignore")


                        _id                created_by  \
0  68c952777f321fa8c7027933  64db7f48966006bb8f0b60d2   
1  68c951a77f321fa8c702792f  67bd54c860f986fd420a3ce8   
2  68c9509095534ca26d056490  67bd554660f986fd420a3cf2   
3  68c94d8a7f321fa8c7027922  674b371f71e97c0db303da9f   
4  68c94d0795534ca26d05648b  674b371f71e97c0db303da9f   

                     mcc_id             type               supplier_id  \
0  64d637c011fedf702c09ea14  purchase_at_mcc  654c74a68fb27cb1c60bdca2   
1  6851122cd8af7017aa040196     mmt_purchase  685219aff848d0028b056535   
2  68c0505f06ff4b26ae0b57db     mmt_purchase  68c0530354c7f021f90e5fcf   
3  682db2081493ae249f0e7b7c     mmt_purchase  682db2a8187a486a1a0a60b5   
4  67c6e242dfcfc989fe0c2ab3     mmt_purchase  67c6e4c0b0848dda750d7672   

           supplier_type_id  gross_volume  ts_volume                time  \
0  63b55d49781e0000b4000f03          80.0     79.754 2025-09-16 17:05:00   
1  63b55d3e781e0000b4000f02         470.0    446.500 2025-09

_id                         object
plant                       object
department                  object
area_office                 object
source_type                 object
supplier                    object
collection_point            object
price                       object
volume                      object
wef                         object
initial_remarks             object
created_by                  object
status                       int64
update_request               int64
code                         int64
updated_at          datetime64[ns]
created_at          datetime64[ns]
approved_at                 object
deleted_at          datetime64[ns]
remarks                     object
dtype: object

💰 Attaching prices for 11306 purchases...


100%|██████████| 11306/11306 [00:00<00:00, 13415.45it/s]


✅ Pricing complete.


In [9]:
import pandas as pd
from pymongo import MongoClient
from bson import ObjectId

def safe_to_objectid(val):
    """Convert 24-char hex string to ObjectId, otherwise return as-is."""
    if isinstance(val, str) and len(val) == 24:
        try:
            return ObjectId(val)
        except Exception:
            return val
    return val

def main():
    # 1. Connect to MongoDB
    LIVE_CONN_STR = "mongodb+srv://FFL-User:OoesBMAcjYp4pJGf@fflcluster.2mt2rev.mongodb.net/?retryWrites=true&w=majority&appName=FFLCluster&tls=true"
    REPORTING_CONN_STR = "mongodb://localhost:27017"

    live_client = MongoClient(LIVE_CONN_STR)
    reporting_client = MongoClient(REPORTING_CONN_STR)

    live_db = live_client["ffl"]
    reporting_db = reporting_client["staging_db"]

    # DB + collection handles
    reporting_col = live_client["ffl"]["milk_purchase_reporting_facts"]
    staging_col = reporting_client["staging_db"]["fact_milk_purchases"]

    # 2. Load into DataFrames
    reporting_df = pd.DataFrame(
        list(reporting_col.find({}, {"purchase_id": 1, "base_price": 1, "_id": 0}))
    )
    staging_df = pd.DataFrame(
        list(staging_col.find({}))  # fetch EVERYTHING
    )

    print(f"Reporting facts: {len(reporting_df)} rows")
    print(f"Staging facts: {len(staging_df)} rows")

    # 3. Convert purchase_id into ObjectId (if string hex)
    reporting_df["purchase_id"] = reporting_df["purchase_id"].apply(safe_to_objectid)

    # 4. Merge on purchase_id vs _id
    merged = staging_df.merge(
        reporting_df,
        left_on="_id",
        right_on="purchase_id",
        how="inner"
    )
    print(f"Merged: {len(merged)} rows")

    if merged.empty:
        print("⚠️ No matches found! Check if IDs are in different formats.")
        return

    # 5. Ensure numeric types for price comparison
    merged["price"] = pd.to_numeric(merged["price"], errors="coerce")
    merged["base_price"] = pd.to_numeric(merged["base_price"], errors="coerce")

    # 6. Compute difference
    merged["price_diff"] = merged["base_price"] - merged["price"]

    # 7. Find mismatches
    mismatches = merged[merged["price_diff"].abs() > 0.0001]  # tolerance

    print(f"Mismatches: {len(mismatches)}")

    print("\nSample mismatches:")
    mismatch_cols = ["_id", "purchase_id", "type", "booked_date", "price", "base_price", "price_diff"]
    available_cols = [col for col in mismatch_cols if col in mismatches.columns]
    print(mismatches[available_cols].head(10))

    # 8. Save results — now includes *all staging columns*
    merged.to_csv("price_comparison.csv", index=False)
    mismatches.to_csv("price_mismatches.csv", index=False)

    print("\nResults saved to price_comparison.csv and price_mismatches.csv")

if __name__ == "__main__":
    main()

Reporting facts: 354979 rows
Staging facts: 421727 rows
Merged: 354979 rows
Mismatches: 83

Sample mismatches:
                             _id               purchase_id             type  \
342043  68a1ef79879eb127380f8604  68a1ef79879eb127380f8604  purchase_at_mcc   
342057  68a1fbc471edb9785c00abe2  68a1fbc471edb9785c00abe2  purchase_at_mcc   
342060  68a1fe1471edb9785c00abea  68a1fe1471edb9785c00abea  purchase_at_mcc   
342099  68a28c9cb98b43585905eac8  68a28c9cb98b43585905eac8  purchase_at_mcc   
342102  68a28cb7b98b43585905eacc  68a28cb7b98b43585905eacc  purchase_at_mcc   
342103  68a28cd55721c6d98b0d4987  68a28cd55721c6d98b0d4987  purchase_at_mcc   
342108  68a290d5fefebffc250ef569  68a290d5fefebffc250ef569  purchase_at_mcc   
342145  68a29de158623940f90a1be2  68a29de158623940f90a1be2  purchase_at_mcc   
342207  68a2b37ca34d21dca60f8d0a  68a2b37ca34d21dca60f8d0a  purchase_at_mcc   
342233  68a2bc667416a9ef2306bda8  68a2bc667416a9ef2306bda8  purchase_at_mcc   

        price  base