Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-30113] Implement state compression for broadcast and regular operator states #21636

Merged

Conversation

echauchot
Copy link
Contributor

@echauchot echauchot commented Jan 10, 2023

What is the purpose of the change

Implement state compression for broadcast and regular operator states. Only the states themselves are compressed not the metadata.

Brief change log

  • Introduce FSData streams wrappers that add compression
  • Use these wrappers while writing state (in the default operator snapshot strategy) and while reading state (in the operator state restore operation)

Verifying this change

This change is already covered by existing tests, such as windowing ITCase tests ... in the flink-runtime module

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes checkpointing
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? javadocs

R: @dawidwys

@echauchot echauchot changed the title [FLINK-30113] Implement operator state compression for broadcast and regular states [FLINK-30113] Implement state compression for broadcast and regular operator states Jan 10, 2023
@flinkbot
Copy link
Collaborator

flinkbot commented Jan 10, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@echauchot echauchot force-pushed the FLINK-30113-compression-state-operator branch 2 times, most recently from d42f351 to e205374 Compare January 10, 2023 10:16
@echauchot echauchot force-pushed the FLINK-30113-compression-state-operator branch from e205374 to 371763a Compare January 10, 2023 13:47
@dawidwys
Copy link
Contributor

Thanks for the PR. I'll review it over the next few days.

@MartijnVisser
Copy link
Contributor

@dawidwys Could you review this PR?

Copy link
Contributor

@dawidwys dawidwys left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @echauchot
Thanks for working on the issue.

Correct me if I am wrong, but in the current state you always apply the compression for the operator state.

First of all, imo, this needs to be configurable. I'd say it can be done with the same flag as for the keyed state backend (execution.checkpointing.snapshot-compression)

Moreover, right now it causes previously written checkpoints to be unreadable. (have you tried restoring from an old checkpoint? If not, could you add such a test?) as it always assumes the stream is compressed. The info if the state is compressed or not needs to be persisted somewhere. I don't have a good idea for that now. In the keyed state case it is done via KeyedBackendSerializationProxy#read/write.

Now I realise this might require changes to the checkpoint format, unless we figure out a smart place to put it, so we might need to create a small FLIP for that.

@echauchot
Copy link
Contributor Author

echauchot commented Jan 16, 2023

Hey @echauchot Thanks for working on the issue.

Hey, thanks for the review !

Correct me if I am wrong, but in the current state you always apply the compression for the operator state.

No, see DefaultOperatorStateBackendBuilder I call AbstractStateBackend.getCompressionDecorator that reads execution.checkpointing.snapshot-compression conf parameter.

First of all, imo, this needs to be configurable. I'd say it can be done with the same flag as for the keyed state backend (execution.checkpointing.snapshot-compression)

See above, I've found we could reuse execution.checkpointing.snapshot-compression conf parameter. Tell me if you want it to be a different conf parameter.

Moreover, right now it causes previously written checkpoints to be unreadable. (have you tried restoring from an old checkpoint? If not, could you add such a test?) as it always assumes the stream is compressed.

Well, if the user disables compression to read uncompressed snapshots, then it works.
Though, I have not tested to restore old snapshots. I have just tested the state write/read roundtrip via existing tests. I'll add such a test

The info if the state is compressed or not needs to be persisted somewhere. I don't have a good idea for that now. In the keyed state case it is done via KeyedBackendSerializationProxy#read/write.

Yes I saw that with key state a boolean is written first when the state is written. I can do similar thing for operator state.

Now I realise this might require changes to the checkpoint format, unless we figure out a smart place to put it, so we might need to create a small FLIP for that.

Ok, adding the boolean will require to change the snapshot format indeed. This is why I have not added this boolean to avoid the format change. But, indeed, if a snapshot was compressed and the user then disables compression he could not read the previous compressed snapshot with my code. What about simply adding a compression boolean to operator state meta ? As a backward incompatible change, it requires a FLIP even if simple change, right ?

@dawidwys
Copy link
Contributor

dawidwys commented Jan 16, 2023

No, see DefaultOperatorStateBackendBuilder I call AbstractStateBackend.getCompressionDecorator that reads execution.checkpointing.snapshot-compression conf parameter.

