<a href="https://colab.research.google.com/github/danielruales/Autograder/blob/master/rag/SpaceRAG_initial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
IN_COLAB = False
try:
    from google.colab import userdata
    import os
    os.environ["WANDB_API_KEY"] = userdata.get("WANDB_API_KEY")
    os.environ["TOGETHER_API_KEY"] = userdata.get("TOGETHER_API_KEY")
    os.environ["OPENAI_API_KEY"] = userdata.get("OPENAI_API_KEY")
    IN_COLAB = True
except:
    from dotenv import load_dotenv
    load_dotenv()

In [2]:
import os
import subprocess
import shutil

repo_url = "https://github.com/wandb/weave.git"
target_folder = "weave_cookbooks"
subdirectory = "examples/cookbooks"
branch = "anish/add-spacerag-example"

if not os.path.exists(target_folder) and IN_COLAB:
    print(f"Cloning repository: {repo_url}")

    # Clone the entire repository to a temporary folder
    temp_folder = "temp_weave_repo"
    subprocess.run(["git", "clone", "--depth", "1", "--branch", branch, repo_url, temp_folder], check=True)

    # Move the desired subdirectory to the target folder
    shutil.move(os.path.join(temp_folder, subdirectory), target_folder)

    # Remove the temporary folder
    shutil.rmtree(temp_folder)

    print(f"Successfully cloned {subdirectory} from branch '{branch}' to {target_folder}")

else:
    print(f"Folder '{target_folder}' already exists.")

Cloning repository: https://github.com/wandb/weave.git
Successfully cloned examples/cookbooks from branch 'anish/add-spacerag-example' to weave_cookbooks


In [3]:
if os.path.exists(target_folder) and IN_COLAB:
    %cd weave_cookbooks/rag/spacerag
    !pip install -r requirements.txt

/content/weave_cookbooks/rag/spacerag
Collecting weave (from -r requirements.txt (line 1))
  Downloading weave-0.51.19-py3-none-any.whl.metadata (21 kB)
Collecting faiss-cpu (from -r requirements.txt (line 3))
  Downloading faiss_cpu-1.9.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.4 kB)
Collecting together (from -r requirements.txt (line 5))
  Downloading together-1.3.3-py3-none-any.whl.metadata (11 kB)
Collecting python-dotenv (from -r requirements.txt (line 6))
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Collecting datasets (from -r requirements.txt (line 7))
  Downloading datasets-3.1.0-py3-none-any.whl.metadata (20 kB)
Collecting ragas (from -r requirements.txt (line 8))
  Downloading ragas-0.2.3-py3-none-any.whl.metadata (7.9 kB)
Collecting tonic-validate (from -r requirements.txt (line 9))
  Downloading tonic_validate-6.1.0-py3-none-any.whl.metadata (29 kB)
Collecting emoji>=2.12.1 (from weave->-r requirements.txt (line 1))
  D

In [4]:
import weave
from weave import Evaluation
import os
import numpy as np
import faiss
from openai import OpenAI
from together import Together
import re
import json

In [5]:
weave.init('lavanyashukla/spacerag_nov4')

# SERVE MODEL FROM TOGETHER ENDPOINT
client = Together(api_key=os.environ.get("TOGETHER_API_KEY"))

Logged in as Weights & Biases user: danielruales.
View Weave data at https://wandb.ai/lavanyashukla/spacerag_nov4/weave


In [6]:
# CHUNK DATA FROM EXTERNAL KNOWLEDGEBASE
@weave.op
def get_chunked_data(file):
    # get data - file
    with open(file, 'r') as file:
        # Read the contents of the file into a variable
        text = file.read()

    # split doc into chunks
    chunk_size = 10000 # 🛠️LEVER
    chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)] # 🛠️LEVER (advanced)
    return chunks

# EMBED DATA
@weave.op
def get_text_embedding(input):
    api_key_openai = os.environ["OPENAI_API_KEY"]
    client = OpenAI(api_key=api_key_openai)

    response = client.embeddings.create(
        model="text-embedding-3-small", # 🛠️LEVER
        input=input
    )
    return response.data[0].embedding

