In [1]:
import sys
sys.path.append('..')

from src.config import SnowflakeConfig, validate_config
from src.utils.snowflake_helper import SnowflakeHelper
from snowflake.cortex import complete, extract_answer, summarize
from snowflake.core import Root
from datetime import datetime

# Validate the configuration first, then build the helper and open a session.
validate_config()

config = SnowflakeConfig()
connection_params = config.get_connection_params()
sf_helper = SnowflakeHelper(connection_params)

# `session` is whatever SnowflakeHelper.connect() returns; later cells call
# session.sql(...) on it and pass it to snowflake.core.Root.
session = sf_helper.connect()

print("✓ Cortex Search Service Test initialized")

✓ Configuration validated successfully
✓ Connected to Snowflake as harismad
  Role: "DEV_ROLE"
  Warehouse: "TEST_WAREHOUSE"
  Database: "TEST_DATABASE"
  Schema: "TEST_SCHEMA"
✓ Cortex Search Service Test initialized


In [3]:
# ============================================================================
# PART 1: Prepare Search Content for SOP and Facility (Cortex Search Services)
# ============================================================================

def prepare_search_content():
    """Fetch SOP and facility rows, each with a concatenated SEARCH_CONTENT column.

    Two queries are sent through the module-level ``sf_helper``: one against
    ``hospital_sop`` and one against ``hospital_facilities``. After each query
    the number of returned records is printed.

    Returns:
        tuple: ``(sop_result, facility_result)`` -- the two objects returned by
        ``sf_helper.execute_query``, in that order.
    """
    # SOP rows: title, category and content are glued into SEARCH_CONTENT.
    sop_sql = """
            SELECT SOP_ID,
                   SOP_TITLE || ' - ' || SOP_CATEGORY || '. ' || SOP_CONTENT as SEARCH_CONTENT,
                   SOP_TITLE,
                   SOP_CATEGORY,
                   DEPARTMENT,
                   SOP_CONTENT,
                   LAST_UPDATED
            FROM hospital_sop
            """

    # Facility rows: a one-sentence description is built into SEARCH_CONTENT.
    facility_sql = """
                     SELECT FACILITY_ID,
                            FACILITY_NAME || ' is a ' || FACILITY_TYPE ||
                            ' located at ' || LOCATION ||
                            '. Operating hours: ' || OPERATING_HOURS ||
                            '. Contact: ' || CONTACT_INFO as SEARCH_CONTENT,
                            FACILITY_NAME,
                            FACILITY_TYPE,
                            LOCATION,
                            CAPACITY,
                            CURRENT_USAGE,
                            OPERATING_HOURS,
                            STATUS
                     FROM hospital_facilities
                     """

    sop_result = sf_helper.execute_query(sop_sql)
    print(f"✓ Prepared {len(sop_result)} SOP records for search")

    facility_result = sf_helper.execute_query(facility_sql)
    print(f"✓ Prepared {len(facility_result)} facility records for search")

    return sop_result, facility_result


sop_df, facility_df = prepare_search_content()

# Preview the first three rows of each result, identifier and name columns only.
for _heading, _frame, _columns in (
    ("\nSOP Search Content Sample:", sop_df, ['SOP_ID', 'SOP_TITLE']),
    ("\nFacility Search Content Sample:", facility_df, ['FACILITY_ID', 'FACILITY_NAME']),
):
    print(_heading)
    print(_frame[_columns].head(3))

✓ Prepared 40 SOP records for search
✓ Prepared 40 facility records for search

SOP Search Content Sample:
     SOP_ID                             SOP_TITLE
0  SOP-0001            Triage Assessment Protocol
1  SOP-0002             Patient Admission Process
2  SOP-0003  Medication Administration Guidelines

Facility Search Content Sample:
  FACILITY_ID     FACILITY_NAME
0    FAC-0001  Operating Room 1
1    FAC-0002  Operating Room 2
2    FAC-0003  Operating Room 3


