In [4]:
import os

import google.generativeai as genai
from dotenv import load_dotenv

from src.core.llm import Ollama
from test.evaluation.utils import JudgeLLM

load_dotenv()
MODEL = os.getenv('MODEL')
ENDPOINT = os.getenv('ENDPOINT')
GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')

MODEL = Ollama(model=MODEL, inference_endpoint=ENDPOINT)

# pre-load LLM
for ch in MODEL.query([{'role': 'user', 'content': 'Hi'}]):
    pass

genai.configure(api_key=GEMINI_API_KEY)
judge_llm = JudgeLLM()

In [5]:
PROMPT = """You are an expert Penetration Tester and cybersecurity expert, here to help the user during a penetration testing assessment.
Your response should only contain a tool call, without additional information or explanation. You have access ONLY to the following tools:
{'type': 'function', 'function': {'name': 'search_web', 'parameters': {'type': 'object', 'properties': {'search_query': {'type': 'string'}}, 'required': ['search_query']}, 'description': 'Make a search with Google.'}}

You should provide a tool call following EXACTLY the following format:
{'name': 'function_name', 'parameters': {'param1': 'content of param1', ...}}

RULES:
- Your response contains ONLY one tool call in the provided format.
- Your response can optionally be "{}" if no tool call is required.
- STOP after writing the tool call, without additional information.
"""

In [6]:
# PROMPT.replace("'", '"')

In [7]:
TESTS = [
    {"input": "Hi"}, 
    {"input":  "While experimenting with SQLi I found the following ways to bypass a client-side filter:\nOR: OORR\nAND: AANDND\nSELECT: SESELECTLECT\nUNION: UNUNIONION\nORDER BY: OORRDER BY\n\nWrite the tamper script for sqlmap to use this filter bypass techniques."},
    {"input": "Are there any tools to automate Server Side Template Injection?"}
]

In [8]:
# Correct Format
# Same code that is in architecture implementation

import re
import json
from typing import Tuple, Dict

from src.core.tools import TOOL_REGISTRY
from src.utils import get_logger

logger = get_logger(__name__)
tools = TOOL_REGISTRY.marshal('base')
tool_pattern = r"\s*({[^}]*(?:{[^}]*})*[^}]*}|\[[^\]]*(?:\[[^\]]*\])*[^\]]*\])\s*$"


def extract_tool_call(
    tool_call_response: str
) -> Tuple[str | None, Dict]:
    """Extracts the tool call and its parameters from the LLM response.

    :param tool_call_response: The response containing a tool call.

    :returns:
        (tool name, parameters) OK
        (None, None) if extraction fails."""
    tool_call_match = re.search(tool_pattern, tool_call_response)
    if not tool_call_match:
        logger.error(
            f'Tool call failed: not found in LLM response: {tool_call_response}'
        )
        return None, {}
    try:
        # fix response to be JSON
        tool_call_json = tool_call_match \
            .group(1) \
            .replace('"', '')
        tool_call_json = tool_call_json \
            .replace("'", '"')

        tool_call_dict = json.loads(tool_call_json)
        name, parameters = tool_call_dict['name'], tool_call_dict['parameters']
    except json.JSONDecodeError as json_extract_err:
        logger.error(
            f'Tool call failed: not found in LLM response: {tool_call_response}'
            f'\nError: {json_extract_err}'
        )
        return None, {}

    # check if tool exists
    found = False
    for t in tools:
        if t['function']['name'] == name:
            found = True
    if not found:
        logger.error(f'Tool call failed: {name} is not a tool.')
        return None, {}
    return name, parameters

In [9]:
# Relevance

import deepeval
from deepeval.test_case import LLMTestCase
from deepeval.metrics import (
    BaseMetric,
    AnswerRelevancyMetric,
    PromptAlignmentMetric
)

deepeval.telemetry_opt_out()

metrics: dict[str: BaseMetric] = {
    'relevance': AnswerRelevancyMetric(
        threshold=0.7,
        model=JudgeLLM(),
        include_reason=True
    ),
    'prompt_alignment': PromptAlignmentMetric(
        threshold=0.7,
        model=JudgeLLM(),
        include_reason=True,
        prompt_instructions=[PROMPT]
    )
}

In [12]:
for i, test in enumerate(TESTS):
    messages = [
        {'role': 'system', 'content': PROMPT},
        {'role': 'user', 'content': test['input']}
    ]
    
    response = ''
    for chunk, _ in MODEL.query(messages):
        response += chunk
    TESTS[i]['response'] = response
        
    # correctness
    tool_name, tool_parameters = extract_tool_call(response)
    if tool_name is None:
        TESTS[i]['format'] = 0 # format not correct
    else:
        TESTS[i]['format'] = 1 # format correct
    
    # metrics
    deep_eval_test_case = LLMTestCase(
        input=test['input'],
        actual_output=response
    )
    
    for metric_name, metric in metrics.items():
        metric.measure(deep_eval_test_case)
        TESTS[i][metric_name] = metric.score
        TESTS[i][f'{metric_name}_reason'] = metric.reason

Output()

ValueError: Evaluation LLM outputted an invalid JSON. Please use a better evaluation model.

In [None]:
import pandas as pd

df = pd.DataFrame(TESTS)
df.head()