# MAKE VECTORDB
@weave.op
def make_vector_db(file):
    # get chunked data from function get_chunked_data()
    chunks = get_chunked_data(file)
    # embed data
    text_embeddings = np.array([get_text_embedding(chunk) for chunk in chunks])
    # embed data into vectordb
    d = text_embeddings.shape[1]
    index = faiss.IndexFlatL2(d)
    index.add(text_embeddings)
    return index, chunks

In [7]:
file = './data/space.txt'
index, chunks = make_vector_db(file)

🍩 https://wandb.ai/lavanyashukla/spacerag_nov4/r/call/0192fa4b-fcee-7ab0-9a27-924ad6ff31c9


In [8]:
# ANSWER QUESTION
@weave.op
def predict(model, prompt):
    completion = client.chat.completions.create(
        model=model,
        messages=[{"role":"user","content":prompt}],
        temperature=1.0, # 🛠️LEVER
        top_k=1000, # 🛠️LEVER
        max_tokens=1024,
        stream=True
    )

    answer = []
    for chunk in completion:
        if chunk.choices[0].delta.content is not None:
            answer.append(chunk.choices[0].delta.content)

    result = ''.join(answer)
    print(result)

    return result

In [9]:
# RETRIEVE CHUNKS SIMILAR TO THE QUESTION
@weave.op
def retrieve_context(question: str) -> list:
    question_embeddings = np.array([get_text_embedding(question)])
    # Retrieve similar chunks from the vectorDB
    D, I = index.search(question_embeddings, k=1) # 🛠️LEVER k=?
    retrieved_chunk = [chunks[i] for i in I.tolist()[0]]
    return retrieved_chunk

class SpaceRAGModel(weave.Model):
    model: str

    @weave.op()
    def predict(self, question: str):
        retrieved_chunk = retrieve_context(question)
        print("Question: "+question)

        # Combine context and question in a prompt # 🛠️LEVER
        prompt = f"""
        Use this context to answer the question, don't use any prior knowledge.
        Be concise in your answers.
        ---------------------
        {retrieved_chunk}
        ---------------------
        Question: {question}
        Answer:
        """
        answer = predict(self.model, prompt)
        print("___________________________")
        return {'answer': answer, 'retrieved_chunk': retrieved_chunk}

In [10]:
def string_to_dict(input_string):
    # Use regular expressions to find all JSON-like objects in the string
    json_objects = re.findall(r'\{.*?\}', input_string)

    # Initialize an empty dictionary to store the combined results
    combined_dict = {}

    for obj in json_objects:
        try:
            # Parse each JSON object
            parsed_dict = json.loads(obj)
            # Update the combined dictionary with the parsed data
            combined_dict.update(parsed_dict)
        except (ValueError, json.JSONDecodeError) as e:
            print(f"Error processing part: {obj}\nError: {e}")

    return combined_dict

In [11]:
dataset_ref = weave.ref("weave:///lavanyashukla/spacedata/object/space_dataset_llm_comprehensive:VBd5Ys7b3hGFmJJqdGTATVQYgKKrg70EiNV5FdpwFxs").get()
small_questions = dataset_ref.rows[:5] # 🔵 NOTE: CHANGE TO 5 WHEN RUNNING OUT YOUR EXPERIMENTS TO QUICKLY TEST.
                                        # CHANGE BACK TO 50 BEFORE FINAL LEADERBOARD SUBMISSION

In [12]:
def replace_nan_in_dict(result):
    for key in result:
        if isinstance(result[key], float) and np.isnan(result[key]):
            result[key] = 0
    return result

