Using Parquet for large references #293
Correct: at the moment, all the references for a variable are loaded on first access and kept around. A purgeable cache would be very reasonable, as would splitting each variable into reference chunks to allow even more partial loading. These are the principal benefits of @agoodm's approach, which could/should be adopted.
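A minimal sketch of that purgeable-cache idea, using a hypothetical `load_refs_for_variable` loader (not the actual kerchunk code):

```python
from functools import lru_cache

@lru_cache(maxsize=4)  # keep only the few most recently used variables in memory
def load_refs_for_variable(name):
    # placeholder: read just this variable's references from storage
    return {f"{name}/{i}": ["file.nc", i * 100, 100] for i in range(10)}

refs = load_refs_for_variable("ACCET")   # loaded once, then served from the cache
load_refs_for_variable.cache_clear()     # or purged explicitly when memory is tight
```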
@rsignell-usgs since we are on the subject, you can give my approach a try with your test dataset. The code is currently in this gist and you can use it as follows (making sure you have the latest development version of fastparquet installed as well):

```python
import fsspec
import xarray as xr

# refs is the original references JSON dict
make_parquet_store('refs_test', refs)
mapper = ParquetReferenceMapper('refs_test')
fs = fsspec.filesystem('reference', fo=mapper, remote_protocol='file')
ds = xr.open_zarr(fs.get_mapper(''))
```

This uses an LRU cache along with splitting the references into row groups (of 1000 references by default) to help keep the in-memory footprint down. More details are provided here.
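For a sense of what the row-group splitting buys, here is a small sketch of reading just one row group of a (hypothetical) per-variable references file with fastparquet; the path is illustrative:

```python
from fastparquet import ParquetFile

# with references written in row groups of ~1000, only the group containing the
# wanted chunk needs to be read, not the whole file
pf = ParquetFile("refs_test/ACCET.parq")
first_group = next(pf.iter_row_groups())   # a DataFrame of ~1000 references
print(len(first_group))
```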
Thanks for the snippet! I'm also interested to see how well it fares. It would also be interesting to see whether we can convert the individual files' JSON data too, and whether that improves the memory footprint of the combine step - that would be a big win too.
@agoodm, I'm trying -- I just need to find a place to run with more memory!
Progress report: I created a large enough instance on our AWS JupyterHub to run the code, but I can't build the latest fastparquet from github/main because the JupyterHub doesn't have gcc. @martindurant says he can cut a dev release later today so that the build on conda-forge will trigger.
@agoodm, Martin released the new version of fastparquet on conda-forge and I was able to run your test. It spent about 10 minutes generating the files for each variable, but then died trying to generate the file for `refs['refs']['crs/.zarray']`, which is:

```
'{"chunks":[],"compressor":null,"dtype":"|S1","fill_value":"IA==","filters":null,"order":"C","shape":[],"zarr_format":2}'
```

I was going to try just deleting the `crs` references. The full reproducible notebook I tried is here: https://nbviewer.org/gist/2233712db4679628d2d67769b9cc5937

I moved the references JSON file to an Open Storage Network pod, so it's available with no egress fees and no credentials needed!
@rsignell-usgs Thank you so much for testing this! I have my own potential use-case with a full reference set of similar size to the one you are using, so these results are of great importance to me too. It looks like my code wasn't taking into account the possibility of variables with zero chunk size, so I have updated the gist: https://gist.github.com/agoodm/25d41ce0c47cd714271be66d0db0459d. I am also wondering whether this is even intended behavior when the original references file was generated.

Before you attempt to run this again, I have a few more tips / suggestions. First, I just remembered that fastparquet doesn't have compression on by default, so when making the parquet files be sure to set the compression info, e.g.:

```python
make_parquet_store('refs_test', refs, compression='zstd')
```

This reduces the size of the references on disk by a factor of 10 compared to the original JSON. Martin and I were discussing how we could possibly improve this further by using categorical data for the URLs.

Finally, since your data is on S3 you need to change your fsspec options:

```python
mapper = ParquetReferenceMapper('refs_test')
fs = fsspec.filesystem('reference', fo=mapper, remote_protocol='s3', remote_options=dict(anon=True))
```

With all of these changes I was able to load the dataset and it seems to be working reasonably well:

```
In [6]: ds = xr.open_zarr(fs.get_mapper(''))

In [7]: %time ds.isel(time=0).ACCET.values
CPU times: user 5.8 s, sys: 1.05 s, total: 6.85 s
Wall time: 7.01 s
Out[7]:
array([[ nan,  nan,  nan, ...,  0.  ,  0.  ,  0.  ],
       [ nan,  nan,  nan, ...,  0.  ,  0.  ,  0.  ],
       [ nan,  nan,  nan, ...,  0.01,  0.  ,  0.  ],
       ...,
       [ nan,  nan,  nan, ...,  0.  ,  0.  ,  0.  ],
       [ nan,  nan,  nan, ...,  0.  ,  0.  ,  0.  ],
       [ nan,  nan,  nan, ...,  0.  ,  0.  ,  0.  ]])
```
@agoodm, yep that worked just fine!
@agoodm, I copied the zarr/parquet references to the Open Storage Network, thinking that someone could just use the references from the cloud without generating them locally. I'm trying to get this to work. Can you help?
For the National Water Model reanalysis 1 km grid, Alex's method takes a bit longer to write the references, and the references are a bit bigger, but the access speeds are awesome from refs stored locally. I haven't been able to figure out how to access the refs for Alex's method from a bucket yet, so I'm waiting on that...
That seems pretty conclusive :| So let's merge the two efforts.

It leaves two features out:

- per-column/chunk decoding options
- preffs-style multiple references for a single key

@agoodm, do you see any way to get those too?
Regarding your first two points, I have just updated the reference mapper to work with references on any fsspec-supported FS (https://gist.github.com/agoodm/25d41ce0c47cd714271be66d0db0459d):

```python
mapper = ParquetReferenceMapper(lazy_refs, fs=fs)
```

With these changes I was able to run @rsignell-usgs's example loading the references from S3, though maybe this could be cleaned up a bit more. I'll go ahead and try to incorporate the additional space-saving tricks discussed above.

Do you have some concrete examples (perhaps of some reference files) that have these other two more advanced features? The latter one doesn't sound too difficult to support (if I am understanding it correctly, it would mean multiple lists of paths and byte ranges to the data files and/or b64 strings assigned to one zarr chunk key?). For the former, which option are you referring to?
Tried reading from remote refs also: https://nbviewer.org/gist/067827019f247ddfa1f614eff17f8d0e
The number of references to store per row group could be a tunable knob, trading off in-memory footprint against the number of row-group loads.
I do think that storing the data in typed columns rather than JSON blobs should be optimal, especially as it allows for categoricals on the URLs. I'll work on trying to combine the two approaches next week. As for the question of what to do with preffs-style multiple refs, I'm not sure.
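A rough illustration of why categoricals on the URLs help, since a reference set repeats a small number of file URLs a huge number of times (the URLs below are made up):

```python
import pandas as pd

paths = ["s3://bucket/nwm_1979020103.nc", "s3://bucket/nwm_1979020106.nc"] * 50_000
as_objects = pd.Series(paths)                    # one Python string object per row
as_categorical = as_objects.astype("category")   # small integer codes + 2 unique strings
print(as_objects.memory_usage(deep=True))        # much larger
print(as_categorical.memory_usage(deep=True))    # much smaller
```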
That sounds good to me. Would it be alright if I move the code I have implemented so far into a PR or two to track further progress? The code for making the parquet files would go here, of course, but I take it the mapper code should be in the main fsspec repo? I am also thinking it may be desirable to keep it more invisible to end users and have the constructor for the reference filesystem accept the parquet store directly.

As for typed and categorical columns, I should have some time today or tonight to give this a try myself; I'm definitely curious to see how much of an improvement that will make in the compression ratio when applied to Rich's reference set.

I see #254 has an example of what you were mentioning as a use-case for per-column/chunk decoding. I also took a look at the preffs example and do see now why that might be tricky to adapt to the reference mapper as it is currently implemented. If the references are kept as JSON blobs this wouldn't be a problem, since multiple references can just be included as a list, but then the categorical trick won't work. Conversely, using separate rows would break the current assumption the mapper makes of having one zarr chunk per row, which eliminates the need to explicitly store key names and makes it quick and easy to look up the correct row group and row numbers from the chunk key. I'll have to see if there is a more elegant solution I can come up with.
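A minimal sketch of that one-chunk-per-row lookup (hypothetical helper and names; the real gist differs in detail), assuming row groups of 1000 references:

```python
def key_to_row(key, chunk_grid, row_group_size=1000):
    """Map a zarr chunk key like 'ACCET/12.0.3' to (row group, row within group)."""
    idx = [int(i) for i in key.split("/")[-1].split(".")]
    linear = 0
    for i, n in zip(idx, chunk_grid):
        linear = linear * n + i          # row-major linear chunk index
    return divmod(linear, row_group_size)

# chunk_grid is the number of chunks along each dimension of the variable
print(key_to_row("ACCET/12.0.3", chunk_grid=(29158, 4, 5)))   # -> (0, 243)
```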
Indeed, creating parquet should definitely live here, and so far the FS implementation has lived in fsspec. There is an argument that it is too niche and experimental and should also live here or in its own package...
It will make a bigger difference to in-memory size (zstd should already be doing a good job finding repeated strings on disk, but they get expanded into independent objects on load). This is also true for integers, which are 8 bytes each in an array but 28 bytes as Python ints.
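A quick way to see those numbers for yourself:

```python
import sys
import numpy as np

offsets_py = [4096 * i for i in range(1, 1001)]    # Python ints
offsets_np = np.array(offsets_py, dtype="int64")   # typed array

print(sys.getsizeof(offsets_py[0]))            # 28 bytes for one Python int
print(offsets_np.nbytes // len(offsets_np))    # 8 bytes per element in the array
```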
Yes indeed, once everything is working smoothly.
Yes, agree with all you say. Typed columns and categoricals should be a decent win; maybe we can have a choice of what format to use per row group, depending on whether we have/allow multi-refs or not.
And also for allowing it to work with intake, right?
You can bet on it
@martindurant @rsignell-usgs I have just tested generating the parquet files with separate typed columns instead of raw JSON blobs, and that change alone cuts the file sizes nearly in half. I tried saving the paths as categoricals too, but this instead made the compression much worse, so I think it should be fine to just wrap the paths up as categoricals upon loading the references. I'll incorporate these changes into the PRs later today.

I have given the multiple-refs question some more thought and am thinking perhaps we could just use a MultiIndex on the columns, for example:

```python
import numpy as np
import pandas as pd

columns = [
    np.array(['path', 'path', 'offset', 'offset', 'size', 'size', 'raw', 'raw']),
    np.array(['0', '1', '0', '1', '0', '1', '0', '1'])
]
df = pd.DataFrame([['a.dat', 'b.dat', 123, 12, 12, 17, None, None],
                   ['b', None, 0, 0, 0, 0, b'data', None]],
                  columns=columns)
df
Out[68]:
    path         offset     size        raw
       0      1      0   1    0   1       0     1
0  a.dat  b.dat    123  12   12  17    None  None
1      b   None      0   0    0   0  b'data'  None
```
If you cut the size in half you will be at the same level as @martindurant's variable-refs-per-parquet approach! Nice! I'm hoping Martin will chime in on the multiple-refs issue as I don't really understand it.
I didn't immediately understand the multiref description there either - I'll come back to it. The use of a pandas MultiIndex is probably a bad idea, though, because it neither maps cleanly to parquet nor is particularly performant.
Apologies for any confusion in the description of my multirefs example. Basically the idea was to show the same dataframe from the preffs README and allow multiple references to be mapped to a single key by multiplying the number of columns by the maximum number of possible refs per key. I thought of a MultiIndex as one possible way to do that, but if that is inefficient or poorly supported by parquet, perhaps we could just have explicit separate columns for each possible value (e.g. "path.0", "path.1")? The HDF-EOS file format has a convention like that for splitting up core metadata into multiple attributes.

Leaving that aside, I have implemented all the other necessary changes in both PRs, including typed columns and wrapping the mapper directly inside the filesystem instantiation. I did a test run with Rich's NWM retrospective reference set and found the resulting file size was 449 MB. It seems to be working well:

```
In [1]: import fsspec
In [2]: fs = fsspec.filesystem('reference', fo='refs_noaa', remote_protocol='s3', remote_options=dict(anon=True))
In [3]: import xarray as xr
In [4]: ds = xr.open_dataset(fs.get_mapper(''), engine='zarr')
In [5]: ds
Out[5]:
<xarray.Dataset>
Dimensions: (time: 116631, y: 3840, x: 4608, vis_nir: 2, soil_layers_stag: 4)
Coordinates:
* time (time) datetime64[ns] 1979-02-01T03:00:00 ... 2020-12-31T21:00:00
* x (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06
* y (y) float64 -1.92e+06 -1.919e+06 ... 1.918e+06 1.919e+06
Dimensions without coordinates: vis_nir, soil_layers_stag
Data variables: (12/21)
ACCET (time, y, x) float64 ...
ACSNOM (time, y, x) float64 ...
ALBEDO (time, y, x) float64 ...
ALBSND (time, y, vis_nir, x) float64 ...
ALBSNI (time, y, vis_nir, x) float64 ...
COSZ (time, y, x) float64 ...
... ...
SNOWH (time, y, x) float64 ...
SOIL_M (time, y, soil_layers_stag, x) float64 ...
SOIL_W (time, y, soil_layers_stag, x) float64 ...
TRAD (time, y, x) float64 ...
UGDRNOFF (time, y, x) float64 ...
crs object ...
Attributes:
Conventions: CF-1.6
GDAL_DataType: Generic
TITLE: OUTPUT FROM WRF-Hydro v5.2.0-beta2
code_version: v5.2.0-beta2
model_configuration: retrospective
model_initialization_time: 1979-02-01_00:00:00
model_output_type: land
model_output_valid_time: 1979-02-01_03:00:00
model_total_valid_times: 472
proj4: +proj=lcc +units=m +a=6370000.0 +b=6370000.0 ...
In [6]: ds.isel(time=0).ACCET.compute()
Out[6]:
<xarray.DataArray 'ACCET' (y: 3840, x: 4608)>
array([[ nan, nan, nan, ..., 0. , 0. , 0. ],
[ nan, nan, nan, ..., 0. , 0. , 0. ],
[ nan, nan, nan, ..., 0.01, 0. , 0. ],
...,
[ nan, nan, nan, ..., 0. , 0. , 0. ],
[ nan, nan, nan, ..., 0. , 0. , 0. ],
[ nan, nan, nan, ..., 0. , 0. , 0. ]])
Coordinates:
time datetime64[ns] 1979-02-01T03:00:00
* x (x) float64 -2.303e+06 -2.302e+06 ... 2.303e+06 2.304e+06
* y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 ... 1.918e+06 1.919e+06
Attributes:
esri_pe_string: PROJCS["Lambert_Conformal_Conic",GEOGCS["GCS_Sphere",DAT...
grid_mapping: crs
long_name: Accumulated total ET
units: mm
valid_range: [-100000, 100000000]
```

Other things to note:
Please go ahead and test the changes out and review when you have the chance.
@agoodm thanks for continuing to push on this! I know @martindurant is super busy this week, but after this gets merged are you interested in helping write a blog post about this new capability? I started writing one about the original parquet-file-per-variable approach -- maybe we can just adapt it for this new approach? You could be the author...
Absolutely, I would be happy to. Feel free to send me your current draft when you are ready and we can work from there.
@agoodm sorry this slipped off my radar; for the draft blog post, please request access to this google doc: https://docs.google.com/document/d/1qSpI24kjXz15bRJ0Deqe0-F6eraJUtuHYrt-UfkjSBM/edit?usp=sharing
@agoodm, also, is there updated code (or a notebook) for converting the massive JSON into the parquet/zarr reference files?
I have not kept the original gist up to date, but the code is now implemented in https://github.com/fsspec/kerchunk/blob/d32224b74aa2627921af17b4b9b423bc9f3387f7/kerchunk/df.py - then just call `refs_to_dataframe` there.
It should run a lot more quickly than before.
@agoodm, how should we proceed on the proposed blog post? Would you like me to take a stab at modifying the text to describe the new and improved best practice?
Before you go ahead and do so, I should note that we have changed the approach a little: we now save multiple small parquet files for each variable rather than one larger parquet file with multiple row groups, in order to store the URLs categorically without blowing up the file size (see discussion here). In the meantime I can go ahead and do a quick look over the text tonight or tomorrow and add some changes and comments. Sound good?
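A small sketch of that per-variable layout, with the path column stored as a pandas categorical so repeated URLs are dictionary-encoded (the paths and output file name here are made up):

```python
import pandas as pd

df = pd.DataFrame({
    "path": pd.Categorical(["s3://bucket/f000.nc"] * 3 + ["s3://bucket/f001.nc"] * 3),
    "offset": [0, 100, 200, 0, 100, 200],
    "size": [100, 100, 100, 100, 100, 100],
})
# one small file per variable, written with compression as discussed above
df.to_parquet("refs.0.parq", engine="fastparquet", compression="zstd")
```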
This all sounds great! I asked because people are asking when the blog post will come out -- a bunch of folks will probably jump on this as soon as they understand it! I can hopefully get this done over the next couple of days.
Guys, okay, swinging back to this again.... In the last few cells of this notebook I'm trying to convert the big JSON on S3 to parquet on S3 using the new approach. OBVI I don't have the syntax quite right:

```python
refs_to_dataframe(fs_json.open(refs), fs_json.open('s3://esip-qhub/noaa/nwm/grid1km/parquet/refs_test', mode='wb'))
```

bombs out.
The input (first arg) should be either a string URL or a dict of references (i.e., already decoded). The output should always be a string, and it is the directory name, not the name of any file. For the input, I notice that, although it can be remote, there is currently no way to pass parameters for opening it, which we should definitely add.
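Putting that description together, a sketch of the intended call (file and directory names here are illustrative):

```python
from kerchunk.df import refs_to_dataframe

# first argument: a URL/path to the combined JSON references, or an already
# decoded dict; second argument: the *output directory* for the parquet files
refs_to_dataframe("combined.json", "refs_test")
```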
Getting closer:

```python
import fsspec
import xarray as xr

# references are on an OSN pod (no credentials needed)
url = 's3://rsignellbucket2/noaa/nwm/grid1km/refs/'
target_opts = {'anon': True, 'skip_instance_cache': True,
               'client_kwargs': {'endpoint_url': 'https://renc.osn.xsede.org'}}

# netcdf files are on the AWS public dataset program (no credentials needed)
remote_opts = {'anon': True}

fs = fsspec.filesystem("reference", fo=url,
                       remote_protocol='s3', remote_options=remote_opts,
                       target_options=target_opts)
m = fs.get_mapper("")
ds = xr.open_dataset(m, engine='zarr', chunks={})
```

but this bombs.
Following setup with the development versions of fsspec and fastparquet, your code worked OK for me.
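For anyone following along, a quick sanity check that the newer versions are actually the ones being imported:

```python
import fastparquet
import fsspec

# confirm the development builds are on the path before retrying the example
print(fsspec.__version__, fastparquet.__version__)
```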
I made those changes into fsspec/filesystem_spec#1224
(but something is wrong with CI :|)
Ah, right, I forgot to use those versions of fsspec and fastparquet. Success!!! https://nbviewer.org/gist/rsignell-usgs/4a482cbcfebf84387a89d1ef0ca6e3bf
I think we should wait until these new package versions land on conda-forge before we release the blog post. But I can start working on updating it now... |
Good to hear you were able to get things working again. In the meantime I can make a quick update.
It does allow a URL, but doesn't take the typical target_kwargs for opening it with fsspec |
No, I think I overlooked this when writing the docstring. It says strings are accepted, but the actual code that follows immediately assumes a mapping object (lines 135 to 142 in 33b00d6).
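A hedged sketch of the missing string handling (a hypothetical helper, not the actual kerchunk code): decode a reference URL to a dict before the mapping-only code path runs.

```python
import json
import fsspec

def _ensure_dict(refs, **target_options):
    # hypothetical helper: accept either a dict of references or a URL/path
    # to a JSON file, opened with any fsspec-supported protocol
    if isinstance(refs, str):
        with fsspec.open(refs, "rt", **target_options) as f:
            return json.load(f)
    return refs
```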
OK :|
Will close this now that the PR in fsspec is merged.
I've been testing out DFReferenceFileSystem, which loads the references from a collection of Parquet files (one Parquet file for metadata and one for each variable) instead of from a single JSON file.
Two questions: