<a href="https://colab.research.google.com/github/RexPersicus/ChatGPT_Prompt_Eng_01/blob/main/Claude_RAG_LngGrph_mlti_agnt_Blg_3_emls_gen_01.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#========================================================================================================================================================
# This notebook builds a multi-agent workflow in LangGraph using the OpenAI GPT API and Tavily. It is meant to run in Google Colab, with API keys in a .env file.
# The application analyzes information about a prospective organization that I name and recommends how to approach it when marketing our services.
# The recommendation takes the form of a blog post with relevant sections and some action items at the end. The application asks for the name of the prospective
# organization and uses RAG over files containing information about my company's services and background. Once it has created the recommendation blog post, it
# should ask whether I also want marketing emails for the prospective organization; if I say yes, it should draft
# the content of at most 3 emails to 3 different key people in that organization. It can then ask whether to repeat the whole process for another prospective
# organization; if I say yes, it should repeat the process.
#========================================================================================================================================================


# Install required packages
!pip install python-dotenv langchain langchain-openai langchain-community tiktoken langgraph openai tavily-python chromadb python-magic PyPDF2 docx2txt


Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Collecting langchain-openai
  Downloading langchain_openai-0.2.10-py3-none-any.whl.metadata (2.6 kB)
Collecting langchain-community
  Downloading langchain_community-0.3.8-py3-none-any.whl.metadata (2.9 kB)
Collecting tiktoken
  Downloading tiktoken-0.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting langgraph
  Downloading langgraph-0.2.53-py3-none-any.whl.metadata (15 kB)
Collecting tavily-python
  Downloading tavily_python-0.5.0-py3-none-any.whl.metadata (11 kB)
Collecting chromadb
  Downloading chromadb-0.5.20-py3-none-any.whl.metadata (6.8 kB)
Collecting python-magic
  Downloading python_magic-0.4.27-py2.py3-none-any.whl.metadata (5.8 kB)
Collecting PyPDF2
  Downloading pypdf2-3.0.1-py3-none-any.whl.metadata (6.8 kB)
Collecting docx2txt
  Downloading docx2txt-0.8.tar.gz (2.8 kB)
  Preparing metadata (setup.py) ... done
Collecting langch

In [78]:
import os
from dotenv import load_dotenv
#------------------------------------------------------------------
from typing import Dict, TypedDict, Annotated, Sequence, List
#------------------------------------------------------------------
from tavily import TavilyClient
#------------------------------------------------------------------
from langgraph.graph import StateGraph

# ToolMessage lives in langchain_core.messages, not langgraph.prebuilt
from langchain_core.messages import ToolMessage
#------------------------------------------------------------------
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.messages import BaseMessage
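# OpenAIEmbeddings and Chroma are imported from the partner/community packages
# installed above; the top-level langchain paths for them are deprecated
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.text_splitter import CharacterTextSplitter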
#------------------------------------------------------------------
import json
import glob
import PyPDF2
import docx2txt
import magic
import operator

from operator import add
from functools import partial

In [64]:
# Load environment variables
load_dotenv()

# Initialize API clients
openai_api_key = os.getenv("OPENAI_API_KEY")
tavily_api_key = os.getenv("TAVILY_API_KEY")
tavily_client = TavilyClient(api_key=tavily_api_key)

# Initialize LLM
llm = ChatOpenAI(
    model="gpt-4-turbo-preview",
    temperature=0.7,
    api_key=openai_api_key
)
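
`os.getenv` returns `None` when a variable is missing, which can surface later as a less obvious client error. A minimal sketch of an early check, assuming the two variable names used above; `missing_keys` is a hypothetical helper, not part of any library:

```python
import os
from typing import Iterable, List, Mapping, Optional


def missing_keys(required: Iterable[str],
                 env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names in `required` that are unset or blank in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name, "").strip()]


# Hypothetical usage right after load_dotenv():
#   missing = missing_keys(["OPENAI_API_KEY", "TAVILY_API_KEY"])
#   if missing:
#       raise RuntimeError(f"Missing API keys: {', '.join(missing)}")
```

Passing `env` explicitly keeps the helper easy to test without touching the real environment.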

In [65]:
class DocumentProcessor:
    """Handles reading and processing different types of documents"""

    @staticmethod
    def read_text_file(file_path: str) -> str:
        """Read plain text files"""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    @staticmethod
    def read_pdf_file(file_path: str) -> str:
        """Read PDF files"""
        text = ""
        with open(file_path, 'rb') as f:
            pdf_reader = PyPDF2.PdfReader(f)
            for page in pdf_reader.pages:
                # Guard against pages that yield no text (e.g. image-only pages)
                text += (page.extract_text() or "") + "\n"
        return text

    @staticmethod
    def read_docx_file(file_path: str) -> str:
        """Read Word documents"""
        return docx2txt.process(file_path)

    @staticmethod
    def get_file_type(file_path: str) -> str:
        """Determine file type using python-magic"""
        mime = magic.Magic(mime=True)
        file_type = mime.from_file(file_path)
        return file_type

    @classmethod
    def process_file(cls, file_path: str) -> str:
        """Process file based on its type"""
        file_type = cls.get_file_type(file_path)

        if 'text/plain' in file_type:
            return cls.read_text_file(file_path)
        elif 'application/pdf' in file_type:
            return cls.read_pdf_file(file_path)
        elif 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' in file_type:
            return cls.read_docx_file(file_path)
        else:
            raise ValueError(f"Unsupported file type: {file_type}")
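
`python-magic` relies on the system `libmagic` library, which is not always available. A minimal sketch of an extension-based fallback using the standard library's `mimetypes`; `guess_file_type` is a hypothetical helper, and unlike `magic` it trusts the file name rather than inspecting the file contents:

```python
import mimetypes


def guess_file_type(file_path: str) -> str:
    """Guess a MIME type from the file extension; return '' when unknown."""
    mime_type, _encoding = mimetypes.guess_type(file_path)
    return mime_type or ""
```

The returned string can be matched with the same `in file_type` checks that `process_file` uses, so an unknown extension still ends up in the "Unsupported file type" branch.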


In [79]:
def initialize_rag() -> "Chroma | None":
    """Initialize RAG system by reading all documents from uploads folder"""
    uploads_path = 'uploads'

    # Create uploads folder if it doesn't exist
    if not os.path.exists(uploads_path):
        os.makedirs(uploads_path)
        print(f"Created {uploads_path} directory. Please add your company documents there.")
        return None

    # Get all files in uploads directory
    files = glob.glob(os.path.join(uploads_path, '*'))

    if not files:
        print(f"No files found in {uploads_path} directory. Please add your company documents.")
        return None

    # Process all files
    all_texts = []
    doc_processor = DocumentProcessor()

    print("Processing documents:")
    for file_path in files:
        try:
            print(f"Reading {os.path.basename(file_path)}...")
            text = doc_processor.process_file(file_path)
            all_texts.append(text)
        except Exception as e:
            print(f"Error processing {file_path}: {str(e)}")

    if not all_texts:
        print("No valid documents were processed.")
        return None

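    # Split texts into chunks. CharacterTextSplitter only splits on its separator
    # ("\n\n" by default), so text without blank lines stays in a single chunk
    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200)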
    chunks = []
    for text in all_texts:
        chunks.extend(text_splitter.split_text(text))

    # Create vector store
    embeddings = OpenAIEmbeddings(api_key=openai_api_key)
    vectorstore = Chroma.from_texts(chunks, embeddings)

    # Count only the documents that were actually read, not every file found
    print(f"Successfully processed {len(all_texts)} documents into {len(chunks)} chunks")
    return vectorstore
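
To illustrate what `chunk_size` and `chunk_overlap` mean, here is a minimal sketch of a fixed-width sliding-window chunker. It is not what `CharacterTextSplitter` does (that class splits on a separator first and then merges pieces); `chunk_text` is a hypothetical helper written only to show how overlap repeats the tail of one chunk at the head of the next:

```python
from typing import List


def chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Cut `text` into windows of `chunk_size` characters that overlap
    by `chunk_overlap` characters."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= chunk_overlap < chunk_size")

    step = chunk_size - chunk_overlap
    chunks: List[str] = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current window already reaches the end of the text
        if start + chunk_size >= len(text):
            break
        start += step
    return chunks
```

With `chunk_size=4` and `chunk_overlap=1`, each window starts three characters after the previous one, so the last character of one chunk is repeated as the first character of the next.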

In [87]:

# Define a setter function for non-list values
def set_value(_, new_value):
    return new_value

class AgentState(TypedDict):
    messages: Annotated[List[Dict], add]  # Use operator.add for lists
    company_name: Annotated[str, set_value]  # Use our custom setter for simple values
    research: Annotated[Dict, set_value]
    analysis: Annotated[str, set_value]
    blog_post: Annotated[str, set_value]
    emails: Annotated[str, set_value]
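
The second argument of each `Annotated[...]` is a reducer: a function that combines the current value of a key with the value a node returns. The sketch below is a simplified pure-Python model of that merge, not LangGraph's actual implementation; `REDUCERS` and `apply_update` are hypothetical names. It shows why, with `add` on `messages`, a node should return only its new messages:

```python
from operator import add
from typing import Any, Callable, Dict


def set_value(_, new_value):
    """Reducer that overwrites the old value."""
    return new_value


# One reducer per state key, mirroring two of the AgentState annotations
REDUCERS: Dict[str, Callable[[Any, Any], Any]] = {
    "messages": add,
    "analysis": set_value,
}


def apply_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Simplified model: merge a node's return value into the state."""
    merged = dict(state)
    for key, value in update.items():
        merged[key] = REDUCERS[key](state[key], value)
    return merged


state = {"messages": [{"role": "user", "content": "start"}], "analysis": ""}
new_message = {"role": "assistant", "content": "done"}

# Returning only the new message: the history grows by one entry
ok = apply_update(state, {"messages": [new_message], "analysis": "text"})

# Returning the full history plus the new message: old entries are duplicated
dup = apply_update(state, {"messages": state["messages"] + [new_message]})
```

In this model `ok["messages"]` holds two entries, while `dup["messages"]` holds three because the original user message appears twice.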

In [88]:
def research_agent(state: AgentState):
    """Research agent that gathers information about the target company"""
    print(f"\n🔍 Research Agent: Starting research for {state['company_name']}...")

    try:
        search_results = tavily_client.search(
            query=f"{state['company_name']} company overview business model recent news",
            search_depth="advanced"
        )
        print("✅ Research Agent: Successfully gathered company information")

        return {
            # The `add` reducer appends this list to the existing messages,
            # so return only the new message, not the full history
            "messages": [
                {"role": "assistant", "content": f"Research completed for {state['company_name']}"}
            ],
            "company_name": state["company_name"],
            "research": search_results,
            "analysis": state["analysis"],
            "blog_post": state["blog_post"],
            "emails": state["emails"]
        }
    except Exception as e:
        print(f"❌ Research Agent Error: {str(e)}")
        return {
            # Only the new message; the `add` reducer appends it to the history
            "messages": [
                {"role": "assistant", "content": f"Error during research: {str(e)}"}
            ],
            "company_name": state["company_name"],
            "research": {},
            "analysis": state["analysis"],
            "blog_post": state["blog_post"],
            "emails": state["emails"]
        }

def analysis_agent(state: AgentState):
    """Analysis agent that processes research and RAG data"""
    global vectorstore
    print(f"\n🤔 Analysis Agent: Starting analysis of {state['company_name']}...")

    service_info = vectorstore.similarity_search(
        f"services relevant for {state['company_name']}",
        k=3
    )
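    print("📚 Analysis Agent: Retrieved relevant service information")

    # Pass only the chunk text to the prompt, not the Document object reprs
    service_text = "\n\n".join(doc.page_content for doc in service_info)

    analysis_prompt = f"""
    Based on the following information about {state['company_name']}:
    {json.dumps(state['research'])}

    And our company's relevant services:
    {service_text}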

    Provide a detailed analysis of:
    1. Company's current challenges and needs
    2. How our services align with their needs
    3. Key decision makers to target
    4. Recommended approach strategy
    """

    print("🔄 Analysis Agent: Generating analysis...")
    analysis_result = llm.invoke(analysis_prompt)
    print("✅ Analysis Agent: Analysis completed")

    return {
        # Only the new message; the `add` reducer appends it to the history
        "messages": [
            {"role": "assistant", "content": "Analysis completed"}
        ],
        "company_name": state["company_name"],
        "research": state["research"],
        "analysis": analysis_result.content,
        "blog_post": state["blog_post"],
        "emails": state["emails"]
    }

def blog_writer_agent(state: AgentState):
    """Agent that creates the blog post recommendation"""
    print("\n✍️ Blog Writer Agent: Starting blog post creation...")

    print(f"Debug - Analysis available: {bool(state['analysis'])}")
    print(f"Debug - Analysis content preview: {state['analysis'][:200] if state['analysis'] else 'No analysis'}")

    blog_prompt = f"""
    Create a detailed blog post about approaching {state['company_name']} for our services.
    Use the following analysis: {state['analysis']}

    Format the blog post with:
    1. Compelling title
    2. Executive summary
    3. Company overview
    4. Identified needs and challenges
    5. Our solution fit
    6. Recommended approach strategy
    7. Action items

    Make it engaging and professional.
    """

    print("🔄 Blog Writer Agent: Writing blog post...")
    blog_post = llm.invoke(blog_prompt)
    print(f"Debug - Blog post generated: {bool(blog_post.content)}")
    print(f"Debug - Blog post preview: {blog_post.content[:200] if blog_post.content else 'No content'}")

    return {
        # Only the new message; the `add` reducer appends it to the history
        "messages": [
            {"role": "assistant", "content": "Blog post generated"}
        ],
        "company_name": state["company_name"],
        "research": state["research"],
        "analysis": state["analysis"],
        "blog_post": blog_post.content,  # Verify this is being set
        "emails": state["emails"]
    }

def email_writer_agent(state: AgentState):
    """Agent that creates marketing emails"""
    print("\n📧 Email Writer Agent: Starting email template creation...")

    email_prompt = f"""
    Based on the analysis of {state['company_name']}:
    {state['analysis']}

    Create 3 distinct email templates for different key decision makers.
    Each email should be:
    - Personalized to their role
    - Highlight relevant benefits
    - Include a clear call to action
    - Be concise and professional
    """

    print("🔄 Email Writer Agent: Crafting email templates...")
    emails = llm.invoke(email_prompt)
    print("✅ Email Writer Agent: Email templates completed")

    return {
        # Only the new message; the `add` reducer appends it to the history
        "messages": [
            {"role": "assistant", "content": "Email templates generated"}
        ],
        "company_name": state["company_name"],
        "research": state["research"],
        "analysis": state["analysis"],
        "blog_post": state["blog_post"],
        "emails": emails.content
    }
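
`analysis_agent` serializes the whole search response into the prompt with `json.dumps`, which can get long. A minimal sketch of trimming it first, assuming the response is a dict with a `results` list whose items carry `title`, `url` and `content` keys (an assumption about the Tavily response shape, not something shown in this notebook); `compact_research` is a hypothetical helper:

```python
from typing import Any, Dict, List


def compact_research(research: Dict[str, Any],
                     max_items: int = 5,
                     max_chars: int = 500) -> List[Dict[str, str]]:
    """Keep title, url and a truncated content snippet for the first results."""
    compact: List[Dict[str, str]] = []
    for item in research.get("results", [])[:max_items]:
        compact.append({
            "title": str(item.get("title", "")),
            "url": str(item.get("url", "")),
            "content": str(item.get("content", ""))[:max_chars],
        })
    return compact
```

An empty dict, which is what `research_agent` stores when the search fails, yields an empty list, so the prompt can still be built.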

In [93]:
# Define state management functions
def get_company_name(state: AgentState) -> str:
    return state["company_name"]

def build_graph():
    """Build the LangGraph workflow using StateGraph"""
    workflow = StateGraph(AgentState)

    # Add nodes
    workflow.add_node("research_node", research_agent)
    workflow.add_node("analysis_node", analysis_agent)
    workflow.add_node("blog_writer", blog_writer_agent)
    workflow.add_node("email_writer", email_writer_agent)

    # Define edges: research -> analysis -> blog post -> emails
    workflow.add_edge("research_node", "analysis_node")
    workflow.add_edge("analysis_node", "blog_writer")
    # email_writer always runs after blog_writer, so the emails are generated
    # before the user is asked whether to display them
    workflow.add_edge("blog_writer", "email_writer")

    # Set the entry point
    workflow.set_entry_point("research_node")

    # Set the final node
    workflow.set_finish_point("email_writer")

    return workflow.compile()
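
The graph above always runs `email_writer` after `blog_writer`, so the email LLM call happens even if the user later declines the emails. One way to skip it is a conditional edge. A minimal sketch of the routing function, assuming a `want_emails` flag in the state (a hypothetical field that `AgentState` does not define and that would have to be collected before `graph.invoke`):

```python
from typing import Any, Dict


def route_after_blog(state: Dict[str, Any]) -> str:
    """Pick the next step once the blog post exists."""
    # `want_emails` is a hypothetical state field, not part of AgentState
    return "email_writer" if state.get("want_emails") else "end"


# Hypothetical wiring inside build_graph(), replacing the fixed
# blog_writer -> email_writer edge and the finish point
# (END is imported from langgraph.graph):
#   workflow.add_conditional_edges(
#       "blog_writer",
#       route_after_blog,
#       {"email_writer": "email_writer", "end": END},
#   )
#   workflow.add_edge("email_writer", END)
```

Keeping the router a plain function of the state makes it testable without building the graph.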

In [94]:
def run_analysis():
    """Main application loop"""
    global vectorstore
    print("Initializing RAG system...")
    vectorstore = initialize_rag()

    if vectorstore is None:
        print("Please add documents to the 'uploads' folder and restart the application.")
        return

    while True:
        print("\n" + "="*50)
        company_name = input("\nEnter the name of the prospective organization: ")

        graph = build_graph()

        # Create initial state
        initial_state = {
            "messages": [{
                "content": f"Starting analysis for {company_name}",
                "role": "user",
            }],
            "company_name": company_name,
            "research": {},
            "analysis": "",
            "blog_post": "",
            "emails": ""
        }

        try:
            result = graph.invoke(initial_state)

            # Display blog post with better error handling
            print("\n=== Generated Blog Post ===")
            if result and "blog_post" in result and result["blog_post"]:
                print("\n" + result["blog_post"])
            else:
                print("No blog post was generated.")
                print(f"Debug - Result keys: {result.keys() if result else 'No result'}")

            generate_emails = input("\nWould you like to generate marketing emails? (yes/no): ")
            if generate_emails.strip().lower() in ('yes', 'y'):
                print("\n=== Generated Email Templates ===")
                if result and "emails" in result and result["emails"]:
                    print("\n" + result["emails"])
                else:
                    print("No email templates were generated.")

        except Exception as e:
            print(f"Error during execution: {str(e)}")

        another = input("\nWould you like to analyze another company? (yes/no): ")
        if another.strip().lower() not in ('yes', 'y'):
            break

In [95]:
if __name__ == "__main__":
    run_analysis()

Initializing RAG system...
Processing documents:
Reading royal_persicus.pdf...
Successfully processed 1 documents into 1 chunks


Enter the name of the prospective organization: IBM Canada

🔍 Research Agent: Starting research for IBM Canada...
✅ Research Agent: Successfully gathered company information

🤔 Analysis Agent: Starting analysis of IBM Canada...
📚 Analysis Agent: Retrieved relevant service information
🔄 Analysis Agent: Generating analysis...
✅ Analysis Agent: Analysis completed

✍️ Blog Writer Agent: Starting blog post creation...
Debug - Analysis available: True
Debug - Analysis content preview: ### 1. Company's Current Challenges and Needs:

IBM Canada is actively engaging in several innovative projects and expansions that highlight its current focus areas and implicit challenges. These incl
🔄 Blog Writer Agent: Writing blog post...
Debug - Blog post generated: True
Debug - Blog post preview: # Empowering Innovation: How Royal Persicus Can Transform IBM Canada's Future

## 