### Realtime Rainfall ETL
An hourly scheduled notebook that performs an ETL process on recent RTRR and RTRG data and loads it into the appropriate feature layers in the realtime timeseries map. The goal is to keep the data behind the timeseries animation constantly up to date. We only delete/load the data that is necessary rather than reloading the full viewing window of data every hour.

Parameters:

CHUNK_SIZE - size of the chunks when adding and deleting features on AGOL - don't exceed 250

NUM_OF_HOURS_ON_VIEW - number of hours that are displayed on the web map, e.g. 72 to view the last 72 hours' worth of rainfall data (the configuration cell below currently sets it to 48)

HOURS_TO_UPDATE - the number of hours before the current time to go back and add to the web map; if 0, the notebook queries the layer for the latest pixel timestamp and adds the points after it


Environment: conda activate arcgispro-py3


In [26]:
import requests
from datetime import datetime, timedelta, timezone
import pytz
import pandas as pd
from urllib.parse import urljoin
from arcgis.geometry import Polygon, Point
from arcgis.gis import GIS
from tqdm import tqdm

In [27]:
# Number of features sent per edit_features add/delete request.
CHUNK_SIZE = 40 #don't exceed 250
# Size, in hours, of the rolling window kept on the map; points older than this are deleted.
NUM_OF_HOURS_ON_VIEW = 48
# Hours back from now to reload; 0 means derive it from the latest timestamp in the layer.
HOURS_TO_UPDATE = 0

# Base URL of the rainfall API that is queried for recent point data.
RAINFALL_DATA_API_URL = "https://trwwapi.herokuapp.com"
# Id of the realtime timeseries item, looked up through gis.content.get.
REALTIME_TIMESERIES_ITEM_ID = '412c957447f64e0a8975034f399725b5'
# Index of the points layer within that item's layers.
REALTIME_TIMESERIES_POINTS_LAYER_INDEX = 0

# Timezone attached to every datetime this notebook computes.
TZINFO = pytz.timezone('UTC')

In [28]:
# Connect to the GIS as CM_3RWW.
# NOTE(review): the trailing comment mentions a stored profile, but the call passes
# username= rather than profile= - confirm which one is intended.
gis = GIS(username="CM_3RWW") #Need the CM_3RWW profile - admin role

In [29]:
# Fetch the realtime timeseries item and pick its points layer.
# The functions defined below use this layer as their default target.
realtime_timeseries_item = gis.content.get(REALTIME_TIMESERIES_ITEM_ID)
realtime_timeseries_points = realtime_timeseries_item.layers[REALTIME_TIMESERIES_POINTS_LAYER_INDEX]

In [42]:
def get_latest_ts_from_points_layer(layer = realtime_timeseries_points, where = "sid='1'"):
    """Return the most recent ``ts`` value stored in the points layer.

    Args:
        layer: Feature layer to query; defaults to the realtime timeseries points layer.
        where: SQL filter used for the lookup. The default restricts the query to
            ``sid='1'`` so the service does not have to sort through all the data;
            this may lead to slight time errors. Pass ``"1=1"`` to consider every row.

    Returns:
        A timezone-aware datetime in ``TZINFO``, or ``None`` when no feature matches
        or the newest feature has no ``ts`` value.
    """
    query = layer.query(
        where=where,
        out_fields="ts",
        order_by_fields="ts DESC",
        return_geometry=False,  # only the timestamp attribute is read
        result_record_count=1,  # rows are sorted newest-first, so one row is enough
    )
    if not query.features: #No features
        return None
    latest_timestamp = query.features[0].attributes["ts"]
    if latest_timestamp is None:
        return None
    # ts is divided by 1000 to turn milliseconds into the seconds fromtimestamp expects.
    return datetime.fromtimestamp(latest_timestamp / 1000, tz=TZINFO)