Ah, sorry missed that :( That part looks good then.

Ok, adding the boolean will require to change the snapshot format indeed. This is why I have not added this boolean to avoid the format change. But, indeed, if a snapshot was compressed and the user then disables compression he could not read the previous compressed snapshot with my code. What about simply adding a compression boolean to operator state meta ? As a backward incompatible change, it requires a FLIP even if simple change, right ?

Yes, I do think it requires a FLIP as it is a substantial change, but I believe this change is unavoidable if we want to add operator state compression.

@echauchot
Copy link
Contributor Author

echauchot commented Jan 16, 2023

No, see DefaultOperatorStateBackendBuilder I call AbstractStateBackend.getCompressionDecorator that reads execution.checkpointing.snapshot-compression conf parameter.

Ah, sorry missed that :( That part looks good then.

No problem.

Ok, adding the boolean will require to change the snapshot format indeed. This is why I have not added this boolean to avoid the format change. But, indeed, if a snapshot was compressed and the user then disables compression he could not read the previous compressed snapshot with my code. What about simply adding a compression boolean to operator state meta ? As a backward incompatible change, it requires a FLIP even if simple change, right ?

Yes, I do think it requires a FLIP as it is a substantial change, but I believe this change is unavoidable if we want to add operator state compression.

Ok, I'll write the FLIP and ping you on it for review, then when we agree on the architecture, I'll write the code and commit on this PR.

@echauchot
Copy link
Contributor Author

Ok, I'll write the FLIP and ping you on it for review, then when we agree on the architecture, I'll write the code and commit on this PR.

@dawidwys I don't have the rights to add a FLIP to Flink confluence space. Can you give me the rights ?

@MartijnVisser
Copy link
Contributor

Can you give me the rights ?

@echauchot You should have the rights now

@echauchot
Copy link
Contributor Author

Can you give me the rights ?

@echauchot You should have the rights now

Thanks @MartijnVisser !

@echauchot
Copy link
Contributor Author

echauchot commented Jan 17, 2023

Ok, I'll write the FLIP and ping you on it for review, then when we agree on the architecture, I'll write the code and commit on this PR.

@dawidwys I don't have the rights to add a FLIP to Flink confluence space. Can you give me the rights ?

@dawidwys here is the flip, can you please review it ?

@echauchot
Copy link
Contributor Author

The vote on the FLIP has passed, but I'm going on vacation tonight for a week. I'll update this PR to reflect the FLIP when I get back

@dawidwys
Copy link
Contributor

Absolutely @echauchot ! Thanks for the work so far!

@echauchot
Copy link
Contributor Author

@dawidwys sorry for the delay, I had to finish this other PR after my vacations. Resuming the work on this PR ...

@echauchot
Copy link
Contributor Author

@flinkbot run azure

1 similar comment
@echauchot
Copy link
Contributor Author

@flinkbot run azure

@echauchot
Copy link
Contributor Author

Hi @dawidwys, my code is failing with versioning. I incremented the proxy version (as the metadata serde code has changed) and the snapshot version (as I added a boolean for compression). Can you explain how the proxy/snapshot versioning works because I see nothing in the docs.

@dawidwys dawidwys self-assigned this Mar 6, 2023
@dawidwys
Copy link
Contributor

dawidwys commented Mar 6, 2023

What you did for versioning looks sane. I think the problem is with line

which has not been implemented in a future proof way. The intent there was to throw an exception for readVersion < 6, not less than CURRENT_STATE_META_INFO_SNAPSHOT_VERSION. If we change the condition to readVersion < 6 it should work.

@echauchot echauchot force-pushed the FLINK-30113-compression-state-operator branch from a4a11a3 to be169cf Compare March 6, 2023 14:10
@echauchot
Copy link
Contributor Author

@flinkbot run azure

1 similar comment
@echauchot
Copy link
Contributor Author

@flinkbot run azure

@echauchot
Copy link
Contributor Author

What you did for versioning looks sane. I think the problem is with line

which has not been implemented in a future proof way. The intent there was to throw an exception for readVersion < 6, not less than CURRENT_STATE_META_INFO_SNAPSHOT_VERSION. If we change the condition to readVersion < 6 it should work.

Hi @dawidwys, thanks for the prompt answer. I suspected something not future proof, thanks for the confirmation. It is now fixed. Tests pass but the CI fails in preparation of the E2E tests (since some days) with a http 404 error.

PTAL.

Copy link
Contributor

@dawidwys dawidwys left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, let's wait until e2e tests are fixed though before we merge it.

@echauchot
Copy link
Contributor Author

Thanks for the review Dawid !

@MartijnVisser
Copy link
Contributor

LGTM, let's wait until e2e tests are fixed though before we merge it.

These E2E tests have been fixed for some time already, you probably need to rebase :) See https://issues.apache.org/jira/browse/FLINK-30972

Add a new snapshot format version 6 compatible with the previous versions that adds a boolean for snapshot compression
At snapshot time: depending on user configuration, write Snappy compressed regular and broadcast operator states (but not the snapshot meta info) and write the compression boolean accordingly
At restoration time: depending on the read compression boolean, read the snapshots as compressed or not.
@echauchot echauchot force-pushed the FLINK-30113-compression-state-operator branch from be169cf to 83f0a2a Compare March 7, 2023 13:24
@echauchot
Copy link
Contributor Author

LGTM, let's wait until e2e tests are fixed though before we merge it.

These E2E tests have been fixed for some time already, you probably need to rebase :) See https://issues.apache.org/jira/browse/FLINK-30972

Thanks @MartijnVisser, I missed this ticket. I just rebased. Let's wait for the green lights.

@echauchot
Copy link
Contributor Author

@flinkbot run azure

@dawidwys dawidwys merged commit d04d727 into apache:master Mar 8, 2023
@echauchot
Copy link
Contributor Author

@dawidwys thanks for reviewing/merging.

@echauchot echauchot deleted the FLINK-30113-compression-state-operator branch March 17, 2023 14:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants