# Full WorkFlow: Functions to Send Alerts and Update Alert Database

For 10 minute averages of all sensors (using the ATM PurpleAir estimations)

This notebook retrieves readings from PurpleAir sensors in Minneapolis, cleans the entries, and texts the people who are interested in a sensor when its reading is above a threshold.

---
---
---
# Prep

In [2]:
# File Manipulation

import os # For working with Operating System
import sys # System arguments
from dotenv import load_dotenv # Loading .env info

# Web

import requests # Accessing the Web

# Time

import datetime as dt # Working with dates/times
import pytz # Timezones

# Database 

import psycopg2
from psycopg2 import sql

# Data Manipulation

import numpy as np
import geopandas as gpd
import pandas as pd

### Load Functions

In [5]:
# Directory holding the project's helper scripts, relative to this notebook
script_path = os.path.join('..', '..', 'Scripts', 'python')

# Function definition - Please see Scripts/python/*
# Each script is executed in the notebook namespace, in the order listed, so the
# names it defines become available to the cells below.
# `with` closes every file handle as soon as it has been read; the previous
# bare open(...).read() calls never closed them explicitly.
for script_name in ('Get_spikes_df.py',
                    'Create_messages.py',
                    'twilio_functions.py',
                    'Update_Alerts.py',
                    'Send_Alerts.py'):
    with open(os.path.join(script_path, script_name)) as script_file:
        exec(script_file.read())

### Global Variables

In [6]:
# Read the local .env file so the os.getenv() lookups below can see its values
load_dotenv()

## API Keys

# PurpleAir API Read Key
purpleAir_api = os.getenv('PURPLEAIR_API_TOKEN')

## Database credentials

# Environment values, listed in the same order as the connection keywords below
creds = [os.getenv(env_name)
         for env_name in ('DB_NAME', 'DB_USER', 'DB_PASS', 'DB_PORT', 'DB_HOST')]

# Keyword arguments used as psycopg2.connect(**pg_connection_dict)
pg_connection_dict = {
    keyword: value
    for keyword, value in zip(('dbname', 'user', 'password', 'port', 'host'), creds)
}

## Twilio Information

TWILIO_ACCOUNT_SID = os.getenv('TWILIO_ACCOUNT_SID')
TWILIO_AUTH_TOKEN = os.getenv('TWILIO_AUTH_TOKEN')

# Other constants (hard-coded in this notebook)

# Value which defines an AQ_Spike (micrograms per meter cubed)
spike_threshold = 35

# Sleep time in between updates (in minutes)
timestep = 10

# When to stop the program? (datetime)
# How many days will we run this?
days_to_run = 7
stoptime = dt.datetime.now() + dt.timedelta(days=days_to_run)

---
---
---
## Compute/Define other necessary variables 

In [7]:
# Get the sensor_ids of the sensors stored in our database.
# The connection settings are passed in; the result is reused by the cells below.

sensor_ids = get_sensor_ids(pg_connection_dict) # In Get_spikes_df.py

In [8]:
# Get the spikes dataframe, the runtime and the flagged sensor ids.
# Inputs: the PurpleAir read key, the sensor ids from our database and the spike threshold.

spikes_df, runtime, flagged_sensor_ids = Get_spikes_df(purpleAir_api, sensor_ids, spike_threshold)

In [9]:
# Sort the sensors into four groups: new spikes, ongoing spikes, ended spikes, and not spiked.
# These four names drive the "New", "Ongoing" and "Ended" alert sections further down.

new_spike_sensors, ongoing_spike_sensors, ended_spike_sensors, not_spiked_sensors = sort_sensors_for_updates(spikes_df, sensor_ids, flagged_sensor_ids, pg_connection_dict) # In Update_Alerts.py

In [10]:
# Those are all sets

# new_spike_sensors, ongoing_spike_sensors, ended_spike_sensors#, not_spiked_sensors

In [11]:
# Initialize message storage: two lists that are extended together by later cells
# (one entry in `messages` for every entry in `record_ids_to_text`).

record_ids_to_text = []
messages = []

---
---
---
# Defining new Functions

---
---
---
## New Alert
### For each newly spiked sensor:

In [35]:
# This is for testing

now = dt.datetime.now()

