In [33]:
import os
import json

import dotenv
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents import SearchClient
from azure.core.credentials import AzureKeyCredential
from openai import AzureOpenAI

from config import settings

In [14]:
# CHOOSE APP VERSION TO TEST
# The configuration cell below writes this identifier to the .env file
# (VERSION_ID) and to settings.VERSION_ID.

version = "ct-fema-ocfo-gpt-llamaIndexTest"

In [130]:
# SET ALL CONFIGURATIONS AUTOMATICALLY 

# Locate the .env file, load it into the process environment, and write the
# chosen version back to that file under the VERSION_ID key.
env_file = dotenv.find_dotenv()
dotenv.load_dotenv(env_file)
dotenv.set_key(env_file, "VERSION_ID", version)

settings.VERSION_ID = version
# NOTE(review): presumably refreshes the version-specific settings for the
# VERSION_ID set above — confirm against config.py.
settings.get_version_configs()

# Overwrite system settings with experiment-specific configs
settings.SHOULD_STREAM = False
settings.AZURE_OPENAI_MODEL = "dep-gpt-35-16k"
settings.AZURE_OPENAI_MODEL_NAME = "gpt-35-turbo-16k"
# Also expose the key through the AZURE_OPENAI_API_KEY environment variable.
os.environ["AZURE_OPENAI_API_KEY"] = settings.AZURE_OPENAI_KEY

# Dump the prep configuration to config.json in the current working directory.
with open("config.json", "w") as f:
    json.dump(settings.PREP_CONFIG, f)

# NOTE(review): RETRIES is not referenced by the cells visible here; the
# embedding retry logic further down uses RETRY_COUNT instead.
RETRIES = 2

## Test 3: Try Adding Node Relationships and Summary / Title Extractors

In [133]:
"""Data utilities for index preparation."""
import ast
from asyncio import sleep
import json
import os
import re
from openai import AzureOpenAI
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Dict, Optional, Generator, Tuple

import tiktoken
from azure.core.credentials import AzureKeyCredential
from langchain.text_splitter import MarkdownTextSplitter, RecursiveCharacterTextSplitter, PythonCodeTextSplitter
from llama_index.node_parser import LangchainNodeParser
from llama_index import download_loader
from llama_index import Document as LlamaDocument
from llama_index.extractors import TitleExtractor, SummaryExtractor, QuestionsAnsweredExtractor
from llama_index.ingestion import IngestionPipeline
from llama_index.llms import AzureOpenAI as LlamaAOAI
import nest_asyncio
nest_asyncio.apply()

from tqdm import tqdm

# Maps a file extension (without the leading dot) to the format name that
# _get_file_format returns for it.
FILE_FORMAT_DICT = {
        "md": "markdown",
        "txt": "text",
        "html": "html",
        "shtml": "html",
        "htm": "html",
        "py": "python",
        "pdf": "pdf",
        "json": "json"
    }

# Number of attempts made when requesting an embedding for a chunk.
RETRY_COUNT = 5

# Separators handed to the recursive text splitter in LlamaIndexSplitter, in
# this order: sentence terminators first, then the word-level break characters.
SENTENCE_ENDINGS = [".", "!", "?"]
WORDS_BREAKS = list(reversed([",", ";", ":", " ", "(", ")", "[", "]", "{", "}", "\t", "\n"]))
# Loader class obtained through llama_index's download_loader; it is
# instantiated in extract_pdf_content_regular.
PDFReader = download_loader("PDFReader")

@dataclass
class Document(object):
    """A data class for storing documents

    Only ``content`` is required; every other field defaults to None.

    Attributes:
        content (str): The content of the document.
        id (Optional[str]): The id of the document.
        title (Optional[str]): The title of the document.
        filepath (Optional[str]): The filepath of the document.
        url (Optional[str]): The url of the document.
        metadata (Optional[Dict]): The metadata of the document.
        contentVector (Optional[List[float]]): Embedding vector associated
            with the content, when one has been computed.
    """

    content: str
    id: Optional[str] = None
    title: Optional[str] = None
    filepath: Optional[str] = None
    url: Optional[str] = None
    metadata: Optional[Dict] = None
    contentVector: Optional[List[float]] = None

def cleanup_content(content: str) -> str:
    """Normalizes whitespace and dash runs in a piece of text.

    Runs of two or more newlines become one newline, runs of two or more
    non-newline whitespace characters become a single space, and runs of two
    or more dashes become exactly two dashes. The result is stripped of
    leading and trailing whitespace.

    Args:
        content (str): The text to tidy up.
    Returns:
        str: The normalized text.
    """
    # Applied in order; each entry is (pattern, replacement).
    substitutions = (
        (r"\n{2,}", "\n"),
        (r"[^\S\n]{2,}", " "),
        (r"-{2,}", "--"),
    )

    cleaned = content
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)

    return cleaned.strip()

class BaseParser(ABC):
    """Abstract base for objects that turn raw content into a Document."""

    @abstractmethod
    def parse(self, content: str, file_name: Optional[str] = None) -> Document:
        """Builds a document from the given content.

        Args:
            content (str): The raw content.
            file_name (str): Name of the file the content came from, if any.
        Returns:
            Document: The parsed document.
        """

    def parse_file(self, file_path: str) -> Document:
        """Reads a file as text and parses what was read.

        Args:
            file_path (str): Path of the file to read.
        Returns:
            Document: The parsed document; the file's base name is passed to
                ``parse`` as ``file_name``.
        """
        with open(file_path, "r") as handle:
            raw_text = handle.read()
            base_name = os.path.basename(file_path)
            return self.parse(raw_text, base_name)

    def parse_directory(self, directory_path: str) -> List[Document]:
        """Parses every regular file directly inside a directory.

        Sub-directories are not descended into.

        Args:
            directory_path (str): The directory whose files are parsed.
        Returns:
            List[Document]: One document per file, in ``os.listdir`` order.
        """
        candidate_paths = (
            os.path.join(directory_path, entry)
            for entry in os.listdir(directory_path)
        )
        return [
            self.parse_file(path)
            for path in candidate_paths
            if os.path.isfile(path)
        ]

class TextParser(BaseParser):
    """Parser for plain-text content."""

    def __init__(self) -> None:
        super().__init__()

    def _get_first_alphanum_line(self, content: str) -> Optional[str]:
        # First line containing at least one alphanumeric character, stripped;
        # None when no line qualifies.
        for candidate in content.splitlines():
            if any(ch.isalnum() for ch in candidate):
                return candidate.strip()
        return None

    def _get_first_line_with_property(
        self, content: str, property: str = "title: "
    ) -> Optional[str]:
        # Remainder (stripped) of the first line that begins with the given
        # prefix; None when no line begins with it.
        prefix_length = len(property)
        for candidate in content.splitlines():
            if candidate.startswith(property):
                return candidate[prefix_length:].strip()
        return None

    def parse(self, content: str, file_name: Optional[str] = None) -> Document:
        """Builds a document from text content.

        The title is taken, in order of preference, from a ``title: `` line,
        from the first line containing an alphanumeric character, or from
        ``file_name``. The stored content is passed through
        ``cleanup_content``.

        Args:
            content (str): The text to parse.
            file_name (str): The file name associated with the content.
        Returns:
            Document: The parsed document.
        """
        title = self._get_first_line_with_property(content)
        if not title:
            title = self._get_first_alphanum_line(content)
        if not title:
            title = file_name

        return Document(content=cleanup_content(content), title=title)

