Skip to content

Commit

Permalink
Repo analysis: allow configuration of register ops (#102051)
Browse files Browse the repository at this point in the history
Adds the `?register_operation_count` parameter, which allows the number of
register operations to be controlled separately from the number of regular
blob operations.
  • Loading branch information
DaveCTurner committed Nov 13, 2023
1 parent e6c62a8 commit 8572d6e
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 20 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102051.yaml
@@ -0,0 +1,5 @@
pr: 102051
summary: "Repo analysis: allow configuration of register ops"
area: Snapshot/Restore
type: enhancement
issues: []
24 changes: 16 additions & 8 deletions docs/reference/snapshot-restore/apis/repo-analysis-api.asciidoc
Expand Up @@ -59,12 +59,13 @@ the impact of running an analysis inadvertently and to provide a sensible
starting point for your investigations. Run your first analysis with the default
parameter values to check for simple problems. If successful, run a sequence of
increasingly large analyses until you encounter a failure or you reach a
`blob_count` of at least `2000`, a `max_blob_size` of at least `2gb`, and a
`max_total_data_size` of at least `1tb`. Always specify a generous timeout,
possibly `1h` or longer, to allow time for each analysis to run to completion.
Perform the analyses using a multi-node cluster of a similar size to your
production cluster so that it can detect any problems that only arise when the
repository is accessed by many nodes at once.
`blob_count` of at least `2000`, a `max_blob_size` of at least `2gb`, a
`max_total_data_size` of at least `1tb`, and a `register_operation_count` of at
least `100`. Always specify a generous timeout, possibly `1h` or longer, to
allow time for each analysis to run to completion. Perform the analyses using a
multi-node cluster of a similar size to your production cluster so that it can
detect any problems that only arise when the repository is accessed by many
nodes at once.

If the analysis fails then {es} detected that your repository behaved
unexpectedly. This usually means you are using a third-party storage system
Expand Down Expand Up @@ -141,8 +142,10 @@ between versions. The request parameters and response format depend on details
of the implementation so may also be different in newer versions.

The analysis comprises a number of blob-level tasks, as set by the `blob_count`
parameter. The blob-level tasks are distributed over the data and
master-eligible nodes in the cluster for execution.
parameter, and a number of compare-and-exchange operations on linearizable
registers, as set by the `register_operation_count` parameter. These tasks are
distributed over the data and master-eligible nodes in the cluster for
execution.

For most blob-level tasks, the executing node first writes a blob to the
repository, and then instructs some of the other nodes in the cluster to
Expand Down Expand Up @@ -200,6 +203,11 @@ this to at least `2gb`.
the blobs written during the test. Defaults to `1gb`. For realistic experiments
you should set this to at least `1tb`.

`register_operation_count`::
(Optional, integer) The minimum number of linearizable register operations to
perform in total. Defaults to `10`. For realistic experiments you should set
this to at least `100`.

`timeout`::
(Optional, <<time-units, time units>>) Specifies the period of time to wait for
the test to complete. If no response is received before the timeout expires,
Expand Down
Expand Up @@ -162,7 +162,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_TASK_SETTINGS_OPTIONAL_ADDED = def(8_531_00_0);
public static final TransportVersion DEPRECATED_COMPONENT_TEMPLATES_ADDED = def(8_532_00_0);
public static final TransportVersion UPDATE_NON_DYNAMIC_SETTINGS_ADDED = def(8_533_00_0);

public static final TransportVersion REPO_ANALYSIS_REGISTER_OP_COUNT_ADDED = def(8_534_00_0);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Expand Up @@ -57,9 +57,13 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.elasticsearch.repositories.blobstore.testkit.ContendedRegisterAnalyzeAction.longFromBytes;
import static org.elasticsearch.repositories.blobstore.testkit.RepositoryAnalysisFailureIT.isContendedRegisterKey;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
Expand Down Expand Up @@ -110,6 +114,11 @@ public void testRepositoryAnalysis() {
blobStore.setMaxBlobCount(request.getBlobCount());
}

if (randomBoolean()) {
request.registerOperationCount(between(internalCluster().size(), request.getRegisterOperationCount() * 2));
blobStore.setExpectedRegisterOperationCount(request.getRegisterOperationCount());
}

if (request.getBlobCount() > 3 || randomBoolean()) {
// only use the default blob size of 10MB if writing a small number of blobs, since this is all in-memory
request.maxBlobSize(ByteSizeValue.ofBytes(between(1, 2048)));
Expand Down Expand Up @@ -209,6 +218,7 @@ static class AssertingBlobStore implements BlobStore {
private int maxBlobCount = new RepositoryAnalyzeAction.Request("dummy").getBlobCount();
private long maxBlobSize = new RepositoryAnalyzeAction.Request("dummy").getMaxBlobSize().getBytes();
private long maxTotalBlobSize = new RepositoryAnalyzeAction.Request("dummy").getMaxTotalDataSize().getBytes();
private int expectedRegisterOperationCount = new RepositoryAnalyzeAction.Request("dummy").getRegisterOperationCount();

AssertingBlobStore(@Nullable String basePath) {
this.pathPrefix = basePath == null ? "" : basePath + "/";
Expand All @@ -227,7 +237,8 @@ public BlobContainer blobContainer(BlobPath path) {
writeSemaphore,
maxBlobCount,
maxBlobSize,
maxTotalBlobSize
maxTotalBlobSize,
expectedRegisterOperationCount
);
}
assertThat(path.buildAsString(), equalTo(currentPath));
Expand Down Expand Up @@ -266,6 +277,10 @@ public void setMaxBlobSize(long maxBlobSize) {
public void setMaxTotalBlobSize(long maxTotalBlobSize) {
this.maxTotalBlobSize = maxTotalBlobSize;
}

/**
 * Sets the number of register operations that the analysis under test is expected to perform
 * against containers created by this blob store.
 */
public void setExpectedRegisterOperationCount(int count) {
    expectedRegisterOperationCount = count;
}
}

static class AssertingBlobContainer implements BlobContainer {
Expand All @@ -278,25 +293,32 @@ static class AssertingBlobContainer implements BlobContainer {
private final int maxBlobCount;
private final long maxBlobSize;
private final long maxTotalBlobSize;
private final long expectedRegisterOperationCount;
private final Map<String, byte[]> blobs = ConcurrentCollections.newConcurrentMap();
private final AtomicLong totalBytesWritten = new AtomicLong();
private final Map<String, BytesRegister> registers = ConcurrentCollections.newConcurrentMap();
private final AtomicBoolean firstRegisterRead = new AtomicBoolean(true);

private final Object registerMutex = new Object();
private long contendedRegisterValue = 0L;
private long uncontendedRegisterValue = 0L;

/**
 * Creates a blob container that records the operations performed on it and asserts they
 * respect the limits derived from the repository-analysis request.
 *
 * @param path                           the blob path of this container
 * @param deleteContainer                callback invoked when this container is deleted
 * @param writeSemaphore                 semaphore shared with the owning store, acquired around writes
 * @param maxBlobCount                   maximum number of blobs the analysis may write
 * @param maxBlobSize                    maximum size in bytes of any single blob
 * @param maxTotalBlobSize               maximum total bytes written across all blobs
 * @param expectedRegisterOperationCount number of register operations the analysis is expected to
 *                                       perform (checked against the contended/uncontended register
 *                                       counters when the container is deleted)
 */
AssertingBlobContainer(
    BlobPath path,
    Consumer<AssertingBlobContainer> deleteContainer,
    Semaphore writeSemaphore,
    int maxBlobCount,
    long maxBlobSize,
    long maxTotalBlobSize,
    long expectedRegisterOperationCount
) {
    this.path = path;
    this.deleteContainer = deleteContainer;
    this.writeSemaphore = writeSemaphore;
    this.maxBlobCount = maxBlobCount;
    this.maxBlobSize = maxBlobSize;
    this.maxTotalBlobSize = maxTotalBlobSize;
    this.expectedRegisterOperationCount = expectedRegisterOperationCount;
}

@Override
Expand Down Expand Up @@ -406,6 +428,10 @@ private void writeBlobAtomic(String blobName, InputStream inputStream, long blob
@Override
public DeleteResult delete(OperationPurpose purpose) {
assertPurpose(purpose);
synchronized (registerMutex) {
assertThat(contendedRegisterValue, equalTo(expectedRegisterOperationCount));
assertThat(uncontendedRegisterValue, greaterThanOrEqualTo(expectedRegisterOperationCount));
}
deleteContainer.accept(this);
final DeleteResult deleteResult = new DeleteResult(blobs.size(), blobs.values().stream().mapToLong(b -> b.length).sum());
blobs.clear();
Expand Down Expand Up @@ -477,11 +503,38 @@ public void compareAndExchangeRegister(
}
}

listener.onResponse(
OptionalBytesReference.of(
registers.computeIfAbsent(key, ignored -> new BytesRegister()).compareAndExchange(expected, updated)
)
);
final BytesReference witness;
synchronized (registerMutex) {
// synchronized to avoid concurrent updates from interfering with the assertions which follow this update, but NB we aren't
// testing the atomicity of this particular compareAndExchange() operation (itself implemented with a lock), we're testing
// the sequence of how these operations are executed, so the mutex here is fine.

witness = registers.computeIfAbsent(key, ignored -> new BytesRegister()).compareAndExchange(expected, updated);

if (isContendedRegisterKey(key)) {
if (expected.equals(witness) // CAS succeeded
&& expected.equals(updated) == false // CAS was a genuine update
&& updated.length() != 1 // CAS was not the final verification step, which sometimes writes {0xff}
) {
final var updatedValue = longFromBytes(updated);
assertThat(
updatedValue,
allOf(greaterThan(0L), lessThanOrEqualTo(expectedRegisterOperationCount), equalTo(contendedRegisterValue + 1))
);
contendedRegisterValue = updatedValue;
}
} else {
assertEquals(expected, witness); // uncontended writes always succeed
assertNotEquals(expected, updated); // uncontended register sees only updates
if (updated.length() != 0) {
final var updatedValue = longFromBytes(updated);
assertThat(updatedValue, allOf(greaterThan(0L), equalTo(uncontendedRegisterValue + 1)));
uncontendedRegisterValue = updatedValue;
} // else this was the final step which writes an empty register
}
}

listener.onResponse(OptionalBytesReference.of(witness));
}
}

Expand Down
Expand Up @@ -488,7 +488,7 @@ public void run() {
)
)
) {
final int registerOperations = Math.max(nodes.size(), request.getConcurrency());
final int registerOperations = Math.max(nodes.size(), request.getRegisterOperationCount());
for (int i = 0; i < registerOperations; i++) {
final ContendedRegisterAnalyzeAction.Request registerAnalyzeRequest = new ContendedRegisterAnalyzeAction.Request(
request.getRepositoryName(),
Expand Down Expand Up @@ -730,8 +730,8 @@ public void run() {
return;
}

if (currentValue <= request.getConcurrency() || otherAnalysisComplete.get() == false) {
// complete at least request.getConcurrency() steps, but we may as well keep running for longer too
if (currentValue <= request.getRegisterOperationCount() || otherAnalysisComplete.get() == false) {
// complete at least request.getRegisterOperationCount() steps, but we may as well keep running for longer too
transportService.sendChildRequest(
nodes.get(currentValue < nodes.size() ? currentValue : random.nextInt(nodes.size())),
UncontendedRegisterAnalyzeAction.NAME,
Expand Down Expand Up @@ -887,6 +887,7 @@ public static class Request extends ActionRequest {

private int blobCount = 100;
private int concurrency = 10;
private int registerOperationCount = 10;
private int readNodeCount = 10;
private int earlyReadNodeCount = 2;
private long seed = 0L;
Expand All @@ -909,6 +910,11 @@ public Request(StreamInput in) throws IOException {
rareActionProbability = in.readDouble();
blobCount = in.readVInt();
concurrency = in.readVInt();
if (in.getTransportVersion().onOrAfter(TransportVersions.REPO_ANALYSIS_REGISTER_OP_COUNT_ADDED)) {
registerOperationCount = in.readVInt();
} else {
registerOperationCount = concurrency;
}
readNodeCount = in.readVInt();
earlyReadNodeCount = in.readVInt();
timeout = in.readTimeValue();
Expand Down Expand Up @@ -936,6 +942,15 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeDouble(rareActionProbability);
out.writeVInt(blobCount);
out.writeVInt(concurrency);
if (out.getTransportVersion().onOrAfter(TransportVersions.REPO_ANALYSIS_REGISTER_OP_COUNT_ADDED)) {
out.writeVInt(registerOperationCount);
} else if (registerOperationCount != concurrency) {
throw new IllegalArgumentException(
"cannot send request with registerOperationCount != concurrency on transport version ["
+ out.getTransportVersion()
+ "]"
);
}
out.writeVInt(readNodeCount);
out.writeVInt(earlyReadNodeCount);
out.writeTimeValue(timeout);
Expand All @@ -946,7 +961,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_7_14_0)) {
out.writeBoolean(abortWritePermitted);
} else if (abortWritePermitted) {
throw new IllegalStateException(
throw new IllegalArgumentException(
"cannot send abortWritePermitted request on transport version [" + out.getTransportVersion() + "]"
);
}
Expand Down Expand Up @@ -975,6 +990,13 @@ public void concurrency(int concurrency) {
this.concurrency = concurrency;
}

/**
 * Sets the minimum number of linearizable register operations to perform in the analysis.
 *
 * @throws IllegalArgumentException if the given count is not strictly positive
 */
public void registerOperationCount(int count) {
    if (count < 1) {
        throw new IllegalArgumentException("registerOperationCount must be >0, but was [" + count + "]");
    }
    this.registerOperationCount = count;
}

/** Sets the seed used by the analysis — presumably drives its randomised decisions; confirm against callers. */
public void seed(long newSeed) {
    this.seed = newSeed;
}
Expand Down Expand Up @@ -1009,6 +1031,10 @@ public int getConcurrency() {
return concurrency;
}

/**
 * @return the minimum number of linearizable register operations the analysis will perform
 */
public int getRegisterOperationCount() {
    return registerOperationCount;
}

/**
 * @return the name of the repository to be analyzed
 */
public String getRepositoryName() {
    return repositoryName;
}
Expand Down
Expand Up @@ -40,6 +40,9 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC

analyzeRepositoryRequest.blobCount(request.paramAsInt("blob_count", analyzeRepositoryRequest.getBlobCount()));
analyzeRepositoryRequest.concurrency(request.paramAsInt("concurrency", analyzeRepositoryRequest.getConcurrency()));
analyzeRepositoryRequest.registerOperationCount(
request.paramAsInt("register_operation_count", analyzeRepositoryRequest.getRegisterOperationCount())
);
analyzeRepositoryRequest.readNodeCount(request.paramAsInt("read_node_count", analyzeRepositoryRequest.getReadNodeCount()));
analyzeRepositoryRequest.earlyReadNodeCount(
request.paramAsInt("early_read_node_count", analyzeRepositoryRequest.getEarlyReadNodeCount())
Expand Down

0 comments on commit 8572d6e

Please sign in to comment.