Notebook for the whole retrieval and generation process. Includes checking the context size, creating content for Multi-Query and HyDE, retrieving the relevant documents, and prompting the LLM.

In [None]:
import tiktoken
from transformers import T5Tokenizer
from transformers import LlamaTokenizerFast
from langchain.docstore.document import Document
from transformers import AutoTokenizer
from typing import List


def is_context_size_valid(model_provider: str, model_name: str, contexts: List[Document], query: str, max_context_size: int) -> bool:
    """
    Checks if the context size of the query + contexts fits into the context size of the LLM.

    The prompt is assembled from the system prompt, the numbered "original_text" metadata of every
    context document and the question (a "?" is appended if it is missing). It is then tokenized with
    the tokenizer that belongs to the given provider/model and the token count is compared to
    max_context_size.

    model_provider: "OpenAI", "Replicate", "HuggingFace" or "Mistral".
    model_name: Only checked for "HuggingFace" (must be "flan-t5-large") and "Mistral" (must be "mixtral-8x7B-v0.1").
    contexts: Documents whose metadata["original_text"] is inserted as context (a missing key counts as empty text).
    query: The user question.
    max_context_size: Maximum number of tokens allowed for the full prompt.

    Returns True if the token count is smaller than or equal to max_context_size, otherwise False.
    Raises an Exception if the combination of model_provider and model_name is not supported.
    """

    # Same prompts as in the query llm method
    system_prompt = "You are an expert in information security, especially for ISO 27001 certifications. Answer the following question as truthfully as possible, using the provided context. If the answer is not contained within the context or the question is not related to the topic of information security or ISO 27001, respond with 'I don't know"

    concatenated_contexts = "".join(
        f"{index}. {document.metadata.get('original_text', '')}\n\n"
        for index, document in enumerate(contexts, start=1)
    )

    if not query.endswith("?"):
        query = query + "?"
    context_question_formatted = f"Context: {concatenated_contexts} \n Question: {query}"
    full_prompt = system_prompt + "\n" + context_question_formatted

    # If the llm is from OpenAI, use the cl100k_base tokenizer
    if model_provider == "OpenAI":
        tokenizer = tiktoken.get_encoding("cl100k_base")
        token_length = len(tokenizer.encode(full_prompt))

    # If the llm is from Replicate (so in use-case only Llama models), use the Llama Tokenizer from HF
    elif model_provider == "Replicate":
        tokenizer = LlamaTokenizerFast.from_pretrained("hf-internal-testing/llama-tokenizer")
        token_length = len(tokenizer.encode(full_prompt))

    # If the llm is from HF, use T5 tokenizer
    elif model_provider == "HuggingFace" and model_name == "flan-t5-large":
        tokenizer = T5Tokenizer.from_pretrained("google/flan-t5-large")
        tokenizer.model_max_length = 2048
        token_length = len(tokenizer(full_prompt).input_ids)

    elif model_provider == "Mistral" and model_name == "mixtral-8x7B-v0.1":
        tokenizer = AutoTokenizer.from_pretrained("mistralai/Mixtral-8x7B-v0.1")
        token_length = len(tokenizer(full_prompt).input_ids)

    # Covers unknown providers AND known providers with an unknown model name, so that
    # token_length is never read without having been assigned.
    else:
        raise Exception("Error, raised exception: Wrong model_provider or model_name provided.")

    return token_length <= max_context_size

In [None]:
def initialize_tokenizers():
    """
    Initializes all tokenizers. Can be used before evaluating to already load them into the cache and reduce errors.

    Runs is_context_size_valid once per supported provider/model combination with two small dummy
    documents and prints the provider name followed by the result of the check.
    """
    models = [
        {"model_provider": "OpenAI", "model_name": "..", "max_context_size": 3000},
        {"model_provider": "Replicate", "model_name": "..", "max_context_size": 3000},
        {"model_provider": "HuggingFace", "model_name": "flan-t5-large", "max_context_size": 300},
        {"model_provider": "Mistral", "model_name": "mixtral-8x7B-v0.1", "max_context_size": 1}
    ]
    # Two identical dummy documents are enough to trigger the tokenizer loading
    contexts = [
        Document(page_content="Test", metadata={"original_text": "test"})
        for _ in range(2)
    ]
    query = "test"

    for model in models:
        # The loop variable is "model"; the previous "c" was undefined and raised a NameError
        print(model["model_provider"])
        print(is_context_size_valid(model["model_provider"], model["model_name"], contexts, query, model["max_context_size"]))

In [None]:
import json
from langchain.docstore.document import Document
from typing import List

# Helper methods for storing and loading already generated documents
def store_documents(documents, file_path: str) -> None:
    """
    Writes the JSON representation (doc.json()) of every document to file_path, one document per line.
    An existing file is overwritten.
    """
    with open(file_path, "w") as target:
        target.writelines(doc.json() + "\n" for doc in documents)


def store_queries(multiple_queries, file_path: str) -> None:
    """
    Serializes multiple_queries as a single JSON value and writes it to file_path.
    An existing file is overwritten.
    """
    # Serialize first, so that nothing is written if the data is not JSON serializable
    serialized = json.dumps(multiple_queries)
    with open(file_path, "w") as target:
        target.write(serialized)


def load_documents(file_path: str) -> List[Document]:
    """
    Reads file_path line by line, parses every line as JSON and builds a Document from the
    parsed key/value pairs. Returns the documents in file order.
    """
    with open(file_path, "r") as source:
        return [Document(**json.loads(raw_line)) for raw_line in source]


def load_queries(file_path: str) -> List[List[str]]:
    """
    Reads the JSON content of file_path and returns the parsed value.
    """
    with open(file_path, "r") as source:
        return json.load(source)

In [None]:
from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)
from typing import Any
from langchain_openai import OpenAI


def generate_multiple_queries(query: str, number_of_queries: int) -> List[str]:
    '''
    Generates multiple (number_of_queries) queries based on the given query. Returns the original query with index 0 and all other generated ones. Is used for the Multi-Query retrieval strategy.

    Uses the OpenAI LLM from langchain_openai with its default settings. The system and the human message are concatenated into one plain text prompt.

    query: The original query the new queries are derived from.
    number_of_queries: The number of queries the LLM is asked to output. The LLM answer is not validated against this number.

    Returns a list of strings: the original query followed by one entry per non-empty line of the LLM answer.
    '''

    llm = OpenAI()
    system_message_prompt = SystemMessagePromptTemplate.from_template("""You are a helpful assistant that generates multiple search queries based on a single input query""")
    human_message_prompt = HumanMessagePromptTemplate.from_template("""Generate multiple search queries related to: {query}. Output exactly {number_of_queries} queries! For each query use a new line. Do not use any form of enumeration.""")
    chat_prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])
    prompts = chat_prompt.format_prompt(query=query, number_of_queries=number_of_queries).to_messages()
    text = prompts[0].content + "\n" + prompts[1].content

    generated_queries_answer = llm(text)

    # The LLM may separate the queries with blank lines or pad them with whitespace.
    # Empty entries would be sent to the retrievers as empty queries, so drop them here.
    stripped_lines = (line.strip() for line in generated_queries_answer.split("\n"))
    generated_queries_answer_list = [line for line in stripped_lines if line]

    return [query, *generated_queries_answer_list]


