In [None]:
import mysql.connector
from mysql.connector import Error
import openai
import json
import re
from datetime import datetime
from typing import Dict, List, Optional, Any
import logging

# Configure root logging at INFO level and create the module-level logger
# that the classes below use for connection, table and LLM error reporting.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
class DatabaseManager:
    """Handles all database operations on the ``users`` table.

    The constructor opens a MySQL connection and makes sure the table exists.
    Every CRUD method returns a dict whose ``"status"`` is ``"success"`` or
    ``"error"``; failures carry a human-readable ``"message"`` instead of
    raising, including the case where no connection could be established.
    """

    # Columns that callers may set through insert_user / update_user.
    _WRITABLE_COLUMNS = ('full_name', 'email_address', 'phone_number', 'date_of_birth', 'address')
    # Text columns that select_users matches by substring (LIKE %value%).
    _LIKE_COLUMNS = ('full_name', 'email_address', 'phone_number', 'address')
    # Columns declared NOT NULL in create_table; insert_user must receive them.
    _REQUIRED_COLUMNS = ('full_name', 'email_address')
    _NO_CONNECTION = "no database connection available"

    def __init__(self, host: str, database: str, user: str, password: str):
        """Store the connection settings, connect, and create the table.

        Args:
            host: MySQL server host name.
            database: Name of the database (schema) to use.
            user: MySQL user name.
            password: Password for ``user``.
        """
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.connection = None
        self.create_connection()
        self.create_table()

    def create_connection(self):
        """Create database connection.

        Connection errors are logged, not raised; ``self.connection`` then
        keeps its previous value (``None`` on first use).
        """
        try:
            self.connection = mysql.connector.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password
            )
            if self.connection.is_connected():
                logger.info("Successfully connected to MySQL database")
        except Error as e:
            logger.error(f"Error connecting to MySQL: {e}")

    def close(self):
        """Close the database connection if one is open."""
        if self.connection is None:
            return
        try:
            self.connection.close()
        except Error as e:
            logger.warning(f"Error closing connection: {e}")
        finally:
            self.connection = None

    @staticmethod
    def _close_cursor(cursor):
        """Close *cursor* if one was opened; log instead of raising on failure."""
        if cursor is None:
            return
        try:
            cursor.close()
        except Error as e:
            logger.warning(f"Error closing cursor: {e}")

    def create_table(self):
        """Create users table if it doesn't exist."""
        if self.connection is None:
            # create_connection() already logged the root cause; without this
            # guard the constructor would die with an AttributeError on None.
            logger.error(f"Error creating table: {self._NO_CONNECTION}")
            return

        cursor = None
        try:
            cursor = self.connection.cursor()
            create_table_query = """
            CREATE TABLE IF NOT EXISTS users (
                id INT AUTO_INCREMENT PRIMARY KEY,
                full_name VARCHAR(255) NOT NULL,
                email_address VARCHAR(255) UNIQUE NOT NULL,
                phone_number VARCHAR(20),
                date_of_birth DATE,
                address TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
            )
            """
            cursor.execute(create_table_query)
            self.connection.commit()
            logger.info("Users table created successfully")
        except Error as e:
            logger.error(f"Error creating table: {e}")
        finally:
            self._close_cursor(cursor)

    def insert_user(self, user_data: Dict) -> Dict:
        """Insert a new user.

        Args:
            user_data: Column values keyed by column name. ``full_name`` and
                ``email_address`` are required; ``phone_number``,
                ``date_of_birth`` and ``address`` are optional.

        Returns:
            On success ``{"status": "success", "message": ..., "user_id": id}``;
            otherwise ``{"status": "error", "message": ...}``.
        """
        # Validate up front: indexing a missing key would raise KeyError,
        # which the `except Error` below does not catch.
        missing = [column for column in self._REQUIRED_COLUMNS if user_data.get(column) is None]
        if missing:
            return {
                "status": "error",
                "message": f"Error inserting user: missing required field(s): {', '.join(missing)}",
            }
        if self.connection is None:
            return {"status": "error", "message": f"Error inserting user: {self._NO_CONNECTION}"}

        cursor = None
        try:
            cursor = self.connection.cursor()
            insert_query = """
            INSERT INTO users (full_name, email_address, phone_number, date_of_birth, address)
            VALUES (%s, %s, %s, %s, %s)
            """
            cursor.execute(insert_query, (
                user_data['full_name'],
                user_data['email_address'],
                user_data.get('phone_number'),
                user_data.get('date_of_birth'),
                user_data.get('address')
            ))
            self.connection.commit()
            user_id = cursor.lastrowid
            return {"status": "success", "message": f"User created successfully with ID: {user_id}", "user_id": user_id}
        except Error as e:
            return {"status": "error", "message": f"Error inserting user: {e}"}
        finally:
            self._close_cursor(cursor)

    def select_users(self, filters: Optional[Dict] = None) -> Dict:
        """Select users with optional filters.

        Args:
            filters: Optional mapping of column name to search value. Text
                columns (name, email, phone, address) are matched by
                substring, ``id`` by equality; any other key is ignored.
                All recognised conditions are combined with AND.

        Returns:
            On success ``{"status": "success", "data": rows, "count": n}``
            where each row is a dict keyed by column name; otherwise
            ``{"status": "error", "message": ...}``.
        """
        if self.connection is None:
            return {"status": "error", "message": f"Error selecting users: {self._NO_CONNECTION}"}

        conditions = []
        values = []
        for key, value in (filters or {}).items():
            # Column names come from fixed allow-lists, so interpolating
            # `key` into the SQL text is safe; values are always bound.
            if key in self._LIKE_COLUMNS:
                conditions.append(f"{key} LIKE %s")
                values.append(f"%{value}%")
            elif key == 'id':
                conditions.append(f"{key} = %s")
                values.append(value)

        cursor = None
        try:
            cursor = self.connection.cursor(dictionary=True)
            if conditions:
                query = f"SELECT * FROM users WHERE {' AND '.join(conditions)}"
                cursor.execute(query, values)
            else:
                cursor.execute("SELECT * FROM users")

            results = cursor.fetchall()
            return {"status": "success", "data": results, "count": len(results)}
        except Error as e:
            return {"status": "error", "message": f"Error selecting users: {e}"}
        finally:
            self._close_cursor(cursor)

    def update_user(self, user_id: int, update_data: Dict) -> Dict:
        """Update user information.

        Args:
            user_id: Primary key of the row to update.
            update_data: Mapping of column name to new value; keys that are
                not writable columns are ignored.

        Returns:
            A status dict; ``"error"`` when no writable column was supplied,
            when no row was affected, or when the database reports an error.
        """
        # Build dynamic update query from allow-listed columns only.
        set_clauses = []
        values = []
        for key, value in update_data.items():
            if key in self._WRITABLE_COLUMNS:
                set_clauses.append(f"{key} = %s")
                values.append(value)

        if not set_clauses:
            return {"status": "error", "message": "No valid fields to update"}
        if self.connection is None:
            return {"status": "error", "message": f"Error updating user: {self._NO_CONNECTION}"}

        values.append(user_id)
        update_query = f"UPDATE users SET {', '.join(set_clauses)} WHERE id = %s"

        cursor = None
        try:
            cursor = self.connection.cursor()
            cursor.execute(update_query, values)
            self.connection.commit()

            # NOTE(review): MySQL normally reports *changed* rows here, so an
            # update that leaves the values as they were may be reported as
            # "No user found" -- confirm whether that matters to callers.
            if cursor.rowcount > 0:
                return {"status": "success", "message": f"User {user_id} updated successfully"}
            return {"status": "error", "message": f"No user found with ID {user_id}"}
        except Error as e:
            return {"status": "error", "message": f"Error updating user: {e}"}
        finally:
            self._close_cursor(cursor)

    def delete_user(self, user_id: int) -> Dict:
        """Delete a user.

        Args:
            user_id: Primary key of the row to delete.

        Returns:
            A status dict; ``"error"`` when no row matched ``user_id`` or the
            database reports an error.
        """
        if self.connection is None:
            return {"status": "error", "message": f"Error deleting user: {self._NO_CONNECTION}"}

        cursor = None
        try:
            cursor = self.connection.cursor()
            delete_query = "DELETE FROM users WHERE id = %s"
            cursor.execute(delete_query, (user_id,))
            self.connection.commit()

            if cursor.rowcount > 0:
                return {"status": "success", "message": f"User {user_id} deleted successfully"}
            return {"status": "error", "message": f"No user found with ID {user_id}"}
        except Error as e:
            return {"status": "error", "message": f"Error deleting user: {e}"}
        finally:
            self._close_cursor(cursor)


