Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SCB-547 try to fix the EventScanner multiple AbortedGlobalTx issue #186

Merged
merged 1 commit into from
May 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;

import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;

import org.slf4j.Logger;
Expand Down Expand Up @@ -141,20 +142,26 @@ private void abortTimeoutEvents() {
}

private void updateTransactionStatus() {
eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEnd);
eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEndWithEvents);
}

private void markSagaEnded(TxEvent event) {
if (commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
markGlobalTxEnd(event);
markGlobalTxEntWithEvent(event);
}
}

private void markGlobalTxEnd(TxEvent event) {
private void markGlobalTxEntWithEvent(TxEvent event) {
eventRepository.save(toSagaEndedEvent(event));
LOG.info("Marked end of transaction with globalTxId {}", event.globalTxId());
}

private void markGlobalTxEndWithEvents(List<TxEvent> events) {
events.forEach(event -> {
markGlobalTxEntWithEvent(event);
});
}

private TxEvent toTxAbortedEvent(TxTimeout timeout) {
return new TxEvent(
timeout.serviceName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public interface TxEventRepository {
* </ol>
* @return
*/
Optional<TxEvent> findFirstAbortedGlobalTransaction();
Optional<List<TxEvent>> findFirstAbortedGlobalTransaction();

/**
* Find timeout {@link TxEvent}s. A timeout TxEvent satisfies below requirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void save(TxEvent event) {
}

@Override
public Optional<TxEvent> findFirstAbortedGlobalTransaction() {
public Optional<List<TxEvent>> findFirstAbortedGlobalTransaction() {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void save(TxEvent event) {
}

@Override
public Optional<TxEvent> findFirstAbortedGlobalTransaction() {
public Optional<List<TxEvent>> findFirstAbortedGlobalTransaction() {
return eventRepo.findFirstAbortedGlobalTxByType();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ interface TxEventEnvelopeRepository extends CrudRepository<TxEvent, Long> {
+ " AND t2.localTxId = t.localTxId "
+ " AND t2.type = 'TxStartedEvent') = 0 "
+ "OR t.globalTxId = t.localTxId)")
Optional<TxEvent> findFirstAbortedGlobalTxByType();
Optional<List<TxEvent>> findFirstAbortedGlobalTxByType();

@Query("SELECT t FROM TxEvent t "
+ "WHERE t.type IN ('TxStartedEvent', 'SagaStartedEvent') "
Expand Down