## Init SQLite

In [None]:
%pip install python-dotenv
%pip install flask

In [None]:
import sqlite3
from datetime import datetime, timedelta

def create_database(emergency_phone, emergency_email):
    """Recreate the SQLite schema and seed it with a single patient.

    Drops and recreates the ``fall_records`` and ``patients`` tables in
    ``database.sqlite`` (any existing rows are lost), then inserts one patient
    row holding the given emergency contact details.

    Args:
        emergency_phone: Value stored in ``patients.emergency_phone``.
        emergency_email: Value stored in ``patients.emergency_email``.
    """
    schema_script = """
    DROP TABLE IF EXISTS fall_records;
    DROP TABLE IF EXISTS patients;

    CREATE TABLE IF NOT EXISTS fall_records (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        source TEXT NOT NULL,
        patient_id INTEGER NOT NULL,
        detected_at TEXT NOT NULL,
        recorded_at TEXT NOT NULL
    );
    CREATE TABLE IF NOT EXISTS patients (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        emergency_phone TEXT NOT NULL,
        emergency_email TEXT NOT NULL
    );
    """
    insert_query = """
    INSERT INTO patients (emergency_phone, emergency_email) VALUES (?, ?);
    """

    connection = sqlite3.connect('database.sqlite')
    try:
        cursor = connection.cursor()
        cursor.executescript(schema_script)
        cursor.execute(insert_query, (emergency_phone, emergency_email))
        connection.commit()
    finally:
        # Always release the database handle, even when a statement above raises.
        connection.close()

def insert_fall_records(source, patient_id, detected_at):
    """Insert one fall record and return the row as stored.

    ``recorded_at`` is filled in by SQLite with ``datetime('now')`` at insert
    time; the other columns come from the arguments.

    Args:
        source: Identifier of whatever reported the fall.
        patient_id: Id of the patient the fall belongs to.
        detected_at: Timestamp text supplied by the reporter.

    Returns:
        A tuple ``(id, source, patient_id, detected_at, recorded_at)`` read
        back from the table.
    """
    insert_query = """
    INSERT INTO fall_records (source, patient_id, detected_at, recorded_at)
    VALUES (?, ?, ?, datetime('now'));
    """
    select_query = (
        "SELECT id, source, patient_id, detected_at, recorded_at "
        "FROM fall_records WHERE id = ?"
    )

    connection = sqlite3.connect('database.sqlite')
    try:
        cursor = connection.cursor()
        cursor.execute(insert_query, (source, patient_id, detected_at))
        record_id = cursor.lastrowid

        # Read the row back on the same connection so the generated
        # recorded_at value is included in the result.
        cursor.execute(select_query, (record_id,))
        new_record = cursor.fetchone()

        connection.commit()
        return new_record
    finally:
        # Always release the database handle, even when a statement above raises.
        connection.close()

def get_patient_emergency_info(patient_id):
    """Look up the emergency contact details of a patient.

    Args:
        patient_id: Value matched against ``patients.id``.

    Returns:
        A tuple ``(emergency_phone, emergency_email)``, or ``None`` when no
        patient with that id exists.
    """
    select_query = """
    SELECT emergency_phone, emergency_email FROM patients WHERE id = ?;
    """

    connection = sqlite3.connect('database.sqlite')
    try:
        cursor = connection.cursor()
        cursor.execute(select_query, (patient_id,))
        # fetchone() already yields None when there is no matching row.
        return cursor.fetchone()
    finally:
        # Always release the database handle, even when the query raises.
        connection.close()

def check_existing_fall_record_in_1m(patient_id, detected_at):
    """Find a fall already recorded for the patient in the preceding minute.

    The search window is ``[detected_at - 1 minute, detected_at]`` (both ends
    inclusive) and is compared against the stored ``detected_at`` text, so the
    bounds must use the same textual layout as the stored values.

    Args:
        patient_id: Patient whose records are searched.
        detected_at: ISO-8601 timestamp string of the new detection.

    Returns:
        The id of one matching record, or ``None`` when there is none.

    Raises:
        ValueError: If ``detected_at`` is not a valid ISO-8601 string.
    """
    # Parse first so a malformed timestamp fails before a connection is opened.
    detected_at_dt = datetime.fromisoformat(detected_at)

    # isoformat() defaults to a 'T' separator; mirror the caller's separator
    # instead, otherwise "YYYY-MM-DD HH:MM:SS" inputs produce a lower bound
    # that sorts after the upper bound and the window can never match.
    separator = ' ' if ' ' in detected_at else 'T'
    window_start = (detected_at_dt - timedelta(minutes=1)).isoformat(sep=separator)

    select_query = """
    SELECT id FROM fall_records WHERE patient_id = ? AND detected_at BETWEEN ? AND ?;
    """

    connection = sqlite3.connect('database.sqlite')
    try:
        cursor = connection.cursor()
        cursor.execute(select_query, (patient_id, window_start, detected_at))
        record = cursor.fetchone()
    finally:
        # Always release the database handle, even when the query raises.
        connection.close()

    return record[0] if record else None

# Ask for the emergency contact on the console, then (re)build the database
# with it. NOTE: create_database drops both tables first, so re-running this
# cell discards any previously stored patients and fall records.
phone, email = (
    input("Enter emergency phone: "),
    input("Enter emergency email: "),
)
create_database(phone, email)

## Send SMS