def get_hours_before(end_datetime):
    """Work out how many hours of data to request, counting back from ``end_datetime``.

    When ``HOURS_TO_UPDATE`` is between 1 and ``NUM_OF_HOURS_ON_VIEW`` it is returned
    unchanged. Otherwise the gap between ``end_datetime`` and the newest point in the
    layer is used, rounded up to whole hours and capped at ``NUM_OF_HOURS_ON_VIEW``.

    Args:
        end_datetime: Timezone-aware datetime marking the end of the window.

    Returns:
        int: Number of hours, never negative and never above ``NUM_OF_HOURS_ON_VIEW``.
    """
    #If HOURS_TO_UPDATE is not specified - we grab the latest point from the layer
    if HOURS_TO_UPDATE and 1 <= HOURS_TO_UPDATE <= NUM_OF_HOURS_ON_VIEW:
        return HOURS_TO_UPDATE
    latest_point = get_latest_ts_from_points_layer()
    if not latest_point:
        # Nothing in the layer yet: load the whole viewing window.
        return NUM_OF_HOURS_ON_VIEW
    time_difference = end_datetime - latest_point
    # Ceiling division over the full timedelta (days included), so a partially
    # elapsed hour still counts as one hour to fetch.
    hours_difference = -(-time_difference // timedelta(hours=1))
    # Clamp: no negative value if the newest point is ahead of end_datetime,
    # and never more than the window shown on the map.
    return max(0, min(hours_difference, NUM_OF_HOURS_ON_VIEW))

def get_points_df_from_3rww_api(end_datetime, timeout = 120):
    """Download recent rainfall points and shape them for upload to the layer.

    Args:
        end_datetime: End of the window; passed to ``get_hours_before`` to decide
            how many hours to request.
        timeout: Seconds to wait for the API before giving up.

    Returns:
        dict: ``{"success": True, "points_df": DataFrame}`` on success, where the frame
        holds only rows with ``val > 0`` plus a ``SHAPE`` column, or
        ``{"success": False, "error_message": str}`` when the request fails.
    """
    request_url = urljoin(RAINFALL_DATA_API_URL, "rainfall/v3/realtime/ago/")
    hours_ago = get_hours_before(end_datetime)
    try:
        response = requests.get(
            url = request_url,
            params = {"hours": hours_ago},
            timeout = timeout,
        )
    except requests.RequestException as error:
        # Report network errors the same way as a bad status code.
        return {"success": False, "error_message": f"Request failed: {error}"}
    if response.status_code != 200:
        return {"success": False, "error_message": f"Failed with status code {response.status_code}"}

    response_json = response.json()
    points_df = pd.DataFrame(response_json["data"], columns=response_json["columns"])
    #Only need values where rain occurred (otherwise wouldn't appear in the map)
    # .copy() so the SHAPE assignment below works on an independent frame, not a slice.
    points_df = points_df[points_df["val"] > 0].copy()
    # Built with a comprehension rather than apply(axis=1) so an empty frame also works.
    points_df["SHAPE"] = [
        {"x": x, "y": y, "spatialReference": {"wkid": 4326}}
        for x, y in zip(points_df["x"], points_df["y"])
    ]
    # drop() returns a new frame; the result must be assigned for the columns to go.
    # NOTE(review): x/y are now really absent from the upload - confirm the layer
    # does not rely on them as attribute fields.
    points_df = points_df.drop(columns=["x", "y"])

    return {"success": True, "points_df": points_df}

def add_points_to_AGOL_realtime_timeseries_layer(layer = realtime_timeseries_points, df_to_add = None):
    """Upload the rows of ``df_to_add`` to the layer in batches of ``CHUNK_SIZE``.

    Args:
        layer: Feature layer to edit; defaults to the realtime timeseries points layer.
        df_to_add: DataFrame of features to add. ``None`` or an empty frame is a no-op.

    Returns:
        list: The ``addResults`` entries that were reported as successful.
    """
    all_added = []
    if df_to_add is None or df_to_add.empty:
        return all_added
    for i in tqdm(range(0, len(df_to_add.index), CHUNK_SIZE)):
        adds = df_to_add.iloc[i:i+CHUNK_SIZE].copy()
        add_results = layer.edit_features(
            adds=adds.spatial.to_featureset(),
            rollback_on_failure=True
        )
        results = add_results.get('addResults', [])
        added = [x for x in results if x.get('success')]
        all_added.extend(added)
        # Report the batch when nothing came back or any single feature failed,
        # not only when every feature in the batch failed.
        if len(added) != len(results) or not results:
            print(f"Error adding records @ batch {i+CHUNK_SIZE}")
            print(add_results)
            print(all_added)
    return all_added

def get_object_ids_to_remove(delete_datetime):
    """Return the object ids of all points whose ``ts`` is earlier than ``delete_datetime``.

    Args:
        delete_datetime: Cutoff datetime. Timezone-aware values are converted to UTC;
            naive values are used as given.

    Returns:
        list[str]: Matching object ids as strings; empty when nothing matches.
    """
    if delete_datetime.tzinfo is not None:
        delete_datetime = delete_datetime.astimezone(timezone.utc)
    # Use a plain "YYYY-MM-DD HH:MM:SS" literal: str(datetime) would also embed
    # microseconds and a UTC offset in the where clause.
    cutoff = delete_datetime.strftime("%Y-%m-%d %H:%M:%S")
    query_string = f"ts < TIMESTAMP '{cutoff}'"
    delete_points_query = realtime_timeseries_points.query(where = query_string, return_ids_only=True)
    # Guard against a null id list when no rows match.
    object_ids = delete_points_query["objectIds"] or []
    return [str(objectId) for objectId in object_ids]
    
def delete_points_from_AGOL_realtime_timeseries_layer(layer = realtime_timeseries_points, delete_points_objectids_list = None):
    """Delete the given object ids from the layer in batches of ``CHUNK_SIZE``.

    Args:
        layer: Feature layer to edit; defaults to the realtime timeseries points layer.
        delete_points_objectids_list: Object ids to delete. ``None`` or an empty
            list is a no-op.

    Returns:
        list: The ``deleteResults`` entries that were reported as successful.
    """
    all_removed = []
    if not delete_points_objectids_list:
        return all_removed
    for i in tqdm(range(0, len(delete_points_objectids_list), CHUNK_SIZE)):
        delete_results = layer.edit_features(
            deletes = delete_points_objectids_list[i:i+CHUNK_SIZE],
            rollback_on_failure = True,
        )
        results = delete_results.get('deleteResults', [])
        removed = [x for x in results if x.get('success')]
        all_removed.extend(removed)
        # Report the batch when nothing came back or any single delete failed,
        # not only when every delete in the batch failed.
        if len(removed) != len(results) or not results:
            print(f"Error removing records @ batch {i+CHUNK_SIZE}")
            print(delete_results)
            print(all_removed)
    return all_removed

In [31]:
def add_and_delete_points_on_AGOL_realtime_timeseries_layer():
    """Run one ETL pass against the realtime timeseries points layer.

    Deletes every point older than ``NUM_OF_HOURS_ON_VIEW`` hours, then downloads
    recent points from the rainfall API and adds them to the layer.

    Returns:
        None. If the API request fails, its error message is printed and nothing is added.
    """
    end_datetime = datetime.now(tz=TZINFO)
    delete_datetime = end_datetime - timedelta(hours=NUM_OF_HOURS_ON_VIEW)
    # The query selects rows with ts earlier than the cutoff, so the log says "before".
    print(f"Grabbing object ids to delete before {delete_datetime}")
    delete_object_ids = get_object_ids_to_remove(delete_datetime)

    print(f"Deleting {len(delete_object_ids)} from layer")

    delete_points_from_AGOL_realtime_timeseries_layer(delete_points_objectids_list=delete_object_ids)

    print("Getting recent points from 3rww api")

    status = get_points_df_from_3rww_api(end_datetime)

    if not status["success"]:
        print(status["error_message"])
        return None
    points_df = status["points_df"]
    print(f"Adding {len(points_df)} to layer")

    add_points_to_AGOL_realtime_timeseries_layer(df_to_add=points_df)

In [32]:
# Run one ETL pass: delete points that fell out of the viewing window, then load recent points.
add_and_delete_points_on_AGOL_realtime_timeseries_layer()

Grabbing object ids to delete after 2025-02-16 19:37:47.125945+00:00
Deleting 76548 from layer


100%|██████████| 1914/1914 [05:31<00:00,  5.78it/s]


Getting recent points from 3rww api
Adding 37718 to layer


  0%|          | 0/943 [00:00<?, ?it/s]


KeyError: None

In [40]:
# Debugging cell: fetch the points frame directly, without editing the layer.
end_datetime = datetime.now(tz=TZINFO)
status = get_points_df_from_3rww_api(end_datetime)
# NOTE(review): "points_df" is only present when status["success"] is True.
points_df = status["points_df"]

In [41]:
# Debugging cell: convert only the first CHUNK_SIZE rows to a FeatureSet,
# to check the conversion step on its own.
a = (points_df.iloc[0:CHUNK_SIZE,]).copy()
a.spatial.to_featureset()

<FeatureSet> 40 features