In [2]:
import streamlit as st
from llama_index.core.prompts import RichPromptTemplate
from llama_index.llms.ollama import Ollama
from llama_index.llms.openai import OpenAI
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.llms import ChatMessage, TextBlock, ImageBlock
from llama_index.embeddings.ollama import OllamaEmbedding

import time
import os
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import WebDriverException
import logging
from llama_index.core.callbacks import LlamaDebugHandler, CallbackManager
from llama_index.core.agent.workflow import ReActAgent,FunctionAgent
from llama_index.core.tools import FunctionTool
from llama_index.core.settings import Settings
from pathlib import Path
from git import Repo
from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    StorageContext,
    load_index_from_storage,
    set_global_handler
)


In [3]:
# Shell escape: list the models available to the local Ollama install.
!ollama list

NAME                        ID              SIZE      MODIFIED     
qwen3:8b                    500a1f067a9f    5.2 GB    28 hours ago    
qwen3:14b                   bdbd181c33f2    9.3 GB    28 hours ago    
deepseek-r1:latest          6995872bfe4c    5.2 GB    47 hours ago    
gemma3:4b                   a2af6cc3eb7f    3.3 GB    6 days ago      
mxbai-embed-large:latest    468836162de7    669 MB    9 days ago      
gemma3:12b                  f4031aab637d    8.1 GB    2 weeks ago     
nomic-embed-text:latest     0a109f422b47    274 MB    3 weeks ago     


In [4]:
# Configuration read by get_llm() below.
model_provider="Ollama (Local)"  # any other value makes get_llm() build an OpenAI client
thinking=False  # forwarded to Ollama by get_llm() only when truthy
model_name="gemma3:12b"
temperature=0.1
context_window=8096  # NOTE(review): possibly meant 8192 — confirm
max_tokens=context_window  # only used on the OpenAI branch of get_llm()

def get_llm():
    """Build the LLM client selected by the module-level configuration.

    Returns an ``Ollama`` client when ``model_provider`` is "Ollama (Local)"
    (forwarding ``thinking`` only when it is truthy); otherwise returns an
    ``OpenAI`` client limited to ``max_tokens``. Both use a 500 second
    request timeout.
    """
    if model_provider != "Ollama (Local)":
        return OpenAI(
            model=model_name,
            temperature=temperature,
            max_tokens=max_tokens,
            request_timeout=500,
        )

    ollama_kwargs = dict(
        model=model_name,
        temperature=temperature,
        context_window=context_window,
        request_timeout=500,
    )
    if thinking:
        # Only pass the flag when enabled so the client's own default applies otherwise.
        ollama_kwargs["thinking"] = thinking
    return Ollama(**ollama_kwargs)

In [5]:
def get_ollamaEmbeddings():
    """Return an ``OllamaEmbedding`` for ``mxbai-embed-large:latest`` with batch size 32."""
    return OllamaEmbedding(
        model_name="mxbai-embed-large:latest",
        embed_batch_size=32,
    )


def set_ollama():
    """Install the configured LLM and embedding model on the global ``Settings``."""
    # Both clients are built before either attribute is assigned.
    Settings.llm, Settings.embed_model = get_llm(), get_ollamaEmbeddings()
    

# Initialize or load index

def _load_init_index(repo_path):
    """Load the persisted vector index for a repository, building it if needed.

    The index lives in ``<repo_path>/.code_index``. When it cannot be loaded
    (for example on first use), every ``.py`` file under ``repo_path`` is read
    recursively, indexed, and the new index is persisted to that directory.

    Args:
        repo_path: Repository root, as a string or ``pathlib.Path``.

    Returns:
        The loaded or newly built index.

    Raises:
        Exception: Whatever the reader or index builder raises when the
            fallback rebuild itself fails. The error is propagated unchanged
            rather than being masked.
    """
    index_path = Path(repo_path) / ".code_index"
    try:
        storage_context = StorageContext.from_defaults(persist_dir=str(index_path))
        return load_index_from_storage(storage_context)
    except Exception as load_error:
        # Best-effort load: any failure falls back to a rebuild. Catching
        # `Exception` (not a bare except) lets Ctrl-C / SystemExit propagate.
        logging.info("Could not load index from %s (%s); rebuilding.", index_path, load_error)

    # Load all Python files
    documents = SimpleDirectoryReader(
        input_dir=str(repo_path),
        required_exts=[".py"],
        recursive=True,
    ).load_data()

    # Create and save index
    index = VectorStoreIndex.from_documents(documents)
    index.storage_context.persist(persist_dir=str(index_path))
    return index