In [None]:
class LLMAgent:
    """AI Agent for processing natural language queries.

    ``process_query`` asks the chat model to translate a request into an
    intent dict (``operation`` plus optional ``data``, ``filters`` and
    ``user_id``). If the model call fails or its reply is not a JSON object,
    the regex-based ``fallback_parser`` produces the intent instead.
    """

    # Shared e-mail pattern. The TLD class is [A-Za-z]; the earlier
    # "[A-Z|a-z]" spelling also accepted a literal "|" character.
    _EMAIL_PATTERN = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'

    # Whole-word triggers for the fallback parser, checked in this order.
    _CREATE_WORDS = ('add', 'create', 'insert', 'new')
    _UPDATE_WORDS = ('update', 'modify', 'change', 'edit')
    _DELETE_WORDS = ('delete', 'remove')

    # Filler words that the search patterns can capture but that are never a
    # person's name (e.g. "search for x@y.com" captures "for").
    _NON_NAME_WORDS = frozenset({'a', 'all', 'an', 'by', 'for', 'the', 'user', 'users', 'with'})

    def __init__(self, api_key: str):
        """Store the API key on the ``openai`` module and build the system prompt.

        Args:
            api_key: OpenAI API key used for all subsequent requests.
        """
        openai.api_key = api_key
        self.system_prompt = """
You are a database assistant that helps users perform CRUD operations on a users database.
The database has the following columns: id, full_name, email_address, phone_number, date_of_birth, address.

Your job is to analyze user requests and determine:
1. The operation type (CREATE, READ, UPDATE, DELETE)
2. Extract relevant data fields
3. Identify search criteria or filters

Always respond with a JSON object containing:
- "operation": one of ["CREATE", "READ", "UPDATE", "DELETE"]
- "data": extracted field values (for CREATE/UPDATE)
- "filters": search criteria (for READ/UPDATE/DELETE)
- "user_id": specific user ID if mentioned

Examples:
- "Add John Doe with email john@email.com" -> {"operation": "CREATE", "data": {"full_name": "John Doe", "email_address": "john@email.com"}}
- "Find users named John" -> {"operation": "READ", "filters": {"full_name": "John"}}
- "Update user 5's phone to 123-456-7890" -> {"operation": "UPDATE", "user_id": 5, "data": {"phone_number": "123-456-7890"}}
- "Delete user with ID 3" -> {"operation": "DELETE", "user_id": 3}
"""

    def process_query(self, user_input: str) -> Dict:
        """Process natural language query and extract intent.

        Args:
            user_input: The raw user message.

        Returns:
            The intent dict parsed from the model's JSON reply, or the result
            of ``fallback_parser`` when the call or the parsing fails.
        """
        try:
            # NOTE(review): `openai.ChatCompletion` is the pre-1.0 SDK
            # interface -- confirm the installed openai version supports it.
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": user_input}
                ],
                temperature=0.1
            )

            result = json.loads(response.choices[0].message.content)
            if not isinstance(result, dict):
                # Valid JSON but not an object (list, string, number...):
                # callers use dict methods on the intent, so fall back.
                raise ValueError("LLM response is not a JSON object")
            return result
        except Exception as e:
            logger.error(f"Error processing query with LLM: {e}")
            return self.fallback_parser(user_input)

    def fallback_parser(self, user_input: str) -> Dict:
        """Fallback parser using regex patterns.

        The operation is chosen by whole-word keyword match (CREATE, then
        UPDATE, then DELETE, otherwise READ). Matching whole words keeps
        "address" from triggering "add" and "newemail@..." from triggering
        "new". Field extraction runs on the original text so that names and
        e-mail addresses keep the casing the user typed.

        Args:
            user_input: The raw user message.

        Returns:
            An intent dict with ``"operation"`` and, depending on the
            operation, ``"data"``, ``"user_id"`` or ``"filters"``.
        """
        lowered = user_input.lower()

        def mentions(keywords) -> bool:
            return re.search(r'\b(?:' + '|'.join(keywords) + r')\b', lowered) is not None

        if mentions(self._CREATE_WORDS):
            return {"operation": "CREATE", "data": self.extract_user_data(user_input)}

        if mentions(self._UPDATE_WORDS):
            user_id = self.extract_user_id(user_input)
            return {"operation": "UPDATE", "user_id": user_id, "data": self.extract_user_data(user_input)}

        if mentions(self._DELETE_WORDS):
            user_id = self.extract_user_id(user_input)
            return {"operation": "DELETE", "user_id": user_id}

        # READ is the default when no other keyword is present.
        return {"operation": "READ", "filters": self.extract_search_filters(user_input)}

    def extract_user_data(self, text: str) -> Dict:
        """Extract user data from text.

        Args:
            text: The user message.

        Returns:
            A dict with any of ``email_address``, ``phone_number`` and
            ``full_name`` that could be recognised; possibly empty.
        """
        data = {}

        email_match = re.search(self._EMAIL_PATTERN, text)
        if email_match:
            data['email_address'] = email_match.group()

        # Ten digits, optionally grouped 3-3-4 with "-" or ".".
        phone_pattern = r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'
        phone_match = re.search(phone_pattern, text)
        if phone_match:
            data['phone_number'] = phone_match.group()

        # Name heuristics, first match wins. "name"/"named", optionally
        # followed by "is"/"to", is tried first so that "create a new user
        # named Jane Smith with ..." yields "Jane Smith" rather than the
        # whole phrase after "create".
        name_patterns = [
            r'named?\s+(?:(?:is|to)\s+)?([A-Za-z\s]+?)(?:\s+with|\s+email|\s*$)',
            r'add\s+([A-Za-z\s]+?)(?:\s+with|\s+email)',
            r'create\s+([A-Za-z\s]+?)(?:\s+with|\s+email)'
        ]

        for pattern in name_patterns:
            name_match = re.search(pattern, text, re.IGNORECASE)
            if name_match:
                data['full_name'] = name_match.group(1).strip()
                break

        return data

    def extract_user_id(self, text: str) -> Optional[int]:
        """Extract user ID from text.

        Recognises a number directly after "id" or "user" (case-insensitive),
        e.g. "user 5" or "ID 3".

        Returns:
            The ID as an int, or ``None`` when no such number is present.
        """
        id_pattern = r'\b(?:id|user)\s*(\d+)\b'
        id_match = re.search(id_pattern, text, re.IGNORECASE)
        if id_match:
            return int(id_match.group(1))
        return None

    def extract_search_filters(self, text: str) -> Dict:
        """Extract search filters from text.

        Args:
            text: The user message.

        Returns:
            A dict with ``email_address`` and/or ``full_name`` when they can
            be recognised; an empty dict means "no filter".
        """
        filters = {}

        email_match = re.search(self._EMAIL_PATTERN, text)
        if email_match:
            filters['email_address'] = email_match.group()

        # Each pattern captures the single word after its keyword.
        name_patterns = [
            r'named?\s+([A-Za-z\s]+?)(?:\s|$)',
            r'find\s+([A-Za-z\s]+?)(?:\s|$)',
            r'search\s+([A-Za-z\s]+?)(?:\s|$)'
        ]

        for pattern in name_patterns:
            name_match = re.search(pattern, text, re.IGNORECASE)
            if not name_match:
                continue
            candidate = name_match.group(1).strip()
            # Skip filler such as "for" / "users"; filtering on it would
            # wrongly narrow the search to (almost always) nothing.
            if candidate and candidate.lower() not in self._NON_NAME_WORDS:
                filters['full_name'] = candidate
                break

        return filters