too_late_hr = 21 # 9pm
too_early_hr = 8 # 8am

new_spike_sensors = set([142720])

# Texts are only queued when the current hour is strictly between the two limits.
# The check does not depend on the sensor, so it is evaluated once up front.
within_waking_hours = too_early_hr < now.hour < too_late_hr

for sensor_id in new_spike_sensors:

    # Query with the loop variable. The previous code passed `sensor_index`,
    # a name that is only assigned in a later cell, so every iteration either
    # failed on a fresh kernel or queried the same leftover sensor.
    record_ids_nearby = Users_nearby_sensor(pg_connection_dict, sensor_id, 1000)

    if len(record_ids_nearby) > 0 and within_waking_hours:

        record_ids_new_alerts = Users_to_message_new_alert(pg_connection_dict, record_ids_nearby)

        # Add to message/record_id storage for future messaging
        # (the two lists stay aligned: one message per record_id)
        record_ids_to_text += record_ids_new_alerts
        messages += [new_alert_message(sensor_id)]*len(record_ids_new_alerts)

#### $A$) Query for people nearby

*Find the sensor in database with this index*

*find users record_ids*

*WHERE subscription = True AND*

*ST_DWithin 1000 meters <- make sure to transform*

In [10]:
# Let 
sensor_index = 142720
distance = 1000 # (in meters)

# then...

# Subscribed users whose location is within `distance` of the chosen sensor.
# Both geometries are transformed to SRID 26915 before ST_DWithin is applied.
cmd = sql.SQL('''
WITH sensor as
(
SELECT sensor_index, geometry
FROM "PurpleAir Stations"
WHERE sensor_index = {}
)
SELECT record_id
FROM "Sign Up Information" u, sensor s
WHERE u.subscribed = TRUE AND ST_DWithin(ST_Transform(u.geometry,26915),
										ST_Transform(s.geometry, 26915),{});
''').format(sql.Literal(sensor_index),
                sql.Literal(distance))

conn = psycopg2.connect(**pg_connection_dict)
try:
    cur = conn.cursor()
    try:
        # Read-only query, so there is nothing to commit
        cur.execute(cmd)
        record_ids = [i[0] for i in cur.fetchall()]
    finally:
        # Close cursor
        cur.close()
finally:
    # Close connection even when the query raises
    conn.close()

record_ids

[2]

#### $B$) Query users from $A$ to text

*If A is not empty:*

*If within waking hours*

find users with empty cache/active alerts

Compose Messages (record_ids, sensor_index)

Concat to messages_df

In [24]:
# then...

# Of the nearby users found above (`record_ids`), keep the ones whose
# active_alerts and cached_alerts are both empty arrays.
cmd = sql.SQL('''
SELECT record_id
FROM "Sign Up Information"
WHERE active_alerts = {} AND cached_alerts = {} AND record_id = ANY ( {} );
''').format(sql.Literal('{}'), sql.Literal('{}'), sql.Literal(record_ids))

conn = psycopg2.connect(**pg_connection_dict)
try:
    cur = conn.cursor()
    try:
        # Read-only query, so there is nothing to commit
        cur.execute(cmd)
        # NOTE: this rebinds `record_ids_to_text`, replacing anything stored under that name earlier
        record_ids_to_text = [i[0] for i in cur.fetchall()]
    finally:
        # Close cursor
        cur.close()
finally:
    # Close connection even when the query raises
    conn.close()

record_ids_to_text

[2]

### *C*) Update Active Alerts See Update_Alerts.py & .ipynb

---
---
---
## Ended Alerts - NOT DONE

In [None]:
# ended_spike_sensors

In [10]:
# Get ended_alert_indices <- a list usually gotten through remove_active_alerts()
# Hard-coded here so the cells below can be tried without ending a real alert.

ended_alert_indices = [1]

### $A$) Query for people to message

*subscribed = TRUE and*

*active_alerts is empty and*

*cached_alerts not empty and*

*all cached_alerts is >= 10 minutes old - ie. ended_alert_indices intersect cached_alerts is empty*

