In [1]:
# ----- Notes on running this Jupyter Notebook ---- #

#Run the code cells one at a time, to first initialize the applications and databases
#Then to run the unit tests
#Then to finally run the web application after the unit tests
#The command to run the web application and use it is in the last code cell of this file

In [2]:
# Importing essential libraries
from flask import Flask, render_template, request, redirect, url_for, flash, session, redirect
import logging, re, os
from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash 
import sqlite3
import csv
import pymongo
import math

# ------------------------------------
#Initialize Flask Application
app = Flask(__name__)
# ------------------------------------


# ------------------------------------
# In production set SECRET_KEY via environment variable
#Strong secret key for session cookies
app.secret_key = os.environ.get("SECRET_KEY", "3e9f6228a53b88057f78424ab609311b")
# ------------------------------------

# ------------------------------------
# Database path
DB_PATH = "hospitaldata.db"
CSV_PATH = "healthcare-dataset-stroke-data.csv"
# ------------------------------------

# ------------------------------------
#Creating user table if they don't exist
def init_db():
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS user (
        ID INTEGER PRIMARY KEY AUTOINCREMENT,
        First_name TEXT NOT NULL,
        Last_name TEXT NOT NULL,
        username TEXT NOT NULL,
        email TEXT NOT NULL,
        password TEXT NOT NULL,
        role TEXT NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """)

    #Case-sensitive uniqueness on email
    cursor.execute("""
    CREATE UNIQUE INDEX IF NOT EXISTS idx_users_email_nocase
    ON user (lower(email));
    """)

    # ----- Inserting 5 new admins into the table ----
    admins = [
        {'first':'Charlie', 'last':'Alpha', 'user': 'admin1', 'email':'admin1@hospital.com', 'pass':'Admin123!'},
        {'first':'Max', 'last':'Beta', 'user': 'admin2', 'email':'admin2@hospital.com', 'pass':'Admin123?'},
        {'first':'Jade', 'last':'Gamma', 'user': 'admin3', 'email':'admin3@hospital.com', 'pass':'Admin456!'},
        {'first':'Amy', 'last':'Delta', 'user': 'admin4', 'email':'admin4@hospital.com', 'pass':'Admin456?'},
        {'first':'Bob', 'last':'Epsilon', 'user': 'admin5', 'email':'admin5@hospital.com', 'pass':'Admin789!'},
    ]

    #Loop to iterate through the list of admins for database insertion
    for admin in admins:
        cursor.execute("SELECT 1 FROM user WHERE username = ?", (admin['user'],))

        if cursor.fetchone() is None:

            #hash the passwords for security 
            hashed_pw = generate_password_hash(admin['pass'], method="pbkdf2:sha256")
            insert_query = """
            INSERT INTO user (First_name, Last_name, username, email, password, role)
            VALUES (?, ?, ?, ?, ?, ?)
            """

            #Preparing the tuple for insertion:
            user_values = (
                admin['first'],
                admin['last'],
                admin['user'],
                admin['email'],
                hashed_pw,
                'admin' #hardcoding the role as admin
            )

            cursor.execute(insert_query, user_values)
            print(f"Created user: {admin['user']}")
        else:
            print(f"User already exists: {admin['user']}")

    # ----- Inserting 5 new doctors into the table ----
    doctors = [
        {'first':'Helen', 'last':'Brown', 'user': 'hbrown', 'email':'doctor1@hospital.com', 'pass':'Doctor123!'},
        {'first':'Taylor', 'last':'Jones', 'user': 'tjones', 'email':'doctor2@hospital.com', 'pass':'Doctor123?'},
        {'first':'George', 'last':'White', 'user': 'gwhite', 'email':'doctor3@hospital.com', 'pass':'Doctor456!'},
        {'first':'Freya', 'last':'Rose', 'user': 'frose', 'email':'doctor4@hospital.com', 'pass':'Doctor456?'},
        {'first':'Luke', 'last':'Chester', 'user': 'lchester', 'email':'doctor5@hospital.com', 'pass':'Doctor789!'},
    ]

    #Loop to iterate through the list of admins for database insertion
    for doctor in doctors:
        cursor.execute("SELECT 1 FROM user WHERE username = ?", (doctor['user'],))

        if cursor.fetchone() is None:

            #hash the passwords for security 
            hash_pw = generate_password_hash(doctor['pass'], method="pbkdf2:sha256")
            insert_query2 = """
            INSERT INTO user (First_name, Last_name, username, email, password, role)
            VALUES (?, ?, ?, ?, ?, ?)
            """

            #Preparing the tuple for insertion:
            user_values2 = (
                doctor['first'],
                doctor['last'],
                doctor['user'],
                doctor['email'],
                hash_pw,
                'doctor' #hardcoding the role as admin
            )

            cursor.execute(insert_query2, user_values2)
            print(f"Created user: {doctor['user']}")
        else:
            print(f"User already exists: {doctor['user']}")

    # ----- Setting up the Patient data table ----- #
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS patient (
        id INTEGER PRIMARY KEY,
        gender TEXT NOT NULL,
        age INTEGER NOT NULL,
        hypertension INTEGER NOT NULL,
        heart_disease INTEGER NOT NULL,
        ever_married TEXT NOT NULL,
        work_type TEXT NOT NULL,
        Residence_type TEXT NOT NULL,
        avg_glucose_level REAL NOT NULL,
        bmi TEXT NOT NULL,
        smoking_status TEXT NOT NULL,
        stroke INTEGER NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """)

    if os.path.exists(CSV_PATH):
        #Check if table is empty so we don't duplicate data every time this is run
        cursor.execute("SELECT count(*) FROM patient")
        count = cursor.fetchone()[0]

        if count == 0:
            print("Reading patient data from CSV file")
            with open(CSV_PATH, 'r', encoding ='utf-8') as f:
                csv_reader = csv.DictReader(f)

                rows_to_insert = []
                for row in csv_reader:
                    data_tuple = (
                        row['id'],
                        row['gender'],
                        row['age'],
                        row['hypertension'],
                        row['heart_disease'],
                        row['ever_married'],
                        row['work_type'],
                        row['Residence_type'],
                        row['avg_glucose_level'],
                        row['bmi'],
                        row['smoking_status'],
                        row['stroke'],
                    )
                    rows_to_insert.append(data_tuple)

                #Using executemany to insert bulk data
                cursor.executemany("""
                    INSERT INTO patient (id, gender, age, hypertension, heart_disease, ever_married, work_type, Residence_type, avg_glucose_level, bmi, smoking_status, stroke)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, rows_to_insert)
                print(f" Successfully inserted {len(rows_to_insert)} patients.")
        else:
            print("Patient table already has data. Skipping CSV data import.")
    else:
        print(f"WARNING: Could not find {CSV_PATH}. Skipping patient data import.")

    conn.commit()
    conn.close()

# ------------------------------------
# MongoDB Cluster Connection
MONGO_URI = "mongodb+srv://admin:Hospital123XYZ!@cluster0.aaf6kly.mongodb.net/?appName=Cluster0"

#Function for getting report ids
def get_next_report_id():
    client = pymongo.MongoClient(MONGO_URI)
    db = client['hospital_db']
    counters = db['counters']

    sequence_document = counters.find_one_and_update(
        {"_id": "report_id"},
        {"$inc": {"sequence_value": 1}},
        return_document=True
    )
    new_number = sequence_document["sequence_value"]
    return f"MR-{new_number}"

#Connecting to MongoDB and inserting sample medical records if collection is empty
def init_mongo_db():

    try:
        #Connecting to the database
        client = pymongo.MongoClient(MONGO_URI)
        db = client['hospital_db']
        collection = db['medical_reports']
        counters = db['counters']

        #Initialising counter to create unique medical report id only if it doesn't exist
        #Using a specific ID 'report_id' to track medical reports
        if counters.find_one({"_id": "report_id"}) is None:
            counters.insert_one({
                "_id": "report_id",
                "sequence_value": 100
            })
            print("Counter is initialised at 100")

        #Check if data already exists to avoid duplicates
        if collection.count_documents({}) > 0:
            print("MongoDB collection: 'medical_reports' collecting already has data. Skipping collection populating" )
            return
        
        #List of reports: created 2 medical reports per doctor for each of the 5 doctors created in earlier code. 
        #Using doctor's username as their 'doctor_id' for report as this is a unique string and cannot be changed.
        sample_reports = [

            #report 1
            {
                "report_id": get_next_report_id(),
                "patient_id": "67",
                "doctor_id": "hbrown",
                "doctor_notes": "Persistent cough for 5 days. Mild wheezing on left side.",
                "stroke_prediction": "0.25",
                "treatment": "Albuterol inhaler as needed. Rest and increased fluids.",
                "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },

            #report 2
            {
                "report_id": get_next_report_id(),
                "patient_id": "77",
                "doctor_id": "hbrown",
                "doctor_notes": "Blood pressure 145/92 during check. Patient reports high stress levels.",
                "stroke_prediction": "0.5",
                "treatment": "Pescription 10mg daily. Referral to nutritionist.",
                "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },

            #report 3
            {
                "report_id": get_next_report_id(),
                "patient_id": "84",
                "doctor_id": "tjones",
                "doctor_notes": "Severe throbbing pain on right side of head, sensitivity to light.",
                "stroke_prediction": "0.75",
                "treatment": "Pescription 50mg daily. Rest and increased fluids.",
                "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },

            #report 4
            {
                "report_id": get_next_report_id(),
                "patient_id": "91",
                "doctor_id": "tjones",
                "doctor_notes": "Lower back pain after living heavy boxes. Impaired vision",
                "stroke_prediction": "0.8",
                "treatment": "Muscle relaxants and physical therapy refferal.",
                "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },

            #report 5
            {
                "report_id": get_next_report_id(),
                "patient_id": "99",
                "doctor_id": "gwhite",
                "doctor_notes": "Redness in left eye with yellow discharge.",
                "stroke_prediction": "0.2",
                "treatment": "Eye ointment 4 times daily.",
                "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },

            #report 6
            {
                "report_id": get_next_report_id(),
                "patient_id": "121",
                "doctor_id": "gwhite",
                "doctor_notes": "Facial pressure and high fever.",
                "stroke_prediction": "0.1",
                "treatment": "Prescription 2 doses per day. Ibuprofen for pain.",
                "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },

            #report 7
            {
                "report_id": get_next_report_id(),
                "patient_id": "129",
                "doctor_id": "frose",
                "doctor_notes": "Sneezing and itchy eyes. Nasal passages swollen.",
                "stroke_prediction": "0.66",
                "treatment": "Loratadine 10mg daily. Nasal spray.",
                "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },

            #report 8
            {
                "report_id": get_next_report_id(),
                "patient_id": "132",
                "doctor_id": "frose",
                "doctor_notes": "Burning sensation in chest after meals. Low-grade fever.",
                "stroke_prediction": "0.33",
                "treatment": "Pescription 20mg every morning. Avoid spicy foods.",
                "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },

            #report 9
            {
                "report_id": get_next_report_id(),
                "patient_id": "156",
                "doctor_id": "lchester",
                "doctor_notes": "Redness in left eye. Range of motion limited due to ankle pain.",
                "stroke_prediction": "0.4",
                "treatment": "Eye ointment 4 times daily. Pain relief pescription once per day.",
                "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },

            #report 10
            {
                "report_id": get_next_report_id(),
                "patient_id": "163",
                "doctor_id": "lchester",
                "doctor_notes": "Increased thirst and fatigue. Severe throbbing pain on right side of head.",
                "stroke_prediction": "0.7",
                "treatment": "Inhaler as needed. Pescription 3 doses per day.",
                "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },

        ]

        #Insert the date
        collection.insert_many(sample_reports)
        print(f"Successfully inserted {len(sample_reports)} medical reports.")
    
    except Exception as e:
        print(f"Error initialising MongoDB: {e}")

        
# ------------------------------------
#Configure Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler("app.log"), logging.StreamHandler()],
)
log = logging.getLogger(__name__)
# ------------------------------------


# ------------------------------------
# Simple in-memory storage
REGISTERED_USERS = [] # each item: {"username", "email", "age", "created_at"}
# ------------------------------------


# ------------------------------------
# Validation patterns

##Ensure that first name is only letters and hyphens
FIRSTNAME_PATTERN = re.compile(r'^[A-Za-z-]{1,50}$')

##Ensure that last name is only letters and hyphens
LASTNAME_PATTERN = re.compile(r'[A-Za-z-]{1,50}$')

## Ensures the email has a basic valid structure of name@domain.tld
EMAIL_PATTERN = re.compile(r'^[\w\.-]+@[\w\.-]+\.[A-Za-z]{2,}$')

## Ensures the username only allows letters, digits, underscores, and length between 5 and 16 characters.
USERNAME_PATTERN = re.compile(r'^[A-Za-z0-9_]{5,16}$')

## Strong password pattern that requires lowercase, uppercase, digit, special character, and minimum 8 characters.
PASSWORD_PATTERN = re.compile(r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&]).{8,}$')
# ------------------------------------


# ------------------------------------
# Route for the Home page
@app.route('/')
def home():
    return render_template('Home_Page.html')

# Route for the About page
@app.route('/about')
def about():
    return render_template('About.html')

# Route for Registration success page 
@app.route('/success')
def success():
    return render_template('Registration_Success.html')

# Route for User Registration page
## Simple register form that intakes first name, last name, username, email and password for now 
@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'POST':
        First_name = (request.form.get("First_name") or "").strip()
        Last_name = (request.form.get("Last_name") or "").strip()
        username = (request.form.get("username") or "").strip()
        email    = (request.form.get("email") or "").strip()
        password = (request.form.get("password") or "").strip()
        
        try:

            ## ---- SERVER-SIDE INPUT VALIDATION FOR REGISTRATION FORM ---- ##

            # ----- Checking empty fields -----
             if not First_name:
                 raise ValueError("First name is required")
             if not Last_name:
                 raise ValueError("Last name is required")
             if not username:
                 raise ValueError("Username is required")
             if not email:
                 raise ValueError("Email is required.")
             if not password:
                 raise ValueError("Password is required.")
             
            # ----- Type / Format checks -----
             if not FIRSTNAME_PATTERN.fullmatch(First_name):
                  raise ValueError("First name must only contain letters and hyphens")

             if not LASTNAME_PATTERN.fullmatch(Last_name):
                  raise ValueError("Last name must only contain letters and hyphens")
             
             if not USERNAME_PATTERN.fullmatch(username):
                  raise ValueError("Username must be 5â€“16 chars (letters, digits, underscore).")
             
             if not EMAIL_PATTERN.fullmatch(email):
                  raise ValueError("Email format is invalid.")
             
             if not PASSWORD_PATTERN.fullmatch(password):
                  raise ValueError("Password format is invalid.")
             
             if len(email) > 254:
                  raise ValueError("Email too long.")

            # ----- Business rules / Whitelist checks -----
             if username.lower() in {"admin", "root"}:
                 raise ValueError("Username is reserved.")

            # ----- Hashing password ------
             hashed_password = generate_password_hash(password, method="pbkdf2:sha256")

             conn= sqlite3.connect(DB_PATH)
             cursor = conn.cursor()

            # ----- Pre-check duplicate -----
             cursor.execute("SELECT 1 FROM user WHERE lower(email) = lower(?)", (email,))
             if cursor.fetchone():
                 flash("This email is already registered. Please log in instead.")
                 conn.close()
                 return redirect(url_for("login"))
             
            # ----- Insert the user into the database ----- 
             try:
                 cursor.execute("""
                    INSERT INTO user (First_name, Last_name, username, email, password, role)
                    VALUES (?, ?, ?, ?, ?, ?)
                 """, (First_name, Last_name, username, email, hashed_password, 'doctor')) #doctor is set as default role for role based access 
                 conn.commit()
                 flash("Registration successful! Please log in.")
             except sqlite3.IntegrityError: 
                 flash("This email is already registered. Please log in.")
             finally:
                 conn.close()

             return redirect(url_for("success"))
        
        except ValueError as e:
            flash(str(e), 'error')
            log.warning("Validation failed: %s", e)
            return redirect(url_for("register"))
       
    return render_template("Registration_Form.html")

# Route for Login page and Submissing Handling
@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == "GET":
        return render_template("Login.html")
    
    # ----- Post request handling -----
    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")

    if not (username and password):
        flash("Please enter username and password.")
        return redirect(url_for("login"))
    
    # ----- Database connection and Query -----
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    #Querying the doctor table to find a user matching the username 
    cursor.execute("""
        SELECT ID, First_name, Last_name, username, email, password, role
        FROM user
        WHERE lower(username) = lower(?)
    """, (username,))
    row = cursor.fetchone()
    conn.close() #Close connection immediately after fetching data

    # ----- Check if we found a user in the database
    if not row:
        flash("Invalid username or password.")
        return redirect(url_for("login"))
    
    # ----- Map retrieved columns to variables -----
    ID, First_name, Last_name, username, email, password_hash, user_role = row

    # ----- Verify the submitted password against the stored hash -----
    if not check_password_hash(password_hash, password):
        flash("Invalid email or password.")
        return redirect(url_for("login"))
    
    # ----- Upon successful login, create Session -----
    session["ID"] = ID
    session["email"] = email
    session["user_name"] = f"{First_name} {Last_name}"
    session["username"] = username
    session["role"] = user_role
    flash(f"Welcome back!")
    return redirect(url_for('dashboard'))

#Dashboard page for an authenticated user who has logged in
#Role Based Access Control (RBAC) implemented
#Doctors and Admins are redirected to different dashboards due to their roles
@app.route("/dashboard")
def dashboard():
    if "ID" not in session:
        flash("Please log in to continue.")
        return redirect(url_for("login"))
    
    #retrieve role
    user_role = session.get('role')
    
    if user_role == 'admin': 
        #Admin views the admin dashboard page
        return redirect(url_for('admin_dashboard'))
    elif user_role == 'doctor':
        #Doctor views the doctor dashboard
        return render_template("dashboard.html")
    else: 
        #Default response for unrecognized roles
        flash("Your user role is unrecognized. Please contact support.", 'error')
        return redirect(url_for("login"))
    
#Page for a doctor to create a medical report.
@app.route('/create_report', methods=['GET', 'POST'])
def create_report():
    #Security check: only logged in doctors can access
    if 'ID' not in session or session.get('role') != 'doctor':
        flash("Access denied. Please log in as a doctor.")
        return redirect(url_for('login'))
    
    #Handle form submission (POST)
    if request.method == 'POST':
        #Get data from the form
        patient_id = request.form.get('patient_id')
        notes = request.form.get('doctor_notes')
        stroke_pred = request.form.get('stroke_prediction')
        treatment = request.form.get('treatment')

        #Generate a unique Report ID
        new_report_id = get_next_report_id()

        #Preparing the document for MongoDB
        report_data = {
            "report_id": new_report_id,
            "patient_id": patient_id,
            "doctor_id": session.get('username'),
            "doctor_notes": notes,
            "stroke_prediction": stroke_pred,
            "treatment": treatment,
            "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

        #Saving report to MongoDB
        try:
            client = pymongo.MongoClient(MONGO_URI)
            db = client['hospital_db']
            db['medical_reports'].insert_one(report_data)
            flash(f"Report {new_report_id} created successfully!")
            return redirect(url_for('dashboard'))
        except Exception as e:
            flash(f"Error saving report: {str(e)}", "error")
            return redirect(url_for('create_report'))
    
    #Getting valid Patient IDs from SQLite database to show in dropdown
    #This is so IDs are displayed in a dropdown list when creating report
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("SELECT ID, gender, age FROM patient")
    patients = cursor.fetchall()
    conn.close()

    return render_template("create_report.html", patients=patients)

#Page to view all medical reports made by the doctor that is logged in:
@app.route('/view_reports')
def view_reports():
    #Security check: only logged in doctors can access
    if 'ID' not in session or session.get('role') != 'doctor':
        flash("Access denied. Please log in as a doctor.")
        return redirect(url_for('login'))
    
    try:
        #Connect to MongoDB
        client = pymongo.MongoClient(MONGO_URI)
        db = client['hospital_db']

        #Fetch reports created by the doctor logged in
        my_reports = list(db['medical_reports'].find(
            {"doctor_id": session.get('username')}
        ).sort("created_at", -1))

        return render_template('view_reports.html', reports=my_reports)

    except Exception as e:
        flash(f"Error fetching reports: {str(e)}", "error")
        return redirect(url_for('dashboard'))

#Deletion of medical report by doctor in session
@app.route('/delete_report/<report_id>', methods=['POST'])
def delete_report(report_id):
    #Security check: only logged in doctors can access
    if 'ID' not in session or session.get('role') != 'doctor':
        flash("Access denied. Please log in as a doctor.")
        return redirect(url_for('login'))
    
    try:
        #Connect to MongoDB
        client = pymongo.MongoClient(MONGO_URI)
        db = client['hospital_db']
        collection = db['medical_reports'] 

        #Perform the deletion
        #Query by report id and doctor id#
        #To ensure a doctor cannot accidentally delete another doctor's report
        result = collection.delete_one({
            "report_id": report_id,
            "doctor_id": session.get('username')
        })

        #Give feedback of deletion
        if result.deleted_count > 0:
            flash(f"Report {report_id} deleted successfully.")
        else:
            flash("Report not found or you don't have permission to delete it.")
    
    except Exception as e:
        flash(f"Error occurred: {str(e)}", "error")
    
    #Redirect back to list
    return redirect(url_for('view_reports'))

#Page for logged in doctor to view patient records
@app.route('/view_patients', methods=['GET'])
def view_patients():
    #Security check: only logged in doctors can access
    if 'ID' not in session or session.get('role') != 'doctor':
        flash("Access denied. Please log in as a doctor.")
        return redirect(url_for('login'))
    
    #Get parameters of page number and search term
    page = request.args.get('page', 1, type=int)
    search_query = request.args.get('search', '').strip()
    per_page = 20 #showing 20 records per page
    offset = (page - 1)

    #connect to database
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    #SQL Query for the data
    if search_query:
        #Filtering search by ID if Doctor uses search
        data_query = "SELECT * FROM patient WHERE CAST(id AS TEXT) LIKE ? LIMIT ? OFFSET ?"
        count_query = "SELECT COUNT(*) FROM patient WHERE CAST(id AS TEXT) LIKE ?"
        params_data = (f"%{search_query}%", per_page, offset)
        params_count = (f"%{search_query}%",)
    else:
        #Normal mode to show 20 records per page 
        data_query = "SELECT * FROM patient LIMIT ? OFFSET ?"
        count_query = "SELECT COUNT(*) FROM patient"
        params_data = (per_page, offset)
        params_count = ()
    
    #Execute queries
    cursor.execute(data_query, params_data)
    patients = cursor.fetchall()

    cursor.execute(count_query, params_count)
    total_records = cursor.fetchone()[0]

    #Fetch all IDs
    cursor.execute("SELECT id, gender, age FROM patient LIMIT 2000")
    all_patients_list = cursor.fetchall()

    conn.close()

    #Calculating total pages
    total_pages = math.ceil(total_records / per_page)

    return render_template(
        'view_patients.html',
        patients=patients,
        page=page,
        total_pages=total_pages,
        search_query=search_query
    )

#Admin dashboard
@app.route('/admin_dashboard')
def admin_dashboard():
    if "ID" not in session:
        flash("Please log in to continue.")
        return redirect(url_for("login"))
    
    #retrieve role
    user_role = session.get('role')
    
    if user_role == 'doctor': 
        #Admin views the admin dashboard page
        return redirect(url_for('dashboard'))
    elif user_role == 'admin':
        #Doctor views the doctor dashboard
        return render_template("admin_dashboard.html")
    else: 
        #Default response for unrecognized roles
        flash("Your user role is unrecognized. Please contact support.", 'error')
        return redirect(url_for("login"))
    
#Page where admins can view a list of doctors
#The list outputted is only with users with role 'doctors' due to the select statement
@app.route("/view_doctors")
def view_doctors():
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("""
        SELECT ID, First_name, Last_name, username, email, created_at
                   FROM user
                   WHERE role = 'doctor'
                   ORDER BY created_at DESC
    """)
    rows = cursor.fetchall()
    conn.close()
    return render_template("view_doctors.html", users=rows)

#Delete user (doctor) page for admin use
@app.post("/view_doctors/<int:ID>/delete")
def delete_user(ID):
    if "ID" not in session:
        flash("Please log in to continue.")
        return redirect(url_for("login"))
    
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("DELETE FROM user WHERE ID = ?", (ID,))
    deleted = cursor.rowcount
    conn.commit()
    conn.close()

    if deleted == 0: 
        flash("User not found or already deleted.")
        return redirect(url_for("view_doctors"))
    
    flash("User deleted.")
    return redirect(url_for("view_doctors"))

#Editing medical report page for doctor logged in
@app.route('/edit_report/<report_id>', methods=['GET', 'POST'])
def edit_report(report_id):
    #Security check to check if user is logged in and is a doctor
    if 'ID' not in session or session.get('role') != 'doctor':
        flash("Access denied.")
        return redirect(url_for('login'))
    
    #Connect to MongoDB
    client = pymongo.MongoClient(MONGO_URI)
    db = client['hospital_db']
    collection = db['medical_reports']

    #Handling form submission (POST)
    if request.method == 'POST':
        doctor_notes = request.form.get('doctor_notes')
        stroke_prediction = request.form.get('stroke_prediction')
        treatment = request.form.get('treatment')

        #Update specific fields
        #Query by report_id and doctor_id to ensure they own the report
        result = collection.update_one(
            {"report_id": report_id, "doctor_id": session.get('username')},
            {"$set": {
                "doctor_notes": doctor_notes,
                "stroke_prediction": stroke_prediction,
                "treatment": treatment
            }}
        )

        if result.matched_count > 0:
            flash("Report updated successfully.")
        else:
            flash("Error: Report not found or permission denied.", "error")

        return redirect(url_for('view_reports'))

    #Fetch the report to pre-fill the form, handle page load
    report = collection.find_one({"report_id": report_id})

    #Verify the report exists and belongs to the doctor logged in
    if not report or report.get('doctor_id') != session.get('username'):
        flash("Report not found or access denied.")
        return redirect(url_for('view_reports'))

    return render_template('edit_report.html', report=report)

#Update doctor credentials (GET = form, POST = save)
@app.route("/view_doctors/<int:ID>/edit", methods=["GET", "POST"])
def edit_user(ID):
    if "ID" not in session: 
        flash("Please log in to continue.")
        return redirect(url_for("login"))
    
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    if request.method == "GET":
        cursor.execute("""
            SELECT ID, First_name, Last_name, username, email
            FROM user
            WHERE ID = ? AND role = 'doctor'
        """, (ID,))
        row = cursor.fetchone()
        conn.close()
        
        if not row:
            flash("User not found.")
            return redirect(url_for("view_doctors"))
    
    #row = (ID, first_name, last_name, username, email)
        return render_template("edit_user.html", user=row)

    #POST: update fields
    First_name = request.form.get("First_name", "").strip()
    Last_name  = request.form.get("Last_name", "").strip()
    email      = request.form.get("email", "").strip()
    new_password = request.form.get("password", "") #optional

    if not (First_name and Last_name and email):
        conn.close()
        flash("First name, Last name, username, and email are required.")
        return redirect(url_for("edit_user", ID=ID))
    
    #Duplicate email check excluding this user
    cursor.execute("""
        SELECT 1 from user
        WHERE lower(email) = lower(?) AND ID != ? AND role = 'doctor'
    """, (email, ID))
    
    if cursor.fetchone():
        conn.close()
        flash("That email is already in use by another account.")
        return redirect(url_for("edit_user", ID=ID))
    
    #Build UPDATE dynamically (password change is optional)
    if new_password.strip():
        hashed = generate_password_hash(new_password.strip(), method="pbkdf2:sha256")
        cursor.execute("""
            UPDATE user
            SET First_name = ?, Last_name = ?, email = ?, password = ?
            WHERE ID = ? AND role = 'doctor'
        """, (First_name, Last_name, email, hashed, ID))
    else:
        cursor.execute("""
            UPDATE user
            SET First_name = ?, Last_name = ?, email = ?
            WHERE ID = ? 
        """, (First_name, Last_name, email, ID))

    conn.commit()
    conn.close()

    flash("User updated successfully.")
    return redirect(url_for("view_doctors"))

#Re routing user to login page when user logs out
@app.route("/logout")
def logout():
    session.clear()
    flash("You have been logged out.")
    return redirect(url_for("login"))

@app.errorhandler(404)
def not_found(e):
    # templates/404.html uses: {{ url_for('static', filename='images/3.jpg') }}
    log.warning("404 Not Found: %s", request.path)
    return render_template("404.html"), 404

#Initialize the databases
if __name__ == "__main__":
    init_db()
    init_mongo_db()

User already exists: admin1
User already exists: admin2
User already exists: admin3
User already exists: admin4
User already exists: admin5
User already exists: hbrown
User already exists: tjones
User already exists: gwhite
User already exists: frose
User already exists: lchester
Patient table already has data. Skipping CSV data import.
MongoDB collection: 'medical_reports' collecting already has data. Skipping collection populating


In [3]:
# ----- UNIT TESTING BELOW ----- #
#There are 15 unit tests below across different application features.
#These are the tests: 
#Test 1: Testing the Home Page loads correctly
#Test 2: Testing the About Page loads correctly
#Test 3: Testing the Login Page loads correctly
#Test 4: Testing the Registration Page loads correctly
#Test 5: Testing an admin user exists
#Test 6: Testing a doctor user exists
#Test 7: Testing a fake user does not exist
#Test 8: Testing Email Regex Validation Pattern works correctly
#Test 9: Testing Password Validation Pattern works correctly
#Test 10: Test that the doctor dashboard cannot be accessed without logging in
#Test 11: Test that the admin dashboard cannot be accessed without logging in
#Test 12: Test if unsuccessful login results in a redirect
#Test 13: Test if connection to MongoDB database is alive
#Test 14: Test if medical reports for a registered doctor are found 
#Test 15: Test if no reports are found for a non-existent doctor

In [4]:
# ----- 4 Unit Tests: Check That Home, About, Login & Registration Pages Load ----- #
import unittest

#Test to check if the website is working
#Creating a test case class:
class TestHospitalSite(unittest.TestCase):
    #Creating a Flask test client 
    def setUp(self):
        app.testing = True
        self.client = app.test_client()

    # ----- Test 1: Testing the Home Page ----- #
    def test_homepage_loads(self):
        response = self.client.get('/')

        #First check to see if page loaded properly
        self.assertEqual(response.status_code, 200)

        #2nd check: see if page contains the word 'Hospital'
        page_content = response.data.decode('utf-8')
        self.assertIn("Hospital", page_content)

        print("Test passed: Home page loaded successfully.")

    # ----- Test 2: Testing the About Page ----- #
    def test_aboutpage(self):
        response = self.client.get('/about')

        #First check to see if page loaded properly
        self.assertEqual(response.status_code, 200)

        #2nd check: see if page contains the word 'About'
        page_content = response.data.decode('utf-8')
        self.assertIn("About", page_content)

        print("Test passed: About page loaded successfully.")

    # ----- Test 3: Testing the Login Page ----- #
    def test_loginpage(self):
        response = self.client.get('/login')

        #First check to see if page loaded properly
        self.assertEqual(response.status_code, 200)

        #2nd check: see if page contains the word 'Login'
        page_content = response.data.decode('utf-8')
        self.assertIn("Login", page_content)

        print("Test passed: Login page loaded successfully.")

        # ----- Test 4: Testing the Registration Page ----- #
    def test_registerpage(self):
        response = self.client.get('/register')

        #First check to see if page loaded properly
        self.assertEqual(response.status_code, 200)

        #2nd check: see if page contains the word 'Registration'
        page_content = response.data.decode('utf-8')
        self.assertIn("Registration", page_content)

        print("Test passed: Registration page loaded successfully.")
    

In [5]:
# ----- SQL Database Unit Test: Check that database logic works correctly ----- #

#Checking that users exist in my SQL hospital database
#Checking by username
def user_exists(username):
    conn = sqlite3.connect("hospitaldata.db")
    cursor = conn.cursor()

    #Querying the 'user' table 
    cursor.execute("SELECT 1 FROM user WHERE username = ?", (username,))
    result = cursor.fetchone()

    conn.close()

    #Return True if found, False if not
    return result is not None

#Defining the test class
class TestHospitalDatabase(unittest.TestCase):

    #Test 5: Testing an admin user exists
    def test_admin(self):
        #Username: 'admin1' (Charlie Alpha) inserted by init_db()
        exists = user_exists('admin1')

        #Should be True
        self.assertTrue(exists)
        print("Test Passed: 'admin1' was found in the database.")

    #Test 6: Testing a doctor user exists
    def test_doctor(self):
        #Username: 'frose' (Freya Rose) inserted by init_db()
        exists = user_exists('frose')

        #Should be True
        self.assertTrue(exists)
        print("Test Passed: 'frose' was found in the database.")
    
    #Test 7: Testing a fake user does not exist
    def test_fakeuser_not_exist(self):
        #Username: 'fake_46bj294vf' was not inserted by init_db()
        exists = user_exists('fake_46bj294vf')

        #Should be False
        self.assertFalse(exists)
        print("Test Passed: Non-existent user was correctly not found.")


In [6]:
# ----- Data Validation Logic Unit Tests ----- #
#Testing that patterns for inputs in registration form work correctly

## Ensures the email has a basic valid structure of name@domain.tld

#Old email pattern that failed
#EMAIL_PATTERN = re.compile(r'^[\w\.-]+@[\w\.-]+\.[A-Za-z]{2,}$')

#New email pattern: Forces the domain to start with a letter or number
EMAIL_PATTERN = re.compile(r'^[\w\.-]+@[a-zA-Z0-9][\w\.-]+\.[A-Za-z]{2,}$')

## Strong password pattern that requires lowercase, uppercase, digit, special character, and minimum 8 characters.
PASSWORD_PATTERN = re.compile(r'^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&]).{8,}$')

#Defining the test class
class TestValidationPatterns(unittest.TestCase):

    #Test 8: Testing Email Regex Validation Pattern works correctly
    def test_email_valid(self):
        #Testing valid emails
        self.assertIsNotNone(EMAIL_PATTERN.fullmatch("doctor@hospital.com")) 
        self.assertIsNotNone(EMAIL_PATTERN.fullmatch("hospital.user@domain.co.uk"))

        #Testing Invalid emails
        self.assertIsNone(EMAIL_PATTERN.fullmatch("invalid_address"))
        self.assertIsNone(EMAIL_PATTERN.fullmatch("@no.username.com")) 
        self.assertIsNone(EMAIL_PATTERN.fullmatch("doctor@.com.my")) #no email domain
        print("Test Passed: Email regex validation is working correctly.")

    #Test 9: Testing Password Validation Pattern works correctly 
    def test_password_strength(self):
        #Strong password example should pass
        self.assertIsNotNone(PASSWORD_PATTERN.fullmatch("ValidStrongPass123!"))

        #Weak passwords should fail
        self.assertIsNone(PASSWORD_PATTERN.fullmatch("abc")) #Too short, should be minimum 8 characters
        self.assertIsNone(PASSWORD_PATTERN.fullmatch("justletters")) #No numbers or special characters
        self.assertIsNone(PASSWORD_PATTERN.fullmatch("12345678")) #No letters or special characters
        print("Test Passed: Password validation is working correctly.")


In [7]:
# ----- Access Control Unit Tests ----- #
#Checking if unauthorized users can access protect pages without logging in
#For example, the doctor dashboard and admin dashboard can only be accessed if logged in

class TestAccessControl(unittest.TestCase):

    #Creating a Flask test client 
    def setUp(self):
        app.testing = True
        self.client = app.test_client()

    #Test 10: Test that the doctor dashboard cannot be accessed without logging in
    def test_dashboard(self):
        #Try to visit dashboard without logging in
        response = self.client.get('/dashboard', follow_redirects=True)

        #Check that we land on the login page = Status 200
        self.assertEqual(response.status_code, 200)

        #Check for login text to confirm we were redirected
        page_content = response.data.decode('utf-8')
        self.assertIn("Login", page_content)
        print("Test Passed: Unathenticated user redirected to Login page.")

    #Test 11: Test that the admin dashboard cannot be accessed without logging in
    def test_admindashboard(self):
        #Try to visit admin area without logging in
        response = self.client.get('/admin_dashboard', follow_redirects=True)
        self.assertIn("Login", response.data.decode('utf-8'))
        print("Test Passed: Admin Dashboard is secure, redirected to Login page.")


In [8]:
# ----- Login Logic Unit Test ----- #
#Check if login form and submission is successful

#Define the test class

class TestLogin(unittest.TestCase):

    def setUp(self):
        app.testing = True
        self.client = app.test_client()

    #Test 12: Test if unsuccessful login results in a redirect
    def test_login(self):
        #Send a POST request to simulate form submit
        response = self.client.post('/login', data={
            'username':'frose',
            'password':'wrongpassword'
        }, follow_redirects=True)

        #Check if were on login page or if we see an error
        page_content = response.data.decode('utf-8')

        #We expect to remain on the login page and see 'Invalid'
        self.assertIn("Invalid", page_content)
        print("Test Passed: The wrong password was rejected.")


In [9]:
# ----- MongoDB Database Unit Test: Check that database logic works correctly ----- #

#Connect to MongoDB and checks if a specific doctor has any reports
def doctor_has_reports(doctor_username):
    uri="mongodb+srv://admin:Hospital123XYZ!@cluster0.aaf6kly.mongodb.net/?appName=Cluster0"

    try:
        client = pymongo.MongoClient(uri)
        db = client['hospital_db']
        collection = db['medical_reports']

        #Check database if this doctor credentials exists
        record = collection.find_one({"doctor_id": doctor_username})

        client.close()
        return record is not None
    
    except Exception as e:
        print(f"Connection failed: {e}")
        return False
    
#Define test class for database 
class TestMongoDatabase(unittest.TestCase):

    def setUp(self):
        #Setup connection for direct test
        self.uri = "mongodb+srv://admin:Hospital123XYZ!@cluster0.aaf6kly.mongodb.net/?appName=Cluster0"
        self.client = pymongo.MongoClient(self.uri)

    def tearDown(self):
        #Close connection after the tests
        self.client.close()

    #Test 13: Test if connection to MongoDB database is alive
    def test_connection(self):
        #Ping test to see if server is reachable
        try:
            self.client.admin.command('ping')
            connected = True
        except Exception:
            connected = False

        self.assertTrue(connected, "Could not connect to MongoDB Atlas")
        print("Test Passed: Confirmed that MongoDB connection is alive")

    #Test 14: Test if medical reports for a registered doctor are found 
    def test_doctor_exists(self):
        #Reports for doctor 'hbrown' exist as it was created from init_mongo_db()
        exists = doctor_has_reports('hbrown')

        self.assertTrue(exists, "No reports found for doctor 'hbrown'.")
        print("Test Passed: Found medical reports for doctor 'hbrown'.")

    #Test 15: Test if no reports are found for a non-existent doctor
    def test_fake_doctor(self):
        #Test that a fake doctor has no reports
        exists = doctor_has_reports('fakedoctor')

        self.assertFalse(exists, "Found reports for non-existent doctor.")
        print("Test Passed: Correctly found no reports for fake doctor.")

In [10]:
# Code chunk to run the unit tests
if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], verbosity=2, exit=False)

test_admindashboard (__main__.TestAccessControl.test_admindashboard) ... ok
test_dashboard (__main__.TestAccessControl.test_dashboard) ... ok
test_admin (__main__.TestHospitalDatabase.test_admin) ... ok
test_doctor (__main__.TestHospitalDatabase.test_doctor) ... ok
test_fakeuser_not_exist (__main__.TestHospitalDatabase.test_fakeuser_not_exist) ... ok
test_aboutpage (__main__.TestHospitalSite.test_aboutpage) ... ok
test_homepage_loads (__main__.TestHospitalSite.test_homepage_loads) ... ok
test_loginpage (__main__.TestHospitalSite.test_loginpage) ... ok
test_registerpage (__main__.TestHospitalSite.test_registerpage) ... ok
test_login (__main__.TestLogin.test_login) ... 

Test Passed: Admin Dashboard is secure, redirected to Login page.
Test Passed: Unathenticated user redirected to Login page.
Test Passed: 'admin1' was found in the database.
Test Passed: 'frose' was found in the database.
Test Passed: Non-existent user was correctly not found.
Test passed: About page loaded successfully.
Test passed: Home page loaded successfully.
Test passed: Login page loaded successfully.
Test passed: Registration page loaded successfully.


ok
test_connection (__main__.TestMongoDatabase.test_connection) ... 

Test Passed: The wrong password was rejected.


ok
test_doctor_exists (__main__.TestMongoDatabase.test_doctor_exists) ... 

Test Passed: Confirmed that MongoDB connection is alive


ok
test_fake_doctor (__main__.TestMongoDatabase.test_fake_doctor) ... 

Test Passed: Found medical reports for doctor 'hbrown'.


ok
test_email_valid (__main__.TestValidationPatterns.test_email_valid) ... ok
test_password_strength (__main__.TestValidationPatterns.test_password_strength) ... ok

----------------------------------------------------------------------
Ran 15 tests in 23.803s

OK


Test Passed: Correctly found no reports for fake doctor.
Test Passed: Email regex validation is working correctly.
Test Passed: Password validation is working correctly.


In [None]:
#Code chunk to run the web application and use it
if __name__ == '__main__':
    app.run(debug=False)

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
2025-12-11 01:50:17,631 [INFO] [33mPress CTRL+C to quit[0m