def _create_agent():
    """Build a ``FunctionAgent`` that exposes the notebook's helpers as tools."""
    # Listed in the order the tools are handed to the agent.
    tool_functions = (
        check_code_changes,
        generate_test_cases,
        generate_tests,
        _load_init_index,
    )
    tools = [FunctionTool.from_defaults(fn=tool_fn) for tool_fn in tool_functions]

    # NOTE(review): `CallbackManager=CallbackManager` passes the class itself
    # under a capitalised keyword — confirm the agent actually consumes it.
    return FunctionAgent(
        tools=tools,
        llm=get_llm(),
        verbose=True,
        CallbackManager=CallbackManager
    )

def check_code_changes(repo_path:str,since_commit: str = "HEAD~1",agentic=True) -> str:
    """
    Report the Python files that differ between a commit and HEAD.

    Args:
        repo_path: Path to the git repository to inspect.
        since_commit: The commit to compare against (default: previous commit)
        agentic: When True (default) return one formatted string containing
            each file's path, change type and diff. When False return a list
            of the changed file paths instead.

    Returns:
        With ``agentic=True``, a string describing the changed files and
        their diffs. With ``agentic=False``, a list of changed file paths.
        In both modes a plain message string is returned when the repository
        or commit cannot be opened, or when no Python file has changed.
    """
    try:
        repo = Repo(repo_path)
        base_commit = repo.commit(since_commit)
        head_commit = repo.head.commit
    except Exception as e:
        # Without a usable repository/commit there is nothing to diff; report
        # the problem instead of failing later on an unbound `repo`.
        message = f"Error opening repository at {repo_path}: {str(e)}"
        print(message)
        return message

    print(f"Repository: {repo_path}")
    print(f"Comparing: {since_commit} ({base_commit.hexsha[:7]}) to HEAD ({head_commit.hexsha[:7]})")

    # Get changed files
    changed_files = []
    diffs = base_commit.diff(head_commit)
    if not diffs:
        print("No differences found between the commits")

    for diff in diffs:
        file_path = getattr(diff, "a_path", None) or getattr(diff, "b_path", None)
        # The report is documented (and labelled) as Python-only.
        if not file_path or not file_path.endswith(".py"):
            continue
        change_type = getattr(diff, "change_type", "?")

        # Some diffs may not have a diff attribute (e.g., binary files or renames)
        diff_content = ""
        if getattr(diff, "diff", None):
            try:
                diff_content = diff.diff.decode("utf-8", errors="replace")
            except Exception:
                diff_content = "<Could not decode diff>"
        print(f"Found changes in {file_path} ({change_type})")
        if diff_content:
            print(f"Diff length: {len(diff_content)} bytes")
        else:
            # If diff_content is empty, try to get the diff using repo.git.diff as a fallback
            try:
                diff_content = repo.git.diff(f"{since_commit}..HEAD", "--", file_path)
            except Exception:
                diff_content = "<Could not retrieve diff via git command>"

        changed_files.append({
            "path": file_path,
            "change_type": change_type,
            "diff": diff_content,
        })

    if not changed_files:
        return "No Python files have changed since the specified commit."

    # Format output
    if not agentic:
        return [f"{file['path']}" for file in changed_files]

    output = ["Changed Python files:"]
    for file in changed_files:
        output.append(f"\nFile: {file['path']} ({file['change_type']})")
        output.append(f"Diff:\n{file['diff']}")
    return "\n".join(output)
        