In [12]:
def Users_to_message_end_alert(pg_connection_dict, ended_alert_indices):
    '''
    Return the record_ids from "Sign Up Information" that should get an end-of-alert text.

    A user is selected when they are subscribed, have empty active_alerts,
    non-empty cached_alerts, and none of their cached_alerts appears in
    ended_alert_indices (giving a 10 minute buffer before ending alerts -
    this can certainly change!)

    pg_connection_dict = keyword arguments for psycopg2.connect
    ended_alert_indices = an iterable of alert_ids that just ended (may be empty)

    returns record_ids_to_text (a list)

    The connection and cursor are closed even if the query raises.
    '''

    # A list is adapted to a SQL array by psycopg2; other containers are not,
    # so coerce the input before building the query.
    cmd = sql.SQL('''
    SELECT record_id
    FROM "Sign Up Information"
    WHERE subscribed = TRUE
        AND active_alerts = {}
    	AND ARRAY_LENGTH(cached_alerts, 1) > 0 
    	AND NOT cached_alerts && {}::bigint[];
    ''').format(sql.Literal('{}'),
                sql.Literal(list(ended_alert_indices)))

    conn = psycopg2.connect(**pg_connection_dict)
    try:
        cur = conn.cursor()
        try:
            # Read-only query, so there is nothing to commit
            cur.execute(cmd)
            record_ids_to_text = [i[0] for i in cur.fetchall()]
        finally:
            # Close cursor
            cur.close()
    finally:
        # Close connection
        conn.close()

    return record_ids_to_text

In [13]:
# Try the query with two example ended alert ids
Users_to_message_end_alert(pg_connection_dict, [1,69])

[2]

### if A not empty:

#### For each person in A:

#### $i$) Initialize Reports

Build the report_id: a zero-padded 5-digit count of the day's reports + '-' + 'MMDDYY'
Compute the other necessary values for the database

In [15]:
# Initialize storage for reports_for_day
# (a counter; it supplies the zero-padded numeric part of each report_id)

reports_for_day = 0

# Timestamp used for the MMDDYY part of each report_id
now = dt.datetime.now()

In [40]:
def initialize_report(record_id, reports_for_day, pg_connection_dict, report_date=None):
    '''
    This function will initialize a unique report for a user in the database.

    It inserts one row into "Reports Archive", built from the user's
    cached_alerts, and then reads that row back.

    record_id = the user's record_id in "Sign Up Information"
    reports_for_day = integer counter; zero-padded to 5 digits in the report_id
    pg_connection_dict = keyword arguments for psycopg2.connect
    report_date = datetime supplying the MMDDYY part of the report_id;
                  defaults to the notebook-level `now` (the previous behaviour)

    returns duration_minutes, max_reading of the report

    Raises IndexError if no row with the new report_id can be read back.
    The connection and cursor are closed even if a query raises.
    '''
    # Keep existing three-argument calls working exactly as before
    if report_date is None:
        report_date = now

    # Initialize report_id

    report_id = str(reports_for_day).zfill(5) + '-' + report_date.strftime('%m%d%y')

    # Use the record_id to query for the user's cached_alerts
    # Then aggregate from those alerts the start_time, time_difference, max_reading, and nested sensor_indices
    # Unnest the sensor indices into an array of unique sensor_indices
    # Lastly, it will insert all the information into "Reports Archive"

    insert_cmd = sql.SQL('''WITH alert_cache as
(
	SELECT cached_alerts
	FROM "Sign Up Information"
	WHERE record_id = {} --inserted record_id
), alerts as
(
	SELECT MIN(p.start_time) as start_time,
			CURRENT_TIMESTAMP AT TIME ZONE 'America/Chicago' 
				- MIN(p.start_time) as time_diff,
			MAX(p.max_reading) as max_reading, 
			ARRAY_AGG(p.sensor_indices) as sensor_indices
	FROM "Archived Alerts Acute PurpleAir" p, alert_cache c
	WHERE p.alert_index = ANY (c.cached_alerts)
), unnested_sensors as 
(
	SELECT ARRAY_AGG(DISTINCT x.v) as sensor_indices
	FROM alerts cross JOIN LATERAL unnest(alerts.sensor_indices) as x(v)
)
INSERT INTO "Reports Archive"
SELECT {}, -- Inserted report_id
        a.start_time, -- start_time
		(((DATE_PART('day', a.time_diff) * 24) + 
    		DATE_PART('hour', a.time_diff)) * 60 + 
		 	DATE_PART('minute', a.time_diff)) as duration_minutes,
			a.max_reading, -- max_reading
		n.sensor_indices,
		c.cached_alerts
FROM alert_cache c, alerts a, unnested_sensors n;
''').format(sql.Literal(record_id),
            sql.Literal(report_id))

    # Now get the information from that report

    select_cmd = sql.SQL('''SELECT duration_minutes, max_reading
             FROM "Reports Archive"
             WHERE report_id = {};
''').format(sql.Literal(report_id))

    conn = psycopg2.connect(**pg_connection_dict)
    try:
        cur = conn.cursor()
        try:
            cur.execute(insert_cmd)
            # Commit the insert before reading it back
            conn.commit()

            # Read-only query, so no second commit is needed
            cur.execute(select_cmd)

            # Unpack response (first matching row)
            duration_minutes, max_reading = cur.fetchall()[0]
        finally:
            # Close cursor
            cur.close()
    finally:
        # Close connection
        conn.close()

    return duration_minutes, max_reading

