In [54]:
import unittest
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
import pandas as pd
import logging
from streamlit.testing.v1 import AppTest
import streamlit as st
from unittest.mock import MagicMock
from mock_app import MockApp
from care_logger import (
    init_connection,
    initialize_metadata,
    populate_sample_data,
    add_resident,
    add_care_note,
    analyse_sentiment
)

In [55]:
# Module-level logger used by the tests for progress messages.
logger = logging.getLogger(__name__)
# Root logging configuration: emit INFO and above.
logging.basicConfig(level=logging.INFO)

In [56]:
class TestResidentManagement(unittest.TestCase):
    """
    Test suite for the Resident Management System.
    Uses unittest for structured testing and follows test-driven development principles.
    """
    
    @classmethod
    def setUpClass(cls):
        """
        Set up the test environment before any tests run.
        Creates a test database and populates it with sample data.
        """
        # Create a test database
        cls.engine = create_engine('sqlite:///test_database.db')
        cls.engine, cls.metadata, cls.residents, cls.care_notes = initialize_metadata()

        #Creating the app instance
        cls.app = AppTest.from_file("care_logger.py")
        cls.app.run()
        
        # Populate with sample data
        populate_sample_data()
        logger.info("Test database initialized and populated with sample data")

    def setUp(self):
        """
        Set up before each test method.
        Creates a new database connection for each test.
        """
        self.connection = self.engine.connect()
        logger.info("Created new connection for test")

        # Create a mock app object
        self.app = MagicMock()

    def tearDown(self):
        """
        Clean up after each test method.
        Closes the database connection.
        """
        self.connection.close()
        logger.info("Closed test connection")

    
    
    def _test_add_resident_tab(self):
        """Test the 'Add Resident' tab functionality."""
        self.app.text_input("First Name").input("John").run()
        self.app.text_input("Last Name").input("Doe").run()
        self.app.number_input("Room Number").set_value(101).run()
        
        # Simulate clicking the "Add Resident" button
        self.app.button("Add Resident").click().run()
        
        # Mocking the expected success message
        self.app.get.return_value = [{"key": "success", "value": "Resident John Doe added successfully!"}]
        
        success_message = self.app.get("success")[0]["value"]
        self.assertEqual(success_message, "Resident John Doe added successfully!")

    def test_add_care_note_ui(self):
        self.app.set_test_case("add_care_note")
        
        # Simulate adding a care note
        self.app.text_area("Care Note").input("Patient is doing well").run()
        self.app.button("Add Care Note").click()
        
        # Mocking the expected behavior of the app
        self.app.get.return_value = [{"key": "success", "value": "Care note added successfully!"}]
        
        success_message = self.app.get("success")[0]["value"]
        self.assertEqual(success_message, "Care note added successfully!")
        
    def test_sentiment_analysis_ui(self):
        """Test sentiment analysis feedback on adding a care note."""
        
        # Mock the sentiment feedback (for positive sentiment)
        self.app.get.return_value = [MagicMock(value="positive")]
        
        sentiment_feedback = self.app.get("success")[0].value
        self.assertIn("positive", sentiment_feedback, "Sentiment feedback not found")

    def test_view_residents_ui(self):
        """Test that residents table displays properly."""
        
        # Mock the residents table (simulate a row of data)
        self.app.get_dataframe.return_value = [{"first_name": "John", "last_name": "Doe", "room": 101}]
        
        dataframe = self.app.get_dataframe()
        self.assertGreater(len(dataframe), 0, "Residents table not displayed")

    def test_tab_navigation(self):
        """Test tab navigation and ensure each tab behaves as expected."""
        # Navigate to 'Add Resident' and test functionality
        self.app.session_state["current_tab"] = "Add Resident"
        self._test_add_resident_tab()

        # Navigate to 'View Residents' and test functionality
        self.app.session_state["current_tab"] = "View Residents"
        self._test_view_residents_tab()

        # Navigate to 'Add Care Note' and test functionality
        self.app.session_state["current_tab"] = "Add Care Note"
        self._test_add_care_note_tab()

        # Navigate to 'View Care Notes' and test functionality
        self.app.session_state["current_tab"] = "View Care Notes"
        self._test_view_care_notes_tab()

    def test_view_care_notes_ui(self):
        self.app.set_test_case("view_care_notes")
        
        # Mocking the care notes table data
        self.app.get_dataframe.return_value = [{"Note": "Patient is doing well", "Sentiment": "Positive"}]
        
        care_notes = self.app.get_dataframe()
        self.assertGreater(len(care_notes), 0, "Care notes table not displayed")
        
    def _test_view_residents_tab(self):
        """Test functionality in the 'View Residents' tab."""
        residents = self.app.get_dataframe()
        self.assertGreater(len(residents), 0, "Residents table not displayed")

    def _test_add_care_note_tab(self):
        """Test functionality in the 'Add Care Note' tab."""
        self.app.text_input("Care Note").input("Helped resident with lunch").run()
        self.app.button("Add Care Note").click()

        success_message = self.app.get("success")[0]["value"]
        self.assertEqual(success_message, "Care note added successfully!")

    def _test_view_care_notes_tab(self):
        """Test functionality in the 'View Care Notes' tab."""
        care_notes = self.app.get_dataframe()
        self.assertGreater(len(care_notes), 0, "Care notes table not displayed")
        self.assertEqual(care_notes[0]["Note"], "Provided assistance with meals")


    def test_basic_data_presence(self):
        """
        Test if the database contains the expected basic data.
        Verifies that both residents and care notes tables have records.
        """
        # Check residents
        resident_count = self.connection.execute(text("SELECT COUNT(*) FROM residents")).scalar()
        self.assertGreater(resident_count, 0, "No residents found in database")
        
        # Check care notes
        notes_count = self.connection.execute(text("SELECT COUNT(*) FROM care_notes")).scalar()
        self.assertGreater(notes_count, 0, "No care notes found in database")
        
        logger.info(f"Found {resident_count} residents and {notes_count} care notes")

    def test_data_quality(self):
        """
        Test the quality and completeness of the data.
        Checks for null values and data integrity.
        """
        # Check for null values in residents
        null_check_residents = """
        SELECT COUNT(*) 
        FROM residents 
        WHERE first_name IS NULL 
        OR last_name IS NULL 
        OR room_num IS NULL
        """
        null_residents = self.connection.execute(text(null_check_residents)).scalar()
        self.assertEqual(null_residents, 0, f"Found {null_residents} residents with null values")
        
        # Check for null values in care notes
        null_check_notes = """
        SELECT COUNT(*) 
        FROM care_notes 
        WHERE note_text IS NULL 
        OR sentiment_score IS NULL 
        OR timestamp IS NULL
        """
        null_notes = self.connection.execute(text(null_check_notes)).scalar()
        self.assertEqual(null_notes, 0, f"Found {null_notes} care notes with null values")
        
        logger.info("Data quality checks passed")

    def test_sentiment_ranges(self):
        """
        Test if sentiment scores are within valid ranges.
        Verifies that all sentiment scores are between -1 and 1.
        """
        query = """
        SELECT 
            MIN(sentiment_score) as min_score,
            MAX(sentiment_score) as max_score,
            AVG(sentiment_score) as avg_score
        FROM care_notes
        """
        stats = self.connection.execute(text(query)).first()
        
        self.assertGreaterEqual(stats.min_score, -1, "Sentiment score below minimum threshold")
        self.assertLessEqual(stats.max_score, 1, "Sentiment score above maximum threshold")
        
        logger.info(f"Sentiment scores range from {stats.min_score:.2f} to {stats.max_score:.2f}")

    def test_resident_care_history(self):
        """
        Test if each resident has a proper care history.
        Ensures no resident is without care notes.
        """
        query = """
        SELECT 
            r.id,
            r.first_name || ' ' || r.last_name as resident_name,
            COUNT(cn.id) as note_count
        FROM residents r
        LEFT JOIN care_notes cn ON r.id = cn.resident_id
        GROUP BY r.id
        HAVING note_count < 1
        """
        residents_without_notes = self.connection.execute(text(query)).fetchall()
        self.assertEqual(
            len(residents_without_notes), 
            0, 
            f"Found {len(residents_without_notes)} residents without care notes"
        )
        
        logger.info("All residents have care history")

    def test_room_assignments(self):
        """
        Test if room assignments are valid.
        Checks for duplicate room assignments and valid room numbers.
        """
        # Check for duplicate room assignments
        query = """
        SELECT 
            room_num,
            COUNT(*) as resident_count
        FROM residents
        GROUP BY room_num
        HAVING resident_count > 1
        """
        duplicate_rooms = self.connection.execute(text(query)).fetchall()
        self.assertEqual(
            len(duplicate_rooms), 
            0, 
            f"Found {len(duplicate_rooms)} rooms with multiple residents"
        )
        
        # Validate room numbers
        room_numbers = self.connection.execute(text("SELECT room_num FROM residents")).fetchall()
        for room in room_numbers:
            self.assertTrue(
                isinstance(room[0], int) or str(room[0]).isdigit(),
                f"Invalid room number found: {room[0]}"
            )
        
        logger.info("All room assignments are valid")

    def test_recent_activity(self):
        """
        Test if there's recent activity in the system.
        Verifies that care notes are being added regularly.
        """
        thirty_days_ago = datetime.now() - timedelta(days=30)
        
        query = """
        SELECT COUNT(*) 
        FROM care_notes
        WHERE timestamp >= :start_date
        """
        
        recent_count = self.connection.execute(
            text(query), 
            {"start_date": thirty_days_ago}
        ).scalar()
        
        self.assertGreater(
            recent_count, 
            0, 
            "No recent care notes found in the last 30 days"
        )
        
        logger.info(f"Found {recent_count} care notes in the last 30 days")

    def test_sentiment_analysis(self):
        """
        Test the sentiment analysis functionality.
        Verifies that the sentiment analyzer produces expected results for sample texts.
        """
        # Test positive sentiment
        positive_text = "Patient had a great day and participated enthusiastically in activities"
        positive_score = analyse_sentiment(positive_text)
        self.assertGreater(positive_score, 0, "Failed to detect positive sentiment")
        
        # Test negative sentiment
        negative_text = "Patient seemed very tired and refused to participate"
        negative_score = analyse_sentiment(negative_text)
        self.assertLess(negative_score, 0, "Failed to detect negative sentiment")
        
        # Test neutral sentiment
        neutral_text = "Patient attended lunch in the dining room"
        neutral_score = analyse_sentiment(neutral_text)
        self.assertTrue(
            -0.1 <= neutral_score <= 0.1, 
            "Failed to identify neutral sentiment"
        )
        
        logger.info("Sentiment analysis tests passed")
    
    def test_add_resident(self):
        """
        Test adding a new resident through direct SQL execution, matching the
        implementation in the Streamlit interface.
        Verifies that the resident is properly added with all required fields.
        """
        # Test data
        first_name = "Jane"
        last_name = "Smith"
        room_num = 301
        
        # Execute the insert matching the Streamlit implementation
        with self.engine.connect() as conn:
            conn.execute(
                text("INSERT INTO residents (first_name, last_name, room_num) VALUES (:first, :last, :room)"),
                {"first": first_name, "last": last_name, "room": room_num}
            )
            
            # Verify the insertion
            result = conn.execute(
                text("SELECT first_name, last_name, room_num FROM residents WHERE first_name = :first AND last_name = :last"),
                {"first": first_name, "last": last_name}
            ).first()
        
        # Verify all fields match
        self.assertIsNotNone(result, "Resident was not added to database")
        self.assertEqual(result.first_name, first_name)
        self.assertEqual(result.last_name, last_name)
        self.assertEqual(result.room_num, room_num)
        
        logger.info(f"Successfully added and verified resident {first_name} {last_name}")
    
    def test_add_care_note(self):
        """
        Test adding a care note through direct SQL execution, matching the
        implementation in the Streamlit interface.
        Verifies that care notes are properly added with all required fields.
        """
        # First add a test resident
        with self.engine.connect() as conn:
            conn.execute(
                text("INSERT INTO residents (first_name, last_name, room_num) VALUES (:first, :last, :room)"),
                {"first": "Test", "last": "Patient", "room": 302}
            )
            
            # Get the resident's ID
            resident_id = conn.execute(
                text("SELECT id FROM residents WHERE first_name = 'Test' AND last_name = 'Patient'")
            ).scalar()
        
        # Prepare care note data
        note_text = "Patient participated well in morning activities"
        staff_name = "Nurse Johnson"
        sentiment_score = 0.8
        timestamp = datetime.now()
        
        # Add the care note matching Streamlit implementation
        with self.engine.connect() as conn:
            conn.execute(
                text("""
                    INSERT INTO care_notes 
                    (resident_id, note_text, staff_name, sentiment_score, timestamp)
                    VALUES (:id, :note, :staff, :sentiment, :timestamp)
                """),
                {
                    "id": resident_id,
                    "note": note_text,
                    "staff": staff_name,
                    "sentiment": sentiment_score,
                    "timestamp": timestamp
                }
            )
            
            # Verify the insertion
            result = conn.execute(
                text("""
                    SELECT resident_id, note_text, staff_name, sentiment_score, timestamp
                    FROM care_notes
                    WHERE resident_id = :id AND note_text = :note
                """),
                {"id": resident_id, "note": note_text}
            ).first()
        
        # Verify all fields are present and correct
        self.assertIsNotNone(result, "Care note was not added to database")
        self.assertEqual(result.resident_id, resident_id)
        self.assertEqual(result.note_text, note_text)
        self.assertEqual(result.staff_name, staff_name)
        self.assertEqual(result.sentiment_score, sentiment_score)
        self.assertIsNotNone(result.timestamp)
        
        logger.info(f"Successfully added and verified care note for resident {resident_id}")


