In [41]:
from fastapi import FastAPI
import psycopg2
from langchain.chains.query_constructor.schema import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain_openai import ChatOpenAI
from langchain_openai import OpenAIEmbeddings
from langchain_postgres import PGVector
from typing import Dict, Tuple, Union
from langchain_core.structured_query import (
    Comparator,
    Comparison,
    Operation,
    Operator,
    StructuredQuery,
    Visitor,
)
from langchain_core.tools import InjectedToolArg, tool
from typing_extensions import Annotated
from langgraph.graph import MessagesState, StateGraph
from langchain_core.messages import SystemMessage
from langgraph.prebuilt import ToolNode
from copy import deepcopy
from langchain_core.runnables import chain
from langgraph.checkpoint.memory import MemorySaver
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import MessagesState
from langgraph.graph import StateGraph
from langgraph.graph import END
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg import Connection
import uuid

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vector_store = PGVector(
    embeddings=embeddings,
    collection_name="my_docs2",
    connection="postgresql+psycopg://stefan:gigelfrone112@localhost:5432/techvector",
)
app = FastAPI()
conn = psycopg2.connect("dbname=techvector user=stefan password=gigelfrone112 host=localhost port=5432")
cursor = conn.cursor()

In [3]:
def replace_date_objects(data):
    """
    Recursively traverses the JSON object and replaces every dictionary
    containing 'date' and 'type' keys with the value of the 'date' key.

    :param data: JSON object (dict, list, or other types)
    :return: Updated JSON object
    """
    if isinstance(data, dict):
        # Check if the current dictionary is the one to replace
        if "date" in data and "type" in data:
            return data["date"]
        # Otherwise, process each key-value pair
        return {key: replace_date_objects(value) for key, value in data.items()}

    elif isinstance(data, list):
        # Process each element in the list
        return [replace_date_objects(item) for item in data]

    # Return the data as is for other types
    return data

In [4]:
class CustomTranslator(Visitor):
    """Translate `PGVector` internal query language elements to valid filters."""

    allowed_operators = [Operator.AND, Operator.OR]
    """Subset of allowed logical operators."""
    allowed_comparators = [
        Comparator.EQ,
        Comparator.NE,
        Comparator.GT,
        Comparator.LT,
        Comparator.IN,
        Comparator.NIN,
        Comparator.CONTAIN,
        Comparator.LIKE,
    ]
    """Subset of allowed logical comparators."""

    def _format_func(self, func: Union[Operator, Comparator]) -> str:
        self._validate_func(func)
        return f"${func.value}"

    def visit_operation(self, operation: Operation) -> Dict:
        args = [arg.accept(self) for arg in operation.arguments]
        return {self._format_func(operation.operator): args}



    def visit_comparison(self, comparison: Comparison) -> Dict:
        return {
            comparison.attribute: {
                self._format_func(comparison.comparator): comparison.value
            }
        }


    def visit_structured_query(
        self, structured_query: StructuredQuery
    ) -> Tuple[str, dict]:
        if structured_query.filter is None:
            kwargs = {}
        else:
            kwargs = {"filter": structured_query.filter.accept(self)}
            kwargs = replace_date_objects(kwargs)
        return structured_query.query, kwargs



In [5]:
metadata_field_info = [
    AttributeInfo(
        name="title",
        description="The title that the article was published under",
        type="string",
    ),
    AttributeInfo(
        name="author",
        description="The name of the author of the article",
        type="string",
    ),
    AttributeInfo(
        name="date",
        description="The date that the article was published on, in the format 'YYYY-MM-DD'. If the month is given by its name, it is converted to its number.",
        type="string",
    ),
    AttributeInfo(
        name="category",
        description="The category that the article belongs to. One of ['AI', 'Apps', 'Biotech & Health', 'Climate', 'Commerce', 'Crypto', 'Enterprise', 'Fintech', 'Fundraising', 'Gadgets', 'Gaming', 'Government & Policy', 'Hardware', 'Media & Entertainment', 'Privacy', 'Robotics', 'Security', 'Social', 'Space', 'Startups', 'Transportation', 'Venture']",
        type="string",
    ),
    AttributeInfo(
        name="url",
        description="The URL to the original TechCrunch article",
        type="link",
    )
]
document_content_description = "The article content"
llm = ChatOpenAI(temperature=0, model="gpt-4o-mini")
retriever = SelfQueryRetriever.from_llm(
    llm,
    vector_store,
    document_content_description,
    metadata_field_info,
    structured_query_translator=CustomTranslator(),
)

