Skip to content

Commit

Permalink
Disallow new rollup jobs in clusters with no rollup usage. (#108624)
Browse files Browse the repository at this point in the history
This change will add logic to the put rollup api that fails if no rollup job is active and no rollup index exists in the cluster.

The logic first check whether there is an active rollup persistent task if there are no active rollup persistent tasks, then it checks whether any rollup index exists. The latter check is an expensive check, but assuming that it only runs as part of the put rollup job api and only when there are no rollup jobs, this should be ok.

All tests that invoke the put rollup job api will need to be adjusted to create a dummy index that has rollup mapping metadata. Otherwise, tests can't create a rollup job.

Closes #108381
  • Loading branch information
martijnvg committed May 21, 2024
1 parent deb3ef9 commit 9585504
Show file tree
Hide file tree
Showing 20 changed files with 470 additions and 10 deletions.
46 changes: 46 additions & 0 deletions docs/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,18 @@ buildRestTests.setups['library'] = '''
'''
buildRestTests.setups['sensor_rollup_job'] = '''
- do:
indices.create:
index: dummy-rollup-index
body:
settings:
number_of_shards: 1
number_of_replicas: 0
mappings:
_meta:
_rollup:
my-id: {}
- do:
indices.create:
index: sensor-1
Expand Down Expand Up @@ -893,6 +905,18 @@ buildRestTests.setups['sensor_rollup_job'] = '''
}
'''
buildRestTests.setups['sensor_started_rollup_job'] = '''
- do:
indices.create:
index: dummy-rollup-index
body:
settings:
number_of_shards: 1
number_of_replicas: 0
mappings:
_meta:
_rollup:
my-id: {}
- do:
indices.create:
index: sensor-1
Expand Down Expand Up @@ -967,6 +991,28 @@ buildRestTests.setups['sensor_started_rollup_job'] = '''
'''

