In [1]:
import logging

from sqlalchemy.exc import OperationalError

from server_celery.logging_setup import setup_logging  # Ensure correct import path

# 1. Initialize logging **before** creating the Celery app
setup_logging()
import sys
from celery import Celery, Task
from database.db_util import initialize_database
from utilities.config import Config
from dotenv import load_dotenv
from server_celery.logging_setup import clear_log_files
load_dotenv("../.env")
logger = logging.getLogger('admin_logger')


In [2]:
config = Config()
db_settings = config.get_database_settings(config.USE_LOCAL)

try:
    initialize_database(db_settings['url'])
    logger.info('DB initialization is done.')
except Exception as e:
    logger.error(f'DB initialization failed! Error={e}', exc_info=True)

[14:15:33]-[910] [ 🌒DEBUG] [db_util.py      ] [  initialize_database   ] Database initialized with connection string: mysql+pymysql://root:z //55gohi@localhost:3306/virtual_pm
[14:15:33]-[911] [  🌑INFO] [4083959711.py   ] [        <module>        ] DB initialization is done.


In [3]:
"""
server_celery/triggers/budget_triggers.py

Holds trigger functions for:
  - PO Log
  - PurchaseOrder (PO)
  - DetailItem

Utilizes aggregator checks to distinguish between partial vs. final logic.
In partial mode (aggregator in progress), we skip big logic like Xero or Monday calls.
Once aggregator is done (status='COMPLETED'), we perform the single-item logic,
including sibling checks (like for INV sums, CC receipts, etc.).
"""

import logging

# region 🔧 Imports
from database.db_util import get_db_session
from database.database_util import DatabaseOperations
from files_xero.xero_services import xero_services  # for Xero calls
from files_dropbox.dropbox_service import DropboxService  # for links and files
from files_monday.monday_service import monday_service  # for Monday upserts
from files_budget.budget_service import budget_service  # aggregator checks + date-range updates

# endregion

# region 🏗️ Setup
db_ops = DatabaseOperations()
logger = logging.getLogger('budget_logger')
dropbox_service = DropboxService()
use_control_panel = True

# Disable all logging messages less severe than CRITICAL
logging.disable()



[14:15:35]-[886] [ 🌒DEBUG] [xero_api.py     ] [_refresh_token_if_needed] [XeroAPI] Token still valid, no refresh needed.
[14:15:35]-[887] [  🌑INFO] [xero_api.py     ] [        __init__        ] 🚀 - XeroAPI initialized.
[14:15:35]-[888] [ 🌒DEBUG] [database_util.py] [        __init__        ] 🌟 DatabaseOperations initialized.
[14:15:35]-[888] [  🌑INFO] [xero_services.py] [        __init__        ] XeroServices initialized.
[14:15:35]-[977] [  🌑INFO] [dropbox_client.p] [        __init__        ] Dropbox Client Initialized
[14:15:35]-[978] [  🌑INFO] [dropbox_client.p] [  refresh_access_token  ] Refreshing access token.
[14:15:36]-[688] [  🌑INFO] [dropbox_client.p] [request_json_string_with] Request to team/members/list
[14:15:37]-[245] [  🌑INFO] [dropbox_client.p] [        __init__        ] Impersonated user 'jeff@ophelia.company' with member ID 'dbmid:AABJ6ARzUN_VGILRIoQMBkx6vG-tnIDXxCU'.
[14:15:37]-[245] [  🌑INFO] [dropbox_client.p] [request_json_string_with] Request to team/namespaces/l

TESTING


In [4]:
logger.info("🚀 Running aggregator flow => 'po_log_new_trigger'!")
po_log_id = 1

# region CONTROL PANEL TOGGLE
if use_control_panel:
    db_ops.update_po_log(1, status="STARTED")
    logger.info("🚀 CONTROL PANEL SET PO LOG STATUS TO - STARTED")
# endregion

In [5]:
# region 1) Find po_log rows with status='STARTED'
po_log = db_ops.search_po_logs(['id'], [6])
if not po_log or not po_log["status"] == "STARTED":
    logger.info("🤷 No po_logs with status=STARTED found. Nothing to do.")
    exit(1)

# endregion

In [6]:
# region 2) Parse aggregator data from the text file or source

po_log_data = budget_service.parse_po_log_data(po_log)
if not po_log_data:
    logger.info("😶 No aggregator data parsed => skipping.")
    exit(1)
# endregion

In [7]:
# region CONTACT AGGREGATOR
with get_db_session() as session_1:
    budget_service.process_contact_aggregator(po_log_data["contacts"], session=session_1)
# endregion

In [10]:
with get_db_session() as session_2:
    budget_service.process_aggregator_pos(po_log_data, session=session_2)
# endregion

In [11]:
# region DETAIL ITEM AGGREGATOR
import importlib
from files_budget import budget_service
importlib.reload(budget_service)
from files_budget.budget_service import budget_service

with get_db_session() as session_3:
    budget_service.process_aggregator_detail_items(po_log_data, session=session_3)

# endregion

TESTING


In [None]:
# region 4) Once we’ve processed everything, set po_log.status='COMPLETED'
updated = db_ops.update_po_log(
    po_log_id=po_log_id,
    status='COMPLETED'
)
if updated:
    logger.info(f"🏁 PO log (ID={po_log_id}) => status='COMPLETED'!")
else:
    logger.warning(f"⚠️ Could not update PO log ID={po_log_id} => COMPLETED.")

# endregion