In [4]:
# ============================================================================
# PART 2: Create Cortex Search Services for SOP
# ============================================================================

# DDL for a view over hospital_sop that adds SEARCH_DOCUMENT: title, category,
# department and content concatenated into a single text column.
# NOTE(review): in Snowflake `||` yields NULL when any operand is NULL --
# confirm these columns are never NULL, otherwise SEARCH_DOCUMENT is NULL too.
create_sop_search_view = """
                         CREATE OR REPLACE VIEW sop_search_view AS
                         SELECT SOP_ID,
                                SOP_TITLE,
                                SOP_CATEGORY,
                                DEPARTMENT,
                                SOP_CONTENT,
                                SOP_TITLE || ' - ' || SOP_CATEGORY || '. ' ||
                                'Department: ' || DEPARTMENT || '. ' ||
                                SOP_CONTENT as SEARCH_DOCUMENT,
                                LAST_UPDATED,
                                VERSION
                         FROM hospital_sop
                         """

# DDL for the Cortex Search Service built on SEARCH_DOCUMENT from that view.
create_sop_search_service = """
CREATE OR REPLACE CORTEX SEARCH SERVICE sop_search_service
ON SEARCH_DOCUMENT
ATTRIBUTES SOP_ID, SOP_TITLE, SOP_CATEGORY, DEPARTMENT, SOP_CONTENT
WAREHOUSE = TEST_WAREHOUSE
TARGET_LAG = '1 minute'
AS (
    SELECT
        SOP_ID,
        SOP_TITLE,
        SOP_CATEGORY,
        DEPARTMENT,
        SOP_CONTENT,
        SEARCH_DOCUMENT
    FROM sop_search_view
)
"""

# Step 1: (re)create the view. A failure is only reported; execution continues.
try:
    session.sql(create_sop_search_view).collect()
    print("✓ Created SOP search view")
except Exception as view_error:
    print(f"Error creating view: {view_error}")

# Step 2: (re)create the search service. A failure is only reported.
try:
    session.sql(create_sop_search_service).collect()
    print("✓ Created Cortex Search Service: sop_search_service")
    print("  Note: Service may take a few minutes to build index")
except Exception as service_error:
    print(f"Note: {service_error}")
    print("  Search service might already exist or needs permissions")

✓ Created SOP search view
✓ Created Cortex Search Service: sop_search_service
  Note: Service may take a few minutes to build index


In [5]:
# ============================================================================
# PART 3: Create Cortex Search Services for Facility
# ============================================================================

# DDL for a view over hospital_facilities, restricted to STATUS = 'OPERATIONAL',
# that adds SEARCH_DOCUMENT: a sentence-style description assembled from name,
# type, location, capacity, operating hours, equipment list and contact info.
# NOTE(review): in Snowflake `||` yields NULL when any operand is NULL --
# confirm these columns are never NULL, otherwise SEARCH_DOCUMENT is NULL too.
create_facility_search_view = """
                              CREATE OR REPLACE VIEW facility_search_view AS
                              SELECT FACILITY_ID,
                                     FACILITY_NAME,
                                     FACILITY_TYPE,
                                     LOCATION,
                                     FACILITY_NAME || ' is a ' || FACILITY_TYPE ||
                                     ' located at ' || LOCATION ||
                                     '. Capacity: ' || CAPACITY ||
                                     '. Operating hours: ' || OPERATING_HOURS ||
                                     '. Equipment: ' || EQUIPMENT_LIST ||
                                     '. Contact: ' || CONTACT_INFO as SEARCH_DOCUMENT,
                                     CAPACITY,
                                     CURRENT_USAGE,
                                     OPERATING_HOURS,
                                     CONTACT_INFO,
                                     STATUS
                              FROM hospital_facilities
                              WHERE STATUS = 'OPERATIONAL'
                              """

