In [None]:
import requests

def doctor_info(hospital_name):

    try:
        # Format the API URL
        base_url = "http://127.0.0.1:7001"
        endpoint = f"/doctors"
        api_url = base_url + endpoint
        params = {
            'hospital_name': hospital_name
        }

        # Make GET request to the API
        response = requests.get(api_url, params=params)
        # Check if request was successful
        response.raise_for_status()
        # Parse the response
        data = response.json()
        return data

    except requests.exceptions.RequestException as e:
        return f"Error checking availability: {str(e)}"
    
if __name__ == "__main__":
    result2 = doctor_info("Manav Jiban Pharmacy")
    print(result2)


In [10]:
import requests
import contextvars
ehr_id_var = contextvars.ContextVar("ehr_id")

id = "001"
ehr_id_var.set(id)


def check_availability_by_doctor(query_date: str, doctor_name: str,
                                 hospital_name: str) -> dict:
    """
    Checking the database if we have availability for the specific doctor.
    The parameters should be mentioned by the user in the query
    """
    try:
        # Format the API URL
        # logger.info(f"token:{token}")
        id = ehr_id_var.get("ehr_id")
        base_url = "http://localhost:7001"
        endpoint = f"/availability"
        api_url = base_url + endpoint
        params = {
            'query_date': query_date,
            'doctor_name': doctor_name,
            'hospital_name': hospital_name,
            'current_user': id
        }
        # headers = {
        #     'Authorization':
        #     f'Bearer {token}'  # Include the token in the Authorization header
        # }
        # Make GET request to the API
        response = requests.get(api_url, params=params)
        # Check if request was successful
        response.raise_for_status()
        # Parse the response
        data = response.json()
        available_slots = [
            item['date_slot'].split(' ')[1] for item in data.get("data", [])
        ]
        if not available_slots:
            output = "No availability in the entire day"
        else:
            output = f'This availability for {query_date}\n'
            output += "Available slots: " + ', '.join(available_slots)
        return {'message': data.get('message'), 'data': output}
    except requests.exceptions.RequestException as e:
        return f"Error checking availability: {str(e)}"



if __name__ == "__main__":
    result2 = check_availability_by_doctor("16-06-2025", "john doe",
                                           "Chaitanya hospital")
    # result2 = check_availability_by_doctor("16-06-2025", "jane smith", "manav jiban pharmacy")
    print(result2)


{'message': 'Success', 'data': 'This availability for 16-06-2025\nAvailable slots: 11.30, 13.00, 13.30, 15.00, 15.30, 16.00, 16.30'}


In [9]:
# from models.tools import DateModel, DateTimeModel, IdentificationNumberModel
# from langchain_core.tools import tool
import pandas as pd
from datetime import datetime
# from utils.database import db
import logging, re, os, requests
# from service.hospital_search import HospitalLocator
from typing import Dict, Any, Optional, List, Literal
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi import FastAPI, HTTPException, Header, Query
from jose import JWTError, jwt
from fastapi.security import OAuth2PasswordBearer
from langchain_core.runnables import RunnableConfig
from fastapi import APIRouter, Depends
# from models.generate_answer import GenerationResponse, GenerationRequest, ErrorResponse

router = APIRouter()
# SECRET_KEY = os.getenv("b4ekONIpZzL1BHWL7+A83Zup43wYKcyLYiPMfDhqY1Y=")
SECRET_KEY = "b4ekONIpZzL1BHWL7+A83Zup43wYKcyLYiPMfDhqY1Y="
if not SECRET_KEY:
    raise ValueError("JWT_SECRET_KEY not set in environment")

ALGORITHM = "HS256"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

def convert_datetime_format(dt_str):
    # Parse the input datetime string
    dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M")

    # Format the output as 'DD-MM-YYYY H.M' (removing leading zero from hour only)
    return dt.strftime("%d-%m-%Y %#H.%M")

from contextvars import ContextVar
ehr_id_ctx: ContextVar[str] = ContextVar("ehr_id")