In [None]:
class ConversationalCRUDChatbot:
    """Main chatbot class orchestrating all components.

    Each user message is turned into an intent by the LLM agent, dispatched
    to the matching database operation, and answered with a formatted string.
    Both sides of the exchange are appended to ``conversation_history``.
    """

    def __init__(self, db_config: Dict, openai_api_key: str):
        """Create the database manager and the LLM agent.

        Args:
            db_config: Keyword arguments forwarded to ``DatabaseManager``.
            openai_api_key: API key forwarded to ``LLMAgent``.
        """
        self.db_manager = DatabaseManager(**db_config)
        self.llm_agent = LLMAgent(openai_api_key)
        self.conversation_history = []

    def process_message(self, user_message: str) -> str:
        """Process user message and return response.

        Any exception raised while handling the message is logged and turned
        into an error reply rather than propagated.
        """
        try:
            self.conversation_history.append({"role": "user", "message": user_message})

            intent = self.llm_agent.process_query(user_message)
            response = self.execute_operation(intent)

            self.conversation_history.append({"role": "assistant", "message": response})
            return response

        except Exception as e:
            error_msg = f"I encountered an error processing your request: {str(e)}"
            logger.error(error_msg)
            return error_msg

    def execute_operation(self, intent: Dict) -> str:
        """Execute database operation based on intent.

        The intent usually comes from an LLM, so it is normalised first: the
        operation name is compared case-insensitively, ``data``/``filters``
        that are missing, ``null`` or not objects become empty dicts, and for
        READ a ``user_id`` is applied as an ``id`` filter.

        Args:
            intent: Dict with ``operation`` and optional ``data``,
                ``filters`` and ``user_id``.

        Returns:
            The reply text for the user.
        """
        unclear = "I'm not sure what operation you'd like to perform. Can you please clarify?"
        if not isinstance(intent, dict):
            return unclear

        operation = intent.get("operation", "READ")
        if isinstance(operation, str):
            operation = operation.strip().upper()

        data = intent.get("data")
        if not isinstance(data, dict):
            data = {}
        filters = intent.get("filters")
        if not isinstance(filters, dict):
            filters = {}
        user_id = intent.get("user_id")

        if operation == "CREATE":
            return self.handle_create(data)
        if operation == "READ":
            if user_id is not None and "id" not in filters:
                # "Show user 5" arrives as a user_id, not as a filter.
                filters = {**filters, "id": user_id}
            return self.handle_read(filters)
        if operation == "UPDATE":
            return self.handle_update(user_id, data)
        if operation == "DELETE":
            return self.handle_delete(user_id)
        return unclear

    def handle_create(self, data: Dict) -> str:
        """Handle CREATE operation.

        Both ``full_name`` and ``email_address`` are required: the database
        layer reads both keys when inserting, so accepting only one of them
        would fail further down instead of prompting the user.
        """
        if not data.get("full_name") or not data.get("email_address"):
            return "To create a user, I need both a full name and an email address. Could you provide more information?"

        result = self.db_manager.insert_user(data)

        if result["status"] == "success":
            return f"✅ {result['message']}"
        return f"❌ {result['message']}"

    def handle_read(self, filters: Dict) -> str:
        """Handle READ operation.

        Args:
            filters: Search criteria; an empty dict lists every user.

        Returns:
            A formatted listing of the matching users, a "no users" notice,
            or the database error message.
        """
        result = self.db_manager.select_users(filters if filters else None)

        if result["status"] != "success":
            return f"❌ {result['message']}"
        if result["count"] == 0:
            return "No users found matching your criteria."

        parts = [f"Found {result['count']} user(s):\n\n"]
        for user in result["data"]:
            parts.append(f"🆔 ID: {user['id']}\n")
            parts.append(f"👤 Name: {user['full_name']}\n")
            parts.append(f"📧 Email: {user['email_address']}\n")
            # Optional columns are shown only when they hold a value.
            if user['phone_number']:
                parts.append(f"📞 Phone: {user['phone_number']}\n")
            if user['date_of_birth']:
                parts.append(f"🎂 DOB: {user['date_of_birth']}\n")
            if user['address']:
                parts.append(f"🏠 Address: {user['address']}\n")
            parts.append("-" * 30 + "\n")

        return "".join(parts)

    def handle_update(self, user_id: Optional[int], data: Dict) -> str:
        """Handle UPDATE operation.

        Asks for the missing piece when either the user ID or the new field
        values are absent; otherwise forwards to the database layer.
        """
        if not user_id:
            return "Please specify the user ID you'd like to update."

        if not data:
            return "Please specify what information you'd like to update."

        result = self.db_manager.update_user(user_id, data)

        if result["status"] == "success":
            return f"✅ {result['message']}"
        return f"❌ {result['message']}"

    def handle_delete(self, user_id: Optional[int]) -> str:
        """Handle DELETE operation.

        Asks for the user ID when it is missing; otherwise forwards to the
        database layer.
        """
        if not user_id:
            return "Please specify the user ID you'd like to delete."

        result = self.db_manager.delete_user(user_id)

        if result["status"] == "success":
            return f"✅ {result['message']}"
        return f"❌ {result['message']}"

    def get_help(self) -> str:
        """Return help information."""
        return """
🤖 **Database Chatbot Help**

I can help you manage users with these operations:

**➕ CREATE Users:**
- "Add John Doe with email john@email.com"
- "Create a new user named Jane Smith with phone 123-456-7890"

**🔍 READ/SEARCH Users:**
- "Show all users"
- "Find users named John"
- "Search for john@email.com"

**✏️ UPDATE Users:**
- "Update user 5's phone to 123-456-7890"
- "Change user 3's email to newemail@email.com"

**🗑️ DELETE Users:**
- "Delete user with ID 3"
- "Remove user 5"

**Database Fields:**
- Full Name
- Email Address  
- Phone Number
- Date of Birth
- Address

Just tell me what you'd like to do in natural language!
"""

def main():
    """Main function to run the chatbot.

    Builds the chatbot, then reads messages from stdin until the user types
    ``quit``/``exit``/``bye``, presses Ctrl-C, or stdin reaches end-of-file.
    The database connection is closed on the way out.

    Settings can be overridden through the environment variables
    ``CHATBOT_DB_HOST``, ``CHATBOT_DB_NAME``, ``CHATBOT_DB_USER``,
    ``CHATBOT_DB_PASSWORD`` and ``OPENAI_API_KEY``; the placeholder values
    below are used when a variable is not set.
    """
    import os  # local import: only this entry point reads the environment

    # Database configuration
    DB_CONFIG = {
        "host": os.environ.get("CHATBOT_DB_HOST", "localhost"),
        "database": os.environ.get("CHATBOT_DB_NAME", "chatbot_db"),
        "user": os.environ.get("CHATBOT_DB_USER", "your_username"),
        "password": os.environ.get("CHATBOT_DB_PASSWORD", "your_password"),
    }

    # OpenAI API key
    OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "your_openai_api_key")

    # Initialize chatbot
    print("🤖 Initializing Conversational CRUD Chatbot...")
    chatbot = ConversationalCRUDChatbot(DB_CONFIG, OPENAI_API_KEY)

    print("✅ Chatbot initialized successfully!")
    print("Type 'help' for commands, 'quit' to exit\n")

    try:
        while True:
            try:
                user_input = input("You: ").strip()
                command = user_input.lower()

                if command in ['quit', 'exit', 'bye']:
                    print("👋 Goodbye!")
                    break
                if command in ['help', '?']:
                    print(chatbot.get_help())
                    continue
                if not user_input:
                    continue

                response = chatbot.process_message(user_input)
                print(f"🤖 Bot: {response}\n")

            except (KeyboardInterrupt, EOFError):
                # EOFError must end the loop: once stdin is closed, input()
                # raises it on every call, and treating it as a generic
                # error would print failures forever.
                print("\n👋 Goodbye!")
                break
            except Exception as e:
                print(f"❌ Error: {e}\n")
    finally:
        # Release the MySQL connection however the loop ended.
        connection = getattr(chatbot.db_manager, "connection", None)
        if connection is not None:
            try:
                connection.close()
            except Exception as e:
                logger.warning(f"Error closing database connection: {e}")

# Start the interactive chatbot only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()