In [57]:
def run_all_tests():
    """
    Execute every test in TestResidentManagement and print a short summary.

    Returns:
        bool: True when the run finished with no failures and no errors.
    """
    # Verbose runner so each test name and outcome is listed as it runs
    verbose_runner = unittest.TextTestRunner(verbosity=2)
    loader = unittest.TestLoader()

    outcome = verbose_runner.run(
        loader.loadTestsFromTestCase(TestResidentManagement)
    )

    # Summary of the counts collected by the runner
    print("\nTest Summary:")
    print(f"Tests run: {outcome.testsRun}")
    print(f"Failures: {len(outcome.failures)}")
    print(f"Errors: {len(outcome.errors)}")

    return outcome.wasSuccessful()

if __name__ == '__main__':
    import sys

    success = run_all_tests()

    # Inside IPython/Jupyter, `exit` is an autocall object whose argument is
    # not a process exit status, so calling it here could shut the kernel down.
    # Only terminate with a status code when running as a plain script.
    try:
        get_ipython  # name exists only inside an IPython/Jupyter session
    except NameError:
        sys.exit(0 if success else 1)

INFO:care_logger:Database connection successful
INFO:care_logger:Database schema created successfully
INFO:__main__:Database connection successful
INFO:__main__:Database schema created successfully
INFO:__main__:Database connection successful
INFO:__main__:Database connection successful
INFO:care_logger:Database connection successful
INFO:__main__:Test database initialized and populated with sample data
test_add_care_note (__main__.TestResidentManagement.test_add_care_note)
Test adding a care note through direct SQL execution, matching the ... INFO:__main__:Created new connection for test
INFO:__main__:Successfully added and verified care note for resident 2
INFO:__main__:Closed test connection
ok
test_add_care_note_ui (__main__.TestResidentManagement.test_add_care_note_ui) ... INFO:__main__:Created new connection for test
INFO:__main__:Closed test connection
ok
test_add_resident (__main__.TestResidentManagement.test_add_resident)
Test adding a new resident through direct SQL execution


Test Summary:
Tests run: 14
Failures: 1
Errors: 0