# DDL for the Cortex Search Service built on SEARCH_DOCUMENT from that view.
create_facility_search_service = """
CREATE OR REPLACE CORTEX SEARCH SERVICE facility_search_service
ON SEARCH_DOCUMENT
ATTRIBUTES FACILITY_ID, FACILITY_NAME, FACILITY_TYPE, LOCATION, CAPACITY, OPERATING_HOURS
WAREHOUSE = TEST_WAREHOUSE
TARGET_LAG = '1 minute'
AS (
    SELECT
        FACILITY_ID,
        FACILITY_NAME,
        FACILITY_TYPE,
        LOCATION,
        CAPACITY,
        CURRENT_USAGE,
        OPERATING_HOURS,
        CONTACT_INFO,
        STATUS,
        SEARCH_DOCUMENT
    FROM facility_search_view
)
"""

# Step 1: (re)create the view. A failure is only reported; execution continues.
try:
    session.sql(create_facility_search_view).collect()
    print("✓ Created facility search view")
except Exception as view_error:
    print(f"Error creating view: {view_error}")

# Step 2: (re)create the search service. A failure is only reported.
try:
    session.sql(create_facility_search_service).collect()
    print("✓ Created Cortex Search Service: facility_search_service")
except Exception as service_error:
    print(f"Note: {service_error}")

✓ Created facility search view
✓ Created Cortex Search Service: facility_search_service


In [7]:
# ============================================================================
# PART 4: Create Custom Functions for Doctor Schedule (SQL Functions)
# ============================================================================

print("\n=== Creating Custom Functions for Doctor Schedule ===")

# UDTF 1: doctors whose SPECIALIZATION contains the given text (case-insensitive)
# and whose rows have STATUS = 'AVAILABLE', one output row per doctor with the
# distinct days listed and the number of matching schedule rows.
create_search_doctors_by_specialization = """
CREATE OR REPLACE FUNCTION search_doctors_by_specialization(specialization_query VARCHAR)
RETURNS TABLE (
    doctor_id VARCHAR,
    doctor_name VARCHAR,
    specialization VARCHAR,
    available_days VARCHAR,
    total_slots INTEGER
)
AS
$$
    SELECT
        DOCTOR_ID,
        DOCTOR_NAME,
        SPECIALIZATION,
        LISTAGG(DISTINCT DAY_OF_WEEK, ', ') WITHIN GROUP (ORDER BY
            CASE DAY_OF_WEEK
                WHEN 'Monday' THEN 1
                WHEN 'Tuesday' THEN 2
                WHEN 'Wednesday' THEN 3
                WHEN 'Thursday' THEN 4
                WHEN 'Friday' THEN 5
                WHEN 'Saturday' THEN 6
                ELSE 7
            END
        ) as available_days,
        COUNT(*) as total_slots
    FROM doctor_schedule
    WHERE UPPER(SPECIALIZATION) LIKE UPPER('%' || specialization_query || '%')
        AND STATUS = 'AVAILABLE'
    GROUP BY DOCTOR_ID, DOCTOR_NAME, SPECIALIZATION
    ORDER BY DOCTOR_NAME
$$
"""

# UDTF 2: every schedule row for the given day (case-insensitive match),
# with remaining capacity computed as MAX_PATIENTS - BOOKED_PATIENTS.
create_get_doctor_schedule_by_day = """
CREATE OR REPLACE FUNCTION get_doctor_schedule_by_day(day_name VARCHAR)
RETURNS TABLE (
    schedule_id VARCHAR,
    doctor_name VARCHAR,
    specialization VARCHAR,
    start_time TIME,
    end_time TIME,
    room_number VARCHAR,
    available_slots INTEGER,
    status VARCHAR
)
AS
$$
    SELECT
        SCHEDULE_ID,
        DOCTOR_NAME,
        SPECIALIZATION,
        START_TIME,
        END_TIME,
        ROOM_NUMBER,
        (MAX_PATIENTS - BOOKED_PATIENTS) as available_slots,
        STATUS
    FROM doctor_schedule
    WHERE UPPER(DAY_OF_WEEK) = UPPER(day_name)
    ORDER BY START_TIME, DOCTOR_NAME
$$
"""

# UDTF 3: schedule rows matching both specialization text and day that are
# AVAILABLE and still have at least one free slot.
create_find_available_doctors = """
CREATE OR REPLACE FUNCTION find_available_doctors(
    specialization_query VARCHAR,
    day_name VARCHAR
)
RETURNS TABLE (
    doctor_name VARCHAR,
    specialization VARCHAR,
    start_time TIME,
    end_time TIME,
    room_number VARCHAR,
    available_slots INTEGER
)
AS
$$
    SELECT
        DOCTOR_NAME,
        SPECIALIZATION,
        START_TIME,
        END_TIME,
        ROOM_NUMBER,
        (MAX_PATIENTS - BOOKED_PATIENTS) as available_slots
    FROM doctor_schedule
    WHERE UPPER(SPECIALIZATION) LIKE UPPER('%' || specialization_query || '%')
        AND UPPER(DAY_OF_WEEK) = UPPER(day_name)
        AND STATUS = 'AVAILABLE'
        AND (MAX_PATIENTS - BOOKED_PATIENTS) > 0
    ORDER BY START_TIME, DOCTOR_NAME
$$
"""

# Run the three DDL statements in order. Each failure is reported on its own
# line and does not stop the remaining statements from being attempted.
for _udf_name, _udf_ddl in (
    ("search_doctors_by_specialization", create_search_doctors_by_specialization),
    ("get_doctor_schedule_by_day", create_get_doctor_schedule_by_day),
    ("find_available_doctors", create_find_available_doctors),
):
    try:
        session.sql(_udf_ddl).collect()
        print(f"✓ Created function: {_udf_name}")
    except Exception as ddl_error:
        print(f"Note: {ddl_error}")



=== Creating Custom Functions for Doctor Schedule ===
✓ Created function: search_doctors_by_specialization
✓ Created function: get_doctor_schedule_by_day
✓ Created function: find_available_doctors


In [8]:
# ============================================================================
# PART 5: Create Custom Functions for Appointments (SQL Functions)
# ============================================================================

print("\n=== Creating Custom Functions for Appointments ===")

# UDTF 1: SCHEDULED appointments dated between today and today + days_ahead
# (BETWEEN is inclusive on both ends), joined to doctor_schedule for the
# doctor, specialization and room.
create_get_upcoming_appointments = """
CREATE OR REPLACE FUNCTION get_upcoming_appointments(days_ahead INTEGER)
RETURNS TABLE (
    appointment_id VARCHAR,
    patient_id VARCHAR,
    doctor_name VARCHAR,
    specialization VARCHAR,
    appointment_date DATE,
    appointment_time TIME,
    room_number VARCHAR,
    status VARCHAR
)
AS
$$
    SELECT
        a.APPOINTMENT_ID,
        a.PATIENT_ID,
        d.DOCTOR_NAME,
        d.SPECIALIZATION,
        a.APPOINTMENT_DATE,
        a.APPOINTMENT_TIME,
        d.ROOM_NUMBER,
        a.STATUS
    FROM appointments a
    JOIN doctor_schedule d ON a.SCHEDULE_ID = d.SCHEDULE_ID
    WHERE a.APPOINTMENT_DATE BETWEEN CURRENT_DATE() AND DATEADD(day, days_ahead, CURRENT_DATE())
        AND a.STATUS = 'SCHEDULED'
    ORDER BY a.APPOINTMENT_DATE, a.APPOINTMENT_TIME
$$
"""

# UDTF 2: every appointment of one patient, newest first.
create_get_patient_appointments = """
CREATE OR REPLACE FUNCTION get_patient_appointments(patient_id_param VARCHAR)
RETURNS TABLE (
    appointment_id VARCHAR,
    doctor_name VARCHAR,
    specialization VARCHAR,
    appointment_date DATE,
    appointment_time TIME,
    status VARCHAR,
    created_at TIMESTAMP_NTZ
)
AS
$$
    SELECT
        a.APPOINTMENT_ID,
        d.DOCTOR_NAME,
        d.SPECIALIZATION,
        a.APPOINTMENT_DATE,
        a.APPOINTMENT_TIME,
        a.STATUS,
        a.CREATED_AT
    FROM appointments a
    JOIN doctor_schedule d ON a.SCHEDULE_ID = d.SCHEDULE_ID
    WHERE a.PATIENT_ID = patient_id_param
    ORDER BY a.APPOINTMENT_DATE DESC, a.APPOINTMENT_TIME DESC
$$
"""