def generate_tests( repo_path,file_path: str, framework: str = "pytest",streaming=False) -> str:
    """
    Generate tests for a specific Python file.

    Args:
        repo_path: Directory that ``file_path`` is resolved against.
        file_path: Path to the Python file, relative to ``repo_path``
        framework: Testing framework to use (default: pytest)
        streaming: When true, call ``stream_complete`` on ``Settings.llm``
            and return its result; otherwise call ``complete``.

    Returns:
        An error message string when the file does not exist. Otherwise the
        object returned by the LLM call (callers in this notebook read its
        ``.text`` or iterate it for chunks), not a plain string.
    """
    # Read the file content
    full_path = Path(repo_path) / file_path
    if not full_path.exists():
        return f"Error: File {file_path} does not exist."

    # Decode explicitly as UTF-8 (Python's default source encoding) instead of
    # the platform locale; undecodable bytes are replaced because the text is
    # only used as prompt material.
    code_content = full_path.read_text(encoding="utf-8", errors="replace")

    # Use LLM to generate tests
    prompt = f"""
    Analyze the following Python code and generate comprehensive test cases using {framework}.
    Focus on edge cases, input validation, and expected behavior.
    
    Code:
    {code_content}
    
    Please return only the test code with appropriate imports, no additional explanation.
    The tests should be production-ready and follow best practices.
    """
    llm=Settings.llm
    if streaming:
        return llm.stream_complete(prompt)
    return llm.complete(prompt)


def generate_test_cases(index,changes: str,streaming=False) -> str:
    """
    Ask the index's query engine to draft markdown test cases for code changes.

    Args:
        index: Object providing ``as_query_engine``; it is queried with
            ``similarity_top_k=1``.
        changes: Text of the code changes, embedded in the prompt.
        streaming: Passed to ``as_query_engine``; also selects which
            attribute of the query result is returned.

    Returns:
        The query result's ``response_gen`` when ``streaming`` is true,
        otherwise its ``response``.
    """
    # Prompt sent to the query engine, including the table template to follow.
    request_text = f"""
        You are an expert test engineer who excels in creating test cases for IT products, 
        Analyze the following Python code and create test cases, Focusing on edge cases, input validation, and expected behavior.
        
        Code:
        {changes}
        
        The test cases should follow the following markdown format: 
"| Test Case ID | TC-PERF-001 |\n",
"|--------------|------------|\n",
"| **Title** | Verify API response time under load |\n",
"| **Description** | Ensure GraphQL API responds within 500ms under expected load |\n",
"| **Preconditions** | Load testing environment set up |\n",
"| **Test Steps** | 1. Simulate concurrent users accessing API endpoints<br>2. Measure response times |\n",
"| **Expected Result** | API response times remain below 500ms under load |\n",
"\n",
"---\n",
"\n",
"""
    query_engine = index.as_query_engine(streaming=streaming, similarity_top_k=1)
    result = query_engine.query(request_text)
    return result.response_gen if streaming else result.response



# Configure Chrome options
def get_chrome_options():
    """Return Chrome ``Options`` preloaded with the headless-run arguments below."""
    chrome_options = Options()
    for flag in (
        "--headless",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--window-size=1920,1080",
    ):
        chrome_options.add_argument(flag)
    return chrome_options


In [6]:
# Register the configured LLM and embedding model on the global Settings
# through the helper defined above, instead of repeating its body here.
set_ollama()

In [7]:
# Relative path of the git repository analysed by the cells below.
repoPath="snake/build"

In [8]:
# Load the index persisted under <repoPath>/.code_index, or build it from the repo's .py files.
index=_load_init_index(repo_path=repoPath)

Loading llama_index.core.storage.kvstore.simple_kvstore from snake/build/.code_index/docstore.json.
Loading llama_index.core.storage.kvstore.simple_kvstore from snake/build/.code_index/index_store.json.


In [9]:
# agentic=True returns one formatted string (paths plus diffs), not a list of paths.
changes=check_code_changes(repoPath,agentic=True)

Repository: snake/build
Comparing: HEAD~1 (3f602dd) to HEAD (fca8f74)
Found changes in snake.py (M)


In [16]:
from IPython.display import Markdown, display
# Render the change report as Markdown (last expression of the cell is displayed).
Markdown(str(changes))

Changed Python files:

File: snake.py (M)
Diff:
diff --git a/snake.py b/snake.py
index 62f7b09..b801b3b 100644
--- a/snake.py
+++ b/snake.py
@@ -24,7 +24,7 @@ pygame.display.set_caption('Snake Game')
 clock = pygame.time.Clock()
 
 # Snake block size and speed
