## Using the EWXPWSDB Collector Class

# <span style="color:red">clear all output before saving: db output contains passwords! </span>

this walks through process of

- creating a temporary DB
- using the collector class for existing station records to
    - get past data 
    - pull data for short period, e.g. from a scheduler
    - get all recent data, aka catch up data from last record


In [1]:
%load_ext autoreload
%autoreload 2

## Constants/Config

In [2]:

station_file = '../data/test_stations.tsv'
station_type = 'DAVIS'
station_code = 'EWXDAVIS01' 

In [3]:

from ewxpwsdb.db.database import Session, init_db, get_db_url, get_engine
from ewxpwsdb.db.models import WeatherStation, Reading, StationType, APIResponse
from ewxpwsdb.db.importdata import import_station_file
from ewxpwsdb.collector import Collector
from ewxpwsdb.time_intervals import UTCInterval
from sqlmodel import select

## create engine temp database

In [4]:
from ewxpwsdb.db.database import create_temp_pg_engine, get_db_url, init_db, Session
from ewxpwsdb.db.models import WeatherStation # , Reading, StationType, APIResponse

engine = create_temp_pg_engine(get_db_url(), name_prefix = 'notebook_testing')

temp_db_url = engine.url
print(temp_db_url.database)
init_db(engine,station_file)

notebook_testing_v0ojm4lmvk
Station with type 'DAVIS' merged into the database
Station with type 'LOCOMOS' merged into the database
Station with type 'ONSET' merged into the database
Station with type 'RAINWISE' merged into the database
Station with type 'SPECTRUM' merged into the database
Station with type 'ZENTRA' merged into the database


In [5]:
def get_one_station(station_type, station_code = None, engine=engine):
    """ global engine is default"""

    if station_code:
        statement = select(WeatherStation).where(WeatherStation.station_code == station_code)
    else:
        statement = select(WeatherStation).where(WeatherStation.station_type == station_type)

    with Session(engine) as session:
        results = session.exec(statement)
        weather_station = results.first()

    return weather_station

In [6]:
station = get_one_station(station_type, station_code, engine)
print(station.station_code)


EWXDAVIS01


In [7]:
collector = Collector(station, engine)
collector.weather_api

<ewxpwsdb.weather_apis.davis_api.DavisAPI at 0x1318d26d0>

In [8]:
collector._session.commit()

In [9]:
from datetime import timedelta
from ewxpwsdb.time_intervals import UTCInterval
duration_min = 70
viable_interval = UTCInterval.previous_interval(delta_mins=duration_min)
yesterday = UTCInterval(start=viable_interval.start - timedelta(days = 1), 
                           end = viable_interval.end - timedelta(days = 1)
                           )
yesterday

UTCInterval(start=datetime.datetime(2024, 6, 30, 10, 20, tzinfo=datetime.timezone.utc), end=datetime.datetime(2024, 6, 30, 11, 30, tzinfo=datetime.timezone.utc))

In [10]:
from datetime import timedelta
from ewxpwsdb.time_intervals import previous_fourteen_minute_interval

interval = previous_fourteen_minute_interval()

interval.start = interval.start - timedelta(hours = 1)
interval.end = interval.end - timedelta(hours = 0.5)
interval

UTCInterval(start=datetime.datetime(2024, 7, 1, 10, 16, tzinfo=datetime.timezone.utc), end=datetime.datetime(2024, 7, 1, 11, 0, tzinfo=datetime.timezone.utc))

In [11]:
today_interval = UTCInterval.one_day_interval()  # this defaults to getting the time range from midnight to now
two_day_interval = UTCInterval(start = (today_interval.start - timedelta(days = 1)), end = today_interval.end)
two_day_interval

UTCInterval(start=datetime.datetime(2024, 6, 30, 0, 0, tzinfo=datetime.timezone.utc), end=datetime.datetime(2024, 7, 1, 11, 30, tzinfo=datetime.timezone.utc))

In [12]:
from datetime import datetime, timezone
today_utc = datetime.now(timezone.utc).date()

