In [None]:
import json
import os
import random
import sys
import warnings
from typing import Dict, List, Optional

# Put the parent of the current working directory on the import path so that
# the `utils` package imported below can be resolved when the notebook runs
# from its own folder. The membership test keeps re-runs of this cell from
# appending the same entry twice.
src_path = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
if src_path not in sys.path:
    sys.path.append(src_path)


from utils.helper import load_env
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack_integrations.components.generators.ollama import OllamaGenerator
from haystack_integrations.components.generators.anthropic import AnthropicGenerator
from haystack.utils import Secret

In [None]:
# Ignore notebook warnings: this installs a blanket 'ignore' filter, so every
# warning category is silenced for the rest of the kernel session.
warnings.filterwarnings('ignore')
# Project helper imported from utils.helper; presumably loads variables from a
# .env file into the process environment — TODO confirm against utils/helper.py.
load_env()
# os.getenv returns None when the variable is unset; nothing here fails fast.
# NOTE(review): this name is not referenced by the other visible cells —
# generate_reviewer_prompt reads the key via Secret.from_env_var instead.
ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY")

In [3]:
def get_project_root() -> str:
    """Return the project root directory derived from the current working directory.

    When the working directory is a folder named exactly ``notebooks``, the
    root is taken to be two levels above it; otherwise the working directory
    itself is returned unchanged.

    The directory name is compared exactly. A suffix test would also match
    folders such as ``my_notebooks`` and wrongly jump two levels up from them.

    Returns:
        Absolute path of the project root.
    """
    current_dir = os.getcwd()
    # normpath drops any trailing separator so basename sees the final folder name.
    if os.path.basename(os.path.normpath(current_dir)) == 'notebooks':
        return os.path.abspath(os.path.join(current_dir, '..', '..'))
    return current_dir

def parse_diff_content(diff_content: str) -> List[Dict]:
    """Split a multi-file git diff into one entry per file.

    A new entry starts at every line beginning with ``diff --git``. The header
    line itself is not kept; every following line up to the next header is
    joined with newlines into that entry's ``"diff"`` text. Lines that appear
    before the first header are discarded.

    Args:
        diff_content: Raw diff text, possibly covering several files.

    Returns:
        A list of ``{"filename": str, "diff": str}`` dicts in order of
        appearance. Empty when the text contains no ``diff --git`` header.
    """
    files = []
    current_file = None
    current_diff = []

    for line in diff_content.split('\n'):
        if line.startswith('diff --git'):
            # `is not None` rather than truthiness, so a section is never
            # silently dropped because its parsed name happens to be empty.
            if current_file is not None:
                files.append({
                    "filename": current_file,
                    "diff": '\n'.join(current_diff)
                })

            # Strip trailing whitespace (including '\r' from CRLF input) before parsing.
            header = line.rstrip()
            # Take everything after the LAST " b/" as the new-side path. This
            # removes only the prefix (str.replace('b/', '') would also mangle
            # paths such as "lib/foo.py") and keeps paths that contain spaces.
            _, separator, new_path = header.rpartition(' b/')
            if not separator:
                # Header without the a/ b/ prefixes: fall back to the last token.
                new_path = header.split()[-1]
                if new_path.startswith('b/'):
                    new_path = new_path[len('b/'):]
            current_file = new_path
            current_diff = []
        else:
            current_diff.append(line)

    # Add the last file
    if current_file is not None:
        files.append({
            "filename": current_file,
            "diff": '\n'.join(current_diff)
        })

    return files

