<a href="https://colab.research.google.com/github/graphlit/graphlit-evals/blob/main/tonic-validate/Tonic_Validate_Graphlit.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install --upgrade tonic_validate

In [None]:
!pip install --upgrade graphlit-client

In [None]:
import os
from typing import Optional
from tonic_validate import ValidateScorer, Benchmark, BenchmarkItem, LLMResponse, BenchmarkItem, Run
from tonic_validate.metrics import AnswerSimilarityMetric
from concurrent.futures import ThreadPoolExecutor
import os
import time
import json
import pandas as pd
import matplotlib.pyplot as plt
from google.colab import userdata

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive so later cells can read the sample data
# and write results there.
drive.mount('/content/drive', force_remount=True)

# configure shared, writable folder containing sample data
# (later cells read "Sample Data/..." beneath it and write graphlit_run.csv into it)
tonic_validate_directory = "/content/drive/MyDrive/Colab Notebooks/Tonic Validate"

Initialize Graphlit

In [None]:
from graphlit import Graphlit
from graphlit_api import input_types, enums, exceptions

# Copy each required credential from the Colab secret store into the process
# environment under the same name. No arguments are passed to Graphlit() below,
# so presumably the client picks its settings up from these variables
# (TODO confirm against the graphlit-client docs).
for secret_name in (
    'OPENAI_API_KEY',
    'GRAPHLIT_ORGANIZATION_ID',
    'GRAPHLIT_ENVIRONMENT_ID',
    'GRAPHLIT_JWT_SECRET',
):
    os.environ[secret_name] = userdata.get(secret_name)

graphlit = Graphlit()

Load benchmark questions & answers

In [None]:
# Read the question/answer pairs. The encoding is passed explicitly (JSON is
# UTF-8) so the result does not depend on the platform's default encoding,
# matching how the essay files are read elsewhere in this notebook.
with open(f"{tonic_validate_directory}/Sample Data/qa_pairs.json", "r", encoding="utf-8") as f:
    qa_pairs = json.load(f)

# for testing: set max_qa_pairs to a small integer to benchmark only the first
# N pairs; None keeps every pair (slicing with None as the stop is a full copy).
max_qa_pairs = None
qa_pairs = qa_pairs[:max_qa_pairs]

# Each pair is a dict with "question" and "answer" keys.
benchmark = Benchmark(
    questions=[pair["question"] for pair in qa_pairs],
    answers=[pair["answer"] for pair in qa_pairs]
)

def run_to_dataframe(run: Run) -> pd.DataFrame:
    """Flatten a scored run into a DataFrame with one row per entry in ``run.run_data``.

    Columns: ``reference_question``, ``reference_answer``, ``llm_answer``,
    ``llm_context`` (the entry's context serialized with ``json.dumps``) and
    ``answer_similarity`` (read from the entry's ``scores`` mapping).
    """
    columns = {
        "reference_question": [],
        "reference_answer": [],
        "llm_answer": [],
        "llm_context": [],
        "answer_similarity": [],
    }

    # Single pass over the run, appending each field to its column.
    for entry in run.run_data:
        columns["reference_question"].append(entry.reference_question)
        columns["reference_answer"].append(entry.reference_answer)
        columns["llm_answer"].append(entry.llm_answer)
        columns["llm_context"].append(json.dumps(entry.llm_context))
        columns["answer_similarity"].append(entry.scores["answer_similarity"])

    return pd.DataFrame(columns)

Define Graphlit helper functions

In [None]:
import asyncio
import time
from datetime import datetime

async def upload_graphlit_essays():
    """Create a "Paul Graham Essays" collection and ingest every essay file into it.

    Every regular file directly under
    ``{tonic_validate_directory}/Sample Data/paul_graham_essays`` is read as
    UTF-8 text and ingested via ``graphlit.client.ingest_text``, named after the
    file (without extension) and attached to the new collection. The ingestion
    calls are awaited together with ``asyncio.gather``, so an exception from any
    of them propagates to the caller.

    Returns:
        The ID of the created collection, or None when the Graphlit client is
        unavailable or the collection could not be created.
    """
    # Local import: the notebook only imports `datetime` from this module.
    from datetime import timezone

    if graphlit.client is None:
        return None

    directory = f"{tonic_validate_directory}/Sample Data/paul_graham_essays"

    response = await graphlit.client.create_collection(input_types.CollectionInput(name="Paul Graham Essays"))

    collection_id = response.create_collection.id if response.create_collection is not None else None

    if collection_id is None:
        return None

    file_paths = [os.path.join(directory, filename)
                for filename in os.listdir(directory)
                if os.path.isfile(os.path.join(directory, filename))]

    start_time = time.time()

    tasks = []
    for file_path in file_paths:
        file_name = os.path.basename(file_path)
        content_name, _ = os.path.splitext(file_name)

        with open(file_path, "r", encoding='utf-8') as file:
            file_content = file.read()

        # Not awaited here: the coroutines are collected and awaited together below.
        task = graphlit.client.ingest_text(content_name, file_content, text_type=enums.TextTypes.PLAIN,
            is_synchronous=True, collections=[input_types.EntityReferenceInput(id=collection_id)])
        tasks.append(task)

    await asyncio.gather(*tasks)

    duration = time.time() - start_time

    # The message labels the timestamp as UTC, so take the time in UTC rather
    # than in whatever the local timezone happens to be.
    formatted_time = datetime.now(timezone.utc).strftime("%H:%M:%S")

    print(f"Uploading essays took {duration:.2f} seconds. Finished at {formatted_time} UTC.")

    return collection_id

async def create_specification():
    """Create the "Completion" specification used for the RAG conversations.

    The specification selects the OpenAI GPT4_TURBO_128K model, VECTOR search
    with ``numberSimilar=25``, CHUNK retrieval with ``contentLimit=10`` and
    COHERE reranking.

    Returns:
        The new specification's ID, or None when the Graphlit client is
        unavailable, the response carries no specification, or the client
        raises ``GraphQLClientError`` (the error text is printed).
    """
    if graphlit.client is None:
        return None

    model_properties = input_types.OpenAIModelPropertiesInput(model=enums.OpenAIModels.GPT4_TURBO_128K)
    retrieval = input_types.RetrievalStrategyInput(
        type=enums.RetrievalStrategyTypes.CHUNK,
        contentLimit=10,
    )
    reranking = input_types.RerankingStrategyInput(
        serviceType=enums.RerankingModelServiceTypes.COHERE
    )

    spec_input = input_types.SpecificationInput(
        name="Completion",
        type=enums.SpecificationTypes.COMPLETION,
        serviceType=enums.ModelServiceTypes.OPEN_AI,
        openAI=model_properties,
        searchType=enums.ConversationSearchTypes.VECTOR,
        numberSimilar=25,
#        promptStrategy=input_types.PromptStrategyInput(
#            type=enums.PromptStrategyTypes.REWRITE_QUESTION
#        ),
        retrievalStrategy=retrieval,
        rerankingStrategy=reranking,
    )

    try:
        response = await graphlit.client.create_specification(spec_input)
    except exceptions.GraphQLClientError as e:
        print(str(e))
        return None

    created = response.create_specification
    return created.id if created is not None else None

# NOTE: for cleaning up project data
async def delete_all_collections():
    """Delete all collections through the Graphlit client, then print a confirmation.

    Does nothing (and prints nothing) when the Graphlit client is unavailable.
    """
    if graphlit.client is not None:
        await graphlit.client.delete_all_collections()

        print('Deleted all collections.')

async def delete_all_contents():
    """Delete all contents through the Graphlit client, then print a confirmation.

    Does nothing (and prints nothing) when the Graphlit client is unavailable.
    """
    if graphlit.client is not None:
        await graphlit.client.delete_all_contents()

        print('Deleted all contents.')

Define Graphlit RAG function

In [None]:
async def get_graphlit_rag_response(benchmarkItem: BenchmarkItem, specification_id: str, collection_id: str):
    """Ask Graphlit one benchmark question in a fresh, throwaway conversation.

    A conversation is created with the given specification and filtered to the
    given collection, prompted with ``benchmarkItem.question``, and deleted
    again whether or not the prompt succeeded.

    Client errors are printed instead of raised, so that one failing question
    does not abort a batch of questions awaited together.

    Args:
        benchmarkItem: Benchmark item whose ``question`` is sent as the prompt.
        specification_id: ID of the specification the conversation should use.
        collection_id: ID of the collection the conversation is filtered to.

    Returns:
        The ``message`` object of the prompt response, or None when the client
        is unavailable, the conversation could not be created, or prompting
        failed.
    """
    if graphlit.client is None:
        return None

    prompt = benchmarkItem.question

    conversation_input = input_types.ConversationInput(name="Conversation", specification=input_types.EntityReferenceInput(id=specification_id),
        filter=input_types.ContentCriteriaInput(collections=[input_types.EntityReferenceInput(id=collection_id)]))

    conversation_id = None

    try:
        response = await graphlit.client.create_conversation(conversation_input)

        conversation_id = response.create_conversation.id if response.create_conversation is not None else None
    except exceptions.GraphQLClientError as e:
        # Same exception class create_specification catches, rather than only the HTTP subclass.
        print(str(e))

    if conversation_id is None:
        print('Failed to create conversation.')
        return None

    print(f'Created conversation [{conversation_id}].')

    try:
        response = await graphlit.client.prompt_conversation(prompt, conversation_id)

        return response.prompt_conversation.message if response.prompt_conversation is not None else None
    except exceptions.GraphQLClientError as e:
        print(str(e))
        return None
    finally:
        # Best-effort cleanup: a failed delete must not replace the answer (or
        # the None) being returned above.
        try:
            await graphlit.client.delete_conversation(conversation_id)
        except exceptions.GraphQLClientError as e:
            print(str(e))

Initialize Graphlit test

In [None]:
# NOTE: this will delete all contents and collections in project
await delete_all_contents()
await delete_all_collections()

# Initialize specification
specification_id = await create_specification()

if specification_id is not None:
    print(f'Created specification [{specification_id}].')
else:
    print('Failed to create specification.')

In [None]:
collection_id = None

# Upload all essays
try:
    collection_id = await upload_graphlit_essays()

    print(f'Essays ingested into collection [{collection_id}].')
except exceptions.GraphQLClientHttpError as e:
    print(str(e))

Validate Graphlit response

In [None]:
benchmark_item = BenchmarkItem(
    question="In what month and year was the talk regarding Lisp for web-based applications given?",
    answer=""
)

if collection_id is not None and specification_id is not None:
    message = await get_graphlit_rag_response(benchmark_item, specification_id, collection_id)

    if message is not None:
        print(message.message)
        print(f'Tokens: {message.tokens}, took [{message.completion_time}]')

Perform Graphlit test and score run

In [None]:
import asyncio

async def run_test(specification_id, collection_id):
    """Send every item in ``benchmark.items`` to Graphlit concurrently.

    Returns:
        The results of ``get_graphlit_rag_response`` in the same order as
        ``benchmark.items`` (``asyncio.gather`` preserves argument order).
    """
    pending = [
        get_graphlit_rag_response(item, specification_id, collection_id)
        for item in benchmark.items
    ]

    return await asyncio.gather(*pending)

messages = await run_test(specification_id, collection_id)

raw_graphlit_responses = []

for message in messages:
    raw_graphlit_responses.append(message.message.strip() if message is not None else None)

In [None]:
# Pair each raw answer with its benchmark item, in order. No retrieved context
# is recorded, so the context list is empty for every response.
graphlit_responses = [
    LLMResponse(
        benchmark_item=item,
        llm_answer=answer_text,
        llm_context_list=[],
    )
    for answer_text, item in zip(raw_graphlit_responses, benchmark.items)
]

In [None]:
# Score the responses on answer similarity only, using gpt-4-turbo as the evaluator.
scorer = ValidateScorer(
    metrics=[AnswerSimilarityMetric()],
    model_evaluator="gpt-4-turbo",
)
graphlit_run = scorer.score_run(graphlit_responses, parallelism=5)

In [None]:
# Flatten the scored run and save it as CSV (without the index column) in the
# shared Tonic Validate folder.
graphlit_run_df = run_to_dataframe(graphlit_run)
graphlit_run_csv_path = f"{tonic_validate_directory}/graphlit_run.csv"
graphlit_run_df.to_csv(graphlit_run_csv_path, index=False)

Visualize Graphlit test

In [None]:
# Count how many benchmark items received each answer-similarity score.
graphlit_answer_similarity_scores = pd.Series([x.scores["answer_similarity"] for x in graphlit_run.run_data])
category_counts = graphlit_answer_similarity_scores.value_counts()

# Draw the bars once, in the final color; a second identical call would only
# paint over the first.
plt.bar(category_counts.index, category_counts.values, color='#A679C8')

plt.title('Distribution of scores for graphlit')
plt.xlabel('Score')
plt.ylabel('Count')

# Put x-axis ticks only at the whole numbers 0 through 5
plt.xticks(range(0, 6, 1))

plt.show()