#### $ii$) Compose message
see Create_messages.py - end_alert_message
(add message, record_id) to messages_df

#### $iii$) clear user's cached_alerts

See Update_Alerts.py & .ipynb

In [47]:
# For testing

record_ids_end_alert_message = [1]

base_report_url = 'https://redcap.ahc.umn.edu/surveys/?s=LN3HHDCJXYCKFCLE'

# No connection is opened in this cell: initialize_report() opens and closes
# its own. The connection/cursor previously created here was never used or closed.

for record_id in record_ids_end_alert_message:

    # Same report_id expression as inside initialize_report(), so the link in
    # the message points at the report that was just created
    report_id = str(reports_for_day).zfill(5) + '-' + now.strftime('%m%d%y')

    duration_minutes, max_reading = initialize_report(record_id, reports_for_day, pg_connection_dict)

    print(end_alert_message(duration_minutes, max_reading, report_id, base_report_url))

    # Next report of the day gets the next number
    reports_for_day += 1

Alert Over
Duration: 11371 minutes 
Max value: 79.0 ug/m3

Report here - https://redcap.ahc.umn.edu/surveys/?s=LN3HHDCJXYCKFCLE&report_id=00007-110223


---
---
---
# Archive - New Alerts
---
---
---

In [16]:
# Set up the inputs for the new-spike loop

# Rows of spikes_df whose sensor_index belongs to a newly spiked sensor
is_new_spike = spikes_df['sensor_index'].isin(new_spike_sensors)
new_spikes = spikes_df[is_new_spike]

# Empty, typed frame that will collect the messages to send
# (last_messaged and active_alerts columns are intentionally left out for now)
new_alert_messages_df = pd.DataFrame(
    np.empty(
        0,
        dtype=[
            ('user_index', int),
            ('intersection_index', int),
            ('phone_number', int),
            ('sensor_index', int),
            ('reading', float),
        ],
    )
)

In [18]:
# The For Loop

if len(new_spike_sensors) > 0: # If there are new spikes

    # Variables for Database

    cols_for_db = ['sensor_index', 'start_time', 'max_reading'] # Cols_for_db What are we keeping track of in active alerts?

    runtime_for_db = runtime.strftime('%Y-%m-%d %H:%M:%S') # The time PurpleAir API queried

    # Per-spike responses are collected here and concatenated once after the
    # loop; concatenating inside the loop re-copies the growing frame each time.
    response_dfs = []

    # Iterate through new spikes

    for index, row in new_spikes.iterrows():

        # 1) Add to active alerts

        add_to_active_alerts(row, pg_connection_dict, cols_for_db, runtime_for_db)

        # 2) Get People to message about this alert

        response_dfs.append(get_new_alert_messages(row, pg_connection_dict))

    # Leave new_alert_messages_df untouched when no rows were processed
    if response_dfs:
        new_alert_messages_df = pd.concat([new_alert_messages_df] + response_dfs,
                                          ignore_index=True)


