Improve performance when fetching large datasets online #27

Closed · gmaze opened this issue Jun 23, 2020 · 7 comments · Fixed by #28
Labels: internals (Internal machinery), performance

gmaze commented Jun 23, 2020

Using the erddap data source to fetch a large amount of data has poor performance (in terms of wall time), and this kind of fetching is problematic.

To check this out, we can use the example from #16:

from argopy import DataFetcher as ArgoDataFetcher
box = [-120., -85., -10, 10, 0, 500, '2019-01-01','2019-12-31']
ds = ArgoDataFetcher(src='erddap').region(box).to_xarray()

To fix this issue, and more importantly to improve the performance and reliability of fetching large datasets, I propose to test the following pattern:

In argopy.data_fetchers.ErddapArgoDataFetcher, implement a chunking procedure for http requests:

  • Given a box request: split the domain into chunks/sub-domains, in space or time, or maybe both.
  • Given a wmo request: split the request into chunks, one per wmo (see the sketch below).

Each chunk of the full request would be fetched with its own http request, and chunk data would be gathered/concatenated as it comes back from the server.
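
For the wmo case, a minimal sketch, assuming the fetcher exposes its request url through a fetcher.url attribute for float access the same way it does for region access (as used further below), could look like:

from argopy import DataFetcher as ArgoDataFetcher

wmos = [6902746, 6902755]  # hypothetical list of floats requested by the user
# One url, hence one http request, per wmo:
urls = [ArgoDataFetcher(src='erddap').float(wmo).fetcher.url for wmo in wmos]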

Chunking the request, i.e. creating the list of urls to fetch, would be done at the fetcher level in argopy.data_fetchers.ErddapArgoDataFetcher.
Managing a pool of requests would be done at the file system level in argopy.stores.httpstore.

I suspect zarr or fsspec can already manage an (asynchronous) pool of requests.
The zarr documentation says it can "Read an array concurrently from multiple threads or processes.", so this looks very close to the pattern above. Maybe @rabernat can give us his impressions on this?
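
For reference, and depending on the fsspec version, its http filesystem can already fetch a list of urls concurrently; a minimal sketch (independent of argopy.stores.httpstore, and assuming urls is the list of chunk urls built by the fetcher) would be:

import fsspec

fs = fsspec.filesystem('http')
# cat() accepts a list of urls, fetches them concurrently and returns
# a dict mapping each url to its raw bytes:
contents = fs.cat(urls)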

gmaze added the performance and internals (Internal machinery) labels Jun 23, 2020
gmaze self-assigned this Jun 23, 2020

gmaze commented Jun 23, 2020

Here is a way to test this that makes the above large request work!

First, we need some machinery to split the box request into chunks:

import numpy as np  # needed below for np.linspace and np.random
import pandas as pd
import concurrent.futures
import requests
import time

def split_box(large_box, n=1, d='x'):
    """ Split a box domain in one direction in n equal chunks """
    if d == 'x':
        i_left, i_right = 0, 1
    if d == 'y':
        i_left, i_right = 2, 3
    if d == 'z':
        i_left, i_right = 4, 5
    if n == 1:
        return [large_box]  # keep the return type consistent: always a list of boxes
    else:
        n = n + 1
    boxes = []
    bins = np.linspace(large_box[i_left], large_box[i_right], n)
    for ii, left in enumerate(bins):
        if ii < len(bins)-1:
            right = bins[ii+1]
            if d == 'x':
                boxes.append([left, right, 
                              large_box[2], large_box[3], 
                              large_box[4], large_box[5], 
                              large_box[6], large_box[7]])
            elif d == 'y':
                boxes.append([large_box[0], large_box[1], 
                              left, right, 
                              large_box[4], large_box[5], 
                              large_box[6], large_box[7]])
            elif d == 'z':
                boxes.append([large_box[0], large_box[1], 
                              large_box[2], large_box[3],
                              left, right, 
                              large_box[6], large_box[7]])
    return boxes

def split_this_box(box, nx=1, ny=1, nz=1):
    """ Split a box into a list of nx*ny*nz sub-boxes """
    box_list = []
    split_x = split_box(box, n=nx, d='x')
    for bx in split_x:
        split_y = split_box(bx, n=ny, d='y')
        for bxy in split_y:
            split_z = split_box(bxy, n=nz, d='z')
            for bxyz in split_z:
                box_list.append(bxyz)
    return box_list

Then we can split the large box and create a list of urls to fetch (I used random boundaries to prevent the server from caching my requests):

large_box = [np.random.randint(-122,-118), np.random.randint(-87,-83), -10, 10, 0, 500, '2019-01-01', '2020-01-01']

boxes = split_this_box(large_box, nx=4, ny=4, nz=2)
urls = []
for box in boxes:
    # Read the url to be requested:
    urls.append(ArgoDataFetcher().region(box).fetcher.url)

This gives 32 urls (4 x 4 x 2 chunks).

Then we can run the http requests concurrently this way (adapted from a stackoverflow answer):

import xarray as xr
from argopy.stores import httpstore

results = []
CONNECTIONS = 100
TIMEOUT = 120
fs = httpstore(timeout=TIMEOUT)

def load_url(url, timeout):
    ans = fs.open_dataset(url)
    if isinstance(ans, xr.Dataset):
        return ans
    else:
        return None

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
            data = data.rename({'row': 'N_POINTS'})
        except Exception as exc:
            data = None  # placeholder so failed chunks can be filtered out later
        finally:
            results.append(data)

            print(str(len(results)),end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')

This took 33.27 s to run! This is not amazing and way too long, but at least we can retrieve the data, which was not the case with a single request.

Finally, we can merge all chunk results into a single xarray dataset:

results = [r for r in results if r is not None]  # Only keep non-empty results
if len(results) > 0:
    # ds = xr.concat(results, dim='N_POINTS', data_vars='all', coords='all', compat='equals')
    ds = xr.concat(results, dim='N_POINTS', data_vars='all', coords='all', compat='override')
    ds['N_POINTS'] = np.arange(0, len(ds['N_POINTS']))  # Re-index to avoid duplicate values
    ds = ds.set_coords('N_POINTS')
else:
    raise ValueError("CAN'T FETCH ANY DATA !")
ds['N_POINTS']
<xarray.DataArray 'N_POINTS' (N_POINTS: 942696)>
array([     0,      1,      2, ..., 942693, 942694, 942695])
Coordinates:
  * N_POINTS  (N_POINTS) int64 0 1 2 3 4 ... 942691 942692 942693 942694 942695

So, nearly 1 million measurements.

@rabernat commented:

I suspect zarr or fsspec can already manage an (asynchronous) pool of requests.
The zarr documentation says it can "Read an array concurrently from multiple threads or processes.", so this looks very close to the pattern above.

Zarr supports concurrent reading, but it doesn't implement that part itself. Instead, it relies on Dask (or another parallel execution framework) for all parallelism.

Here you are basically trying to assemble an array from a set of chunks. I would stop messing around with threadpools and appending and instead just use Dask.array. I recommend constructing a Dask.array from a delayed function. See example at https://docs.dask.org/en/latest/array-creation.html#using-dask-delayed.
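
To make this concrete, here is a minimal sketch of the dask.delayed / dask.array.from_delayed pattern from the documentation linked above; the load_chunk function and the per-chunk size are hypothetical, since from_delayed needs the shape of each chunk up front:

import dask
import dask.array as da
import numpy as np

def load_chunk(url):
    # fetch one chunk from the server and return it as a 1-D numpy array
    ...

chunk_size = 30000  # assumed (or estimated) number of points per chunk
lazy_chunks = [dask.delayed(load_chunk)(url) for url in urls]
arrays = [da.from_delayed(c, shape=(chunk_size,), dtype=np.float64) for c in lazy_chunks]
stacked = da.concatenate(arrays, axis=0)  # one lazy array covering the whole domain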

gmaze commented Jun 24, 2020

Thanks for your suggestion @rabernat!
So, if we retrieve more than one variable per chunk, should we try to work with https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.from_delayed in a similar way as for array creation?

@rabernat commented:

If your goal is to return an Xarray DataArray or Dataset, I would go with dask.array rather than dask.dataframe. You would write a generic function to assemble an array for a given variable and then call it in a loop over all your variables. These dask.arrays can be used directly in constructing an xarray Dataset, i.e.

def load_delayed(varname):
    # do stuff: build a dask array for this variable from its delayed chunks
    return dask.array.from_delayed(...)

data_vars = {varname: load_delayed(varname) for varname in varnames}

ds = xr.Dataset(data_vars, ...)
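
Spelled out slightly more, and assuming load_delayed returns a 1-D dask array along the N_POINTS dimension used in the snippets above (the variable names are only illustrative):

import xarray as xr

varnames = ['PRES', 'TEMP', 'PSAL']  # illustrative Argo variables
data_vars = {v: (('N_POINTS',), load_delayed(v)) for v in varnames}
ds = xr.Dataset(data_vars)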

gmaze commented Jun 24, 2020

I see, thanks! I'll try to work on that design.

@rabernat commented:

This took 33.27 s to run! This is not amazing and way too long, but at least we can retrieve the data

If you're concerned about the performance of ERDDAP, you might start considering creating a cloud-based mirror of ARGO. This notebook shows how cloud object storage can be thousands of times faster than opendap. We would be happy to help with this.
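
For reference, reading such a cloud-hosted zarr mirror from xarray would look roughly like this (the bucket path is entirely hypothetical):

import fsspec
import xarray as xr

# Hypothetical location of a zarr mirror of the Argo dataset:
store = fsspec.get_mapper('gs://hypothetical-bucket/argo.zarr')
ds = xr.open_zarr(store)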

gmaze commented Jun 24, 2020

Yes, it is not clear at this point whether ERDDAP's capabilities are the limiting factor here, but they are the first candidate.

Having a cloud-based version of Argo is an ongoing effort at the European level, and we're eagerly waiting for it ...

gmaze added this to the Go from alpha to beta milestone Jul 1, 2020
gmaze closed this as completed in #28 Oct 6, 2020