collector.request_and_store_weather_data_utc(UTCInterval.one_day_interval(d = today_utc- timedelta(days = 1)))

[1]

In [13]:
collector.get_readings(1)

[Reading(apiresponse_id=1, station_sampling_interval=5, lws=0.0, wdir=0.0, request_id='51f0eaef-ed51-4733-a8ce-76d6d93d2169', pcpn=0.0, wspd=6.0, weatherstation_id=2, relh=54.0, wspd_max=12.0, rpet=None, atmp=19.78, smst=27.97621, atmp_min=19.72, stmp=22.63, data_datetime=datetime.datetime(2024, 7, 1, 3, 55, tzinfo=datetime.timezone.utc), atmp_max=19.78, srad=187.0, id=287, dwpt=10.2)]

In [14]:
# call the method to see if it's working
somerex = collector.request_and_store_weather_data_utc(interval)
somerex

[2]

In [15]:
# are api response ids being saved in the object?

collector.current_api_response_record_ids

[2]

In [16]:
# to re-fill this sqlalchemy record cache, just ask for some piece of the data
print(collector.current_api_response.id)
# now the object cache is refilled and should be present
collector.current_api_response

2


APIResponse(response_text='{"station_id_uuid":"ed88c104-36a4-4244-ad3f-7f048dfa5a50","sensors":[{"lsid":434004,"data":[{"noise_floor_rssi":-109,"port_energy_consumed_4":0,"port_energy_consumed_3":0,"port_energy_consumed_2":0,"bluetooth_firmware_version":null,"beacon_interval":2097,"port_energy_consumed_1":0,"tz_offset":-14400,"door_switch_status":0,"link_layer_packets_received":155,"solar_panel_voltage":1204,"rank":512,"health_version":1,"false_wakeup_rssi":-52,"inside_box_temp":46.846924,"dcell_battery_voltage":5747,"power_percentage_tx":0,"last_rx_rssi":-33,"power_percentage_mcu":6,"application_firmware_version":1620422234,"false_wakeup_count":440,"etx":256,"rpl_mode":0,"uptime":76006125,"number_of_neighbors":1,"last_parent_rtt_ping":0,"li_ion_cell_voltage":3955,"platform_id":3,"power_percentage_rx":2,"rpl_parent_node_id":8426185,"ts":1719829800},{"noise_floor_rssi":-85,"port_energy_consumed_4":0,"port_energy_consumed_3":0,"port_energy_consumed_2":0,"bluetooth_firmware_version":null,

In [17]:
# show the transformed readings, if any
collector.current_readings

[Reading(apiresponse_id=2, station_sampling_interval=5, lws=1.0, wdir=0.0, request_id='8a7abe81-815d-4958-93db-8127aa69ebf7', pcpn=0.0, wspd=0.0, weatherstation_id=2, relh=90.0, wspd_max=1.0, rpet=None, atmp=9.28, smst=27.1208, atmp_min=9.22, stmp=17.36, data_datetime=datetime.datetime(2024, 7, 1, 14, 20, tzinfo=datetime.timezone.utc), atmp_max=9.33, srad=10.0, id=288, dwpt=7.72),
 Reading(apiresponse_id=2, station_sampling_interval=5, lws=1.0, wdir=None, request_id='8a7abe81-815d-4958-93db-8127aa69ebf7', pcpn=0.0, wspd=0.0, weatherstation_id=2, relh=90.0, wspd_max=0.0, rpet=None, atmp=9.28, smst=27.12646, atmp_min=9.22, stmp=17.33, data_datetime=datetime.datetime(2024, 7, 1, 14, 25, tzinfo=datetime.timezone.utc), atmp_max=9.33, srad=11.0, id=289, dwpt=7.72),
 Reading(apiresponse_id=2, station_sampling_interval=5, lws=1.0, wdir=0.0, request_id='8a7abe81-815d-4958-93db-8127aa69ebf7', pcpn=0.0, wspd=0.0, weatherstation_id=2, relh=90.0, wspd_max=1.0, rpet=None, atmp=9.28, smst=27.09821, a

In [18]:
# again, demonstrate that the readings were stored in the database by checking the ID field
collector.current_readings[0].id


288

## Test/Demo getting a full day of readings

In [19]:
viable_interval = UTCInterval.previous_fifteen_minutes()
collector.request_and_store_weather_data_utc(viable_interval)


[3]

In [20]:
yesterday = UTCInterval(start=viable_interval.start - timedelta(days = 1),
                             end = viable_interval.end - timedelta(days = 1)
                               )
print(yesterday)
collector._session.rollback()

response_ids = collector.request_and_store_weather_data_utc(yesterday)
print("response ids:")
print(response_ids)
print("reading ids:")
print(collector.current_reading_ids)

start=datetime.datetime(2024, 6, 30, 11, 15, tzinfo=datetime.timezone.utc) end=datetime.datetime(2024, 6, 30, 11, 30, tzinfo=datetime.timezone.utc)
response ids:
[4]
reading ids:
[136, 137, 138]


In [21]:
## there _should_ be readings from the same interval in there now     

readings = collector.get_readings_by_date(yesterday)

In [22]:
print(readings)

[Reading(apiresponse_id=1, station_sampling_interval=5, lws=0.0, wdir=15.0, request_id='51f0eaef-ed51-4733-a8ce-76d6d93d2169', pcpn=0.0, wspd=7.0, weatherstation_id=2, relh=80.0, wspd_max=13.0, rpet=None, atmp=19.72, smst=29.98606, atmp_min=19.61, stmp=21.86, data_datetime=datetime.datetime(2024, 6, 30, 11, 15, tzinfo=datetime.timezone.utc), atmp_max=19.94, srad=0.0, id=87, dwpt=16.17), Reading(apiresponse_id=1, station_sampling_interval=5, lws=0.0, wdir=15.0, request_id='51f0eaef-ed51-4733-a8ce-76d6d93d2169', pcpn=0.0, wspd=8.0, weatherstation_id=2, relh=81.0, wspd_max=15.0, rpet=None, atmp=19.5, smst=29.95008, atmp_min=19.33, stmp=21.75, data_datetime=datetime.datetime(2024, 6, 30, 11, 20, tzinfo=datetime.timezone.utc), atmp_max=19.61, srad=0.0, id=88, dwpt=16.15), Reading(apiresponse_id=1, station_sampling_interval=5, lws=0.0, wdir=14.0, request_id='51f0eaef-ed51-4733-a8ce-76d6d93d2169', pcpn=0.0, wspd=7.0, weatherstation_id=2, relh=82.0, wspd_max=11.0, rpet=None, atmp=19.22, smst=2

## Test/demo of restricting data inserts

This used to throw an exception when saving readings with the same timestamp and station. 
however, even though this is a unique constraint on these columns, the collector code checks for that, and simply 
updates the record.  This is known as an 'upsert' but it doesn't using the Postgresql + SQLAlchemy upsert but rather some custom code. 

In [23]:

from sqlalchemy.exc import IntegrityError

try:
    something = collector.save_readings_from_responses(api_responses = collector.current_api_response)
except IntegrityError as e:
    collector._session.rollback()
    print("integrity error prevented duplicate records from being inserted")

# what happens to the current readings? 
collector.current_readings

[Reading(apiresponse_id=4, station_sampling_interval=5, lws=0.0, wdir=0.0, request_id='18a3855a-90cf-4e88-ba95-b388a8272424', pcpn=0.0, wspd=9.0, weatherstation_id=2, relh=79.0, wspd_max=17.0, rpet=None, atmp=13.78, smst=29.69299, atmp_min=13.72, stmp=20.18, data_datetime=datetime.datetime(2024, 6, 30, 15, 20, tzinfo=datetime.timezone.utc), atmp_max=13.83, srad=47.0, id=136, dwpt=10.2),
 Reading(apiresponse_id=4, station_sampling_interval=5, lws=0.0, wdir=0.0, request_id='18a3855a-90cf-4e88-ba95-b388a8272424', pcpn=0.0, wspd=10.0, weatherstation_id=2, relh=78.0, wspd_max=15.0, rpet=None, atmp=13.67, smst=29.70491, atmp_min=13.61, stmp=20.18, data_datetime=datetime.datetime(2024, 6, 30, 15, 25, tzinfo=datetime.timezone.utc), atmp_max=13.72, srad=56.0, id=137, dwpt=9.9),
 Reading(apiresponse_id=4, station_sampling_interval=5, lws=1.0, wdir=0.0, request_id='18a3855a-90cf-4e88-ba95-b388a8272424', pcpn=0.0, wspd=8.0, weatherstation_id=2, relh=79.0, wspd_max=16.0, rpet=None, atmp=13.56, smst

In [24]:
# ensure the rollback worked

try:
    something = collector.save_readings_from_responses(api_responses = collector.current_api_response)
except IntegrityError as e:
    collector._session.rollback()
    print("didn't get the rollback error")

    

In [25]:
# do we still have radings after a rollback?
readings = collector.get_readings(n=5)
readings


[Reading(apiresponse_id=3, station_sampling_interval=5, lws=0.0, wdir=0.0, request_id='19f502f9-7d99-4152-a024-46a6ef04e635', pcpn=0.0, wspd=1.0, weatherstation_id=2, relh=84.0, wspd_max=3.0, rpet=None, atmp=11.56, smst=27.08127, atmp_min=11.44, stmp=16.98, data_datetime=datetime.datetime(2024, 7, 1, 15, 30, tzinfo=datetime.timezone.utc), atmp_max=11.67, srad=109.0, id=299, dwpt=8.94),
 Reading(apiresponse_id=3, station_sampling_interval=5, lws=0.0, wdir=0.0, request_id='19f502f9-7d99-4152-a024-46a6ef04e635', pcpn=0.0, wspd=1.0, weatherstation_id=2, relh=85.0, wspd_max=2.0, rpet=None, atmp=11.33, smst=27.08691, atmp_min=11.17, stmp=17.05, data_datetime=datetime.datetime(2024, 7, 1, 15, 25, tzinfo=datetime.timezone.utc), atmp_max=11.44, srad=89.0, id=298, dwpt=8.9),
 Reading(apiresponse_id=3, station_sampling_interval=5, lws=0.0, wdir=0.0, request_id='19f502f9-7d99-4152-a024-46a6ef04e635', pcpn=0.0, wspd=1.0, weatherstation_id=2, relh=86.0, wspd_max=2.0, rpet=None, atmp=11.11, smst=27.0

## Test of Backfill process

this will look for gaps and fill them 

start with some disconnected time intervals

In [26]:
# start by adding data to create gaps.  This assumes the station has been online for a while

few_days_ago = UTCInterval(start = yesterday.start - timedelta(days = 3), end = yesterday.end - timedelta(days = 3))
print(few_days_ago)
more_days_ago = UTCInterval(start = few_days_ago.start - timedelta(days = 2), end = few_days_ago.end - timedelta(days = 2))
print(more_days_ago)

start=datetime.datetime(2024, 6, 27, 11, 15, tzinfo=datetime.timezone.utc) end=datetime.datetime(2024, 6, 27, 11, 30, tzinfo=datetime.timezone.utc)
start=datetime.datetime(2024, 6, 25, 11, 15, tzinfo=datetime.timezone.utc) end=datetime.datetime(2024, 6, 25, 11, 30, tzinfo=datetime.timezone.utc)


In [27]:
print(f"getting data for interval: {more_days_ago}")
print(collector.request_and_store_weather_data_utc(more_days_ago))
print("requests")

print(f"getting data for interval: {few_days_ago}")
print(collector.request_and_store_weather_data_utc(few_days_ago))
print("requests")




getting data for interval: start=datetime.datetime(2024, 6, 25, 11, 15, tzinfo=datetime.timezone.utc) end=datetime.datetime(2024, 6, 25, 11, 30, tzinfo=datetime.timezone.utc)
[5]
requests
getting data for interval: start=datetime.datetime(2024, 6, 27, 11, 15, tzinfo=datetime.timezone.utc) end=datetime.datetime(2024, 6, 27, 11, 30, tzinfo=datetime.timezone.utc)
[6]
requests


*now try to get a backfill*

In [28]:
from datetime import UTC

days_from_now = (datetime.now(UTC) - more_days_ago.start).days

print(f"initiating backfill for {days_from_now} days")
readings_added = collector.backfill(n_days_prior=days_from_now)

print(f"{len(readings_added)} readings added")
print(f"first reading element = {readings_added[0]}")





initiating backfill for 6 days

        SELECT DISTINCT
            CASE
            when gap_start =true and gap_end = true then missing_datetime
            when gap_start = false and gap_end = true then LAG(missing_datetime, 1) OVER ( ORDER BY missing_datetime ) 
            when gap_start = true and gap_end = false then missing_datetime
            when gap_start = false and gap_end = false then Null
            END start, 
            CASE
            when gap_start =true and gap_end = true then missing_datetime
            when gap_start = false and gap_end = true then missing_datetime
            when gap_start = true and gap_end = false then LEAD(missing_datetime, 1) OVER ( ORDER BY missing_datetime )
            when gap_start = false and gap_end = false then Null
            END end
            -- ,
            -- missing_datetime as actual_missing_datetime, gap_start, gap_end 
        FROM 
            ( SELECT 
                clock.tick as missing_datetime, 
              

double checking on how a collector needs to get records:  the time interval must not have start = end


In [29]:
from ewxpwsdb.station_readings import StationReadings
station_readings = StationReadings(collector.station, engine=engine)


In [30]:
known_interval = UTCInterval(start=datetime(2024, 6, 27, 0, 0, tzinfo=timezone.utc), end=datetime(2024, 6, 27, 20, 0, tzinfo=timezone.utc))
empty_interval = UTCInterval(start=datetime(2024, 6, 27, 15, 0, tzinfo=timezone.utc), end=datetime(2024, 6, 27, 15, 0, tzinfo=timezone.utc))
print(station_readings.readings_by_interval_utc(known_interval))
empty_interval.end = empty_interval.end + timedelta(minutes = 5)
x = collector.request_and_store_weather_data_utc(empty_interval)
print(x)

[Reading(apiresponse_id=6, station_sampling_interval=5, lws=0.0, wdir=0.0, request_id='5bd2665f-1650-449d-8564-68105b20acbb', pcpn=0.0, wspd=6.0, weatherstation_id=2, relh=86.0, wspd_max=10.0, rpet=None, atmp=13.06, smst=27.40987, atmp_min=13.06, stmp=19.34, data_datetime=datetime.datetime(2024, 6, 27, 15, 20, tzinfo=datetime.timezone.utc), atmp_max=13.11, srad=61.0, id=303, dwpt=10.77), Reading(apiresponse_id=6, station_sampling_interval=5, lws=0.0, wdir=0.0, request_id='5bd2665f-1650-449d-8564-68105b20acbb', pcpn=0.0, wspd=6.0, weatherstation_id=2, relh=86.0, wspd_max=10.0, rpet=None, atmp=13.17, smst=27.39851, atmp_min=13.11, stmp=19.34, data_datetime=datetime.datetime(2024, 6, 27, 15, 25, tzinfo=datetime.timezone.utc), atmp_max=13.17, srad=90.0, id=304, dwpt=10.88), Reading(apiresponse_id=6, station_sampling_interval=5, lws=0.0, wdir=0.0, request_id='5bd2665f-1650-449d-8564-68105b20acbb', pcpn=0.0, wspd=6.0, weatherstation_id=2, relh=85.0, wspd_max=10.0, rpet=None, atmp=13.22, smst

### Clean up

remove test database

In [None]:

from ewxpwsdb.db.database import drop_pg_db, list_pg_databases
from sqlalchemy.orm import close_all_sessions

if collector:
    collector._session.close()
    collector._engine.dispose()

close_all_sessions()




print(f"attempting to drop db {engine.url.database}")
result = drop_pg_db(engine.url.database, get_db_url())
    # engine)
print(result)
engine.dispose()

