# Precipitation at All CROCUS Sites

In [1]:
from IPython.display import IFrame
import pandas as pd
import dask
import sage_data_client
import holoviews as hv
import hvplot.pandas
from metpy.units import units
import metpy.calc as mpcalc
from bokeh.models import DatetimeTickFormatter
import sage_data_client

hv.extension("bokeh")

def apply_formatter(plot, element):
    """Install a date-and-time tick formatter on the plot's x axis.

    Every time scale (hours, minutes, hourmin, days, months) is given the
    same two-line label: the date, a line break, then the 12-hour clock time.

    Parameters
    ----------
    plot
        Plot object whose ``handles['xaxis']`` formatter is replaced.
    element
        Unused here; kept because the function is passed as a plot hook.
    """
    tick_label = '%m/%d/%Y \n %l:%M %p'
    scales = ('hours', 'minutes', 'hourmin', 'days', 'months')
    per_scale = {scale: tick_label for scale in scales}
    plot.handles['xaxis'].formatter = DatetimeTickFormatter(**per_scale)

def add_local_time(df):
    """Add a ``Local_Time`` column holding the index in US Central time.

    The index is interpreted as UTC (epoch seconds when it is numeric) and
    converted to the ``America/Chicago`` zone in one vectorised call rather
    than one Python-level call per timestamp.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose index holds the observation times. It is modified in
        place.

    Returns
    -------
    pandas.DataFrame
        The same ``df`` object, with the ``Local_Time`` column added.
    """
    utc_index = pd.to_datetime(df.index, unit='s', utc=True)
    # tz_convert on the whole DatetimeIndex keeps the tz-aware dtype.
    df["Local_Time"] = utc_index.tz_convert('America/Chicago')
    return df

def search_convert_to_inch(site):
    """Fetch one node's last day of rain accumulation, re-based and in inches.

    Queries ``sage_data_client`` for ``wxt.rain.accumulation`` records
    published by the waggle-wxt536 plugin on the node whose VSN is ``site``.
    The values are treated as millimetres, converted to inches, and shifted
    so that the first sample in the window is zero.

    Parameters
    ----------
    site : str
        Node VSN to query, e.g. ``"W096"``.

    Returns
    -------
    pandas.Series
        The ``value`` column indexed by ``timestamp``. It is returned
        unconverted when the query yields no rows.
    """
    subset = sage_data_client.query(
        start="-1d",
        filter={
            "plugin": "registry.sagecontinuum.org/jrobrien/waggle-wxt536:0.24.11.14.*",
            # Query the requested node; a fixed VSN here would return the
            # same series for every site.
            "vsn": site,
            "name": "wxt.rain.accumulation",
        },
    ).set_index("timestamp")

    # No rows: nothing to convert, and there is no first sample to re-base on.
    if subset.empty:
        return subset.value

    # Unwrap the pint quantity so the column stores plain numbers, not a
    # unit-carrying array.
    inches = (subset.value.values * units.mm).to("in").magnitude

    # Accumulation relative to the start of the query window.
    subset["value"] = inches - inches[0]
    return subset.value

import warnings
# Silence all warnings from this point on so they do not clutter the
# notebook output. NOTE(review): this also hides warnings that may matter.
warnings.filterwarnings('ignore')

In [3]:
# Site label -> node VSN passed to search_convert_to_inch
site_mapping = dict(
    ATMOS="W0A4",
    HUM="W0A1",
    BIG="W0A0",
    SHEDD="W09E",
    DOWN="W09D",
    NU="W099",
    UIC="W096",
    VLPK="W095",
    CSU="W08E",
    NEIU="W08D",
    CCICS="W08B",
)

# One lazy task per site; nothing is fetched until compute() is called
delayed_tasks = dict(
    (label, dask.delayed(search_convert_to_inch)(vsn))
    for label, vsn in site_mapping.items()
)

# Run every task in a single dask.compute call; results come back in task order
results = dask.compute(*delayed_tasks.values())

# One column per site label, rows aligned on the series' timestamp index
merged = pd.DataFrame(dict(zip(delayed_tasks, results)))

CPU times: user 8min 25s, sys: 1min 23s, total: 9min 48s
Wall time: 9min 49s


In [4]:
# Drop every site whose accumulation series contains a negative value
negative_sites = [site for site in merged.columns if merged[site].min() < 0]
merged = merged.drop(columns=negative_sites)

# Average onto a regular 1-minute grid, then attach the local-time column
resampled = add_local_time(merged.resample("1min").mean())

# NOTE(review): the title says "12 Hour", but search_convert_to_inch queries
# with start="-1d" -- confirm which window is intended.
resampled.hvplot(
    x="Local_Time",
    xlabel="Local Time (Central/US)",
    ylabel="Precipitation Accumulation [inches]",
    title="12 Hour Precipitation Accumulation at \n CROCUS Sites [inches]",
).opts(hooks=[apply_formatter])