buildRestTests.setups['sensor_index'] = '''
- do:
indices.create:
index: dummy-rollup-index
body:
settings:
number_of_shards: 1
number_of_replicas: 0
mappings:
_meta:
_rollup:
my-id:
id: my-id
index_pattern: "dummy-index-*"
rollup_index: "dummy-rollup-index"
cron: "*/30 * * * * ?"
page_size: 1000
groups:
date_histogram:
field: timestamp
fixed_interval: 1h
delay: 7d
- do:
indices.create:
index: sensor-1
Expand Down
12 changes: 12 additions & 0 deletions docs/changelog/108624.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
pr: 108624
summary: Disallow new rollup jobs in clusters with no rollup usage
area: Rollup
type: breaking
issues:
- 108381
breaking:
title: Disallow new rollup jobs in clusters with no rollup usage
area: Rollup
details: The put rollup API will fail with an error when a rollup job is created in a cluster with no rollup usage
impact: Clusters with no rollup usage (either no rollup job or index) can not create new rollup jobs
notable: true
4 changes: 4 additions & 0 deletions docs/reference/rollup/apis/put-job.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

deprecated::[8.11.0,"Rollups will be removed in a future version. Use <<downsampling,downsampling>> instead."]

WARNING: From 8.15.0 invoking this API in a cluster with no rollup usage will fail with a message about Rollup's
deprecation and planned removal. A cluster either needs to contain a rollup job or a rollup index in order for this API
to be allowed to execute.

Creates a {rollup-job}.

[[rollup-put-job-api-request]]
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/rollup/rollup-getting-started.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@

deprecated::[8.11.0,"Rollups will be removed in a future version. Please <<rollup-migrating-to-downsampling,migrate>> to <<downsampling,downsampling>> instead."]

WARNING: From 8.15.0 invoking the put job API in a cluster with no rollup usage will fail with a message about Rollup's
deprecation and planned removal. A cluster either needs to contain a rollup job or a rollup index in order for the
put job API to be allowed to execute.

To use the Rollup feature, you need to create one or more "Rollup Jobs". These jobs run continuously in the background
and rollup the index or indices that you specify, placing the rolled documents in a secondary index (also of your choosing).

Expand Down
7 changes: 1 addition & 6 deletions x-pack/plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,6 @@ tasks.named("yamlRestTestV7CompatTransform").configure { task ->
"ml/categorization_agg/Test categorization aggregation with poor settings",
"categorize_text was changed in 8.3, but experimental prior to the change"
)
task.skipTest("rollup/delete_job/Test basic delete_job", "rollup was an experimental feature, also see #41227")
task.skipTest("rollup/delete_job/Test delete job twice", "rollup was an experimental feature, also see #41227")
task.skipTest("rollup/delete_job/Test delete running job", "rollup was an experimental feature, also see #41227")
task.skipTest("rollup/get_jobs/Test basic get_jobs", "rollup was an experimental feature, also see #41227")
task.skipTest("rollup/put_job/Test basic put_job", "rollup was an experimental feature, also see #41227")
task.skipTest("rollup/start_job/Test start job twice", "rollup was an experimental feature, also see #41227")
task.skipTest("indices.freeze/30_usage/Usage stats on frozen indices", "#70192 -- the freeze index API is removed from 8.0")
task.skipTest("indices.freeze/20_stats/Translog stats on frozen indices", "#70192 -- the freeze index API is removed from 8.0")
task.skipTest("indices.freeze/10_basic/Basic", "#70192 -- the freeze index API is removed from 8.0")
Expand All @@ -152,6 +146,7 @@ tasks.named("yamlRestTestV7CompatTransform").configure { task ->
task.skipTest("spatial/70_script_doc_values/diagonal length", "precision changed in 8.4.0")
task.skipTest("spatial/70_script_doc_values/geoshape value", "error message changed in 8.9.0")
task.skipTest("security/authz/14_cat_indices/Test empty request while single authorized index", "not supported for compatibility")
task.skipTestsByFilePattern("**/rollup/**", "The rollup yaml tests in the 7.x branch don't know how to fake a cluster with rollup usage")

task.replaceValueInMatch("_type", "_doc")
task.addAllowedWarningRegex("\\[types removal\\].*")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ public void testInsufficientSearchPrivilegesOnPutWithJob() {
}

public void testCreationOnPutWithRollup() throws Exception {
createDummyRollupIndex();
setupDataAccessRole("airline-data-aggs-rollup");
String jobId = "privs-put-job-rollup";
String datafeedId = "datafeed-" + jobId;
Expand Down Expand Up @@ -1248,6 +1249,7 @@ public void testLookbackWithPipelineBucketAgg() throws Exception {
}

public void testLookbackOnlyGivenAggregationsWithHistogramAndRollupIndex() throws Exception {
createDummyRollupIndex();
String jobId = "aggs-histogram-rollup-job";
Request createJobRequest = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId);
createJobRequest.setJsonEntity("""
Expand Down Expand Up @@ -1351,6 +1353,7 @@ public void testLookbackOnlyGivenAggregationsWithHistogramAndRollupIndex() throw
}

public void testLookbackWithoutPermissionsAndRollup() throws Exception {
createDummyRollupIndex();
setupFullAccessRole("airline-data-aggs-rollup");
String jobId = "rollup-permission-test-network-job";
String datafeedId = "datafeed-" + jobId;
Expand Down Expand Up @@ -1878,4 +1881,21 @@ private Response createJobAndDataFeed(String jobId, String datafeedId) throws IO
.setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS)
.build();
}

private static void createDummyRollupIndex() throws IOException {
// create dummy rollup index to circumvent the check that prohibits rollup usage in empty clusters:
Request req = new Request("PUT", "dummy-rollup-index");
req.setJsonEntity("""
{
"mappings":{
"_meta": {
"_rollup":{
"my-id": {}
}
}
}
}
""");
client().performRequest(req);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,17 @@ protected void masterOperation(
ClusterState state,
ActionListener<XPackUsageFeatureResponse> listener
) {
int numberOfRollupJobs = findNumberOfRollupJobs(state);
RollupFeatureSetUsage usage = new RollupFeatureSetUsage(numberOfRollupJobs);
listener.onResponse(new XPackUsageFeatureResponse(usage));
}

static int findNumberOfRollupJobs(ClusterState state) {
int numberOfRollupJobs = 0;
PersistentTasksCustomMetadata persistentTasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE);
if (persistentTasks != null) {
numberOfRollupJobs = persistentTasks.findTasks(RollupJob.NAME, Predicates.always()).size();
}
RollupFeatureSetUsage usage = new RollupFeatureSetUsage(numberOfRollupJobs);
listener.onResponse(new XPackUsageFeatureResponse(usage));
return numberOfRollupJobs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.time.DateUtils;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
Expand All @@ -45,6 +47,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.XPackPlugin;
Expand All @@ -56,12 +60,18 @@
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

import static org.elasticsearch.xpack.core.ClientHelper.assertNoAuthorizationHeader;

public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNodeAction<PutRollupJobAction.Request> {

private static final Logger LOGGER = LogManager.getLogger(TransportPutRollupJobAction.class);
private static final XContentParserConfiguration PARSER_CONFIGURATION = XContentParserConfiguration.EMPTY.withFiltering(
Set.of("_doc._meta._rollup"),
null,
false
);

private final PersistentTasksService persistentTasksService;
private final Client client;
Expand Down Expand Up @@ -102,6 +112,24 @@ protected void masterOperation(
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
checkForDeprecatedTZ(request);

int numberOfCurrentRollupJobs = RollupUsageTransportAction.findNumberOfRollupJobs(clusterState);
if (numberOfCurrentRollupJobs == 0) {
try {
boolean hasRollupIndices = hasRollupIndices(clusterState.getMetadata());
if (hasRollupIndices == false) {
listener.onFailure(
new IllegalArgumentException(
"new rollup jobs are not allowed in clusters that don't have any rollup usage, since rollup has been deprecated"
)
);
return;
}
} catch (IOException e) {
listener.onFailure(e);
return;
}
}

FieldCapabilitiesRequest fieldCapsRequest = new FieldCapabilitiesRequest().indices(request.indices())
.fields(request.getConfig().getAllFields().toArray(new String[0]));
fieldCapsRequest.setParentTask(clusterService.localNode().getId(), task.getId());
Expand Down Expand Up @@ -180,7 +208,7 @@ static void createIndex(
);
}

private static XContentBuilder createMappings(RollupJobConfig config) throws IOException {
static XContentBuilder createMappings(RollupJobConfig config) throws IOException {
return XContentBuilder.builder(XContentType.JSON.xContent())
.startObject()
.startObject("mappings")
Expand Down Expand Up @@ -339,6 +367,32 @@ public void onTimeout(TimeValue timeout) {
);
}

static boolean hasRollupIndices(Metadata metadata) throws IOException {
// Sniffing logic instead of invoking sourceAsMap(), which would materialize the entire mapping as map of maps.
for (var imd : metadata) {
if (imd.mapping() == null) {
continue;
}

try (var parser = XContentHelper.createParser(PARSER_CONFIGURATION, imd.mapping().source().compressedReference())) {
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
if ("_doc".equals(parser.nextFieldName())) {
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
if ("_meta".equals(parser.nextFieldName())) {
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
if ("_rollup".equals(parser.nextFieldName())) {
return true;
}
}
}
}
}
}
}
}
return false;
}

@Override
protected ClusterBlockException checkBlock(PutRollupJobAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
Expand Down

0 comments on commit 9585504

Please sign in to comment.