diff --git a/services/apps/git_integration/src/crowdgit/database/crud.py b/services/apps/git_integration/src/crowdgit/database/crud.py index fb240f81e9..46c542b734 100644 --- a/services/apps/git_integration/src/crowdgit/database/crud.py +++ b/services/apps/git_integration/src/crowdgit/database/crud.py @@ -8,7 +8,11 @@ from crowdgit.errors import RepoLockingError from crowdgit.models.repository import Repository from crowdgit.models.service_execution import ServiceExecution -from crowdgit.settings import MAX_CONCURRENT_ONBOARDINGS, REPOSITORY_UPDATE_INTERVAL_HOURS +from crowdgit.settings import ( + MAX_CONCURRENT_ONBOARDINGS, + MAX_INTEGRATION_RESULTS, + REPOSITORY_UPDATE_INTERVAL_HOURS, +) from .connection import get_db_connection from .registry import execute, executemany, fetchrow, fetchval, query @@ -143,13 +147,43 @@ async def acquire_recurrent_repo() -> Repository | None: ) +async def can_onboard_more(): + """ + Check if system can handle more repository onboarding based on activity load. + + Returns False if integration.results count exceeds MAX_INTEGRATION_RESULTS + or if the query fails (indicating high database load). + """ + try: + integration_results_count = await fetchval("SELECT COUNT(*) FROM integration.results") + return integration_results_count < MAX_INTEGRATION_RESULTS + except Exception as e: + logger.warning(f"Failed to get integration.results count with error: {repr(e)}") + return False # if query failed mostly due to timeout then db is already under high load + + async def acquire_repo_for_processing() -> Repository | None: - # prioritizing onboarding repositories - # TODO: document priority logic and values(0, 1, 2) - repo_to_process = await acquire_onboarding_repo() + """ + Acquire the next repository to process based on priority and system load. + + Priority logic: + 1. Onboarding repos (PENDING state) - only if system load allows and + current onboarding count is below MAX_CONCURRENT_ONBOARDINGS + 2. Recurrent repos (non-PENDING/non-PROCESSING) - fallback when onboarding + is unavailable or skipped due to high load + + Onboarding is delayed when integration.results exceeds MAX_INTEGRATION_RESULTS + to prevent overloading the system during high activity periods. + """ + repo_to_process = None + if await can_onboard_more(): + repo_to_process = await acquire_onboarding_repo() + else: + logger.info("Skipping onboarding due to high load on integration.results") + if not repo_to_process: - # Fallback to non-onboarding repos repo_to_process = await acquire_recurrent_repo() + return repo_to_process diff --git a/services/apps/git_integration/src/crowdgit/settings.py b/services/apps/git_integration/src/crowdgit/settings.py index 51a3bfe541..d4702b20e7 100644 --- a/services/apps/git_integration/src/crowdgit/settings.py +++ b/services/apps/git_integration/src/crowdgit/settings.py @@ -37,3 +37,4 @@ def load_env_var(key: str, required=True, default=None): ) WORKER_SHUTDOWN_TIMEOUT_SEC = int(load_env_var("WORKER_SHUTDOWN_TIMEOUT_SEC", default="3600")) MAX_CONCURRENT_ONBOARDINGS = int(load_env_var("MAX_CONCURRENT_ONBOARDINGS", default="3")) +MAX_INTEGRATION_RESULTS = int(load_env_var("MAX_INTEGRATION_RESULTS", default="5000000"))