In [13]:
# Evaluate with an LLM
@weave.op
def llm_judge_scorer(ground_truth: str, model_output: dict) -> dict:
    scorer_llm = "meta-llama/Meta-Llama-3-70B-Instruct-Turbo"
    answer = model_output['answer']
    retrieved_chunk = model_output['retrieved_chunk']

    eval_rubrics = [
    {
        "metric": "concise",
        "rubrics": """
        false: The answer is long and difficult to understand.
        true: The answer is completely concise, readable and engaging.
        """,
    },
    {
        "metric": "relevant",
        "rubrics": """
        false: The answer is not relevant to the original text, or has significant flaws.
        true: The answer is completely relevant to the original text, and provides additional value or insight.
        """,
    },
    {
        "metric": "🥇accurate",
        "rubrics": """
        Compare the factual content of the model's answer with the correct answer. Ignore any differences in style, grammar, or punctuation.
        false: There is a disagreement between the model's answer and the correct answer.
        true: The model's answer contains all the same details as the correct answer.
        """,
    },
]

    scoring_prompt = """
    You have the correct answer, original text and the model's answer below.
    Based on the specified evaluation metric and rubric, assign a true or false score the summary.
    Then, return a JSON object with the metric name as the key and the evaluation score (false or true) as the value. Don't output anything else.

    # Evaluation metric:
    {metric}

    # Evaluation rubrics:
    {rubrics}

    # Correct Answer
    {ground_truth}

    # Original Text
    {retrieved_chunk}

    # Model Answer
    {model_answer}

    """
    evals = ""
    for i in eval_rubrics:
        eval_output = predict(scorer_llm,
            scoring_prompt.format(
                ground_truth=ground_truth, retrieved_chunk=retrieved_chunk, model_answer=answer,
                metric=i["metric"], rubrics=i["rubrics"]
            ))+" "
        evals+=eval_output
    # evals_json = format_string_to_json(evals)
    evals_dict = string_to_dict(evals)
    # print("___________________________")
    # print(evals_dict)
    # print("___________________________")
    return evals_dict

In [14]:
# def ragas_score(question, ground_truth, model_output):
#     from datasets import Dataset
#     from ragas import evaluate
#     from ragas.metrics import faithfulness, answer_relevancy, answer_correctness, context_recall, context_precision

#     metric_modules = [
#         answer_relevancy,
#         context_recall,
#     ]

#     # Convert the retrieved_chunk to a list of strings
#     contexts = [str(chunk) for chunk in model_output["retrieved_chunk"]]

#     qa_dataset = Dataset.from_dict(
#         {
#             "question": [question],
#             "ground_truth": [ground_truth],
#             "answer": [model_output["answer"]],
#             "contexts": [contexts],  # Wrap contexts in another list
#         }
#     )
#     result = evaluate(qa_dataset, metrics=metric_modules,
#                       raise_exceptions=False)
#     return replace_nan_in_dict(result)

In [15]:
@weave.op()
def tonic_validate_score(question: str, ground_truth: str, model_output: dict) -> dict:
    from tonic_validate import Benchmark, ValidateScorer
    from tonic_validate.metrics import DuplicationMetric, AnswerSimilarityMetric, AnswerConsistencyMetric

    metric_modules = [DuplicationMetric(), AnswerSimilarityMetric(), AnswerConsistencyMetric()]

    def get_llm_response(question):
        return {
            "llm_answer": model_output['answer'],
            "llm_context_list": (
                [model_output['retrieved_chunk']]
                if isinstance(model_output['retrieved_chunk'], str)
                else model_output['retrieved_chunk']
            ),
        }

    benchmark = Benchmark(questions=[question], answers=[ground_truth])
    scorer = ValidateScorer(metrics=metric_modules)
    run = scorer.score(benchmark, get_llm_response)
    return run.run_data[0].scores

In [None]:
models = ["meta-llama/Meta-Llama-3-70B-Instruct-Turbo",
          "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
          "mistralai/Mixtral-8x22B-Instruct-v0.1"
          ]  # 🛠️LEVER - find more models @ https://docs.together.ai/docs/serverless-models#chat-models
for model in models:
    rag_model = SpaceRAGModel(model=model)
    model_name = model.split('/')[-1]  # Get only the part after the '/'
    evaluation = Evaluation(name=f"spacerag-{model_name}", dataset=small_questions, scorers=[
    llm_judge_scorer,
    # ragas_score,
    tonic_validate_score
])
    print(f"RAG Model: {model}")
    await evaluation.evaluate(rag_model, __weave={"display_name": f"spacerag-{model_name}"})




RAG Model: meta-llama/Meta-Llama-3-70B-Instruct-Turbo
Question: How is the sense of spaciousness addressed in the design of habitats for the colony?
