In [11]:
import sqlite3

# filename to form database
file = "Sqlite3.db"

try:
  conn = sqlite3.connect(file)
  print("Database Sqlite3.db formed.")
except:
  print("Database Sqlite3.db not formed.")

Database Sqlite3.db formed.


In [12]:
# Importing Sqlite3 Module
import sqlite3

try:
    # Making a connection between sqlite3 database and Python Program
    sqliteConnection = sqlite3.connect('SQLite_Retrieving_data.db')
    # If sqlite3 makes a connection with python program then it will print "Connected to SQLite"
    # Otherwise it will show errors
    print("Connected to SQLite")
except sqlite3.Error as error:
    print("Failed to connect with sqlite3 database", error)
finally:
    # Inside Finally Block, If connection is open, we need to close it
    if sqliteConnection:
        # using close() method, we will close the connection
        sqliteConnection.close()
        # After closing connection object, we will print "the sqlite connection is closed"
        print("the sqlite connection is closed")

Connected to SQLite
the sqlite connection is closed


In [13]:
import csv
myfile='/content/sample_data/Doctorlist.csv'


In [None]:
with open(myfile, 'r') as file:
                # Using csv reader with '|' as delimiter
                reader = csv.reader(file, delimiter='|')
                # Skip header row
                next(reader, None)
                for row in reader:
                  print(row[4])


555-0123
555-0124
555-0125


In [14]:
pip install openai



In [15]:
pip install openai==0.28



In [16]:
import openai
from datetime import datetime, timedelta
import json
import sqlite3
import csv

class DoctorAppointmentBot:
    def __init__(self, api_key, db_path='doctor_appointments.db'):
        """Initialize the chatbot with OpenAI API key and database connection."""
        openai.api_key = api_key
        self.db_path = db_path
        self.conversation_history = []
        self._initialize_database()

    def _initialize_database(self):
        """Initialize SQLite database and create necessary tables."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()

                # Create DoctorList table
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS DoctorList (
                        doctor_id INTEGER PRIMARY KEY AUTOINCREMENT,
                        name TEXT NOT NULL,
                        specialization TEXT NOT NULL,
                        schedule TEXT NOT NULL,
                        max_patients_per_day INTEGER,
                        contact_number TEXT,
                        email TEXT
                    )
                ''')

                # Create Appointments table
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS Appointments (
                        appointment_id INTEGER PRIMARY KEY AUTOINCREMENT,
                        doctor_id INTEGER,
                        patient_name TEXT NOT NULL,
                        patient_contact TEXT,
                        appointment_date TEXT NOT NULL,
                        appointment_time TEXT NOT NULL,
                        reason TEXT,
                        status TEXT DEFAULT 'scheduled',
                        FOREIGN KEY (doctor_id) REFERENCES DoctorList(doctor_id)
                    )
                ''')

                conn.commit()
        except sqlite3.Error as e:
            print(f"Database initialization error: {e}")
    def import_doctors_from_file(self, file_path):
        """Import doctor information from a pipe-delimited file."""
        try:
            with open(file_path, 'r') as file:
                # Using csv reader with '|' as delimiter
                reader = csv.reader(file, delimiter='|')
                # Skip header row
                next(reader, None)

                with sqlite3.connect(self.db_path) as conn:
                    cursor = conn.cursor()

                    for row in reader:
                        if len(row) >= 6:  # Ensure we have all required fields
                            cursor.execute('''
                                INSERT INTO DoctorList (
                                    name, specialization, schedule,
                                    max_patients_per_day, contact_number, email
                                ) VALUES (?, ?, ?, ?, ?, ?)
                            ''', (row[0], row[1], row[2], int(row[3]), row[4], row[5]))

                    conn.commit()
                print("Doctor information imported successfully!")

        except FileNotFoundError:
            print(f"Error: File {file_path} not found")
        except sqlite3.Error as e:
            print(f"Database error: {e}")
        except Exception as e:
            print(f"Error importing doctor information: {e}")

    def _define_functions(self):
        """Define the available functions for the ChatGPT API."""
        return [
            {
                "name": "get_available_doctors",
                "description": "Get a list of available doctors and their specializations",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "specialization": {
                            "type": "string",
                            "description": "Optional specialization to filter doctors"
                        }
                    }
                }
            },
            {
                "name": "get_available_slots",
                "description": "Get available appointment slots for a specific doctor",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "doctor_id": {
                            "type": "integer",
                            "description": "ID of the doctor to check availability"
                        },
                        "date": {
                            "type": "string",
                            "description": "Date to check availability (YYYY-MM-DD format)"
                        }
                    },
                    "required": ["doctor_id"]
                }
            },
            {
                "name": "schedule_appointment",
                "description": "Schedule a new appointment",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "doctor_id": {
                            "type": "integer",
                            "description": "ID of the doctor"
                        },
                        "patient_name": {
                            "type": "string",
                            "description": "Name of the patient"
                        },
                        "patient_contact": {
                            "type": "string",
                            "description": "Contact information of the patient"
                        },
                        "appointment_date": {
                            "type": "string",
                            "description": "Date of appointment (YYYY-MM-DD format)"
                        },
                        "appointment_time": {
                            "type": "string",
                            "description": "Time of appointment (HH:MM AM/PM format)"
                        },
                        "reason": {
                            "type": "string",
                            "description": "Reason for the appointment"
                        }
                    },
                    "required": ["doctor_id", "patient_name", "patient_contact",
                               "appointment_date", "appointment_time", "reason"]
                }
            }
        ]

    def get_available_doctors(self, specialization=None):
        """Get list of available doctors, optionally filtered by specialization."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()

                if specialization:
                    cursor.execute("""
                        SELECT * FROM DoctorList
                        WHERE specialization LIKE ?
                    """, (f"%{specialization}%",))
                else:
                    cursor.execute("SELECT * FROM DoctorList")

                doctors = cursor.fetchall()
                return {
                    "doctors": [
                        {
                            "id": doc[0],
                            "name": doc[1],
                            "specialization": doc[2],
                            "schedule": doc[3],
                            "contact": doc[5]
                        }
                        for doc in doctors
                    ]
                }
        except sqlite3.Error as e:
            return {"error": str(e)}

    def get_available_slots(self, doctor_id, date=None):
        """Get available slots for a specific doctor and date."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()

                # Get doctor's schedule
                cursor.execute("""
                    SELECT schedule FROM DoctorList WHERE doctor_id = ?
                """, (doctor_id,))

                doctor_schedule = cursor.fetchone()
                if not doctor_schedule:
                    return {"error": "Doctor not found"}

                working_days = doctor_schedule[0].split('-')

                # If no date specified, get slots for next 7 days
                if not date:
                    dates = [(datetime.now() + timedelta(days=i)).strftime('%Y-%m-%d')
                            for i in range(7)]
                else:
                    dates = [date]

                available_slots = {}
                for date_str in dates:
                    # Check if doctor works on this day
                    if datetime.strptime(date_str, '%Y-%m-%d').strftime('%a') in working_days:
                        # Get booked slots
                        cursor.execute("""
                            SELECT appointment_time
                            FROM Appointments
                            WHERE doctor_id = ? AND appointment_date = ?
                        """, (doctor_id, date_str))

                        booked_slots = [row[0] for row in cursor.fetchall()]

                        # Default time slots
                        all_slots = ["09:00 AM", "10:00 AM", "11:00 AM",
                                   "02:00 PM", "03:00 PM", "04:00 PM"]

                        # Remove booked slots
                        available_slots[date_str] = [
                            slot for slot in all_slots if slot not in booked_slots
                        ]

                return {"available_slots": available_slots}

        except sqlite3.Error as e:
            return {"error": str(e)}

    def schedule_appointment(self, doctor_id, patient_name, patient_contact,
                           appointment_date, appointment_time, reason):
        """Schedule a new appointment."""
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()

                # Verify doctor exists
                cursor.execute("""
                    SELECT COUNT(*) FROM DoctorList WHERE doctor_id = ?
                """, (doctor_id,))
                if cursor.fetchone()[0] == 0:
                    return {"error": "Doctor not found"}

                # Check if slot is available
                cursor.execute("""
                    SELECT COUNT(*) FROM Appointments
                    WHERE doctor_id = ? AND appointment_date = ?
                    AND appointment_time = ? AND status = 'scheduled'
                """, (doctor_id, appointment_date, appointment_time))

                if cursor.fetchone()[0] > 0:
                    return {"error": "Time slot not available"}

                # Book appointment
                cursor.execute("""
                    INSERT INTO Appointments (
                        doctor_id, patient_name, patient_contact,
                        appointment_date, appointment_time, reason
                    ) VALUES (?, ?, ?, ?, ?, ?)
                """, (doctor_id, patient_name, patient_contact,
                      appointment_date, appointment_time, reason))

                conn.commit()
                return {
                    "success": True,
                    "message": f"Appointment scheduled for {appointment_date} at {appointment_time}"
                }

        except sqlite3.Error as e:
            return {"error": str(e)}

    def _get_chatgpt_response(self, user_input):
        """Get response from ChatGPT API using function calling."""
        self.conversation_history.append({"role": "user", "content": user_input})
        system_prompt = f"""
        You are a medical appointment scheduling assistant with access to the following information.

        Help patients schedule appointments
        by collecting necessary information and using the available functions to check
        availability and book appointments. Be professional, courteous, and HIPAA-compliant.

        Follow these steps when helping patients:
        1. Understand their needs and preferred doctor/specialization
        2. Check doctor availability using the functions
        3. Suggest available time slots
        4. Collect patient information
        5. Schedule the appointment

        Always verify availability before suggesting time slots.
        """

        try:
            messages = [
                {"role": "system", "content": system_prompt},
                *self.conversation_history
            ]

            response = openai.ChatCompletion.create(
                model="gpt-4o-mini",
                messages=messages,
                functions=self._define_functions(),
                function_call="auto"
            )

            assistant_message = response.choices[0].message

            # Handle function calling
            if assistant_message.get("function_call"):
                function_name = assistant_message["function_call"]["name"]
                function_args = json.loads(assistant_message["function_call"]["arguments"])

                # Call the appropriate function
                if function_name == "get_available_doctors":
                    function_response = self.get_available_doctors(
                        function_args.get("specialization")
                    )
                elif function_name == "get_available_slots":
                    function_response = self.get_available_slots(
                        function_args["doctor_id"],
                        function_args.get("date")
                    )
                elif function_name == "schedule_appointment":
                    function_response = self.schedule_appointment(
                        function_args["doctor_id"],
                        function_args["patient_name"],
                        function_args["patient_contact"],
                        function_args["appointment_date"],
                        function_args["appointment_time"],
                        function_args["reason"]
                    )

                # Add function response to conversation
                messages.append({
                    "role": "function",
                    "name": function_name,
                    "content": json.dumps(function_response)
                })

                # Get final response
                second_response = openai.ChatCompletion.create(
                    model="gpt-4o-mini",
                    messages=messages
                )

                final_response = second_response.choices[0].message["content"]
                self.conversation_history.append(
                    {"role": "assistant", "content": final_response}
                )
                return final_response

            # Handle regular response
            self.conversation_history.append(
                {"role": "assistant", "content": assistant_message["content"]}
            )
            return assistant_message["content"]

        except Exception as e:
            return f"Error: {str(e)}"

    def process_user_input(self, user_input):
        """Process user input and return appropriate response."""
        return self._get_chatgpt_response(user_input)

def main():
    # Initialize the chatbot with your OpenAI API key
    api_key = ""
    bot = DoctorAppointmentBot(api_key)

    print("Welcome to the Doctor Appointment Chatbot!")
    print("Type 'quit' to exit")

    while True:
        user_input = input("\nYou: ")
        if user_input.lower() == 'quit':
            break

        response = bot.process_user_input(user_input)
        print(f"Bot: {response}")

if __name__ == "__main__":
    main()

Welcome to the Doctor Appointment Chatbot!
Type 'quit' to exit

You: book an appointment
Bot: I can help you with that. Could you please provide me with the following information:

1. What type of appointment do you need? (e.g., general check-up, specialist consultation)
2. Do you have a specific doctor or specialization in mind?
3. What date are you looking to schedule the appointment?
4. Your name and contact information (phone number or email) for the appointment. 

Once I have this information, I can check the availability and find a suitable time for you.

You: specialist consultation, dr. John Smith
Bot: Thank you for providing that information. Could you please let me know:

1. What date are you looking to schedule the appointment with Dr. John Smith?
2. Your name and contact information (phone number or email) for the appointment.

Once I have this information, I will check Dr. Smith's availability for the specified date.

You: 31 Jan 2025
Bot: It seems that there are currently