New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IGNITE-15434: Reactive scan for table partitions. #348
Conversation
modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanCloseCommand.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/apache/ignite/internal/table/distributed/command/scan/ScanInitCommand.java
Outdated
Show resolved
Hide resolved
…istributed/command/scan/ScanCloseCommand.java Co-authored-by: ygerzhedovich <41903880+ygerzhedovich@users.noreply.github.com>
…istributed/command/scan/ScanInitCommand.java Co-authored-by: ygerzhedovich <41903880+ygerzhedovich@users.noreply.github.com>
…nternalTable.java Co-authored-by: ygerzhedovich <41903880+ygerzhedovich@users.noreply.github.com>
...table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
Show resolved
Hide resolved
...table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
Outdated
Show resolved
Hide resolved
cursorDesc.cursor().close(); | ||
} | ||
catch (Exception e) { | ||
throw new IgniteInternalException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we complete the command closure with this exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably not, there's nothing to be done by user with such exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception doesn't matter actually. But what does matter is fact that something went wrong and there is no need for user code to wait for response, because error response is the response. Otherwise, they will wait for an answer that will never come.
Probably, it would be better to handle all such errors in one place -- within onWrite
method. WDYT?
BTW I don't sure about cursorDesc.cursor().close()
, but cursorDesc.cursor().hasNext()
throws sometimes an AssertionError
, hence I propose to catch Throwable
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception doesn't matter actually. But what does matter is fact that something went wrong and there is no need for user code to wait for response, because error response is the response. Otherwise, they will wait for an answer that will never come.
In case of throwing an exception from state machine raft node should be stopped and corresponding RaftException should be propagated to the client, so client won't wait for the answer any longer thаn in case of clo.result(exception),If it's not happening it's definitely a bug, however it's not in the scope of given ticket.
BTW I don't sure about cursorDesc.cursor().close(), but cursorDesc.cursor().hasNext() throws sometimes an AssertionError, hence I propose to catch Throwable.
Well, I don't agree here. Only business logic exceptions should be processed with clo.result, all other including AssertionError should stop raft node and be propagated to client as RaftException, same as mentioned above actually.
...table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
Outdated
Show resolved
Hide resolved
...table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
Outdated
Show resolved
Hide resolved
modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
Outdated
Show resolved
Hide resolved
...le/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
Outdated
Show resolved
Hide resolved
...le/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
Outdated
Show resolved
Hide resolved
...le/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
Outdated
Show resolved
Hide resolved
...le/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
Outdated
Show resolved
Hide resolved
…istributed/raft/PartitionListener.java Co-authored-by: korlov42 <korlov@gridgain.com>
…nternalTable.java Co-authored-by: korlov42 <korlov@gridgain.com>
...le/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
Show resolved
Hide resolved
…istributed/storage/InternalTableImpl.java Co-authored-by: korlov42 <korlov@gridgain.com>
…ite-3 into ignite-15434
…-15434 # Conflicts: # modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
cursorDesc.cursor().close(); | ||
} | ||
catch (Exception e) { | ||
throw new IgniteInternalException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception doesn't matter actually. But what does matter is fact that something went wrong and there is no need for user code to wait for response, because error response is the response. Otherwise, they will wait for an answer that will never come.
Probably, it would be better to handle all such errors in one place -- within onWrite
method. WDYT?
BTW I don't sure about cursorDesc.cursor().close()
, but cursorDesc.cursor().hasNext()
throws sometimes an AssertionError
, hence I propose to catch Throwable
.
...le/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
Outdated
Show resolved
Hide resolved
if (isCanceled.get()) | ||
return; | ||
|
||
scanInitOp.thenCompose((none) -> raftGrpSvc.<MultiRowsResponse>run(new ScanRetrieveBatchCommand(n, scanId))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A race is possible here. The scan initialization could take some time, and caller could cancel the scan, thus lead as to a follow sequence of events: SCAN_INIT
, SCAN_CLOSE
, SCAN_RETRIEVE
. We need to check isCanceled
flag right before invocation of raftGrpSvc .run(ScanRetrieveBatchCommand)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to check isCanceled flag right before invocation of raftGrpSvc .run(ScanRetrieveBatchCommand)
Without extra synchronization that won't help.
However, I believe it's not a big deal either. According to Subscription#cancel
, cancel is a sort of eventually stuff
Causes the Subscriber to (eventually) stop receiving messages. Implementation is best-effort -- additional messages may be received after invoking this method. A cancelled subscription need not ever receive an onComplete or onError signal.
So in order to be complaint with Flow contract, onError() should be prevented in case SCAN_INIT
, SCAN_CLOSE
, SCAN_RETRIEVE
. I mean that if SCAN_CLOSE
removes the cursor and SCAN_RETRIEVE
returns NoSuchElementException
we shouldn't fire onError, following code was added to exceptionally
.exceptionally(
t -> {
if (t instanceof NoSuchElementException ||
t instanceof CompletionException && t.getCause() instanceof NoSuchElementException)
return null;
That's however won't work without https://issues.apache.org/jira/browse/IGNITE-15581
…istributed/storage/InternalTableImpl.java Co-authored-by: korlov42 <korlov@gridgain.com>
…-15434 # Conflicts: # modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
…ite-3 into ignite-15434
private static final IgniteLogger LOG = IgniteLogger.forClass(InternalTableImpl.class); | ||
|
||
/** IgniteUuid generator. */ | ||
private final IgniteUuidGenerator UUID_GENERATOR = new IgniteUuidGenerator(UUID.randomUUID(), 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Name of non-static field should be in lower case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let it be static.
|
||
/** {@inheritDoc} */ | ||
@Override public void subscribe(Subscriber<? super BinaryRow> subscriber) { | ||
if (subscriber == null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Non-public API. Use assert
instead of NullPointerException
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, it's the part Publisher contract of https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Publisher.html
Throws:
NullPointerException - if subscriber is null
...le/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
Outdated
Show resolved
Hide resolved
...le/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
Outdated
Show resolved
Hide resolved
|
||
|
||
/** | ||
* Checks that {@link IllegalArgumentException} is thrown in case of invalid parition. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partition
, not parition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
In order to unblock sql activities in required to have partition scanning logic. Simple approach would be similar to meta storage range command: cursors on the client side with corresponding raft commands that will manage server side storage cursors. However given approach has some disadvantages, for example, cursor's hasNext(), next(), close() etc are blocking by design so it might lead (and as it's seen with Meta storage range will lead) to a deadlock if response retrieval logic is blocked by synchronous get on hasNext or similar. In order to overcome given issue we might use unblocking cursors based on reactive flow model. It actually means that client side range/scan should provide Publisher instance.
Implementation details:
Within InternalTable class new method was introduced:
Core publisher logic is implemented inside:
InternalTableImpl.PartitionScanPublisher.PartitionScanSubscription#PartitionScanSubscription
Please pay attention that inner batching logic is not implemented. PartitionScanSubscription will request same amount of entries from Storage as it was specified within subscription.request(n). Seems that it doesn't worth to implement it right now cause event reactive model wasn't approved yet.
Three new partition raft commands and corresponding handlers were added:
All given commands are write ones, so it'll lead to full scan replication over raft nodes. On the one hand such approach leads to transparent remapping logic that eases sql quey processing logic in case of raft leadership changes, on the other hand it requires more time for replication and more space within raft log for such commands. In future such strong restrictions might be be weakened.