-snake_block = 10
+snake_block = 20
 snake_speed = 15
 
 # Font styles

In [23]:
# `changes` is the agentic report string, so `changes[0]` would be its first
# character rather than a file path. Ask for the list of changed paths
# explicitly so this cell also works after Restart & Run All.
changed_paths=check_code_changes(repoPath,agentic=False)
assert isinstance(changed_paths,list), changed_paths  # a string here is a status/error message
resp=generate_tests(repo_path=repoPath,file_path=changed_paths[0],streaming=False)

In [27]:
# Scratch check of how a Markdown horizontal rule (---) renders between two lines of text.
Markdown("dsfsdf\n\n\n---\n\newrewr")

dsfsdf


---

ewrewr

In [26]:
from IPython.display import Markdown,display
# NOTE(review): `changes[0]` is a file path only when `changes` is a list
# (agentic=False); the visible assignment of `changes` uses agentic=True,
# which yields a string — confirm which cell is meant to feed this one.
change=changes[0]
Markdown(f"***Generating unit test for: {change}***")

***Generating unit test for: snake.py***

In [28]:
# Scratch check of how a list is converted to a string.
str([1,2,3])

'[1, 2, 3]'

In [29]:
# Render the generated test code as Markdown, followed by a horizontal rule.
Markdown(resp.text+"\n---\n")

```python
import pytest
import pygame
from pygame import event
from pygame import display
from pygame import time
from pygame import draw
from pygame import font
import random

# Mocking pygame to avoid actual display and dependencies
class MockSurface:
    def __init__(self, width, height):
        self.width = width
        self.height = height

    def fill(self, color):
        pass

    def blit(self, value, position):
        pass

    def get_width(self):
        return self.width

    def get_height(self):
        return self.height

pygame.display.set_mode = lambda x, y: MockSurface(x, y)
pygame.font.SysFont = lambda font_name, font_size: lambda text, antialias, color: text
pygame.draw.rect = lambda surface, color, rect, width: None
pygame.init = lambda: None
pygame.quit = lambda: None
pygame.time.Clock = lambda: None

def your_score(score):
    """Display the current score"""
    value = font.render("Your Score: " + str(score), True, (0, 0, 0))
    display.get_surface().blit(value, [0, 0])

def our_snake(snake_block, snake_list):
    """Draw the snake on the screen"""
    for x in snake_list:
        draw.rect(display.get_surface(), (0, 255, 0), [x[0], x[1], 35, 35])

def message(msg, color):
    """Display a message on the screen"""
    mesg = font.render(msg, True, color)
    display.get_surface().blit(mesg, [display.get_surface().get_width() / 6, display.get_surface().get_height() / 3])

def game_loop():
    """Main game loop"""
    game_over = False
    game_close = False

    # Initial snake position
    x1 = display.get_surface().get_width() / 2
    y1 = display.get_surface().get_height() / 2

    # Snake movement
    x1_change = 0
    y1_change = 0

    # Snake body
    snake_list = []
    length_of_snake = 1

    # Food position
    foodx = round(random.randrange(0, display.get_surface().get_width() - 35) / 10.0) * 10.0
    foody = round(random.randrange(0, display.get_surface().get_height() - 35) / 10.0) * 10.0

    while not game_over:

        while game_close == True:
            display.get_surface().fill((255, 255, 255))
            message("You Lost! Press Q-Quit or C-Play Again", (213, 50, 80))
            your_score(length_of_snake - 1)
            display.update()

            for event in event.get():
                if event.type == pygame.KEYDOWN:
                    if event.key == pygame.K_q:
                        game_over = True
                        game_close = False
                    if event.key == pygame.K_c:
                        game_loop()

        for event in event.get():
            if event.type == pygame.QUIT:
                game_over = True
            if event.type == pygame.KEYDOWN:
                if event.key == pygame.K_LEFT:
                    x1_change = -35
                    y1_change = 0
                elif event.key == pygame.K_RIGHT:
                    x1_change = 35
                    y1_change = 0
                elif event.key == pygame.K_UP:
                    y1_change = -35
                    x1_change = 0
                elif event.key == pygame.K_DOWN:
                    y1_change = 35
                    x1_change = 0

        # Check for boundary collision
        if x1 >= display.get_surface().get_width() or x1 < 0 or y1 >= display.get_surface().get_height() or y1 < 0:
            game_close = True

        # Update snake position
        x1 += x1_change
        y1 += y1_change
        display.get_surface().fill((255, 255, 255))
        
        # Draw food
        draw.rect(display.get_surface(), (213, 50, 80), [foodx, foody, 35, 35])
        
        # Update snake body
        snake_head = []
        snake_head.append(x1)
        snake_head.append(y1)
        snake_list.append(snake_head)
        
        # Remove extra segments if snake hasn't eaten
        if len(snake_list) > length_of_snake:
            del snake_list[0]

        # Check for self collision
        for x in snake_list[:-1]:
            if x == snake_head:
                game_close = True

        # Draw snake and score
        our_snake(35, snake_list)
        your_score(length_of_snake - 1)

        display.update()

        # Check if snake ate food
        if x1 == foodx and y1 == foody:
            foodx = round(random.randrange(0, display.get_surface().get_width() - 35) / 10.0) * 10.0
            foody = round(random.randrange(0, display.get_surface().get_height() - 35) / 10.0) * 10.0
            length_of_snake += 1

        time.Clock().tick(15)

    pygame.quit()
    quit()

@pytest.fixture
def mock_pygame():
    """Fixture to mock pygame for testing."""
    pass

def test_your_score(mock_pygame):
    """Test the your_score function."""
    assert True  # Placeholder, add actual test logic

def test_our_snake(mock_pygame):
    """Test the our_snake function."""
    assert True  # Placeholder, add actual test logic

def test_message(mock_pygame):
    """Test the message function."""
    assert True  # Placeholder, add actual test logic

def test_game_loop_boundary_collision(mock_pygame):
    """Test game loop boundary collision."""
    assert True  # Placeholder, add actual test logic

def test_game_loop_key_presses(mock_pygame):
    """Test game loop key presses."""
    assert True  # Placeholder, add actual test logic

def test_game_loop_food_consumption(mock_pygame):
    """Test game loop food consumption."""
    assert True  # Placeholder, add actual test logic
```
---


