Skip to content

Commit

Permalink
[Transform] Fix failure when resolving indices from CCS (#91622) (#91707
Browse files Browse the repository at this point in the history
)

Properly prefix remote indices in checkpoints with their cluster alias. This fixes a failure that occurs when more than one cluster is used and index names clash.

relates #80984
fixes #91550
  • Loading branch information
Hendrik Muhs committed Nov 18, 2022
1 parent 5c387f8 commit 2ea0662
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 6 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/91622.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 91622
summary: Fix failure when resolving indices from CCS
area: Transform
type: bug
issues:
- 91550
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ setup:
"cluster": [],
"indices": [
{
"names": ["test_index"],
"names": ["test_index", "same_index_local_and_remote"],
"privileges": ["read", "view_index_metadata"]
},
{
"names": ["simple-remote-transform*", "simple-local-remote-transform"],
"names": ["simple-remote-transform*", "simple-local-remote-transform", "same-index-local-and-remote-transform"],
"privileges": ["create_index", "index", "read"]
},
{
"names": ["my_remote_cluster:remote_test_i*", "my_remote_cluster:aliased_test_index"],
"names": ["my_remote_cluster:remote_test_i*", "my_remote_cluster:aliased_test_index", "my_remote_cluster:same_index_local_and_remote"],
"privileges": ["read", "view_index_metadata"]
}
]
Expand All @@ -54,7 +54,7 @@ setup:
"cluster": [],
"indices": [
{
"names": ["simple-remote-transform*", "simple-local-remote-transform"],
"names": ["simple-remote-transform*", "simple-local-remote-transform", "same-index-local-and-remote-transform"],
"privileges": ["create_index", "index", "read"]
}
]
Expand Down Expand Up @@ -379,3 +379,97 @@ teardown:
"aggs": { "avg_stars": {"avg": {"field": "stars"}}}
}
}
---
"Batch transform local and remote index sharing the same name":
# Regression test for gh#91550: a transform reads from a local index and from an
# index on my_remote_cluster that carries exactly the same name.
# create index with the same name on local and remote gh#91550
- do:
indices.create:
index: same_index_local_and_remote
body:
settings:
index:
number_of_shards: 3
number_of_replicas: 0
aliases:
test_alias: {}
mappings:
properties:
time:
type: date
user:
type: keyword
stars:
type: integer
coolness:
type: integer

# local documents: user "z" with 1 star and user "a" with 2 stars
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "same_index_local_and_remote"}}'
- '{"user": "z", "stars": 1, "date" : "2018-11-29T12:12:12.123456789Z"}'
- '{"index": {"_index": "same_index_local_and_remote"}}'
- '{"user": "a", "stars": 2, "date" : "2018-11-29T12:14:12.123456789Z"}'

# the source lists the local index and the identically named remote index side by side
- do:
headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" }
transform.put_transform:
transform_id: "same-index-local-and-remote-transform"
body: >
{
"source": { "index": ["same_index_local_and_remote", "my_remote_cluster:same_index_local_and_remote"] },
"dest": { "index": "same-index-local-and-remote-transform" },
"pivot": {
"group_by": { "user": {"terms": {"field": "user"}}},
"aggs": {
"avg_stars": {"avg": {"field": "stars"}},
"count": {"value_count": {"field": "user"}}
}
}
}
- match: { acknowledged: true }

- do:
headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" }
transform.start_transform:
transform_id: "same-index-local-and-remote-transform"
- match: { acknowledged: true }

# the state is matched against a regex accepting several states
# (presumably because a batch transform may already have progressed or finished - TODO confirm)
- do:
transform.get_transform_stats:
transform_id: "same-index-local-and-remote-transform"
- match: { count: 1 }
- match: { transforms.0.id: "same-index-local-and-remote-transform" }
- match: { transforms.0.state: "/started|indexing|stopping|stopped/" }