def read_reviewer_data(
    reviewer_name: str,
    repo_name: str,
    top_n: int = 100,
    sample_size: int = 60,
    seed: Optional[int] = None,
) -> List[Dict]:
    """Read a reviewer's PR data from the processed files and pick a sample.

    Loads ``data/processed/<repo_name>/<repo_name>_enriched_prs.json`` under
    the project root, keeps PRs that list ``reviewer_name`` among their
    reviewers and are explicitly marked non-confidential, scores them by the
    reviewer's engagement, keeps the ``top_n`` best and returns a random
    sample of at most ``sample_size`` of those.

    Args:
        reviewer_name: Login matched against each PR's ``reviewers`` list and
            against ``user_login`` of its reviews and review comments.
        repo_name: Repository name used to locate the JSON file.
        top_n: How many of the highest-scoring PRs form the sampling pool.
        sample_size: Maximum number of PRs to return.
        seed: Optional seed for a reproducible sample. When ``None`` the
            module-level ``random`` state is used.

    Returns:
        A list of PR dicts, each a shallow copy of the stored PR with
        ``engagement_score`` and ``reviewer_login`` added.

    Raises:
        FileNotFoundError: If the enriched PR file does not exist.
        ValueError: If ``top_n`` or ``sample_size`` is negative.
    """
    if top_n < 0 or sample_size < 0:
        raise ValueError("top_n and sample_size must be non-negative")

    project_root = get_project_root()
    enriched_prs_path = os.path.join(project_root, 'data', 'processed', repo_name, f"{repo_name}_enriched_prs.json")

    if not os.path.exists(enriched_prs_path):
        raise FileNotFoundError(f"The file {enriched_prs_path} does not exist.")

    with open(enriched_prs_path, 'r', encoding='utf-8') as f:
        all_prs = json.load(f)

    # Filter PRs reviewed by the given reviewer. `or []` also covers a JSON
    # null stored under the key, which .get()'s default alone would not.
    reviewer_prs = [
        pr for pr in all_prs
        if reviewer_name in (pr.get('reviewers') or [])
    ]

    # Filter out confidential PRs (either True or missing classification).
    # `is False` fails closed: only an explicit boolean false lets a PR through.
    non_confidential_prs = [
        pr for pr in reviewer_prs
        if (pr.get('confidentiality') or {}).get('is_confidential') is False
    ]

    # Score PRs based on engagement and changes
    scored_prs = []
    for pr in non_confidential_prs:
        # Count reviews and comments by this reviewer
        review_count = sum(
            1 for review in (pr.get('reviews') or [])
            if review.get('user_login') == reviewer_name
        )
        comment_count = sum(
            1 for comment in (pr.get('review_comments') or [])
            if comment.get('user_login') == reviewer_name
        )

        # Count added/removed lines; the "--- " / "+++ " lines are per-file
        # headers of the diff, not changes, so they are excluded.
        diff_content = pr.get('diff_content') or ''
        lines_changed = sum(
            1 for line in diff_content.split('\n')
            if line.startswith(('+', '-')) and not line.startswith(('+++ ', '--- '))
        )

        # Calculate engagement score
        engagement_score = (review_count * 5) + (comment_count * 2) + (lines_changed * 0.1)

        scored_prs.append({
            **pr,
            'engagement_score': engagement_score,
            'reviewer_login': reviewer_name  # Add reviewer info for template
        })

    # Sort by engagement score (stable for ties) and keep the top_n best
    top_prs = sorted(scored_prs, key=lambda x: x['engagement_score'], reverse=True)[:top_n]

    print(f"Found {len(reviewer_prs)} total PRs reviewed by {reviewer_name}")
    print(f"Found {len(non_confidential_prs)} non-confidential PRs")
    print(f"Selected top {len(top_prs)} PRs based on engagement score")

    # Randomly select up to sample_size PRs from the pool
    sample_count = min(sample_size, len(top_prs))
    if seed is None:
        return random.sample(top_prs, sample_count)
    # A private generator keeps a seeded call from disturbing global random state.
    return random.Random(seed).sample(top_prs, sample_count)

In [4]:
# Jinja template passed to PromptBuilder in generate_reviewer_prompt.
# It expects a single variable, `prs`: an iterable of PR records. For each
# record the template reads `pr_id`, `title`, `body`, `reviews`,
# `review_comments` and `reviewer_login`; each review / comment is read for
# `user_login` and `body`. Only reviews and comments whose `user_login` equals
# the record's `reviewer_login` are rendered into the prompt.
prompt_template = """
Analyze the following pull request reviews by a single reviewer. Based on these reviews, create a comprehensive set of instructions for reviewing pull requests that authentically mimics the reviewer's unique style. Focus on the following aspects:

1. Review Focus Areas:
   - Code structure and organization
   - Performance considerations
   - Security implications
   - Testing requirements
   - Documentation standards

2. Review Style:
   - Level of detail in comments
   - Tone of feedback (e.g., direct, suggestive, questioning)
   - Balance between positive and critical feedback
   - Use of code examples in suggestions

3. Common Patterns:
   - Recurring concerns or emphasis
   - Preferred coding patterns
   - Common suggestions for improvement
   - Typical areas of scrutiny

4. Technical Depth:
   - Level of technical discussion
   - Focus on implementation details
   - Architectural considerations
   - Performance optimization suggestions

5. Communication Style:
   - Comment formatting and structure
   - Use of questions vs. direct statements
   - References to documentation or best practices
   - Interaction with PR authors

6. Review Process:
   - Thoroughness of review
   - Focus on specific file types or areas
   - Sequential vs. holistic review approach
   - Follow-up patterns

7. Best Practices Enforcement:
   - Coding standards emphasized
   - Testing requirements
   - Documentation expectations
   - Error handling preferences

8. Unique Characteristics:
   - Signature phrases or expressions
   - Distinctive ways of providing feedback
   - Special attention areas
   - Personal review checklist items

Here are the reviews to analyze:

{% for pr in prs %}
PR #{{ pr.pr_id }}:
Title: {{ pr.title }}
Body: {{ pr.body }}

Reviews:
{% for review in pr.reviews %}
{% if review.user_login == pr.reviewer_login %}
- {{ review.body }}
{% endif %}
{% endfor %}

Comments:
{% for comment in pr.review_comments %}
{% if comment.user_login == pr.reviewer_login %}
- {{ comment.body }}
{% endif %}
{% endfor %}

{% endfor %}

Based on these reviews, provide a detailed set of instructions for reviewing pull requests that convincingly mimics this reviewer's style. The instructions should be comprehensive enough that another reviewer could use them to provide feedback in a similar manner.

Your response should be in the following format:

Review Instructions for [Reviewer Name]:
1. [First instruction]
2. [Second instruction]
3. [Third instruction]
...

Ensure your instructions cover all the aspects mentioned above, providing specific examples from the reviewer's comments where relevant. The goal is to capture not just the superficial elements of the review style, but also the underlying technical focus and approach to code review.
"""