In [6]:
@app.get("/get_articles_by_query")
async def get_articles_by_query(query: str):
    query = query.replace("'", "\'")
    query = query.replace("’", "\'")
    docs = retriever.invoke(query)
    urls = list(set([doc.metadata["url"] for doc in docs]))
    cursor.execute("SELECT * FROM article WHERE link = ANY(%s);", (urls,))
    tuples = cursor.fetchall()
    result_dict = [dict(zip(['url', 'title', 'time', 'img', 'category', 'summary', 'questions', 'author'], tup)) for tup in tuples]
    return result_dict 


In [7]:
@app.get("/get_articles")
async def get_articles():
    cursor.execute("SELECT * FROM article;")
    tuples = cursor.fetchall()
    result_dict = [dict(zip(['url', 'title', 'time', 'img', 'category', 'summary', 'questions', 'author'], tup)) for tup in tuples]
    return result_dict 


In [8]:
@app.get("/get_article")
async def get_article(url: str):
    cursor.execute(f"SELECT * FROM article where link = '{url}';")
    tuples = cursor.fetchone()
    result_dict = dict(zip(['url', 'title', 'time', 'img', 'category', 'summary', 'questions', 'author'], tuples))
    return result_dict



In [60]:
print(await get_articles_by_query('Give me an article about AI published on 9 January, 2025 by Maxwell Zeff, talking about nvidia'))

