@gonzojive
Contributor

This change defines a new exec.ReStreamFactory type for constructing a
re-iterable stream from a byte stream and coder.Coder. The user may override the
default behavior by calling exec.SetReStreamFactory.

This customizable factory function allows overriding the default behavior of
exec.ReStream construction. The default implementation buffers all elements being
iterated over by a GBK or CoGBK function into an in-memory buffer, which may
cause OOM errors when a single iterable is large and the runner does not supply
a state-backed iterable. See #21817. The
user may specify an alternative function that spills elements to disk if a
memory threshold is exceeded, for instance.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes. I did not update this because this change doesn't seem significant enough, but I can update it if requested.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@asf-ci

asf-ci commented Jun 25, 2022

Can one of the admins verify this patch?

@github-actions github-actions bot added the go label Jun 25, 2022
@codecov

codecov bot commented Jun 25, 2022

Codecov Report

Merging #22057 (8445f3a) into master (7ad4864) will decrease coverage by 0.00%.
The diff coverage is 67.88%.

@@            Coverage Diff             @@
##           master   #22057      +/-   ##
==========================================
- Coverage   74.00%   74.00%   -0.01%     
==========================================
  Files         703      703              
  Lines       92936    92987      +51     
==========================================
+ Hits        68776    68811      +35     
- Misses      22894    22906      +12     
- Partials     1266     1270       +4     
Flag Coverage Δ
go 51.00% <67.88%> (+0.03%) ⬆️

Impacted Files Coverage Δ
sdks/go/pkg/beam/core/runtime/exec/datasource.go 65.51% <65.62%> (-0.05%) ⬇️
sdks/go/pkg/beam/core/runtime/exec/pardo.go 60.31% <84.61%> (+0.87%) ⬆️

@github-actions
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @lostluck for label go.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Contributor

@lostluck lostluck left a comment

I'll need to take another look at this when I'm fresher.

The main concern I have is this adds a user facing actuation point inside the internal exec package, and we do not want to add those if we can help it.

The package isn't intended for direct end user configuration and use, and we don't treat it as such. If left as is, the experimental nature of the API would need to be made very explicit in the documentation.

Now, if there were a harness configuration, triggered by options, that switched between one or more pre-written and tested implementations, that would probably be OK, as it reduces the likelihood of user misuse.
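For illustration, an option-driven switch of that kind could look roughly like the sketch below. Everything in it is hypothetical: bufferStrategy, the "memory" and "disk" option values, and selectStrategy are made-up names, not existing Beam harness options. The point is only that users pick from a closed set of vetted implementations by name instead of installing arbitrary functions.

```go
package main

import (
	"fmt"
	"sort"
)

// bufferStrategy stands in for a pre-written, tested buffering implementation.
// The type and the option names below are hypothetical, not Beam SDK APIs.
type bufferStrategy func() string

// strategies is the closed set of implementations an option may select.
var strategies = map[string]bufferStrategy{
	"memory": func() string { return "buffer every element in memory" },
	"disk":   func() string { return "spill to disk past a memory threshold" },
}

// selectStrategy resolves an option value to one of the known implementations.
// An empty value falls back to the in-memory default; unknown values are errors.
func selectStrategy(option string) (bufferStrategy, error) {
	if option == "" {
		option = "memory"
	}
	s, ok := strategies[option]
	if !ok {
		names := make([]string, 0, len(strategies))
		for name := range strategies {
			names = append(names, name)
		}
		sort.Strings(names) // stable error text regardless of map order
		return nil, fmt.Errorf("unknown buffering option %q, want one of %v", option, names)
	}
	return s, nil
}

func main() {
	for _, opt := range []string{"", "disk", "bogus"} {
		s, err := selectStrategy(opt)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%q -> %s\n", opt, s())
	}
}
```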

@lostluck
Contributor

lostluck commented Jun 30, 2022

Really, what I'd want to tell you instead is to contribute to the Go Direct runner to resolve this for local single machine runs, but I'm working on a portability based replacement (in Go).

Unfortunately it's not yet ready for migrating to the beam repo, and is very test focused.

Read: currently everything is in memory, there is no parallelism, and it doesn't do DoFn graph fusion optimizations; each DoFn is executed one at a time. My intent is to make it much more configurable, though, for robustness, and able to actuate all the various Beam FnAPI features. In this case, that would include being able to spill out to disk.

https://github.com/lostluck/experimental/tree/master/local/internal

@github-actions
Contributor

github-actions bot commented Jul 8, 2022

Reminder, please take a look at this pr: @lostluck

@github-actions
Contributor

Assigning new set of reviewers because PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @jrmccluskey for label go.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@lostluck
Contributor

TIL the bot takes the "review" state pretty seriously, instead of assuming something from the comment. (That's probably the correct approach).

@gonzojive
Contributor Author

gonzojive commented Jul 12, 2022 via email

@lostluck
Contributor

That feels reasonable, in combination with marking the method as experimental and pointing its documentation at the linked issue in question and similar.

In practice, it's unlikely that we'll simply remove the entry point code, even if we mark it as experimental. This will allow us to better support this case in the future through alternate means.

I'll try to take another look this afternoon. (I'm a bit busy preparing for next week's Beam Summit, and it's consuming much of my time.)

Contributor

@lostluck lostluck left a comment

Just a few changes, then this should be alright to merge.

Any unit tests you could add for the newly separated functions that don't have great coverage would be strongly appreciated.

}()
// createChunkReStreams appends to chunkStreams and
// chunkStreamsCloser.children
createChunkReStreams := func() error {

Indirecting the error through an anon closure doesn't help with code readability here. The closure is too long for a meaningful readability benefit.

I'd strongly prefer that we move this out to a named method and pass parameters in and out, instead of making hard to follow code harder to follow.

And looking at this again, is this just to avoid adding a nopCloser{} return parameter on error cases? That would be clearer than the indirection.

// for DoFns that iterate over values (GBK and CoGBK).
//
// The default implementation of this function is DefaultReadStreamToBuffer.
func SetReStreamFactory(fn ReStreamFactory) {

Per discussion, please label this as Experimental, and link to the issue we're attempting to solve with this.


// DefaultReadStreamToReStream reads numElements from the byteStream using the
// element decoder dec and returns an in-memory ReStream.
func DefaultReadStreamToReStream(_ context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error) {

Per discussion, please unexport the default implementation.

	if len(rest) == 0 {
		return &concatReStream{first: first, next: nil}
	}
	return &concatReStream{first: first, next: newConcatReStream(rest...)}

I'm not a huge fan of the recursive linked list construction here, but I think in practice this will end up being minimally used, if ever. But this is probably less fiddly than alternative approaches.

@lostluck
Contributor

waiting on author

@gonzojive
Contributor Author

gonzojive commented Jul 13, 2022 via email

@lostluck
Contributor

That was for the bot. Sorry for the confusion.

@github-actions
Contributor

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Jan 30, 2023
@gonzojive
Contributor Author

I forgot about this but can make the requested changes when I get a chance.

@github-actions
Contributor

github-actions bot commented Feb 8, 2023

Reminder, please take a look at this pr: @jrmccluskey

@github-actions
Contributor

Assigning new set of reviewers because PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @damccorm for label go.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@lostluck
Contributor

Good news @gonzojive !

The Go SDK has a local portable runner, implemented in Go, that would very much welcome support for offloading intermediate datasets to files, in contrast to the current direct runner or Flink implementations. The v0 is coming with the next release (2.46, being cut tomorrow).

See the following for the current vision.
https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism

That said, there's other work to be done that may block it from your use case, since Prism is currently at v0 and is, for now, intentionally skewed toward testing. But it is moving in the right direction, and is intended to ultimately be configurable for local production execution.

The current missing features that may affect your usage are the following, since you're trying to do real work with it.

  1. Shuffles are currently in memory.
  2. Bundles are currently executed one at a time, and not yet in parallel.
  3. State/pending status of elements is currently in memory.
  4. State-backed iterable support isn't yet implemented.
  5. Splitting/sharding isn't yet implemented, which is blocked by progress tracking and determining load. It will however do initial SDF splits.
  6. Fusion isn't yet implemented.

Other than the offloading to the file system, most of these will come along in due time.
If you're aware of some kind of light database/memory store that could change how elements are stored/referred to, and "shuffles" are handled, that could be handy though.

As mentioned before, the existing SDK side implementation for what you've got isn't as desirable as a runner based solution. But, as long as any particular feature is configurable between in-memory and more test focused (closer to 1 at a time), I'm very amenable to unblocking things.

@github-actions
Contributor

github-actions bot commented Jun 8, 2023

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Jun 8, 2023
@github-actions
Contributor

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Jun 15, 2023