- do:
headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" }
transform.stop_transform:
transform_id: "same-index-local-and-remote-transform"
wait_for_completion: true
wait_for_checkpoint: true
- match: { acknowledged: true }

# after the stop the transform must be stopped and its last checkpoint must be 1
- do:
headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" }
transform.get_transform_stats:
transform_id: "same-index-local-and-remote-transform"
- match: { count: 1 }
- match: { transforms.0.id: "same-index-local-and-remote-transform" }
- match: { transforms.0.state: "stopped" }
- match: { transforms.0.checkpointing.last.checkpoint: 1 }

# Expected result, assuming the remote cluster setup indexes user "x" (3 stars) and
# user "a" (4 stars) into its same_index_local_and_remote index:
# 3 groups (a, x, z); sorted by user the first hit is "a" with
# avg_stars = (2 + 4) / 2 = 3 and count = 2, i.e. both clusters contributed.
- do:
headers: { Authorization: "Basic am9lOnRyYW5zZm9ybS1wYXNzd29yZA==" }
search:
rest_total_hits_as_int: true
index: same-index-local-and-remote-transform
sort: user

- match: { hits.total: 3 }
- match: { hits.hits.0._index: same-index-local-and-remote-transform }
- match: { hits.hits.0._source.avg_stars: 3 }
- match: { hits.hits.0._source.count: 2 }
- match: { hits.hits.0._source.user: a }
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ setup:
"cluster": [],
"indices": [
{
"names": ["remote_test_index", "remote_test_index_2"],
"names": ["remote_test_index", "remote_test_index_2", "same_index_local_and_remote"],
"privileges": ["read", "view_index_metadata"]
}
]
Expand Down Expand Up @@ -196,3 +196,34 @@ teardown:
user:
terms:
field: user

# create index with the same name on local and remote gh#91550
- do:
indices.create:
index: same_index_local_and_remote
body:
settings:
index:
number_of_shards: 3
number_of_replicas: 0
aliases:
test_alias: {}
mappings:
properties:
time:
type: date
user:
type: keyword
stars:
type: integer
coolness:
type: integer

- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "same_index_local_and_remote"}}'
- '{"user": "x", "stars": 3, "date" : "2018-10-29T12:12:12.123456789Z"}'
- '{"index": {"_index": "same_index_local_and_remote"}}'
- '{"user": "a", "stars": 4, "date" : "2018-10-29T12:14:12.123456789Z"}'
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,38 @@ private static void getCheckpointsFromOneClusterV2(
) {
GetCheckpointAction.Request getCheckpointRequest = new GetCheckpointAction.Request(indices, IndicesOptions.LENIENT_EXPAND_OPEN);

ActionListener<GetCheckpointAction.Response> checkpointListener;
if (RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY.equals(cluster)) {
checkpointListener = ActionListener.wrap(
checkpointResponse -> listener.onResponse(checkpointResponse.getCheckpoints()),
listener::onFailure
);
} else {
checkpointListener = ActionListener.wrap(
checkpointResponse -> listener.onResponse(
checkpointResponse.getCheckpoints()
.entrySet()
.stream()
.collect(
Collectors.toMap(
entry -> cluster + RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR + entry.getKey(),
entry -> entry.getValue()
)
)
),
listener::onFailure
);
}

ClientHelper.executeWithHeadersAsync(
headers,
ClientHelper.TRANSFORM_ORIGIN,
client,
GetCheckpointAction.INSTANCE,
getCheckpointRequest,
ActionListener.wrap(checkpointResponse -> listener.onResponse(checkpointResponse.getCheckpoints()), listener::onFailure)
checkpointListener
);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.MockLogAppender.LoggingExpectation;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.transform.action.GetCheckpointAction;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
Expand All @@ -43,9 +44,13 @@
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
Expand All @@ -63,6 +68,9 @@ public class DefaultCheckpointProviderTests extends ESTestCase {

private Clock clock;
private Client client;
private Client remoteClient1;
private Client remoteClient2;
private Client remoteClient3;
private IndexBasedTransformConfigManager transformConfigManager;
private MockTransformAuditor transformAuditor;

Expand All @@ -73,6 +81,15 @@ public void setUpMocks() {
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
remoteClient1 = mock(Client.class);
when(remoteClient1.threadPool()).thenReturn(threadPool);
remoteClient2 = mock(Client.class);
when(remoteClient2.threadPool()).thenReturn(threadPool);
remoteClient3 = mock(Client.class);
when(remoteClient3.threadPool()).thenReturn(threadPool);
when(client.getRemoteClusterClient("remote-1")).thenReturn(remoteClient1);
when(client.getRemoteClusterClient("remote-2")).thenReturn(remoteClient2);
when(client.getRemoteClusterClient("remote-3")).thenReturn(remoteClient3);
transformConfigManager = mock(IndexBasedTransformConfigManager.class);
transformAuditor = MockTransformAuditor.createMockAuditor();
}
Expand Down Expand Up @@ -268,6 +285,103 @@ public void testSourceHasChanged() throws InterruptedException {
assertThat(exceptionHolder.get(), is(nullValue()));
}

// Regression test for gh#91550: a local index and a remote index share the same name.
// The created checkpoint is expected to list both, the remote one prefixed with its cluster alias.
public void testCreateNextCheckpointWithRemoteClient() throws InterruptedException {
    final String transformId = getTestName();
    final TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId);

    // the local client reports 3 checkpoint values for "index-1"
    final Map<String, long[]> localCheckpoints = Map.of("index-1", new long[] { 1L, 2L, 3L });
    final GetCheckpointAction.Response localResponse = new GetCheckpointAction.Response(localCheckpoints);
    doAnswer(withResponse(localResponse)).when(client).execute(eq(GetCheckpointAction.INSTANCE), any(), any());

    // the remote client reports 5 checkpoint values under the very same index name
    final Map<String, long[]> remoteCheckpoints = Map.of("index-1", new long[] { 4L, 5L, 6L, 7L, 8L });
    final GetCheckpointAction.Response remoteResponse = new GetCheckpointAction.Response(remoteCheckpoints);
    doAnswer(withResponse(remoteResponse)).when(remoteClient1).execute(eq(GetCheckpointAction.INSTANCE), any(), any());

    // resolver stub: "index-1" exists locally and on "remote-1"
    final RemoteClusterResolver.ResolvedIndices resolvedIndices = new RemoteClusterResolver.ResolvedIndices(
        Map.of("remote-1", List.of("index-1")),
        List.of("index-1")
    );
    final RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class);
    when(remoteClusterResolver.resolve(any())).thenReturn(resolvedIndices);

    final DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
        clock,
        client,
        remoteClusterResolver,
        transformConfigManager,
        transformAuditor,
        transformConfig
    );

    final SetOnce<TransformCheckpoint> checkpointHolder = new SetOnce<>();
    final SetOnce<Exception> exceptionHolder = new SetOnce<>();
    final CountDownLatch latch = new CountDownLatch(1);

    // the last checkpoint is built with the value 7, the next one is asserted to be 8
    final TransformCheckpoint lastCheckpoint = new TransformCheckpoint(transformId, 100000000L, 7, emptyMap(), 120000000L);
    final ActionListener<TransformCheckpoint> latchedListener = new LatchedActionListener<>(
        ActionListener.wrap(checkpointHolder::set, exceptionHolder::set),
        latch
    );
    provider.createNextCheckpoint(lastCheckpoint, latchedListener);

    assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));
    assertThat(exceptionHolder.get(), is(nullValue()));

    final TransformCheckpoint nextCheckpoint = checkpointHolder.get();
    assertNotNull(nextCheckpoint);
    assertThat(nextCheckpoint.getCheckpoint(), is(equalTo(8L)));
    // both entries must survive: the local name as is, the remote one as "<cluster>:<index>"
    assertThat(nextCheckpoint.getIndicesCheckpoints().keySet(), containsInAnyOrder("index-1", "remote-1:index-1"));
}