In [5]:
def save_instructions_to_disk(reviewer_name: str, repo_name: str, instruction_type: str, instructions: str):
    """Write generated instructions to a UTF-8 text file and report its path.

    The file is ``<reviewer_name>_<instruction_type>_instructions.txt`` inside
    ``data/reviewer_instructions/<repo_name>`` under the project root. The
    directory is created when missing and an existing file is overwritten.

    Args:
        reviewer_name: Reviewer login, used as the file name prefix.
        repo_name: Repository name, used as the sub-directory.
        instruction_type: Label used in the file name and in the printed message.
        instructions: Text to write.
    """
    target_dir = os.path.join(get_project_root(), 'data', 'reviewer_instructions', repo_name)
    os.makedirs(target_dir, exist_ok=True)

    target_name = f"{reviewer_name}_{instruction_type}_instructions.txt"
    target_path = os.path.join(target_dir, target_name)
    with open(target_path, 'w', encoding='utf-8') as out:
        out.write(instructions)

    label = instruction_type.capitalize()
    print(f"{label} instructions saved to {target_path}")


def load_instructions_from_disk(reviewer_name: str, repo_name: str, instruction_type: str = "review") -> Optional[str]:
    """Load previously saved reviewer instructions from disk.

    Reads ``<reviewer_name>_<instruction_type>_instructions.txt`` from
    ``data/reviewer_instructions/<repo_name>`` under the project root.

    Args:
        reviewer_name: Reviewer login, the file name prefix.
        repo_name: Repository name, the sub-directory.
        instruction_type: Label embedded in the file name.

    Returns:
        The file's text, or ``None`` when the file does not exist.
    """
    project_root = get_project_root()
    file_path = os.path.join(project_root, 'data', 'reviewer_instructions', repo_name,
                             f"{reviewer_name}_{instruction_type}_instructions.txt")

    # Open directly instead of checking existence first: the file could vanish
    # between a check and the open, and one call is enough to learn both.
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    except (FileNotFoundError, NotADirectoryError):
        return None

def generate_reviewer_prompt(reviewer_name: str, repo_name: str, instruction_type: str = "review",
                             model: str = "claude-3-5-sonnet-20241022") -> str:
    """Generate review instructions that mimic a reviewer and save them to disk.

    Samples the reviewer's PRs with ``read_reviewer_data``, renders them into
    ``prompt_template``, sends the prompt through a two-step Haystack pipeline
    (prompt builder -> Anthropic generator) and stores the first reply with
    ``save_instructions_to_disk``.

    Args:
        reviewer_name: Reviewer login whose reviews are analysed.
        repo_name: Repository whose processed PR data is read.
        instruction_type: Label used in progress messages and in the saved
            file name; it does not alter the prompt.
        model: Anthropic model identifier passed to the generator.

    Returns:
        The generated instructions. When no PRs are found for the reviewer, a
        short "No PR reviews found ..." message is returned instead and
        nothing is generated or saved.

    Raises:
        RuntimeError: If the generator returns no replies.
    """
    print(f"Generating {instruction_type} prompt for reviewer: {reviewer_name}")
    prs = read_reviewer_data(reviewer_name, repo_name)

    if not prs:
        return f"No PR reviews found for {reviewer_name}"

    input_dict = {
        "prs": prs
    }

    prompt_builder = PromptBuilder(template=prompt_template)
    generator = AnthropicGenerator(
        # The key is resolved from the environment by the Secret wrapper.
        api_key=Secret.from_env_var("ANTHROPIC_API_KEY"),
        model=model,
        generation_kwargs={
            "max_tokens": 8192,
            "temperature": 0.3
        }
    )

    pipeline = Pipeline()
    pipeline.add_component(instance=prompt_builder, name="prompt_builder")
    pipeline.add_component(instance=generator, name="llm")
    pipeline.connect("prompt_builder", "llm")

    result = pipeline.run(data=input_dict)
    replies = result["llm"]["replies"]
    # Fail with an explicit message rather than a bare IndexError on replies[0].
    if not replies:
        raise RuntimeError(
            f"The model returned no replies for {reviewer_name}; no instructions were saved."
        )
    review_instructions = replies[0]

    print(f"{instruction_type.capitalize()} instructions generated for {reviewer_name}")
    save_instructions_to_disk(reviewer_name, repo_name, instruction_type, review_instructions)
    return review_instructions

In [None]:
# Generate, save and display the instructions for every configured reviewer.
# Each call sends a prompt to the LLM, so re-running this cell repeats that cost.
reviewers = ["gelbal"]
for reviewer_name in reviewers:
    repo_name, instruction_type = "nosara", "review"
    instructions = generate_reviewer_prompt(
        reviewer_name,
        repo_name,
        instruction_type,
    )
    print(instructions)