# UDTF 3: per-status counts plus the most frequent specialization for a date
# range (inclusive on both ends).
#   * LEFT JOIN ... ON 1 = 1 instead of CROSS JOIN: when the range contains no
#     appointments, top_spec is empty and a cross join would drop the single
#     stats row, so the function would return nothing at all. With the left
#     join the stats row is always returned and top_specialization is NULL.
#   * COUNT_IF instead of SUM(CASE ...): SUM over zero rows is NULL, COUNT_IF
#     is 0, so an empty range reports zeros rather than NULLs.
#   * The SPECIALIZATION tie-breaker makes LIMIT 1 deterministic when two
#     specializations have the same appointment count.
create_get_appointment_stats = """
CREATE OR REPLACE FUNCTION get_appointment_stats(
    start_date DATE,
    end_date DATE
)
RETURNS TABLE (
    total_appointments INTEGER,
    scheduled INTEGER,
    completed INTEGER,
    cancelled INTEGER,
    no_show INTEGER,
    top_specialization VARCHAR
)
AS
$$
    WITH stats AS (
        SELECT
            COUNT(*) as total_appointments,
            COUNT_IF(a.STATUS = 'SCHEDULED') as scheduled,
            COUNT_IF(a.STATUS = 'COMPLETED') as completed,
            COUNT_IF(a.STATUS = 'CANCELLED') as cancelled,
            COUNT_IF(a.STATUS = 'NO_SHOW') as no_show
        FROM appointments a
        WHERE a.APPOINTMENT_DATE BETWEEN start_date AND end_date
    ),
    top_spec AS (
        SELECT d.SPECIALIZATION
        FROM appointments a
        JOIN doctor_schedule d ON a.SCHEDULE_ID = d.SCHEDULE_ID
        WHERE a.APPOINTMENT_DATE BETWEEN start_date AND end_date
        GROUP BY d.SPECIALIZATION
        ORDER BY COUNT(*) DESC, d.SPECIALIZATION
        LIMIT 1
    )
    SELECT
        s.total_appointments,
        s.scheduled,
        s.completed,
        s.cancelled,
        s.no_show,
        t.SPECIALIZATION as top_specialization
    FROM stats s
    LEFT JOIN top_spec t ON 1 = 1
$$
"""

# Run the three DDL statements in order. Each failure is reported on its own
# line and does not stop the remaining statements from being attempted.
for _udf_name, _udf_ddl in (
    ("get_upcoming_appointments", create_get_upcoming_appointments),
    ("get_patient_appointments", create_get_patient_appointments),
    ("get_appointment_stats", create_get_appointment_stats),
):
    try:
        session.sql(_udf_ddl).collect()
        print(f"✓ Created function: {_udf_name}")
    except Exception as ddl_error:
        print(f"Note: {ddl_error}")


=== Creating Custom Functions for Appointments ===
✓ Created function: get_upcoming_appointments
✓ Created function: get_patient_appointments
✓ Created function: get_appointment_stats


In [12]:
# ============================================================================
# PART 6: Python Wrapper Functions for Easy Testing
# ============================================================================

