Skip to content

Commit

Permalink
TSDB: Implement Downsampling ILM Action for time-series indices (#87269)
Browse files Browse the repository at this point in the history
This PR adds support for an ILM action that downsamples a time-series index
by invoking the _rollup endpoint (#85708)

A policy that includes the rollup action will look like the following

PUT _ilm/policy/my_policy
{
  "policy": {
    "phases": {
      "warm": {
        "actions": {
          "rollup": {
  	    "fixed_interval": "24h"
  	  }
  	}
      }
    }
  }
}

Relates to #74660
Fixes #68609
  • Loading branch information
csoulios committed Jul 26, 2022
1 parent ce9f94b commit 55c0d1a
Show file tree
Hide file tree
Showing 23 changed files with 1,155 additions and 230 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/87269.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 87269
summary: "TSDB: Implement downsampling ILM Action for time-series indices"
area: TSDB
type: feature
issues:
- 68609
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,7 @@ public Index getResizeSourceIndex() {
);

public enum RollupTaskStatus {
UNKNOWN,
STARTED,
SUCCESS;

Expand All @@ -1150,7 +1151,7 @@ public String toString() {
public static final Setting<RollupTaskStatus> INDEX_ROLLUP_STATUS = Setting.enumSetting(
RollupTaskStatus.class,
INDEX_ROLLUP_STATUS_KEY,
RollupTaskStatus.SUCCESS,
RollupTaskStatus.UNKNOWN,
Property.IndexScope,
Property.InternalIndex
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.ilm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;

import java.util.Objects;
import java.util.function.Function;

/**
* Deletes the target index created by an operation such as shrink or rollup and
* identified the target index name stored in the lifecycle state of the managed
* index (if any was generated)
*/
public class CleanupTargetIndexStep extends AsyncRetryDuringSnapshotActionStep {
public static final String NAME = "cleanup-target-index";
private static final Logger logger = LogManager.getLogger(CleanupTargetIndexStep.class);

private final Function<IndexMetadata, String> sourceIndexNameSupplier;
private final Function<IndexMetadata, String> targetIndexNameSupplier;

public CleanupTargetIndexStep(
StepKey key,
StepKey nextStepKey,
Client client,
Function<IndexMetadata, String> sourceIndexNameSupplier,
Function<IndexMetadata, String> targetIndexNameSupplier
) {
super(key, nextStepKey, client);
this.sourceIndexNameSupplier = sourceIndexNameSupplier;
this.targetIndexNameSupplier = targetIndexNameSupplier;
}

@Override
public boolean isRetryable() {
return true;
}

Function<IndexMetadata, String> getSourceIndexNameSupplier() {
return sourceIndexNameSupplier;
}

Function<IndexMetadata, String> getTargetIndexNameSupplier() {
return targetIndexNameSupplier;
}

@Override
void performDuringNoSnapshot(IndexMetadata indexMetadata, ClusterState currentClusterState, ActionListener<Void> listener) {
final String sourceIndexName = sourceIndexNameSupplier.apply(indexMetadata);
if (Strings.isNullOrEmpty(sourceIndexName) == false) {
// the current managed index is the target index
if (currentClusterState.metadata().index(sourceIndexName) == null) {
// if the source index does not exist, we'll skip deleting the
// (managed) target index as that will cause data loss
String policyName = indexMetadata.getLifecyclePolicyName();
logger.warn(
"managed index [{}] has been created as part of policy [{}] and the source index [{}] does not exist "
+ "anymore. will skip the [{}] step",
indexMetadata.getIndex().getName(),
policyName,
sourceIndexName,
NAME
);
listener.onResponse(null);
return;
}
}

final String targetIndexName = targetIndexNameSupplier.apply(indexMetadata);
// if the target index was not generated there is nothing to delete so we move on
if (Strings.hasText(targetIndexName) == false) {
listener.onResponse(null);
return;
}
getClient().admin()
.indices()
.delete(new DeleteIndexRequest(targetIndexName).masterNodeTimeout(TimeValue.MAX_VALUE), new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
// even if not all nodes acked the delete request yet we can consider this operation as successful as
// we'll generate a new index name and attempt to create an index with the newly generated name
listener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
if (e instanceof IndexNotFoundException) {
// we can move on if the index was deleted in the meantime
listener.onResponse(null);
} else {
listener.onFailure(e);
}
}
});
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CleanupTargetIndexStep that = (CleanupTargetIndexStep) o;
return super.equals(o)
&& Objects.equals(targetIndexNameSupplier, that.targetIndexNameSupplier)
&& Objects.equals(sourceIndexNameSupplier, that.sourceIndexNameSupplier);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), targetIndexNameSupplier, sourceIndexNameSupplier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,41 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;

import java.util.Locale;
import java.util.Objects;
import java.util.function.BiFunction;

/**
* Copy the provided settings from the source to the target index.
* <p>
* The target index is derived from the source index using the provided prefix.
* This is useful for actions like shrink or searchable snapshot that create a new index and migrate the ILM execution from the source
* to the target index.
* The target index is generated by a supplier function.
* This is useful for actions like shrink, rollup or searchable snapshot that create
* a new index and migrate the ILM execution from the source to the target index.
*/
public class CopySettingsStep extends ClusterStateActionStep {
public static final String NAME = "copy-settings";

private static final Logger logger = LogManager.getLogger(CopySettingsStep.class);

private final String[] settingsKeys;
private final String indexPrefix;

public CopySettingsStep(StepKey key, StepKey nextStepKey, String indexPrefix, String... settingsKeys) {
private final BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier;

public CopySettingsStep(
StepKey key,
StepKey nextStepKey,
BiFunction<String, LifecycleExecutionState, String> targetIndexNameSupplier,
String... settingsKeys
) {
super(key, nextStepKey);
Objects.requireNonNull(indexPrefix);
Objects.requireNonNull(settingsKeys);
this.indexPrefix = indexPrefix;
this.settingsKeys = settingsKeys;
this.targetIndexNameSupplier = targetIndexNameSupplier;
}

@Override
Expand All @@ -49,17 +56,14 @@ public String[] getSettingsKeys() {
return settingsKeys;
}

public String getIndexPrefix() {
return indexPrefix;
}
BiFunction<String, LifecycleExecutionState, String> getTargetIndexNameSupplier() {
return targetIndexNameSupplier;
};

@Override
public ClusterState performAction(Index index, ClusterState clusterState) {
String sourceIndexName = index.getName();
IndexMetadata sourceIndexMetadata = clusterState.metadata().index(sourceIndexName);
String targetIndexName = indexPrefix + sourceIndexName;
IndexMetadata targetIndexMetadata = clusterState.metadata().index(targetIndexName);

if (sourceIndexMetadata == null) {
// Index must have been since deleted, ignore it
logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().getAction(), sourceIndexName);
Expand All @@ -70,6 +74,8 @@ public ClusterState performAction(Index index, ClusterState clusterState) {
return clusterState;
}

String targetIndexName = targetIndexNameSupplier.apply(sourceIndexName, sourceIndexMetadata.getLifecycleExecutionState());
IndexMetadata targetIndexMetadata = clusterState.metadata().index(targetIndexName);
if (targetIndexMetadata == null) {
String errorMessage = String.format(
Locale.ROOT,
Expand Down Expand Up @@ -107,11 +113,13 @@ public boolean equals(Object o) {
return false;
}
CopySettingsStep that = (CopySettingsStep) o;
return Objects.equals(settingsKeys, that.settingsKeys) && Objects.equals(indexPrefix, that.indexPrefix);
return super.equals(o)
&& Objects.equals(targetIndexNameSupplier, that.targetIndexNameSupplier)
&& Objects.equals(settingsKeys, that.settingsKeys);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), settingsKeys, indexPrefix);
return Objects.hash(super.hashCode(), targetIndexNameSupplier, settingsKeys);
}
}

0 comments on commit 55c0d1a

Please sign in to comment.