Use dask for PV time-series #48

Closed
wants to merge 3 commits into from

Conversation

@coroa (Member) commented Sep 19, 2019

convert_pv is the last hurdle to a full dask mode in atlite. Refer also to #30.

This PR replaces all the inplace transformations with clip and where so they work in principle on dask.

Unfortunately, dask fails to properly optimize the task graph (into something which should be linear in time) and instead thrashes memory until it dies, even for smaller cutouts.

I wasn't able to find any working solution today (it does not work sensibly even in synchronous mode).

For future reference, it's very helpful to visualize and profile dask:

import atlite
import dask
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler, visualize
from dask.callbacks import Callback

class PrintKeys(Callback):
    def _pretask(self, key, dask, state):
        """Print the key of every task as it's started."""
        print("Computing: {0}!".format(repr(key)))

# `cutout` is an existing (dask-backed) atlite.Cutout
panel = atlite.resource.get_solarpanelconfig("CSi")
orientation = atlite.pv.orientation.get_orientation({"slope": 30, "azimuth": 180})
solar = atlite.convert.convert_pv(cutout.data, panel, orientation)

store_task = solar.to_netcdf('test.nc', compute=False)
store_task_opt, = dask.optimize(store_task)
store_task_opt.visualize()
# or: store_task_opt.visualize(filename="graph.dot")
# and then look at it using xdot

with dask.config.set(num_workers=2), \
     Profiler() as profiler, \
     ResourceProfiler(0.5) as mem, \
     CacheProfiler() as cache, \
     PrintKeys():
    dask.compute(store_task)

# shows bokeh plots for the dask computation
visualize([profiler, mem, cache])

You might also want to look at intermediate values, starting from:

import numpy as np
from atlite.pv.solar_position import SolarPosition
from atlite.pv.irradiation import TiltedIrradiation
from atlite.pv.solar_panel_model import SolarPanelModel
from atlite.pv.orientation import get_orientation, SurfaceOrientation

# `cutout`, `panel` and `orientation` as in the snippet above
ds = cutout.data.astype(np.float32)
solar_position = SolarPosition(ds)
surface_orientation = SurfaceOrientation(ds, solar_position, orientation)

irradiation = TiltedIrradiation(ds, solar_position, surface_orientation,
                                trigon_model="simple",
                                clearsky_model="simple")

solar_panel = SolarPanelModel(ds, irradiation, panel)

@euronion (Collaborator) commented Oct 4, 2019

I tested variants of this version with cutout.wind(...), cutout.pv(...) and cutout.heat_demand(...).
All seem to work nicely and produce the same results (within the tested scope) as the old modules.

Limitations / Restrictions

What is still missing is an appropriate place to introduce (automatic) chunking of the cutout; otherwise dask does not take over the computation. For now I used

cutout.data = cutout.data.chunk({d:'auto' for d in cutout.data.dims.keys()})

after loading the cutout.
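One conceivable place to do this automatically (just a sketch, not how atlite currently loads cutouts) would be right when the cutout dataset is opened, e.g.:

import xarray as xr

# Hypothetical loading step: let xarray/dask pick chunk sizes when the
# cutout file is opened, so every downstream computation is dask-backed
# without the manual re-chunking shown above.
data = xr.open_dataset("cutout.nc", chunks="auto")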

Also the index parameter here

atlite/atlite/convert.py

Lines 51 to 55 in 88f2e2b

def convert_and_aggregate(cutout, convert_func, windows=None, matrix=None,
                          index=None, layout=None, shapes=None,
                          shapes_proj='latlong', per_unit=False,
                          return_capacity=False, capacity_factor=False,
                          show_progress=True, **convert_kwds):

is stricter in dask mode:
Without the dask backend, index can be provided as a list.
With the dask backend this fails with a rather hard-to-track-down error in the aggregation step. If one follows the documentation and provides a pandas.Index, everything is fine though.
I am not sure whether this requires changes, e.g. to preserve the current (undocumented) behaviour for a list.

Workers exceeding allowed memory pool

I can confirm the problem with the pv conversion requiring significant amounts of memory. I do not get any errors when using the default scheduler and settings (as sufficient memory is available), but the spike is there and it barely fits.
I tracked it down to be somehow linked to the final computation where all the different data is combined. It is also linked to the chunking used: I get the same large memory requirement 6 times, once for each chunk (time is auto-chunked into 6 equally large chunks of 1,464 timesteps).
I'll try to track it down further...
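For reference, the chunk layout behind those six repetitions can be inspected directly on the dask-backed dataset (assuming the auto-chunked cutout from above):

# Dataset.chunks maps each dimension to its chunk sizes; with 8,784
# timesteps auto-chunked into 6 pieces, 'time' shows chunks of 1,464.
print(cutout.data.chunks)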

Obsolete code pieces?

If we figure this out, am I correct to assume that we can remove the whole requires_windowed decorator? It would make the code significantly simpler.

Also we can probably make the aggregation here

def aggregate_matrix(da, matrix, index):

simpler, as we do not need any distinction between backends?

For further reference

Plotting with bokeh in Jupyter requires initialisation at the beginning of the notebook:

from bokeh.io import output_notebook, show
output_notebook()

@coroa (Member, Author) left a review comment:

Fixup

atlite/pv/irradiation.py (outdated, resolved)
@coroa (Member, Author) commented Oct 5, 2019

Did you test cutout.pv(..., windows=False)? Otherwise there is not much dask involved, for now.

In principle the decorator could go then. The code simplification is relatively local, though. Only parts of data.py would not be needed anymore.

But I would want to be sure we are not introducing major performance degradations by relying only on dask.

The aggregate_matrix branch introduced for dask should actually also work for non-dask arrays. Again, I am not sure how it would behave performance-wise, since the matrix product is applied to a transposed array (probably only minor changes, I suspect, but I haven't tried).

An index = pd.Index(index) line in aggregate_matrix would not hurt, to bring back the old behaviour when isinstance(index, list).
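A minimal sketch of what that could look like (the function body below is a simplified stand-in, not the actual atlite aggregate_matrix implementation):

import pandas as pd
import xarray as xr

def aggregate_matrix(da, matrix, index):
    # Restore the old (undocumented) behaviour: accept a plain list by
    # coercing it to a pandas.Index before aggregating.
    if not isinstance(index, pd.Index):
        index = pd.Index(index)

    # Simplified aggregation over a stacked 'spatial' dimension; `matrix`
    # is assumed to have shape (len(index), da.sizes['spatial']).  This
    # works for numpy- and dask-backed DataArrays alike, although a real
    # implementation would use a (sparse) matrix product for efficiency.
    dim = index.name or "unit"
    weights = xr.DataArray(matrix, dims=[dim, "spatial"],
                           coords={dim: index})
    return (weights * da).sum("spatial")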

@euronion (Collaborator) commented Oct 7, 2019

Yes, I checked with windows=False; dask is definitely taking care of the calculations:
Issuing the cutout.pv(...) command returns nearly immediately with a dask.array wrapped in an xarray.DataArray:

> pv = cutout.pv("CSi",{"slope": 30, "azimuth": 180}, windows=False)
> pv
<xarray.DataArray (y: 133, x: 157)>
dask.array<shape=(133, 157), dtype=float64, chunksize=(133, 157)>
Coordinates:
  * y        (y) float64 44.0 43.75 43.5 43.25 43.0 ... 11.75 11.5 11.25 11.0
    lat      (y) float64 dask.array<shape=(133,), chunksize=(133,)>
  * x        (x) float64 25.0 25.25 25.5 25.75 26.0 ... 63.25 63.5 63.75 64.0
    lon      (x) float64 dask.array<shape=(157,), chunksize=(157,)>

as it should be.

Performance-wise I feel like there is a bit of overhead (not much, I'll report numbers later). The simplification gained when reading and understanding the code (e.g. while bug-tracking) would, I think, be more valuable than minor performance losses.

I actually prefer the pd.Index over a list, as it gives more meaning to the return values. The error message for a list, however, is not really helpful; I'd prefer a deprecation warning or a TypeError.
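For illustration, a small sketch of how the list case could be handled more gracefully (the helper name is made up, not part of atlite):

import warnings
import pandas as pd

def _validate_index(index):
    # Option 1: keep accepting lists, but warn and coerce.
    if isinstance(index, list):
        warnings.warn("Passing `index` as a list is deprecated; "
                      "pass a pandas.Index instead.", DeprecationWarning)
        index = pd.Index(index)
    # Option 2: fail early with a clear message instead of the obscure
    # error from deep inside the aggregation step.
    if not isinstance(index, pd.Index):
        raise TypeError("`index` must be a pandas.Index, got "
                        + type(index).__name__)
    return index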

@euronion (Collaborator) commented Oct 7, 2019

Addendum - timings

Timing using %%time on the following code

pv = cutout.pv("CSi",{"slope": 30, "azimuth": 180})
pv.compute()
print(pv)

cutout dimensions (ERA5):
'x': 157, 'y': 133, 'time': 8784

In the 'Codebase' column below, GitHub refers to the daskpv branch; 'Dask-specific code' has the first code block in aggregate.py enabled (if True:), and 'Non-Dask code' has the second code block enabled (if False:):

Codebase            Chunked  windows         dask    num_workers  Timing
GitHub              Yes      True            active  auto         12.3 s
GitHub              Yes      True            active  1            31.4 s
GitHub              Yes      None (default)  off     -            41.9 s
GitHub              No       None (default)  off     -            41.5 s
Dask-specific code  Yes      None (default)  off     -            41.8 s
Non-Dask code       Yes      True            off     -            41.3 s
Non-Dask code       Yes      False           active  auto         12.5 s

I do not see any significant differences.
The only question still open is why there is a different approach for dask arrays vs. other backends in the first place.

@euronion (Collaborator)

side note:

albedo.values[albedo.values > 1.0] = 1.0

should be

albedo = albedo.where(albedo <= 1., 1.)
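Equivalently, since this PR replaces the in-place transformations with clip and where, the upper bound could also be written as

albedo = albedo.clip(max=1.0)  # same effect, also works on dask-backed arrays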

@euronion (Collaborator)

Update on memory usage

I think this is related to dask handling intermediate values poorly and keeping too much in memory. One solution to the problem is described in the dask documentation and as point 2 in the xarray documentation; presumably this long-open issue is also related.

I was able to trim it down (~50% for my case, from ca. 60 GB to 32 GB) using 5-6 dask.persist() statements at intermediate steps (placement rule: following my gut feeling).
These trigger the computation of intermediate results and seem to allow dask to optimise the task graph better.

Task graphs

(External links because of large svg files - sorry)

Caveat:
It did degrade performance a bit in comparison to the case without .persist() statements. The speedup on my rig was still ~2x-3x compared to the non-dask situation.

Do you think that's a reasonable solution? I'd like to finish this up; I've already spent literally a week on this without much success. It's a pain in the seat muscles to pinpoint and solve.

@euronion (Collaborator)

Update - Alternative

I didn't really like the idea of adding persist() calls into the code, so I followed the recommendations and added some temporary flushing to disk. More precisely, I changed

atlite/atlite/convert.py

Lines 414 to 421 in b227ab2

def convert_pv(ds, panel, orientation, trigon_model='simple', clearsky_model='simple'):
    solar_position = SolarPosition(ds)
    surface_orientation = SurfaceOrientation(ds, solar_position, orientation)
    irradiation = TiltedIrradiation(ds, solar_position, surface_orientation,
                                    trigon_model=trigon_model,
                                    clearsky_model=clearsky_model)
    solar_panel = SolarPanelModel(ds, irradiation, panel)
    return solar_panel

to

import os, tempfile
import xarray as xr

def convert_pv(ds, panel, orientation, trigon_model='simple', clearsky_model='simple'):

    fh, fn = tempfile.mkstemp()
    os.close(fh)

    solar_position = SolarPosition(ds)
    surface_orientation = SurfaceOrientation(ds, solar_position, orientation)

    # Flush the intermediate surface orientation to a temporary netCDF
    # file and re-open it, so the remaining graph reads from disk instead
    # of keeping everything in memory.
    d_surface_orientation = surface_orientation.to_netcdf(fn, compute=False)
    d_surface_orientation.compute()  # trigger the delayed write
    surface_orientation = xr.open_dataset(fn)

    irradiation = TiltedIrradiation(ds, solar_position, surface_orientation,
                                    trigon_model=trigon_model,
                                    clearsky_model=clearsky_model)
    solar_panel = SolarPanelModel(ds, irradiation, panel)

    surface_orientation.close()
    os.remove(fn)

    return solar_panel

This yields similar gains in memory reduction (down to 20-25 GB from ~60 GB) in my case, but is noticeably slower than the persist() approach.

@euronion (Collaborator) commented Oct 11, 2019

Update - Better alternative

I was just trying out another alternative, which is now my favourite one:

def convert_pv(ds, panel, orientation, trigon_model='simple', clearsky_model='simple'):
    # .persist() statements in the following counter high memory usage;
    # see https://github.com/PyPSA/atlite/pull/48 for more information.
    solar_position = SolarPosition(ds)
    solar_position = solar_position.persist()
    surface_orientation = SurfaceOrientation(ds, solar_position, orientation)
    surface_orientation = surface_orientation.persist()
    irradiation = TiltedIrradiation(ds, solar_position, surface_orientation,
                                    trigon_model=trigon_model,
                                    clearsky_model=clearsky_model)
    irradiation = irradiation.persist()
    solar_panel = SolarPanelModel(ds, irradiation, panel)
    return solar_panel.persist()

See this branch in my repo.

Results:

  • The first cutout.pv(...) call initially takes about as long as without the dask mode [because loading from disk is done repeatedly]
  • The second cutout.pv(...) call is significantly faster now
  • pv = cutout.pv(...) calls still return a delayed action, as the convert_pv(..) part is computed but not the aggregation_...(...) method
  • Gains from using dask are still present (albeit a bit dampened)

I am now down to 11 GB of memory usage (reduction by a factor of 5 to 6).

Could you confirm this to work for you?

@fneum (Member) commented Mar 3, 2020

@euronion is this relevant or necessary for v0.2?

@fneum added this to the Release v0.2 milestone Mar 3, 2020
@euronion (Collaborator) commented Mar 3, 2020

I think so, yes. Need to check it during review.

@coroa (Member, Author) commented Mar 3, 2020

Nope, out of scope

@FabianHofmann (Contributor)

Closing this in favour of code in #20

@FabianHofmann deleted the daskpv branch January 11, 2021 22:26