In [19]:
# Preview the first rows of the queued messages; phone numbers are deliberately not selected
new_alert_messages_df[['user_index', 'intersection_index', # 'phone_number', <- Don't show phonenumbers
                        # 'last_messaged', 'active_alerts',
                       'sensor_index','reading']].head(5)

Unnamed: 0,user_index,intersection_index,sensor_index,reading


### Ongoing Alerts

In [20]:
# Show the sensors sorted into the "ongoing spike" group
ongoing_spike_sensors

{142752, 143248, 143944, 145202, 145504, 145610, 168327}

In [21]:
# Initialize for For Loop

# Iterable (Find rows in spikes_df that belong to ongoing spikes)

ongoing_spikes = spikes_df[spikes_df.sensor_index.isin(ongoing_spike_sensors)]

ongoing_spikes

Unnamed: 0,sensor_index,pm25
12,142752,27.0
26,143248,36.4
35,143944,79.0
36,145202,28.3
45,145504,25.6
47,145610,32.8
62,168327,33.0


In [22]:
# Function Definitions

# For each ongoing alert, we should

# 1) Update max_reading if it's higher

# See update_max_reading() in Update_Alerts.py

# 2) Maybe message if it's getting much worse?

# NOT DONE

In [23]:
# Apply update_max_reading() to every ongoing spike row.
# Iterating an empty frame does nothing, so no separate length check is needed.

for _, spike in ongoing_spikes.iterrows():
    update_max_reading(spike, pg_connection_dict)

### Ended Alerts

In [29]:
# Inspect the input for the ended-alert steps

# Show the sensors whose spike has ended, as a list

list(ended_spike_sensors)

[142718]

In [107]:
# Function Definitions

# For each ended alert, we should

# 1) Add to archived alerts

# See add_to_archived_alerts() in Update_Alerts.py

#~~~~~~~~~~~~~~~~

# 2) Remove from active alerts

# See remove_active_alerts() in Update_Alerts.py

#~~~~~~~~~~~~~~~~

# 3) Message people it's over

# Query for ended alert information

# See end_alert_messages() in Create_messages.py

In [33]:
# Perform the above
# NOTE(review): ended_alert_indices is only assigned inside this `if`. When
# ended_spike_sensors is empty it keeps whatever an earlier cell stored under
# that name, and the cells below will use that value - confirm this is intended.

if len(ended_spike_sensors) > 0:
    
    add_to_archived_alerts(ended_spike_sensors, pg_connection_dict) # Add the ended SpikeAlerts to archive
    
    ended_alert_indices = remove_active_alerts(ended_spike_sensors, pg_connection_dict) # Remove them from Active Alerts

    print(ended_alert_indices)

[(10,)]


In [37]:
# Show the alert indices that will be used to look up end-of-alert information
ended_alert_indices

[10]

In [40]:
# Get end alert information

# The explicit ::bigint[] cast gives the array a type even when
# ended_alert_indices is an empty list (an untyped empty array cannot be unnested).
cmd = sql.SQL('''
SELECT sensor_index, duration_minutes, max_reading
FROM "Archived Alerts Acute PurpleAir"
WHERE alert_index IN (SELECT UNNEST({}::bigint[]));
''').format(sql.Literal(ended_alert_indices))

# Create Cursor for commands
conn = psycopg2.connect(**pg_connection_dict)
try:
    cur = conn.cursor()
    try:
        # Read-only query, so there is nothing to commit
        cur.execute(cmd)

        end_message_info_df = pd.DataFrame(cur.fetchall(), columns = ['sensor_index', 'duration_minutes', 'max_reading'])
    finally:
        # Close cursor
        cur.close()
finally:
    # Close connection even when the query raises
    conn.close()

In [41]:
# Show the end-of-alert information fetched above
end_message_info_df

Unnamed: 0,alert_index,sensor_index,duration_minutes,max_reading
0,10,142718,340,31.5


In [39]:
# Go through the ended alerts and see if people were watching this alert

# Pull each column of end_message_info_df out under its own name
sensor_indices = end_message_info_df.sensor_index
durations = end_message_info_df.duration_minutes
max_readings = end_message_info_df.max_reading

# Compose messages
# TODO: not implemented yet - nothing below this line builds the messages


Unnamed: 0,sensor_index,duration_minutes,max_reading
0,142718,340,31.5