def cortex_search_sop(query: str, limit: int = 5):
    """Query the SOP Cortex Search Service.

    Looks up ``SOP_SEARCH_SERVICE`` under ``TEST_DATABASE.TEST_SCHEMA`` through
    a ``Root`` built from the module-level ``session`` and runs one search.

    Args:
        query: Search text passed to the service.
        limit: Value passed as the service's ``limit`` argument (default 5).

    Returns:
        Whatever the service's ``search`` call returns, unchanged.
    """
    schema = Root(session).databases["TEST_DATABASE"].schemas["TEST_SCHEMA"]
    service = schema.cortex_search_services["SOP_SEARCH_SERVICE"]
    requested_columns = ["SEARCH_DOCUMENT", "SOP_ID", "SOP_TITLE", "SOP_CATEGORY"]
    return service.search(query=query, columns=requested_columns, limit=limit)


def cortex_search_facility(query: str, limit: int = 5):
    """Query the facility Cortex Search Service.

    Looks up ``FACILITY_SEARCH_SERVICE`` under ``TEST_DATABASE.TEST_SCHEMA``
    through a ``Root`` built from the module-level ``session`` and runs one
    search.

    Args:
        query: Search text passed to the service.
        limit: Value passed as the service's ``limit`` argument (default 5).

    Returns:
        Whatever the service's ``search`` call returns, unchanged.
    """
    schema = Root(session).databases["TEST_DATABASE"].schemas["TEST_SCHEMA"]
    service = schema.cortex_search_services["FACILITY_SEARCH_SERVICE"]
    requested_columns = [
        "SEARCH_DOCUMENT",
        "FACILITY_ID",
        "FACILITY_NAME",
        "FACILITY_TYPE",
        "LOCATION",
        "CAPACITY",
    ]
    return service.search(query=query, columns=requested_columns, limit=limit)


def _sql_string_literal(value) -> str:
    """Render ``value`` as a single-quoted Snowflake string literal.

    Snowflake treats both ``''`` and backslash sequences as escapes inside
    single-quoted constants, so backslashes are doubled first and single
    quotes second. Without this, an apostrophe in the input (e.g. "O'Neil")
    ends the literal early and breaks or alters the statement.
    """
    escaped = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


def search_doctors_by_specialization(specialization: str):
    """Call the ``search_doctors_by_specialization`` table function.

    Args:
        specialization: Text to match; embedded as an escaped SQL literal.

    Returns:
        The result of ``sf_helper.execute_query`` for the generated query.
    """
    literal = _sql_string_literal(specialization)
    query = f"SELECT * FROM TABLE(search_doctors_by_specialization({literal}))"
    return sf_helper.execute_query(query)


def get_doctor_schedule_by_day(day: str):
    """Call the ``get_doctor_schedule_by_day`` table function.

    Args:
        day: Day name; embedded as an escaped SQL literal.

    Returns:
        The result of ``sf_helper.execute_query`` for the generated query.
    """
    literal = _sql_string_literal(day)
    query = f"SELECT * FROM TABLE(get_doctor_schedule_by_day({literal}))"
    return sf_helper.execute_query(query)


def find_available_doctors(specialization: str, day: str):
    """Call the ``find_available_doctors`` table function.

    Args:
        specialization: Text to match; embedded as an escaped SQL literal.
        day: Day name; embedded as an escaped SQL literal.

    Returns:
        The result of ``sf_helper.execute_query`` for the generated query.
    """
    spec_literal = _sql_string_literal(specialization)
    day_literal = _sql_string_literal(day)
    query = f"SELECT * FROM TABLE(find_available_doctors({spec_literal}, {day_literal}))"
    return sf_helper.execute_query(query)


def get_upcoming_appointments(days_ahead: int = 7):
    """Call the ``get_upcoming_appointments`` table function.

    Args:
        days_ahead: Number of days passed to the function (default 7). It is
            coerced with ``int()`` so that only an integer literal can reach
            the SQL text.

    Returns:
        The result of ``sf_helper.execute_query`` for the generated query.

    Raises:
        ValueError, TypeError: If ``days_ahead`` cannot be converted to ``int``.
    """
    query = f"SELECT * FROM TABLE(get_upcoming_appointments({int(days_ahead)}))"
    return sf_helper.execute_query(query)


