In [None]:
%pip install llama-index langfuse

In [3]:
from dotenv import load_dotenv

# Load variables from a local .env file into the environment.
# NOTE(review): presumably this supplies the OpenAI / Langfuse credentials used by later cells — confirm.
load_dotenv()

True

# Simple RAG for generating evaluation cases

## Retrieve chunks

In [14]:
import pandas as pd
import requests
import json
import time

def search_by_semantic(query, top_k=10, max_retries=20, retry_delay=5, timeout=60):
    """
    Query the tidb.ai retrieve endpoint for the chunks most similar to ``query``.

    :param query: The natural-language query sent to the retrieval API.
    :param top_k: Number of chunks requested from the API.
    :param max_retries: Maximum number of attempts before giving up.
    :param retry_delay: Seconds to wait between two attempts.
    :param timeout: Seconds to wait for the server on each attempt; a timed-out
        attempt counts as a failed attempt and is retried.
    :return: The decoded JSON body of the API response.
        Per the original notes this is a list whose items carry fields such as
        text_content, source_uri, source_name and relevance_score.
        NOTE(review): run_case in this notebook reads a ``text`` field instead —
        confirm the actual schema.
    :raises requests.exceptions.HTTPError: Immediately, for any HTTP error status
        other than 500 / 504.
    :raises Exception: When every attempt failed; the last underlying error is
        attached as ``__cause__``.
    """
    url = "https://tidb.ai/api/v1/indexes/default/retrieve"
    headers = {
        'Content-Type': 'application/json',
    }
    data = {
        "top_k": top_k,
        "search_top_k": 150,
        "query": query,
    }

    # Only these server-side statuses are retried; other HTTP errors are re-raised.
    retryable_statuses = (500, 504)
    last_error = None

    for attempt in range(max_retries):
        try:
            # Without a timeout a stalled connection would block forever and
            # never reach the retry logic below.
            response = requests.post(
                url, headers=headers, data=json.dumps(data), timeout=timeout
            )
            response.raise_for_status()  # Raises HTTPError on 4xx / 5xx
            return response.json()
        except requests.exceptions.HTTPError as err:
            if err.response.status_code not in retryable_statuses:
                raise
            last_error = err
            reason = f"HTTP {err.response.status_code} Server Error - {err.response}"
        except requests.exceptions.RequestException as err:
            # Connection errors, timeouts, undecodable bodies, ...
            last_error = err
            reason = f"Request Error - {err}"

        if attempt + 1 < max_retries:
            print(f"Attempt {attempt + 1} of {max_retries}: {reason}. Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
        else:
            # Nothing follows the final attempt, so do not sleep or promise a retry.
            print(f"Attempt {attempt + 1} of {max_retries}: {reason}. Giving up.")

    # The loop finished without a successful response.
    raise Exception("Error: Max retries reached. The request failed to complete successfully.") from last_error


## Generate answer

In [15]:
from typing import List
from langfuse.decorators import langfuse_context, observe
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager

from llama_index.core.schema import (
    NodeWithScore,
    TextNode,
)

@observe()
def generation_bench(id: str, question: str, chunks:List[str]) -> str:
    """Synthesize an answer to ``question`` from the given text chunks.

    Each chunk is wrapped in a ``TextNode`` / ``NodeWithScore`` and handed to a
    ``CompactAndRefine`` synthesizer, inside a callback-manager trace named ``id``.

    :param id: Name passed to ``as_trace`` for this run.
    :param question: The query to answer.
    :param chunks: Text passages used as context.
    :return: Whatever ``CompactAndRefine.synthesize`` returns.
        NOTE(review): annotated as ``str``, but the value is returned unconverted —
        confirm the actual type.
    """
    # Route LlamaIndex callbacks through the handler of the current Langfuse
    # observation. This assigns the shared `Settings` object imported from
    # llama_index.core, so it stays in effect after this function returns.
    handler = langfuse_context.get_current_llama_index_handler()
    Settings.callback_manager = CallbackManager([handler])

    scored_nodes = []
    for chunk in chunks:
        scored_nodes.append(NodeWithScore(node=TextNode(text=chunk)))

    answer = ""
    with Settings.callback_manager.as_trace(id):
        answer = CompactAndRefine().synthesize(question, scored_nodes)

    return answer

## Run RAG

In [16]:
def run_case(question):
    """Answer ``question`` end to end: retrieve chunks, then generate from them.

    The retrieval result is loaded into a DataFrame and its ``text`` column is
    passed, as a plain list, to ``generation_bench`` under the trace id "test".

    :param question: The query to retrieve for and answer.
    :return: The value returned by ``generation_bench``.
    """
    retrieved = pd.DataFrame(search_by_semantic(question))
    texts = list(retrieved['text'])
    return generation_bench("test", question, texts)

In [17]:
# Smoke test: push each question through retrieval + generation and print the answer.
questions = ["what's tidb？"]

for question in questions:
    answer = run_case(question)
    print(answer)

TiDB is an advanced open-source, distributed SQL database engineered to support Hybrid Transactional and Analytical Processing (HTAP) workloads.


# RAG evaluation

In [9]:
from openai import OpenAI
import json
# Shared OpenAI client used by call_openai_evaluation.
# NOTE(review): presumably picks up its API key from the environment — confirm.
oai = OpenAI()


def call_openai_evaluation(output, expected_output, model="gpt-4-0125-preview"):
    """Ask an OpenAI chat model to grade ``output`` against ``expected_output``.

    The model is prompted as a teacher scoring a student answer from 0 to 10 and
    is requested to reply in JSON mode.

    :param output: The answer to grade (the "student answer").
    :param expected_output: The reference answer treated as correct.
    :param model: Chat model used as the judge; defaults to the model this
        notebook was originally written against.
    :return: The raw message content of the first choice. It is not parsed here.
    """
    instruction = (
       "You are a teacher, one of your job is to evalute student's answer based on the reference answer which is correct.\n"
       "your evaluation result should be a score between 0-10, 10 is completely correct, and 0 is complete wrong. You don't need to pursue the same words, as long as the meaning is the same."
    )

    # NOTE(review): the prompt asks for a "json List" but illustrates an object
    # with a 'score' key; simple_evaluation reads data['score']. The wording is
    # left untouched so that scores stay comparable with earlier runs.
    user_message = (
        f"student answer:\n{output}\n\n"
        f"reference answer (correct answer):\n{expected_output}\n\n"
        "Now, please begin to evaluate whether student answer is correct, output your evaluation into a json List {'score': ...}!"
    )

    messages = [
        {"role": "system", "content": instruction},
        {"role": "user", "content": user_message},
    ]

    response = oai.chat.completions.create(
        response_format={"type": "json_object"},
        messages=messages,
        model=model,
    )
    return response.choices[0].message.content

def simple_evaluation(output, expected_output):
    """Grade ``output`` against ``expected_output`` and return the numeric score.

    Delegates to ``call_openai_evaluation`` and parses its JSON reply.

    :param output: The answer to grade.
    :param expected_output: The reference answer.
    :return: The value under the ``score`` key. Numbers are returned unchanged;
        a score delivered as text (e.g. ``"8"``) is converted to ``float``.
    :raises KeyError: If the reply has no ``score`` key.
    :raises ValueError: If the reply is not valid JSON or the score is not numeric.
    """
    response = call_openai_evaluation(output, expected_output)
    data = json.loads(response)

    try:
        score = data['score']
    except KeyError:
        # Same exception type as before, but with the raw reply attached so a
        # failed evaluation can be diagnosed.
        raise KeyError(f"'score' missing from evaluator response: {response!r}") from None

    if isinstance(score, (int, float)):
        return score

    # The model sometimes quotes the number; anything else is not a usable score.
    try:
        return float(score)
    except (TypeError, ValueError):
        raise ValueError(f"Evaluator returned a non-numeric score: {score!r}") from None

In [11]:
from datetime import datetime
from langfuse import Langfuse
 
# Langfuse client shared by run_app and run_experiment.
# NOTE(review): presumably configured from environment variables — confirm.
langfuse = Langfuse()
 
def run_app(input):
    """Generate an answer for one dataset input and record it as a Langfuse generation.

    :param input: Mapping with an ``args`` sequence; its first three entries are
        passed positionally to ``generation_bench`` (id, question, chunks).
    :return: Tuple ``(completion, generation)``: the value returned by
        ``generation_bench`` and the object returned by ``langfuse.generation``.
    """
    started_at = datetime.now()

    args = input['args']
    completion = generation_bench(args[0], args[1], args[2])

    # The end time is taken once generation has returned, so the recorded span
    # covers the generation_bench call.
    generation = langfuse.generation(
        name="tidb.ai llm generation",
        input=input,
        output=completion,
        model="gpt-3.5-turbo",
        start_time=started_at,
        end_time=datetime.now(),
    )

    return completion, generation

In [12]:
def run_experiment(experiment_name):
    """Run every item of the "answer generation" dataset and score the results.

    For each item: generate an answer with ``run_app``, link the resulting
    generation to the item under ``experiment_name``, then attach a "relevance"
    score computed by ``simple_evaluation`` against the item's expected output.

    :param experiment_name: Name under which each generation is linked to its item.
    """
    for item in langfuse.get_dataset("answer generation").items:
        answer, generation = run_app(item.input)

        # Link first, score second — same order as the calls are issued to Langfuse.
        item.link(generation, experiment_name)

        relevance = simple_evaluation(answer, item.expected_output)
        generation.score(name="relevance", value=relevance)

In [13]:
# Evaluate every dataset item, linking each generation under the experiment name "test".
run_experiment("test")