In [44]:
# `changes` is the agentic report string, so `changes[0]` would be its first
# character rather than a file path. Fetch the list of changed paths
# explicitly so this cell does not depend on leftover kernel state.
changed_paths=check_code_changes(repoPath,agentic=False)
assert isinstance(changed_paths,list), changed_paths  # a string here is a status/error message
resp=generate_tests(repo_path=repoPath,file_path=changed_paths[0],streaming=True)

In [45]:
from IPython.display import Markdown, display

# A bare `Markdown(...)` expression inside a loop is never rendered (only a
# cell's final expression is), so update a single display area in place.
full_response=''
stream_view=display(Markdown(full_response),display_id=True)
for response in resp:
    full_response += response.delta or ''  # tolerate chunks without a delta
    stream_view.update(Markdown(full_response))
    time.sleep(0.01)

KeyboardInterrupt: 

In [None]:
from IPython.display import Markdown,display

# Accumulate the incremental `delta` of each chunk and refresh one display
# area, instead of emitting a new output block for every chunk.
full_response = ""
stream_view=display(Markdown(full_response),display_id=True)
for txt in resp:
    full_response+=txt.delta or ""  # tolerate chunks without a delta
    stream_view.update(Markdown(full_response))
    time.sleep(0.01)

In [None]:
# Streaming query engine over the code index, with similarity_top_k=1.
qe=index.as_query_engine(streaming=True, similarity_top_k=1)

In [None]:
# The trailing backslashes continue the string literal, so the prompt is sent
# as a single line (no newlines) that includes each continuation line's indent.
resp=qe.query(f"You are an expert test engineer who excels in creating test cases\
    create test cases for the following code changes\
    Code:\
    {changes}")

In [None]:
from IPython.display import Markdown, display

