Install the dependencies below

In [None]:
! pip install -U langchain-nomic langchain_community tiktoken chromadb langchainhub langchain langgraph tavily-python gpt4all firecrawl-py

Next we load our .env file. If you don't have one, create it and add a LangChain API key (`LANG_KEY`), a Jina key (`JINA_KEY`) and a Tavily search key (`TAVILY_API_KEY`).

In [None]:
import os
from dotenv import load_dotenv

load_dotenv()  # Take environment variables from .env.

# Turn tracing on only when a key is actually available: assigning the None
# returned by os.getenv() to os.environ raises "TypeError: str expected".
langchain_api_key = os.getenv('LANG_KEY')  # replace with your own key
if langchain_api_key:
    os.environ['LANGCHAIN_TRACING_V2'] = 'true'
    os.environ['LANGCHAIN_ENDPOINT'] = 'https://api.smith.langchain.com'
    os.environ['LANGCHAIN_API_KEY'] = langchain_api_key
else:
    print("LANG_KEY is not set; continuing without LangChain tracing.")

In [None]:
# Model name handed to every ChatOllama instance in this notebook.
# Change this to a model of your choice.
local_llm = "llama3"

Below, I am using a bunch of web URLs as my primary data source. You can try this out with your own data instead, for example by loading a CSV file, a database with company information, or a JSON file with some semantic encoding information.

In [None]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import GPT4AllEmbeddings
from langchain_community.vectorstores.utils import filter_complex_metadata
from langchain.docstore.document import Document
import requests

# Bunch of randomly generated URLs (restricting to 3 to not destroy API token limits).
# A list (instead of a set) keeps the fetch order, and therefore the order of
# `docs_list`, identical from run to run.
urls = [
    'https://en.wikipedia.org/wiki/Knowledge_graph',
    'https://en.wikipedia.org/wiki/Semantic_technology',
    'https://en.wikipedia.org/wiki/Semantic_integration',
    # 'https://en.wikipedia.org/wiki/Logical_graph',
    # 'https://en.wikipedia.org/wiki/Knowledge_graph_embedding',
    # 'https://en.wikipedia.org/wiki/Graph_database',
    # 'https://en.wikipedia.org/wiki/Formal_semantics_(natural_language)',
    # 'https://en.wikipedia.org/wiki/Artificial_general_intelligence',
    # 'https://en.wikipedia.org/wiki/Recursive_self-improvement',
    # 'https://en.wikipedia.org/wiki/Automated_planning_and_scheduling',
    # 'https://en.wikipedia.org/wiki/Machine_learning',
    # 'https://en.wikipedia.org/wiki/Natural_language_processing'
]

headers = {
    'Accept': 'application/json',
    # NOTE(review): the value is sent verbatim; confirm whether JINA_KEY must
    # already contain the auth scheme prefix (e.g. "Bearer <key>").
    'Authorization': os.getenv('JINA_KEY')  # replace with your own api key
}

base_url = 'https://r.jina.ai/'
REQUEST_TIMEOUT_SECONDS = 60  # avoid hanging the notebook on a stalled request

# Fetch every page through the reader endpoint and keep the decoded JSON body.
docs = []
for url in urls:
    response = requests.get(base_url + url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)
    response.raise_for_status()  # fail here with the HTTP error rather than later on a missing 'data' key
    docs.append(response.json())

# Look up JINA API response format but essentially we are extracting the content and reconstructing metadata from the response
docs_list = []
for doc in docs:
    data = doc['data']
    metadata = {k: v for k, v in data.items() if k != 'content'}
    docs_list.append({"content": data['content'], "metadata": metadata})


In [None]:
# Split each document into smaller chunks. Smaller chunk sizes are usually better for tasks where you want
# to extract more granular information or meaning from individual words e.g. SEO or grammar/syntax checking.
# For a more holistic understanding of your data, use a larger chunk size. Chunk overlap is the amount of
# text shared between adjacent chunks, which helps maintain context between chunks e.g. for sentiment
# analysis or topic modeling. Because the splitter is built with from_tiktoken_encoder, chunk_size and
# chunk_overlap are measured in tiktoken tokens rather than characters.

