Decouple replication lag from logic to fail stale replicas (opensearch-project#9507)

* Decouple replication lag from replication timer logic used to fail stale replicas

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Added changelog entry

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Addressed comments

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Addressed comments 2

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Addressed comments

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Retry gradle

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* fix UT

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Addressed comments

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Retry Gradle

Signed-off-by: Ankit Kala <ankikala@amazon.com>

---------

Signed-off-by: Ankit Kala <ankikala@amazon.com>
(cherry picked from commit d66df10)
ankitkala committed Aug 31, 2023
1 parent b642e69 commit cef2aae
Showing 12 changed files with 343 additions and 51 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -5,12 +5,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add task cancellation monitoring service ([#7642](https://github.com/opensearch-project/OpenSearch/pull/7642))
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653))
- Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514))
- Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956))
- Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967))
- Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507))

### Dependencies
- Bump `com.azure:azure-storage-common` from 12.21.0 to 12.21.1 (#7566, #7814)
SegmentReplicationPressureIT.java (segment replication backpressure integration test)
@@ -37,7 +37,8 @@
import static java.util.Arrays.asList;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_BACKPRESSURE_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
@@ -52,7 +53,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(MAX_INDEXING_CHECKPOINTS.getKey(), MAX_CHECKPOINTS_BEHIND)
.build();
}
@@ -223,7 +224,10 @@ public void testBelowReplicaLimit() throws Exception {

public void testFailStaleReplica() throws Exception {

Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build();
Settings settings = Settings.builder()
.put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueMillis(500))
.put(MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING.getKey(), TimeValue.timeValueMillis(1000))
.build();
// Starts a primary and replica node.
final String primaryNode = internalCluster().startNode(settings);
createIndex(INDEX_NAME);
@@ -258,7 +262,13 @@ public void testFailStaleReplica() throws Exception {
}

public void testWithDocumentReplicationEnabledIndex() throws Exception {
Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build();
assumeTrue(
"Can't create DocRep index with remote store enabled. Skipping.",
Objects.equals(featureFlagSettings().get(FeatureFlags.REMOTE_STORE, "false"), "false")
);
Settings settings = Settings.builder()
.put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueMillis(500))
.build();
// Starts a primary and replica node.
final String primaryNode = internalCluster().startNode(settings);
createIndex(
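
The tests above exercise the decoupled limits directly: backpressure is configured at 500 ms while stale replicas are only failed after 1000 ms. Below is a minimal sketch of node settings tuned the same way; the wrapper class, chosen values, and 2.x import paths are illustrative assumptions, not part of this diff.

    import org.opensearch.common.settings.Settings;
    import org.opensearch.common.unit.TimeValue;

    import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING;
    import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_BACKPRESSURE_SETTING;

    public class DecoupledLimitsExample {
        public static Settings decoupledNodeSettings() {
            // Backpressure engages after 5 minutes of replica staleness, while stale
            // replicas are only failed after 20 minutes; setting the second limit to 0
            // disables replica failing entirely.
            return Settings.builder()
                .put(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.getKey(), TimeValue.timeValueMinutes(5))
                .put(MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING.getKey(), TimeValue.timeValueMinutes(20))
                .build();
        }
    }
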
ClusterSettings.java
@@ -639,7 +639,8 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchBackpressureSettings.SETTING_CANCELLATION_BURST, // deprecated
SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED,
SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS,
SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING,
SegmentReplicationPressureService.MAX_REPLICATION_TIME_BACKPRESSURE_SETTING,
SegmentReplicationPressureService.MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING,
SegmentReplicationPressureService.MAX_ALLOWED_STALE_SHARDS,

// Settings related to Searchable Snapshots
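
Both new settings are registered above as dynamic, node-scoped settings, so they can also be adjusted on a live cluster. A hedged sketch using the Java client's cluster-settings API follows; the client wiring is an assumption for illustration, not part of this change.

    import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
    import org.opensearch.client.Client;
    import org.opensearch.common.settings.Settings;

    public class UpdateReplicationLimits {
        // Assumes an already-initialized node or transport client.
        public static void updateLimits(Client client) {
            ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
            request.persistentSettings(
                Settings.builder()
                    .put("segrep.pressure.time.limit", "5m")      // backpressure threshold
                    .put("segrep.replication.time.limit", "20m")  // fail-stale-replica threshold, 0 disables
                    .build()
            );
            client.admin().cluster().updateSettings(request).actionGet();
        }
    }
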
SegmentReplicationPressureService.java
@@ -42,7 +42,8 @@ public class SegmentReplicationPressureService implements Closeable {
private volatile boolean isSegmentReplicationBackpressureEnabled;
private volatile int maxCheckpointsBehind;
private volatile double maxAllowedStaleReplicas;
private volatile TimeValue maxReplicationTime;
private volatile TimeValue replicationTimeLimitBackpressure;
private volatile TimeValue replicationTimeLimitFailReplica;

private static final Logger logger = LogManager.getLogger(SegmentReplicationPressureService.class);

@@ -65,13 +66,23 @@ public class SegmentReplicationPressureService implements Closeable {
Setting.Property.NodeScope
);

public static final Setting<TimeValue> MAX_REPLICATION_TIME_SETTING = Setting.positiveTimeSetting(
// Time limit on max allowed replica staleness after which backpressure kicks in on primary.
public static final Setting<TimeValue> MAX_REPLICATION_TIME_BACKPRESSURE_SETTING = Setting.positiveTimeSetting(
"segrep.pressure.time.limit",
TimeValue.timeValueMinutes(5),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

// Time limit on max allowed replica staleness after which we start failing the replica shard.
// Defaults to 0(disabled)
public static final Setting<TimeValue> MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING = Setting.positiveTimeSetting(
"segrep.replication.time.limit",
TimeValue.timeValueMinutes(0),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<Double> MAX_ALLOWED_STALE_SHARDS = Setting.doubleSetting(
"segrep.pressure.replica.stale.limit",
.5,
@@ -114,8 +125,11 @@ public SegmentReplicationPressureService(
this.maxCheckpointsBehind = MAX_INDEXING_CHECKPOINTS.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_INDEXING_CHECKPOINTS, this::setMaxCheckpointsBehind);

this.maxReplicationTime = MAX_REPLICATION_TIME_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_TIME_SETTING, this::setMaxReplicationTime);
this.replicationTimeLimitBackpressure = MAX_REPLICATION_TIME_BACKPRESSURE_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_TIME_BACKPRESSURE_SETTING, this::setReplicationTimeLimitBackpressure);

this.replicationTimeLimitFailReplica = MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_REPLICATION_LIMIT_STALE_REPLICA_SETTING, this::setReplicationTimeLimitFailReplica);

this.maxAllowedStaleReplicas = MAX_ALLOWED_STALE_SHARDS.get(settings);
clusterSettings.addSettingsUpdateConsumer(MAX_ALLOWED_STALE_SHARDS, this::setMaxAllowedStaleReplicas);
@@ -159,7 +173,7 @@ private void validateReplicationGroup(IndexShard shard) {
private Set<SegmentReplicationShardStats> getStaleReplicas(final Set<SegmentReplicationShardStats> replicas) {
return replicas.stream()
.filter(entry -> entry.getCheckpointsBehindCount() > maxCheckpointsBehind)
.filter(entry -> entry.getCurrentReplicationTimeMillis() > maxReplicationTime.millis())
.filter(entry -> entry.getCurrentReplicationTimeMillis() > replicationTimeLimitBackpressure.millis())
.collect(Collectors.toSet());
}

@@ -187,8 +201,12 @@ public void setMaxAllowedStaleReplicas(double maxAllowedStaleReplicas) {
this.maxAllowedStaleReplicas = maxAllowedStaleReplicas;
}

public void setMaxReplicationTime(TimeValue maxReplicationTime) {
this.maxReplicationTime = maxReplicationTime;
public void setReplicationTimeLimitFailReplica(TimeValue replicationTimeLimitFailReplica) {
this.replicationTimeLimitFailReplica = replicationTimeLimitFailReplica;
}

public void setReplicationTimeLimitBackpressure(TimeValue replicationTimeLimitBackpressure) {
this.replicationTimeLimitBackpressure = replicationTimeLimitBackpressure;
}

@Override
@@ -216,7 +234,8 @@ protected boolean mustReschedule() {

@Override
protected void runInternal() {
if (pressureService.isSegmentReplicationBackpressureEnabled) {
// Do not fail the replicas if time limit is set to 0 (i.e. disabled).
if (TimeValue.ZERO.equals(pressureService.replicationTimeLimitFailReplica) == false) {
final SegmentReplicationStats stats = pressureService.tracker.getStats();

// Find the shardId in node which is having stale replicas with highest current replication time.
@@ -242,7 +261,7 @@ protected void runInternal() {
}
final IndexShard primaryShard = indexService.getShard(shardId.getId());
for (SegmentReplicationShardStats staleReplica : staleReplicas) {
if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) {
if (staleReplica.getCurrentReplicationTimeMillis() > pressureService.replicationTimeLimitFailReplica.millis()) {
pressureService.shardStateAction.remoteShardFailed(
shardId,
staleReplica.getAllocationId(),
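
Two behavioral points from this file are worth calling out: the replica-failing task is now gated on segrep.replication.time.limit being non-zero rather than on the backpressure-enabled flag, and the failure threshold is the configured limit itself instead of the old 2 * maxReplicationTime heuristic. The following plain-Java sketch restates the decoupled checks with illustrative names only; it is not the actual implementation, which lives in SegmentReplicationPressureService.

    public final class StaleReplicaPolicy {
        private final long backpressureLimitMillis; // segrep.pressure.time.limit
        private final long failReplicaLimitMillis;  // segrep.replication.time.limit (0 = disabled)
        private final int maxCheckpointsBehind;     // MAX_INDEXING_CHECKPOINTS

        public StaleReplicaPolicy(long backpressureLimitMillis, long failReplicaLimitMillis, int maxCheckpointsBehind) {
            this.backpressureLimitMillis = backpressureLimitMillis;
            this.failReplicaLimitMillis = failReplicaLimitMillis;
            this.maxCheckpointsBehind = maxCheckpointsBehind;
        }

        // A replica counts as stale for backpressure purposes based only on the backpressure limit.
        public boolean isStaleForBackpressure(long checkpointsBehind, long currentReplicationTimeMillis) {
            return checkpointsBehind > maxCheckpointsBehind && currentReplicationTimeMillis > backpressureLimitMillis;
        }

        // Failing the replica is governed solely by the separate fail limit and is skipped when it is 0.
        public boolean shouldFailReplica(long currentReplicationTimeMillis) {
            return failReplicaLimitMillis > 0 && currentReplicationTimeMillis > failReplicaLimitMillis;
        }
    }
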
SegmentReplicationShardStats.java
@@ -29,6 +29,10 @@ public class SegmentReplicationShardStats implements Writeable, ToXContentFragment
private final String allocationId;
private final long checkpointsBehindCount;
private final long bytesBehindCount;
// Total Replication lag observed.
private final long currentReplicationLagMillis;
// Total time taken for replicas to catch up. Similar to replication lag except this
// doesn't include time taken by primary to upload data to remote store.
private final long currentReplicationTimeMillis;
private final long lastCompletedReplicationTimeMillis;

@@ -40,12 +44,14 @@ public SegmentReplicationShardStats(
long checkpointsBehindCount,
long bytesBehindCount,
long currentReplicationTimeMillis,
long currentReplicationLagMillis,
long lastCompletedReplicationTime
) {
this.allocationId = allocationId;
this.checkpointsBehindCount = checkpointsBehindCount;
this.bytesBehindCount = bytesBehindCount;
this.currentReplicationTimeMillis = currentReplicationTimeMillis;
this.currentReplicationLagMillis = currentReplicationLagMillis;
this.lastCompletedReplicationTimeMillis = lastCompletedReplicationTime;
}

@@ -55,6 +61,7 @@ public SegmentReplicationShardStats(StreamInput in) throws IOException {
this.bytesBehindCount = in.readVLong();
this.currentReplicationTimeMillis = in.readVLong();
this.lastCompletedReplicationTimeMillis = in.readVLong();
this.currentReplicationLagMillis = in.readVLong();
}

public String getAllocationId() {
@@ -73,6 +80,19 @@ public long getCurrentReplicationTimeMillis() {
return currentReplicationTimeMillis;
}

/**
* Total Replication lag observed.
* @return currentReplicationLagMillis
*/
public long getCurrentReplicationLagMillis() {
return currentReplicationLagMillis;
}

/**
* Total time taken for replicas to catch up. Similar to replication lag except this doesn't include time taken by
* primary to upload data to remote store.
* @return lastCompletedReplicationTimeMillis
*/
public long getLastCompletedReplicationTimeMillis() {
return lastCompletedReplicationTimeMillis;
}
@@ -93,6 +113,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("checkpoints_behind", checkpointsBehindCount);
builder.field("bytes_behind", new ByteSizeValue(bytesBehindCount).toString());
builder.field("current_replication_time", new TimeValue(currentReplicationTimeMillis));
builder.field("current_replication_lag", new TimeValue(currentReplicationLagMillis));
builder.field("last_completed_replication_time", new TimeValue(lastCompletedReplicationTimeMillis));
if (currentReplicationState != null) {
builder.startObject();
@@ -110,6 +131,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(bytesBehindCount);
out.writeVLong(currentReplicationTimeMillis);
out.writeVLong(lastCompletedReplicationTimeMillis);
out.writeVLong(currentReplicationLagMillis);
}

@Override
@@ -121,6 +143,8 @@ public String toString() {
+ checkpointsBehindCount
+ ", bytesBehindCount="
+ bytesBehindCount
+ ", currentReplicationLagMillis="
+ currentReplicationLagMillis
+ ", currentReplicationTimeMillis="
+ currentReplicationTimeMillis
+ ", lastCompletedReplicationTimeMillis="
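
A hedged usage sketch of the new field: constructing SegmentReplicationShardStats with the currentReplicationLagMillis argument (added after currentReplicationTimeMillis in the diff above) and reading it back. The package, the wrapper class, and the sample values are assumptions for illustration.

    import org.opensearch.index.SegmentReplicationShardStats;

    public class ReplicationLagExample {
        public static void main(String[] args) {
            SegmentReplicationShardStats stats = new SegmentReplicationShardStats(
                "allocation-id-1", // allocationId
                3,                 // checkpointsBehindCount
                2048,              // bytesBehindCount
                1500,              // currentReplicationTimeMillis (excludes primary's remote store upload time)
                2500,              // currentReplicationLagMillis (total observed replication lag)
                900                // lastCompletedReplicationTime
            );
            // The new value is serialized via writeTo and rendered as "current_replication_lag".
            System.out.println(stats.getCurrentReplicationLagMillis()); // 2500
        }
    }
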
