Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-37429: [C++] Add arrow::ipc::StreamDecoder::Reset() #37970

Merged
merged 4 commits into from
Nov 2, 2023

Conversation

kou
Copy link
Member

@kou kou commented Oct 1, 2023

Rationale for this change

We can reuse the same StreamDecoder to read multiple streams with this.

What changes are included in this PR?

Add StreamDecoder::Reset().

Are these changes tested?

Yes.

Are there any user-facing changes?

Yes.

@github-actions
Copy link

github-actions bot commented Oct 1, 2023

⚠️ GitHub issue #37429 has been automatically assigned in GitHub to PR creator.

/// \brief Reset the internal status.
///
/// You can reuse this decoder for new stream after calling
/// this. For example, you can implement endless decoder with this:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, I don't think the endless decoder is very useful (why would I want an endless decoder?). Perhaps we can simply remove it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I'll remove this example.

See #37429 for a use case of the endless decoder. It reads multiple streams with one decoder instance.

FYI: We can implement the use case by using next_required_size() instead but we need to recreate decoder instances manually:

while (true) {
  auto buffer = get_data(decoder->next_required_size());
  if (!buffer) {
    break;
  }
  decoder->Consume(buffer);
  if (listener->status() == EOS) {
    decoder = create_decoder(listener);
  }
}

@@ -2032,6 +2036,11 @@ Status StreamDecoder::Consume(const uint8_t* data, int64_t size) {
Status StreamDecoder::Consume(std::shared_ptr<Buffer> buffer) {
return impl_->Consume(std::move(buffer));
}
Status StreamDecoder::Reset() {
impl_ =
std::make_unique<StreamDecoderImpl>(std::move(impl_->listener()), impl_->options());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: std::move on a rvalue is probably a no-op.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry. I'll remove it.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting committer review Awaiting committer review labels Oct 3, 2023
@github-actions github-actions bot added awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels Oct 3, 2023
@MrLolthe1st
Copy link

MrLolthe1st commented Oct 6, 2023

@kou That's not all: after return from OnEOS() and from OnEOS() at StreamDecoder, we'll back in old MessageReader, but after Reset() call, we've destroyed it. Since old MessageReader hadn't been reseted, MessageReader still in EOS state and will ignore all payload after EOS message.

@MrLolthe1st
Copy link

You can checkout our temporary workaround here:
stream_decoder.cc ydb-platform/ydb@f288403#diff-89894a6bbc78b5f2aacc62338e7cbfbd6a7a841dd14bb44eed25fd5dd151dc3a
stream_decoder.h ydb-platform/ydb@f288403#diff-a442528f4921f6781d036840f10ce3faf49990cdbba0a88bffb7c8832592ec39
There some changes around MessageReader too. Also, dictionary_memo_ is now unique pointer: dictionary_memo_ = std::make_unique<DictionaryMemo>();

@kou
Copy link
Member Author

kou commented Oct 7, 2023

Why do we need to care about MessageReader?
This implementation re-creates StreamDecoderImpl instead of reusing existing StreamDecoderImpl (including MessageReader).

@MrLolthe1st
Copy link

MrLolthe1st commented Oct 8, 2023

Why do we need to care about MessageReader? This implementation re-creates StreamDecoderImpl instead of reusing existing StreamDecoderImpl (including MessageReader).

Because when we got EOS in the middle of payload.
Call stack when we've got EOS looks like that:
Listener::OnEOS(..)
StreamDecoderImpl::OnEOS()
MessageDecoderImpl::OnEOS() <- that is old message decoder, that already is in EOS state. after exiting method above, we will return here.
...
MessageDecoderImpl::Consume(...)
MessageDecoder::Consume(...)
StreamDecoderImpl::Consume(...)

So, if we recreate StreamDecoderImpl in Listener::OnEOS(), after exiting Listener::OnEOS we will

  1. Return to old StreamDecoderImpl, that isn't exists.
  2. Than return to old MessageDocederImpl, that isn't exists. But, if we're somehow save old message decoder, it is in state EOS and will ignore all messages, that left in payload, because of EOS state.

@kou
Copy link
Member Author

kou commented Oct 10, 2023

Ah, I understand.
I've fixed the case by splitting the given data. If StreamDecoderImpl::Consume() is finished for each state, the problem isn't happen.

@MrLolthe1st
Copy link

Looks like truth!

@kou
Copy link
Member Author

kou commented Oct 27, 2023

I'll merge this in a few days if nobody objects it.

@kou
Copy link
Member Author

kou commented Nov 2, 2023

No objection.
I'll merge this.

@kou kou merged commit cead3dd into apache:main Nov 2, 2023
34 checks passed
@kou kou deleted the cpp-stream-decoder-reset branch November 2, 2023 06:35
@kou kou removed the awaiting change review Awaiting change review label Nov 2, 2023
Copy link

After merging your PR, Conbench analyzed the 6 benchmarking runs that have been run so far on merge-commit cead3dd.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details. It also includes information about 32 possible false positives for unstable benchmarks that are known to sometimes produce them.

loicalleyne pushed a commit to loicalleyne/arrow that referenced this pull request Nov 13, 2023
…37970)

### Rationale for this change

We can reuse the same StreamDecoder to read multiple streams with this.

### What changes are included in this PR?

Add StreamDecoder::Reset().

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* Closes: apache#37429

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
dgreiss pushed a commit to dgreiss/arrow that referenced this pull request Feb 19, 2024
…37970)

### Rationale for this change

We can reuse the same StreamDecoder to read multiple streams with this.

### What changes are included in this PR?

Add StreamDecoder::Reset().

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes.
* Closes: apache#37429

Authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[C++] Make public version of StreamDecoderImpl-like class
3 participants