Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][pip] PIP-302 Introduce refreshAsync API for TableView #21271

Merged
merged 7 commits into from Oct 18, 2023

Conversation

liangyepianzhou
Copy link
Contributor

@liangyepianzhou liangyepianzhou commented Sep 28, 2023

Reopen #21166

Motivation

Prerequisite: Since messages are constantly being written into the Topic and there is no read-write lock guarantee, we cannot assure the retrieval of the most up-to-date value.
Implementation Goal: Record a checkpoint before reading and ensure the retrieval of the latest value of the key up to this checkpoint.
Use Case: When read and write operations for a certain key do not occur simultaneously, we can refresh the TableView before reading the key to obtain the latest value for this key.

Modification

Introduce a new API refreshAsync.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

In the context of utilizing the TableView component, there are instances where we aspire to consistently retrieve the most up-to-date value associated with a given key. To accomplish this, we can employ an API that allows us to wait until all data has been fully retrieved before accessing the value corresponding to the desired key.
### Modification
Introduce a new API method called `readAllExistingMessages()`
@github-actions
Copy link

@liangyepianzhou Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@liangyepianzhou liangyepianzhou self-assigned this Sep 28, 2023
@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Sep 28, 2023
@poorbarcode poorbarcode changed the title [improve][client] PIP-302 Introduce refreshAsync API for TableView [improve][pip] PIP-302 Introduce refreshAsync API for TableView Oct 7, 2023
Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, users have to run view.refreshAsync().thenApply(__ -> view.get(key)) each time. If so, why not call readAllExistingMessages each time before calling methods like get? Then users only need to run view.get(key) instead.

@BewareMyPower
Copy link
Contributor

I see @merlimat's suggestion here and you respond: #21166 (comment)

Even if we refresh the tableView before each data read, we can't guarantee that the data retrieved is the most up-to-date.

With this proposal, we still cannot guarantee the "latest" value is retrieved. Just like you mentioned, this proposal only guarantees the value is latest at the checkpoint when refreshAsync is called.

This configuration might confuse users.

I disagree. Assume we added two options: STRONG_CONSISTENCY_MODEL and EVENTUAL_CONSISTENCY_MODEL), then we'll have:

  • STRONG_CONSISTENCY_MODEL: any method will be blocked until the latest value is retrieved.
  • EVENTUAL_CONSISTENCY_MODEL: all methods are non-blocking but the value retrieved might not be latest at the time point.

This pattern is similar to the POSIX read API, which is widely used by system development. It's blocking by default, i.e. it will be blocked until some data is available. But if the file descriptor is marked as O_NONBLOCK, it will become non-blocking.

We would need to add many asynchronous interfaces, such as T getAsync(String key); boolean containsKeyAsync(String key); Collection valuesAsync(); etc.

I think the main concern is that get might be blocking while Pulsar uses the future.xxxAsync everywhere and it's dangerous to call a blocking method in the callback of xxxAsync. For the sake of this reason, I'm +1 to add a method to read messages to latest at a certain time point.

However, I think the refreshAsync design is not good. Considering the following case:

view.refreshAsync().thenAccept(__ -> {
    // Now, the TableView is the latest snapshot of the compacted topic
    if (view.containsKey(key)) { // T1
        process(view.get(key)); // T2
    }
});

Assume at T1, the internal data is { key => value1 }. Since the TableView keeps fetching the latest message at the background, if a new message (key, value2) is written after T1 and before T2, at T2, view.get(key) will be value2.

The root cause is that TableView is not immutable. TableViewImpl#readTailMessages fetches the messages at the background. So I suggest returning an immutable map like:

    /**
     * Get the latest snapshot for a set of keys at the current time point.
     *
     * @param keys
     * @return a future of the map that represents the snapshot of the table view. The keys must be a subset of the
     *   `keys` parameter and the value is guaranteed to be the latest value before the future is completed.
     */
    CompletableFuture<Map<String, T>> getLatestSnapshotAsync(Set<String> keys);

    /**
     * Get the latest snapshot at the current time point.
     * @return a future of map that represents the snapshot of the table view.
     */
    CompletableFuture<Map<String, T>> getLatestSnapshotAsync();

@liangyepianzhou
Copy link
Contributor Author

This configuration might confuse users.

I disagree. Assume we added two options: STRONG_CONSISTENCY_MODEL and EVENTUAL_CONSISTENCY_MODEL), then we'll have:

  • STRONG_CONSISTENCY_MODEL: any method will be blocked until the latest value is retrieved.
  • EVENTUAL_CONSISTENCY_MODEL: all methods are non-blocking but the value retrieved might not be latest at the time point.

Here's a point to note: because there might be continuous message writing, we can't guarantee getting the latest value. If we provide users with the STRONG_CONSISTENCY_MODEL option, they might assume that they get the latest value every time, but this is incorrect.

@liangyepianzhou
Copy link
Contributor Author

I think the main concern is that get might be blocking while Pulsar uses the future.xxxAsync everywhere and it's dangerous to call a blocking method in the callback of xxxAsync. For the sake of this reason, I'm +1 to add a method to read messages to latest at a certain time point.

However, I think the refreshAsync design is not good. Considering the following case:

view.refreshAsync().thenAccept(__ -> {
    // Now, the TableView is the latest snapshot of the compacted topic
    if (view.containsKey(key)) { // T1
        process(view.get(key)); // T2
    }
});

Assume at T1, the internal data is { key => value1 }. Since the TableView keeps fetching the latest message at the background, if a new message (key, value2) is written after T1 and before T2, at T2, view.get(key) will be value2.

The root cause is that TableView is not immutable. TableViewImpl#readTailMessages fetches the messages at the background. So I suggest returning an immutable map like:

    /**
     * Get the latest snapshot for a set of keys at the current time point.
     *
     * @param keys
     * @return a future of the map that represents the snapshot of the table view. The keys must be a subset of the
     *   `keys` parameter and the value is guaranteed to be the latest value before the future is completed.
     */
    CompletableFuture<Map<String, T>> getLatestSnapshotAsync(Set<String> keys);

    /**
     * Get the latest snapshot at the current time point.
     * @return a future of map that represents the snapshot of the table view.
     */
    CompletableFuture<Map<String, T>> getLatestSnapshotAsync();

The current use case is that when reads and writes for a key do not occur at the same time, we can refresh the TableView next time to get the latest value of this key.

As for the getLatestSnapshotAsync method, I don't quite understand its use case. Could you give an example to illustrate under what circumstances we would use this guarantee?

@BewareMyPower
Copy link
Contributor

BewareMyPower commented Oct 8, 2023

As for the getLatestSnapshotAsync method, I don't quite understand its use case.

It provides a consistent view. For example, let's assume there are two keys that represents x and y that satisfy y = x * 2 (just an example). The change might be:

  • x: 0, 1, 3, 7, ...
  • y: 0, 2, 6, 14, ...

With refreshAsync, you might call:

