
# Reliability AI

## Installation

Remember to install the dependencies before using the notebook:

`pip install -qU langchain_community langchain_openai unstructured "unstructured[md]" nltk langchain-text-splitters langchain_chroma`

In [21]:
import getpass
import os
from langchain_openai import ChatOpenAI

# Set your API key
def set_key(key):
    """Make sure the environment variable ``key`` is set.

    If ``key`` is already present in ``os.environ`` nothing is changed.
    Otherwise a notice is printed and the value is read with
    ``getpass.getpass()`` and stored in ``os.environ[key]``.
    """
    if key in os.environ:
        return
    print(f"Please enter your {key}")
    os.environ[key] = getpass.getpass()

# Prompt for each credential that is not already present in the environment.
for required_key in ("OPENAI_API_KEY", "LANGSMITH_API_KEY"):
    set_key(required_key)

# NOTE(review): presumably switches on LangSmith tracing for LangChain runs — confirm.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
# Chat model shared by the cells below.
llm = ChatOpenAI(model="gpt-4o-mini")

Please enter your OPENAI_API_KEY
Please enter your LANGSMITH_API_KEY


In [None]:
''' 
A simplified view of a component with a single reliability value (availability) and a list of dependencies.
The availability of a component is the probability that the component is operational over the system's availability time period (normally 30d).
'''
from typing import List, Optional
from typing_extensions import TypedDict
from pydantic import BaseModel, Field

# One SLA entry of a service: a named metric together with its target value.
class ServiceSLA(TypedDict):
    # Name of the metric this entry describes (e.g. 'availability').
    metric_name: str
    # Target value of the metric. Service reads the 'availability' entry's
    # value and multiplies it with the values of its dependencies.
    sla: float

class Service(BaseModel):
    """ A service is a component of a system.  It has a name, a list of dependencies, and a list of SLAs """
    # NOTE: the class docstring and the Field descriptions are kept verbatim on
    # purpose — pydantic includes them in the generated JSON schema.
    name: str = Field(description="Name of the service")
    # Only the keys (names of the services this one depends on) are used.
    dependencies: dict[str, None] = Field(default_factory=dict, description="Dict of dependencies")
    # SLA entries; the 'availability' key is the one read by the calculation.
    SLAs: dict[str, ServiceSLA] = Field(default_factory=dict, description="Dict of SLAs")

    def calculate_availability_slo(self, lookup) -> float:
        """Return this service's availability combined with its dependencies'.

        The result is the service's own 'availability' SLA multiplied by the
        (recursively computed) availability of every dependency.

        Args:
            lookup: callable mapping a dependency name to its ``Service``.

        Returns:
            The product of the availabilities. It is NaN when this service or
            any service reached through its dependencies has no
            'availability' SLA.

        Raises:
            ValueError: if the dependency graph contains a cycle (previously
                this recursed until ``RecursionError``).

        Note:
            A dependency reachable along several paths is multiplied in once
            per path.
        """
        return self._availability_along(lookup, (self,))

    def _availability_along(self, lookup, path) -> float:
        """Recursive worker; ``path`` holds the services from the root to ``self``."""
        availability = self.__get_availability()
        for dep_name in self.dependencies:
            dep = lookup(dep_name)
            # Compare by identity: pydantic models compare equal field-wise,
            # so two distinct but identical services must not look like a cycle.
            if any(dep is seen for seen in path):
                cycle = " -> ".join(svc.name for svc in path) + f" -> {dep.name}"
                raise ValueError(f"Circular dependency detected: {cycle}")
            availability *= dep._availability_along(lookup, path + (dep,))

        return availability

    def __get_availability(self) -> float:
        """Return the 'availability' SLA value, or NaN when none is defined."""
        entry = self.SLAs.get('availability')
        if entry is None:
            return float('nan')
        return entry['sla']


class System(BaseModel):
    """ A system is a collection of services """
    # Services of the system, keyed by the name other services use to refer to them.
    services: dict[str, Service] = Field(default_factory=dict[str, Service], description="Dict of services")

    def calculate_availability_slo(self) -> float:
        """Multiply together the availability SLO of every service in the system.

        Each service resolves its dependencies by name against ``self.services``
        (an unknown name raises ``KeyError``). Returns 1.0 for an empty system.
        """
        # Dependency names are resolved straight from the services mapping.
        resolve = self.services.__getitem__
        # TODO: use root node of services to calculate availability
        product = 1.0
        for service in self.services.values():
            product = product * service.calculate_availability_slo(resolve)
        return product

import math