In [None]:
from vonage import Auth, Vonage
from vonage_messages import Sms
from dotenv import load_dotenv
import os

# Load settings from a .env file (python-dotenv) so the os.getenv lookups in
# send_sms can find VONAGE_APP_ID and VONAGE_PRIVATE_KEY.
load_dotenv()

def send_sms(to_number, text):
    """Send a text message through the Vonage client.

    Credentials are taken from the ``VONAGE_APP_ID`` and
    ``VONAGE_PRIVATE_KEY`` environment variables on every call.

    Args:
        to_number: Destination phone number.
        text: Message body.
    """
    credentials = Auth(
        application_id=os.getenv("VONAGE_APP_ID"),
        private_key=os.getenv("VONAGE_PRIVATE_KEY"),
    )
    client = Vonage(credentials)

    # The sender label is fixed; only recipient and body vary per call.
    sms = Sms(from_='Vonage APIs', to=to_number, text=text)
    client.messages.send(sms)

# send_sms(
#     '+84982766798',
#     f"Alert! Fall detected."
# )

## Send Email

In [8]:
import smtplib
from email.mime.text import MIMEText
from dotenv import load_dotenv
import os

# Load settings from a .env file (python-dotenv) so the os.getenv lookups in
# send_email_alert can find EMAIL_USER and EMAIL_PASS.
load_dotenv()

def send_email_alert(subject, body, to_email):
    """Send a plain-text email through Gmail's SMTP-over-SSL endpoint.

    The sender address and password are read from the ``EMAIL_USER`` and
    ``EMAIL_PASS`` environment variables.

    Args:
        subject: Subject line of the message.
        body: Plain-text message body.
        to_email: Recipient address.

    Raises:
        RuntimeError: If ``EMAIL_USER`` or ``EMAIL_PASS`` is missing or empty.
        smtplib.SMTPException: If login or delivery fails.
    """
    from_email = os.getenv("EMAIL_USER")
    app_password = os.getenv("EMAIL_PASS")
    # Fail fast with a clear message instead of opening a connection and
    # attempting to log in with missing credentials.
    if not from_email or not app_password:
        raise RuntimeError(
            "EMAIL_USER and EMAIL_PASS must be set to send email alerts."
        )

    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = from_email
    msg["To"] = to_email

    with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
        server.login(from_email, app_password)
        server.sendmail(from_email, [to_email], msg.as_string())

# send_email_alert('Fall detection alert', 'There is a fall detected!', 'nhannhb92@gmail.com')

## Signal server

In [None]:
def fall_record_service(source, patient_id, detected_at):
    """Store a fall event and email the patient's emergency contact.

    A detection is rejected when the patient already has a record within the
    minute before ``detected_at``. Otherwise the record is inserted and, if
    the patient has emergency contact details, an alert email is sent.

    Args:
        source: Identifier of whatever reported the fall.
        patient_id: Id of the patient the fall belongs to.
        detected_at: ISO-8601 timestamp string of the detection.

    Returns:
        The newly inserted record as returned by ``insert_fall_records``.

    Raises:
        Exception: If a record already exists within the one-minute window.
    """
    duplicate_id = check_existing_fall_record_in_1m(patient_id, detected_at)
    if duplicate_id:
        print("A fall record already exists for this patient within the last 1 minute.")
        raise Exception("Duplicate fall record detected.")

    saved_record = insert_fall_records(source, patient_id, detected_at)

    contact = get_patient_emergency_info(patient_id)
    if not contact:
        print("No emergency contact information found for this patient.")
        return saved_record

    # Only the email address is used for the alert; the phone is just logged.
    emergency_phone, emergency_email = contact
    print(f"Emergency Contact - Phone: {emergency_phone}, Email: {emergency_email}")
    send_email_alert('Fall detection alert', 'There is a fall detected!', emergency_email)
    return saved_record

In [None]:
from flask import Flask, request, jsonify

# Flask application object; the /record_fall route is registered on it.
app = Flask(__name__)

@app.route('/record_fall', methods=['POST'])
def record_fall():
    """Handle ``POST /record_fall``: store a fall event reported as JSON.

    The JSON body must be an object with ``source``, ``patient_id`` and
    ``detected_at`` fields.

    Returns:
        201 with the stored record on success; 400 with an ``error`` message
        when the payload is invalid or ``fall_record_service`` raises
        (including a duplicate detection); 500 if no record comes back.
    """
    data = request.get_json()

    # A JSON body that is not an object (null, list, string, ...) cannot carry
    # the required fields; say so instead of surfacing a TypeError message.
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    required_fields = ('source', 'patient_id', 'detected_at')
    missing = [field for field in required_fields if field not in data]
    if missing:
        return jsonify({"error": "Missing required field(s): " + ", ".join(missing)}), 400

    try:
        new_record = fall_record_service(data['source'], data['patient_id'], data['detected_at'])
    except Exception as e:
        # Service failures keep the existing contract: 400 plus the message.
        return jsonify({"error": str(e)}), 400

    if not new_record:
        return jsonify({"error": "Failed to insert record"}), 500

    return jsonify({
        "id": new_record[0],
        "source": new_record[1],
        "patient_id": new_record[2],
        "detected_at": new_record[3],
        "recorded_at": new_record[4]
    }), 201

# Start Flask's built-in server only when this code runs as the main module,
# not when it is imported.
if __name__ == '__main__':
    app.run()