-
Notifications
You must be signed in to change notification settings - Fork 437
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SCB-227 stop sub tx from running when global tx failed #114
Conversation
Changes Unknown when pulling dfb53f2 on eric-lee-ltk:SCB-227 into ** on apache:master**. |
Changes Unknown when pulling b95ae53 on eric-lee-ltk:SCB-227 into ** on apache:master**. |
@@ -100,4 +106,8 @@ private boolean isGlobalTxAborted(TxEvent event) { | |||
private boolean isTxEndedEvent(TxEvent event) { | |||
return TxEndedEvent.name().equals(event.type()); | |||
} | |||
|
|||
private boolean isInvalidTxStarted(TxEvent event) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably inlining this method is more readable
if (!ok) { | ||
LOG.info("Skipped transaction {} due to abort.", context.globalTxId()); | ||
context.setLocalTxId(localTxId); | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not a good idea to return null, since user doesn't expect it
@@ -33,6 +33,7 @@ message GrpcServiceConfig { | |||
} | |||
|
|||
message GrpcAck { | |||
bool valid = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
valid doesn't seem to be a proper name, because the transaction is indeed valid. probably just aborted
?
blockingEventService.onTxEvent(convertEvent(event)); | ||
public boolean send(TxEvent event) { | ||
GrpcAck grpcAck = blockingEventService.onTxEvent(convertEvent(event)); | ||
return grpcAck.getValid(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to return a proper AlphaResponse class instead of boolean
if (response.aborted()) { | ||
String abortedLocalTxId = context.localTxId(); | ||
context.setLocalTxId(localTxId); | ||
throw new IllegalStateException("Abort local sub transaction " + abortedLocalTxId + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this throw transaction exception?
@@ -123,14 +124,15 @@ public void close() { | |||
} | |||
|
|||
@Override | |||
public void send(TxEvent event) { | |||
public AlphaResponse send(TxEvent event) { | |||
AlphaResponse response = new AlphaResponse(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what behavior is expected from omega, if this response returns?
0bc5598
to
0550e93
Compare
if (response.aborted()) { | ||
String abortedLocalTxId = context.localTxId(); | ||
context.setLocalTxId(localTxId); | ||
throw new OmegaTxAbortedException("Abort local sub transaction " + abortedLocalTxId + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to throw TransactionException defined in spring, because user is not going to catch OmegaTxAbortedException.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using the spring's TransactionException will introduce spring dependency in our omega transaction module, is it all right to do so?
@@ -124,14 +125,15 @@ public void close() { | |||
} | |||
|
|||
@Override | |||
public void send(TxEvent event) { | |||
public AlphaResponse send(TxEvent event) { | |||
AlphaResponse response = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have to think about when this response will be returned, and the consequence of returning this response under such a scenario. apparently it's only possible when this thread is interrupted and throwing an exception is a better idea in such case.
} catch (OmegaTxAbortedException e) { | ||
assertThat(e.getMessage().contains("Abort local sub transaction"), is(true)); | ||
} catch (Throwable throwable) { | ||
expectFailing(OmegaTxAbortedException.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not necessary, since test will fail on uncaught exception
} catch (InterruptedException e) { | ||
Thread.currentThread().interrupt(); | ||
} | ||
return new AlphaResponse(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
@@ -85,7 +87,12 @@ public void onTxEvent(GrpcTxEvent message, StreamObserver<GrpcAck> responseObser | |||
message.getPayloads().toByteArray() | |||
)); | |||
|
|||
responseObserver.onNext(ACK); | |||
if (ok) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be simplified into one line
Signed-off-by: Eric Lee <dagang.li@huawei.com>
Signed-off-by: Eric Lee <dagang.li@huawei.com>
Signed-off-by: Eric Lee <dagang.li@huawei.com>
Signed-off-by: Eric Lee <dagang.li@huawei.com>
Signed-off-by: Eric Lee <dagang.li@huawei.com>
} | ||
}); | ||
thread.start(); | ||
thread.interrupt(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need to join here to wait for thread exit
if (response.aborted()) { | ||
String abortedLocalTxId = context.localTxId(); | ||
context.setLocalTxId(localTxId); | ||
throw new InvalidTransactionException("Abort local sub transaction " + abortedLocalTxId + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
local transaction is sub transaction, "Aborted sub transaction... because global transaction ..."
Thread.currentThread().interrupt(); | ||
} | ||
})); | ||
throw new OmegaException("Failed to send event " + event + " due to interruption"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
preferably to move this line to catch clause and pass interrupted exception to its constructor
Signed-off-by: Eric Lee <dagang.li@huawei.com>
Follow this checklist to help us incorporate your contribution quickly and easily:
[SCB-XXX] Fixes bug in ApproximateQuantiles
, where you replaceSCB-XXX
with the appropriate JIRA issue.mvn clean install
to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.