view.refreshAsync().thenAccept(__ -> { // now, x = 3 and y = 6
    int x = view.get("x"); // x = 3
    int y = view.get("y"); // y = 14, actually, at the moment, view.get("x") is 7
    // If users want to process x and y here, they might find the difference is unexpected large

With getLatestSnapshotAsync() method, you can write:

view.getLatestSnapshotAsync().thenAccept(snapshot -> {
    int x = snapshot.get("x"); // x = 3, view.get("x") = 3, view.get("y") = 6
    int y = snapshot.get("y"); // y = 6, even if now view.get("y") is 14
});

@liangyepianzhou
Copy link
Contributor Author

If you can control the write side, why not just use a Reader?

Because there is a system topic shared by all topics in this namespace.

If we use Reader, then all topics may need to read the system topic from beginning to end when it loads.
If we use TableView, the system topic only needs to be read one time from beginning to end for each broker.

pip/pip-302.md Outdated
```java
@Override
public CompletableFuture<Void> refreshAsync() {
return reader.thenCompose(this::readAllExistingMessages);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a readTailMessages loop at the background.readAllExistingMessages could also call readNextAsync. Is there any possible race?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a matter of specific implementation. We can optimize this during the implementation process.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we only need to call hasMessageAvailableAsync until the future returns false.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not touch the very concrete implementation in the proposal. But we should illustrate "how" to implement it. You should remove the readAllExistingMessages call here because nobody knows the method's semantics unless reading the source code. Instead, you should write the implementation like "waiting until there is more messages via hasMessageAvailable" or "the returned future will be completed when there is no more message".

@BewareMyPower
Copy link
Contributor

BewareMyPower commented Oct 8, 2023

If we use Reader, then all topics may need to read the system topic from beginning to end when it loads.

Yeah I just also thought of this point so I deleted my comment before.

The current use case is that when reads and writes for a key do not occur at the same time,

If refreshAsync method is just to handle this case, you must note the limitation of refreshAsync that it cannot guarantee latest value is read unless there is no messages sent to the topic before the future is complete. i.e. it's meaningful only when the caller takes control of all producers to the topic.

pip/pip-302.md Outdated Show resolved Hide resolved
@liangyepianzhou
Copy link
Contributor Author

If refreshAsync method is just to handle this case, you must note the limitation of refreshAsync that it cannot guarantee latest value is read unless there is no messages sent to the topic before the future is complete. i.e. it's meaningful only when the caller takes control of all producers to the topic.

Yes, It will be clarified in the API comments.

It should be noted that this is actually the greatest assurance we can provide right now. Because there might be continuous messages being sent, and the rate of sending messages might be faster than the rate of receiving messages, we actually can't guarantee to get the latest message. All we can do is help you refresh once and get the latest value at the current point in time.

pip/pip-302.md Outdated
Comment on lines 54 to 64
/**
* Triggers the reading of all existing messages from the topics, updates the TableView and waits for the read operation to complete.
* This method fetches the last message of the topics at the point of invocation and updates the TableView with all messages up to and including this last message.
* After the update is complete, users can use the TableView to obtain the latest value for any key.
* Note that the 'latest' value refers to the value at the point of calling refresh, not necessarily the current latest if more messages have been produced in the meantime.
*
* @return a CompletableFuture that completes when all existing messages up to the point of invocation have been read, and the TableView has been updated.
*
* Example usage:
* table.refreshAsync().thenApply(__ -> table.get(key));
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/**
* Triggers the reading of all existing messages from the topics, updates the TableView and waits for the read operation to complete.
* This method fetches the last message of the topics at the point of invocation and updates the TableView with all messages up to and including this last message.
* After the update is complete, users can use the TableView to obtain the latest value for any key.
* Note that the 'latest' value refers to the value at the point of calling refresh, not necessarily the current latest if more messages have been produced in the meantime.
*
* @return a CompletableFuture that completes when all existing messages up to the point of invocation have been read, and the TableView has been updated.
*
* Example usage:
* table.refreshAsync().thenApply(__ -> table.get(key));
*/
/**
*
* Refresh the table view with the latest data in the topic, ensuring that all subsequent reads are based on the refreshed data.
*
* Example usage:
*
* table.refreshAsync().thenApply(__ -> table.get(key));
*
* This function retrieves the last written message in the topic and refreshes the table view accordingly.
* Once the refresh is complete, all subsequent reads will be performed on the refreshed data or a combination of the refreshed
* data and newly published data. The table view remains synchronized with any newly published data after the refresh.
*
* |x:0|->|y:0|->|z:0|->|x:1|->|z:1|->|x:2|->|y:1|->|y:2|
*
* If a read occurs after the refresh (at the last published message |y:2|), it ensures that outdated data like x=1 is not obtained.
* However, it does not guarantee that the values will always be x=2, y=2, z=1, as the table view may receive updates with newly
* published data.
*
* |x:0|->|y:0|->|z:0|->|x:1|->|z:1|->|x:2|->|y:1|->|y:2| ---> |y:3|
*
* Both y=2 or y=3 are possible. Therefore, different readers may receive different values, but all values will be equal to or newer
* than the data refreshed from the last call to the refresh method.
*/

I would like to provide another description of the method to make sure we can explain what we are guaranteed and what is not.

@liangyepianzhou liangyepianzhou merged commit 2ff1b8c into apache:master Oct 18, 2023
19 checks passed
@liangyepianzhou liangyepianzhou deleted the pip-302 branch October 18, 2023 07:41
@Technoboy- Technoboy- added this to the 3.2.0 milestone Oct 27, 2023
liangyepianzhou added a commit that referenced this pull request Mar 15, 2024
Master #21271
### Motivation
The proposal will introduce a new API to refresh the table view with the latest written data on the topic, ensuring that all subsequent reads are based on the refreshed data.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs type/PIP
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants