diff --git a/docs/rakuten/CONCURRENCY_LIMITS.md b/docs/rakuten/CONCURRENCY_LIMITS.md new file mode 100644 index 0000000000..511ac0f23e --- /dev/null +++ b/docs/rakuten/CONCURRENCY_LIMITS.md @@ -0,0 +1,139 @@ +# Concurrency Limitations for /webhook-parallel + +## Executive Summary + +**Recommended Configuration:** `max_concurrent_reviews = 2-3` + +**Primary Bottleneck:** Bitbucket Server API rate limits (token bucket: 15 tokens max, 3 tokens/sec refill) + +## Resource Constraints + +### 1. Hardware (Rakuten CaaS Deployment) +- **CPU:** 500m (0.5 cores) +- **Memory:** 1Gi +- **Impact:** Can support 5-8 concurrent I/O-bound reviews, not the limiting factor + +### 2. Bitbucket Server API Rate Limit (PRIMARY BOTTLENECK) +- **Token bucket:** 15 tokens maximum +- **Refill rate:** 3 tokens/second (180 tokens/minute) +- **Cost per review:** ~9-13 API calls = 9-13 tokens + - Phase 1: Get PR metadata (1) + Get PR diff (1) = 2-3 tokens + - Phase 2: AI processing (0 tokens) + - Phase 3: Post comments (7-10 tokens) + - /describe: 1 token + - /review: 1 token + - /improve: 1 main comment + 4-7 inline code suggestions = 5-8 tokens + +**Critical constraint:** API calls are bursty (most happen during Phase 3 comment posting). With 3 concurrent reviews finishing simultaneously: +- Required tokens: 3 × 10 = 30 tokens (average case) +- Available tokens: 15 (bucket) + 15 (refilled during 5s burst) = 30 tokens +- **Result:** At capacity limit, occasional rate limit errors expected (~2-5%) + +**Note:** Bitbucket Server does not support persistent comments (`get_issue_comments()` not implemented). The `persistent_comment=true` setting has no effect and always creates new comments, which actually saves 2-3 API calls per review compared to other git providers. + +### 3. Task Queue (No Hard Limit) +- **Implementation:** `asyncio.create_task()` with no queue size limit +- **Memory per waiting task:** ~5KB (task object + copied context) +- **Theoretical capacity:** ~200,000 tasks before memory exhaustion +- **Practical limit:** ~1,000 waiting tasks safe on 1Gi pod + +## Concurrency Calculation + +### Sustainable Throughput +``` +Token refill rate: 180 tokens/minute +Tokens per review: 10 +Theoretical max: 180 / 10 = 18 reviews/minute +``` + +### Safe Concurrent Reviews +``` +Review phases (per PR): +- Phase 1 (0-5s): 2-3 API calls (get PR data, get diff) +- Phase 2 (5-25s): 0 API calls (AI processing - LLM calls, not Bitbucket API) +- Phase 3 (25-30s): 7-10 API calls (BURSTY - all comment posting) + - /describe: 1 call + - /review: 1 call + - /improve: 5-8 calls (1 main + 4-7 inline code suggestions) + +Total per PR: 9-13 API calls (average: 10 calls) + +Safe concurrent (avoiding burst collisions): 2-3 reviews +Risky concurrent (may hit rate limits): 4-5 reviews +``` + + +### Wait Time Examples +| Concurrent | Burst Load | Wait Time for 10th PR | Rate Limit Risk | +|------------|------------|----------------------|-----------------| +| 2 | 10 PRs | ~150s (2.5 min) | Low (<1%) | +| 3 | 10 PRs | ~100s (1.7 min) | Medium (2-5%) | +| 5 | 10 PRs | ~60s (1 min) | High (10-20%) | + +## Configuration Recommendations + +### Conservative (Recommended) +```toml +[bitbucket_app] +enable_parallel_reviews = true +max_concurrent_reviews = 2 +``` +- **Throughput:** ~240 reviews/hour +- **Burst capacity:** 2 simultaneous PRs +- **Rate limit errors:** <1% + +### Balanced +```toml +max_concurrent_reviews = 3 +``` +- **Throughput:** ~360 reviews/hour +- **Burst capacity:** 3 simultaneous PRs +- **Rate limit errors:** 2-5% during bursts + 
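The burst analysis above can be checked with a few lines of arithmetic. The sketch below is illustrative only (it is not part of pr-agent) and simply reuses the figures already stated in this document: a 15-token bucket, 3 tokens/second refill, ~10 API calls per review, and a ~5-second Phase 3 burst window.

```python
# Back-of-the-envelope check of the burst analysis above (not part of pr-agent).
# All constants come from the tables in this document.
BUCKET_SIZE = 15          # maximum tokens in the Bitbucket Server bucket
REFILL_PER_SECOND = 3     # token refill rate
CALLS_PER_REVIEW = 10     # average API calls per review, almost all in Phase 3
BURST_WINDOW_SECONDS = 5  # assumed length of the comment-posting burst


def burst_margin(concurrent_reviews: int) -> int:
    """Tokens left over (negative = rate-limit errors) when N reviews finish together."""
    needed = concurrent_reviews * CALLS_PER_REVIEW
    available = BUCKET_SIZE + REFILL_PER_SECOND * BURST_WINDOW_SECONDS
    return available - needed


for n in (2, 3, 5):
    print(f"{n} concurrent reviews -> token margin {burst_margin(n)}")
# 2 -> +10 (comfortable), 3 -> 0 (at the limit, occasional 429s), 5 -> -20 (frequent 429s)
```

A margin of zero matches the "Balanced" setting: workable, but with occasional HTTP 429 responses when several reviews post comments at the same moment.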
+### Aggressive (Not Recommended) +```toml +max_concurrent_reviews = 5 +``` +- **Throughput:** ~10 reviews/minute (theoretical) +- **Rate limit errors:** 10-20% +- **Requires:** API call optimization or higher rate limits + +## Optimization Options + +### Immediate (No Code Changes) +1. Start with `max_concurrent_reviews = 2` +2. Monitor rate limit errors in logs +3. Gradually increase to 3 if error rate <2% + +### Short-term (Code Changes) +1. **Reduce inline code suggestions:** `num_code_suggestions=4` → `2` + - Savings: ~2 API calls per review + - New capacity: 4 concurrent reviews + +2. **Add queue depth monitoring:** Track wait times and queue depth + +3. **Add task queue limit:** Prevent unbounded memory growth + +## Monitoring Metrics + +Key metrics to track: +- `bitbucket_api_rate_limit_errors`: HTTP 429 responses +- `review_wait_time_seconds`: Time waiting for semaphore slot +- `review_duration_seconds`: Total review time +- `queue_depth`: Number of waiting tasks +- `active_reviews`: Current processing reviews + +Alert thresholds: +- Rate limit error rate >5%: Reduce concurrency +- Average wait time >60s: Increase concurrency (if rate limits allow) +- Queue depth >50: Possible traffic spike or stuck reviews + +## References + +- Configuration file: `pr_agent/settings/configuration.toml:315-316` +- Implementation: `pr_agent/servers/bitbucket_server_webhook.py:208-317` +- Gunicorn config: `pr_agent/servers/gunicorn_config.py:74-80` + +## Last Updated + +2025-12-08 - Initial analysis based on Rakuten CaaS deployment constraints diff --git a/docs/rakuten/README.md b/docs/rakuten/README.md new file mode 100644 index 0000000000..b14b6039e7 --- /dev/null +++ b/docs/rakuten/README.md @@ -0,0 +1,20 @@ +# Rakuten Custom Documentation + +This directory contains documentation specific to Rakuten's deployment and customizations. 
+ +## Contents + +- **[CONCURRENCY_LIMITS.md](CONCURRENCY_LIMITS.md)** - Concurrency analysis for Rakuten CaaS deployment + - Hardware constraints and Bitbucket Server API rate limits + - Recommended configuration for `/webhook-parallel` endpoint + +- **[architecture/](architecture/)** - Architecture documentation + - System architecture overview + - Parallel review implementation details + - Class and sequence diagrams + +## Upstream Documentation + +For official pr-agent documentation, see: +- Main docs site: https://qodo-merge-docs.qodo.ai/ +- Local docs: `docs/docs/` directory (upstream content) diff --git a/docs/docs/architecture/README.md b/docs/rakuten/architecture/README.md similarity index 100% rename from docs/docs/architecture/README.md rename to docs/rakuten/architecture/README.md diff --git a/docs/docs/architecture/architecture-overview.md b/docs/rakuten/architecture/architecture-overview.md similarity index 100% rename from docs/docs/architecture/architecture-overview.md rename to docs/rakuten/architecture/architecture-overview.md diff --git a/docs/docs/architecture/class-diagrams.md b/docs/rakuten/architecture/class-diagrams.md similarity index 100% rename from docs/docs/architecture/class-diagrams.md rename to docs/rakuten/architecture/class-diagrams.md diff --git a/docs/docs/architecture/data-flow.md b/docs/rakuten/architecture/data-flow.md similarity index 100% rename from docs/docs/architecture/data-flow.md rename to docs/rakuten/architecture/data-flow.md diff --git a/docs/docs/architecture/parallel-review-architecture.md b/docs/rakuten/architecture/parallel-review-architecture.md similarity index 100% rename from docs/docs/architecture/parallel-review-architecture.md rename to docs/rakuten/architecture/parallel-review-architecture.md diff --git a/docs/docs/architecture/sequence-diagrams.md b/docs/rakuten/architecture/sequence-diagrams.md similarity index 100% rename from docs/docs/architecture/sequence-diagrams.md rename to docs/rakuten/architecture/sequence-diagrams.md diff --git a/pr_agent/git_providers/bitbucket_server_git_provider.py b/pr_agent/git_providers/bitbucket_server_git_provider.py index cb07b63e3c..5765e9ec4a 100644 --- a/pr_agent/git_providers/bitbucket_server_git_provider.py +++ b/pr_agent/git_providers/bitbucket_server_git_provider.py @@ -238,7 +238,7 @@ def get_files(self): diffstat = [change["path"]['toString'] for change in changes] return diffstat - def get_file_from_git(self, path: str, commit_id: str, repo_path: str) -> str: + async def get_file_from_git(self, path: str, commit_id: str, repo_path: str) -> str: """ Read file content from local git repository using git show command. This bypasses REST API calls and avoids rate limiting. 
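With this change `get_file_from_git` is a coroutine, so callers must await it from async code; on success it returns the file text, and for a missing path or a timeout it returns an empty string rather than raising. A minimal caller-side sketch (the provider instance, commit SHA, and clone path are placeholders, not values from this patch):

```python
async def describe_file(provider, path: str, commit_sha: str, repo_path: str) -> str:
    # Awaiting keeps the event loop free while `git show` runs in a child process.
    content = await provider.get_file_from_git(path, commit_sha, repo_path)
    if not content:
        return f"{path} is empty or absent at {commit_sha[:8]}"
    return f"{path}@{commit_sha[:8]}: {len(content.splitlines())} lines"
```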
@@ -252,17 +252,19 @@ def get_file_from_git(self, path: str, commit_id: str, repo_path: str) -> str: File content as string, empty string if file not found """ try: - result = subprocess.run( + from pr_agent.git_providers.git_provider import run_subprocess_with_output + + result = await run_subprocess_with_output( ['git', 'show', f'{commit_id}:{path}'], cwd=repo_path, - capture_output=True, - text=True, + check=False, timeout=30 ) if result.returncode == 0: - return result.stdout + return result.stdout.decode('utf-8') if isinstance(result.stdout, bytes) else result.stdout else: - get_logger().debug(f"File {path} not found at commit {commit_id}: {result.stderr}") + stderr_str = result.stderr.decode('utf-8') if isinstance(result.stderr, bytes) else result.stderr + get_logger().debug(f"File {path} not found at commit {commit_id}: {stderr_str}") return "" except subprocess.TimeoutExpired: get_logger().warning(f"Timeout reading {path} from git at commit {commit_id}") @@ -283,13 +285,13 @@ def get_best_common_ancestor(source_commits_list, destination_commits_list, guar return guaranteed_common_ancestor - def get_diff_files(self) -> list[FilePatchInfo]: + async def get_diff_files(self) -> list[FilePatchInfo]: if self.diff_files: return self.diff_files # Custom: Use git clone mode if enabled (bypasses rate limits) if get_settings().get("bitbucket_server.use_git_clone_for_files", False): - return self._get_diff_files_with_git_clone() + return await self._get_diff_files_with_git_clone() # Original code continues below (unchanged for easier upstream merges) head_sha = self.pr.fromRef['latestCommit'] @@ -368,7 +370,7 @@ def get_diff_files(self) -> list[FilePatchInfo]: self.diff_files = diff_files return diff_files - def _get_diff_files_with_git_clone(self) -> list[FilePatchInfo]: + async def _get_diff_files_with_git_clone(self) -> list[FilePatchInfo]: """ Hybrid approach: REST API for metadata + Git clone for file contents. 
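For orientation, the core of this hybrid mode (enabled via the `bitbucket_server.use_git_clone_for_files` setting checked above) can be expressed with plain asyncio, independent of pr-agent's classes. The sketch below is illustrative only: the clone path, SHAs, file path, and `origin` remote are placeholders, and the real method that follows also pulls the change list from the REST API, reuses the shared subprocess helpers, and handles ADD/DELETE/RENAME cases.

```python
import asyncio


async def _git(repo_path: str, *args: str) -> bytes:
    # Run one git command in the local clone without blocking the event loop.
    proc = await asyncio.create_subprocess_exec(
        "git", *args,
        cwd=repo_path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, _err = await proc.communicate()
    return out if proc.returncode == 0 else b""


async def load_file_pair(repo_path: str, base_sha: str, head_sha: str, path: str):
    """Return (original, new) contents of one changed file, using zero REST calls."""
    # Shallow-fetch only the two commits the PR diff needs.
    # (In the real flow this fetch happens once per PR, not once per file.)
    await _git(repo_path, "fetch", "--depth=1", "origin", head_sha, base_sha)
    original = (await _git(repo_path, "show", f"{base_sha}:{path}")).decode("utf-8", "replace")
    new = (await _git(repo_path, "show", f"{head_sha}:{path}")).decode("utf-8", "replace")
    return original, new

# Example call with placeholder values:
# asyncio.run(load_file_pair("/tmp/pr-clone", base_sha, head_sha, "src/app.py"))
```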
@@ -398,7 +400,7 @@ def _get_diff_files_with_git_clone(self) -> list[FilePatchInfo]: get_logger().info(f"Cloning repository: {repo_url}") with TemporaryDirectory() as tmp_dir: - cloned_repo = self.clone(repo_url, tmp_dir, remove_dest_folder=False) + cloned_repo = await self.clone(repo_url, tmp_dir, remove_dest_folder=False) if not cloned_repo: get_logger().error("Git clone failed") raise RuntimeError(f"Git clone mode enabled but clone failed for repo: {repo_url}") @@ -459,12 +461,13 @@ def _get_diff_files_with_git_clone(self) -> list[FilePatchInfo]: get_logger().info(f"Fetching commits for file access: {base_sha[:8]} and {head_sha[:8]}") ssl_env = get_git_ssl_env() try: + from pr_agent.git_providers.git_provider import run_subprocess_with_output + # Fetch both commits in one batch for efficiency - subprocess.run( + await run_subprocess_with_output( ['git', 'fetch', '--depth=1', 'origin', head_sha, base_sha], cwd=repo_path, env=ssl_env, - capture_output=True, check=True, timeout=60 ) @@ -496,21 +499,21 @@ def _get_diff_files_with_git_clone(self) -> list[FilePatchInfo]: match change['type']: case 'ADD': edit_type = EDIT_TYPE.ADDED - new_file_content_str = self.get_file_from_git(file_path, head_sha, repo_path) + new_file_content_str = await self.get_file_from_git(file_path, head_sha, repo_path) new_file_content_str = decode_if_bytes(new_file_content_str) original_file_content_str = "" case 'DELETE': edit_type = EDIT_TYPE.DELETED new_file_content_str = "" - original_file_content_str = self.get_file_from_git(file_path, base_sha, repo_path) + original_file_content_str = await self.get_file_from_git(file_path, base_sha, repo_path) original_file_content_str = decode_if_bytes(original_file_content_str) case 'RENAME': edit_type = EDIT_TYPE.RENAMED case _: edit_type = EDIT_TYPE.MODIFIED - original_file_content_str = self.get_file_from_git(file_path, base_sha, repo_path) + original_file_content_str = await self.get_file_from_git(file_path, base_sha, repo_path) original_file_content_str = decode_if_bytes(original_file_content_str) - new_file_content_str = self.get_file_from_git(file_path, head_sha, repo_path) + new_file_content_str = await self.get_file_from_git(file_path, head_sha, repo_path) new_file_content_str = decode_if_bytes(new_file_content_str) patch = load_large_diff(file_path, new_file_content_str, original_file_content_str, show_warning=False) @@ -775,7 +778,9 @@ def _prepare_clone_url_with_token(self, repo_url_to_clone: str) -> str | None: #Overriding the shell command, since for some reason usage of x-token-auth doesn't work, as mentioned here: # https://stackoverflow.com/questions/56760396/cloning-bitbucket-server-repo-with-access-tokens - def _clone_inner(self, repo_url: str, dest_folder: str, operation_timeout_in_seconds: int=None): + async def _clone_inner(self, repo_url: str, dest_folder: str, operation_timeout_in_seconds: int=None): + from pr_agent.git_providers.git_provider import run_subprocess_async + bearer_token = self.bearer_token if not bearer_token: #Shouldn't happen since this is checked in _prepare_clone, therefore - throwing an exception. 
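The `run_subprocess_with_output` and `run_subprocess_async` helpers imported here are defined in `pr_agent/git_providers/git_provider.py` (their definitions appear later in this diff). They are what keeps concurrent reviews responsive: while one review waits on a git child process, the event loop can service others. A minimal usage sketch, assuming the package is importable in your environment and `git` is on PATH:

```python
import asyncio

from pr_agent.git_providers.git_provider import run_subprocess_with_output


async def heartbeat() -> None:
    # Keeps ticking while the git command runs; with the old blocking
    # subprocess.run call, these ticks would stall until git returned.
    for i in range(3):
        print(f"event loop tick {i}")
        await asyncio.sleep(0.2)


async def show_git_version() -> None:
    result = await run_subprocess_with_output(["git", "--version"], check=False, timeout=10)
    # stdout/stderr come back as bytes from the asyncio pipes, so decode them.
    print(result.stdout.decode("utf-8", "replace").strip())


async def main() -> None:
    await asyncio.gather(heartbeat(), show_git_version())


asyncio.run(main())
```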
@@ -786,5 +791,4 @@ def _clone_inner(self, repo_url: str, dest_folder: str, operation_timeout_in_sec ssl_env = get_git_ssl_env() - subprocess.run(cli_args, env=ssl_env, check=True, # check=True will raise an exception if the command fails - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=operation_timeout_in_seconds) + await run_subprocess_async(cli_args, env=ssl_env, check=True, timeout=operation_timeout_in_seconds) diff --git a/pr_agent/git_providers/git_provider.py b/pr_agent/git_providers/git_provider.py index 631e189c04..28e2276542 100644 --- a/pr_agent/git_providers/git_provider.py +++ b/pr_agent/git_providers/git_provider.py @@ -1,9 +1,10 @@ from abc import ABC, abstractmethod # enum EDIT_TYPE (ADDED, DELETED, MODIFIED, RENAMED) +import asyncio import os import shutil import subprocess -from typing import Optional, Tuple +from typing import Optional, Tuple, List, Dict from pr_agent.algo.types import FilePatchInfo from pr_agent.algo.utils import Range, process_description @@ -12,6 +13,92 @@ MAX_FILES_ALLOWED_FULL = 50 + +async def run_subprocess_async( + args: List[str], + env: Optional[Dict[str, str]] = None, + timeout: Optional[int] = None, + check: bool = True, + cwd: Optional[str] = None +) -> subprocess.CompletedProcess: + """ + Run subprocess asynchronously to prevent blocking the event loop. + Uses asyncio.create_subprocess_exec for true non-blocking execution. + """ + process = await asyncio.create_subprocess_exec( + *args, + env=env, + cwd=cwd, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL + ) + + try: + if timeout: + await asyncio.wait_for(process.wait(), timeout=timeout) + else: + await process.wait() + except asyncio.TimeoutError: + process.kill() + await process.wait() + raise subprocess.TimeoutExpired(args, timeout) + + if check and process.returncode != 0: + raise subprocess.CalledProcessError(process.returncode, args) + + return subprocess.CompletedProcess( + args=args, + returncode=process.returncode, + stdout=None, + stderr=None + ) + + +async def run_subprocess_with_output( + args: List[str], + env: Optional[Dict[str, str]] = None, + timeout: Optional[int] = None, + check: bool = True, + cwd: Optional[str] = None +) -> subprocess.CompletedProcess: + """ + Run subprocess asynchronously and capture stdout/stderr. + Uses asyncio.create_subprocess_exec for true non-blocking execution. + """ + process = await asyncio.create_subprocess_exec( + *args, + env=env, + cwd=cwd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + try: + if timeout: + stdout, stderr = await asyncio.wait_for( + process.communicate(), + timeout=timeout + ) + else: + stdout, stderr = await process.communicate() + except asyncio.TimeoutError: + process.kill() + await process.wait() + raise subprocess.TimeoutExpired(args, timeout) + + if check and process.returncode != 0: + raise subprocess.CalledProcessError( + process.returncode, args, stdout, stderr + ) + + return subprocess.CompletedProcess( + args=args, + returncode=process.returncode, + stdout=stdout, + stderr=stderr + ) + + def get_git_ssl_env() -> dict[str, str]: """ Get git SSL configuration arguments for per-command use. @@ -136,7 +223,7 @@ def _clone_inner(self, repo_url: str, dest_folder: str, operation_timeout_in_sec CLONE_TIMEOUT_SEC = 20 # Clone a given url to a destination folder. If successful, returns an object that wraps the destination folder, # deleting it once it is garbage collected. See: GitProvider.ScopedClonedRepo for more details. 
- def clone(self, repo_url_to_clone: str, dest_folder: str, remove_dest_folder: bool = True, + async def clone(self, repo_url_to_clone: str, dest_folder: str, remove_dest_folder: bool = True, operation_timeout_in_seconds: int=CLONE_TIMEOUT_SEC) -> ScopedClonedRepo|None: returned_obj = None clone_url = self._prepare_clone_url_with_token(repo_url_to_clone) @@ -146,7 +233,7 @@ def clone(self, repo_url_to_clone: str, dest_folder: str, remove_dest_folder: bo try: if remove_dest_folder and os.path.exists(dest_folder) and os.path.isdir(dest_folder): shutil.rmtree(dest_folder) - self._clone_inner(clone_url, dest_folder, operation_timeout_in_seconds) + await self._clone_inner(clone_url, dest_folder, operation_timeout_in_seconds) returned_obj = GitProvider.ScopedClonedRepo(dest_folder) except Exception as e: get_logger().exception(f"Clone failed: Could not clone url.", @@ -159,7 +246,7 @@ def get_files(self) -> list: pass @abstractmethod - def get_diff_files(self) -> list[FilePatchInfo]: + async def get_diff_files(self) -> list[FilePatchInfo]: pass def get_incremental_commits(self, is_incremental): diff --git a/pr_agent/servers/bitbucket_server_webhook.py b/pr_agent/servers/bitbucket_server_webhook.py index 7720eab9de..965322b70d 100644 --- a/pr_agent/servers/bitbucket_server_webhook.py +++ b/pr_agent/servers/bitbucket_server_webhook.py @@ -4,6 +4,7 @@ import json import os import re +import time from typing import List import uvicorn @@ -30,6 +31,10 @@ # Global semaphore for limiting concurrent reviews review_semaphore = None +# Task tracking for queue monitoring +active_review_tasks = {} +active_review_tasks_lock = asyncio.Lock() + def handle_request( background_tasks: BackgroundTasks, url: str, body: str, log_context: dict @@ -155,7 +160,7 @@ async def handle_webhook(background_tasks: BackgroundTasks, request: Request): pr_url = f"{bitbucket_server}/projects/{project_name}/repos/{repository_name}/pull-requests/{pr_id}" log_context["api_url"] = pr_url - log_context["event"] = "pull_request" + log_context["webhook_event_type"] = "pull_request" commands_to_run = [] @@ -250,7 +255,7 @@ async def handle_webhook_parallel(request: Request): pr_url = f"{bitbucket_server}/projects/{project_name}/repos/{repository_name}/pull-requests/{pr_id}" log_context["api_url"] = pr_url - log_context["event"] = "pull_request" + log_context["webhook_event_type"] = "pull_request" commands_to_run = [] @@ -284,8 +289,15 @@ async def handle_webhook_parallel(request: Request): if comment_text == "/inspect": get_logger().info("/inspect command detected, expanding to all commands") commands_to_run.extend(_get_commands_list_from_settings('BITBUCKET_SERVER.PR_COMMANDS')) - else: + elif comment_text.startswith("/"): commands_to_run.append(comment_text) + else: + # Not a command, ignore + get_logger().info(f"Ignoring non-command comment: {comment_text[:50]}...", **log_context) + return JSONResponse( + status_code=status.HTTP_200_OK, + content=jsonable_encoder({"message": "Comment ignored (not a command)"}) + ) else: return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, @@ -298,16 +310,88 @@ async def handle_webhook_parallel(request: Request): async def inner_parallel(): """Inner function that runs the commands with semaphore control""" + task_id = id(asyncio.current_task()) + start_time = time.time() + try: + # Register task + async with active_review_tasks_lock: + active_review_tasks[task_id] = { + "pr_url": pr_url, + "created_at": start_time, + "status": "waiting", + "wait_start": start_time, + } + queue_depth = 
len(active_review_tasks) + + get_logger().info( + "Review task created", + event="review_task_created", + task_id=task_id, + pr_url=pr_url, + queue_depth=queue_depth, + **log_context_copy + ) + if review_semaphore: + wait_start = time.time() + async with review_semaphore: - get_logger().info(f"Starting parallel review for {pr_url}", **log_context_copy) + wait_time = time.time() - wait_start + + # Update status and get counts + async with active_review_tasks_lock: + active_review_tasks[task_id]["status"] = "processing" + active_review_tasks[task_id]["processing_start"] = time.time() + active_tasks = len([t for t in active_review_tasks.values() if t["status"] == "processing"]) + waiting_tasks = len([t for t in active_review_tasks.values() if t["status"] == "waiting"]) + + get_logger().info( + "Review task processing", + event="review_task_processing", + task_id=task_id, + pr_url=pr_url, + wait_time_seconds=round(wait_time, 2), + active_reviews=active_tasks, + waiting_reviews=waiting_tasks, + **log_context_copy + ) + await _run_commands_sequentially(commands_copy, pr_url, log_context_copy) else: get_logger().info(f"Starting parallel review (no semaphore) for {pr_url}", **log_context_copy) await _run_commands_sequentially(commands_copy, pr_url, log_context_copy) + except Exception as e: - get_logger().error(f"Failed to handle webhook in parallel: {e}", **log_context_copy) + async with active_review_tasks_lock: + if task_id in active_review_tasks: + active_review_tasks[task_id]["status"] = "failed" + get_logger().error( + "Review task failed", + event="review_task_failed", + task_id=task_id, + pr_url=pr_url, + error=str(e), + **log_context_copy + ) + finally: + # Cleanup and log completion + duration = time.time() - start_time + async with active_review_tasks_lock: + status = active_review_tasks.get(task_id, {}).get("status", "unknown") + active_review_tasks.pop(task_id, None) + remaining_tasks = len(active_review_tasks) + + get_logger().info( + "Review task completed", + event="review_task_completed", + task_id=task_id, + pr_url=pr_url, + duration_seconds=round(duration, 2), + status=status, + remaining_tasks=remaining_tasks, + **log_context_copy + ) # Use asyncio.create_task for true concurrent execution asyncio.create_task(inner_parallel()) diff --git a/pr_agent/tools/pr_add_docs.py b/pr_agent/tools/pr_add_docs.py index 3ec97b31ce..61cb8e7acf 100644 --- a/pr_agent/tools/pr_add_docs.py +++ b/pr_agent/tools/pr_add_docs.py @@ -64,7 +64,7 @@ async def run(self): get_logger().info('Pushing PR documentation...') self.git_provider.remove_initial_comment() get_logger().info('Pushing inline code documentation...') - self.push_inline_docs(data) + await self.push_inline_docs(data) except Exception as e: get_logger().error(f"Failed to generate code documentation for PR, error: {e}") @@ -101,7 +101,7 @@ def _prepare_pr_code_docs(self) -> Dict: data = {'Code Documentation': data} return data - def push_inline_docs(self, data): + async def push_inline_docs(self, data): docs = [] if not data['Code Documentation']: @@ -116,7 +116,7 @@ def push_inline_docs(self, data): documentation = d['documentation'] doc_placement = d['doc placement'].strip() if documentation: - new_code_snippet = self.dedent_code(relevant_file, relevant_line, documentation, doc_placement, + new_code_snippet = await self.dedent_code(relevant_file, relevant_line, documentation, doc_placement, add_original_line=True) body = f"**Suggestion:** Proposed documentation\n```suggestion\n" + new_code_snippet + "\n```" @@ -133,11 +133,13 @@ def 
push_inline_docs(self, data): for doc_suggestion in docs: self.git_provider.publish_code_suggestions([doc_suggestion]) - def dedent_code(self, relevant_file, relevant_lines_start, new_code_snippet, doc_placement='after', + async def dedent_code(self, relevant_file, relevant_lines_start, new_code_snippet, doc_placement='after', add_original_line=False): try: # dedent code snippet - self.diff_files = self.git_provider.diff_files if self.git_provider.diff_files \ - else self.git_provider.get_diff_files() + if self.git_provider.diff_files: + self.diff_files = self.git_provider.diff_files + else: + self.diff_files = await self.git_provider.get_diff_files() original_initial_line = None for file in self.diff_files: if file.filename.strip() == relevant_file: diff --git a/pr_agent/tools/pr_code_suggestions.py b/pr_agent/tools/pr_code_suggestions.py index 3029207416..0c0f59bb25 100644 --- a/pr_agent/tools/pr_code_suggestions.py +++ b/pr_agent/tools/pr_code_suggestions.py @@ -459,7 +459,7 @@ async def analyze_self_reflection_response(self, data, response_reflect): suggestion["score"] = 7 suggestion["score_why"] = "" - suggestion = self.validate_one_liner_suggestion_not_repeating_code(suggestion) + suggestion = await self.validate_one_liner_suggestion_not_repeating_code(suggestion) # if the before and after code is the same, clear one of them try: @@ -559,7 +559,7 @@ async def push_inline_code_suggestions(self, data): label = d['label'].strip() if new_code_snippet: - new_code_snippet = self.dedent_code(relevant_file, relevant_lines_start, new_code_snippet) + new_code_snippet = await self.dedent_code(relevant_file, relevant_lines_start, new_code_snippet) if d.get('score'): body = f"**Suggestion:** {content} [{label}, importance: {d.get('score')}]\n```suggestion\n" + new_code_snippet + "\n```" @@ -578,10 +578,12 @@ async def push_inline_code_suggestions(self, data): for code_suggestion in code_suggestions: self.git_provider.publish_code_suggestions([code_suggestion]) - def dedent_code(self, relevant_file, relevant_lines_start, new_code_snippet): + async def dedent_code(self, relevant_file, relevant_lines_start, new_code_snippet): try: # dedent code snippet - self.diff_files = self.git_provider.diff_files if self.git_provider.diff_files \ - else self.git_provider.get_diff_files() + if self.git_provider.diff_files: + self.diff_files = self.git_provider.diff_files + else: + self.diff_files = await self.git_provider.get_diff_files() original_initial_line = None for file in self.diff_files: if file.filename.strip() == relevant_file: @@ -618,7 +620,7 @@ def dedent_code(self, relevant_file, relevant_lines_start, new_code_snippet): return new_code_snippet - def validate_one_liner_suggestion_not_repeating_code(self, suggestion): + async def validate_one_liner_suggestion_not_repeating_code(self, suggestion): try: existing_code = suggestion.get('existing_code', '').strip() if '...' 
in existing_code: @@ -626,7 +628,7 @@ def validate_one_liner_suggestion_not_repeating_code(self, suggestion): new_code = suggestion.get('improved_code', '').strip() relevant_file = suggestion.get('relevant_file', '').strip() - diff_files = self.git_provider.get_diff_files() + diff_files = await self.git_provider.get_diff_files() for file in diff_files: if file.filename.strip() == relevant_file: # protections diff --git a/pr_agent/tools/pr_description.py b/pr_agent/tools/pr_description.py index a6387a9243..43ccecf202 100644 --- a/pr_agent/tools/pr_description.py +++ b/pr_agent/tools/pr_description.py @@ -72,7 +72,7 @@ def __init__(self, pr_url: str, args: list = None, "custom_labels_class": "", # will be filled if necessary in 'set_custom_labels' function "enable_semantic_files_types": get_settings().pr_description.enable_semantic_files_types, "related_tickets": "", - "include_file_summary_changes": len(self.git_provider.get_diff_files()) <= self.COLLAPSIBLE_FILE_LIST_THRESHOLD, + "include_file_summary_changes": True, # Will be calculated in run() "duplicate_prompt_examples": get_settings().config.get("duplicate_prompt_examples", False), "enable_pr_diagram": enable_pr_diagram, } @@ -101,6 +101,10 @@ async def run(self): if get_settings().config.publish_output and not get_settings().config.get('is_auto_command', False): self.git_provider.publish_comment("Preparing PR description...", is_temporary=True) + # Calculate include_file_summary_changes + diff_files = await self.git_provider.get_diff_files() + self.vars["include_file_summary_changes"] = len(diff_files) <= self.COLLAPSIBLE_FILE_LIST_THRESHOLD + # ticket extraction if exists await extract_and_cache_pr_tickets(self.git_provider, self.vars) @@ -123,9 +127,9 @@ async def run(self): get_logger().debug(f"Publishing labels disabled") if get_settings().pr_description.use_description_markers: - pr_title, pr_body, changes_walkthrough, pr_file_changes = self._prepare_pr_answer_with_markers() + pr_title, pr_body, changes_walkthrough, pr_file_changes = await self._prepare_pr_answer_with_markers() else: - pr_title, pr_body, changes_walkthrough, pr_file_changes = self._prepare_pr_answer() + pr_title, pr_body, changes_walkthrough, pr_file_changes = await self._prepare_pr_answer() if not self.git_provider.is_supported( "publish_file_comments") or not get_settings().pr_description.inline_file_summary: pr_body += "\n\n" + changes_walkthrough + "___\n\n" @@ -337,7 +341,7 @@ async def extend_uncovered_files(self, original_prediction: str) -> str: filenames_predicted = [] # extend the prediction with additional files not included in the original prediction - pr_files = self.git_provider.get_diff_files() + pr_files = await self.git_provider.get_diff_files() prediction_extra = "pr_files:" MAX_EXTRA_FILES_TO_OUTPUT = 100 counter_extra_files = 0 @@ -495,7 +499,7 @@ def _prepare_labels(self) -> List[str]: get_logger().error(f"Error converting labels to original case {self.pr_id}: {e}") return pr_labels - def _prepare_pr_answer_with_markers(self) -> Tuple[str, str, str, List[dict]]: + async def _prepare_pr_answer_with_markers(self) -> Tuple[str, str, str, List[dict]]: get_logger().info(f"Using description marker replacements {self.pr_id}") # Remove the 'PR Title' key from the dictionary @@ -532,7 +536,7 @@ def _prepare_pr_answer_with_markers(self) -> Tuple[str, str, str, List[dict]]: pr_file_changes = [] if ai_walkthrough and not re.search(r'', body): try: - walkthrough_gfm, pr_file_changes = self.process_pr_files_prediction(walkthrough_gfm, + walkthrough_gfm, 
pr_file_changes = await self.process_pr_files_prediction(walkthrough_gfm, self.file_label_dict) body = body.replace('pr_agent:walkthrough', walkthrough_gfm) except Exception as e: @@ -546,7 +550,7 @@ def _prepare_pr_answer_with_markers(self) -> Tuple[str, str, str, List[dict]]: return title, body, walkthrough_gfm, pr_file_changes - def _prepare_pr_answer(self) -> Tuple[str, str, str, List[dict]]: + async def _prepare_pr_answer(self) -> Tuple[str, str, str, List[dict]]: """ Prepare the PR description based on the AI prediction data. @@ -599,7 +603,7 @@ def _prepare_pr_answer(self) -> Tuple[str, str, str, List[dict]]: if self.git_provider.is_supported("gfm_markdown"): pr_body += "\n" elif 'pr_files' in key.lower() and get_settings().pr_description.enable_semantic_files_types: # 'File Walkthrough' section - changes_walkthrough_table, pr_file_changes = self.process_pr_files_prediction(changes_walkthrough, value) + changes_walkthrough_table, pr_file_changes = await self.process_pr_files_prediction(changes_walkthrough, value) if get_settings().pr_description.get('file_table_collapsible_open_by_default', False): initial_status = " open" else: @@ -656,7 +660,7 @@ def _prepare_file_labels(self): pass return file_label_dict - def process_pr_files_prediction(self, pr_body, value): + async def process_pr_files_prediction(self, pr_body, value): pr_comments = [] # logic for using collapsible file list use_collapsible_file_list = get_settings().pr_description.collapsible_file_list @@ -698,7 +702,7 @@ def process_pr_files_prediction(self, pr_body, value): filename_publish = f"{filename_publish}" diff_plus_minus = "" delta_nbsp = "" - diff_files = self.git_provider.get_diff_files() + diff_files = await self.git_provider.get_diff_files() for f in diff_files: if f.filename.lower().strip('/') == filename.lower().strip('/'): num_plus_lines = f.num_plus_lines diff --git a/pr_agent/tools/pr_line_questions.py b/pr_agent/tools/pr_line_questions.py index f373a4a11f..9bd5ef6c40 100644 --- a/pr_agent/tools/pr_line_questions.py +++ b/pr_agent/tools/pr_line_questions.py @@ -78,7 +78,7 @@ async def run(self): side=side ) else: - diff_files = self.git_provider.get_diff_files() + diff_files = await self.git_provider.get_diff_files() for file in diff_files: if file.filename == file_name: self.patch_with_lines, self.selected_lines = extract_hunk_lines_from_patch(file.patch, file.filename, diff --git a/pr_agent/tools/pr_reviewer.py b/pr_agent/tools/pr_reviewer.py index c4917f3597..687f5d4ae1 100644 --- a/pr_agent/tools/pr_reviewer.py +++ b/pr_agent/tools/pr_reviewer.py @@ -157,7 +157,7 @@ async def run(self) -> None: self.git_provider.remove_initial_comment() return None - pr_review = self._prepare_pr_review() + pr_review = await self._prepare_pr_review() get_logger().debug(f"PR output", artifact=pr_review) should_publish = get_settings().config.publish_output and self._should_publish_review_no_suggestions(pr_review) @@ -226,7 +226,7 @@ async def _get_prediction(self, model: str) -> str: return response - def _prepare_pr_review(self) -> str: + async def _prepare_pr_review(self) -> str: """ Prepare the PR review by processing the AI prediction and generating a markdown-formatted text that summarizes the feedback. 
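The `include_file_summary_changes` change in `PRDescription` above reflects a general constraint of this async conversion: `__init__` is synchronous and cannot await, so any value that depends on `get_diff_files()` now has to be computed inside the async `run()` method. A minimal illustration of the pattern, using hypothetical names and a hypothetical threshold rather than pr-agent's actual classes:

```python
class DiffDependentTool:
    COLLAPSIBLE_FILE_LIST_THRESHOLD = 8  # hypothetical threshold for the example

    def __init__(self, git_provider):
        self.git_provider = git_provider
        # __init__ cannot await get_diff_files(), so start from a provisional
        # default and compute the real value once run() is awaited.
        self.include_file_summary_changes = True

    async def run(self) -> bool:
        diff_files = await self.git_provider.get_diff_files()
        self.include_file_summary_changes = (
            len(diff_files) <= self.COLLAPSIBLE_FILE_LIST_THRESHOLD
        )
        return self.include_file_summary_changes
```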
@@ -258,7 +258,7 @@ def _prepare_pr_review(self) -> str: markdown_text = convert_to_markdown_v2(data, self.git_provider.is_supported("gfm_markdown"), incremental_review_markdown_text, git_provider=self.git_provider, - files=self.git_provider.get_diff_files()) + files=await self.git_provider.get_diff_files()) # Add help text if gfm_markdown is supported if self.git_provider.is_supported("gfm_markdown") and get_settings().pr_reviewer.enable_help_text: diff --git a/pyproject.toml b/pyproject.toml index 3e44a1f2ea..6fe59576df 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "pr-agent" -version = "1.4.0" +version = "1.5.0" authors = [{ name = "QodoAI", email = "ofir.f@qodo.ai" }] diff --git a/tests/unittest/test_bitbucket_server_webhook.py b/tests/unittest/test_bitbucket_server_webhook.py new file mode 100644 index 0000000000..e3c261d01d --- /dev/null +++ b/tests/unittest/test_bitbucket_server_webhook.py @@ -0,0 +1,174 @@ +import asyncio +import time +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from pr_agent.servers.bitbucket_server_webhook import ( + active_review_tasks, + active_review_tasks_lock, +) + + +class TestBitbucketServerWebhook: + """Tests for Bitbucket Server webhook parallel endpoint""" + + @pytest.mark.asyncio + async def test_task_tracking_lifecycle(self): + """Test that tasks are properly tracked through their lifecycle""" + # Clear any existing tasks + async with active_review_tasks_lock: + active_review_tasks.clear() + + # Simulate task lifecycle + task_id = 12345 + pr_url = "https://git.rakuten-it.com/projects/TEST/repos/test-repo/pull-requests/1" + + # Task created + async with active_review_tasks_lock: + active_review_tasks[task_id] = { + "pr_url": pr_url, + "created_at": time.time(), + "status": "waiting", + "wait_start": time.time(), + } + queue_depth = len(active_review_tasks) + + assert queue_depth == 1 + assert active_review_tasks[task_id]["status"] == "waiting" + assert active_review_tasks[task_id]["pr_url"] == pr_url + + # Task processing + async with active_review_tasks_lock: + active_review_tasks[task_id]["status"] = "processing" + active_review_tasks[task_id]["processing_start"] = time.time() + + assert active_review_tasks[task_id]["status"] == "processing" + assert "processing_start" in active_review_tasks[task_id] + + # Task completed + async with active_review_tasks_lock: + active_review_tasks.pop(task_id, None) + remaining = len(active_review_tasks) + + assert remaining == 0 + assert task_id not in active_review_tasks + + @pytest.mark.asyncio + async def test_multiple_concurrent_tasks(self): + """Test tracking multiple concurrent tasks""" + # Clear any existing tasks + async with active_review_tasks_lock: + active_review_tasks.clear() + + # Simulate 3 concurrent tasks + task_ids = [1001, 1002, 1003] + pr_urls = [ + "https://git.rakuten-it.com/projects/TEST/repos/test-repo/pull-requests/1", + "https://git.rakuten-it.com/projects/TEST/repos/test-repo/pull-requests/2", + "https://git.rakuten-it.com/projects/TEST/repos/test-repo/pull-requests/3", + ] + + # Create tasks + for task_id, pr_url in zip(task_ids, pr_urls): + async with active_review_tasks_lock: + active_review_tasks[task_id] = { + "pr_url": pr_url, + "created_at": time.time(), + "status": "waiting", + "wait_start": time.time(), + } + + # Verify all tasks are tracked + async with active_review_tasks_lock: + assert len(active_review_tasks) == 3 + waiting_count = len([t for t in 
active_review_tasks.values() if t["status"] == "waiting"]) + assert waiting_count == 3 + + # Move first task to processing + async with active_review_tasks_lock: + active_review_tasks[task_ids[0]]["status"] = "processing" + active_review_tasks[task_ids[0]]["processing_start"] = time.time() + active_count = len([t for t in active_review_tasks.values() if t["status"] == "processing"]) + waiting_count = len([t for t in active_review_tasks.values() if t["status"] == "waiting"]) + + assert active_count == 1 + assert waiting_count == 2 + + # Complete first task + async with active_review_tasks_lock: + active_review_tasks.pop(task_ids[0], None) + remaining = len(active_review_tasks) + + assert remaining == 2 + + # Cleanup + async with active_review_tasks_lock: + active_review_tasks.clear() + + @pytest.mark.asyncio + async def test_task_status_counts(self): + """Test accurate counting of active vs waiting tasks""" + # Clear any existing tasks + async with active_review_tasks_lock: + active_review_tasks.clear() + + # Create 5 tasks: 2 processing, 3 waiting + async with active_review_tasks_lock: + for i in range(5): + status = "processing" if i < 2 else "waiting" + active_review_tasks[i] = { + "pr_url": f"https://git.rakuten-it.com/projects/TEST/repos/test-repo/pull-requests/{i}", + "created_at": time.time(), + "status": status, + "wait_start": time.time(), + } + + active_count = len([t for t in active_review_tasks.values() if t["status"] == "processing"]) + waiting_count = len([t for t in active_review_tasks.values() if t["status"] == "waiting"]) + + assert active_count == 2 + assert waiting_count == 3 + assert len(active_review_tasks) == 5 + + # Cleanup + async with active_review_tasks_lock: + active_review_tasks.clear() + + @pytest.mark.asyncio + async def test_task_failure_tracking(self): + """Test that failed tasks are properly marked""" + # Clear any existing tasks + async with active_review_tasks_lock: + active_review_tasks.clear() + + task_id = 9999 + pr_url = "https://git.rakuten-it.com/projects/TEST/repos/test-repo/pull-requests/999" + + # Create task + async with active_review_tasks_lock: + active_review_tasks[task_id] = { + "pr_url": pr_url, + "created_at": time.time(), + "status": "waiting", + "wait_start": time.time(), + } + + # Mark as failed + async with active_review_tasks_lock: + if task_id in active_review_tasks: + active_review_tasks[task_id]["status"] = "failed" + + assert active_review_tasks[task_id]["status"] == "failed" + + # Cleanup (simulating finally block) + async with active_review_tasks_lock: + status = active_review_tasks.get(task_id, {}).get("status", "unknown") + active_review_tasks.pop(task_id, None) + + assert status == "failed" + assert task_id not in active_review_tasks + + # Cleanup + async with active_review_tasks_lock: + active_review_tasks.clear() diff --git a/uv.lock b/uv.lock index a0bca58111..da276ee864 100644 --- a/uv.lock +++ b/uv.lock @@ -1705,7 +1705,7 @@ wheels = [ [[package]] name = "pr-agent" -version = "1.4.0" +version = "1.5.0" source = { editable = "." } dependencies = [ { name = "aiohttp" },