In [1]:
pip install langchain neo4j-driver textblob anthropic google-search-results

Collecting neo4j-driver
  Downloading neo4j_driver-5.28.1-py3-none-any.whl.metadata (6.1 kB)
Collecting anthropic
  Downloading anthropic-0.49.0-py3-none-any.whl.metadata (24 kB)
Collecting google-search-results
  Downloading google_search_results-2.4.2.tar.gz (18 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading neo4j_driver-5.28.1-py3-none-any.whl (312 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m312.5/312.5 kB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading anthropic-0.49.0-py3-none-any.whl (243 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m243.4/243.4 kB[0m [31m13.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: google-search-results
  Building wheel for google-search-results (setup.py) ... [?25l[?25hdone
  Created wheel for google-search-results: filename=google_search_results-2.4.2-py3-none-any.whl size=32009 sha256=575b5420e32046da19ad9c2cd03ed9f7a133f719080a89b0071fb69153c1

In [2]:
!pip install langchain-community

Collecting langchain-community
  Downloading langchain_community-0.3.20-py3-none-any.whl.metadata (2.4 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain-community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain-community)
  Downloading pydantic_settings-2.8.1-py3-none-any.whl.metadata (3.5 kB)
Collecting httpx-sse<1.0.0,>=0.4.0 (from langchain-community)
  Downloading httpx_sse-0.4.0-py3-none-any.whl.metadata (9.0 kB)
Collecting marshmallow<4.0.0,>=3.18.0 (from dataclasses-json<0.7,>=0.5.7->langchain-community)
  Downloading marshmallow-3.26.1-py3-none-any.whl.metadata (7.3 kB)
Collecting typing-inspect<1,>=0.4.0 (from dataclasses-json<0.7,>=0.5.7->langchain-community)
  Downloading typing_inspect-0.9.0-py3-none-any.whl.metadata (1.5 kB)
Collecting python-dotenv>=0.21.0 (from pydantic-settings<3.0.0,>=2.4.0->langchain-community)
  Downloading python_dotenv-1.1.0-py3-none-any.whl.metadata (24 kB

In [3]:
from langchain.chat_models import ChatAnthropic
from langchain.utilities import SerpAPIWrapper
from neo4j import GraphDatabase
from textblob import TextBlob
import anthropic
import json
from serpapi import GoogleSearch
import logging

In [4]:
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

class AdaptivePersuader:
    def __init__(self, neo4j_uri, neo4j_user, neo4j_password, anthropic_key, serpapi_key):
        try:
            self.driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))
            self.client = anthropic.Client(api_key=anthropic_key)
            self.serpapi_key = serpapi_key
            # Test Neo4j connection
            with self.driver.session() as session:
                session.run("RETURN 1")
            logger.info("Successfully initialized connections")
        except Exception as e:
            logger.error(f"Initialization error: {e}")
            raise

    def initialize_topic(self, topic):
        """Create root node for topic"""
        try:
            logger.debug(f"Initializing topic: {topic}")
            query = """
            CREATE (root:Response {
                id: randomUUID(),
                content: $topic,
                type: 'root',
                strategy: 'logos'
            })
            RETURN root.id as node_id
            """
            with self.driver.session() as session:
                result = session.run(query, topic=topic)
                record = result.single()
                if not record:
                    raise Exception("Failed to create root node")
                return record["node_id"]
        except Exception as e:
            logger.error(f"Topic initialization error: {e}")
            raise

    def search_info(self, topic, strategy):
        """Gather knowledge via SerpAPI"""
        try:
            logger.debug(f"Searching for: {topic} with strategy: {strategy}")
            search = GoogleSearch({
                "q": f"{topic} {strategy} arguments evidence",
                "api_key": self.serpapi_key,
                "num": 3
            })
            results = search.get_dict()

            if not results or "organic_results" not in results:
                logger.warning("No search results found")
                return [{"title": "No results", "snippet": "Using general knowledge"}]

            search_context = []
            for result in results["organic_results"][:3]:
                search_context.append({
                    "title": result.get("title", "Default Title"),
                    "snippet": result.get("snippet", "No snippet available")
                })
            logger.debug(f"Found {len(search_context)} results")
            return search_context
        except Exception as e:
            logger.error(f"Search error: {e}")
            return [{"title": "Error", "snippet": "Using general knowledge"}]

    def generate_response(self, topic, strategy, search_results):
        """Generate response using Claude"""
        try:
            logger.debug(f"Generating response for topic: {topic}, strategy: {strategy}")
            context = "\n".join([
                f"Source: {r['title']}\nInfo: {r['snippet']}"
                for r in search_results
            ])

            system_prompt = f"""You are an expert in persuasive communication discussing {topic}.
            Use strictly {strategy}-based arguments:
            - For ethos: Focus on credibility, expertise, and authority
            - For logos: Focus on logic, facts, and rational arguments
            - For pathos: Focus on emotional appeals and personal stories

            Use this information to support your response:
            {context}"""

            message = self.client.messages.create(
                model="claude-3-opus-20240229",
                max_tokens=500,
                temperature=0.7,
                system=system_prompt,
                messages=[{"role": "user", "content": f"Generate a persuasive response about {topic}"}]
            )

            if not message or not message.content:
                logger.warning("Empty response from Claude")
                return "I apologize, but I couldn't generate a response at this time."

            return str(message.content)
        except Exception as e:
            logger.error(f"Response generation error: {e}")
            return "I apologize, but I encountered an error generating a response."

    def analyze_sentiment(self, feedback):
      """Analyze user feedback sentiment using Claude"""

      system_prompt = """You are a sentiment analyzer that only responds with one word: either 'positive' or 'negative'.
      Analyze if the user is agreeing/satisfied with the previous argument (positive) or disagreeing/unsatisfied (negative)."""

      message = self.client.messages.create(
          model="claude-3-opus-20240229",
          max_tokens=10,
          temperature=0,  # Keep it deterministic
          system=system_prompt,
          messages=[{"role": "user", "content": feedback}]
      )

      sentiment = response = message.content[0].text.strip().lower()
      print(f"LLM sentiment analysis: {sentiment} for feedback: {feedback}")
      return sentiment


    def create_node(self, parent_id, content, strategy=None, is_negative=False):
        """Create response node and connect to parent"""
        try:
            logger.debug(f"Creating node with strategy: {strategy}, is_negative: {is_negative}")

            # Handle tuple case
            if isinstance(parent_id, tuple):
                parent_id = parent_id[0]

            # Handle TextBlock case
            if hasattr(content, 'text'):
                content = content.text

            query = """
            MATCH (parent:Response {id: $parent_id})
            CREATE (child:Response {
                id: randomUUID(),
                content: $content,
                strategy: $strategy,
                timestamp: datetime()
            })
            CREATE (parent)-[:LEADS_TO]->(child)
            """

            if is_negative:
                query += "SET child:NegativeStrat"

            query += " RETURN child.id as node_id"

            with self.driver.session() as session:
                result = session.run(query,
                    parent_id=parent_id,
                    content=str(content),  # Ensure content is string
                    strategy=strategy
                )
                record = result.single()
                if not record:
                    raise Exception("Failed to create node")
                return record["node_id"]
        except Exception as e:
            logger.error(f"Node creation error: {e}")
            raise

    def get_node_strategy(self, node_id):
        """Get the strategy used for a specific node"""
        try:
            # Handle tuple case
            if isinstance(node_id, tuple):
                node_id = node_id[0]

            logger.debug(f"Getting strategy for node: {node_id}")
            query = """
            MATCH (n:Response {id: $node_id})
            RETURN COALESCE(n.strategy, 'logos') as strategy
            """
            with self.driver.session() as session:
                result = session.run(query, node_id=node_id)
                record = result.single()
                if not record:
                    logger.warning(f"No strategy found for node {node_id}, defaulting to logos")
                    return "logos"
                return record["strategy"]
        except Exception as e:
            logger.error(f"Strategy retrieval error: {e}")
            return "logos"  # Default to logos on error

    def get_parent_id(self, node_id):
        """Get parent node ID"""
        try:
            # Handle tuple case
            if isinstance(node_id, tuple):
                node_id = node_id[0]

            logger.debug(f"Getting parent for node: {node_id}")
            query = """
            MATCH (parent:Response)-[:LEADS_TO]->(current:Response {id: $node_id})
            RETURN parent.id as parent_id
            """
            with self.driver.session() as session:
                result = session.run(query, node_id=node_id)
                record = result.single()
                return record["parent_id"] if record else None
        except Exception as e:
            logger.error(f"Parent ID retrieval error: {e}")
            return None

    def mark_node_negative(self, node_id):
      """Mark an existing node as NegativeStrat"""
      try:
          if isinstance(node_id, tuple):
              node_id = node_id[0]

          logger.debug(f"Marking node {node_id} as NegativeStrat")
          query = """
          MATCH (n:Response {id: $node_id})
          SET n:NegativeStrat
          RETURN n.id as node_id
          """
          with self.driver.session() as session:
              result = session.run(query, node_id=node_id)
              if not result.single():
                  raise Exception(f"Failed to mark node {node_id} as negative")
              logger.debug(f"Successfully marked node {node_id} as NegativeStrat")
      except Exception as e:
          logger.error(f"Error marking node as negative: {e}")
          raise

    def process_interaction(self, topic, initial, user_feedback=None, current_node_id=None):
      """Main interaction processing method"""
      try:
          logger.debug(f"Processing interaction - Topic: {topic}, Current node: {current_node_id}")

          # Initial topic setup
          if current_node_id is None:
              logger.debug("Initializing new conversation")
              current_node_id = self.initialize_topic(topic)
              return current_node_id

          if initial:
              # Create first argument
              current_strategy = self.get_node_strategy(current_node_id)
              search_results = self.search_info(topic, current_strategy)
              response = self.generate_response(topic, current_strategy, search_results)
              new_node_id = self.create_node(
                  current_node_id,
                  response,
                  current_strategy,
                  is_negative=False
              )
              return new_node_id, response

          # Process user feedback for subsequent responses
          if user_feedback:
              logger.debug(f"Processing feedback: {user_feedback}")
              sentiment = self.analyze_sentiment(user_feedback)
              print(sentiment)
              current_strategy = self.get_node_strategy(current_node_id)
              logger.debug(f"Current strategy: {current_strategy}, Sentiment: {sentiment}")

              if sentiment == "negative":
                  # Mark current node as negative
                  self.mark_node_negative(current_node_id)

                  # Get parent and change strategy
                  parent_id = self.get_parent_id(current_node_id)
                  if parent_id:
                      next_strategy = self.get_next_strategy(current_strategy)
                      base_node = parent_id
                  # else:
                  #     next_strategy = self.get_next_strategy(current_strategy)
                  #     base_node = current_node_id
              if sentiment == "positive":
                  # For positive feedback, continue chain from current node with same strategy
                  next_strategy = current_strategy
                  base_node = current_node_id

              # Generate new response
              search_results = self.search_info(topic, next_strategy)
              response = self.generate_response(topic, next_strategy, search_results)
              new_node_id = self.create_node(
                  base_node,
                  response,
                  next_strategy,
                  is_negative=False
              )
              return new_node_id, response

      except Exception as e:
          logger.error(f"Error in process_interaction: {e}", exc_info=True)
          return current_node_id, f"I apologize, but I encountered an error: {str(e)}"

    def get_next_strategy(self, current_strategy):
        """Get next strategy to try"""
        try:
            strategies = ['logos', 'ethos', 'pathos']
            current_index = strategies.index(current_strategy)
            next_strategy = strategies[(current_index + 1) % len(strategies)]
            logger.debug(f"Switching strategy from {current_strategy} to {next_strategy}")
            return next_strategy
        except Exception as e:
            logger.error(f"Strategy rotation error: {e}")
            return "logos"  # Default to logos on error

    def clear_database(self):
      """Clear all nodes and relationships from the Neo4j database"""
      try:
          logger.debug("Attempting to clear all nodes from database")
          query = """
          MATCH (n)
          DETACH DELETE n
          """
          with self.driver.session() as session:
              session.run(query)
          logger.info("Successfully cleared database")
      except Exception as e:
          logger.error(f"Error clearing database: {e}")
          raise

In [6]:
 if __name__ == "__main__":
    # Configuration
    #Removed as it contains sensitive data
    CONFIG = {
        "neo4j_uri": "",
        "neo4j_user": "",
        "neo4j_password": "",
        "serpapi_key": "",
        "anthropic_key": ""
    }



    # Initialize agent
    agent = AdaptivePersuader(**CONFIG)

    agent.clear_database()

    # Start new topic
    topic = "Renewable Energy"
    current_node = agent.process_interaction(topic,True)
    counter = 0

    # Example interaction loop
    while True:
        user_feedback = input("Your response (or 'quit' to exit): ")
        if user_feedback.lower() == 'quit':
            break

        if counter == 0:
            initial = True
        else:
          initial = False
        counter+=1
        current_node, response = agent.process_interaction(
            topic,
            initial,
            user_feedback,
            current_node
        )

        print(f"AI Response: {response}")
#

Your response (or 'quit' to exit): Hi I would like to talk about renewable energy and debate this topic 
AI Response: [TextBlock(citations=None, text="Here is a persuasive argument in favor of renewable energy using logos-based arguments:\n\nRenewable energy is the logical choice for powering our future. The facts are clear - renewable sources like solar, wind, hydro, and geothermal energy are clean, abundant, and increasingly cost-competitive with fossil fuels. \n\nA recent report from the International Renewable Energy Agency found that the cost of electricity from onshore wind fell by 13%, and solar PV electricity costs fell by 13.1% in 2021 alone. In many parts of the world, renewables are now the cheapest source of new power generation. Economically, renewables make sense.\n\nThe environmental case for renewables is also compelling. Unlike coal, oil and natural gas, renewable energy produces little to no greenhouse gas emissions or air pollutants. Shifting to renewables is one of 