class UnsupportedFormatError(Exception):
    """Signals that no parser is available for the requested format."""

class ParserFactory:
    """Callable registry that returns the parser registered for a format name."""

    def __init__(self):
        # Only the plain-text parser is registered.
        self._parsers = dict(text=TextParser())

    @property
    def supported_formats(self) -> List[str]:
        """Returns a list of supported formats"""
        return [format_name for format_name in self._parsers]

    def __call__(self, file_format: str) -> BaseParser:
        """Looks up the parser for ``file_format``.

        Raises:
            UnsupportedFormatError: If no parser is registered for the format.
        """
        selected = self._parsers.get(file_format)
        if selected is not None:
            return selected

        raise UnsupportedFormatError(f"{file_format} is not supported")
    

class LlamaIndexSplitter:
    """Splits a LlamaIndex document into nodes and optionally enriches them.

    Splitting is done by a LangChain ``RecursiveCharacterTextSplitter`` (sized
    in tiktoken tokens) wrapped in a ``LangchainNodeParser``. Optional
    "split mods" are LLM-backed metadata extractors appended to the ingestion
    pipeline after the splitter.

    Args:
        num_tokens: Chunk size passed to the splitter.
        token_overlap: Chunk overlap passed to the splitter.
        extractor_llm: LLM handed to the extractors; required whenever at
            least one split mod is requested.
    """

    def __init__(self, num_tokens, token_overlap, extractor_llm=None):
        splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
            separators=SENTENCE_ENDINGS + WORDS_BREAKS,
            chunk_size=num_tokens, chunk_overlap=token_overlap)
        self.parser = LangchainNodeParser(splitter)
        self.extractor_llm = extractor_llm
        # Names accepted in ``split_mods`` mapped to the transformation they add.
        self.split_mods_map = {
            "summary_extraction": SummaryExtractor(summaries=["prev", "self", "next"], llm=extractor_llm),
            "qa_extraction": QuestionsAnsweredExtractor(questions=3, llm=extractor_llm)
        }

    def get_nodes_from_doc(self, llama_doc, split_mods):
        """Runs the split (plus any requested extractors) over one document.

        Args:
            llama_doc: The LlamaIndex document to split.
            split_mods: Iterable of keys from ``split_mods_map``; ``None`` or
                an empty iterable means "split only".
        Returns:
            The nodes returned by ``IngestionPipeline.run``.
        Raises:
            ValueError: If split mods are requested without an extractor LLM.
            KeyError: If a requested split mod is not in ``split_mods_map``.
        """
        # Treat None like "no modifications" instead of failing while iterating.
        requested_mods = list(split_mods or [])

        if requested_mods and self.extractor_llm is None:
            raise ValueError("extractor_llm argument is required to apply modifications during splitting")

        unknown_mods = [mod for mod in requested_mods if mod not in self.split_mods_map]
        if unknown_mods:
            # Same exception type as the plain dict lookup, with a usable message.
            raise KeyError(
                f"Unknown split mods {unknown_mods}; supported: {sorted(self.split_mods_map)}"
            )

        pipeline = IngestionPipeline(
            transformations=[self.parser] + [self.split_mods_map[mod] for mod in requested_mods]
        )

        nodes = pipeline.run(
            documents=[llama_doc],
            in_place=True,
            show_progress=True
        )

        return nodes
    

class TokenEstimator:
    """Counts and truncates text using tiktoken's "gpt2" encoding."""

    # Loaded once when the class body runs and shared by all instances.
    GPT2_TOKENIZER = tiktoken.get_encoding("gpt2")

    def estimate_tokens(self, text: str) -> int:
        """Returns the number of tokens ``text`` encodes to."""
        token_ids = self.GPT2_TOKENIZER.encode(text)
        return len(token_ids)

    def construct_tokens_with_size(self, tokens: str, numofTokens: int) -> str:
        """Returns the text decoded from the first ``numofTokens`` tokens of ``tokens``."""
        token_ids = self.GPT2_TOKENIZER.encode(tokens)
        kept_ids = token_ids[:numofTokens]
        return self.GPT2_TOKENIZER.decode(kept_ids)

# Module-level singletons shared by the chunking helpers below:
# parser_factory resolves a format name to a parser, TOKEN_ESTIMATOR counts tokens.
parser_factory = ParserFactory()
TOKEN_ESTIMATOR = TokenEstimator()

def _get_file_format(file_name: str, extensions_to_process: List[str]) -> Optional[str]:
    """Resolves a file name (or path) to its format name.

    The extension is whatever follows the last dot of the base name; the
    comparison is case-sensitive.

    Args:
        file_name (str): The file name or path.
        extensions_to_process (List[str]): Extensions that may be processed.
    Returns:
        str: The format name from FILE_FORMAT_DICT, or None when the extension
            is not listed in ``extensions_to_process`` or has no mapping.
    """
    # Callers may hand over a full path; only the last component matters.
    base_name = os.path.basename(file_name)
    extension = base_name.rpartition(".")[2]

    if extension in extensions_to_process:
        return FILE_FORMAT_DICT.get(extension)
    return None

@dataclass
class ChunkingResult:
    """Data model for chunking result

    Attributes:
        chunks (List[Document]): List of chunks.
        total_files (int): Total number of files.
        num_unsupported_format_files (int): Number of files with unsupported format.
        num_files_with_errors (int): Number of files with errors.
        skipped_chunks (int): Number of chunks skipped.
    """
    chunks: List[Document]
    total_files: int
    num_unsupported_format_files: int = 0
    num_files_with_errors: int = 0
    # Some chunks may be skipped because they contain too few tokens
    # (fewer than the min_chunk_size given to chunk_content).
    skipped_chunks: int = 0

def get_files_recursively(directory_path: str) -> List[str]:
    """Collects the path of every file below a directory.

    Args:
        directory_path (str): Root directory of the walk.
    Returns:
        List[str]: Paths (root joined with the file name) in ``os.walk`` order.
    """
    return [
        os.path.join(current_dir, entry)
        for current_dir, _subdirs, entries in os.walk(directory_path)
        for entry in entries
    ]

def extract_pdf_content_regular(file_path):
    """Loads a PDF with PDFReader and returns all extracted text as one string.

    The ``text`` of every loaded document is concatenated without a separator.
    """
    reader = PDFReader()
    loaded_docs = reader.load_data(file=file_path)
    return "".join(loaded.text for loaded in loaded_docs)

def get_embedding(text):
    """Requests an embedding for ``text`` from the Azure OpenAI embeddings API.

    Args:
        text: The input passed to the embeddings endpoint.
    Returns:
        The embedding of the first entry in the response.
    Raises:
        Exception: Wraps any failure (client construction, request, response
            access); the original error is attached as ``__cause__``.
    """
    # Imported here under a private alias because a later cell rebinds the
    # global name ``AzureOpenAI`` to the llama_index LLM class, which does not
    # expose ``embeddings``.
    from openai import AzureOpenAI as _AzureOpenAIClient

    try:
        client = _AzureOpenAIClient(
            api_key=settings.AZURE_OPENAI_KEY,
            api_version=settings.AZURE_OPENAI_PREVIEW_API_VERSION,
            azure_endpoint=settings.AZURE_OPENAI_EMBEDDING_ENDPOINT
        )

        response = client.embeddings.create(
            input=text,
            model="text-embedding-ada-002"
        )

        # Read the typed response directly. Serializing it to JSON and feeding
        # that to ast.literal_eval fails as soon as the payload contains
        # null/true/false.
        return response.data[0].embedding

    except Exception as e:
        raise Exception(f"Error getting embeddings with endpoint={settings.AZURE_OPENAI_EMBEDDING_ENDPOINT} with error={e}") from e

