Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions services/apps/git_integration/src/crowdgit/database/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
from crowdgit.errors import RepoLockingError
from crowdgit.models.repository import Repository
from crowdgit.models.service_execution import ServiceExecution
from crowdgit.settings import MAX_CONCURRENT_ONBOARDINGS, REPOSITORY_UPDATE_INTERVAL_HOURS
from crowdgit.settings import (
MAX_CONCURRENT_ONBOARDINGS,
MAX_INTEGRATION_RESULTS,
REPOSITORY_UPDATE_INTERVAL_HOURS,
)

from .connection import get_db_connection
from .registry import execute, executemany, fetchrow, fetchval, query
Expand Down Expand Up @@ -143,13 +147,43 @@ async def acquire_recurrent_repo() -> Repository | None:
)


async def can_onboard_more():
"""
Check if system can handle more repository onboarding based on activity load.

Returns False if integration.results count exceeds MAX_INTEGRATION_RESULTS
or if the query fails (indicating high database load).
"""
try:
integration_results_count = await fetchval("SELECT COUNT(*) FROM integration.results")
return integration_results_count < MAX_INTEGRATION_RESULTS
except Exception as e:
logger.warning(f"Failed to get integration.results count with error: {repr(e)}")
return False # if query failed mostly due to timeout then db is already under high load


async def acquire_repo_for_processing() -> Repository | None:
# prioritizing onboarding repositories
# TODO: document priority logic and values(0, 1, 2)
repo_to_process = await acquire_onboarding_repo()
"""
Acquire the next repository to process based on priority and system load.

Priority logic:
1. Onboarding repos (PENDING state) - only if system load allows and
current onboarding count is below MAX_CONCURRENT_ONBOARDINGS
2. Recurrent repos (non-PENDING/non-PROCESSING) - fallback when onboarding
is unavailable or skipped due to high load

Onboarding is delayed when integration.results exceeds MAX_INTEGRATION_RESULTS
to prevent overloading the system during high activity periods.
"""
repo_to_process = None
if await can_onboard_more():
repo_to_process = await acquire_onboarding_repo()
else:
logger.info("Skipping onboarding due to high load on integration.results")

if not repo_to_process:
# Fallback to non-onboarding repos
repo_to_process = await acquire_recurrent_repo()

return repo_to_process


Expand Down
1 change: 1 addition & 0 deletions services/apps/git_integration/src/crowdgit/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ def load_env_var(key: str, required=True, default=None):
)
WORKER_SHUTDOWN_TIMEOUT_SEC = int(load_env_var("WORKER_SHUTDOWN_TIMEOUT_SEC", default="3600"))
MAX_CONCURRENT_ONBOARDINGS = int(load_env_var("MAX_CONCURRENT_ONBOARDINGS", default="3"))
MAX_INTEGRATION_RESULTS = int(load_env_var("MAX_INTEGRATION_RESULTS", default="5000000"))
Loading