def generate_and_store_multiple_queries_list(query_list: List[str], is_retrieval_eval: bool, number_of_queries: int, path_to_save: str) -> List[List[str]]:
    '''
    Runs generate_multiple_queries for every query in query_list, stores all resulting query lists as one JSON file and returns them. The index of the query that is currently processed is printed as progress information.

    The file is written to the Retrieval_Eval folder if is_retrieval_eval is True, otherwise to the Generation_Eval folder. ".json" is appended to path_to_save.

    The path_to_save should include all the relevant metadata for the specific index, in order to reload the multiple queries. It should be in this format: "Multi_Query_" + metadata["chunk_size"] + "_" +  metadata["chunk_overlap"] + "_"+ metadata["file_type"] + "_" + metadata["title_appended"]
    '''
    multiple_query_lists = []

    for position, single_query in enumerate(query_list):
        print(position)
        multiple_query_lists.append(generate_multiple_queries(single_query, number_of_queries))

    # Only the target folder depends on the kind of evaluation
    if is_retrieval_eval:
        target_path = f"./../../retrievalInput/Queries/Retrieval_Eval/{path_to_save}.json"
    else:
        target_path = f"./../../retrievalInput/Queries/Generation_Eval/{path_to_save}.json"
    store_queries(multiple_query_lists, target_path)

    return multiple_query_lists


def load_multiple_queries_list(is_retrieval_eval: bool, path_to_load: str) -> List[List[str]]:
    '''
    Loads the stored query lists with load_queries. Reads from the Retrieval_Eval folder if is_retrieval_eval is True, otherwise from the Generation_Eval folder. ".json" is appended to path_to_load.
    '''
    if is_retrieval_eval:
        return load_queries(
            f"./../../retrievalInput/Queries/Retrieval_Eval/{path_to_load}.json")

    return load_queries(
        f"./../../retrievalInput/Queries/Generation_Eval/{path_to_load}.json")

In [None]:
from langchain.chat_models import ChatOpenAI
from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)
from typing import Any
from langchain_openai import OpenAI

def generate_hyde_doc(query: str) -> Document:
    """
    Generates a hypothetical answer document for the given query. Is used for the HyDE retriever.

    The system and the human message are concatenated into one plain text prompt, which is sent
    to the OpenAI LLM from langchain_openai (default settings).

    Returns a Document with the LLM answer as page_content and the query stored in
    metadata["original_prompt"].
    """

    llm = OpenAI()

    system_template = SystemMessagePromptTemplate.from_template(
        """You are an expert in information security, especially for ISO 27001 certifications. Please write a short passage (3-4 sentences) to answer the question."""
    )
    question_template = HumanMessagePromptTemplate.from_template("""Question: {question}""")
    messages = (
        ChatPromptTemplate.from_messages([system_template, question_template])
        .format_prompt(question=query)
        .to_messages()
    )
    system_message, question_message = messages[0], messages[1]
    prompt_text = system_message.content + "\n" + question_message.content

    # The answer becomes the document content, the query is kept as metadata
    return Document(page_content=llm(prompt_text), metadata={"original_prompt": query})

def generate_and_store_multiple_hyde_docs(query_list: List[str], is_retrieval_eval: bool, path_to_save: str) -> List[Document]:
    """
    Calls generate_hyde_doc for every query in query_list, stores the generated documents and returns them.
    The index of the query that is currently processed is printed as progress information.

    The file is written to the Retrieval_Eval folder if is_retrieval_eval is True, otherwise to the
    Generation_Eval folder. ".json" is appended to path_to_save if it is not already part of it.

    Path should be in this format: "Hyde_" + metadata["chunk_size"] + "_" +  metadata["chunk_overlap"] + "_"+ metadata["file_type"] + "_" + metadata["title_appended"]
    """
    hyde_docs = []

    for index, query in enumerate(query_list):
        print(index)
        hyde_docs.append(generate_hyde_doc(query))

    # load_hyde_docs always reads "<path>.json". Store under the same file name, so that the
    # documents can be reloaded with the identical path argument.
    file_name = path_to_save if path_to_save.endswith(".json") else f"{path_to_save}.json"

    if is_retrieval_eval:
        store_documents(hyde_docs, f"./../../retrievalInput/HyDE_Documents/Retrieval_Eval/{file_name}")
    else:
        store_documents(hyde_docs, f"./../../retrievalInput/HyDE_Documents/Generation_Eval/{file_name}")

    return hyde_docs


def load_hyde_docs(is_retrieval_eval: bool, path_to_load: str) -> List[Document]:
    """
    Loads the stored HyDE documents with load_documents. Reads from the Retrieval_Eval folder if
    is_retrieval_eval is True, otherwise from the Generation_Eval folder. ".json" is appended to path_to_load.

    Returns the list of loaded documents.
    """
    if is_retrieval_eval:
        hyde_docs = load_documents(f"./../../retrievalInput/HyDE_Documents/Retrieval_Eval/{path_to_load}.json")
    else:
        hyde_docs = load_documents(f"./../../retrievalInput/HyDE_Documents/Generation_Eval/{path_to_load}.json")

    return hyde_docs

In [None]:
# Mean pooling helper method for Contriever
def mean_pooling(token_embeddings, mask):
    """
    Averages the token embeddings along dim 1. Positions where mask is 0 are set to 0 before
    summing and the sum is divided by the sum of the mask values along dim 1.
    """
    # Add a trailing axis, so that the mask broadcasts over the last embedding axis
    keep_position = mask.unsqueeze(-1).bool()
    masked_embeddings = token_embeddings.masked_fill(~keep_position, 0.)
    mask_total = mask.sum(dim=1).unsqueeze(-1)
    return masked_embeddings.sum(dim=1) / mask_total

In [None]:
# Python file for retrieving relevant documents and generating answer with LLM
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain.retrievers import BM25Retriever, EnsembleRetriever, TFIDFRetriever
from langchain.retrievers import ContextualCompressionRetriever
from langchain_openai import OpenAI
from langchain.retrievers import MultiVectorRetriever
from langchain.retrievers.document_compressors import CohereRerank
from langchain.storage.file_system import LocalFileStore
from langchain.storage._lc_store import create_kv_docstore
from sentence_transformers import CrossEncoder
from transformers import AutoTokenizer, AutoModel
from FlagEmbedding import FlagReranker
import math
import time
import torch
import os