# from_tiktoken_encoder is a classmethod, so call it on the class directly
# instead of constructing a throwaway default splitter first.
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=300, chunk_overlap=5
)
doc_splits = text_splitter.create_documents(
    texts=[doc['content'] for doc in docs_list],
    metadatas=[doc['metadata'] for doc in docs_list],
)

# Filter out metadata that comes as an array because that isn't supported.
# filter_complex_metadata keeps only str / bool / int / float metadata values.
filtered_docs = filter_complex_metadata(doc_splits)


In [None]:
# Add to vector DB
# NOTE(review): re-running this cell in the same kernel presumably adds the same
# chunks to the "rag-chroma" collection again — confirm and restart the kernel if so.
vectorstore = Chroma.from_documents(
    documents=filtered_docs,
    collection_name="rag-chroma",
    embedding=GPT4AllEmbeddings(
        # This is a smaller embedding model for test purposes. Make sure your chunk size doesn't exceed
        # the model's context length (in this model, the max context length is 512).
        model_name="all-MiniLM-L6-v2.gguf2.f16.gguf",
        # A real boolean instead of the string 'True' (which only worked because it is truthy).
        gpt4all_kwargs={'allow_download': True},
    ),
)

# Create a retriever from our vectorstore
retriever = vectorstore.as_retriever()

We will now create a **retrieval grader** to determine if the document pulled is relevant to the user question

In [None]:
from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatOllama
from langchain_core.output_parsers import JsonOutputParser

# Initialize the chat model (JSON mode, because the grader's output is parsed as JSON)
llm = ChatOllama(model=local_llm, format='json', temperature=0)

