In [7]:
import pandas as pd
import lithops
from dataretrieval import wqp
import dask.dataframe

In [2]:
# Load the NWQN site table. Lines starting with '#' are treated as comments.
# Both site-ID columns are read as text so IDs such as '01054200' keep their
# leading zeros instead of being parsed as integers.
site_list = pd.read_csv(
    'NWQN_sites.csv',
    comment='#',
    dtype={
        'SITE_QW_ID': str,
        'SITE_FLOW_ID': str,
    },
)
site_list

Unnamed: 0,SITE_ABB,SITE_QW_ID,SITE_QW_NAME,DA,QW_LAT,QW_LONG,SITE_FLOW_ID,SITE_FLOW_NAME,FLOW_LAT,FLOW_LONG,SITE_TYPE
0,GILE,01054200,"Wild River at Gilead, Maine",69.6,44.390556,-70.979722,01054200,"Wild River at Gilead, Maine",44.390556,-70.979722,Reference
1,COLR,01170100,"Green River near Colrain, MA",41.4,42.703417,-72.670647,01170100,"Green River near Colrain, MA",42.703417,-72.670647,Reference
2,THOM,01184000,"Connecticut River at Thompsonville, CT",9660.0,41.987319,-72.605367,01184000,"Connecticut River at Thompsonville, CT",41.987319,-72.605367,Coastal Rivers
3,WINN,01209710,"Norwalk River at Winnipauk, CT",33.0,41.135374,-73.426231,01209700,"Norwalk River a T South Wilton, CT",41.163767,-73.419544,Urban
4,CANA,01349150,"Canajoharie Creek near Canajoharie, NY",59.7,42.876111,-74.603056,01349150,"Canajoharie Creek near Canajoharie, NY",42.876111,-74.603056,Agriculture
...,...,...,...,...,...,...,...,...,...,...,...
141,HONO,16618000,"Kahakuloa Stream near Honokohau, Maui, HI",3.4,20.978694,-156.554500,16618000,"Kahakuloa Stream near Honokohau, Maui, HI",20.978694,-156.554500,Reference
142,VICK,322023090544500,"Mississippi River above Vicksburg, MS",1131100.0,32.339722,-90.912500,COMPUTED,Difference of streamflow at 07289000 (Miss R a...,32.339722,-90.912500,Large Inland Rivers
143,MILK,394220106431500,"EAGLE RIVER BELOW MILK CREEK NEAR WOLCOTT, CO",600.0,39.705000,-106.725833,394220106431500,"EAGLE RIVER BELOW MILK CREEK NEAR WOLCOTT, CO",39.705000,-106.725833,WSC Site
144,NEWP,394340085524601,"Sugar Creek at New Palestine, IN",92.6,39.727821,-85.879425,03361650,"Sugar Creek at New Palestine, IN",39.714211,-85.885536,Agriculture


In [23]:
# How many water-quality sites to fetch while prototyping; set to None to run all of them.
N_SITES = 8
# Positional slice on the Series (iloc[:None] keeps every row), then convert to a plain list.
sites = site_list['SITE_QW_ID'].iloc[:N_SITES].to_list()

In [26]:
def map_references(site, out_path='nawqa.parquet',
                   project="National Water Quality Assessment Program (NAWQA)",
                   characteristic_name=None):
    """Fetch NAWQA water-quality results for one USGS site and persist them.

    Used as the per-item function for ``lithops`` ``map``. It calls
    ``dataretrieval.wqp.get_results`` for ``USGS-{site}``, filtered to
    ``project``. Non-empty results are written into a parquet dataset
    partitioned by ``MonitoringLocationIdentifier``.

    Parameters
    ----------
    site : str
        Site number without the ``USGS-`` prefix (e.g. ``'01054200'``).
    out_path : str, optional
        Root directory of the partitioned parquet dataset.
    project : str, optional
        Value passed as the ``project`` filter of the query.
    characteristic_name : str, optional
        If given, passed as ``characteristicName`` to restrict the query
        (e.g. ``'Phosphorus'``). Omitted from the query when ``None``.

    Returns
    -------
    pandas.DataFrame or None
        The retrieved results, or ``None`` when the query returned no rows.
        Nothing is written in that case.
    """
    query = {'siteid': f'USGS-{site}', 'project': project}
    # Only send characteristicName when requested, so the default query is unchanged.
    if characteristic_name is not None:
        query['characteristicName'] = characteristic_name

    # get_results returns a pair; only the DataFrame is used here.
    nawqa, _ = wqp.get_results(**query)

    # Some sites have no matching results; skip the write for them.
    if nawqa.empty:
        return None

    # NOTE(review): casting every column to str presumably keeps all site
    # partitions on one schema regardless of per-site dtype inference. It also
    # turns missing values into literal strings such as 'nan'.
    nawqa.astype(str).to_parquet(
        out_path,
        engine='pyarrow',
        partition_cols=['MonitoringLocationIdentifier'],
        # pyarrow's default ('overwrite_or_ignore') adds a new file to an
        # existing partition, which duplicates rows on every re-run.
        # 'delete_matching' replaces only the partitions being written. Each
        # call writes just its own site's partition, so parallel workers
        # don't clobber each other.
        existing_data_behavior='delete_matching',
    )
    return nawqa


# Run map_references for every site on local worker processes. Each
# successful call writes its site's partition under nawqa.parquet.
# TODO: if one combined frame is needed, reduce the results (e.g. concat the
# non-None entries of `ds`). The old xarray map_reduce template that used to
# sit here referenced undefined names (`xr`, `file_pattern`) and was removed.
fexec = lithops.LocalhostExecutor()

futures = fexec.map(
    map_references,
    sites,
)

# Each entry is a DataFrame, or None for a site that returned no results.
ds = futures.get_result()

2024-08-01 15:47:40,226 [INFO] config.py:139 -- Lithops v3.4.1 - Python3.12
2024-08-01 15:47:40,229 [INFO] localhost.py:39 -- Localhost storage client created
2024-08-01 15:47:40,230 [INFO] localhost.py:78 -- Localhost compute v2 client created
2024-08-01 15:47:41,541 [INFO] invokers.py:107 -- ExecutorID 87abcd-3 | JobID M000 - Selected Runtime: python 
2024-08-01 15:47:41,550 [INFO] invokers.py:174 -- ExecutorID 87abcd-3 | JobID M000 - Starting function invocation: map_references() - Total: 8 activations
2024-08-01 15:47:41,577 [INFO] invokers.py:213 -- ExecutorID 87abcd-3 | JobID M000 - View execution logs at /tmp/lithops-kdoore/logs/87abcd-3-M000.log
2024-08-01 15:47:41,588 [INFO] executors.py:491 -- ExecutorID 87abcd-3 - Getting results from 8 function activations
2024-08-01 15:47:41,591 [INFO] wait.py:101 -- ExecutorID 87abcd-3 - Waiting for 8 function activations to complete
2024-08-01 15:48:08,306 [INFO] executors.py:615 -- ExecutorID 87abcd-3 - Cleaning temporary data


In [28]:
# Read the partitioned dataset written by map_references back with pandas and
# list the site identifiers it contains.
x = pd.read_parquet(path='nawqa.parquet', engine='pyarrow')
x.MonitoringLocationIdentifier.unique()

['USGS-01054200', 'USGS-01170100', 'USGS-01184000', 'USGS-01209710', 'USGS-01349150', 'USGS-01372043', 'USGS-01391500']
Categories (7, object): ['USGS-01054200', 'USGS-01170100', 'USGS-01184000', 'USGS-01209710', 'USGS-01349150', 'USGS-01372043', 'USGS-01391500']

In [28]:
# Same check through dask. Deduplicate per partition before computing, so
# only the distinct site IDs reach the client. The old .compute().unique()
# pulled the whole column into memory first.
x = dask.dataframe.read_parquet('nawqa.parquet', engine='pyarrow')
x['MonitoringLocationIdentifier'].unique().compute()

['USGS-01054200', 'USGS-01170100', 'USGS-01184000', 'USGS-01209710', 'USGS-01349150', 'USGS-01372043', 'USGS-01391500']
Categories (7, object): ['USGS-01054200', 'USGS-01170100', 'USGS-01184000', 'USGS-01209710', 'USGS-01349150', 'USGS-01372043', 'USGS-01391500']