def retrieve_contexts(vectordb, retrieval_method: str, k: int, query: str, rerank_k: int=50, dense_percent: float = 0.5, hyde_document: Document=None, multiple_queries: List[str]=[]):
    """
    Method for all retrieval strategies. Returns k documents.

    retrieval methods: Dense, BM25, TF-IDF, Rerank_Hybrid_BM25_Dense, Rerank_Hybrid_TF-IDF_Dense, Multi_Query, Hybrid_Multi_Query, Hybrid_Multi_Query_Cohere, Rerank_Cohere, Hybrid_Rerank_Cohere, Hybrid_Rerank_Cohere_Compression, Rerank_Contriever, Hybrid_Rerank_Contriever, Rerank_Cross_Encoder_Ms_Marco, Hybrid_Rerank_Cross_Encoder_Ms_Marco, Rerank_Cross_Encoder_BGE, Hybrid_Rerank_Cross_Encoder_BGE, HyDE, Hybrid_HyDE, MMR, Hybrid_MMR, Hybrid_MMR_Cohere, Parent_Child, Hybrid_Parent_Child, Hybrid_Parent_Child_Cohere, Hybrid_Parent_Child_MMR_Cohere
    rerank_k: Determines how many documents should be retrieved from the vector database, if a re-ranker is used.
    dense_percent: Determines how many percent of the documents should be retrieved from the dense index and the sparse index, if a hybrid retrieval strategy is used.
    hyde_document: The provided HyDE doc, if the HyDE strategy is used. If it is empty and HyDE is used, generates a new one.
    multiple_queries: The provided list of queries, if the Multi Query strategy is used. If it is empty three new queries based on the prompt are generated.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    query = query.lower()

    documents = []
    if "Hybrid" in retrieval_method or retrieval_method == "BM25" or retrieval_method == "TF-IDF" or retrieval_method == "Rerank_Hybrid_BM25_Dense" or retrieval_method == "Rerank_Hybrid_TF-IDF_Dense":

        if "Parent_Child" in retrieval_method:
            chunk_size_parent = vectordb._collection.metadata["chunk_size_parent"]
            chunk_overlap_parent = vectordb._collection.metadata["chunk_overlap_parent"]
            chunk_size_child = vectordb._collection.metadata["chunk_size_child"]
            chunk_overlap_child = vectordb._collection.metadata["chunk_overlap_child"]
            title_appended = vectordb._collection.metadata["title_appended"]
            file_type = vectordb._collection.metadata["file_type"]
            
            document_file_name = str(chunk_size_parent) + "_" + str(chunk_overlap_parent) + "_PC_" + str (chunk_size_child) + "_" + str(chunk_overlap_child) + "_" + file_type + "_" + str(title_appended)
            documents = load_documents(f"./../../retrievalInput/Documents_For_Sparse/{document_file_name}")
        else:
            chunk_size = vectordb._collection.metadata["chunk_size"]
            chunk_overlap = vectordb._collection.metadata["chunk_overlap"]
            title_appended = vectordb._collection.metadata["title_appended"]
            file_type = vectordb._collection.metadata["file_type"]

            document_file_name = str(chunk_size) + "_" + str(chunk_overlap) + "_" + file_type + "_" + "False"
            documents = load_documents(f"./../../retrievalInput/Documents_For_Sparse/{document_file_name}")

    start_time = time.time()

    if retrieval_method == "Dense":
        result_documents = vectordb.similarity_search(
            query, k=k)
    
    elif retrieval_method == "BM25" or retrieval_method == "TF-IDF" or retrieval_method == "Rerank_Hybrid_BM25_Dense" or retrieval_method == "Rerank_Hybrid_TF-IDF_Dense":
        # Populate the information for getting the documents for the sparse retrieval with the same metadata as used with the index

        if retrieval_method == "BM25":
            bm25_retriever = BM25Retriever.from_documents(documents)
            bm25_retriever.k = k
            result_documents = bm25_retriever.get_relevant_documents(query)
            
        if retrieval_method == "TF-IDF":
            tf_idf_retriever = TFIDFRetriever.from_documents(documents)
            tf_idf_retriever.k = k
            result_documents = tf_idf_retriever.get_relevant_documents(query)

        # Create an EnsembleRetriever with the dense and sparse (BM25) retrievers, then get the relevant documents. Automaticall reranks them based on Reciprocal Rank Fusion Algorithm
        if retrieval_method == "Rerank_Hybrid_BM25_Dense":
            bm25_retriever = BM25Retriever.from_documents(documents)
            bm25_retriever.k = k
            vectordb_retriever = vectordb.as_retriever(
                search_kwargs={"k": k}, search_type="similarity")
            ensemble_retriever = EnsembleRetriever(
                retrievers=[bm25_retriever, vectordb_retriever], weights=[0.5, 0.5])
            result_documents = ensemble_retriever.get_relevant_documents(
                query)
            result_documents = result_documents[:k]

        # Create an EnsembleRetriever with the dense and sparse (TF-IDF) retrievers, then get the relevant documents. Automaticall reranks them based on Reciprocal Rank Fusion Algorithm
        elif retrieval_method == "Rerank_Hybrid_TF-IDF_Dense":
            tf_idf_retriever = TFIDFRetriever.from_documents(documents)
            tf_idf_retriever.k = k
            vectordb_retriever = vectordb.as_retriever(
                search_kwargs={"k": k}, search_type="similarity")
            ensemble_retriever = EnsembleRetriever(
                retrievers=[tf_idf_retriever, vectordb_retriever], weights=[0.5, 0.5])
            result_documents = ensemble_retriever.get_relevant_documents(
                query)
            result_documents = result_documents[:k]

    # Use the ensemble retriever to rerank as it uses the Reciprocal Rank Fusion Algorithm. Then retrieve the first k documents
    elif retrieval_method == "Multi_Query":
        retrievers = []
        relevant_document_list = []

        if len(multiple_queries) == 0:
            multiple_queries = generate_multiple_queries(query, 3)

        # Generate an own retriever (necessary for using the ensemble) for each query and find relevant documents
        for query in multiple_queries:
            vectordb_retriever = vectordb.as_retriever(search_kwargs={"k": k}, search_type="similarity")
            retrievers.append(vectordb_retriever)
            documents = vectordb_retriever.get_relevant_documents(query)
            relevant_document_list.append(documents)

        # Create an ensemble retriever
        ensemble_retriever = EnsembleRetriever(retrievers=retrievers)
        # Rerank the already retrieved documents
        result_documents = ensemble_retriever.weighted_reciprocal_rank(relevant_document_list)
        # Give back the top k documents
        result_documents = result_documents[:k]

    elif retrieval_method == "Hybrid_Multi_Query":

        retrievers = []
        relevant_document_list = []

        if len(multiple_queries) == 0:
            multiple_queries = generate_multiple_queries(query, 3)

        # Generate an own retriever (necessary for using the ensemble) for each query and find relevant documents
        for query in multiple_queries:
            vectordb_retriever = vectordb.as_retriever(search_kwargs={"k": k}, search_type="similarity")
            retrievers.append(vectordb_retriever)
            documents = vectordb_retriever.get_relevant_documents(query)
            relevant_document_list.append(documents)

        for query in multiple_queries:
            bm25_retriever = BM25Retriever.from_documents(documents)
            bm25_retriever.k = k
            retrievers.append(bm25_retriever)
            documents = bm25_retriever.get_relevant_documents(query)
            relevant_document_list.append(documents)

        # Create an ensemble retriever
        ensemble_retriever = EnsembleRetriever(retrievers=retrievers)
        # Rerank the already retrieved documents
        result_documents = ensemble_retriever.weighted_reciprocal_rank(relevant_document_list)
        # Give back the top k documents
        result_documents = result_documents[:k]

    elif retrieval_method == "Hybrid_Multi_Query_Cohere":

        retrievers = []
        relevant_document_list = []

        if len(multiple_queries) == 0:
            multiple_queries = generate_multiple_queries(query, 3)

        # Generate an own retriever (necessary for using the ensemble) for each query and find relevant documents
        for query in multiple_queries:
            vectordb_retriever = vectordb.as_retriever(search_kwargs={"k": k}, search_type="similarity")
            retrievers.append(vectordb_retriever)
            documents = vectordb_retriever.get_relevant_documents(query)
            relevant_document_list.extend(documents)

        for query in multiple_queries:
            bm25_retriever = BM25Retriever.from_documents(documents)
            bm25_retriever.k = k
            retrievers.append(bm25_retriever)
            documents = bm25_retriever.get_relevant_documents(query)
            relevant_document_list.extend(documents)

        unique_documents_dict = {}
        for doc in relevant_document_list:
            if doc.page_content not in unique_documents_dict:
                unique_documents_dict[doc.page_content] = doc

        # Extracting the unique documents from the dictionary
        result_documents_unique = list(unique_documents_dict.values())

        compressor = CohereRerank(top_n=k, user_agent="langchain")
        result_documents = compressor.compress_documents(documents=result_documents_unique, query=query)
        
    # Uses dense retrieval and Cohere reranking (always retrieve 50 documents and then return the top k after reranking)
    elif retrieval_method == "Rerank_Cohere":
        vectordb_retriever = vectordb.as_retriever(search_kwargs={"k": 50}, search_type="similarity")
        compressor = CohereRerank(top_n=k, user_agent="langchain")
        retriever_rerank = ContextualCompressionRetriever(base_compressor=compressor, base_retriever=vectordb_retriever)
        result_documents = retriever_rerank.get_relevant_documents(query=query)

    elif retrieval_method == "Hybrid_Rerank_Cohere":
        dense_k = int(rerank_k * dense_percent)
        sparse_k = rerank_k - dense_k

        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = sparse_k
        result_documents_BM25 = bm25_retriever.get_relevant_documents(query)

        result_documents_Dense = vectordb.similarity_search(query, k=dense_k)

        result_documents_all = []
        result_documents_all.extend(result_documents_BM25)
        result_documents_all.extend(result_documents_Dense)

        unique_documents_dict = {}

        for doc in result_documents_all:
            if doc.page_content not in unique_documents_dict:
                unique_documents_dict[doc.page_content] = doc

        # Extracting the unique documents from the dictionary
        result_documents_unique = list(unique_documents_dict.values())

        compressor = CohereRerank(top_n=k, user_agent="langchain")
        result_documents = compressor.compress_documents(documents=result_documents_unique, query=query)

    elif retrieval_method == "Hybrid_Rerank_Cohere_Compression":
        dense_k = int(rerank_k * dense_percent)
        sparse_k = rerank_k - dense_k

        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = sparse_k
        result_documents_BM25 = bm25_retriever.get_relevant_documents(query)

        result_documents_Dense = vectordb.similarity_search(query, k=dense_k)

        result_documents_all = []
        result_documents_all.extend(result_documents_BM25)
        result_documents_all.extend(result_documents_Dense)

        unique_documents_dict = {}

        for doc in result_documents_all:
            doc.metadata["original_text"] = doc.page_content
            if doc.page_content not in unique_documents_dict:
                unique_documents_dict[doc.page_content] = doc

        # Extracting the unique documents from the dictionary
        result_documents_unique = list(unique_documents_dict.values())

        compressor_1 = CohereRerank(top_n=k, user_agent="langchain")
        result_documents_Cohere = compressor.compressor_1(documents=result_documents_unique, query=query)

        llm = OpenAI(temperature=0)
        compressor_2 = LLMChainExtractor.from_llm(llm)
        result_documents = compressor_2.compress_documents(documents=result_documents_Cohere, query=query)

    elif retrieval_method == "Rerank_Contriever":
        retrieved_documents = vectordb.similarity_search(query, k=50)     
        tokenizer = AutoTokenizer.from_pretrained('facebook/contriever-msmarco')
        model = AutoModel.from_pretrained('facebook/contriever-msmarco')
        model.to(device)

        texts = [query]
        for doc in retrieved_documents:
            texts.append(doc.page_content)

        # Process texts in batches and compute embeddings
        embeddings = []
        for i in range(0, len(texts), 16):
            batch_texts = texts[i:i+16]
            inputs = tokenizer(batch_texts, padding=True, truncation=True, return_tensors='pt').to(device)
            with torch.no_grad():  # Inference mode
                outputs = model(**inputs)
            batch_embeddings = mean_pooling(outputs[0], inputs['attention_mask'])
            embeddings.append(batch_embeddings)

        # Calculate score from contriever, embeddings[0] is the embedding of the query
        scores_and_documents = [(embeddings[0] @ embedding, doc) for doc, embedding in zip(retrieved_documents, embeddings[1:])]
        # Order docs
        sorted_documents = sorted(scores_and_documents, key=lambda x: x[0].item(), reverse=True)
        # Only retrieve top k documents
        result_documents = [doc for _, doc in sorted_documents[:k]]

    elif retrieval_method == "Hybrid_Rerank_Contriever":

        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = 25
        result_documents_BM25 = bm25_retriever.get_relevant_documents(query)

        result_documents_Dense = vectordb.similarity_search(query, k=25)

        result_documents_all = []
        result_documents_all.extend(result_documents_BM25)
        result_documents_all.extend(result_documents_Dense)

        unique_documents_dict = {}

        for doc in result_documents_all:
            if doc.page_content not in unique_documents_dict:
                unique_documents_dict[doc.page_content] = doc

        # Extracting the unique documents from the dictionary
        result_documents_unique = list(unique_documents_dict.values())
                        
        tokenizer = AutoTokenizer.from_pretrained('facebook/contriever-msmarco')
        model = AutoModel.from_pretrained('facebook/contriever-msmarco').to(device)

        texts = [query]
        for doc in result_documents_unique:
            texts.append(doc.page_content)

        embeddings = []
        for i in range(0, len(texts), 16):
            torch.cuda.empty_cache()
            batch_texts = texts[i:i+16]
            inputs = tokenizer(batch_texts, padding=True, truncation=True, return_tensors='pt').to(device)
            with torch.no_grad():  # Inference mode
                outputs = model(**inputs)
            batch_embeddings = mean_pooling(outputs[0], inputs['attention_mask'])
            embeddings.extend(batch_embeddings)
        

        # Calculate score from contriever, embeddings[0] is the embedding of the query
        scores_and_documents = [(embeddings[0] @ embedding, doc) for doc, embedding in zip(result_documents_unique, embeddings[1:])]
        # Order docs
        sorted_documents = sorted(scores_and_documents, key=lambda x: x[0].item(), reverse=True)
        # Only retrieve top k documents
        result_documents = [doc for _, doc in sorted_documents[:k]]

    # Uses dense retrieval and Ms marco cross encoder for reranking (always retrieve 50 documents and then return the top k after reranking)
    elif retrieval_method == "Rerank_Cross_Encoder_Ms_Marco":
        retrieved_documents = vectordb.similarity_search(query, k=50)     
        inputs = [(doc.page_content, query) for doc in retrieved_documents]

        model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2', max_length=512)
        scores = model.predict(inputs, batch_size=16)

        # Associate scores with documents
        scores_and_documents = list(zip(scores, result_documents_unique))   
        # Sort the documents based on scores
        sorted_documents = sorted(scores_and_documents, key=lambda x: x[0], reverse=True)
        # Retrieve top k documents
        result_documents = [doc for _, doc in sorted_documents[:k]]

    elif retrieval_method == "Hybrid_Rerank_Cross_Encoder_Ms_Marco":

        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = 25
        result_documents_BM25 = bm25_retriever.get_relevant_documents(query)

        result_documents_Dense = vectordb.similarity_search(query, k=25)

        result_documents_all = []
        result_documents_all.extend(result_documents_BM25)
        result_documents_all.extend(result_documents_Dense)

        unique_documents_dict = {}

        for doc in result_documents_all:
            if doc.page_content not in unique_documents_dict:
                unique_documents_dict[doc.page_content] = doc

        # Extracting the unique documents from the dictionary
        result_documents_unique = list(unique_documents_dict.values())
        inputs = [(doc.page_content, query) for doc in result_documents_unique]

        model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2', max_length=512)
        scores = model.predict(inputs, batch_size=16)

        # Associate scores with documents
        scores_and_documents = list(zip(scores, result_documents_unique))   
        # Sort the documents based on scores
        sorted_documents = sorted(scores_and_documents, key=lambda x: x[0], reverse=True)
        # Retrieve top k documents
        result_documents = [doc for _, doc in sorted_documents[:k]]

    # Uses dense retrieval and BGE cross encode for reranking (always retrieve 50 documents and then return the top k after reranking)
    elif retrieval_method == "Rerank_Cross_Encoder_BGE":
        retrieved_documents = vectordb.similarity_search(query, k=50)   
        inputs = [(doc.page_content, query) for doc in retrieved_documents]

        reranker = FlagReranker('BAAI/bge-reranker-base', use_fp16=False)
        scores = reranker.compute_score(inputs, batch_size=16)

        # Associate scores with documents
        scores_and_documents = list(zip(scores, result_documents_unique))   
        # Sort the documents based on scores
        sorted_documents = sorted(scores_and_documents, key=lambda x: x[0], reverse=True)
        # Retrieve top k documents
        result_documents = [doc for _, doc in sorted_documents[:k]]

    elif retrieval_method == "Hybrid_Rerank_Cross_Encoder_BGE":
        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = 25
        result_documents_BM25 = bm25_retriever.get_relevant_documents(query)

        result_documents_Dense = vectordb.similarity_search(query, k=25)

        result_documents_all = []
        result_documents_all.extend(result_documents_BM25)
        result_documents_all.extend(result_documents_Dense)

        unique_documents_dict = {}

        for doc in result_documents_all:
            if doc.page_content not in unique_documents_dict:
                unique_documents_dict[doc.page_content] = doc

        # Extracting the unique documents from the dictionary
        result_documents_unique = list(unique_documents_dict.values())
        inputs = [(doc.page_content, query) for doc in result_documents_unique]

        reranker = FlagReranker('BAAI/bge-reranker-base', use_fp16=False)
        scores = reranker.compute_score(inputs, batch_size=16)

        # Associate scores with documents
        scores_and_documents = list(zip(scores, result_documents_unique))   
        # Sort the documents based on scores
        sorted_documents = sorted(scores_and_documents, key=lambda x: x[0], reverse=True)
        # Retrieve top k documents
        result_documents = [doc for _, doc in sorted_documents[:k]]

    elif retrieval_method == "HyDE":
        if hyde_document is None:
            hyde_document = generate_hyde_doc(query)
        result_documents = vectordb.similarity_search(hyde_document.page_content, k=k)

    elif retrieval_method == "Hybrid_HyDE":
        if hyde_document is None:
            hyde_document = generate_hyde_doc(query)

        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = k
        vectordb_retriever = vectordb.as_retriever(search_kwargs={"k": k}, search_type="similarity")
        ensemble_retriever = EnsembleRetriever(retrievers=[bm25_retriever, vectordb_retriever], weights=[0.5, 0.5])
        result_documents = ensemble_retriever.get_relevant_documents(hyde_document.page_content)
        result_documents = result_documents[:k]
        
    elif retrieval_method == "MMR":
        result_documents = vectordb.max_marginal_relevance_search(query=query, k=k, fetch_k=50)
        
    elif retrieval_method == "Hybrid_MMR":

        vectordb_retriever = vectordb.as_retriever(search_kwargs={"k": 25}, search_type="similarity")
        result_documents_MRR = vectordb.max_marginal_relevance_search(query=query, k=25, fetch_k=50)
        
        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = 25
        result_documents_BM25 = bm25_retriever.get_relevant_documents(query)

        result_documents = []
        result_documents.append(result_documents_MRR)
        result_documents.append(result_documents_BM25)

        ensemble_retriever = EnsembleRetriever(retrievers=[vectordb_retriever, bm25_retriever], weights=[0.5, 0.5])
        result_documents = ensemble_retriever.weighted_reciprocal_rank(result_documents)
        result_documents = result_documents[:k]

    elif retrieval_method == "Hybrid_MMR_Cohere":

        vectordb_retriever = vectordb.as_retriever(search_kwargs={"k": 25}, search_type="similarity")
        result_documents_MRR = vectordb.max_marginal_relevance_search(query=query, k=25, fetch_k=50)
        
        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = 25
        result_documents_BM25 = bm25_retriever.get_relevant_documents(query)

        result_documents_all = []
        result_documents_all.extend(result_documents_MRR)
        result_documents_all.extend(result_documents_BM25)

        unique_documents_dict = {}

        for doc in result_documents_all:
            if doc.page_content not in unique_documents_dict:
                unique_documents_dict[doc.page_content] = doc

        # Extracting the unique documents from the dictionary
        result_documents_unique = list(unique_documents_dict.values())

        compressor = CohereRerank(top_n=k, user_agent="langchain")
        result_documents = compressor.compress_documents(documents=result_documents_unique, query=query)

    elif retrieval_method == "Parent_Child":
        fs = LocalFileStore(os.environ.get("PARENT_DOC_PATH") + f"\\{vectordb._collection.name}")
        store = create_kv_docstore(fs)
        parent_child_retriever = MultiVectorRetriever(vectorstore=vectordb, docstore=store, id_key="parent_id", search_kwargs={"k": k})
        result_documents = parent_child_retriever.get_relevant_documents(query=query)

    elif retrieval_method == "Hybrid_Parent_Child":
        fs_dense = LocalFileStore(os.environ.get("PARENT_DOC_PATH") + f"\\{vectordb._collection.name}")
        store_dense = create_kv_docstore(fs_dense)  

        fs_sparse= LocalFileStore(os.environ.get("PARENT_DOC_PATH") + f"\\{document_file_name}")
        store_sparse = create_kv_docstore(fs_sparse)  

        result_documents = []
       
        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = 25
        result_documents_BM25 = bm25_retriever.get_relevant_documents(query)

        ids = []
        for d in result_documents_BM25:
            if d.metadata["parent_id"] not in ids:
                ids.append(d.metadata["parent_id"])
        docs = store_sparse.mget(ids)
        result_documents_sparse = [d for d in docs if d is not None]
        result_documents.append(result_documents_sparse)

        parent_child_retriever_dense = MultiVectorRetriever(vectorstore=vectordb, docstore=store_dense, id_key="parent_id", search_kwargs={"k": 25})
        result_documents_dense = parent_child_retriever_dense.get_relevant_documents(query=query)
        result_documents.append(result_documents_dense)

        ensemble_retriever = EnsembleRetriever(retrievers=[parent_child_retriever_dense, bm25_retriever], weights=[0.5, 0.5])
        result_documents = ensemble_retriever.weighted_reciprocal_rank(result_documents)
        result_documents = result_documents[:k]

    elif retrieval_method == "Hybrid_Parent_Child_Cohere":
        fs_dense = LocalFileStore(os.environ.get("PARENT_DOC_PATH") + f"\\{vectordb._collection.name}")
        store_dense = create_kv_docstore(fs_dense)  

        fs_sparse= LocalFileStore(os.environ.get("PARENT_DOC_PATH") + f"\\{document_file_name}")
        store_sparse = create_kv_docstore(fs_sparse)  
       
        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = 25
        result_documents_BM25 = bm25_retriever.get_relevant_documents(query)
        ids = []
        for d in result_documents_BM25:
            if d.metadata["parent_id"] not in ids:
                ids.append(d.metadata["parent_id"])
        docs = store_sparse.mget(ids)
        result_documents_sparse = [d for d in docs if d is not None]

        parent_child_retriever_dense = MultiVectorRetriever(vectorstore=vectordb, docstore=store_dense, id_key="parent_id", search_kwargs={"k": 25})
        result_documents_dense = parent_child_retriever_dense.get_relevant_documents(query=query)

        result_documents_all = []
        result_documents_all.extend(result_documents_sparse)
        result_documents_all.extend(result_documents_dense)

        unique_documents_dict = {}

        for doc in result_documents_all:
            if doc.page_content not in unique_documents_dict:
                unique_documents_dict[doc.page_content] = doc

        # Extracting the unique documents from the dictionary
        result_documents_unique = list(unique_documents_dict.values())

        compressor = CohereRerank(top_n=k, user_agent="langchain")
        result_documents = compressor.compress_documents(documents=result_documents_unique, query=query)

    elif retrieval_method == "Hybrid_Parent_Child_MMR_Cohere":
        fs_dense = LocalFileStore(os.environ.get("PARENT_DOC_PATH") + f"\\{vectordb._collection.name}")
        store_dense = create_kv_docstore(fs_dense)  

        fs_sparse= LocalFileStore(os.environ.get("PARENT_DOC_PATH") + f"\\{document_file_name}")
        store_sparse = create_kv_docstore(fs_sparse)  
       
        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = 13
        result_documents_BM25 = bm25_retriever.get_relevant_documents(query)
        parent_child_retriever_sparse = MultiVectorRetriever(sparse_documents=result_documents_BM25, docstore=store_sparse, id_key="parent_id", search_kwargs={"k": 13})
        result_documents_sparse = parent_child_retriever_sparse.get_relevant_documents(query=query)

        parent_child_retriever_dense = MultiVectorRetriever(vectorstore=vectordb, docstore=store_dense, id_key="parent_id", search_kwargs={"k": 13})
        result_documents_dense = parent_child_retriever_dense.get_relevant_documents(query=query)


        result_documents_MRR = vectordb.max_marginal_relevance_search(query=query, k=13, fetch_k=50)
        
        bm25_retriever = BM25Retriever.from_documents(documents)
        bm25_retriever.k = 13
        result_documents_BM25 = bm25_retriever.get_relevant_documents(query)

        result_documents_all = []
        result_documents_all.extend(result_documents_MRR)
        result_documents_all.extend(result_documents_BM25)
        result_documents_all.extend(result_documents_sparse)
        result_documents_all.extend(result_documents_dense)

        unique_documents_dict = {}

        for doc in result_documents_all:
            if doc.page_content not in unique_documents_dict:
                unique_documents_dict[doc.page_content] = doc

        # Extracting the unique documents from the dictionary
        result_documents_unique = list(unique_documents_dict.values())
        print("length unique: ", len(result_documents_unique))

        compressor = CohereRerank(top_n=k, user_agent="langchain")
        result_documents = compressor.compress_documents(documents=result_documents_unique, query=query)

    end_time = time.time()
    duration = end_time - start_time

    # If the generation does not happen in this method, add a constant of an average time taken to generate multi query / HyDE
    if (retrieval_method == "Multi_Query" and len(multiple_queries) > 0) or (retrieval_method == "Rerank_Multi_Query" and len(multiple_queries)> 0) or (retrieval_method == "Hybrid_Multi_Query" and len(multiple_queries) > 0):
        duration += 2
    if (retrieval_method == "HyDE" and hyde_document is not None) or (retrieval_method == "Hybrid_HyDE" and hyde_document is not None):
        duration += 2.5

    return result_documents, duration

In [None]:
from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)
from typing import Any
from typing import Tuple
from mistralai.models.chat_completion import ChatMessage
from mistralai.client import MistralClient


def prompt_LLM(contexts: List[Document], prompt_version: int, llm, prompt: str, model_provider: str) -> Tuple[str, float, str]:
    """
    Generates the answer to a prompt with the provided contexts by using the llm and a provider-specific prompt template.

    A prompt is treated as a "template question" when it contains the word "template" (case-insensitive)
    and one of the first two contexts has a "template_path" entry in its metadata. For such questions only
    the first two contexts are put into the prompt and the instruction tells the model to name that path.

    Args:
        contexts: Retrieved documents. The prompt text is read from metadata["original_text"] and the
            source from metadata["source"]; both fall back to an empty string when missing.
        prompt_version: Prompt variant. Version 1 is the plain instruction prompt. Version 2 adds two worked
            examples and exists for "OpenAI" only. For template questions the providers "Replicate",
            "HuggingFace" and "Mistral" use the same prompt regardless of the version.
        llm: Callable model handle used for "OpenAI", "Replicate" and "HuggingFace". It is not used for
            "Mistral", where a MistralClient is created from the MISTRAL_API_KEY environment variable.
        prompt: The user question. A "?" is appended when it does not already end with one.
        model_provider: One of "OpenAI", "Replicate", "HuggingFace" or "Mistral".

    Returns:
        A tuple of the generated answer, the elapsed wall-clock time in seconds (prompt construction plus
        the model call) and the numbered sources of all contexts as one string.

    Raises:
        ValueError: If prompt_version is not supported for the given model_provider.
        Exception: If model_provider is not one of the supported providers.
    """

    start_time = time.time()

    # Check if the query is aimed at a template and check if the context documents also contain a template.
    # Only the first two contexts are inspected because otherwise the re-ranked score is not high enough
    # to assume that the retrieved template is valid for that question.
    is_template_question = False
    template_path = ""
    if "template" in prompt.lower():
        for context in contexts[:2]:
            if "template_path" in context.metadata:
                is_template_question = True
                template_path = context.metadata["template_path"]
                break

    # Template questions only see the two top-ranked contexts, all other questions see every context
    prompt_contexts = contexts[:2] if is_template_question else contexts
    concatenated_contexts = "".join(
        f"{index}. {document.metadata.get('original_text', '')}\n\n"
        for index, document in enumerate(prompt_contexts, start=1)
    )

    # Check if question mark is at the end of the prompt
    if not prompt.endswith("?"):
        prompt = prompt + "?"

    # Concatenate the sources of all contexts (not only of the ones used inside the prompt)
    concatenated_sources = "".join(
        f"{index}. {document.metadata.get('source', '')}\n\n"
        for index, document in enumerate(contexts, start=1)
    )

    # Raised explicitly so that an unsupported combination does not surface as an UnboundLocalError at the return
    unsupported_version_message = (
        f"Error, raised exception: prompt_version {prompt_version} is not supported for modelProvider {model_provider}."
    )

    # NOTE: The whitespace inside the multi-line prompt literals below is part of the prompt text.
    # Do not re-indent their continuation lines.

    # When using GPT
    if model_provider == "OpenAI":
        if prompt_version not in (1, 2):
            raise ValueError(unsupported_version_message)

        # The system prompt only depends on whether a template is asked for, not on the prompt version
        if is_template_question:
            system_message_prompt = SystemMessagePromptTemplate.from_template(
                f"""Answer the following question with that you can provide a template to the user and explicitly name {template_path} as the path to the file. After that end your answer."""
            )
        else:
            system_message_prompt = SystemMessagePromptTemplate.from_template(
                """You are an expert in information security, especially for ISO 27001 certifications. Answer the following question as truthfully as possible, using the provided context and not prior knowledge. If the answer is not contained within the context or the question is not related to the topic of information security or ISO 27001, respond with 'I am sorry. I do not have knowledge on that topic'. Write a maximum of 200 words."""
            )

        # Version 1 is the plain prompt, version 2 prepends two worked examples
        if prompt_version == 1:
            human_message_prompt = HumanMessagePromptTemplate.from_template("""Question: {question} \n Context: {context}""")
        elif is_template_question:
            human_message_prompt = HumanMessagePromptTemplate.from_template(
                """Question: I need a template for the backup policy inside ISO 27001. Can you provide me with that?
                Context: 
                1. You can find a possible template for the backup policy from the Annex A of ISO 27001 in the local file system under: './../../inputData/Templates/template_files/processed/Backup policy.docx'. It contains pre-written texts for purpose, scope, content and more for the backup policy.

                2. ISO 27001 compliance checklist: ISO 27001 is the global gold standard for ensuring the security of information and its supporting assets. Obtaining ISO 27001 certification can help an organization prove its security practices to potential customers anywhere in the world. Our ISO 27001 checklist: 1 Develop a roadmap for successful implementation of an ISMS and ISO 27001 certification Implement Plan, Do, Check, Act (PDCA) process to recognize challenges and identify gaps for remediation Consider ISO 27001 certification costs relative to org size and number of employees Clearly define scope of work to plan certification time to completion Select an ISO 27001 auditor 2 Set the scope of your organization's ISMS Decide which business areas are covered by the ISMS and which are out of scope Consider additional security controls for business processes that are required to pass ISMS-protected information across the trust boundary Inform stakeholders regarding scope of the ISMS 3 Establish an ISMS governing body Build a governance team with management oversight Incorporate key members of top management, e.g. senior leadership and executive management with responsibility for strategy and resource allocation 4 Conduct an inventory of information assets Consider all assets where information is stored, processed, and accessible

                Answer: You can find a possible template for the backup policy from the Annex A of ISO 27001 in the local file system under: './../../inputData/Templates/template_files/processed/Backup policy.docx'. It contains pre-written texts for purpose, scope, content and more for the backup policy.

                Question: I need a template for the change management policy inside ISO 27001. Can you provide me with that?
                Context:
                1. What Are ISO 27001 Annex A Controls? **Set by the International Organization for Standardization (ISO) and the > International Electrotechnical Commission (IEC), ISO/IEC 27001 Annex A > defines the 14 categories with a toal of 114 information security controls an organization can address to > receive and maintain its ISO 27001 certification. ** ISO 27001 defines and audits these controls during stage two of the ISO 27001 certification process. An external accredited certification body runs a series of evidentiary audits that confirm the organization's technology and processes are correctly deployed and working properly. The auditors also confirm the implemented solutions align with the controls that were declared to be in use by the organization during part one, the documentation review stage of the certification process. Since industry compliance requirements, technology needs, and scope of operations are unique for each organization, the ISO 27001 Annex A control list serves as a framework, rather than a checklist of requirements. For the certification, however, each firm must draft a Statement of Applicability (SoA), defining the specific Annex A controls based on the company's identified risks, legal and contractual requirements, and overall business needs.

                2. You can find a possible template for the change management policy from the Annex A of ISO 27001 in the local file system under: './../../inputData/Templates/template_files/processed/Change management policy.docx

                Answer: You can find a possible template for the change management policy from the Annex A of ISO 27001 in the local file system under: './../../inputData/Templates/template_files/processed/Change management policy.docx
                
                Question: {question}
                Context: {context}
                Answer:"""
            )
        else:
            human_message_prompt = HumanMessagePromptTemplate.from_template(
                """Question: What does the term "asset" mean in ISO-27001 and what requirement does the standard have regarding the identification and inventory of information assets?
                Context: 
                1. asset valuation the whole risk assessment process should consider the organisation's context and the needs and expectations of interested parties. Furthermore, ISO 27001 provides no guidance as to the basis on which control selection decisions should be made, other than to say that they should be selected "taking account of the risk assessment results" (6.1.3 a), which will necessarily take into account how the organisation prioritises the risks for treatment (6.1.2). Finally, the ISO 27001 management system clauses have no requirement in terms of your methodology for the identification or valuation of information assets. How you value an asset in an asset-based methodology is, however, going to be fundamental to how much you will be prepared to invest in protecting it. ISO 27000 doesn't offer a definition for an asset, although it is reasonable to simply define it as 'anything that has value to the organisation'. The organisation's fixed-asset register is unlikely to provide practical help in this regard: many critical assets may already (through application of the financial depreciation policy, or of the accounting convention that assets should be shown on the balance sheet at the /ower of historic cost — less depreciation — or current market value) have been written down below their actual useful value to the organisation. Many other, even more critical, assets (such as brand value, key supplier and customer contracts, staff know-how, intellectual property and databases) may not even be on

                2. or home office. - Information about systems in a demilitarized zone, as part of a security/network concept. Such lists or plans are a good starting point for an inventory list according to A-5.9. It is worth noting that not all data is contained in a single directory. Often, it is spread across multiple subdirectories that are under the responsibility of different entities. The ISO 27002 explicitly allows this, but it requires clear organizational guidelines to avoid duplicate or inconsistent entries, different names, etc. In practice, due to expected challenges, it is often concluded that it is better to have a single central inventory list of information assets. If existing (older) inventories of the organization are discovered, it is necessary to check whether they meet all the requirements described in A-5.9 before incorporating them into the ISMS. If necessary, updates or revisions should be made. For each asset, the inventory list should include at least the following information: a unique identifier, asset owner, and asset location. Before explaining these three data fields, it is advisable to maintain the inventory list not only because the standard requires it but also as a basis for further work in the ISMS. use: As part of the risk assessment (ISMS-6.1), information about security objectives, specific risks, and references to action lists may be listed for each asset in the directory. e Similarly, for certain types of assets, information about suppliers, procurement costs, maintenance work

                3. the guidelines and any related asset labelling scheme developed to meet the requirements of control A.8.2.2 (Labelling of information) or similar controls. Are vendors assets? We identified one of the classes of information assets as "Services on which computer systems depend: computing and communications services, and general utilities such as heating, lighting, power and air-conditioning". This gives rise to a simple question: are the suppliers/vendors of these essential services also assets? There are two practical answers to this question. The best solution is probably to use a mix of the two, but in doing so *" See, specifically, chapter 9 of IT Governance: An International Guide to Data Security and ISO27001/TSO27002, Alan Calder and Steve G Watkins (Kogan Page, 2019). 101 8: Information assets it is essential that the exact approach to be used in each specific case is determined by a common set of rules. These should be defined in the risk assessment documentation. One option is to decide that the vendor itself is not an asset — the organisation that is within the scope of the ISMS does not own the vendors — but that the services provided by the vendor and, possibly, the relationship with the vendor are both assets within the scope of the ISMS. The logic behind this option is that a relationship with a vendor can be an asset if it is a key supplier in terms of the information aspects of whatever it is they supply. For example, a stationery supplier would not, we suggest, be a key relationship

                Answer: The term "asset" in ISO-27001 refers to anything that holds value for an organization. This includes properties, buildings, machinery, facilities, business processes, as well as information assets such as data, systems, and IT services. One requirement of the standard is that all relevant information assets must be identified and inventoried. This is typically done by recording information such as asset location, classification and the asset owner in a table or database. Inventorying can be facilitated by grouping similar assets or implementing a hierarchy.

                Question: What is the purpose of information security policies (A.5) in an organization and how are they defined?
                Context:
                1. What is an information security policy?** An information security policy, often referred to as an _infosec policy_ , is a set of regulations carefully designed to govern the access, use and retention of critical business information. These policies implement a robust framework of processes and tools to ensure absolute protection against unauthorised access, thereby safeguarding an organisation's sensitive information assets. Information security policies follow a common structure and format. They include:  * A statement describing the types of activities covered by the policy  * A statement of commitment issued by management, providing evidence that management has assigned sufficient resources to support ongoing compliance with the policy  * A number of specific responsibilities for employees regarding their use and protection of organisational data. Note that most organisations should aim to employ a data protection officer, whose role it is to maintain and implement these changes as well as add solutions to data protection problems. ## **What is Annex A.5?** This Annex describes the concepts, requirements and recommendations related to information security policies. The purpose of this Annex is to describe the concepts, requirements and recommendations related to information security policies. It covers policy definition, implementation and review. In addition to providing guidance on the implementation of information security policies, Annex A.5 also addresses how to report on

                2. though both are against organisational policy), that could be considered inconsistent enforcement.  * **Integrity:** When assigning system permissions, have the system users got minimum viable access rights, or do they have permissions that could compromise the integrity of the system unnecessarily? ## **What is the objective of Annex A.5?** The purpose of information security policies is to help protect an organisation's assets and operations from risks associated with cybersecurity. They are meant to be flexible enough to cover different types of systems and their vulnerabilities, as well as multiple modes of operation, such as traditional and cloud-based operations. Information security policies are the documents that define the standards for information security within an organisation. They can be formal or informal. This Annex describes how to develop an information security policy and how to implement it in your organisation. ## **What are the Annex A.5 information security policy controls?** ### **A.5.1.1 Policies for information security** According to ISO 27001, all organisations must conduct themselves in a transparent manner with their stakeholders. To protect their data, all stakeholders must be informed of the policies in place within the organisation. Policies play a critical role throughout the whole information security process. Therefore, any policies created by the business must first be examined, authorised, and then communicated to employees and third parties. They must also be

                3. within the organisation. Policies play a critical role throughout the whole information security process. Therefore, any policies created by the business must first be examined, authorised, and then communicated to employees and third parties. They must also be included in the A.7 human resource security control, and they must be adhered to by all employees. ### **A.5.1.2 Review of the policies for information security** To keep updated with any changes, whether internal or external, the organisation's ISMS policies must be updated on a regular basis. Management changes, governing laws, industry standards, and technology are examples of these developments. The documentation should always represent standards and procedures to preserve the confidentiality, integrity, and availability of files, and an information security breach may result in policy change and improvement. ## **Why is information security policy important for your organisation's information security management?** An information security policy helps your organisation classify your organisations' sensitive data. This depends in part on applicable regulations, but it should also take into account any external factors that could affect risk perception, such as industry competition or geopolitical climate change. Information classifications can range from low (confidential) through medium (secret), high (top secret), even top secret plus or beyond top secret. The exact terms used may vary slightly depending on which agency or company

                Answer: Information security policies (A.5) hold great importance in an organization. They serve to depict the overall direction of the organization regarding information security and establish goals and strategies to achieve these objectives. These policies contain fundamental rules and procedures that are applicable within the organization. In addition to a security policy, there are often topic-specific security policies targeting specific audiences, which describe the applicable security rules and measures for a particular subject. Examples of such policies include workplace security practices, virus/malware protection, email security, and access control. The organization is free to create and implement relevant policies.
                
                Question: {question}
                Context: {context}
                Answer:"""
            )

        chat_prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])
        answer = llm(chat_prompt.format_prompt(context=concatenated_contexts, question=prompt).to_messages()).content

    # When using LLama
    elif model_provider == "Replicate":
        if is_template_question:
            system_prompt = f"Answer the following question with that you can provide a template to the user and explicitly name {template_path} as the path to the file. After that end your answer."
            llama_prompt = f"""\
            [INST]  Question: {prompt} \n Context: {concatenated_contexts} [/INST]
            """
        elif prompt_version == 1:
            system_prompt = "You are an expert in information security, especially for ISO 27001 certifications. Answer the following question as truthfully as possible, using the provided context and not prior knowledge. If the answer is not contained within the context or the question is not related to the topic of information security or ISO 27001, respond with 'I am sorry. I do not have knowledge on that topic'. Write a maximum of 200 words."
            llama_prompt = f"""\
            [INST]  Question: {prompt} \n Context: {concatenated_contexts}[/INST]
            """
        else:
            raise ValueError(unsupported_version_message)

        answer = llm(system_prompt=system_prompt, prompt=llama_prompt, max_new_tokens=200, temperature=0.01)

    # When using HuggingFace
    # Prompt style resembles t5 style
    elif model_provider == "HuggingFace":
        if is_template_question:
            prompt_ = f"""\
            Answer the following question with that you can provide a template to the user and explicitly name {template_path} as the path to the file. After that end your answer. \n
           Question: {prompt} \n Context: {concatenated_contexts} \n Answer:
            """
            max_length = 250
        elif prompt_version == 1:
            prompt_ = f"""\
            You are an expert in information security, especially for ISO 27001 certifications. Answer the following question as truthfully as possible, using the provided context. Write at least 100 words. \n
           Question: {prompt} \n Context: {concatenated_contexts} \n Answer:
            """
            max_length = 210
        else:
            raise ValueError(unsupported_version_message)

        llm.tokenizer.model_max_length = 2048
        answer = llm(prompt_, min_length=100, max_length=max_length)[0]["generated_text"]

    elif model_provider == "Mistral":
        if is_template_question:
            sys_message = f"Answer the following question with that you can provide a template to the user and explicitly name {template_path} as the path to the file. After that end your answer."
            mistral_model = "mistral-small"
            max_tokens = 210
        elif prompt_version == 1:
            # NOTE(review): this literal starts with a '"' that is never closed. It is kept verbatim so that
            # answers stay comparable with earlier runs - confirm whether the quote is intended.
            sys_message = """\
           "You are an expert in information security, especially for ISO 27001 certifications. Answer the following question as truthfully as possible, using the provided context and not prior knowledge. If the answer is not contained within the context or the question is not related to the topic of information security or ISO 27001, respond with 'I am sorry. I do not have knowledge on that topic'. Write a maximum of 200 words."""
            mistral_model = "open-mixtral-8x7b"
            max_tokens = 260
        else:
            raise ValueError(unsupported_version_message)

        query_ = f"Question: {prompt} \n Context: {concatenated_contexts}"
        prompt_ = f"<s> [INST] {sys_message} [/INST] \n User: {query_} \n Answer: "

        # The llm argument is not used for Mistral, the client is always created from the API key
        mistral_client = MistralClient(api_key=os.environ["MISTRAL_API_KEY"])
        messages = [ChatMessage(role="user", content=prompt_)]

        response = mistral_client.chat(model=mistral_model, messages=messages, temperature=0, max_tokens=max_tokens)
        answer = response.choices[0].message.content

    else:
        raise Exception("Error, raised exception: Wrong modelProvider provided.")

    duration = time.time() - start_time
    return answer, duration, concatenated_sources

