Skip to content

Commit

Permalink
[Transform] Report version conflict on concurrent updates (#96293) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed May 24, 2023
1 parent 45b80d6 commit 211d282
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 8 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/96293.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 96293
summary: Report version conflict on concurrent updates
area: Transform
type: bug
issues:
- 96311
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class TransformMessages {
"Timed out after [{0}] while waiting for transform [{1}] to stop";
public static final String REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_INTERRUPT = "Interrupted while waiting for transform [{0}] to stop";
public static final String REST_PUT_TRANSFORM_EXISTS = "Transform with id [{0}] already exists";
public static final String REST_UPDATE_TRANSFORM_CONFLICT = "Transform with id [{0}] got updated in the meantime. Please try again";
public static final String REST_UNKNOWN_TRANSFORM = "Transform with id [{0}] could not be found";
public static final String REST_STOP_TRANSFORM_WITHOUT_CONFIG =
"Detected transforms with no config [{0}]. Use force to stop/delete them.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,33 @@

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.Strings;
import org.elasticsearch.threadpool.TestThreadPool;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;

public class TransformUpdateIT extends TransformRestTestCase {

Expand All @@ -47,6 +59,8 @@ public class TransformUpdateIT extends TransformRestTestCase {
private static final String DATA_ACCESS_ROLE = "test_data_access";
private static final String DATA_ACCESS_ROLE_2 = "test_data_access_2";

private TestThreadPool threadPool;

// preserve indices in order to reuse source indices in several test cases
@Override
protected boolean preserveIndicesUponCompletion() {
Expand Down Expand Up @@ -76,6 +90,15 @@ public void createIndexes() throws IOException {
setupUser(TEST_ADMIN_USER_NAME_2, List.of("transform_admin", DATA_ACCESS_ROLE_2));
setupUser(TEST_ADMIN_USER_NAME_NO_DATA, List.of("transform_admin"));
createReviewsIndex();

threadPool = new TestThreadPool(getTestName());
}

@After
public void shutdownThreadPool() {
if (threadPool != null) {
threadPool.shutdown();
}
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -167,6 +190,50 @@ public void testUpdateThatChangesSettingsButNotHeaders() throws Exception {
assertThat(updatedConfig.get("settings"), is(equalTo(Map.of("max_page_search_size", 123))));
}

public void testConcurrentUpdates() throws Exception {
String transformId = "test_concurrent_updates";
String destIndex = transformId + "-dest";

// Create the transform
createPivotReviewsTransform(transformId, destIndex, null, null, null);

// Create a number of concurrent threads competing to update the transform with different settings.
int minMaxPageSearchSize = 10;
int maxMaxPageSearchSize = 20;
List<Callable<Response>> concurrentUpdates = new ArrayList<>(10);
for (int maxPageSearchSize = minMaxPageSearchSize; maxPageSearchSize < maxMaxPageSearchSize; ++maxPageSearchSize) {
Request updateTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_update", null);
updateTransformRequest.setJsonEntity(Strings.format("""
{ "settings": { "max_page_search_size": %s } }""", maxPageSearchSize));

// Schedule a thread to update the transform's settings
concurrentUpdates.add(() -> client().performRequest(updateTransformRequest));
}

// Gather the results.
List<Future<Response>> futures = threadPool.generic().invokeAll(concurrentUpdates);
for (Future<Response> future : futures) {
try { // The update may succeed...
future.get();
} catch (ExecutionException e) { // ... but if it fails, it's due to conflict
assertThat(e.getCause(), instanceOf(ResponseException.class));
ResponseException re = (ResponseException) e.getCause();
assertThat(re.getResponse().getStatusLine().getStatusCode(), is(equalTo(409)));
assertThat(
re.getMessage(),
containsString("Transform with id [" + transformId + "] got updated in the meantime. Please try again")
);
}
}

// Verify that the settings got updated. Any of the concurrent threads could have won the competition.
Map<String, Object> finalConfig = getTransformConfig(transformId, null);
assertThat(
(int) XContentMapValues.extractValue(finalConfig, "settings", "max_page_search_size"),
is(both(greaterThanOrEqualTo(minMaxPageSearchSize)).and(lessThan(maxMaxPageSearchSize)))
);
}

private void updateTransferRightsTester(boolean useSecondaryAuthHeaders) throws Exception {
String transformId = "transform1";
// Note: Due to a bug the transform does not fail to start after deleting the user and role, therefore invalidating
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,14 +319,16 @@ public void deleteOldIndices(ActionListener<Boolean> listener) {

private void putTransformConfiguration(
TransformConfig transformConfig,
DocWriteRequest.OpType optType,
DocWriteRequest.OpType opType,
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ActionListener<Boolean> listener
) {
assert DocWriteRequest.OpType.CREATE.equals(opType) || DocWriteRequest.OpType.INDEX.equals(opType);

try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));

IndexRequest indexRequest = new IndexRequest(TransformInternalIndexConstants.LATEST_INDEX_NAME).opType(optType)
IndexRequest indexRequest = new IndexRequest(TransformInternalIndexConstants.LATEST_INDEX_NAME).opType(opType)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.id(TransformConfig.documentId(transformConfig.getId()))
.source(source);
Expand All @@ -340,12 +342,20 @@ private void putTransformConfiguration(
indexRequest,
ActionListener.wrap(r -> { listener.onResponse(true); }, e -> {
if (e instanceof VersionConflictEngineException) {
// the transform already exists
listener.onFailure(
new ResourceAlreadyExistsException(
TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_EXISTS, transformConfig.getId())
)
);
if (DocWriteRequest.OpType.CREATE.equals(opType)) { // we want to create the transform but it already exists
listener.onFailure(
new ResourceAlreadyExistsException(
TransformMessages.getMessage(TransformMessages.REST_PUT_TRANSFORM_EXISTS, transformConfig.getId())
)
);
} else { // we want to update the transform but it got updated in the meantime, report version conflict
listener.onFailure(
new ElasticsearchStatusException(
TransformMessages.getMessage(TransformMessages.REST_UPDATE_TRANSFORM_CONFLICT, transformConfig.getId()),
RestStatus.CONFLICT
)
);
}
} else {
listener.onFailure(new RuntimeException(TransformMessages.REST_PUT_FAILED_PERSIST_TRANSFORM_CONFIGURATION, e));
}
Expand Down

0 comments on commit 211d282

Please sign in to comment.