# A bare `Markdown(...)` expression inside a loop is never rendered, so keep
# one display area and update it as tokens arrive.
messages=''
stream_view=display(Markdown(messages),display_id=True)
for txt in resp.response_gen:
    messages+=txt
    stream_view.update(Markdown(messages))

In [None]:
# print_response_stream() writes the streamed tokens itself; its result is
# not content to render, so it is no longer wrapped in Markdown().
qe.query(f"You are an expert test engineer who excels in creating test cases\
    create test cases for the following code changes\
    Code:\
    {changes}").print_response_stream()

In [None]:
from IPython.display import Markdown, display

# NOTE(review): `gen` is not assigned in any visible cell — presumably the
# generator returned by a streaming completion call; confirm and define it.
# A bare `Markdown(...)` expression inside a loop is never rendered, so one
# display area is updated in place instead.
full_response=""
stream_view=display(Markdown(full_response),display_id=True)
for response in gen:
    full_response += response.delta or ""  # tolerate chunks without a delta
    time.sleep(0.01)
    stream_view.update(Markdown(full_response))

In [None]:
from IPython.display import display, Markdown
# NOTE(review): `resp` comes from a query engine created with streaming=True —
# confirm that the streaming result object exposes `.response`.
Markdown(resp.response)

In [None]:
# Same request as above, but asking for a tab-separated column layout.
resp=qe.query(f"""
        You are an expert test engineer who excels in creating test cases for IT products, 
        Analyze the following Python code and create test cases, Focusing on edge cases, input validation, and expected behavior.
        
        Code:
        {changes}
        
        The test cases should follow the following format: 
        Test Case ID	Description	Priority	Steps	Expected Result	Pass/Fail	Notes
        """)

In [None]:
# Re-create the streaming query engine (same arguments as the earlier cell).
qe=index.as_query_engine(streaming=True, similarity_top_k=1)

In [None]:
# Sends the same prompt text that generate_test_cases() builds, but keeps the
# whole result object in `response` instead of returning one attribute of it.
response=qe.query(f"""
        You are an expert test engineer who excels in creating test cases for IT products, 
        Analyze the following Python code and create test cases, Focusing on edge cases, input validation, and expected behavior.
        
        Code:
        {changes}
        
        The test cases should follow the following markdown format: 
"| Test Case ID | TC-PERF-001 |\n",
"|--------------|------------|\n",
"| **Title** | Verify API response time under load |\n",
"| **Description** | Ensure GraphQL API responds within 500ms under expected load |\n",
"| **Preconditions** | Load testing environment set up |\n",
"| **Test Steps** | 1. Simulate concurrent users accessing API endpoints<br>2. Measure response times |\n",
"| **Expected Result** | API response times remain below 500ms under load |\n",
"\n",
"---\n",
"\n",
""")


In [None]:
from IPython.display import Markdown, display

# Iterate the token generator (`get_response` is a method, not an iterable),
# pass the text straight to Markdown (the `html` module is not callable), and
# refresh one display area, since a bare `Markdown(...)` expression inside a
# loop is never rendered.
resp=""
stream_view=display(Markdown(resp),display_id=True)
for text in response.response_gen:
    resp+=text
    stream_view.update(Markdown(resp))

In [None]:
# NOTE(review): `qe.a` looks like an unfinished attribute name — presumably
# `aquery` (which would need `await`) or `query`; confirm the intended call.
resp=qe.a(f"""
        You are an expert test engineer who excels in creating test cases for IT products, 
        Analyze the following Python code and create test cases, Focusing on edge cases, input validation, and expected behavior.
        
        Code:
        {changes}
        
        The test cases should follow the following markdown format: 
"| Test Case ID | TC-PERF-001 |\n",
"|--------------|------------|\n",
"| **Title** | Verify API response time under load |\n",
"| **Description** | Ensure GraphQL API responds within 500ms under expected load |\n",
"| **Preconditions** | Load testing environment set up |\n",
"| **Test Steps** | 1. Simulate concurrent users accessing API endpoints<br>2. Measure response times |\n",
"| **Expected Result** | API response times remain below 500ms under load |\n",
"\n",
"---\n",
"\n",
""")


In [None]:
# Display the LLM currently registered on the global Settings.
Settings.llm