async def get_current_user_id(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        ehr_id: str = payload.get("user_ehr_id")
        ehr_id_ctx.set(ehr_id)
        if ehr_id is None:
            raise credentials_exception
        return ehr_id
    except JWTError:
        raise credentials_exception

def get_ehr_id_from_context() -> str:
    return ehr_id_ctx.get()

@router.post("/generate-stream/")
def book_appointment(query_date: str, doctor_name: str, hospital_name: str) -> str:
    try:
        current_user = get_ehr_id_from_context()
        # logger.info(f"patient_id_number:{patient_to_attend}")
        # Format the API URL
        base_url = "http://127.0.0.1:7001"
        endpoint = f"/booking"
        api_url = base_url + endpoint

        params = {
            'query_date': query_date,
            'doctor_name': doctor_name,
            'hospital_name': hospital_name,
            'patient_to_attend': current_user
        }

        # Make GET request to the API
        response = requests.post(api_url, params=params)

        # Check if request was successful
        response.raise_for_status()

        # Parse the response
        data = response.json()
        return data

    except requests.exceptions.RequestException as e:
        return f"Error checking availability: {str(e)}"


def set_appointment(query_date: str, doctor_name: str,
                    hospital_name: str) -> str:
    """Book appointment with selected doctor"""

    return book_appointment(query_date, doctor_name, hospital_name)


if __name__ == "__main__":
    sent = set_appointment("16-06-2025", "john doe", "Chaitanya hospital")
    print(sent)

LookupError: <ContextVar name='ehr_id' at 0x000001D63A6BDDF0>

In [52]:
import secrets
import base64

def generate_secret_key():
    return base64.b64encode(secrets.token_bytes(32)).decode('utf-8')

if __name__ == "__main__":
    secret_key = generate_secret_key()
    print(secret_key)

b4ekONIpZzL1BHWL7+A83Zup43wYKcyLYiPMfDhqY1Y=


In [None]:
import requests
from fastapi.security import OAuth2PasswordBearer
import os
from fastapi import Depends, HTTPException
from jose import JWTError, jwt
import logging
import asyncio


logger = logging.getLogger(__name__)
SECRET_KEY = "b4ekONIpZzL1BHWL7+A83Zup43wYKcyLYiPMfDhqY1Y="
if not SECRET_KEY:
    raise ValueError("JWT_SECRET_KEY not set in environment")

ALGORITHM = "HS256"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="placeholder")