def get_patient_appointments(patient_id: str):
    """Call the ``get_patient_appointments`` table function.

    Args:
        patient_id: Patient identifier; embedded as an escaped SQL literal.

    Returns:
        The result of ``sf_helper.execute_query`` for the generated query.
    """
    literal = _sql_string_literal(patient_id)
    query = f"SELECT * FROM TABLE(get_patient_appointments({literal}))"
    return sf_helper.execute_query(query)


def get_appointment_stats(start_date: str, end_date: str):
    """Call the ``get_appointment_stats`` table function for a date range.

    Args:
        start_date: Range start; embedded as an escaped literal inside ``TO_DATE``.
        end_date: Range end; embedded as an escaped literal inside ``TO_DATE``.

    Returns:
        The result of ``sf_helper.execute_query`` for the generated query.
    """
    start_literal = _sql_string_literal(start_date)
    end_literal = _sql_string_literal(end_date)
    query = (
        "SELECT * FROM TABLE(get_appointment_stats("
        f"TO_DATE({start_literal}), TO_DATE({end_literal})))"
    )
    return sf_helper.execute_query(query)


In [13]:
# ============================================================================
# PART 7: Display Functions for Clean Output
# ============================================================================

def print_clean_sop_results(result, snippet_length: int = 100):
    """Pretty-print SOP search results to stdout.

    For every hit this prints a numbered header line (title, category, ID),
    the cosine similarity when the hit carries one, and a snippet of
    ``SEARCH_DOCUMENT``.

    Args:
        result: Object whose ``to_dict()`` returns a mapping with a
            ``'results'`` list of hit dictionaries.
        snippet_length: Maximum number of document characters shown
            (default 100).

    Raises:
        KeyError: If a hit lacks ``SOP_TITLE``, ``SOP_CATEGORY`` or ``SOP_ID``.
    """
    # `or []` also covers a present-but-None 'results' value.
    hits = result.to_dict().get('results') or []
    for idx, sop in enumerate(hits, 1):
        print(f"{idx}. {sop['SOP_TITLE']} [{sop['SOP_CATEGORY']}] (ID: {sop['SOP_ID']})")

        # A missing or None document yields an empty snippet.
        document = sop.get('SEARCH_DOCUMENT') or ""
        snippet = document[:snippet_length]
        # Only mark the snippet as truncated when text was actually cut off.
        if len(document) > snippet_length:
            snippet += "..."

        score = (sop.get('@scores') or {}).get('cosine_similarity')
        if score is not None:
            print(f"   Similarity: {score:.2f}")
        print(f"   Snippet: {snippet}\n")


def print_clean_facility_results(result, snippet_length: int = 100):
    """Pretty-print facility search results to stdout.

    For every hit this prints a numbered header line (name, type, ID), the
    location and capacity, the cosine similarity when the hit carries one,
    and a snippet of ``SEARCH_DOCUMENT``.

    Args:
        result: Object whose ``to_dict()`` returns a mapping with a
            ``'results'`` list of hit dictionaries.
        snippet_length: Maximum number of document characters shown
            (default 100).

    Raises:
        KeyError: If a hit lacks ``FACILITY_NAME``, ``FACILITY_TYPE``,
            ``FACILITY_ID``, ``LOCATION`` or ``CAPACITY``.
    """
    # `or []` also covers a present-but-None 'results' value.
    hits = result.to_dict().get('results') or []
    for idx, fac in enumerate(hits, 1):
        print(f"{idx}. {fac['FACILITY_NAME']} [{fac['FACILITY_TYPE']}] (ID: {fac['FACILITY_ID']})")
        print(f"   Location: {fac['LOCATION']}")
        print(f"   Capacity: {fac['CAPACITY']}")

        # A missing or None document yields an empty snippet.
        document = fac.get('SEARCH_DOCUMENT') or ""
        snippet = document[:snippet_length]
        # Only mark the snippet as truncated when text was actually cut off.
        if len(document) > snippet_length:
            snippet += "..."

        score = (fac.get('@scores') or {}).get('cosine_similarity')
        if score is not None:
            print(f"   Similarity: {score:.2f}")
        print(f"   Snippet: {snippet}\n")

