Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35162][state] Implement WriteBatchOperation and general MultiGetOperation for ForSt #24681

Closed
wants to merge 4 commits into from

Conversation

ljz2051
Copy link
Contributor

@ljz2051 ljz2051 commented Apr 18, 2024

What is the purpose of the change

This pull request introduces the ForStValueState and support WriteBatchOperation and general MultiGetOperation for ForSt.

Brief change log

  • Introduces the ForStValueState
  • Support WriteBatchOperation and general MultiGetOperation for ForSt

Verifying this change

This change added tests and can be verified by ForStWriteBatchOperationTest and ForStGeneralMultiGetOperationTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no

@ljz2051
Copy link
Contributor Author

ljz2051 commented Apr 18, 2024

@Zakelly @fredia @masteryhx Could you please take a look?

@flinkbot
Copy link
Collaborator

flinkbot commented Apr 18, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@ljz2051 ljz2051 force-pushed the FLINK-35162 branch 2 times, most recently from 2d65519 to c529291 Compare April 19, 2024 02:26
Copy link
Contributor

@Zakelly Zakelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this PR! I left some comments PTAL.

* @return Processing result.
* @throws IOException Thrown if ForStDB access encountered an I/O related error.
*/
OUT process() throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this designed to be a synchronous method? Would it be better if it is an asynchronous method returning a CompletableFuture? I think this is more flexible since we don't need a specific thread waiting the result.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I agree that the future-style interface is more flexible. I have refined it.

}

@Override
public byte[] serializeKey(ContextKey<K> contextKey) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should ContextKey be thread-safe? It seems the 'read cache or serialize' logic better be called only once for each context key.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I have marked the ContextKey with @threadsafe annotation, and refactor the 'read cache or serialize' logic. Please review the ContextKey#getOrCreateSerializedKey method.

*/
private @Nullable byte[] serializedKey = null;

public ContextKey(K rawKey, int keyGroup) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you have provided the of static method, the constructor could be private? Well I'd prefer not providing of if you have multiple constructors....

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have remove the "ContextKey#of" static method

*
* @param <K> The type of the raw key.
*/
public class ContextKey<K> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And IIUC the ContextKey will be shared across state requests? Is that means ContextKey should be attached to the RecordContext in future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the ContextKey can be shared across state requests, and ContextKey will be attached to the RecordContext in another PR. So I move the ContextKey class to flink-runtime module from forst-module.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I didn't mean that it should move to the flink-runtime. I was thinking providing a payload in RecordContext for state backend to assign customized data defined by itself. WDYT?

And by the way, the key and keygroup should be provided in RecordContext so maybe we could save some memory overhead here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Zakelly As we discussed offline, I have attached the keyGroup and payload (serialized key) into RecordContext , and the ContextKey is placed in forst-module. Please help to review again.

@ljz2051
Copy link
Contributor Author

ljz2051 commented Apr 19, 2024

@Zakelly Thanks for your review! I have addressed all comments your mentioned.

@ljz2051 ljz2051 requested a review from Zakelly April 19, 2024 10:25
try {
ForStInnerTable<K, V> table = request.table;
byte[] key = table.serializeKey(request.key);
byte[] value = db.get(table.getColumnFamilyHandle(), key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ljz2051, it creates many rpc request here, as FLIP-426 mentioned the rpc round-trip overhead is the bottleneck. It might be better by using multiGetAsList

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jectpro7 Thanks four your advice.
The MultiGet api based on remote filesystem is not currently supported by ForStDB, and more importantly, not all file systems will support the MultiGet api, so the purpose of this PR is to introduce a general MultiGet implementation that works on all file systems.

In addition, I will introduce the ForStDB native MultiGet api in another jira (FLINK-35163) to optimize remote state access.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, got it, good to learn. Thanks for sharing the background.

}
db.write(writeOptions, writeBatch);
result.complete(null);
} catch (RocksDBException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any case that the exception is not a RocksDBException? If yes, the future might not be completed. So complete it in finally might be a better choice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to catch all types of exceptions here, not just RocksDBException , and complete the result future exceptionally. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds great

Copy link
Contributor

@masteryhx masteryhx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR!
I just left some comments. PTAL.

StateRequestHandler stateRequestHandler,
ColumnFamilyHandle columnFamily,
ValueStateDescriptor<V> valueStateDescriptor,
ThreadLocal<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Threadlocal parameters are a bit confusing for callers.
And also make life cycles of these variables expose to callers.
Could we avoid it and just maintain them internally ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could just pass the Supplier to ForStValueState, and keep the ThreadLocal semantics inside ForStState.

}

/** The Put access request for ForStDB. */
static class Request<K, V> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it have structural relationship with other one in ForStGeneralMultiGetOperation ?
Or Could we use WriteRequest and GetRequest to distinguish them ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They don't have structural relationship. It's a good idea that using GetRequest and PutRequest to distinguish them.

this.table = table;
}

static <K, V> Request<K, V> of(K key, V value, ForStInnerTable<K, V> table) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A minor suggestion:
Adding some descriptions here ? Since value is nullable and we will delete it for null value which should be explicit for callers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have refined it.

synchronized (recordContext) {
if (recordContext.getPayload() == null) {
byte[] serializedKey = serializeKeyFunc.apply(this);
recordContext.setPayload(serializedKey);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the serializedKey and value are both store in payload?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, only the serialized key will be cached in payload. The value cannot be reused between different state execution stage, so there is no need to cache value.

@ljz2051 ljz2051 force-pushed the FLINK-35162 branch 2 times, most recently from 322a2ad to 3097401 Compare April 23, 2024 13:10
}

@Override
public CompletableFuture<Void> process() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this process() still act as synchronous method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have refined it !

Copy link
Contributor

@Zakelly Zakelly left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update , LGTM +1

Copy link
Contributor

@masteryhx masteryhx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update.
LGTM.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants