In [1]:
import os
import boto3
import random
from datetime import datetime
from typing import List, Dict, Any
from dotenv import load_dotenv
from langchain_core.messages import HumanMessage, AIMessage
from langchain_aws import ChatBedrock

In [2]:
# Populate os.environ from a .env file (python-dotenv) before Config reads it with os.getenv.
load_dotenv()

class Config:
    """Settings read from the process environment when this class body executes.

    Every value is ``None`` when its variable is unset, except ``AWS_REGION``,
    which falls back to ``"us-east-1"``.
    """

    # AWS credentials as exported in the environment.
    AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")

    # Region used for every boto3 / Bedrock client created in this notebook.
    AWS_REGION = os.environ.get("AWS_REGION", "us-east-1")

    # Bedrock knowledge base id and the model ARN used to generate answers from it.
    KB_ID = os.environ.get("KB_ID")
    MODEL_ARN = os.environ.get("MODEL_ARN")

    # Name of the DynamoDB table holding the chat history.
    DYNAMODB_TABLE_NAME = os.environ.get("DYNAMODB_TABLE_NAME")

# Bedrock Agent Runtime client (the service that exposes retrieve_and_generate).
client = boto3.client(
    service_name="bedrock-agent-runtime",
    region_name=Config.AWS_REGION,
)

In [3]:
# Chat model used for question rewriting and for summarising chat history.
llm = ChatBedrock(
    region_name=Config.AWS_REGION,
    model="amazon.nova-pro-v1:0",
    streaming=True,
    beta_use_converse_api=True,
)

In [5]:
from boto3.dynamodb.conditions import Key,Attr
# DynamoDB table handle shared by ChatHistoryManager; the table name comes from Config.
TABLE_NAME = Config.DYNAMODB_TABLE_NAME
dynamodb = boto3.resource(service_name='dynamodb', region_name=Config.AWS_REGION)
table = dynamodb.Table(TABLE_NAME)

class ChatHistoryManager:
    """Store, load and compact the chat history of one user session.

    Rows are kept in the module-level DynamoDB ``table`` and are queried by
    ``user_id`` and ``timestamp`` (an ISO-8601 string), filtered by
    ``session_id``. Summaries are generated with the module-level ``llm``.

    A history entry is a dict with ``user_message``, ``assistant_response``,
    ``timestamp``, ``session_id`` and ``summarized``. A summary entry has
    ``summarized=True`` plus ``summary`` (text) and ``summary_of`` (the
    timestamps of the exchanges it replaces).
    """

    # Minimum number of unsummarised exchanges before a summary is generated.
    SUMMARY_THRESHOLD = 5
    # Maximum number of exchanges folded into a single summary.
    SUMMARY_BATCH_SIZE = 10

    def __init__(self, user_id: str = "default", session_id: str = "default_session"):
        self.user_id = user_id
        self.session_id = session_id

    @staticmethod
    def _content_to_text(content: Any) -> str:
        """Flatten model output (a string or a list of content blocks) to plain text."""
        if content is None:
            return ""
        if isinstance(content, str):
            return content
        if isinstance(content, (list, tuple)):
            parts = []
            for block in content:
                if isinstance(block, str):
                    parts.append(block)
                elif isinstance(block, dict) and isinstance(block.get("text"), str):
                    parts.append(block["text"])
            return "".join(parts)
        return str(content)

    def load_history(self) -> List[Dict[str, Any]]:
        """Return today's entries for this user and session.

        Every page of the query is read, because the session filter is applied
        after DynamoDB's per-page limit. Exchanges that a stored summary already
        covers (listed in its ``summary_of``) are dropped, so they are not fed
        to the model or summarised again.

        Returns:
            The matching entries, or ``[]`` if the query fails (the error is printed).
        """
        today = datetime.now().date()
        start_time = datetime.combine(today, datetime.min.time()).isoformat()
        end_time = datetime.combine(today, datetime.max.time()).isoformat()

        items: List[Dict[str, Any]] = []
        try:
            query_kwargs = {
                'KeyConditionExpression': (
                    Key('user_id').eq(self.user_id)
                    & Key('timestamp').between(start_time, end_time)
                ),
                'FilterExpression': Attr('session_id').eq(self.session_id),
            }
            while True:
                response = table.query(**query_kwargs)
                items.extend(response.get('Items', []))
                last_key = response.get('LastEvaluatedKey')
                if not last_key:
                    break
                # More pages remain: continue from where the previous page stopped.
                query_kwargs['ExclusiveStartKey'] = last_key
        except Exception as e:
            print("Error loading from DynamoDB:", e)
            return []

        # Timestamps of exchanges that a stored summary has already replaced.
        covered = set()
        for item in items:
            if item.get("summarized", False):
                covered.update(item.get("summary_of") or [])

        return [
            item for item in items
            if item.get("summarized", False) or item.get("timestamp") not in covered
        ]

    def save_history(self, history: List[Dict[str, Any]]):
        """Write every entry of ``history`` to the table with ``put_item``.

        Missing fields receive defaults (current time, this session id, empty
        messages, ``summarized=False``). A failed write is printed and the
        remaining entries are still attempted.
        """
        for entry in history:
            item = {
                'user_id': self.user_id,
                'timestamp': entry.get("timestamp", datetime.now().isoformat()),
                'session_id': entry.get("session_id", self.session_id),
                'user_message': entry.get("user_message", ""),
                'assistant_response': entry.get("assistant_response", ""),
                'summarized': entry.get("summarized", False)
            }

            # Summary-only attributes are stored only when present.
            if "summary" in entry:
                item["summary"] = entry["summary"]
            if "summary_of" in entry:
                item["summary_of"] = entry["summary_of"]

            try:
                table.put_item(Item=item)
            except Exception as e:
                print("Error writing to DynamoDB:", e)

    def summarize_if_needed(self, history: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Replace the oldest unsummarised exchanges with a single summary entry.

        Nothing happens until at least ``SUMMARY_THRESHOLD`` complete exchanges
        are unsummarised. At most ``SUMMARY_BATCH_SIZE`` exchanges are then
        summarised by ``llm`` and removed from the returned list, and the
        summary entry is inserted at position 0.

        Summarisation is best-effort: if the model call fails or yields no
        text, ``history`` is returned unchanged. The summary entry is NOT
        persisted here; the caller decides when to save it.
        """
        pending = [
            entry for entry in history
            if not entry.get("summarized", False)
            and entry.get("user_message") and entry.get("assistant_response")
        ]

        if len(pending) < self.SUMMARY_THRESHOLD:
            return history

        batch = pending[:self.SUMMARY_BATCH_SIZE]
        history_text = "\n".join(
            f"User: {entry['user_message']}\nAssistant: {entry['assistant_response']}"
            for entry in batch
        )

        # State the real number of exchanges instead of a hard-coded "10".
        summary_prompt = (
            f"Summarize the following {len(batch)} interactions into one concise "
            f"but informative summary:\n{history_text}"
        )

        try:
            response = llm.invoke(summary_prompt)
        except Exception as e:
            print("Error summarizing chat history:", e)
            return history

        # The model may return a message object whose content is a string or a
        # list of content blocks; store plain text either way.
        summary = self._content_to_text(getattr(response, "content", response))
        if not summary.strip():
            # Never swap real exchanges for an empty summary.
            return history

        summary_entry = {
            "role": "system",
            "summary": summary,
            "timestamp": datetime.now().isoformat(),
            "session_id": self.session_id,
            "summarized": True,
            "summary_of": [entry["timestamp"] for entry in batch if "timestamp" in entry]
        }

        remaining = [entry for entry in history if entry not in batch]
        remaining.insert(0, summary_entry)
        return remaining

    def append_chat_pair(self, history: List[Dict[str, Any]], user_msg: str, assistant_msg: str) -> List[Dict[str, Any]]:
        """Append one user/assistant exchange, persist it, and compact if needed.

        ``history`` is mutated in place by the append. The return value is the
        result of ``summarize_if_needed`` and may be a new list.
        """
        new_entry = {
            "user_message": user_msg,
            "assistant_response": assistant_msg,
            "timestamp": datetime.now().isoformat(),
            "session_id": self.session_id,
            "summarized": False
        }

        history.append(new_entry)
        self.save_history([new_entry])
        return self.summarize_if_needed(history)


In [6]:
# Prompt used by Bedrock for the generation step. $search_results$ and $query$
# are placeholders that appear verbatim in the template sent to the service.
_KB_PROMPT_TEMPLATE = (
    "You are a precise HR assistant at Ayatacommerce that answers questions using ONLY the provided context "
    "and consider you as an employee of ayatacommerce responsible for responding to all queries related to "
    "the company by other employees.\r\n\r\n"
    "Context:\r\n$search_results$\r\n\r\n"
    "Question: $query$\r\n\r\n"
    "Always respond in a concise and professional manner and also just don't answer simply if always create "
    "a sentence with the answer.\r\n"
    "NOTE:Do not mention the source of the context or the document name in your answer or anything related "
    "to the provided context.\r\n"
    "If no information is available in the context, please respond with the following data:\r\n"
    "Contact HR at Ayatacommerce for assistance: hr@ayatacommerce.com \r\n"
    "Human Resource email: hr@ayatacommerce.com\r\n\r\n"
    "Rules:\r\n"
    "1. If the context contains relevant information, provide a concise answer based solely on that.\r\n"
    "2. If the question asks about something NOT in the context, respond ONLY with: 'I don't have this "
    "information in my knowledge base. Please contact hr@ayatacommerce.com for assistance.'\r\n"
    "3. Never infer or make up information not explicitly stated in the context.\r\n"
    "4. If the question is ambiguous or unclear, ask for clarification.\r\n"
    "5. Do not provide any personal opinions or subjective statements.\r\n"
    "6. Always maintain a professional tone and language.\r\n"
    "7. Avoid using filler phrases like 'I think' or 'In my opinion'.\r\n"
    "8. If the context is too long, summarize it before answering.\r\n"
    "10. If the question is a yes/no question, provide a clear yes or no answer based on the context.\r\n"
    "11. If the question is a list, provide a clear and concise list based on the context.\r\n"
    "12. If the question is a how-to question, provide a clear and concise step-by-step guide based on the "
    "context.\r\n"
    "13. If the question is a why question, provide a clear and concise explanation based on the context.\r\n"
    "14. If the question is a when question, provide a clear and concise answer based on the context.\r\n\r\n"
    "Answer:"
)


def _kb_inference_config():
    """Return a fresh inference config (temperature 0, topP 0.9, 512 max tokens)."""
    return {
        "textInferenceConfig": {
            "temperature": 0,
            "topP": 0.9,
            "maxTokens": 512,
            "stopSequences": []
        }
    }


def retrieve_from_kb(input_text: str):
    """Answer ``input_text`` from the Bedrock knowledge base.

    Calls ``client.retrieve_and_generate`` against ``Config.KB_ID`` with
    ``Config.MODEL_ARN``, retrieving 5 search results and generating the
    answer with the HR-assistant prompt template.

    Args:
        input_text: The (standalone) question to answer.

    Returns:
        The raw ``retrieve_and_generate`` response, or ``None`` if the call
        fails (the error is printed).

    Raises:
        TypeError: If ``input_text`` is not a string.
    """
    if not isinstance(input_text, str):
        raise TypeError("Expected input_text to be a string")

    try:
        response = client.retrieve_and_generate(
            input={'text': input_text},
            retrieveAndGenerateConfiguration={
                "type": "KNOWLEDGE_BASE",
                "knowledgeBaseConfiguration": {
                    "knowledgeBaseId": Config.KB_ID,
                    "modelArn": Config.MODEL_ARN,
                    "retrievalConfiguration": {
                        "vectorSearchConfiguration": {
                            "numberOfResults": 5
                        }
                    },
                    "generationConfiguration": {
                        "promptTemplate": {
                            "textPromptTemplate": _KB_PROMPT_TEMPLATE
                        },
                        "inferenceConfig": _kb_inference_config()
                    },
                    # Orchestration uses the same inference settings as generation.
                    "orchestrationConfiguration": {
                        "inferenceConfig": _kb_inference_config()
                    }
                }
            }
        )
        return response
    except Exception as e:
        print(f"Error retrieving knowledge base: {e}")
        return None


In [7]:
def rewrite_question(user_input: str, chat_history: List) -> str:
    """Ask the LLM to restate ``user_input`` as a standalone question.

    The request always begins with an instruction ``HumanMessage``, so the
    conversation sent to the model starts with a user turn even when the
    history window begins with an ``AIMessage`` (for example a summary).
    It is followed by the 10 most recent history messages and finally by the
    rewriting prompt containing the latest question.

    Args:
        user_input: The latest question typed by the user.
        chat_history: Prior messages (``HumanMessage`` / ``AIMessage``), oldest first.

    Returns:
        ``response.content`` when the model returns an ``AIMessage`` (the
        caller in this notebook reads it as a list of content blocks),
        ``str(response)`` for any other return type, or ``user_input``
        unchanged if the model call fails (the error is printed).
    """
    # Only the latest 10 messages are sent as context.
    recent_messages = list(chat_history[-10:]) if chat_history else []

    messages = [
        HumanMessage(content="You are a helpful assistant that rewrites questions to be standalone.")
    ]
    messages.extend(recent_messages)

    retriever_prompt = (
        "Given a chat history and the latest user question which might reference context in the chat history, "
        "formulate a standalone question which can be understood without the chat history. "
        "Do NOT answer the question, just reformulate it if needed and otherwise return it as is. "
        "Latest question: {question}"
    )
    messages.append(HumanMessage(content=retriever_prompt.format(question=user_input)))

    try:
        response = llm.invoke(messages)
        return response.content if isinstance(response, AIMessage) else str(response)
    except Exception as e:
        print(f"Error rewriting question: {e}")
        return user_input  # Fallback to original input

In [8]:
def _message_text(message: Any) -> str:
    """Flatten model output (string, list of content blocks, or message object) to text."""
    content = getattr(message, "content", message)
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, (list, tuple)):
        parts = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and isinstance(block.get("text"), str):
                parts.append(block["text"])
        return "".join(parts)
    return str(content)


def chat(user_input: str, user_id: str = "default", session_id: str = "default_session") -> str:
    """Answer one user message with the knowledge base, using the session history.

    Steps: load today's history for the session, rewrite the question so it
    stands alone, query the knowledge base, then store the new exchange.

    Args:
        user_input: The user's message.
        user_id: Key under which the history is stored.
        session_id: Session within that user's history.

    Returns:
        The generated answer, or one of three randomly chosen fallback
        messages if any step raises (the error is printed).
    """
    history_manager = ChatHistoryManager(user_id, session_id)
    chat_history = history_manager.load_history()

    try:
        # Convert stored entries into chat messages, skipping incomplete rows
        # instead of raising KeyError or sending empty messages.
        formatted_history = []
        for entry in chat_history:
            if entry.get("summarized", False):
                summary = entry.get("summary")
                if summary:
                    formatted_history.append(AIMessage(content=summary))
            else:
                user_message = entry.get("user_message")
                assistant_response = entry.get("assistant_response")
                if user_message and assistant_response:
                    formatted_history.append(HumanMessage(content=user_message))
                    formatted_history.append(AIMessage(content=assistant_response))

        # The rewriter may return a plain string (e.g. its own fallback) or a
        # list of content blocks; accept both and fall back to the raw input.
        rewritten = rewrite_question(user_input, formatted_history)
        standalone_question = _message_text(rewritten).strip() or user_input
        print(f"Standalone question: {standalone_question}")

        # Call retrieve_and_generate with clean string
        response = retrieve_from_kb(standalone_question)
        if not response:
            raise ValueError("No response returned from retrieve_from_kb")

        # Handle response format
        not_found = 'Sorry, I could not find information about that topic.'
        if isinstance(response, dict) and 'output' in response:
            answer = response['output'].get('text') or not_found
        else:
            answer = not_found

        # append_chat_pair already stores the new exchange; only summary entries
        # created during this call still need to be written.
        stored_timestamps = {entry.get("timestamp") for entry in chat_history}
        chat_history = history_manager.append_chat_pair(
            chat_history, user_msg=user_input, assistant_msg=answer
        )
        new_summaries = [
            entry for entry in chat_history
            if entry.get("summarized", False) and entry.get("timestamp") not in stored_timestamps
        ]
        if new_summaries:
            history_manager.save_history(new_summaries)

        return answer

    except Exception as e:
        print(f"Error in RAG chain: {e}")
        fallback_responses = [
            "I'm having trouble accessing that information. Could you rephrase your question?",
            "My knowledge base seems to be unavailable at the moment. Please try again later.",
            "I encountered an unexpected error while processing your request."
        ]
        return random.choice(fallback_responses)


In [11]:
# Direct question for user "123" in session "1234"; the returned answer is the cell output.
chat("what do you know about the CEO of AyataCommerce?", user_id="123", session_id="1234")

Standalone question: What is known about the CEO of AyataCommerce?


'The CEO of AyataCommerce is Shine Mathew, who is also the founder of the company.'

In [12]:
# "this company" is not named here; the question rewriter has to resolve it from the session history.
chat("Tell me about this company", user_id="123", session_id="1234")

Standalone question: Could you provide information about AyataCommerce, including its services, history, and key personnel?


"AyataCommerce is dedicated to providing structure for clients and a framework to understand core issues, focusing on building, measuring, and learning. The company supports customers in realizing their aspirations, understanding their points of contention, and providing structure and flexibility for deeper issue understanding. AyataCommerce allows customers to determine success and pivot when necessary, ensuring promises are delivered and both customers and employees are successful.\n\nThe journey of AyataCommerce includes significant milestones such as the establishment of a partnership with SAP in 2017, the founding of Ayatacommerce in Bracknell in 2016, the implementation of a 'Remote First' culture, the start of India Operations, and the opening of a Regional Office in Bangalore. In 2020, a new website was launched, and in 2021, an office was opened in Kochi.\n\nAyataCommerce's values include EMPATHY, which fosters a worldwide team with diverse mindsets, personalities, and viewpoi

In [None]:
# Same CEO question, printed instead of shown as the cell's return value.
print(chat("what do you know about the CEO of AyataCommerce?", user_id="123", session_id="1234"))

In [None]:
# Follow-up with a pronoun in the same session ("1234"); relies on stored history to resolve "him".
print(chat("what do you know about him", user_id="123", session_id="1234"))

In [10]:
# Question the knowledge base apparently cannot answer (the captured output is the contact-HR reply).
print(chat("What is the net worth of this company?", user_id="123", session_id="1234"))


Standalone question: What is the current net worth of the specified company?
I don't have this information in my knowledge base. Please contact hr@ayatacommerce.com for assistance.


In [None]:
# Same pronoun follow-up but in session "123": history is filtered by session_id, so the
# "1234" turns are not loaded. NOTE(review): confirm the different session id is intentional.
print(chat("what do you know about him", user_id="123", session_id="123"))