In [14]:
# ============================================================================
# PART 8: Comprehensive Testing
# ============================================================================

from datetime import date  # used by Test 7 to build the month-to-date range

print("\n" + "="*80)
print("=== TESTING ALL SEARCH SERVICES AND CUSTOM TOOLS ===")
print("="*80)

# Test 1: SOP Search
# cortex_search_sop raises on failure and never returns None, so the error is
# caught here; otherwise the fallback note below could not be reached and one
# failing search would abort every later test.
print("\n📋 TEST 1: SOP Search - 'How to handle patient emergencies?'")
print("-" * 80)
try:
    result1 = cortex_search_sop("How to handle patient emergencies?", limit=3)
except Exception as search_error:
    print(f"  Search failed: {search_error}")
    result1 = None
if result1 is not None:
    print_clean_sop_results(result1)
else:
    print("  Note: Search service might still be indexing. Try again in a few minutes.")

# Test 2: Facility Search (same failure handling as Test 1)
print("\n🏥 TEST 2: Facility Search - 'MRI scanner'")
print("-" * 80)
try:
    result2 = cortex_search_facility("MRI scanner", limit=3)
except Exception as search_error:
    print(f"  Search failed: {search_error}")
    result2 = None
if result2 is not None:
    print_clean_facility_results(result2)
else:
    print("  Note: Search service might still be indexing.")

# Test 3: Doctor Search by Specialization
# The partial term "Cardio" is used; the SQL function matches it as a substring.
print("\n👨‍⚕️ TEST 3: Search Doctors - 'Cardiologist'")
print("-" * 80)
doctors = search_doctors_by_specialization("Cardio")
print(doctors)

# Test 4: Doctor Schedule by Day
print("\n📅 TEST 4: Doctor Schedule - 'Monday'")
print("-" * 80)
schedule = get_doctor_schedule_by_day("Monday")
print(schedule.head())

# Test 5: Find Available Doctors
print("\n🔍 TEST 5: Available Doctors - 'Pediatrician on Wednesday'")
print("-" * 80)
available = find_available_doctors("Pediatric", "Wednesday")
print(available)

# Test 6: Upcoming Appointments
print("\n📆 TEST 6: Upcoming Appointments (Next 7 days)")
print("-" * 80)
upcoming = get_upcoming_appointments(7)
print(upcoming.head(10))

# Test 7: Appointment Statistics, from the first day of the current month to today
print("\n📊 TEST 7: Appointment Statistics (This month)")
print("-" * 80)
start = date.today().replace(day=1).strftime('%Y-%m-%d')
end = date.today().strftime('%Y-%m-%d')
stats = get_appointment_stats(start, end)
print(stats)

print("\n" + "="*80)
print("✓ All tests completed!")
print("="*80)


=== TESTING ALL SEARCH SERVICES AND CUSTOM TOOLS ===

📋 TEST 1: SOP Search - 'How to handle patient emergencies?'
--------------------------------------------------------------------------------
1. Fire Emergency Evacuation [Emergency Procedures] (ID: SOP-0019)
   Similarity: 0.52
   Snippet: Fire Emergency Evacuation - Emergency Procedures. Department: Hospital-Wide. Follow RACE protocol: R...

2. Medication Administration Guidelines [Patient Care] (ID: SOP-0003)
   Similarity: 0.52
   Snippet: Medication Administration Guidelines - Patient Care. Department: Emergency. Follow five rights: righ...

3. Mass Casualty Incident Response [Emergency Procedures] (ID: SOP-0018)
   Similarity: 0.50
   Snippet: Mass Casualty Incident Response - Emergency Procedures. Department: Emergency. Activate hospital inc...


🏥 TEST 2: Facility Search - 'MRI scanner'
--------------------------------------------------------------------------------
1. Operating Room 4 [Operating Room] (ID: FAC-0004)
   Loca