Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SCB-915:saga alpha event scanner optimization #313

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@

public interface CommandRepository {

void saveCompensationCommands(String globalTxId);
void saveCompensationCommands(String globalTxId, String localTxId);

void markCommandAsDone(String globalTxId, String localTxId);

void markCommandAsPending(String globalTxId, String localTxId);

List<Command> findUncompletedCommands(String globalTxId);

List<Command> findFirstCommandToCompensate();
List<Command> findAllCommandsToCompensate();

List<Command> findPendingCommands();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,44 @@
import static java.util.Collections.emptyMap;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompositeOmegaCallback implements OmegaCallback {
public class CompositeOmegaCallbackRunner implements OmegaCallback, Callable<List<TxEvent>> {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, Map<String, OmegaCallback>> callbacks;
private final List<TxEvent> txEvents;

public CompositeOmegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
public CompositeOmegaCallbackRunner(Map<String, Map<String, OmegaCallback>> callbacks,
List<TxEvent> txEvents) {
this.callbacks = callbacks;
this.txEvents = txEvents;
}

@Override
public List<TxEvent> call() {
return compensateAllEvents(txEvents);
}

@Override
public void compensate(TxEvent event) {
Map<String, OmegaCallback> serviceCallbacks = callbacks.getOrDefault(event.serviceName(), emptyMap());
Map<String, OmegaCallback> serviceCallbacks = callbacks
.getOrDefault(event.serviceName(), emptyMap());

if (serviceCallbacks.isEmpty()) {
throw new AlphaException("No such omega callback found for service " + event.serviceName());
}

OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
if (omegaCallback == null) {
LOG.info("Cannot find the service with the instanceId {}, call the other instance.", event.instanceId());
LOG.info("Cannot find the service with the instanceId {}, call the other instance.",
event.instanceId());
omegaCallback = serviceCallbacks.values().iterator().next();
}

Expand All @@ -54,4 +68,37 @@ public void compensate(TxEvent event) {
throw e;
}
}

@Override
public List<TxEvent> compensateAllEvents(List<TxEvent> txEvents) {
List<TxEvent> resultTxEvents = new ArrayList<>();
for (TxEvent txEvent : txEvents) {
try {
LOG.info("compensating event with globalTxId: {} localTxId: {}", txEvent.globalTxId(),
txEvent.localTxId());
this.compensate(txEvent);
resultTxEvents.add(txEvent);
} catch (AlphaException ae) {
LOG.error("compensate event with globalTxId: {} localTxId: {} failed,error message is {}",
txEvent.globalTxId(), txEvent.localTxId(), ae);
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to continue executing the looping.

} catch (Exception e) {
logError(txEvent, e);
}
}

return resultTxEvents;
}

private void logError(TxEvent event, Exception e) {
LOG.error(
"Failed to {} service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]",
event.retries() == 0 ? "compensate" : "retry",
event.serviceName(),
event.instanceId(),
event.retries() == 0 ? event.compensationMethod() : event.retryMethod(),
event.globalTxId(),
event.localTxId(),
e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.servicecomb.saga.alpha.core.TaskStatus.NEW;
import static org.apache.servicecomb.saga.alpha.core.TaskStatus.PENDING;
import static org.apache.servicecomb.saga.common.EventType.SagaEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxAbortedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxEndedEvent;
import static org.apache.servicecomb.saga.common.EventType.TxStartedEvent;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;

Expand All @@ -36,6 +38,7 @@

@EnableKamon
public class EventScanner implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final byte[] EMPTY_PAYLOAD = new byte[0];
Expand All @@ -52,9 +55,6 @@ public class EventScanner implements Runnable {

private final int eventPollingInterval;

private long nextEndedEventId;

private long nextCompensatedEventId;

public EventScanner(ScheduledExecutorService scheduler,
TxEventRepository eventRepository,
Expand All @@ -79,58 +79,65 @@ private void pollEvents() {
scheduler.scheduleWithFixedDelay(
() -> {
updateTimeoutStatus();
findTimeoutEvents();
findAllTimeoutEvents();
abortTimeoutEvents();
saveUncompensatedEventsToCommands();
compensate();
updateCompensatedCommands();
markSagaEndedForNoTxEnd();
deleteDuplicateSagaEndedEvents();
updateTransactionStatus();
dumpColdData();
},
0,
eventPollingInterval,
MILLISECONDS);
}

@Trace("findTimeoutEvents")
private void findTimeoutEvents() {
private void updateTimeoutStatus() {
timeoutRepository.markTimeoutAsDone();
}

@Trace("findAllTimeoutEvents")
private void findAllTimeoutEvents() {
eventRepository.findTimeoutEvents()
.forEach(event -> {
LOG.info("Found timeout event {}", event);
timeoutRepository.save(txTimeoutOf(event));
});
}

private void updateTimeoutStatus() {
timeoutRepository.markTimeoutAsDone();
@Trace("abortTimeoutEvents")
private void abortTimeoutEvents() {
timeoutRepository.findTimeouts().forEach(timeout -> {
LOG.info("Found timeout event {} to abort", timeout);
eventRepository.save(toTxAbortedEvent(timeout));
});
}

@Trace("saveUncompensatedEventsToCommands")
private void saveUncompensatedEventsToCommands() {
eventRepository.findFirstUncompensatedEventByIdGreaterThan(nextEndedEventId, TxEndedEvent.name())
eventRepository.findNeedToCompensateTxs()
.forEach(event -> {
LOG.info("Found uncompensated event {}", event);
nextEndedEventId = event.id();
commandRepository.saveCompensationCommands(event.globalTxId());
commandRepository.saveCompensationCommands(event.globalTxId(), event.localTxId());
});
}

@Trace("updateCompensationStatus")
private void updateCompensatedCommands() {
eventRepository.findFirstCompensatedEventByIdGreaterThan(nextCompensatedEventId)
.ifPresent(event -> {
LOG.info("Found compensated event {}", event);
nextCompensatedEventId = event.id();
updateCompensationStatus(event);
});
@Trace("compensate")
private void compensate() {
List<TxEvent> compensateTxEvents = new ArrayList<>();
commandRepository.findAllCommandsToCompensate()
.forEach(command ->
compensateTxEvents.add(txStartedEventOf(command))
);
omegaCallback.compensateAllEvents(compensateTxEvents).forEach(
event -> commandRepository.markCommandAsPending(event.globalTxId(), event.localTxId()));
}

@Trace("deleteDuplicateSagaEndedEvents")
private void deleteDuplicateSagaEndedEvents() {
try {
eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
} catch (Exception e) {
LOG.warn("Failed to delete duplicate event", e);
private void markSagaEnded(TxEvent event) {
if (commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
LOG.info("Marked end of transaction with globalTxId {}", event.globalTxId());
markGlobalTxEndWithEvent(event);
}
}

Expand All @@ -139,43 +146,45 @@ private void updateCompensationStatus(TxEvent event) {
LOG.info("Transaction with globalTxId {} and localTxId {} was compensated",
event.globalTxId(),
event.localTxId());

markSagaEnded(event);
}

@Trace("abortTimeoutEvents")
private void abortTimeoutEvents() {
timeoutRepository.findFirstTimeout().forEach(timeout -> {
LOG.info("Found timeout event {} to abort", timeout);

eventRepository.save(toTxAbortedEvent(timeout));
@Trace("updateCompensatedCommands")
private void updateCompensatedCommands() {
commandRepository.findPendingCommands().forEach(command ->
eventRepository.findCompensatedDoneTxs(command.globalTxId(), command.localTxId())
.forEach(event ->
{
LOG.info("Found compensated event {}", event);
updateCompensationStatus(event);
}));
}

if (timeout.type().equals(TxStartedEvent.name())) {
eventRepository.findTxStartedEvent(timeout.globalTxId(), timeout.localTxId())
.ifPresent(omegaCallback::compensate);
}
});
private void markGlobalTxEndWithEvent(TxEvent event) {
eventRepository.save(toSagaEndedEvent(event));
}

@Trace("updateTransactionStatus")
private void updateTransactionStatus() {
eventRepository.findFirstAbortedGlobalTransaction().ifPresent(this::markGlobalTxEndWithEvents);
private void markSagaEndedForNoTxEnd() {
eventRepository.findAllFinishedTxsForNoTxEnd().forEach(
event -> {
LOG.info("Marked end of no tx end's transaction with globalTxId {}", event.globalTxId());
markGlobalTxEndWithEvent(event);
});
}

private void markSagaEnded(TxEvent event) {
if (commandRepository.findUncompletedCommands(event.globalTxId()).isEmpty()) {
markGlobalTxEndWithEvent(event);
@Trace("deleteDuplicateSagaEndedEvents")
private void deleteDuplicateSagaEndedEvents() {
try {
eventRepository.deleteDuplicateEvents(SagaEndedEvent.name());
} catch (Exception e) {
LOG.warn("Failed to delete duplicate event", e);
}
}

private void markGlobalTxEndWithEvent(TxEvent event) {
eventRepository.save(toSagaEndedEvent(event));
LOG.info("Marked end of transaction with globalTxId {}", event.globalTxId());
private void dumpColdData() {
eventRepository.dumpColdEventData();
}

private void markGlobalTxEndWithEvents(List<TxEvent> events) {
events.forEach(this::markGlobalTxEndWithEvent);
}

private TxEvent toTxAbortedEvent(TxTimeout timeout) {
return new TxEvent(
Expand All @@ -201,17 +210,6 @@ private TxEvent toSagaEndedEvent(TxEvent event) {
EMPTY_PAYLOAD);
}

@Trace("compensate")
private void compensate() {
commandRepository.findFirstCommandToCompensate()
.forEach(command -> {
LOG.info("Compensating transaction with globalTxId {} and localTxId {}",
command.globalTxId(),
command.localTxId());

omegaCallback.compensate(txStartedEventOf(command));
});
}

private TxEvent txStartedEventOf(Command command) {
return new TxEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@

package org.apache.servicecomb.saga.alpha.core;

import java.util.List;

public interface OmegaCallback {

void compensate(TxEvent event);

List<TxEvent> compensateAllEvents(List<TxEvent> txEvents);

default void disconnect() {
}
}