Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/116219.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 116219
summary: "[apm-data] Apply lazy rollover on index template creation"
area: Data streams
type: bug
issues:
- 116230
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private static IngestPipelineConfig loadIngestPipeline(String name, int version,
}

@Override
protected boolean applyRolloverAfterTemplateV2Upgrade() {
protected boolean applyRolloverAfterTemplateV2Update() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,23 @@ setup:
- contains: {index_templates: {name: logs-apm.app@template}}
- contains: {index_templates: {name: logs-apm.error@template}}

---
"Test template reinstallation":
- skip:
reason: contains is a newly added assertion
features: contains
- do:
indices.delete_index_template:
name: traces-apm@template
- do:
cluster.health:
wait_for_events: languid
- do:
indices.get_index_template:
name: traces-apm@template
- length: {index_templates: 1}
- contains: {index_templates: {name: traces-apm@template}}

---
"Test traces-apm-* data stream indexing":
- skip:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
---
setup:
- do:
indices.put_index_template:
name: traces-low-prio
body:
data_stream: {}
index_patterns: ["traces-*"]
priority: 1

---
"Test data stream rollover on template installation":
- skip:
awaits_fix: "https://github.com/elastic/elasticsearch/issues/102360"

# Disable the apm-data plugin and delete the traces-apm@template index
# template so traces-low-prio takes effect.
- do:
cluster.put_settings:
body:
transient:
xpack.apm_data.registry.enabled: false
- do:
indices.delete_index_template:
name: traces-apm@template
- do:
indices.create_data_stream:
name: traces-apm-testing
- do:
indices.get_data_stream:
name: traces-apm-testing
- match: {data_streams.0.template: traces-low-prio}

# Re-enable the apm-data plugin, after which the traces-apm@template
# index template should be recreated and trigger a lazy rollover on
# the traces-apm-testing data stream.
- do:
cluster.put_settings:
body:
transient:
xpack.apm_data.registry.enabled: true
- do:
cluster.health:
wait_for_events: languid
- do:
indices.get_data_stream:
name: traces-apm-testing
- length: {data_streams: 1}
- match: {data_streams.0.template: traces-apm@template}
- match: {data_streams.0.rollover_on_write: true}

Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected Map<String, ComposableIndexTemplate> getComposableTemplateConfigs() {
}

@Override
protected boolean applyRolloverAfterTemplateV2Upgrade() {
protected boolean applyRolloverAfterTemplateV2Update() {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ private void addComposableTemplatesIfMissing(ClusterState state) {
}
} else if (Objects.isNull(currentTemplate)) {
logger.debug("adding composable template [{}] for [{}], because it doesn't exist", templateName, getOrigin());
putComposableTemplate(state, templateName, newTemplate.getValue(), creationCheck, false);
putComposableTemplate(state, templateName, newTemplate.getValue(), creationCheck);
} else if (Objects.isNull(currentTemplate.version()) || newTemplate.getValue().version() > currentTemplate.version()) {
// IndexTemplateConfig now enforces templates contain a `version` property, so if the template doesn't have one we can
// safely assume it's an old version of the template.
Expand All @@ -411,7 +411,7 @@ private void addComposableTemplatesIfMissing(ClusterState state) {
currentTemplate.version(),
newTemplate.getValue().version()
);
putComposableTemplate(state, templateName, newTemplate.getValue(), creationCheck, true);
putComposableTemplate(state, templateName, newTemplate.getValue(), creationCheck);
} else {
creationCheck.set(false);
logger.trace(
Expand All @@ -433,11 +433,11 @@ private void addComposableTemplatesIfMissing(ClusterState state) {

/**
* Returns true if the cluster state contains all of the component templates needed by the composable template. If this registry
* requires automatic rollover after index template upgrades (see {@link #applyRolloverAfterTemplateV2Upgrade()}), this method also
* requires automatic rollover after index template upgrades (see {@link #applyRolloverAfterTemplateV2Update()}), this method also
* verifies that the installed components templates are of the right version.
*/
private boolean componentTemplatesInstalled(ClusterState state, ComposableIndexTemplate indexTemplate) {
if (applyRolloverAfterTemplateV2Upgrade() == false) {
if (applyRolloverAfterTemplateV2Update() == false) {
// component templates and index templates can be updated independently, we only need to know that the required component
// templates are available
return state.metadata().componentTemplates().keySet().containsAll(indexTemplate.getRequiredComponentTemplates());
Expand Down Expand Up @@ -533,8 +533,7 @@ private void putComposableTemplate(
ClusterState state,
final String templateName,
final ComposableIndexTemplate indexTemplate,
final AtomicBoolean creationCheck,
final boolean isUpgrade
final AtomicBoolean creationCheck
) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
Expand All @@ -549,8 +548,8 @@ private void putComposableTemplate(
@Override
public void onResponse(AcknowledgedResponse response) {
if (response.isAcknowledged()) {
if (isUpgrade && applyRolloverAfterTemplateV2Upgrade()) {
invokeRollover(state, templateName, indexTemplate, creationCheck);
if (applyRolloverAfterTemplateV2Update()) {
invokeRollover(state, templateName, indexTemplate, () -> creationCheck.set((false)));
} else {
creationCheck.set(false);
}
Expand Down Expand Up @@ -763,12 +762,13 @@ public void onFailure(Exception e) {

/**
* Allows registries to opt-in for automatic rollover of "relevant" data streams immediately after a composable index template gets
* upgraded. If set to {@code true}, then every time a composable index template is being upgraded, all data streams of which name
* matches this template's index patterns AND of all matching templates the upgraded one has the highest priority, will be rolled over.
* updated, including its initial installation. If set to {@code true}, then every time a composable index template is being updated,
* all data streams of which name matches this template's index patterns AND of all matching templates the upgraded one has the highest
* priority, will be rolled over.
*
* @return {@code true} if this registry wants to apply automatic rollovers after template V2 upgrades
*/
protected boolean applyRolloverAfterTemplateV2Upgrade() {
protected boolean applyRolloverAfterTemplateV2Update() {
return false;
}

Expand All @@ -782,50 +782,56 @@ protected void onPutPipelineFailure(String pipelineId, Exception e) {
logger.error(() -> format("error adding ingest pipeline template [%s] for [%s]", pipelineId, getOrigin()), e);
}

/**
* invokeRollover rolls over any data streams matching the index template,
* and then invokes runAfter.
*/
private void invokeRollover(
final ClusterState state,
final String templateName,
final ComposableIndexTemplate indexTemplate,
final AtomicBoolean creationCheck
final Runnable runAfter
) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
List<String> rolloverTargets = findRolloverTargetDataStreams(state, templateName, indexTemplate);
if (rolloverTargets.isEmpty() == false) {
GroupedActionListener<RolloverResponse> groupedActionListener = new GroupedActionListener<>(
rolloverTargets.size(),
new ActionListener<>() {
@Override
public void onResponse(Collection<RolloverResponse> rolloverResponses) {
creationCheck.set(false);
onRolloversBulkResponse(rolloverResponses);
}
if (rolloverTargets.isEmpty()) {
runAfter.run();
return;
}
GroupedActionListener<RolloverResponse> groupedActionListener = new GroupedActionListener<>(
rolloverTargets.size(),
new ActionListener<>() {
@Override
public void onResponse(Collection<RolloverResponse> rolloverResponses) {
runAfter.run();
onRolloversBulkResponse(rolloverResponses);
}

@Override
public void onFailure(Exception e) {
creationCheck.set(false);
onRolloverFailure(e);
}
@Override
public void onFailure(Exception e) {
runAfter.run();
onRolloverFailure(e);
}
);
for (String rolloverTarget : rolloverTargets) {
logger.info(
"rolling over data stream [{}] lazily as a followup to the upgrade of the [{}] index template [{}]",
rolloverTarget,
getOrigin(),
templateName
);
RolloverRequest request = new RolloverRequest(rolloverTarget, null);
request.lazy(true);
request.masterNodeTimeout(TimeValue.MAX_VALUE);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
getOrigin(),
request,
groupedActionListener,
(req, listener) -> client.execute(RolloverAction.INSTANCE, req, listener)
);
}
);
for (String rolloverTarget : rolloverTargets) {
logger.info(
"rolling over data stream [{}] lazily as a followup to the upgrade of the [{}] index template [{}]",
rolloverTarget,
getOrigin(),
templateName
);
RolloverRequest request = new RolloverRequest(rolloverTarget, null);
request.lazy(true);
request.masterNodeTimeout(TimeValue.MAX_VALUE);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
getOrigin(),
request,
groupedActionListener,
(req, listener) -> client.execute(RolloverAction.INSTANCE, req, listener)
);
}
});
}
Expand Down Expand Up @@ -865,7 +871,21 @@ static List<String> findRolloverTargetDataStreams(ClusterState state, String tem
.stream()
// Limit to checking data streams that match any of the index template's index patterns
.filter(ds -> indexTemplate.indexPatterns().stream().anyMatch(pattern -> Regex.simpleMatch(pattern, ds.getName())))
.filter(ds -> templateName.equals(MetadataIndexTemplateService.findV2Template(metadata, ds.getName(), ds.isHidden())))
.filter(ds -> {
final String dsTemplateName = MetadataIndexTemplateService.findV2Template(metadata, ds.getName(), ds.isHidden());
if (templateName.equals(dsTemplateName)) {
return true;
}
// findV2Template did not match templateName, which implies one of two things:
// - indexTemplate has a lower priority than the index template matching for ds, OR
// - indexTemplate does not yet exist in cluster state (i.e. because it's in the process of being
// installed or updated)
//
// Because of the second case, we must check if indexTemplate's priority is greater than the matching
// index template, in case it would take precedence after installation/update.
final ComposableIndexTemplate dsTemplate = metadata.templatesV2().get(dsTemplateName);
return dsTemplate == null || indexTemplate.priorityOrZero() > dsTemplate.priorityOrZero();
})
.map(DataStream::getName)
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ public void testAutomaticRollover() throws Exception {
assertThat(suppressed[0].getMessage(), startsWith("Failed to rollover logs-my_app-"));
}

public void testNoRolloverForFreshInstalledIndexTemplate() throws Exception {
public void testRolloverForFreshInstalledIndexTemplate() throws Exception {
DiscoveryNode node = DiscoveryNodeUtils.create("node");
DiscoveryNodes nodes = DiscoveryNodes.builder().localNodeId("node").masterNodeId("node").add(node).build();

Expand Down Expand Up @@ -473,9 +473,10 @@ public void testNoRolloverForFreshInstalledIndexTemplate() throws Exception {
registry.setApplyRollover(true);
registry.clusterChanged(event);
assertBusy(() -> assertThat(putIndexTemplateCounter.get(), equalTo(1)));
// the index component is first installed, not upgraded, therefore rollover should not be triggered
// rollover should be triggered even for the first installation, since the template
// may now take precedence over a data stream's existing index template
Thread.sleep(100L);
assertThat(rolloverCounter.get(), equalTo(0));
assertThat(rolloverCounter.get(), equalTo(2));
}

public void testThatTemplatesAreNotUpgradedWhenNotNeeded() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void setPolicyUpgradeRequired(boolean policyUpgradeRequired) {
}

@Override
protected boolean applyRolloverAfterTemplateV2Upgrade() {
protected boolean applyRolloverAfterTemplateV2Update() {
return applyRollover.get();
}

Expand Down