In [None]:
from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)
from typing import Any
from typing import Tuple


def prompt_LLM_only(prompt_version: int, llm, prompt: str, model_provider: str) -> Tuple[str, float]:
    """
    Generates the answer to a prompt by using only the llm and a prompt template (no retrieved context).

    Args:
        prompt_version: Selects the prompt template. Only version 1 is defined.
        llm: Model handle that is called directly for the "OpenAI", "Replicate" and
            "HuggingFace" providers. It is not used for "Mistral", where a MistralClient
            is created from the MISTRAL_API_KEY environment variable instead.
        prompt: The user question. A "?" is appended if it does not already end with one.
        model_provider: One of "OpenAI", "Replicate", "HuggingFace" or "Mistral".

    Returns:
        A tuple (answer, duration): the answer as returned by the provider and the
        elapsed time of the call in seconds.

    Raises:
        Exception: If model_provider is not one of the supported providers.
        ValueError: If prompt_version is not a supported version.
        KeyError: If model_provider is "Mistral" and MISTRAL_API_KEY is not set.
    """

    # Validate up front so an unsupported combination fails with a clear message
    # instead of an UnboundLocalError on `answer` at the return statement.
    if model_provider not in ("OpenAI", "Replicate", "HuggingFace", "Mistral"):
        raise Exception(
            "Error, raised exception: Wrong modelProvider provided.")
    if prompt_version != 1:
        raise ValueError(
            f"Error, raised exception: Unsupported prompt_version {prompt_version!r} provided.")

    # perf_counter is monotonic, so the measured duration cannot be distorted
    # by system clock adjustments.
    start_time = time.perf_counter()

    # Check if question mark is at the end of the prompt
    if not prompt.endswith("?"):
        prompt = prompt + "?"

    # NOTE: the leading whitespace inside the triple-quoted prompts below is part of
    # the text sent to the model and is intentionally kept unchanged.

    # When using GPT
    if model_provider == "OpenAI":
        system_message_prompt = SystemMessagePromptTemplate.from_template(
            """You are an expert in information security, especially for ISO 27001 certifications. Answer the following question with a maximum of 200 words. If you do not know the answer, respond with 'I am sorry. I do not have knowledge on that topic'."""
        )
        human_message_prompt = HumanMessagePromptTemplate.from_template(
            """Question: {question}""")
        chat_prompt = ChatPromptTemplate.from_messages(
            [system_message_prompt, human_message_prompt])

        answer = llm(chat_prompt.format_prompt(question=prompt).to_messages()).content

    # When using LLama
    elif model_provider == "Replicate":
        system_prompt = "You are an expert in information security, especially for ISO 27001 certifications. Answer the following question with a maximum of 200 words."
        prompt_ = f"""\
            [INST]  Question: {prompt} [/INST]
            """
        answer = llm(system_prompt=system_prompt, prompt=prompt_, max_new_tokens=200, temperature=0.01)

    # When using HuggingFace
    elif model_provider == "HuggingFace":
        prompt_ = f"""\
           You are an expert in information security, especially for ISO 27001 certifications. Answer the following question with a maximum of 200 words. \n
           Question: {prompt} \n Answer:
            """
        generations = llm(prompt_, min_length=100, max_length=250)
        answer = generations[0]["generated_text"]

    # When using Mistral (the only provider left after the validation above)
    else:
        sys_message = f"""\
           You are an expert in information security, especially for ISO 27001 certifications. Answer the following question with a maximum of 200 words. \n
           Question: {prompt}
            """
        query_ = f"Question: {prompt}"
        prompt_ = f'<s> [INST] {sys_message} [/INST] \n User: {query_} \n Answer: '

        # Use a dedicated local name instead of rebinding the `llm` parameter.
        api_key = os.environ["MISTRAL_API_KEY"]
        client = MistralClient(api_key=api_key)
        messages = [
            ChatMessage(role="user", content=prompt_)
        ]

        response = client.chat(model="mistral-small", messages=messages, temperature=0, max_tokens=370)
        answer = response.choices[0].message.content

    duration = time.perf_counter() - start_time
    return answer, duration