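# Download and cache data for the Delta region
Development notebook: find the CDEC stations that lie inside the Delta boundary, then download (and cache) all of their sensor data in parallel with dask.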

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import pandas as pd
import hvplot.pandas

In [None]:
from cdec_maps import cdec

In [None]:
# cdec_maps Reader; used in the next cells to fetch the CDEC station lists.
c = cdec.Reader()

In [None]:
# Station list for daily-duration data (as provided by cdec_maps; columns
# include at least 'ID', 'Latitude', 'Longitude' judging by later cells).
daily_stations = c.read_daily_stations()

In [None]:
# Station list for realtime data (as provided by cdec_maps).
realtime_stations = c.read_realtime_stations()

In [None]:
# Union of the daily and realtime station lists. No `on=` is given, so pandas
# joins on every column the two frames share; how='outer' keeps stations that
# appear in only one of the lists.
all_stations = pd.merge(daily_stations, realtime_stations, how='outer')
all_stations

In [None]:
#hvplot.help(kind='points')

In [None]:
# Map every station as a point on OpenStreetMap tiles; hovering shows all
# columns. (Explicit crs=... settings and xlim=[-123, -121] / ylim=(37, 39)
# were tried here and left out.)
all_stations.hvplot.points(
    x='Longitude',
    y='Latitude',
    geo=True,
    tiles='OSM',
    hover_cols='all',
)

In [None]:
import geopandas as gpd
# Promote the station table to a GeoDataFrame: one point geometry per station,
# x from 'Longitude' and y from 'Latitude'. No CRS is attached yet.
all_stations = gpd.GeoDataFrame(
    data=all_stations,
    geometry=gpd.points_from_xy(x=all_stations['Longitude'],
                                y=all_stations['Latitude']),
)

In [None]:
# Delta boundary geometry, read from a GeoJSON file in the current working
# directory. Only its first geometry is used for filtering below.
delta_boundary = gpd.read_file('./Delta_Simplified.geojson')

In [None]:
# Declare (not reproject) the point coordinates as WGS84 longitude/latitude.
all_stations = all_stations.set_crs('EPSG:4326')

In [None]:
# Keep only the stations that fall inside the first Delta boundary polygon.
# The boundary is reprojected to the stations' CRS first: `within` compares raw
# coordinates, so a boundary stored in another CRS would silently match
# nothing. `.iloc[0]` takes the first geometry by position, independent of
# the index labels.
delta_polygon = delta_boundary.to_crs(all_stations.crs).geometry.iloc[0]
all_stations_delta = all_stations[all_stations.within(delta_polygon)]

In [None]:
# Interactive map of the stations inside the Delta boundary polygon.
all_stations_delta.hvplot(geo=True, tiles='OSM',
                          frame_width=400, hover_cols='all')

In [None]:
# Display the stations inside the Delta boundary polygon.
all_stations_delta

In [None]:
# Bounding box (minx, miny, maxx, maxy) of each boundary geometry, useful for
# choosing the rectangular delta_region below.
delta_boundary.bounds

In [None]:
from shapely.geometry import Polygon, LineString, Point
# Rectangular lon/lat region around the Delta, given as its four corners
# (shapely closes the ring automatically).
delta_region = Polygon(
    shell=[
        (-122.5, 39.5),  # north-west
        (-120.5, 39.5),  # north-east
        (-120.5, 37),    # south-east
        (-122.5, 37),    # south-west
    ]
)

In [None]:
# Stations whose point lies inside the rectangular delta_region.
all_stations_delta_region = all_stations.loc[all_stations.within(delta_region)]

In [None]:
# Interactive map of the stations inside the rectangular delta_region.
all_stations_delta_region.hvplot(
    geo=True, tiles='OSM', frame_width=400, hover_cols='all')

In [None]:
# Display the stations inside the rectangular delta_region.
all_stations_delta_region

In [None]:
import dask

from dask.distributed import Client
# Start a local dask.distributed cluster to run the download tasks below.
# Its dashboard is usually at http://localhost:8787 when that port is free
# (client.dashboard_link gives the actual URL).
client = Client()

In [None]:
# Sorted, de-duplicated IDs of the stations inside the Delta boundary.
# The outer merge of the daily and realtime lists joins on all shared columns,
# so a station whose entries differ in any other shared column appears twice;
# de-duplicating keeps it from being downloaded twice in parallel.
delta_boundary_ids = all_stations_delta.ID.drop_duplicates().sort_values().tolist()
print(len(delta_boundary_ids))

In [None]:
###
import dask
@dask.delayed
def read_station_data_for_sensor_row(station_id, df_sensor, row_index):
    """Lazily download the data of one sensor of a CDEC station.

    Decorated with ``dask.delayed``: calling it returns a delayed task, and
    the download runs only when that task is computed.

    Parameters
    ----------
    station_id : str
        CDEC station ID.
    df_sensor : pandas.DataFrame
        The station's sensor table; must provide 'Sensor Number',
        'Duration' and 'Data Available' columns.
    row_index : int
        *Positional* index of the sensor row (looked up with ``.iloc``).

    Returns
    -------
    object
        Whatever ``cdec.Reader.read_station_data`` returns for the sensor
        (presumably a DataFrame; TODO confirm).
    """
    # A fresh Reader per task, presumably so tasks share no reader state
    # across dask workers. NOTE(review): confirm Reader is cheap to build.
    reader = cdec.Reader()
    sensor_row = df_sensor.iloc[row_index]
    sensor_number = sensor_row['Sensor Number']
    # 'Data Available' is split on 'to' into a start and an end date string.
    sdate, edate = (part.strip()
                    for part in sensor_row['Data Available'].split('to'))
    # Computed once; used both for the progress message and the read call.
    duration_code = cdec.get_duration_code(sensor_row['Duration'])
    print(f'Reading {station_id}_{sensor_number}_{duration_code}')
    return reader.read_station_data(station_id, sensor_number, duration_code,
                                    cdec.to_date_format(sdate),
                                    cdec.to_date_format(edate))
##


def cache_station_data(station_id):
    """Build the delayed download tasks for every sensor of one CDEC station.

    Reads the station's meta info and creates one
    ``read_station_data_for_sensor_row`` task per row of its sensor table.
    Nothing is downloaded here; hand the returned tasks to ``dask.compute``.

    Parameters
    ----------
    station_id : str
        CDEC station ID.

    Returns
    -------
    list
        One delayed task per sensor row (empty if the table has no rows).
    """
    reader = cdec.Reader()
    meta_tables = reader.read_station_meta_info(station_id)
    # NOTE(review): assumes the second table of the meta info is the sensor
    # list (with 'Sensor Number', 'Duration', 'Data Available' columns).
    df_sensor = meta_tables[1]
    # Pass row *positions*, not index labels: the worker looks rows up with
    # .iloc, so labels would select the wrong row (or raise) whenever the
    # sensor table does not have a default 0..n-1 index.
    return [read_station_data_for_sensor_row(station_id, df_sensor, position)
            for position in range(len(df_sensor))]

In [None]:
# Build one delayed download task per sensor for every station in the Delta.
# Timing history:
#   - station by station, no dask: about 1 h 15 min (2021-11-02)
#   - dask, parallel within each station's sensor list: about 23 min
#   - dask, all stations and all sensors in one batch: about 10 min
# Set REFRESH_CACHE to True to rerun the downloads.
REFRESH_CACHE = True
# Defined even when the refresh is skipped, so the next cell never hits a
# NameError.
tasks = []
if REFRESH_CACHE:
    # cache_station_data returns a list of tasks per station; flatten them
    # into a single list of delayed downloads.
    tasks = [task
             for station_id in delta_boundary_ids
             for task in cache_station_data(station_id)]

In [None]:
# Run all download tasks via dask. `tasks` may hold one list of delayed tasks
# per station; it is flattened so the printed count is the number of downloads
# rather than the number of stations. Skipped when the refresh is disabled,
# which also avoids re-running stale tasks left over from an earlier run.
if REFRESH_CACHE:
    flat_tasks = [task
                  for item in tasks
                  for task in (item if isinstance(item, list) else [item])]
    print(f'Submitting all {len(flat_tasks)} via dask')
    _ = dask.compute(*flat_tasks)
else:
    print('REFRESH_CACHE is False; skipping download')