Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
import java.io.IOException;

/** StateMachine for PartitionRegion */
public class PartitionRegionStateMachine implements IStateMachine, IStateMachine.EventApi {
public class PartitionRegionStateMachine
implements IStateMachine, IStateMachine.EventApi, IStateMachine.RetryPolicy {

private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRegionStateMachine.class);
private final ConfigPlanExecutor executor;
Expand Down Expand Up @@ -169,4 +170,22 @@ public void stop() {
public boolean isReadOnly() {
return CommonDescriptor.getInstance().getConfig().isReadOnly();
}

@Override
public boolean shouldRetry(TSStatus writeResult) {
// TODO implement this
return RetryPolicy.super.shouldRetry(writeResult);
}

@Override
public TSStatus updateResult(TSStatus previousResult, TSStatus retryResult) {
// TODO implement this
return RetryPolicy.super.updateResult(previousResult, retryResult);
}

@Override
public long getSleepTime() {
// TODO implement this
return RetryPolicy.super.getSleepTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,39 @@ default List<Path> getSnapshotFiles(File latestSnapshotRootDir) {
return Utils.listAllRegularFilesRecursively(latestSnapshotRootDir);
}

/**
* To guarantee the statemachine replication property, when {@link #write(IConsensusRequest)}
* failed in this statemachine, Upper consensus implementation like RatisConsensus may choose to
* retry the operation until it succeed.
*/
interface RetryPolicy {

/** Given the last write result, should we retry? */
default boolean shouldRetry(TSStatus writeResult) {
return false;
}

/**
* Use the latest write result to update final write result
*
* @param previousResult previous write result
* @param retryResult latest write result
* @return the aggregated result upon current retry
*/
default TSStatus updateResult(TSStatus previousResult, TSStatus retryResult) {
return retryResult;
}

/**
* sleep time before the next retry
*
* @return time in millis
*/
default long getSleepTime() {
return 100;
};
}

/** An optional API for event notifications. */
interface EventApi {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseStateMachine implements IStateMachine, IStateMachine.EventApi {
public abstract class BaseStateMachine
implements IStateMachine, IStateMachine.EventApi, IStateMachine.RetryPolicy {

private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,4 +379,22 @@ public DataSet read(IConsensusRequest request) {
return QUERY_INSTANCE_MANAGER.execDataQueryFragmentInstance(fragmentInstance, region);
}
}

@Override
public boolean shouldRetry(TSStatus writeResult) {
// TODO implement this
return super.shouldRetry(writeResult);
}

@Override
public TSStatus updateResult(TSStatus previousResult, TSStatus retryResult) {
// TODO implement this
return super.updateResult(previousResult, retryResult);
}

@Override
public long getSleepTime() {
// TODO implement this
return super.getSleepTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,22 @@ public DataSet read(IConsensusRequest request) {
fragmentInstance.getId());
return QUERY_INSTANCE_MANAGER.execSchemaQueryFragmentInstance(fragmentInstance, schemaRegion);
}

@Override
public boolean shouldRetry(TSStatus writeResult) {
// TODO implement this
return super.shouldRetry(writeResult);
}

@Override
public TSStatus updateResult(TSStatus previousResult, TSStatus retryResult) {
// TODO implement this
return super.updateResult(previousResult, retryResult);
}

@Override
public long getSleepTime() {
// TODO implement this
return super.getSleepTime();
}
}