def get_current_user_id(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        ehr_id: str = payload.get("user_ehr_id")
        if ehr_id is None:
            raise credentials_exception
        logger.info("ehr_id from tools:", ehr_id)
        return ehr_id
    except JWTError:
        raise credentials_exception



if __name__ == "__main__":
    user_id = get_current_user_id()
    logger.info(user_id)

    # token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTc1MDc1NDA2NCwianRpIjoiNTVmY2ExNDAtZDg3ZS00MGI3LWJlYjctNTBlNDJmNjljMWVlIiwidHlwZSI6ImFjY2VzcyIsInN1YiI6Im1vbnR5Lm5hdGguMTk5NkBnbWFpbC5jb20iLCJuYmYiOjE3NTA3NTQwNjQsImV4cCI6MTc1MDc1NzY2NCwibmFtZSI6IkFtcml0ZW5kdSBOYXRoIiwicGljdHVyZSI6Imh0dHBzOi8vbGgzLmdvb2dsZXVzZXJjb250ZW50LmNvbS9hL0FDZzhvY0s3eVVvM21uT296S2dKZDJXa21ZRXRwb1RVSW1zbHJXOGZpbl9WWHFoMWRsekpNajR1PXM5Ni1jIiwiYXV0aF9wcm92aWRlciI6Imdvb2dsZSIsInVzZXJfZWhyX2lkIjoiMDAwMDAwMSJ9.09kPcIxAnorgd_KUMzVFKTHe9rr9pyQtJXwVfuXJ4oM"  # Replace with your actual token
    # asyncio.run(main())
# def bookAppointment(query_date, doctor_name, hospital_name, patient_to_attend):
#     """
#     Checking the database if we have availability for the specific doctor.
#     The parameters should be mentioned by the user in the query
#     """
#     try:
#         # Format the API URL
#         base_url = "http://127.0.0.1:7000"
#         endpoint = f"/booking/{query_date}/{doctor_name}/{hospital_name}/{patient_to_attend}"
#         api_url = base_url + endpoint

#         # Make GET request to the API
#         response = requests.post(api_url)

#         # Check if request was successful
#         response.raise_for_status()

#         # Parse the response
#         data = response.json()
#         return data

#     except requests.exceptions.RequestException as e:
#         return f"Error checking availability: {str(e)}"


AttributeError: 'OAuth2PasswordBearer' object has no attribute 'rsplit'

In [None]:

import requests

def cancel_appointment(query_date, doctor_name, hospital_name, patient_to_attend):
    """
    Checking the database if we have availability for the specific doctor.
    The parameters should be mentioned by the user in the query
    """
    try:
        # Format the API URL
        base_url = "http://127.0.0.1:7000"
        endpoint = f"/cancel/{query_date}/{doctor_name}/{hospital_name}/{patient_to_attend}"
        api_url = base_url + endpoint

        # Make GET request to the API
        response = requests.post(api_url)
        # Check if request was successful
        response.raise_for_status()
        # Parse the response
        data = response.json()
        return data

    except requests.exceptions.RequestException as e:
        return f"Error checking availability: {str(e)}"

if __name__ == "__main__":
    result2 = cancel_appointment("16-06-2025 11.30", "john doe", "Chaitanya Hospital", "1234567")
    print(result2)






{'message': 'Success', 'data': {'appointment_cancelled': True, 'slot': '16-06-2025 11.30'}}


In [None]:
import requests

def doctor_info(hospital_name):

    try:
        # Format the API URL
        base_url = "http://127.0.0.1:7001"
        endpoint = f"/doctor/{hospital_name}"
        api_url = base_url + endpoint

        # Make GET request to the API
        response = requests.get(api_url)
        # Check if request was successful
        response.raise_for_status()
        # Parse the response
        data = response.json()
        return data

    except requests.exceptions.RequestException as e:
        return f"Error checking availability: {str(e)}"
    
if __name__ == "__main__":
    result2 = doctor_info("Manav Jiban Pharmacy")
    print(result2)


In [None]:
import requests

def patient_info(patient_to_attend: str):

    try:
        # Format the API URL
        base_url = "http://127.0.0.1:7000"
        endpoint = f"/patient/{patient_to_attend}"
        api_url = base_url + endpoint

        # Make GET request to the API
        response = requests.get(api_url)
        # Check if request was successful
        response.raise_for_status()
        # Parse the response
        data = response.json()
        return data

    except requests.exceptions.RequestException as e:
        return f"Error checking availability: {str(e)}"
    
if __name__ == "__main__":
    result2 = patient_info("1234567")
    print(result2)


{'message': 'Success', 'data': ['16-06-2025 12.00']}


In [12]:
from pymongo import MongoClient
import pandas as pd

# Replace with your MongoDB connection string
MONGO_URI = "mongodb+srv://amritendunath1:mcZIh40RvT5ATcaX@cluster0.c5rezyo.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"
DATABASE_NAME = "postgres" # Replace with your database name
COLLECTION_NAME = "new_tablee" # Replace with your collection name

def get_appointment_by_patient(patient_id: str):
    """
    Get appointment details by patient ID.
    """
    client  = MongoClient(MONGO_URI)
    db = client[DATABASE_NAME]
    collection = db[COLLECTION_NAME]
    try:
        # appointment = list(collection.find({"patient_to_attend": patient_id}))
        appointment = collection.find_one({"patient_to_attend": int(patient_id)})
        if appointment:
            print("Appointment found:", appointment)
            return appointment
        else:
            return "No appointment found for this patient ID."
    except Exception as e:
        return f"Error retrieving appointment: {str(e)}"
    finally:
        if client:
            client.close()
if __name__ == "__main__":

    appointment_details = get_appointment_by_patient("5560894")
    print(appointment_details)
    # result = csv_to_mongodb()


# def csv_to_mongodb():
#     """
#     Load data from CSV to MongoDB.
#     """
#     client = MongoClient(MONGO_URI)
#     db = client[DATABASE_NAME]
#     collection = db[COLLECTION_NAME]
    
#     # Read the CSV file
#     data = pd.read_csv("availability.csv")
#     if not data.empty:
#         collection.insert_many(data.to_dict('records'))
#         print(f"Successfully imported {len(data)} documents into '{COLLECTION_NAME}' collection.")
#     else:
#         print("No data found in the CSV file to import.")

    



Appointment found: {'_id': ObjectId('683ff4b1f864ea8002e0a300'), 'hospital_name': '" NRS Medical & Dental Specialists Centre"', 'doctor_name': 'john doe', 'specialization': 'general_dentist', 'date_slot': '23-05-2025 12.00', 'is_available': False, 'patient_to_attend': 5560894.0}
{'_id': ObjectId('683ff4b1f864ea8002e0a300'), 'hospital_name': '" NRS Medical & Dental Specialists Centre"', 'doctor_name': 'john doe', 'specialization': 'general_dentist', 'date_slot': '23-05-2025 12.00', 'is_available': False, 'patient_to_attend': 5560894.0}


In [32]:
import pandas as pd
import os, requests
from typing import List, Dict, Any
def check_availability_by_doctor(query_date:str, doctor_name:str, hospital_name: str) -> str:
    """
    Checking the database if we have availability for the specific doctor.
    The parameters should be mentioned by the user in the query
    """
    try:
        # Format the API URL
        base_url = "http://127.0.0.1:7000"
        endpoint = f"/availability/{query_date}/{doctor_name}/{hospital_name}"
        api_url = base_url + endpoint

        response = requests.get(api_url)
        data=response.json()
        response.raise_for_status()
        return data
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        return f"Error checking availability: {str(e)}"

if __name__ == "__main__":
    result2 = check_availability_by_doctor("16-06-2025", "john doe", "Chaitanya Hospital")
    print(result2)

{'message': 'Success', 'data': 'This availability for 16-06-2025\nAvailable slots: 11.30, 13.00, 13.30, 15.00, 15.30, 16.00, 16.30'}


In [38]:
import pandas as pd
import os, requests
from typing import List, Dict, Any
def check_availability_by_doctor(query_date:str, doctor_name:str, hospital_name: str) -> str:
    """
    Checking the database if we have availability for the specific doctor.
    The parameters should be mentioned by the user in the query
    """
    csv_path = os.path.join('..', 'availability.csv')
    df = pd.read_csv(csv_path)
    df['date_slot_time'] = df['date_slot'].apply(lambda input: input.split(' ')[-1])
    rows = list(df[
        (df['date_slot'].apply(lambda input: input.split(' ')[0]) == query_date)&
        (df['doctor_name'] == doctor_name)&
        (df['is_available'] == True)]['date_slot_time'])

    if len(rows) == 0:
        output = "No availability in the entire day"
    else:
        output = f'This availability for {query_date}\n'
        output += "Available slots: " + ', '.join(rows)

    return output


if __name__ == "__main__":
    result2 = check_availability_by_doctor("05-08-2024", "john doe", "Chaitanya Hospital")
    print(result2)

This availability for 05-08-2024
Available slots: 11.00


In [None]:
import re, os
import pandas as pd


def set_appointment(
    hospital_name: str,
    desired_date: str, 
    doctor_name: str,
    patient_to_attend: str
) -> str:
    """Book appointment with selected doctor"""
    try:
        # Read and update availability
        csv_path = os.path.join('..', 'availability.csv')
        # csv_path = 'backend\chat_sathi\src\tools\availability.csv'  # Adjust path as necessary
        if not os.path.exists(csv_path):
            return "Availability data not found. Please ensure the availability.csv file exists."
        df = pd.read_csv(csv_path)
        
        # Convert doctor name to lowercase for case-insensitive comparison
        doctor_name = doctor_name.lower()
        
        case = df[
            (df['date_slot'] == desired_date) &
            (df['doctor_name'].str.lower() == doctor_name) &
            (df['hospital_name'] == hospital_name) &
            (df['is_available'] == True)
        ]
        
        if len(case) == 0:
            if not df['doctor_name'].str.lower().str.contains(doctor_name, case=False).any():
                return f"Doctor '{doctor_name}' not found in the system."
            if not df['hospital_name'].str.contains(hospital_name, case=False).any():
                return f"Hospital '{hospital_name}' not found in the system."
            return "No available appointments for that particular date, time, and hospital"
        
        # Update availability
        df.loc[
            (df['date_slot'] == desired_date) &
            (df['doctor_name'].str.lower() == doctor_name) &
            (df['hospital_name'] == hospital_name) &
            (df['is_available'] == True),
            ['is_available', 'patient_to_attend']
        ] = [False, patient_to_attend]
        
        df.to_csv(csv_path, index=False)
        # return f"Appointment booked successfully for {doctor_name} at {hospital_name} on {desired_date} for patient ID {patient_to_attend}."
    except Exception as e:
        print(f"Error booking appointment: {e}")
        return f"I apologize, there was an error booking the appointment. Details: {e}"
    
if __name__ == "__main__":
    result = set_appointment(
        hospital_name="chaitanya hospital",
        desired_date="05-08-2024 08.00", 
        doctor_name="rajesh kumar", 
        patient_to_attend="1234567", 
    )
    print(result)  # Should print success message or error details

No available appointments for that particular date, time, and hospital