# The following prompt is generally how you would structure a retrieval grader prompt with roles defined between header_ids
prompt = PromptTemplate(
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
    You are a grader assessing relevance of a retrieved document to a user question. If the document contains topics/concepts/keywords related
    to the user question, grade it as relevant. It does not need to be a stringent test. The goal is to filter out erroneous retrievals.
    \n Give a binary score of 'yes' or 'no' to indicate whether the document is relevant to the question. \n
    Provide the binary score as a JSON with a single key 'score'.
    <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the retrieved document: \n\n {document} \n\n
    Question: {question} \n <|eot_id|><|start_header_id|>assistant<|end_header_id|>
    """,
    input_variables=["question", "document"]
)

# Chain these steps together. This must stay defined: the grade_documents node
# of the graph calls retrieval_grader, and leaving it commented out makes the
# graph fail with a NameError on a fresh kernel.
retrieval_grader = prompt | llm | JsonOutputParser()

# Optional smoke test (uncomment to try the grader on its own):
# question_right = "What is a knowledge graph?" # Test question to see if it can identify a relevant document from our store
# docs_right = retriever.invoke(question_right)
# doc_text = docs_right[1].page_content # We take a sample document from the retrieved documents
# print(retrieval_grader.invoke({"question": question_right, "document": doc_text})) # Now we check to see if that doc is relevant

# question_wrong = "Who made sesame street?"
# docs_wrong = retriever.invoke(question_wrong)
# doc_text = docs_wrong[1].page_content
# print(retrieval_grader.invoke({"question": question_wrong, "document": doc_text})) # Now we check to see if that doc is relevant

Let's now handle response generation with the document that was retrieved by defining a **rag_chain**

In [None]:

from langchain import hub
from langchain_core.output_parsers import StrOutputParser

# Chat model for answer generation. No JSON mode this time, because the
# answer is consumed as a plain string.
llm = ChatOllama(temperature=0, model=local_llm)

# Answer-generation prompt: the system turn sets the rules, the user turn
# carries the question and the retrieved context.
prompt = PromptTemplate(
    input_variables=["question", "context"],
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|> You are an AI assistant tasked with generating a response to a user question. Use the following pieces of retrieved context to answer the question. If you don't know the answer, just say that you don't know. Use a maximum of 3 sentences and keep the answer concise <|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is my question: {question} 
    Here is the potential context: {context}
    <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
)

# Post-processing
def format_docs(docs):
    """Join the ``page_content`` of every document, separated by a blank line.

    Args:
        docs: iterable of objects exposing a ``page_content`` string attribute
    Returns:
        str: the concatenated contents (empty string when ``docs`` is empty)
    """
    contents = []
    for item in docs:
        contents.append(item.page_content)
    separator = "\n\n"
    return separator.join(contents)

# Generation pipeline: fill the prompt, call the chat model, return plain text
rag_chain = (
    prompt
    | llm
    | StrOutputParser()
)

# # Example run (format_docs turns the retrieved documents into one text block)
# question = "Who made sesame street?"
# docs = retriever.invoke(question)
# result = rag_chain.invoke({"question": question, "context": format_docs(docs)})
# print(result)


In the event that the retrieved document is not relevant, let's fall back to a web search through Tavily by defining a **web_search_tool**

In [None]:
from langchain_community.tools.tavily_search import TavilySearchResults
# The api key is already set as TAVILY_API_KEY in the .env file and will be automatically pulled.
# The result-count field is spelled `max_results`; the previous `maxResults`
# keyword does not match it, so the intended limit of 3 was not applied.
web_search_tool = TavilySearchResults(max_results=3)

Next we determine if the output was a hallucination or not by creating another **hallucination_grader** agent

In [None]:

# Chat model in JSON mode, since the grader's reply is parsed as JSON
llm = ChatOllama(temperature=0, format="json", model=local_llm)

# This agent takes on the responsibility of checking for hallucinations (i.e. is the response grounded in facts)
prompt = PromptTemplate(
    input_variables=["question", "response"],
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
    You are a grader assessing the quality of a generated response. If the response is based in facts relevant to the question, grade it as relevant. If the response is incoherent or irrelevant, grade it as irrelevant. The goal is to filter out erroneous or hallucinating responses. \n Give a binary score of 'yes' or 'no' to indicate whether the response is grounded in truths and known facts. Your output should be a key of 'score' and the binary score you have determined.<|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the question: \n {question} \n
    Here is the generated response: \n {response} \n
    <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
)

# prompt -> chat model -> parsed JSON dict
hallucination_grader = (
    prompt
    | llm
    | JsonOutputParser()
)
# hallucination_grader.invoke({"question": question, "response": result}) #using values from the previous cell

Now we create an **answer_grader** agent to determine how good our response was for evaluation purposes

In [None]:
# Chat model in JSON mode, since the grader's reply is parsed as JSON
llm = ChatOllama(temperature=0, format="json", model=local_llm)

# This agent takes on the responsibility of checking how useful the answer was
prompt = PromptTemplate(
    input_variables=["question", "response"],
    template="""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
    You are a grader assessing the quality of a generated response. If the response is coherent and relevant to the question, grade it as relevant. If the response is incoherent or irrelevant, grade it as irrelevant. The goal is to filter out responses that don't actually answer the question well. \n Give a binary score of 'yes' or 'no' to indicate whether the response is relevant to the question. Your output should be a key of 'score' and the binary score you have determined.<|eot_id|><|start_header_id|>user<|end_header_id|>
    Here is the question: \n {question} \n
    Here is the generated response: \n {response} \n
    <|eot_id|><|start_header_id|>assistant<|end_header_id|>""",
)

# prompt -> chat model -> parsed JSON dict
answer_grader = (
    prompt
    | llm
    | JsonOutputParser()
)
# answer_grader.invoke({"question": question, "response": result}) #using values from the previous cell

Finally we chain everything together by defining states and nodes using LangGraph

In [None]:
from typing import List
from typing_extensions import TypedDict
from langchain.schema import Document
from langgraph.graph import END, StateGraph

# Define the global state of our graph
class MyState(TypedDict):
    """State dictionary passed between the nodes of the graph.

    Keys:
        question: the question that the user asked
        generation: the output at each step
        web_search: whether a web_search was conducted
        documents: list of documents currently valid
    """

    question: str
    generation: str
    web_search: bool
    documents: List[Document]

# Define nodes
def retrieve(state):
    """
    Look up documents for the current question in the vectorstore retriever
    Args:
        state (dict): The current state of the graph; its 'question' key is read
    Returns:
        state(dict): The question together with the retrieved documents
    """
    print("---RETRIEVE---")
    user_question = state['question']
    # The returned keys update the global state (notably 'documents')
    return {
        "question": user_question,
        "documents": retriever.invoke(user_question),
    }

def grade_documents(state):
    """
    Determines whether retrieved documents are relevant to the question. If any document is not relevant,
    or if no relevant document is left at all, the web_search flag is switched to True
    Args:
        state (dict): The current state of the graph
    Returns:
        state(dict): Filters out irrelevant documents and updates web_search state
    """
    print("---CHECK DOCUMENT RELEVANCE---")
    question = state['question']
    documents = state['documents']
    filtered_docs = []
    web_search = False
    for doc in documents:
        score = retrieval_grader.invoke({"question": question, "document": doc.page_content})
        # The grade comes from an LLM, so the 'score' key may be missing or hold
        # a non-string value; anything but an explicit "yes" counts as not relevant.
        grade = score.get('score') if isinstance(score, dict) else None
        if str(grade).strip().lower() == 'yes':
            print("---DOCUMENT IS RELEVANT---")
            filtered_docs.append(doc)
        else:
            print("---DOCUMENT IS NOT RELEVANT---")
            web_search = True

    # An empty retrieval never enters the loop above; without this check the
    # graph would go on to generate an answer from no context at all.
    if not filtered_docs:
        web_search = True

    return {"documents": filtered_docs, "question": question, "web_search": web_search}  # Once again updating the global state here

def generate_rag(state):
    """
    Generate answers using RAG on retrieved documents
    Args:
        state (dict): The current state of the graph
    Returns:
        state(dict): Adds a new key to the state, generation, which contains the generated LLM response
    """
    print("---GENERATE---")
    question = state['question']
    documents = state['documents']

    # RAG Generation. The prompt interpolates {context} as text, so pass the
    # joined page contents; handing over the raw list would insert the repr of
    # the Document objects (metadata included) into the prompt.
    context = format_docs(documents)
    generation = rag_chain.invoke({"question": question, "context": context})
    print(f"Generated response: {generation}")  # Debugging line

    return {"question": question, "documents": documents, "generation": generation}

def perform_web_search(state):
    """
    Conducts a web search using Tavily
    Args:
        state (dict): The current state of the graph
    Returns:
        state(dict): Adds the web search results to the documents
    """
    print("---WEB SEARCH---")
    question = state['question']
    # Work on a copy so the list stored in the incoming state is not mutated
    # in place; a missing or None 'documents' entry becomes an empty list.
    documents = list(state.get('documents') or [])

    # Web search generation
    results = web_search_tool.invoke({'query': question})
    # NOTE(review): the tool is expected to return a list of dicts with a
    # 'content' key; any other shape (presumably an error message) is reported
    # and skipped instead of crashing on d['content'].
    if isinstance(results, list):
        web_results = "\n".join(
            d['content'] for d in results if isinstance(d, dict) and 'content' in d
        )
        if web_results:
            documents.append(Document(page_content=web_results))
    else:
        print(f"---WEB SEARCH RETURNED NO USABLE RESULTS: {results!r}---")

    return {"documents": documents, "question": question}

# Conditional edge
def decide_to_generate(state):
    """
    Decides whether to generate an answer or conduct a web search
    Args:
        state (dict): The current state of the graph; only 'web_search' is read
    Returns:
        str: "perform_web_search" when the web_search flag is set, otherwise "generate"
    """
    print("---ASSESS GRADED DOCUMENTS---")

    # Only the flag drives the routing; the question and documents are not
    # needed here, so they are no longer looked up.
    if state["web_search"]:
        print("---DECISION: SOME DOCUMENTS ARE NOT RELEVANT TO THE QUESTION, INCLUDE WEB SEARCH---")
        return "perform_web_search"

    print("---DECISION: GENERATE---")
    return "generate"

# Conditional edge 2
def check_hallucination(state):
    """
    Grades the generated response, first with the hallucination grader and then with the answer grader
    Args:
        state (dict): The current state of the graph
    Returns:
        str: "not supported" when the hallucination grader does not answer 'yes',
             otherwise "useful" or "not useful" depending on the answer grader
    """
    def said_yes(score):
        # Grades come from an LLM: tolerate a missing 'score' key or a
        # non-string value and treat anything but an explicit "yes" as "no".
        grade = score.get('score') if isinstance(score, dict) else None
        return str(grade).strip().lower() == 'yes'

    print("---CHECK HALLUCINATION---")
    question = state["question"]
    generation = state["generation"]
    grader_input = {"question": question, "response": generation}

    if not said_yes(hallucination_grader.invoke(grader_input)):
        print("---DECISION: GENERATION IS NOT GROUNDED IN FACTS...RETRY---")
        return "not supported"

    print("---DECISION: GENERATION IS GROUNDED IN RELEVANT DOCUMENTS---")
    print("---GRADE GENERATION vs QUESTION ---")
    if said_yes(answer_grader.invoke(grader_input)):
        print("---DECISION: GENERATION ADDRESSES QUESTION---")
        return "useful"

    print("---DECISION: GENERATION DOES NOT ADDRESS QUESTION---")
    return "not useful"

# Create the graph over MyState and register each node function under its name
workflow = StateGraph(MyState)

graph_nodes = (
    ("retrieve", retrieve),
    ("grade_documents", grade_documents),
    ("generate", generate_rag),
    ("perform_web_search", perform_web_search),
)
for node_name, node_fn in graph_nodes:
    workflow.add_node(node_name, node_fn)


Last but not least, let's connect our conditional edges as well

In [None]:
# ------------------------------ Build the graph ----------------------------- #
# Fixed path: retrieval is always followed by grading.
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade_documents")

# After grading, decide_to_generate picks the next node.
after_grading_routes = {
    "perform_web_search": "perform_web_search",
    "generate": "generate",
}
workflow.add_conditional_edges("grade_documents", decide_to_generate, after_grading_routes)

# A web search always feeds into generation.
workflow.add_edge("perform_web_search", "generate")

# After generation, check_hallucination decides whether to stop, search or retry.
# NOTE(review): "not supported" routes back to "generate" with unchanged inputs;
# confirm that repeated retries are bounded acceptably for your setup.
after_generation_routes = {
    "useful": END,
    "not useful": "perform_web_search",
    "not supported": "generate",
}
workflow.add_conditional_edges("generate", check_hallucination, after_generation_routes)

In [None]:
from pprint import pprint

# Compile the graph into a runnable app
app = workflow.compile()

inputs = {"question": "How can I create a knowledge graph from scratch?"}

# Stream the run and report each node as it finishes. `output` stays bound to
# the last streamed item after the loop, which the next cell reads.
for output in app.stream(inputs):
    for node_name in output:
        print(f"Finished running: {node_name}")

In [27]:
# Print the 'generation' entry of the last streamed output, if it has one

final = list(output.values())
last_update = final[0]
if 'generation' in last_update:
    print(last_update['generation'])

To create a knowledge graph from scratch, you can follow these general steps:

1. Define the scope and purpose of your knowledge graph, including the types of entities and relationships you want to include.
2. Design an ontology or schema for your knowledge graph, which will define the structure and vocabulary used to represent your data.
3. Collect and prepare your data, such as extracting information from various sources like databases, documents, or web pages.
4. Use a graph database like Neo4j or GraphDB to store your data as entities and their interrelationships.
5. Implement methods for reasoning over your data, such as node embedding and ontology development.

Note that the specific steps may vary depending on the size and complexity of your knowledge graph, as well as the tools and technologies you choose to use.