# Example system: a linear dependency chain Foo -> Bar -> Baz in which every
# service declares an availability SLA of 0.99.
# The keyword has to be `SLAs` (the field name on Service): an unknown keyword
# such as `slas` is ignored by pydantic's default config, which left the SLAs
# empty and made the computed availability NaN.
example_system: dict[str, Service] = {
    'Foo': Service(name='Foo', dependencies={'Bar': None}, SLAs={'availability': {'metric_name': 'availability', 'sla': 0.99}}),
    'Bar': Service(name='Bar', dependencies={'Baz': None}, SLAs={'availability': {'metric_name': 'availability', 'sla': 0.99}}),
    'Baz': Service(name='Baz', dependencies={}, SLAs={'availability': {'metric_name': 'availability', 'sla': 0.99}}),
}
# Backward-compatible alias for the original (misspelled) variable name.
exmaple_system = example_system

# TODO: move into a module and turn into a test
system = System(services=example_system)
sla = system.calculate_availability_slo()
# NaN never compares equal to anything, itself included, so the former
# `sla != float('nan')` was always true; math.isnan performs the real check.
assert not math.isnan(sla)

In [46]:
from langchain_community.document_loaders import UnstructuredMarkdownLoader, TextLoader
from langchain_core.documents import Document
from langchain_text_splitters import MarkdownHeaderTextSplitter, ExperimentalMarkdownSyntaxTextSplitter
from os import path

notebook_name = "reliable-agent"
# Build the path with os.path.join instead of concatenating with a hard-coded
# "/" so the separator is always the platform's own.
# path.abspath("") resolves to the current working directory.
file_path = path.join(path.abspath(""), notebook_name, "docs", "system-design-document.md")
loader = TextLoader(file_path)
# sync loading data file
data = loader.load()
# check that at least one document was loaded
assert len(data) >= 1
assert isinstance(data[0], Document)
# Only the first loaded document is used below.
doc_content = data[0].page_content

# (markdown header prefix, label) pairs handed to the splitter.
headers_to_split_on = [
    ("#", "Header 1"),
    ("##", "Header 2"),
    ("###", "Header 3"),
    ("####", "Header 4"),
]

# TODO: consider using RecursiveCharacterTextSplitter?

markdown_splitter = ExperimentalMarkdownSyntaxTextSplitter(headers_to_split_on=headers_to_split_on)
md_header_splits = markdown_splitter.split_text(doc_content)
# Sanity check: the design document is expected to yield more than three splits.
assert len(md_header_splits) > 3

In [None]:
from langchain import hub
from langchain_chroma import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import  PydanticOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

def format_docs(docs):
    """Join the ``page_content`` of every document in ``docs`` with blank lines.

    Returns a single string in which consecutive contents are separated by
    two newline characters; an empty iterable yields an empty string.
    """
    contents = [document.page_content for document in docs]
    separator = "\n\n"
    return separator.join(contents)

# Embed the markdown splits with OpenAIEmbeddings and index them in a Chroma vector store.
vectorstore = Chroma.from_documents(documents=md_header_splits, embedding=OpenAIEmbeddings())

# Retrieve and generate using the relevant snippets of the system design document.
# The retriever is configured for similarity search with k=6.
retriever = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": 6})


# TODO: convert to a agentic-type chain to cover cases where there is insufficient data in the document
# TODO: convert to Graph RAG to have a more structured representation of the data
# The parser targets the System model; its format instructions are injected
# into the prompt below as the `format_instructions` partial variable.
parser = PydanticOutputParser(pydantic_object=System)
template = """Answer the question based only on the following context:
{context}

Format instructions: {format_instructions}

Question: {question}
"""
prompt = PromptTemplate(
    input_variables=["context", "question"],
    template=template,
    partial_variables={"format_instructions": parser.get_format_instructions()},
    )

# Pipeline: the question is passed through unchanged and also sent to the
# retriever, whose results are flattened by format_docs into `context`;
# the filled prompt goes to the LLM and the reply is handed to the parser.
rag_chain = (
    {"context": retriever | format_docs,
     "question": RunnablePassthrough()}
    | prompt
    | llm
    | parser
)

output = rag_chain.invoke("What are the services that make up the described system in the context?")
# NOTE(review): exact-equality check against LLM output. The expected value
# presumably mirrors the content of system-design-document.md, and the
# assertion may be flaky if the model's answer varies — confirm.
assert output == System(services={
    'Service A': Service(name='Service A', dependencies={'Service B': None, 'Service C': None}, SLAs={}),
    'Service B': Service(name='Service B', dependencies={}, SLAs={}),
    'Service C': Service(name='Service C', dependencies={}, SLAs={})
})