Cache GCP client responses to reduce repeated operations and network calls #19
Each time the gcp client is used, it hits GCP to get the requested data. Given the size of the data and that repeated process, it only makes sense to implement some kind of cache. I propose that we use a file db (i.e. sqlite) to accomplish this for simplicity and broad support in Python.

High level logic

1. Check if the db cache has the desired data (stored as a df)
   1. Yes - return the data
   2. No - continue
2. Get the data from GCP
3. Create a df from the GCP data
4. Cache this df in a sqlite db using the URL path as the key
5. Return the df

Requirements

- The sqlite lib must support multiprocessing and batch commits

The sqlitedict <https://github.com/RaRe-Technologies/sqlitedict> library is mature, maintained, and seems to fit the bill for this feature. The lib lets you create/connect to a db and use it like you would a Python dictionary. Most importantly, it supports multiprocessing.
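To make the proposal concrete, here is a minimal sketch of that flow with sqlitedict, assuming a hypothetical `fetch_nwm_dataframe(url)` helper standing in for the real GCP pull and NetCDF processing (that name is an assumption, not an existing function in this repo):

```python
import pandas as pd
from sqlitedict import SqliteDict

def fetch_nwm_dataframe(url: str) -> pd.DataFrame:
    # Hypothetical placeholder for the real GCP pull + NetCDF processing.
    raise NotImplementedError

def get_dataframe(url: str, cache_path: str = "gcp_cache.sqlite") -> pd.DataFrame:
    # 1. Check if the db cache has the desired data (stored as a df),
    #    keyed by the URL path
    with SqliteDict(cache_path) as db:
        if url in db:
            return db[url]
        # 2. Get the data from GCP / 3. build a df from it
        df = fetch_nwm_dataframe(url)
        # 4. Cache the df; sqlitedict pickles values by default
        db[url] = df
        db.commit()
    # 5. Return the df
    return df
```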
---

My inclination is to cache processed dataframes. GCP is pretty resilient, but there's a major bottleneck when it comes to processing the NetCDF files. I'd like to see if we can optimize that aspect first, then explore caching options.
---

> My inclination is to cache processed dataframes.

Likewise.

> GCP is pretty resilient, but there's a major bottleneck when it comes to processing the NetCDF files. I'd like to see if we can optimize that aspect first, then explore caching options.

Okay, I will look into optimizing that before moving on. Can you speculate on the bottleneck you mentioned?
---

I think there may be an issue similar to what I had when pulling data from AWS: namely, that it's not using the standard xarray `to_dataframe` methods.
---

Alright, that's helpful. I will fool around with it and see if I get any improvement in performance.
---

I took a look at the performance issues on Friday and found a few places where we can make substantial improvements. For an initial baseline, I timed retrieving an analysis and assimilation cycle without filtering the results (all 2.7 million reaches included). That took ~48 seconds. Not horrible, but also not great. To get a sense of what the bottleneck might be, I ran the same test filtered to just USGS gauges, and that took substantially less time, ~24 seconds.

Before I elaborate on the findings: I was able to get the time down to ~20 seconds for pulling and processing all 2.7 million reaches for a single AnA cycle. To me, that is a success, especially if we are to cache those results. Enough of the good news and on to the findings. I found three places where we can substantially improve performance:
```python
import pandas as pd
import xarray as xr

def NWM_bytes_to_DataFrame(
    path,
) -> pd.DataFrame:
    vars_to_drop = [
        "crs",
        "nudge",
        "velocity",
        "qSfcLatRunoff",
        "qBucket",
        "qBtmVertRunoff",
    ]
    # Load data as xarray Dataset
    with xr.load_dataset(
        path,
        engine="h5netcdf",
        mask_and_scale=False,
        drop_variables=vars_to_drop,  # 1. skip variables we never use
        decode_cf=False,  # 2. skip CF decoding; we apply the scale manually below
    ) as ds:
        # Extract streamflow data to pandas DataFrame
        df = pd.DataFrame(
            {
                "nwm_feature_id": ds["streamflow"].feature_id.values,
                "value": ds["streamflow"].values,
            }
        )
        # Scale data
        scale_factor = ds["streamflow"].scale_factor[0]
        value_date = pd.to_datetime(ds.time.values[0], unit="m")
        start_date = pd.to_datetime(ds.reference_time.values[0], unit="m")
        df.loc[:, "value"] = df["value"].mul(scale_factor)
        # 3. This had the biggest impact on wall time. With casting, a full
        #    AnA cycle goes to ~50 seconds.
        # Convert feature IDs to strings
        # df["nwm_feature_id"] = df["nwm_feature_id"].astype(str)
        # Extract valid datetime
        df["value_date"] = value_date
        # Extract reference datetime
        df["start_date"] = start_date
    return df
```

Now with these findings presented, my only question is: given the cast from `int` to `str` is what dominates the wall time, do we want to keep storing `nwm_feature_id` as strings?
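For reference, a rough sketch of how the quoted wall times could be reproduced; the filename here is illustrative of the NWM AnA channel files, not an actual test asset:

```python
import time

# Illustrative path; any analysis_assim channel_rt NetCDF file works.
path = "nwm.t00z.analysis_assim.channel_rt.tm00.conus.nc"

t0 = time.perf_counter()
df = NWM_bytes_to_DataFrame(path)
print(f"processed {len(df):,} reaches in {time.perf_counter() - t0:.1f} s")
```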
---

At the model level, it makes sense to store `nwm_feature_id` as `str`. Having said that, I believe the transformation from `int` to `str` doesn't necessarily have to happen here. Additionally, I wonder if there is any performance to be gained from using the xarray `to_dataframe` method?
---

> At the model level, it makes sense to store `nwm_feature_id` as `str`.

I agree that for the end user, storing them as strings makes more sense, and it is just the standard that we have in place. I think it makes sense to cache the results as `int64`, though, and cast on the way out.

> I wonder if there is any performance to be gained from using the xarray `to_dataframe` method?

Potentially, but I am not so sure of that. Given that a copy of the data will be made in memory when cast from an xarray `Dataset` to a pandas `DataFrame`, I've tested that hypothesis (I would not say robustly) and found that it is actually not the case. My thinking is that passing the underlying numpy arrays to the `DataFrame` constructor is already about as fast as it gets.
---

Having looked into the source, I don't think that for this problem we will see any performance improvements using `to_dataframe`; under the hood it builds the same dict of flattened numpy arrays that we already pass to the `DataFrame` constructor:

```python
# from xarray.core.dataset.Dataset
# https://github.com/pydata/xarray/blob/a0c71c1508f34345ad7eef244cdbbe224e031c1b/xarray/core/dataset.py#L4970
def _to_dataframe(self, ordered_dims: Mapping[Hashable, int]):
    columns = [k for k in self.variables if k not in self.dims]
    data = [
        self._variables[k].set_dims(ordered_dims).values.reshape(-1)
        for k in columns
    ]
    index = self.coords.to_index([*ordered_dims])
    return pd.DataFrame(dict(zip(columns, data)), index=index)
```
---

As an experiment, I ran a test on my personal laptop that compared creating and concatenating 10,000 dataframes, each with 1,000 rows and two columns. I understand the efficiency arguments for caching processed dataframes.
---

I also had notes on memory usage for the resulting `DataFrame`.
---

Right, I agree with you. My plan was never to include the `str` cast in the cached dataframes.
---

I was able to get the time down to ~34 seconds doing the following:

```python
import numpy as np

# Size the fixed-width byte dtype to the widest feature id
max_feature_id = df["nwm_feature_id"].max()
max_digit_length = int(np.log10(max_feature_id)) + 1
df["nwm_feature_id"] = df["nwm_feature_id"].values.astype(f"|S{max_digit_length}")
```
---

Yuck! But also, clever! 😆 Nice!
---

Only cause for concern is that `nwm_feature_id` now holds encoded bytes rather than unicode strings.
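One possible way around that (a sketch, not something tested in this thread): keep the cheap byte column internally and decode to unicode only for the much smaller filtered result. The reach id below is just an example value:

```python
# Filter while the column still holds bytes (comparisons must use bytes too),
# then decode just the surviving rows.
subset = df[df["nwm_feature_id"] == b"7086109"]  # example reach id
subset = subset.assign(
    nwm_feature_id=subset["nwm_feature_id"].str.decode("utf-8")
)
```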
---

Might try rewriting this as a vectorized copy and see if it helps performance:

```python
# 3. This had the biggest impact on wall time. With casting, for a full
#    AnA cycle, goes to ~50 seconds
# Convert feature IDs to strings
# df["nwm_feature_id"] = df["nwm_feature_id"].astype(str)
```

I'm curious if the type assignment from one column type to another is causing some significant slowdown on data that large. Essentially it has to create a tmp buffer, copy the new transformed data to it, and then reassign that index. Maybe something like the sketch below can speed that up a bit, but YMMV.
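The suggested snippet did not survive extraction; a plausible reconstruction of a vectorized copy, going through the raw numpy array instead of the pandas `astype` (an assumption, not the commenter's exact code), might be:

```python
# Cast at the numpy level, sized like the |S trick above but to unicode
# ("U") so the result is real strings instead of bytes.
# max_digit_length comes from the earlier snippet in this thread.
ids = df["nwm_feature_id"].values
df["nwm_feature_id"] = ids.astype(f"U{max_digit_length}")
```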
---

I tried something similar to that, but instead of using casting with the built-in `str`, I went through the underlying numpy array (the `|S` approach above).

Edit: Remember English
---

I've spent some more time working on this, and I think I may have come to a fair compromise solution. Just to recap a few of the things I've tried since I last commented: the built-in `str` cast, which was slow, and the fixed-width `|S` cast, which was fast but also returned encoded bytes rather than unicode strings.
---

The categorical dtype flexes its strength more on large dataframes. I found categories beat both the `str` and `|S` casts, consistent with the memory-usage notes from a previous comment.
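A quick, self-contained way to sanity-check that memory claim (synthetic, repeating feature ids for illustration, not the thread's actual measurements):

```python
import numpy as np
import pandas as pd

# Repeating ids, as when several AnA cycles are concatenated in a cache:
# 1M rows, 100k unique values
ids = np.tile(np.arange(1, 100_001, dtype=np.int64), 10)

as_str = pd.Series(ids).astype(str)                    # python strings, object dtype
as_bytes = pd.Series(ids.astype("|S6"))                # fixed-width bytes
as_cat = pd.Series(ids).astype(str).astype("category") # categorical

for name, s in [("str", as_str), ("|S6", as_bytes), ("category", as_cat)]:
    print(f"{name:>8}: {s.memory_usage(deep=True) / 1e6:,.1f} MB")
```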
---

Those memory savings are hard to argue against. Do you know if you can specify the category type (i.e. make the categories `str` rather than `int`)?
---

It appears that you can be explicit about the category values. Another alternative might be to cast to `category` first and then rename the categories to strings after the fact.
---

Right, that was my thinking as well. I am giving your second suggestion a run at the moment.
---

So I tried the following:

```python
df = pd.DataFrame(
    {
        # "nwm_feature_id": pd.Categorical(
        #     ds["streamflow"].feature_id.values
        # ),
        "nwm_feature_id": ds["streamflow"].feature_id.astype(str),
        "value": ds["streamflow"].values,
    }
)
df["nwm_feature_id"] = df["nwm_feature_id"].astype("category")
```

It took ~183 seconds. I did not report it earlier, but I found early on that casting the values while they are still in an xarray `DataArray` is dramatically slower.
---

What about merging your above solution with the underlying numpy method? Something like the sketch below.
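The snippet itself was lost from this comment; a hedged reconstruction of "categorical over the numpy cast" might look like:

```python
# Cast the raw numpy int array to strings first (fast), then let pandas
# build the categorical from the already-cast array.
df["nwm_feature_id"] = pd.Categorical(
    df["nwm_feature_id"].values.astype(str)
)
```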
---

I'll give that a go and see if that improves anything. I just tested this:

```python
df = pd.DataFrame(
    {
        "nwm_feature_id": ds["streamflow"].feature_id.values,
        "value": ds["streamflow"].values,
    }
)
df["nwm_feature_id"] = df["nwm_feature_id"].astype("category")
df["nwm_feature_id"].cat.rename_categories(
    df["nwm_feature_id"].cat.codes.apply(str), inplace=True
)
```

It was the slowest so far of the bunch, ~212 seconds, lol.
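As an aside, `cat.codes.apply(str)` stringifies the positional category codes rather than the feature ids themselves, so even where the above runs it would mislabel the categories. A variant that renames each category to its own string form (a sketch, not tested in this thread) would be:

```python
# categories is an Index of the unique feature ids; astype(str) keeps the
# alignment between old and new category names.
df["nwm_feature_id"] = df["nwm_feature_id"].cat.rename_categories(
    df["nwm_feature_id"].cat.categories.astype(str)
)
```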
---

Unfortunately, merging the categorical with the underlying numpy cast took ~148 seconds.
---

Basic caching was implemented for this subpackage with #66. Closing this issue here. We can discuss the future of dataframe caching on another ticket.