def chunk_content_helper(
        content: str, 
        file_format: str, 
        file_name: Optional[str],
        token_overlap: int,
        num_tokens: int = 256,
        extractor_llm = None
) -> Generator[Tuple[object, int, object], None, None]:
    """Parses content and yields it as one or more node-like chunks.

    Every yielded item is ``(node, token_count, llama_doc)`` where ``node``
    exposes ``text``, ``id_`` and ``metadata``. Content whose token count is
    below ``num_tokens`` (or whose format is "json") is yielded whole, using
    the LlamaIndex document itself as the node; anything larger is split with
    LlamaIndexSplitter.

    Args:
        content (str): The raw content to chunk.
        file_format (str): Format name used to pick the parser.
        file_name (Optional[str]): File name stored in the node metadata.
        token_overlap (int): Token overlap between consecutive chunks.
        num_tokens (int): Chunk size in tokens; None disables splitting.
        extractor_llm: LLM handed to the splitter for the optional split mods.
    Yields:
        Tuple[node, int, LlamaDocument]: The chunk, its token count, and the
            source document.
    """
    if num_tokens is None:
        # Effectively "never split".
        num_tokens = 1000000000

    parser = parser_factory(file_format)
    doc = parser.parse(content, file_name=file_name)
    llama_doc = LlamaDocument(text=doc.content, metadata={"file_name": file_name, "title": doc.title})

    doc_content_size = TOKEN_ESTIMATOR.estimate_tokens(doc.content)

    if file_format == "json" or doc_content_size < num_tokens:
        # Yield the LlamaIndex document as the single node so this branch has
        # the same item shape as the split branch. Yielding the bare string
        # broke the consumer, which reads node.text / node.id_ / node.metadata.
        yield llama_doc, doc_content_size, llama_doc
        return

    split_mods = settings.PREP_CONFIG.get("split_mods", [])
    # Honour the caller's sizing arguments; the size check above already uses
    # num_tokens, so the splitter must agree with it.
    llama_splitter = LlamaIndexSplitter(
        num_tokens=num_tokens,
        token_overlap=token_overlap,
        extractor_llm=extractor_llm
    )
    nodes = llama_splitter.get_nodes_from_doc(llama_doc, split_mods)
    print("split mods:", split_mods)

    for node in nodes:
        chunk_size = TOKEN_ESTIMATOR.estimate_tokens(node.text)
        yield node, chunk_size, llama_doc

def _embed_with_retry(text: str, wait_seconds: int = 30) -> Optional[List[float]]:
    """Calls get_embedding up to RETRY_COUNT times, pausing between failures.

    Returns the embedding of the first successful attempt. Re-raises the last
    error when every attempt fails, and returns None only when RETRY_COUNT
    allows no attempt at all.
    """
    # Function-scope import: the module-level ``sleep`` comes from asyncio, and
    # calling that coroutine function without awaiting it does not pause.
    import time

    last_error = None
    for attempt in range(RETRY_COUNT):
        try:
            return get_embedding(text)
        except Exception as embed_error:
            last_error = embed_error
            # Waiting after the final attempt would only delay the failure.
            if attempt < RETRY_COUNT - 1:
                time.sleep(wait_seconds)

    if last_error is not None:
        raise last_error
    return None


def chunk_content(
    content: str,
    file_name: Optional[str] = None,
    url: Optional[str] = None,
    ignore_errors: bool = False,
    num_tokens: int = 256,
    min_chunk_size: int = 10,
    token_overlap: int = 0,
    extensions_to_process = FILE_FORMAT_DICT.keys(),
    cracked_pdf = False,
    use_json_parsing = False,
    use_layout = False,
    add_embeddings = False
) -> ChunkingResult:
    """Chunks the given content and optionally embeds every chunk.

    Args:
        content (str): The content to chunk.
        file_name (str): The file name. used for title, file format detection.
        url (str): The url stored on every chunk.
        ignore_errors (bool): If true, errors are reported through the counters
            of the returned ChunkingResult instead of being raised.
        num_tokens (int): The number of tokens in each chunk.
        min_chunk_size (int): The minimum chunk size below which chunks will be filtered.
        token_overlap (int): The number of tokens to overlap between chunks.
        extensions_to_process: Extensions accepted by the file format lookup.
        cracked_pdf (bool): Whether the content came from a cracked PDF.
        use_json_parsing (bool): Forces the "json" format.
        use_layout (bool): With ``cracked_pdf``, selects the "html" format.
        add_embeddings (bool): If true, an embedding is requested per chunk.
    Returns:
        ChunkingResult: The chunks plus skip/error counters for this one input.
    Raises:
        UnsupportedFormatError: No parser exists for the format (only when
            ``ignore_errors`` is false).
        Exception: Any other failure (only when ``ignore_errors`` is false).
    """
    try:
        if use_json_parsing == True:
            file_format = "json"
        elif file_name is None or cracked_pdf == False or (cracked_pdf and not use_layout):
            file_format = "text"
        elif cracked_pdf:
            file_format = "html"
        else:
            # NOTE(review): with the conditions above this branch cannot be
            # reached (cracked_pdf is either falsy -> "text" or truthy ->
            # "text"/"html"); kept pending a decision on the intended routing.
            file_format = _get_file_format(file_name, extensions_to_process)
            if file_format is None:
                raise Exception(
                    f"{file_name} is not supported")

        extractor_llm = LlamaAOAI(
                model=settings.AZURE_OPENAI_MODEL_NAME,
                deployment_name=settings.AZURE_OPENAI_MODEL,
                api_key=settings.AZURE_OPENAI_KEY,
                azure_endpoint=settings.AZURE_OPENAI_ENDPOINT,
                api_version=settings.AZURE_OPENAI_PREVIEW_API_VERSION,
                system_prompt=settings.AZURE_OPENAI_SYSTEM_MESSAGE
            )

        nodes = chunk_content_helper(
            content=content,
            file_name=file_name,
            file_format=file_format,
            num_tokens=num_tokens,
            token_overlap=token_overlap,
            extractor_llm=extractor_llm
        )

        chunks = []
        skipped_chunks = 0
        for node, chunk_size, _source_doc in nodes:
            if chunk_size < min_chunk_size:
                skipped_chunks += 1
                continue

            # Reset per chunk so a chunk never inherits the previous chunk's
            # vector, and so the name exists when embeddings are disabled.
            contentVector = None
            if add_embeddings:
                combined_content = f"{node.text}\n\nMETADATA: {node.metadata}"
                try:
                    contentVector = _embed_with_retry(combined_content)
                except Exception as embed_error:
                    raise Exception(f"Error getting embedding for chunk={node}") from embed_error
                if contentVector is None:
                    raise Exception(f"Error getting embedding for chunk={node}")

            chunks.append(
                Document(
                    id=node.id_,
                    content=node.text,
                    title=node.metadata["title"],
                    url=url,
                    contentVector=contentVector,
                    metadata=node.metadata
                )
            )

    except UnsupportedFormatError:
        if ignore_errors:
            return ChunkingResult(
                chunks=[], total_files=1, num_unsupported_format_files=1
            )
        raise
    except Exception:
        if ignore_errors:
            return ChunkingResult(chunks=[], total_files=1, num_files_with_errors=1)
        raise

    return ChunkingResult(
        chunks=chunks,
        total_files=1,
        skipped_chunks=skipped_chunks,
    )