// Regression test for gh#91550: three remote clusters expose an index with the same name.
// The created checkpoint is expected to contain one entry per cluster, each prefixed with its cluster alias.
public void testCreateNextCheckpointWithRemoteClients() throws InterruptedException {
    final String transformId = getTestName();
    final TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId);

    // every remote client answers for "index-1", each with a different number of checkpoint values
    final GetCheckpointAction.Response responseFromRemote1 = new GetCheckpointAction.Response(
        Map.of("index-1", new long[] { 1L, 2L, 3L })
    );
    final GetCheckpointAction.Response responseFromRemote2 = new GetCheckpointAction.Response(
        Map.of("index-1", new long[] { 4L, 5L, 6L, 7L, 8L })
    );
    final GetCheckpointAction.Response responseFromRemote3 = new GetCheckpointAction.Response(
        Map.of("index-1", new long[] { 9L })
    );
    doAnswer(withResponse(responseFromRemote1)).when(remoteClient1).execute(eq(GetCheckpointAction.INSTANCE), any(), any());
    doAnswer(withResponse(responseFromRemote2)).when(remoteClient2).execute(eq(GetCheckpointAction.INSTANCE), any(), any());
    doAnswer(withResponse(responseFromRemote3)).when(remoteClient3).execute(eq(GetCheckpointAction.INSTANCE), any(), any());

    // resolver stub: all three remotes share the same index name, there is no local index
    final Map<String, List<String>> indicesPerRemote = Map.of(
        "remote-1",
        List.of("index-1"),
        "remote-2",
        List.of("index-1"),
        "remote-3",
        List.of("index-1")
    );
    final RemoteClusterResolver remoteClusterResolver = mock(RemoteClusterResolver.class);
    when(remoteClusterResolver.resolve(any())).thenReturn(
        new RemoteClusterResolver.ResolvedIndices(indicesPerRemote, Collections.emptyList())
    );

    final DefaultCheckpointProvider provider = new DefaultCheckpointProvider(
        clock,
        client,
        remoteClusterResolver,
        transformConfigManager,
        transformAuditor,
        transformConfig
    );

    final SetOnce<TransformCheckpoint> checkpointHolder = new SetOnce<>();
    final SetOnce<Exception> exceptionHolder = new SetOnce<>();
    final CountDownLatch latch = new CountDownLatch(1);

    // the last checkpoint is built with the value 7, the next one is asserted to be 8
    final TransformCheckpoint lastCheckpoint = new TransformCheckpoint(transformId, 100000000L, 7, emptyMap(), 120000000L);
    final ActionListener<TransformCheckpoint> latchedListener = new LatchedActionListener<>(
        ActionListener.wrap(checkpointHolder::set, exceptionHolder::set),
        latch
    );
    provider.createNextCheckpoint(lastCheckpoint, latchedListener);

    assertThat(latch.await(100, TimeUnit.MILLISECONDS), is(true));
    assertThat(exceptionHolder.get(), is(nullValue()));

    final TransformCheckpoint nextCheckpoint = checkpointHolder.get();
    assertNotNull(nextCheckpoint);
    assertThat(nextCheckpoint.getCheckpoint(), is(equalTo(8L)));
    // no entry may overwrite another one: each key carries its own cluster prefix
    assertThat(
        nextCheckpoint.getIndicesCheckpoints().keySet(),
        containsInAnyOrder("remote-1:index-1", "remote-2:index-1", "remote-3:index-1")
    );
}

private DefaultCheckpointProvider newCheckpointProvider(TransformConfig transformConfig) {
return new DefaultCheckpointProvider(
clock,
Expand Down

0 comments on commit 2ea0662

Please sign in to comment.