[{'url': 'https://techcrunch.com/2025/01/09/nvidias-ai-avatar-sat-on-my-computer-screen-and-weirded-me-out/', 'title': 'Nvidia’s AI avatar sat on my computer screen and weirded me out', 'time': '4:31 PM PST · January 9, 2025', 'img': 'https://techcrunch.com/wp-content/uploads/2025/01/IMG_9CC69B31B7BF-1.jpeg?w=1024', 'category': 'AI', 'summary': 'Nvidia has introduced R2X, a prototype AI avatar designed to assist users directly from their desktop, combining advanced AI models with a human-like interface. While it can navigate apps, process files, and even observe users’ screens, early demos reveal some quirks, like odd facial expressions and occasional inaccuracies in its guidance. The company plans to open source R2X in 2025, potentially allowing developers to create personalized AI interactions. Despite its promise, the technology still faces challenges, hinting at both the excitement and the uncertainties of integrating AI into everyday computing.', 'questions': 'What specific featur

In [46]:


@tool(response_format="content_and_artifact")
def retrieve(query: str):
    """Retrieve information related to a query."""
    retrieved_docs = vector_store.similarity_search(query, k=3)
    serialized = "\n\n".join(
        (f"Source: {doc.metadata}\n" f"Content: {doc.page_content}")
        for doc in retrieved_docs
    )
    return serialized, retrieved_docs


def query_or_respond(state: MessagesState):
    """Generate tool call for retrieval or respond."""
    llm_with_tools = llm.bind_tools([retrieve])
    response = llm_with_tools.invoke(state["messages"])
    # MessagesState appends messages to state instead of overwriting
    return {"messages": [response]}


tools = ToolNode([retrieve])


def generate(state: MessagesState):
    """Generate answer."""
    # Get generated ToolMessages
    recent_tool_messages = []
    for message in reversed(state["messages"]):
        if message.type == "tool":
            recent_tool_messages.append(message)
        else:
            break
    tool_messages = recent_tool_messages[::-1]

    # Format into prompt
    docs_content = "\n\n".join(doc.content for doc in tool_messages)
    system_message_content = (
        "You are an assistant for question-answering tasks. "
        "Use the following pieces of retrieved context to answer "
        "the question. If you don't know the answer based on the retrieved context,"
        "say that the context doesn't contain the answer, but nevertheless try to provide an"
        "explanation based on your pre-trained knowledge. If you still don't know,"
        "say that you don't know. Use three sentences maximum and keep the "
        "answer concise.It is ABSOLUTELY NECESSARY to mention that the retrieved context does not contain the answer if it does not."
        "\n\n"
        f"{docs_content}"
    )
    conversation_messages = [
        message
        for message in state["messages"]
        if message.type in ("human", "system")
        or (message.type == "ai" and not message.tool_calls)
    ]
    prompt = [SystemMessage(system_message_content)] + conversation_messages

    response = llm.invoke(prompt)
    return {"messages": [response]}

In [47]:
graph_builder = StateGraph(MessagesState)
graph_builder.add_node(query_or_respond)
graph_builder.add_node(tools)
graph_builder.add_node(generate)

graph_builder.set_entry_point("query_or_respond")
graph_builder.add_conditional_edges(
    "query_or_respond",
    tools_condition,
    {END: END, "tools": "tools"},
)
graph_builder.add_edge("tools", "generate")
graph_builder.add_edge("generate", END)

db_url = "postgresql://stefan:gigelfrone112@localhost:5432/techvector"

postgresCheckpointer = PostgresSaver(Connection.connect(db_url))

#postgresCheckpointer.setup()
graph = graph_builder.compile(checkpointer=postgresCheckpointer)



In [42]:
from pydantic import BaseModel

class chatbot_data(BaseModel):
    query: str
    thread_id: str = ""

@app.post("/general_chatbot")
async def general_chatbot(chatbot_data: chatbot_data):
    query = chatbot_data.query
    thread_id = chatbot_data.thread_id

    if thread_id == "":
        thread_id = str(uuid.uuid4())

    input_message = query
    config = {"configurable": {"thread_id": thread_id}}

    ans = graph.invoke({"messages": [{"role": "user", "content": input_message}]} ,config=config,)
    
    return ans["messages"][-1], thread_id


In [44]:
print(await general_chatbot("What are some improvements RTX 5090 GPU has over its predecessor?", "7964dd41-14b6-4b6e-8478-63a9ae58de86"))

(AIMessage(content='The RTX 5090 GPU is claimed to outperform the RTX 4090 by as much as 2x. It features 92 billion transistors, 4,000 AT TOPS, 380 ray-tracing TFLOPS, and 1.8 TB/s bandwidth. Additionally, it delivers breakthroughs in AI-driven rendering, including neural shaders and enhanced geometry and lighting capabilities.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 78, 'prompt_tokens': 924, 'total_tokens': 1002, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_72ed7ab54c', 'finish_reason': 'stop', 'logprobs': None}, id='run-48f36081-e927-4647-bbf6-d3b4141fc391-0', usage_metadata={'input_tokens': 924, 'output_tokens': 78, 'total_tokens': 1002, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_d

In [71]:



class CustomState(MessagesState):
    url: str


@tool(response_format="content_and_artifact")
def retrieve_by_url(query: str, url: Annotated[str, InjectedToolArg]) -> Tuple[str, list]:
    """Retrieve information related to a query, only fetching documents with a specific URL."""
    retrieved_docs = vector_store.similarity_search(query, k=2, filter={"url": {'$eq': url}})
    serialized = "\n\n".join(
        (f"Source: {doc.metadata}\n" f"Content: {doc.page_content}")
        for doc in retrieved_docs
    )
    return serialized, retrieved_docs

# Step 1: Generate an AIMessage that may include a tool-call to be sent.
def query_or_respond_custom(state: CustomState):
    """Generate tool call for retrieval or respond."""
    llm_with_tools = llm.bind_tools([retrieve_by_url])
    response = llm_with_tools.invoke(state["messages"])

    for call in response.tool_calls:
        call["args"]["url"] = state['url']


    # MessagesState appends messages to state instead of overwriting
    return {"messages": [response]}


# Step 2: Execute the retrieval.
tools_by_url = ToolNode([retrieve_by_url])

# Step 3: Generate a response using the retrieved content.
def generate_custom(state: CustomState):
    """Generate answer."""
    # Get generated ToolMessages
    recent_tool_messages = []
    for message in reversed(state["messages"]):
        if message.type == "tool":
            recent_tool_messages.append(message)
        else:
            break
    tool_messages = recent_tool_messages[::-1]

    # Format into prompt
    docs_content = "\n\n".join(doc.content for doc in tool_messages)
    system_message_content = (
        "You are an assistant for question-answering tasks. "
        "Use the following pieces of retrieved context to answer "
        "the question. If you don't know the answer based on the retrieved context,"
        "PLEASE EXPLICITLY SAY that the context doesn't contain the answer, but nevertheless try to provide an"
        "explanation based on your pre-trained knowledge. If you still don't know,"
        "say that you don't know. Use three sentences maximum and keep the "
        "answer concise. It is ABSOLUTELY NECESSARY to mention that the retrieved context does not contain the answer if it does not."
        "\n\n"
        f"{docs_content}"
    )
    conversation_messages = [
        message
        for message in state["messages"]
        if message.type in ("human", "system")
        or (message.type == "ai" and not message.tool_calls)
    ]
    prompt = [SystemMessage(system_message_content)] + conversation_messages

    # Run
    response = llm.invoke(prompt)
    return {"messages": [response]}




In [72]:
workflow = StateGraph(CustomState)

workflow.add_node(tools_by_url)
workflow.add_node(query_or_respond_custom)
workflow.add_node(generate_custom)

workflow.set_entry_point("query_or_respond_custom")
workflow.add_edge("tools", "generate_custom")

workflow.add_conditional_edges(
    "query_or_respond_custom",
    tools_condition,
    {END: END, "tools": "tools"},
)

workflow.add_edge("generate_custom", END)

db_url = "postgresql://stefan:gigelfrone112@localhost:5432/techvector"

postgresCheckpointer = PostgresSaver(Connection.connect(db_url))

postgresCheckpointer.setup()
url_graph = workflow.compile(checkpointer=postgresCheckpointer)


In [53]:
input_message = "What did I ask you first?"
config = {"configurable": {"thread_id": "1"}}

for step in url_graph.stream(
    {"messages": [{"role": "user", "content": input_message}], "url": "https://techcrunch.com/2025/01/09/innovaccer-aims-to-become-healthcares-ai-powerhouse-with-275m-series-f/"},
    stream_mode="values",
    config=config,
):
    step["messages"][-1].pretty_print()


What did I ask you first?

You asked, "What did I ask you first?"


In [73]:
from pydantic import BaseModel

class Url_chatbot_data(BaseModel):
    query: str
    url: str
    thread_id: str = ""

@app.post("/url_chatbot")
async def url_chatbot(url_chatbot_data: Url_chatbot_data):
    query = url_chatbot_data.query
    url = url_chatbot_data.url
    thread_id = url_chatbot_data.thread_id

    if thread_id == "":
        thread_id = str(uuid.uuid4())

    input_message = query
    config = {"configurable": {"thread_id": thread_id}}

    ans = graph.invoke({"messages": [{"role": "user", "content": input_message}], "url": url}, config=config)
    
    return ans["messages"][-1], thread_id


In [74]:
print(await url_chatbot(Url_chatbot_data(query="Was the idea behind Facebook stolen?", url="https://techcrunch.com/2025/01/09/google-searches-for-deleting-facebook-instagram-explode-after-meta-ends-fact-checking/", thread_id="")))

(AIMessage(content='The context does not contain the answer to whether the idea behind Facebook was stolen. However, based on pre-trained knowledge, the founding of Facebook involved legal disputes, particularly with the Winklevoss twins, who claimed that Mark Zuckerberg stole their idea for a social networking site. Ultimately, the case was settled, and Zuckerberg has maintained that he developed Facebook independently.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 72, 'prompt_tokens': 883, 'total_tokens': 955, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_bd83329f63', 'finish_reason': 'stop', 'logprobs': None}, id='run-35d8181d-6b58-4b6d-9101-403c4f3dd82a-0', usage_metadata={'input_tokens': 883, 'output_tokens': 7

In [33]:
# from langgraph_sdk import get_client

# client = get_client()

# specific_thread = await client.threads.get('5')

# print(specific_thread)

print(postgresCheckpointer.get({"configurable": {"thread_id": "1"}}))


{'v': 1, 'id': '1efd71f5-7d29-6990-8001-84a197576a55', 'ts': '2025-01-20T11:11:48.915226+00:00', 'pending_sends': [], 'versions_seen': {'__input__': {}, '__start__': {'__start__': '00000000000000000000000000000001.0.3561542597271431'}, 'query_or_respond_custom': {'start:query_or_respond_custom': '00000000000000000000000000000002.0.005184707796906829'}}, 'channel_versions': {'url': '00000000000000000000000000000002.0.44641456651518807', 'messages': '00000000000000000000000000000003.0.9933475634978586', '__start__': '00000000000000000000000000000002.0.12023397766813171', 'query_or_respond_custom': '00000000000000000000000000000003.0.6729835461101962', 'start:query_or_respond_custom': '00000000000000000000000000000003.0.29169407040297723'}, 'channel_values': {'url': 'https://techcrunch.com/2025/01/09/innovaccer-aims-to-become-healthcares-ai-powerhouse-with-275m-series-f/', 'messages': [HumanMessage(content='What did I ask you first?', additional_kwargs={}, response_metadata={}, id='ca6b67