### GitHub Repo RAG

In [None]:
import numpy as np
import typing
import dotenv
import json
import os
dotenv.load_dotenv()

To implement a RAG system over a code repository, we first need to access its files. There are two alternatives for this:

**Clone the Repository Locally**: The code below clones the repository locally and generates a JSON file with the content of its files. If you don't want to clone the repository, skip this cell.


In [None]:
import subprocess
from pathlib import Path

# URL of the repository to clone
repo_url = "https://github.com/viarotel-org/escrcpy"
repo_dir = "repo_temp"

# Clone the repository if it doesn't exist
if not os.path.exists(repo_dir):
    # check=True raises an error if the clone fails instead of continuing silently
    subprocess.run(["git", "clone", repo_url, repo_dir], check=True)

# Get the files in the repository
def get_repo_files(directory):
    files_dict = {}
    for file_path in Path(directory).rglob("*"):
        if file_path.is_file():
            try:
                with open(file_path, "r", encoding="utf-8", errors='ignore') as f:
                    content = f.read()
                relative_path = file_path.relative_to(directory)
                files_dict[str(relative_path)] = content
            except Exception as e:
                print(f"Error reading {file_path}: {e}")
    return files_dict

files_dict = get_repo_files(repo_dir)

# Save the files in a JSON file (create the output directory if needed)
os.makedirs('./data', exist_ok=True)
with open('./data/files.json', 'w', encoding='utf-8') as f:
    json.dump(files_dict, f, ensure_ascii=False, indent=4)

print(f"Total files obtained: {len(files_dict)}")


**Use the GitHub API**: The second alternative is to fetch the files through the GitHub API. This requires a GitHub access token, which the code below reads from the `GITHUB_TOKEN` environment variable.

In [None]:
from repo_api import get_github_files

url = "https://github.com/viarotel-org/escrcpy"
token = os.getenv("GITHUB_TOKEN")

files_dict = get_github_files(url, token)

# Save the dict as JSON (to avoid exhausting the API rate limit on later runs)
with open('./data/files.json', 'w', encoding='utf-8') as f:
    json.dump(files_dict, f, ensure_ascii=False, indent=4)

# Show results
print(f"Total obtained files: {len(files_dict)}")
for path, content in list(files_dict.items())[:3]:  # Show first 3 files head as example
    print(f"\nFile: {path}")
    print(f"Content (first 100 characters): {content[:100]}...")
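
The `repo_api` module is not shown in this notebook. As a rough idea of what such a helper could look like, the sketch below lists the repository tree through the GitHub REST API and downloads each blob. The name `fetch_repo_files`, the injectable `get_json` parameter, and the one-request-per-file strategy are assumptions made for illustration; this is not the actual `get_github_files` implementation.

```python
import base64
import json
import urllib.request

API = "https://api.github.com"

def _get_json(url, token=None):
    """Send an authenticated GET request and decode the JSON response."""
    headers = {"Accept": "application/vnd.github+json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    request = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(request) as response:
        return json.load(response)

def fetch_repo_files(repo_url, token=None, get_json=_get_json):
    """Hypothetical helper: return {path: text} for every file in the default branch."""
    owner, repo = repo_url.rstrip("/").split("/")[-2:]
    branch = get_json(f"{API}/repos/{owner}/{repo}", token)["default_branch"]
    tree = get_json(f"{API}/repos/{owner}/{repo}/git/trees/{branch}?recursive=1", token)
    files = {}
    for entry in tree["tree"]:
        if entry["type"] != "blob":  # skip directories and submodules
            continue
        blob = get_json(entry["url"], token)  # blob content is base64-encoded
        raw = base64.b64decode(blob["content"])
        files[entry["path"]] = raw.decode("utf-8", errors="ignore")
    return files
```

Because this approach issues one request per file, caching the result in a JSON file, as the cell above does, avoids repeating those calls on every run.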

### Divide into chunks
These files are mostly source code, and a whole file usually contains a lot of content that is irrelevant to a specific query and adds noise to its embedding. To reduce this noise and make the embeddings more discriminative, we first divide the files into chunks. The chunk size sets the balance between context and relevance: very small chunks may not carry enough context, while very large chunks may include too much irrelevant information. We use 2500 characters per chunk, which yields approximately 7 chunks per file.

It is also important not to cut chunks at arbitrary positions. For this reason, we use `RecursiveCharacterTextSplitter` from LangChain, which tries to split on larger separators (paragraph breaks, then line breaks, then spaces) before falling back to smaller ones. This tends to keep functions and classes together in code, and paragraphs together in structured text such as Markdown.

In [None]:
from langchain.text_splitter import RecursiveCharacterTextSplitter 
from langchain.schema import Document

def get_chunks(doc, chunk_size=2500, chunk_overlap=50) -> typing.List[Document]:
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, # Size of each chunk in characters
        chunk_overlap=chunk_overlap, # Overlap between consecutive chunks
        length_function=len, # Function to compute the length of the text
        add_start_index=True, # Flag to add start index to each chunk
    )
    # Split document into smaller chunks using text splitter
    chunks = text_splitter.split_documents(doc)

    return chunks

if 'files_dict' not in globals():
    with open('./data/files.json', 'r', encoding='utf-8') as f:
        files_dict = json.load(f)

chunked_files = [
    Document(page_content=file_content, metadata={"source": file_path})
    for file_path, file_content in files_dict.items()
]
chunks = get_chunks(chunked_files, chunk_size=2500, chunk_overlap=50)
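
To make the idea of "splitting on larger units first" concrete, here is a deliberately simplified sketch of recursive splitting in plain Python. It is not LangChain's implementation (it ignores overlap and start indices, and the function name `recursive_split` is made up for this example); it only illustrates why whole functions tend to stay in one chunk.

```python
def recursive_split(text, chunk_size, separators=("\n\n", "\n", " ")):
    """Split text into pieces of at most chunk_size characters,
    trying coarse separators before fine ones."""
    if len(text) <= chunk_size:
        return [text] if text else []
    if not separators:
        # No separator left: fall back to a hard cut every chunk_size characters
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    sep, finer = separators[0], separators[1:]
    chunks, current = [], ""
    for piece in text.split(sep):
        candidate = piece if not current else current + sep + piece
        if len(candidate) <= chunk_size:
            current = candidate  # the piece still fits in the current chunk
            continue
        if current:
            chunks.append(current)
        if len(piece) <= chunk_size:
            current = piece
        else:
            # The piece alone is too large: split it with the finer separators
            chunks.extend(recursive_split(piece, chunk_size, finer))
            current = ""
    if current:
        chunks.append(current)
    return chunks

sample = "def a():\n    return 1\n\ndef b():\n    return 2\n\ndef c():\n    return 3"
for chunk in recursive_split(sample, chunk_size=25):
    print(repr(chunk))
```

With a chunk size that fits one function but not two, each function definition ends up in its own chunk instead of being cut mid-body.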

### Start Indexing

Before we can perform any searches, we need to index the repository files. This involves loading the encoder model and generating embeddings for each chunk of text.

### Load the encoder_model

The encoder model is loaded with `SentenceTransformer` from the `sentence_transformers` library. It is placed on the GPU if one is available and on the CPU otherwise, and it is configured to use float16 precision to reduce memory usage and speed up inference.


In [55]:
from sentence_transformers import SentenceTransformer

try:
    import torch
    device = "cuda" if torch.cuda.is_available() else "cpu"
except ImportError:
    device = "cpu"
# Load the model with float16 support to reduce memory usage and speed up inference
encoder_model = SentenceTransformer("ibm-granite/granite-embedding-107m-multilingual", device=device, model_kwargs={"torch_dtype": "float16"})
# Report which device the model was loaded on
print(f"Using device: {encoder_model.device}")

Using device: cuda:0


### Normalize text
Once we have the chunks, we normalize the text before encoding it so that the embeddings are consistent. The `normalize_code` function below uses regular expressions to preserve the original indentation while collapsing repeated spaces and tabs within a line and runs of blank lines.

In [56]:
import re

def normalize_code(code: str) -> str:
    code = code.replace('\r\n', '\n').replace('\r', '\n')
    # Preserve indentation while normalizing the rest of the line
    lines = []
    for line in code.split('\n'):
        # Count initial spaces
        leading_space_count = len(line) - len(line.lstrip(' \t'))
        leading_space = line[:leading_space_count]   
        # Normalize the rest of the line
        rest_of_line = re.sub(r'[ \t]+', ' ', line[leading_space_count:])
        lines.append(leading_space + rest_of_line)
        
    code = '\n'.join(lines)
    # Eliminate consecutive blank lines
    code = re.sub(r'\n{3,}', '\n\n', code)
    
    return code.strip()

### Save embeddings
We create an embedding for each chunk using the `SentenceTransformer` model. The embeddings are stored in a ChromaDB collection for efficient similarity search. The process involves normalizing the code, generating embeddings in batches, and adding them to the ChromaDB collection.

ChromaDB is used because it persists the vectors on disk and builds an HNSW index over them, so nearest-neighbor queries stay fast as the number of chunks grows. The collection is configured to use cosine distance, which lets us retrieve the chunks most similar to a query embedding.

In [99]:
from tqdm import tqdm
import chromadb

batch_size = 128

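chroma_client = chromadb.PersistentClient(path="./chroma_db")

# Delete the existing collection, if any, so the index is rebuilt from scratch.
# delete_collection raises if the collection does not exist (the exception type
# varies across ChromaDB versions), so that failure is caught and ignored.
try:
    chroma_client.delete_collection("escrcpy_repo_embeddings")
except Exception:
    pass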

chroma_collection = chroma_client.get_or_create_collection(
    name="escrcpy_repo_embeddings",
    metadata={"hnsw:space": "cosine"},
)

# Process in batches
for i in tqdm(range(0, len(chunks), batch_size), desc="Processing Chunks"):
    batch = chunks[i:i+batch_size]
    batch_texts = [normalize_code(chunk.page_content) for chunk in batch]
    batch_embeddings = encoder_model.encode(batch_texts)
    
    # Prepare batch data for ChromaDB
    batch_ids = [f"chunk_{i + j}" for j in range(len(batch))]
    batch_documents = [chunk.page_content for chunk in batch]
    batch_metadatas = [{
        "source": chunk.metadata["source"],
        "start_index": chunk.metadata.get("start_index", 0)
    } for chunk in batch]
    
    # Add entire batch at once
    chroma_collection.add(
        embeddings=[emb.tolist() for emb in batch_embeddings],
        documents=batch_documents,
        metadatas=batch_metadatas,
        ids=batch_ids
    )

Processing Chunks: 100%|██████████| 653/653 [07:34<00:00,  1.44it/s]
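
With `hnsw:space` set to `cosine`, the distances ChromaDB returns are cosine distances, that is, one minus the cosine similarity. As a point of reference, the sketch below computes the same kind of ranking exhaustively with NumPy on toy vectors. The function name `cosine_top_k` and the sample data are illustrative assumptions, and ChromaDB itself uses an approximate HNSW index rather than a full scan.

```python
import numpy as np

def cosine_top_k(query, embeddings, k=3):
    """Return the k (index, cosine_similarity) pairs closest to the query."""
    q = query / np.linalg.norm(query)
    e = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    similarities = e @ q  # dot product of unit vectors = cosine similarity
    order = np.argsort(-similarities)[:k]
    return [(int(i), float(similarities[i])) for i in order]

# Toy 2-D "embeddings" standing in for the chunk vectors
toy_embeddings = np.array([[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
toy_query = np.array([1.0, 0.0])

for index, similarity in cosine_top_k(toy_query, toy_embeddings, k=2):
    # The cosine distance reported by the index would be 1 - similarity
    print(index, round(similarity, 3), round(1 - similarity, 3))
```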


### Retrieve
Once the Chroma collection is built, we need a function that retrieves the most similar chunks for a natural language query. To avoid returning the same file more than once, the function concatenates chunks that come from the same source and keeps querying until it has the requested number of distinct files (or reaches an attempt limit).

In [None]:
def search_repository(question, n_results=5, chroma_collection=None):  
    if chroma_collection is None:
        chroma_collection = chroma_client.get_collection("escrcpy_repo_embeddings")
    
    # Normalize the question text and generate embedding
    normalized_question = normalize_code(question)
    question_embedding = encoder_model.encode(normalized_question)
    
    combined_results = {}
    seen_chunk_ids = set()
    max_attempts = n_results * 3  # Upper bound on the offset, to avoid an infinite loop
    offset = 0
    
    # Keep querying until we have n_results unique sources or reach max_attempts
    while len(combined_results) < n_results and offset < max_attempts:
        # Calculate how many more results we need
        remaining = n_results - len(combined_results)
        
        results = chroma_collection.query(
            query_embeddings=[question_embedding.tolist()],
            n_results=remaining + offset,  # Get extra results to account for duplicates
            include=["documents", "metadatas", "distances"]
        )
        
        for i in range(len(results["documents"][0])):
            chunk_id = results["ids"][0][i]
            # Each query returns the earlier top hits again, so skip chunks
            # that were already merged to avoid duplicating their content
            if chunk_id in seen_chunk_ids:
                continue
            seen_chunk_ids.add(chunk_id)
            
            source = results["metadatas"][0][i]["source"]
            content = results["documents"][0][i]
            similarity = 1 - results["distances"][0][i]  # cosine distance -> similarity
            
            # Merge the chunk into its source if that source was already seen
            if source in combined_results:
                combined_results[source]["content"] += "\n\n" + content
                combined_results[source]["similarity"] = max(combined_results[source]["similarity"], similarity)
            else:
                combined_results[source] = {
                    "content": content,
                    "source": source,
                    "similarity": similarity
                }
        # Increase offset to get new results in the next query
        offset += remaining
    
    # Convert dictionary back to list and sort by similarity
    formatted_results = list(combined_results.values())
    formatted_results.sort(key=lambda x: x["similarity"], reverse=True)
    
    return formatted_results[:n_results]


q = "How does the repository handle IPv6 addresses in ADB commands?"
results = search_repository(q, n_results=5)

print(f"Most related file")
print(results[0]["source"])

Most related file
electron\resources\extra\linux\android-platform-tools\adb
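
The merge-by-source step can be looked at in isolation from the query loop. The sketch below applies the same rule (concatenate the content, keep the highest similarity) to a hand-written list of hits; the helper name `merge_hits_by_source` and the sample paths and scores are made up for illustration.

```python
def merge_hits_by_source(hits):
    """Merge (source, content, similarity) hits that share a source file."""
    merged = {}
    for source, content, similarity in hits:
        if source in merged:
            # Same file seen again: append the chunk and keep the best score
            merged[source]["content"] += "\n\n" + content
            merged[source]["similarity"] = max(merged[source]["similarity"], similarity)
        else:
            merged[source] = {"source": source, "content": content, "similarity": similarity}
    return sorted(merged.values(), key=lambda r: r["similarity"], reverse=True)

# Hypothetical hits: two chunks from one file and one chunk from another
hits = [
    ("src/a.js", "chunk A1", 0.80),
    ("src/b.js", "chunk B1", 0.75),
    ("src/a.js", "chunk A2", 0.70),
]
for result in merge_hits_by_source(hits):
    print(result["source"], result["similarity"])
```

Three chunk-level hits collapse into two file-level results, which is why the search function may need more than one query to reach `n_results` distinct files.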


Now that we have finished the indexing part, we import the test JSON to evaluate the performance.

In [None]:
with open("./data/escrcpy-commits-generated.json", "r", encoding="utf-8") as f:
    test_data = json.load(f)

### Evaluation
The test JSON contains a list of questions and the files relevant to each one. We use this data to measure the recall of our search system: each question is run through `search_repository`, and the predicted sources are compared with the actual files. First, we create a DataFrame from the test data and add the predicted files.


In [117]:
import pandas as pd

df_test = pd.DataFrame(test_data)
df_evaluation = df_test.copy()
questions = df_test["question"].tolist()

sources = []
for query in questions:
    metadata = search_repository(query, n_results=10)
    # Normalize Windows path separators so predictions match the test file paths
    source = [result["source"].replace("\\", "/") for result in metadata]
    sources.append(source)

df_evaluation["predicted_sources"] = sources

display(df_evaluation.head())

|   | files | question | predicted_sources |
|---|---|---|---|
| 0 | [src/components/PreferenceForm/components/Sele... | How does the SelectDisplay component handle th... | [src/components/PreferenceForm/components/Sele... |
| 1 | [electron/exposes/adb/helpers/index.js, electr... | How does the repository handle IPv6 addresses ... | [electron/resources/extra/linux/android-platfo... |
| 2 | [electron/helpers/edger/index.js] | How does the edge hiding and snapping mechanis... | [electron/main.js, electron/helpers/edger/inde... |
| 3 | [README-CN.md, README-RU.md, README.md] | Unable to detect device | [electron/resources/extra/win/android-platform... |
| 4 | [src/pages/device/components/MirrorAction/inde... | What functionality does the component provide ... | [src/locales/languages/zh-CN.json, electron/re... |


Once we have the predictions, we calculate the recall@10 metric: the fraction of the relevant files for a question that appear among the first 10 retrieved files.
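
Written out, with $R$ the set of relevant files for a question and $T_{10}$ the set of the first 10 retrieved files (notation introduced here only for illustration):

$$
\text{recall@10} = \frac{|R \cap T_{10}|}{|R|}
$$

As a hypothetical example, if a question has three relevant files and two of them appear in the top 10, its recall@10 is $2/3 \approx 0.67$. The reported score is the mean of this value over all test questions.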



In [None]:
def recall_at_10(relevant, retrieved):
    top_10 = retrieved[:10]
    relevant_set = set(relevant)
    retrieved_set = set(top_10)

    # Calculate hits and total relevant items
    hits = len(relevant_set & retrieved_set)
    total_relevant = len(relevant_set)
    
    # Guard against questions that list no relevant files
    return hits / total_relevant if total_relevant else 0.0

df_recall = df_evaluation.copy()
df_recall["recall@10"] = df_recall.apply(
    lambda row: recall_at_10(row["files"], row["predicted_sources"]),
    axis=1
)

average_recall = df_recall["recall@10"].mean()
print(f"Average Recall@10: {average_recall:.2f}")

df_recall.to_csv("./data/evaluation_results.csv", index=False)

Average Recall@10: 0.49


After trying several embedding models, chunk sizes, and distance metrics, the best average recall@10 achieved is 0.49, the value shown above. Adding query expansion did not improve the metric. An analysis of the synthetic dataset shows many general queries and many files that are closely related to one another, both of which can lower recall.

### LLM-Generated Answer Summaries for Retrieved Code

In this section, we integrate a large language model (LLM) to generate concise, relevant summaries of the retrieved code snippets. This helps in understanding the context and functionality of the code without manually inspecting each file.

#### Steps to Generate Summaries:

1. **Search Repository**: We first search the repository using a natural language query to retrieve the most relevant code snippets. This is done using the `search_repository` function which leverages the embeddings stored in ChromaDB.

2. **Generate Context**: The retrieved code snippets are concatenated to form a context that provides a comprehensive view of the relevant code.

3. **Formulate Prompt**: A prompt is created for the LLM, instructing it to generate a concise answer to the query using the provided context.

4. **Generate Answer**: The LLM processes the prompt and generates a summary that directly answers the query, utilizing the context from the retrieved code snippets.

#### Example implementation with gemini-2.0-flash:


In [98]:
from google import genai
api_key = os.getenv("LLM_API_KEY")

def generate_answer(question):
    results = search_repository(question, n_results=5)
    context = "\n\n".join([result["content"] for result in results])

    prompt = f"Answer this question directly: {question}\nIf the following context files are helpful, use them to support your answer. Answer concisely.\n\n{context}\n\n"
    
    client = genai.Client(api_key=api_key)

    response = client.models.generate_content(
        model="gemini-2.0-flash",
        contents=prompt)

    return response.text

print(generate_answer("How is the sponsor dialog implemented to display sponsorship images dynamically?"))

The sponsor dialog displays sponsorship images dynamically using the `imageList` array in the component's data. This array holds objects with `src` (image path) and `alt` (alt text) properties.  The `v-for` directive iterates through this array, creating an `el-image` component for each item.  The `src` and `alt` properties of each image are bound to the corresponding properties in the `imageList` item. The image sources are directly imported into the component.

