Asynchronous API improvements #216

Closed
uellue opened this issue Dec 17, 2018 · 4 comments

@uellue
Member

uellue commented Dec 17, 2018

Currently, using LiberTEM asynchronously requires importing code from deep within LiberTEM, and the interface could be made easier to use. Here are some notes and ideas from developing the Digital Micrograph asynchronous prototype:

  • We'd need an asynchronous Context that helps us load data sets, create jobs and run them
    • Is it possible to have a Context that can be used both from synchronous and asynchronous code?
  • We'd have to make sure that we can get a local cluster up and running. Right now, we have to start a cluster while still running in the synchronous part, and create the executor in the asynchronous part. We'd have to get to the bottom of that.
  • Usually, the script will want to manage its target buffer. In the asynchronous Digital Micrograph example it was necessary to reshape the data and select the right part to copy to the right buffer. Possible example: Two mask results displayed in two different windows. The data will have to be split up to be copied to the right image buffers for live updating results.
    • Help to allocate the right buffer. Currently, job.get_result_shape() returns a plain tuple. Here we could use the Shape class to distinguish between the nav and sig dimensions of a result data set. Example for a mask job: nav: mask index; sig: result from one mask.
    • We should probably have an Analysis that already separates results for each mask
    • The display routine might want to do transformations on the results for displaying
    • --> the tile.copy_to_result() method will not be usable in most cases, but this has to be handled by the script.
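
The buffer allocation idea from the list above could look roughly like the following minimal sketch. `ResultShape` and `allocate_buffers` are hypothetical names invented for illustration; they are not LiberTEM's `Shape` class or any existing API, and nested lists stand in for real image buffers.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class ResultShape:
    """Hypothetical result shape split into nav and sig parts."""
    nav: Tuple[int, ...]  # for a mask job: (number of masks,)
    sig: Tuple[int, ...]  # shape of the result from one mask


def allocate_buffers(shape: ResultShape) -> List[List[List[float]]]:
    """Allocate one zero-filled 2D buffer per mask (assumes 1D nav, 2D sig)."""
    (n_masks,) = shape.nav
    rows, cols = shape.sig
    # Build every row separately so that the buffers do not share storage.
    return [[[0.0] * cols for _ in range(rows)] for _ in range(n_masks)]


shape = ResultShape(nav=(2,), sig=(3, 4))
buffers = allocate_buffers(shape)
```

With the split shape, the script no longer has to guess which axis of a plain tuple is the mask index when it creates one display buffer per mask.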

The run() method could perhaps be used in the following way:

with api.AsyncContext.create() as ctx:
    ds = ctx.load(...)
    analysis = ctx.create_mask_analysis(dataset=ds, factories=[mask0, mask1])

    # TODO how to handle the result shape best for Analysis vs. Job?
    result_shape = analysis.get_result_shape()

    image1 = create_image(np.zeros(tuple(result_shape.sig)))
    image2 = create_image(np.zeros(tuple(result_shape.sig)))

    image1.Show()
    image2.Show()

    async for part_result in ctx.run(analysis):
        # TODO can the individual result tiles perhaps be pre-assembled so that
        # the for loop is not necessary?
        # Difficulty: The calculation results can arrive
        # completely out of order
        for tile in part_result.mask_0:
            image1.buf[tile.dest_shape] += tile.data
        for tile in part_result.mask_1:
            image2.buf[tile.dest_shape] += tile.data

        image1.Refresh()
        image2.Refresh()
@uellue uellue added enhancement New feature or request future Python API labels Dec 17, 2018
@uellue
Member Author

uellue commented Dec 17, 2018

Complication: Adding is not always the correct merging method. In particular, if we support MapReduce, this will have to be the reducer function. In that case we have to actually call the tile's method and live with the fact that we might have to manage the result buffer and the image buffer separately.

with api.AsyncContext.create() as ctx:
    ds = ctx.load(...)
    analysis = ctx.create_mask_analysis(dataset=ds, factories=[mask0, mask1])

    # TODO how to handle the result shape best for Analysis vs. Job?
    result_shape = analysis.get_result_shape()

    image1 = create_image(np.zeros(tuple(result_shape.sig)))
    image2 = create_image(np.zeros(tuple(result_shape.sig)))

    image1.Show()
    image2.Show()

    async for part_result in ctx.run(analysis):
        # TODO can the individual result tiles perhaps be pre-assembled so that
        # the for loop is not necessary?
        # Difficulty: The calculation results can arrive
        # completely out of order
        for tile in part_result.mask_0:
            tile.reduce_into_result(image1.buf)
        for tile in part_result.mask_1:
            tile.reduce_into_result(image2.buf)

        image1.Refresh()
        image2.Refresh()
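
To illustrate why adding is not always the right merge, here is a small self-contained sketch with a pluggable reducer. `Tile` and `merge_tile` are hypothetical stand-ins, not LiberTEM classes, and flat Python lists replace the real result buffers. The same out-of-order tiles are merged once by summing and once by taking the maximum.

```python
import operator
from typing import Callable, List, NamedTuple


class Tile(NamedTuple):
    """Hypothetical partial result: target slice in the flat result plus data."""
    dest: slice
    data: List[float]


def merge_tile(buf: List[float], tile: Tile,
               reducer: Callable[[float, float], float]) -> None:
    """Merge one tile into `buf` in place using `reducer(old, new)`."""
    current = buf[tile.dest]
    buf[tile.dest] = [reducer(old, new) for old, new in zip(current, tile.data)]


# Tiles arrive out of order, and two of them cover the same region.
tiles = [
    Tile(slice(2, 4), [5.0, 1.0]),
    Tile(slice(0, 2), [3.0, 4.0]),
    Tile(slice(2, 4), [2.0, 7.0]),
]

sum_buf = [0.0] * 4            # neutral element for addition
max_buf = [float("-inf")] * 4  # neutral element for max
for tile in tiles:
    merge_tile(sum_buf, tile, operator.add)
    merge_tile(max_buf, tile, max)
```

Note that the initial buffer contents depend on the reducer as well, which is one more reason to keep the reduction logic with the tile or the analysis rather than in the display script.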

@sk1p
Member

sk1p commented Feb 8, 2019

> Is it possible to have a Context that can be used both from synchronous and asynchronous code?

I think so, yes. My first approach would be to make async the default, and wrap it for the sync case (wrapping could be done with something like async_to_sync from asgiref, see also a related blog post). However, event loops can't be nested, so using the sync API somewhere "under" another running event loop may simply fail. This happens for example in Jupyter notebooks: jupyter/notebook#3397. With the current Jupyter version it is actually possible to write async code in the notebook, but we shouldn't force this onto our users.

The approach of dask.distributed appears to be to run async code in a new loop, in a different thread.
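
A minimal sketch of that pattern, using only the standard library: `BackgroundLoop` is a hypothetical helper name, and this is not how dask.distributed is actually implemented, only the general idea of a private loop in its own thread.

```python
import asyncio
import threading


class BackgroundLoop:
    """Hypothetical helper that owns an event loop running in a daemon thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro):
        """Block the calling thread until `coro` has finished on the background loop."""
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result()

    def close(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def compute(x):
    await asyncio.sleep(0)
    return x * 2


async def caller_inside_a_loop(bg):
    # A sync API built on `bg.run` works here although an event loop is
    # already running in this thread; it blocks that loop while it waits.
    return bg.run(compute(21))


bg = BackgroundLoop()
plain_result = bg.run(compute(1))
nested_result = asyncio.run(caller_inside_a_loop(bg))
bg.close()
```

The price is that the outer loop is blocked while the sync call waits, which is acceptable for a convenience wrapper but not for code that is meant to be fully asynchronous.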

Concerning the interface, I thought about making run a generator when we pass a flag like get_partial_results, but this can't be done conditionally: if there is a yield in the function body, calling the function returns a generator in all cases, and just always using a generator would be a backward-incompatible change.

So, I propose: run behaves the same way in both sync/async cases (like the current sync case), and we introduce a new function that gives us partial results in form of a generator that is either sync or async.
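
The constraint and the proposed split can be shown with plain Python. The function names below (`run_with_flag`, `run`, `run_iter`) are illustrative only and operate on a list of numbers rather than on a real job.

```python
import inspect


def run_with_flag(parts, get_partial_results=False):
    """Anti-pattern: the `yield` makes this a generator function in all cases."""
    total = 0
    for p in parts:
        total += p
        if get_partial_results:
            yield total
    # Not a normal return value: it ends up in StopIteration.value.
    return total


def run(parts):
    """Returns the final result directly, like the current sync API."""
    return sum(parts)


def run_iter(parts):
    """Separate entry point that yields partial results."""
    total = 0
    for p in parts:
        total += p
        yield total


# Even with the flag left at False, the caller gets a generator back.
flagged = run_with_flag([1, 2, 3])
is_generator = inspect.isgenerator(flagged)
final = run([1, 2, 3])
partials = list(run_iter([1, 2, 3]))
```

Keeping two functions means existing callers of run keep receiving a plain result, while new callers opt into partial results explicitly.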

For handling generators, something like this can be used (need to run in a separate thread though, to allow nesting):

import asyncio


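def async_to_sync_generator(gen):
    """
    convert async generator `gen` to sync generator
    """
    # use a private loop; this still fails if a loop is already running
    # in the current thread
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                item = loop.run_until_complete(gen.__anext__())
            except StopAsyncIteration:
                # PEP 479: raising StopIteration inside a generator becomes
                # a RuntimeError, so end the generator with a plain return
                return
            yield item
    finally:
        loop.close()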


async def async_generator():
    """
    example async generator
    """
    for i in range(9):
        yield i


# usage in sync code:
g = async_generator()
for i in async_to_sync_generator(g):
    print(i)
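
The remark about needing a separate thread to allow nesting could be addressed along these lines. This is only a sketch: `async_to_sync_generator_threaded` is a hypothetical name, and unlike the lazy version above it lets the producer run ahead into an unbounded queue.

```python
import asyncio
import queue
import threading

_DONE = object()  # sentinel marking the end of the async generator


def async_to_sync_generator_threaded(agen):
    """Consume async generator `agen` on a private loop in a worker thread."""
    q = queue.Queue()

    async def drain():
        try:
            async for item in agen:
                q.put((item, None))
        except Exception as exc:
            # Forward the error to the consuming thread.
            q.put((None, exc))
        else:
            q.put((_DONE, None))

    worker = threading.Thread(target=asyncio.run, args=(drain(),), daemon=True)
    worker.start()
    while True:
        item, exc = q.get()
        if exc is not None:
            raise exc
        if item is _DONE:
            break
        yield item
    worker.join()


async def numbers():
    for i in range(3):
        await asyncio.sleep(0)
        yield i


async def nested_caller():
    # Works although this coroutine already runs inside an event loop.
    return list(async_to_sync_generator_threaded(numbers()))


collected = asyncio.run(nested_caller())
```

Because the worker thread has its own loop, the sync wrapper never touches the loop of the calling thread, so it also works from environments such as notebooks that already run one.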

Some more remarks:

> We should probably have an Analysis that already separates results for each mask

This should already be the case. For each mask, an AnalysisResult instance is returned, which has its own raw_data.

I think the buffer allocation remarks should be solved together with UDF support. I'm actually leaning towards using the UDF interface also for internal functions (and implementing a map_tiles functionality in addition to map_frames), because the buffer handling etc. seems to be quite nice actually.

> Complication: Adding is not always the correct merging method. In particular if we support MapReduce, this will have to be the reducer function. [...]

See above: the UDF interface should be quite a good match, we just need to make sure the outside interfaces (async and sync) are well-behaved and don't need to know about reduction functions etc. anymore.

@sk1p sk1p mentioned this issue Mar 7, 2019
10 tasks
@uellue
Member Author

uellue commented Apr 9, 2020

Sibling of #675

@uellue uellue added this to the 0.6 milestone Apr 9, 2020
@uellue uellue modified the milestones: 0.6, 0.7 Jun 10, 2020
@uellue
Member Author

uellue commented May 27, 2021

Interfaces for asynchronous run_udf() and run_udf_iter() were included in #1011. Closing!

@uellue uellue closed this as completed May 27, 2021