def chunk_file(
    file_path: str,
    ignore_errors: bool = True,
    num_tokens=256,
    min_chunk_size=10,
    url = None,
    token_overlap: int = 0,
    extensions_to_process = FILE_FORMAT_DICT.keys(),
    use_layout = False,
    add_embeddings=False
) -> ChunkingResult:
    """Loads a file according to its format and chunks its content.

    Args:
        file_path (str): The file to chunk.
        ignore_errors (bool): If true, an unsupported format is reported in the
            result instead of raised; also forwarded to chunk_content.
        num_tokens: Chunk size in tokens, forwarded to chunk_content.
        min_chunk_size: Minimum chunk size, forwarded to chunk_content.
        url: Url stored on every chunk.
        token_overlap (int): Overlap between chunks; negative values are
            clamped to 0.
        extensions_to_process: Extensions that may be processed.
        use_layout (bool): Forwarded to chunk_content.
        add_embeddings (bool): Forwarded to chunk_content.
    Returns:
        ChunkingResult: The chunking outcome for this single file.
    Raises:
        UnsupportedFormatError: The extension is not supported and
            ``ignore_errors`` is false.
    """
    file_name = os.path.basename(file_path)
    file_format = _get_file_format(file_name, extensions_to_process)
    if not file_format:
        if ignore_errors:
            return ChunkingResult(
                chunks=[], total_files=1, num_unsupported_format_files=1
            )
        else:
            raise UnsupportedFormatError(f"{file_name} is not supported")

    cracked_pdf = False
    use_json_parsing = False
    if file_format == "pdf":
        content = extract_pdf_content_regular(file_path)
    elif file_format == "json":
        # NOTE(review): json.load returns parsed objects, not a string, and
        # chunk_content routes this to the "json" format, for which
        # ParserFactory registers no parser — confirm the intended JSON path.
        with open(file_path) as f:
            content = json.load(f)
        use_json_parsing = True
    else:
        # Every other supported format (markdown, text, html, python) is read
        # as text. Without this branch ``content`` was never assigned and the
        # call below failed with UnboundLocalError.
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()

    return chunk_content(
        content=content,
        file_name=file_name,
        ignore_errors=ignore_errors,
        num_tokens=num_tokens,
        min_chunk_size=min_chunk_size,
        url=url,
        token_overlap=max(0, token_overlap),
        extensions_to_process=extensions_to_process,
        cracked_pdf=cracked_pdf,
        use_json_parsing=use_json_parsing,
        use_layout=use_layout,
        add_embeddings=add_embeddings
    )


def process_file(
        file_path: str, # !IMP: Please keep this as the first argument
        directory_path: str,
        ignore_errors: bool = True,
        num_tokens: int = 1024,
        min_chunk_size: int = 10,
        url_prefix = None,
        token_overlap: int = 0,
        extensions_to_process: List[str] = FILE_FORMAT_DICT.keys(),
        form_recognizer_client = None,
        use_layout = False,
        add_embeddings = False
    ):
    """Chunks one file and tags each chunk with its relative path and index.

    Args:
        file_path (str): File to process.
        directory_path (str): Root used to compute the chunk's relative path.
        ignore_errors (bool): If true, failures are printed and reported via
            the returned flag instead of raised.
        url_prefix: When set, prefix prepended to the relative path to build
            each chunk's url.
        form_recognizer_client: Accepted but not used by this function.
        (remaining arguments are forwarded to chunk_file)
    Returns:
        tuple: ``(result, is_error)`` — the ChunkingResult and False on
            success, or ``(None, True)`` after a swallowed failure.
    """
    try:
        rel_file_path = os.path.relpath(file_path, directory_path)
        url_path = (
            convert_escaped_to_posix(url_prefix + rel_file_path)
            if url_prefix
            else None
        )

        result = chunk_file(
            file_path,
            ignore_errors=ignore_errors,
            num_tokens=num_tokens,
            min_chunk_size=min_chunk_size,
            url=url_path,
            token_overlap=token_overlap,
            extensions_to_process=extensions_to_process,
            use_layout=use_layout,
            add_embeddings=add_embeddings
        )

        # Record where each chunk came from and its position within the file.
        for position, chunk in enumerate(result.chunks):
            chunk.filepath = rel_file_path
            chunk.metadata["chunk_idx"] = str(position)
    except Exception as e:
        if not ignore_errors:
            raise
        print(f"File ({file_path}) failed with ", e)
        return None, True

    return result, False

In [45]:
# Root folder of the documents to index, relative to the working directory.
directory_path = "../../data/full_data"
# Accumulators for a multi-file run.
# NOTE(review): none of these are updated by the cells visible here.
chunks = []
total_files = 0
num_unsupported_format_files = 0
num_files_with_errors = 0
skipped_chunks = 0

all_files_directory = get_files_recursively(directory_path)
files_to_process = [file_path for file_path in all_files_directory if os.path.isfile(file_path)]
# Only the first discovered file is used for this experiment.
# NOTE(review): raises IndexError when no file is found under directory_path.
file_path = files_to_process[0]


In [16]:
!az login

ERROR: No subscriptions found for andre.chen@accenturefederal.com.


In [134]:

# Chunk and embed the selected file using the chunk size and overlap from the
# prep configuration; ignore_errors=False lets any failure surface here.
result, is_error = process_file(file_path=file_path,
                                directory_path=directory_path, 
                                ignore_errors=False,
                                num_tokens=settings.PREP_CONFIG["chunk_size"],
                                min_chunk_size=100, 
                                url_prefix=None,
                                token_overlap=settings.PREP_CONFIG["token_overlap"],
                                extensions_to_process=list(FILE_FORMAT_DICT.keys()),
                                add_embeddings=True)

Parsing nodes:   0%|          | 0/1 [00:00<?, ?it/s]

100%|██████████| 8/8 [00:02<00:00,  2.78it/s]


split mods: ['summary_extraction']


In [135]:
# Inspect the second chunk (index 1) produced for the file.
result.chunks[1]

Document(content=".................... ................................ ................................ ................................ ................................ ............................... 8 \nPersonnel Compensation and Benefits ................................ ................................ ................................ ................................ ................................ ........................ 9 \nNon Pay Budget Exhibits ................................ ................................ ................................ ................................ ................................ ................................ .......... 12 \n \n Federal Emergency Management Agency Radiological Emergency Preparedness Program \nFEMA – REPP - 3 Radiological Emergency Preparedness Program \nBudget Comparison and Adjustments \nComparison of Budget Authority and Request \n(Dollars in Thousands) \n \n \nFY 2022 \nEnacted FY 2023 \nEnacted FY 2024 \nPresident's Budget 

# Test 1: 
### Try refactoring our indexing / retrieval / generation pipeline fully in LlamaIndex

Current Issues:
- Queries against the ACS vector store don't work, potentially need to specify additional arguments to have it work with vectors
- It is not clear whether a full refactor using LlamaIndex for storage, retrieval, and response generation makes sense: the LlamaIndex query engine does not use natural language query transformations out of the box, and it is unclear whether the Azure "use your data" API does (https://learn.microsoft.com/en-us/azure/ai-services/openai/concepts/use-your-data?tabs=ai-search)

In [4]:
# Build a client for the Azure AI Search index named in settings,
# authenticating with the service key.
search_service_name = settings.AZURE_SEARCH_SERVICE
key = settings.AZURE_SEARCH_KEY
cognitive_search_credential = AzureKeyCredential(key)
service_endpoint = f"https://{search_service_name}.search.windows.net"
index_name = settings.AZURE_SEARCH_INDEX

search_client = SearchClient(
    endpoint=service_endpoint,
    index_name=index_name,
    credential=cognitive_search_credential
)

In [7]:
from llama_index.vector_stores import CognitiveSearchVectorStore
from llama_index.vector_stores.cogsearch import (
    IndexManagement,
    MetadataIndexFieldType,
    CognitiveSearchVectorStore,
)
from llama_index.llms import AzureOpenAI
from llama_index.embeddings import AzureOpenAIEmbedding
from llama_index import (
    SimpleDirectoryReader,
    StorageContext,
    ServiceContext,
    VectorStoreIndex,
)
from llama_index import set_global_service_context
from llama_index.vector_stores.types import (
    ExactMatchFilter,
    MetadataFilters,
    VectorStore,
    VectorStoreQuery,
    VectorStoreQueryMode,
    VectorStoreQueryResult,
)

from llama_index.query_engine import CitationQueryEngine
from llama_index.prompts import PromptTemplate


In [6]:
# Set Storage Context
# The field keys tell the vector store which index columns hold the id, the
# chunk text, the embedding, the metadata string, and the document id.
# NOTE(review): the settings names are plural ("..._COLUMNS") — confirm each
# holds a single field name, as these keyword arguments suggest.

vector_store = CognitiveSearchVectorStore(
    search_or_index_client=search_client,
    #index_management=IndexManagement.NO_VALIDATION,
    id_field_key=settings.AZURE_SEARCH_ID_COLUMNS,
    chunk_field_key=settings.AZURE_SEARCH_CONTENT_COLUMNS,
    embedding_field_key=settings.AZURE_SEARCH_VECTOR_COLUMNS,
    metadata_string_field_key=settings.AZURE_SEARCH_METADATA_COLUMNS,
    doc_id_field_key=settings.AZURE_SEARCH_FILENAME_COLUMN
)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

In [9]:
# Set Service Context
# Both models use the same Azure OpenAI key, endpoint and API version from
# settings; the embedding deployment name is hard-coded here.

embed_model = AzureOpenAIEmbedding(
    model="text-embedding-ada-002",
    deployment_name="dep-embeddings",
    api_key=settings.AZURE_OPENAI_KEY,
    azure_endpoint=settings.AZURE_OPENAI_ENDPOINT,
    api_version=settings.AZURE_OPENAI_PREVIEW_API_VERSION
)

# AzureOpenAI here is the llama_index LLM wrapper imported in the cell above,
# not the client class from the openai package.
llm = AzureOpenAI(
    model=settings.AZURE_OPENAI_MODEL_NAME,
    deployment_name=settings.AZURE_OPENAI_MODEL,
    api_key=settings.AZURE_OPENAI_KEY,
    azure_endpoint=settings.AZURE_OPENAI_ENDPOINT,
    api_version=settings.AZURE_OPENAI_PREVIEW_API_VERSION,
    system_prompt=settings.AZURE_OPENAI_SYSTEM_MESSAGE
)

service_context = ServiceContext.from_defaults(
    llm=llm,
    embed_model=embed_model
)



In [10]:
# No documents are passed, so this call ingests nothing new; the index is
# built on the storage and service contexts configured above.
index = VectorStoreIndex.from_documents(
    [], storage_context=storage_context, service_context=service_context
)

In [23]:
from llama_index.memory import ChatMemoryBuffer

# Chat engine in "context" mode with a 1500-token memory buffer, the system
# prompt from settings, and hybrid vector-store queries.
memory = ChatMemoryBuffer.from_defaults(token_limit=1500)
chat_engine = index.as_chat_engine(chat_mode="context", memory=memory, system_prompt=settings.AZURE_OPENAI_SYSTEM_MESSAGE, vector_store_query_mode="hybrid")


In [56]:
import unicodedata
import openai

class AOAIGenerator:
    """
    Wrapper around an Azure OpenAI chat-completions client that keeps a
    running message history.

    Attributes:
        messages (list): Chat messages accumulated so far, starting with the
            normalized system prompt.
        model (str): Model name supplied by the caller. It is stored only;
            requests are sent to ``settings.AZURE_OPENAI_MODEL``.
        client: Azure OpenAI client built from the endpoint, key and API
            version in settings.
    Methods:
        chat_completion(messages): Appends messages to the history and
            requests a completion for the whole history.
        normalize_content(content): Applies Unicode NFC normalization.
    """

    def __init__(self, system_content: str, model: str = "gpt-35-turbo"):
        system_prompt = {
            "role": "system",
            "content": self.normalize_content(system_content),
        }
        self.messages = [system_prompt]
        self.model = model
        # Reach the SDK client through the ``openai`` module: the bare name
        # ``AOAI`` is never defined in this notebook, and the global
        # ``AzureOpenAI`` is rebound to the llama_index LLM class by an
        # earlier cell.
        self.client = openai.AzureOpenAI(
            azure_endpoint=settings.AZURE_OPENAI_ENDPOINT,
            api_key=settings.AZURE_OPENAI_KEY,
            api_version=settings.AZURE_OPENAI_PREVIEW_API_VERSION
        )

    def chat_completion(self, messages):
        """Appends ``messages`` to the history and requests a completion.

        Args:
            messages (list): Chat messages (role/content dicts) to append.
        Returns:
            The completion response, or "" when the response carries no
            readable first choice.
        """
        self.messages += messages
        response = self.client.chat.completions.create(
            model=settings.AZURE_OPENAI_MODEL,
            messages=self.messages,
            temperature=settings.AZURE_OPENAI_TEMPERATURE,
            max_tokens=settings.AZURE_OPENAI_MAX_TOKENS,
            top_p=settings.AZURE_OPENAI_TOP_P,
            frequency_penalty=0,
            presence_penalty=0,
            stop=None,
        )

        # Probe the first choice. Only a malformed or empty response maps to
        # the "" sentinel; unrelated errors (e.g. KeyboardInterrupt) propagate.
        try:
            _ = response.choices[0].message.content
        except (AttributeError, IndexError, TypeError):
            return ""
        return response

    def normalize_content(self, content: str):
        """Returns ``content`` in Unicode NFC form."""
        return unicodedata.normalize("NFC", content)

In [72]:
# Smoke test: send one user message and print the reply text.
# NOTE(review): chat_completion can return "" instead of a response object,
# in which case the print below would fail.
test = AOAIGenerator(settings.AZURE_OPENAI_SYSTEM_MESSAGE, model=settings.AZURE_OPENAI_MODEL_NAME)
test_messages = [
    {"role": "user", "content": "who are you?"}
]
response = test.chat_completion(test_messages)
print(response.choices[0].message.content)

Hello! I am FEMA OCFO GPT, a helpful AI assistant. My purpose is to provide accurate and relevant information about the FEMA budget. How can I assist you today?


In [83]:
from flask import Response
def format_as_ndjson(obj: dict) -> str:
    """Serializes ``obj`` as a single newline-terminated JSON line.

    Non-ASCII characters are written as-is rather than escaped.
    """
    serialized = json.dumps(obj, ensure_ascii=False)
    return f"{serialized}\n"

In [25]:
import uuid
import datetime

from typing import Any, List, Optional, Sequence
from llama_index.core import BaseQueryEngine, BaseRetriever
from llama_index.callbacks.base import CallbackManager
from llama_index.postprocessor.types import BaseNodePostprocessor
from llama_index.response.schema import RESPONSE_TYPE
from llama_index.callbacks.schema import CBEventType, EventPayload
from llama_index.node_parser import SentenceSplitter, TextSplitter
from llama_index.prompts.base import BasePromptTemplate
from llama_index.response_synthesizers import (
    BaseSynthesizer,
    ResponseMode,
    get_response_synthesizer,
)
from llama_index.indices.base import BaseGPTIndex
from llama_index.schema import MetadataMode, NodeWithScore, QueryBundle
from llama_index.retrievers import BM25Retriever, QueryFusionRetriever
from llama_index.response.schema import Response


# Prompt for the initial answer: instructs the model to answer only from the
# numbered sources and to cite them by number, with one worked example.
# Placeholders: {context_str} (the numbered sources) and {query_str}.
CITATION_QA_TEMPLATE = PromptTemplate(
    "Please provide an answer based solely on the provided sources. "
    "When referencing information from a source, "
    "cite the appropriate source(s) using their corresponding numbers. "
    "Every answer should include at least one source citation. "
    "Only cite a source when you are explicitly referencing it. "
    "If none of the sources are helpful, you should indicate that. "
    "For example:\n"
    "Source 1:\n"
    "The sky is red in the evening and blue in the morning.\n"
    "Source 2:\n"
    "Water is wet when the sky is red.\n"
    "Query: When is water wet?\n"
    "Answer: Water will be wet when the sky is red [2], "
    "which occurs in the evening [1].\n"
    "Now it's your turn. Below are several numbered sources of information:"
    "\n------\n"
    "{context_str}"
    "\n------\n"
    "Query: {query_str}\n"
    "Answer: "
)

# Prompt for refining a previous answer with additional numbered sources,
# under the same citation rules as the initial prompt.
# Placeholders: {existing_answer}, {context_msg} (the numbered sources) and
# {query_str}.
# NOTE(review): there is no space or newline between "{existing_answer}" and
# "Below are several...", so the two run together in the rendered prompt.
CITATION_REFINE_TEMPLATE = PromptTemplate(
    "Please provide an answer based solely on the provided sources. "
    "When referencing information from a source, "
    "cite the appropriate source(s) using their corresponding numbers. "
    "Every answer should include at least one source citation. "
    "Only cite a source when you are explicitly referencing it. "
    "If none of the sources are helpful, you should indicate that. "
    "For example:\n"
    "Source 1:\n"
    "The sky is red in the evening and blue in the morning.\n"
    "Source 2:\n"
    "Water is wet when the sky is red.\n"
    "Query: When is water wet?\n"
    "Answer: Water will be wet when the sky is red [2], "
    "which occurs in the evening [1].\n"
    "Now it's your turn. "
    "We have provided an existing answer: {existing_answer}"
    "Below are several numbered sources of information. "
    "Use them to refine the existing answer. "
    "If the provided sources are not helpful, you will repeat the existing answer."
    "\nBegin refining!"
    "\n------\n"
    "{context_msg}"
    "\n------\n"
    "Query: {query_str}\n"
    "Answer: "
)

class BaseOCFOQueryEngine(BaseQueryEngine):
    """Query engine that retrieves nodes and answers with citations via an AOAI generator.

    The engine works on chat-style message lists: the ``content`` of the last
    message is treated as the user query. Retrieved nodes are rendered as
    numbered sources, substituted into ``citation_qa_template`` and sent to the
    generator. The completion is wrapped in a chat-completion style dict whose
    first message (role ``tool``) carries the citations.
    """

    def __init__(
        self,
        retriever: BaseRetriever,
        generator: AOAIGenerator,
        node_postprocessors: Optional[List[BaseNodePostprocessor]] = None,
        callback_manager: Optional[CallbackManager] = None,
        metadata_mode: MetadataMode = MetadataMode.NONE,
        citation_qa_template: BasePromptTemplate = CITATION_QA_TEMPLATE,
        citation_refine_template: BasePromptTemplate = CITATION_REFINE_TEMPLATE,
    ):
        """Store the collaborators and share one callback manager with all postprocessors.

        Args:
            retriever: Retriever used to fetch candidate nodes for a query.
            generator: Callable invoked as ``generator(system_message=..., model=...)``;
                the object it returns must expose ``chat_completion(messages)``.
            node_postprocessors: Optional postprocessors applied, in order, to
                the retrieved nodes.
            callback_manager: Callback manager for tracing; a fresh one is
                created when omitted.
            metadata_mode: Metadata mode used when rendering node content into
                the prompt context.
            citation_qa_template: Template with ``context_str`` and ``query_str``
                placeholders used to build the answering prompt.
            citation_refine_template: Refine template; stored on the instance
                but not used by any method of this class.
        """
        self._retriever = retriever
        self._generator = generator
        self._node_postprocessors = node_postprocessors or []
        self._metadata_mode = metadata_mode
        self.citation_qa_template = citation_qa_template
        self.citation_refine_template = citation_refine_template

        callback_manager = callback_manager or CallbackManager()
        for node_postprocessor in self._node_postprocessors:
            node_postprocessor.callback_manager = callback_manager

        super().__init__(callback_manager)

    @classmethod
    def from_args(
        cls,
        index: VectorStoreIndex,
        query_type: str,
        citation_qa_template: BasePromptTemplate = CITATION_QA_TEMPLATE,
        citation_refine_template: BasePromptTemplate = CITATION_REFINE_TEMPLATE,
        node_postprocessors: Optional[List[BaseNodePostprocessor]] = None,

        # class-specific args
        metadata_mode: MetadataMode = MetadataMode.NONE,
        generator: Optional[AOAIGenerator] = None,
        **kwargs: Any,
    ) -> "BaseOCFOQueryEngine":
        """Build an engine from an index, choosing the retriever by ``query_type``.

        Args:
            index: Index providing the vector retriever, the docstore used for
                BM25, and the callback manager.
            query_type: One of ``"simple"``, ``"semantic"``, ``"vector"``,
                ``"vectorSimpleHybrid"`` or ``"vectorSemanticHybrid"``.
            citation_qa_template: Forwarded to the constructor.
            citation_refine_template: Forwarded to the constructor.
            node_postprocessors: Forwarded to the constructor.
            metadata_mode: Forwarded to the constructor.
            generator: Generator factory forwarded to the constructor. Defaults
                to ``AOAIGenerator``.
            **kwargs: Accepted for call-site compatibility; currently ignored.

        Returns:
            A configured engine instance.

        Raises:
            KeyError: If ``query_type`` is not one of the supported values.
        """
        #TODO: Make this more generalizable and allow retrievers to be set outside of class definition
        top_k = settings.AZURE_SEARCH_TOP_K

        # Retrievers are built lazily so that, e.g., a pure vector query does
        # not pay for (or fail on) constructing a BM25 index it never uses.
        def build_vector_retriever() -> BaseRetriever:
            return index.as_retriever(similarity_top_k=top_k)

        def build_bm25_retriever() -> BaseRetriever:
            return BM25Retriever.from_defaults(
                docstore=index.docstore, similarity_top_k=top_k
            )

        def build_rrf_retriever() -> BaseRetriever:
            return QueryFusionRetriever(
                [build_vector_retriever(), build_bm25_retriever()],
                similarity_top_k=top_k,
                num_queries=1,  # set this to 1 to disable query generation
                mode="reciprocal_rerank",
                use_async=True,
                verbose=True,
                # query_gen_prompt="...",  # we could override the query generation prompt here
            )

        retriever_builders = {
            "simple": build_bm25_retriever,
            "semantic": build_bm25_retriever,
            "vector": build_vector_retriever,
            "vectorSimpleHybrid": build_rrf_retriever,
            "vectorSemanticHybrid": build_rrf_retriever,
        }
        if query_type not in retriever_builders:
            raise KeyError(
                f"Unsupported query_type {query_type!r}; "
                f"expected one of {sorted(retriever_builders)}"
            )

        return cls(
            retriever=retriever_builders[query_type](),
            # The constructor requires a generator; omitting it raised TypeError.
            # NOTE(review): assumes AOAIGenerator itself is the factory that is
            # called with (system_message=..., model=...) - confirm.
            generator=generator if generator is not None else AOAIGenerator,
            callback_manager=index.service_context.callback_manager,
            node_postprocessors=node_postprocessors,
            metadata_mode=metadata_mode,
            citation_qa_template=citation_qa_template,
            citation_refine_template=citation_refine_template,
        )

    def retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Retrieve nodes for the query and run them through every postprocessor in order."""
        nodes = self._retriever.retrieve(query_bundle)

        for postprocessor in self._node_postprocessors:
            nodes = postprocessor.postprocess_nodes(nodes, query_bundle=query_bundle)

        return nodes

    @property
    def retriever(self) -> BaseRetriever:
        """Get the retriever object."""
        return self._retriever

    def query(self, messages: List[dict]):
        """Answer the last message of a chat history.

        Args:
            messages: Chat messages (dicts with a ``"content"`` key); the last
                one holds the user query. The list is not modified.

        Returns:
            The chat-completion style dict built by ``_format_response``.

        Raises:
            ValueError: If ``messages`` is empty.
        """
        if not messages:
            raise ValueError("messages must contain at least one message")

        str_or_query_bundle = messages[-1]["content"]
        with self.callback_manager.as_trace("query"):
            if isinstance(str_or_query_bundle, str):
                str_or_query_bundle = QueryBundle(str_or_query_bundle)
            return self._query(str_or_query_bundle, messages)

    def _build_context(self, source_nodes: List[NodeWithScore]) -> str:
        """Render nodes as ``Source N:`` blocks so the model can cite them by number."""
        return "\n\n".join(
            f"Source {position}:\n"
            f"{scored.node.get_content(metadata_mode=self._metadata_mode)}"
            for position, scored in enumerate(source_nodes, start=1)
        )

    def _get_response(self, source_nodes, messages, query_str: Optional[str] = None):
        """Generate a cited answer for the last message using ``source_nodes`` as context.

        Args:
            source_nodes: Retrieved nodes to expose to the model as sources.
            messages: Chat history; the last message is replaced, in a copy,
                by the rendered citation prompt.
            query_str: Query text for the prompt; defaults to the content of
                the last message.

        Returns:
            The chat-completion style dict built by ``_format_response``.
        """
        if query_str is None:
            query_str = messages[-1]["content"]

        aoai_generator = self._generator(
            system_message=settings.AZURE_OPENAI_SYSTEM_MESSAGE,
            model=settings.AZURE_OPENAI_MODEL_NAME
        )
        prompt = self.citation_qa_template.format(
            context_str=self._build_context(source_nodes), query_str=query_str
        )
        # Work on a copy: overwriting the caller's last message in place would
        # leak the rendered prompt (with all sources) into the chat history.
        prompt_messages = [*messages[:-1], {**messages[-1], "content": prompt}]

        return self._format_response(aoai_generator.chat_completion(prompt_messages), source_nodes)

    def _format_response(self, aoai_response: Any, source_nodes: List[NodeWithScore]):
        """Wrap a completion and its source nodes in a chat-completion style dict.

        Args:
            aoai_response: Completion object exposing ``id`` and
                ``choices[0].message.content``.
            source_nodes: Nodes reported as citations; each contributes its
                text and metadata.

        Returns:
            Dict with one choice holding a ``tool`` message (citations) followed
            by the ``assistant`` message (answer text). Token usage is reported
            as zeros.
        """
        citations_json = [
            {
                "content": ct.node.text,
                "id": "",
                "title": "",
                "filepath": "",
                "url": "",
                "metadata": ct.node.metadata
            } for ct in source_nodes
        ]

        output_json = {
            "id": aoai_response.id,
            "model": settings.AZURE_OPENAI_MODEL_NAME,
            # timestamp() interprets the naive local time correctly; subtracting
            # a naive 1970-01-01 was off by the machine's UTC offset.
            "created": datetime.now().timestamp(),
            "object": "chat.completion",
            "choices": [
                {
                    'index': 0,
                    'messages': [
                        {
                            'index': 0,
                            'role': 'tool',
                            'content': {
                                'citations': citations_json,
                                'intent': ''
                            },
                            'end_turn': False
                        },
                        {
                            'index': 1,
                            'role': 'assistant',
                            'content': aoai_response.choices[0].message.content,
                            'end_turn': True
                        }
                    ]
                }
            ],
            # TODO: add function to compute tokens used
            "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
            "history_metadata": {}
        }

        return output_json

    def _query(self, query_bundle: QueryBundle, messages: List[dict]) -> dict:
        """Answer a query: retrieve nodes, generate the cited answer, emit callback events."""
        with self.callback_manager.event(
            CBEventType.QUERY, payload={EventPayload.QUERY_STR: query_bundle.query_str}
        ) as query_event:
            with self.callback_manager.event(
                CBEventType.RETRIEVE,
                payload={EventPayload.QUERY_STR: query_bundle.query_str},
            ) as retrieve_event:
                nodes = self.retrieve(query_bundle)
                retrieve_event.on_end(payload={EventPayload.NODES: nodes})

            # Previously `response` was never assigned, so this method raised
            # NameError; generation is delegated to _get_response.
            response = self._get_response(
                nodes, messages, query_str=query_bundle.query_str
            )

            query_event.on_end(payload={EventPayload.RESPONSE: response})

        return response

    async def _aquery(self, query_bundle: QueryBundle, messages: Optional[List[dict]] = None) -> dict:
        """Async entry point; runs the synchronous pipeline.

        NOTE(review): believed to be abstract on ``BaseQueryEngine`` in the
        LlamaIndex 0.9 line, so it must exist for the class to be
        instantiable - confirm against the installed version.
        """
        if messages is None:
            messages = [{"role": "user", "content": query_bundle.query_str}]
        return self._query(query_bundle, messages)

    def _get_prompt_modules(self) -> dict:
        """Return prompt sub-modules; this engine has none.

        NOTE(review): believed to be abstract on the base prompt mixin in the
        LlamaIndex 0.9 line - confirm against the installed version.
        """
        return {}

        


AttributeError: 'ContextChatEngine' object has no attribute 'get_prompts'

In [50]:
# Sanity check: render the QA citation prompt with placeholder values to inspect the final text layout.
print(CITATION_QA_TEMPLATE.format(context_str="hello", query_str="goodbye"))

Please provide an answer based solely on the provided sources. When referencing information from a source, cite the appropriate source(s) using their corresponding numbers. Every answer should include at least one source citation. Only cite a source when you are explicitly referencing it. If none of the sources are helpful, you should indicate that. For example:
Source 1:
The sky is red in the evening and blue in the morning.
Source 2:
Water is wet when the sky is red.
Query: When is water wet?
Answer: Water will be wet when the sky is red [2], which occurs in the evening [1].
Now it's your turn. Below are several numbered sources of information:
------
hello
------
Query: goodbye
Answer: 


In [60]:
from IPython.display import Markdown, display

# Translate the app's search query type into the vector-store query mode
# string handed to the retriever.
query_mode_mapping = dict(
    simple="text_search",
    vector="default",
    vectorSimpleHybrid="hybrid",
    vectorSemanticHybrid="semantic_hybrid",
)

# Baseline run with the stock CitationQueryEngine, configured from app settings.
query_engine = CitationQueryEngine.from_args(
    index,
    similarity_top_k=settings.AZURE_SEARCH_TOP_K,
    citation_chunk_size=settings.PREP_CONFIG["chunk_size"],
    vector_store_query_mode=query_mode_mapping[settings.AZURE_SEARCH_QUERY_TYPE],
)

qe_response = query_engine.query("What are the PPAs within Operations and Support?")

# Show the answer in bold as rendered Markdown.
display(Markdown("<b>{}</b>".format(qe_response)))

<b>The PPAs (Program, Project, or Activity) within Operations and Support are as follows:

1. Preparedness and Protection (PPA code: FEMA - O&S - 85) [Source 1]
2. Response and Recovery (PPA code: FEMA - O&S - 86) [Source 1]
3. Response (PPA code: FEMA - O&S - 96) [Source 2]
4. Response (PPA code: FEMA - O&S - 97) [Source 2]
5. Mission Support (PPA code: FEMA - O&S - 49) [Source 3]
6. Mission Support (PPA code: FEMA - O&S - 50) [Source 3]

Please note that these PPAs are specific to the FEMA Operations and Support budget.</b>

In [70]:
# Inspect the first source node twice: as pretty-printed JSON, then as the raw object repr.
print(json.dumps(qe_response.source_nodes[0].to_dict(), indent=3))
qe_response.source_nodes[0]


{
   "node": {
      "id_": "56",
      "embedding": null,
      "metadata": {
         "chunk_id": "56"
      },
      "excluded_embed_metadata_keys": [],
      "excluded_llm_metadata_keys": [],
      "relationships": {},
      "hash": "6005d11cb8f1053002578eb2971fa74a85db4e02db2d10362f5c0844f31ffce1",
      "text": "Source 1:\n. Operations and Support Preparedness and Protection \u2013 PPA \nFEMA \u2013 O&S - 85 \nNational Preparedness Directorate (NPD): NPD is instrumental in building a culture of preparedness across the Nation, with a focus on helping \npeople prepare for disasters, organizing a scalable and capable incident workforce, lessons learned and continuous improvement . NPD will also \ncontinue to provide doctrine, programs, and resources to implement the National Preparedness System which prepares the Nation to prevent, protect, \nmitigate, respond to and recover from disasters. Activities include training, edu cation, and evaluation, support to the National Incident Man

NodeWithScore(node=TextNode(id_='56', embedding=None, metadata={'chunk_id': '56'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, hash='6005d11cb8f1053002578eb2971fa74a85db4e02db2d10362f5c0844f31ffce1', text="Source 1:\n. Operations and Support Preparedness and Protection – PPA \nFEMA – O&S - 85 \nNational Preparedness Directorate (NPD): NPD is instrumental in building a culture of preparedness across the Nation, with a focus on helping \npeople prepare for disasters, organizing a scalable and capable incident workforce, lessons learned and continuous improvement . NPD will also \ncontinue to provide doctrine, programs, and resources to implement the National Preparedness System which prepares the Nation to prevent, protect, \nmitigate, respond to and recover from disasters. Activities include training, edu cation, and evaluation, support to the National Incident Management \nSystem, technical assistance, assessing levels of national preparedness, nat

In [71]:
# Build one citation record per source node. Only the text and metadata come
# from the node; the id/title/filepath/url fields are left as empty strings.
citations_json = [
    dict(
        content=scored_node.node.text,
        id="",
        title="",
        filepath="",
        url="",
        metadata=scored_node.node.metadata,
    )
    for scored_node in qe_response.source_nodes
]

citations_json

[{'content': "Source 1:\n. Operations and Support Preparedness and Protection – PPA \nFEMA – O&S - 85 \nNational Preparedness Directorate (NPD): NPD is instrumental in building a culture of preparedness across the Nation, with a focus on helping \npeople prepare for disasters, organizing a scalable and capable incident workforce, lessons learned and continuous improvement . NPD will also \ncontinue to provide doctrine, programs, and resources to implement the National Preparedness System which prepares the Nation to prevent, protect, \nmitigate, respond to and recover from disasters. Activities include training, edu cation, and evaluation, support to the National Incident Management \nSystem, technical assistance, assessing levels of national preparedness, national exercise leadership and support, Individual and Community \nPreparedness, and executive oversight. The FY 20 24 Budget includes decreases for non -recurs from the FY 2023 Enacted. \nResilience Offices: The Resilience Offices

# Test 2: 
### Try to get text splitting / metadata extraction pipeline to work using LlamaIndex Nodes

In [21]:
from data_utils import chunk_directory

directory_path = settings.PREP_CONFIG["data_path"]

# Chunk every file under the configured data path in a single process.
chunking_result = chunk_directory(
    # input
    directory_path=directory_path,
    url_prefix=None,
    # chunk sizing, taken from the prep config
    num_tokens=settings.PREP_CONFIG["chunk_size"],
    token_overlap=settings.PREP_CONFIG["token_overlap"],
    min_chunk_size=10,
    # no Form Recognizer client is supplied and layout analysis is off
    form_recognizer_client=None,
    use_layout=False,
    # execution
    njobs=1,
    ignore_errors=False,
    add_embeddings=True,
)

Total files to process=2 out of total directory size=2
Single process to chunk and parse the files. --njobs > 1 can help performance.


  0%|          | 0/2 [00:00<?, ?it/s]

SingletonFormRecognizerClient: Creating instance of Form recognizer per process
SingletonFormRecognizerClient: Skipping since credentials not provided. Assuming NO form recognizer extensions(like .pdf) in directory
Processing PDF WITHOUT Document Intelligence


 50%|█████     | 1/2 [00:41<00:41, 41.22s/it]

Processing PDF WITHOUT Document Intelligence


100%|██████████| 2/2 [01:00<00:00, 30.14s/it]


In [23]:
# Number of chunks produced by the chunking run above.
len(chunking_result.chunks)

98

In [28]:
from llama_index import Document
# Probe: build a minimal Document and read back a metadata value by key.
test = Document(text="hello", metadata={"title": "hello"})
test.metadata["title"]

'hello'

In [29]:
# Probe: read the body text of the Document back through its `text` attribute.
test.text

'hello'