From 26b04723fe9664e78c9a87e8f553686cecbbfc41 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 1 Jun 2018 16:09:40 +0100 Subject: [PATCH 1/9] Check ML is licensed on remote clusters --- .../core/ml/action/CheckLicenseCssAction.java | 57 ++++++ .../core/ml/datafeed/DatafeedConfig.java | 10 + .../core/ml/datafeed/DatafeedConfigTests.java | 38 ++-- .../xpack/ml/MachineLearning.java | 5 +- .../xpack/ml/MlDailyMaintenanceService.java | 19 +- .../ml/action/TransportCheckLicenseCcs.java | 122 ++++++++++++ .../action/TransportStartDatafeedAction.java | 47 ++++- .../xpack/ml/datafeed/CcsLicenseChecker.java | 182 +++++++++++++++++ .../ml/MlDailyManagementServiceTests.java | 13 ++ .../ml/datafeed/CcsLicenseCheckerTests.java | 188 ++++++++++++++++++ 10 files changed, 660 insertions(+), 21 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckLicenseCssAction.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckLicenseCcs.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseChecker.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseCheckerTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckLicenseCssAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckLicenseCssAction.java new file mode 100644 index 0000000000000..225e39cc85749 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckLicenseCssAction.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; +import org.elasticsearch.client.ElasticsearchClient; + +public class CheckLicenseCssAction extends Action { + + public static final CheckLicenseCssAction INSTANCE = new CheckLicenseCssAction(); + public static final String NAME = "cluster:admin/xpack/ml/datafeed/checklic"; + + private CheckLicenseCssAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends AcknowledgedRequest { + + @Override + public ActionRequestValidationException validate() { + return null; + } + } + + public static class Response extends AcknowledgedResponse { + + public Response() { + super(); + } + } + + public static class RequestBuilder extends MasterNodeOperationRequestBuilder { + + public RequestBuilder(ElasticsearchClient client, CheckLicenseCssAction action) { + super(client, action, new Request()); + } + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index d06c911e13cfe..737069e1b20e1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -44,6 +44,7 @@ import java.util.Objects; import java.util.Random; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Datafeed configuration options. Describes where to proactively pull input @@ -221,6 +222,15 @@ public List getIndices() { return indices; } + /** + * Get any indices used for cross cluster search + * that are on a remote cluster. + * @return List of remote cluster indices + */ + public List getRemoteIndices() { + return indices.stream().filter(index -> index.indexOf(':') != -1).collect(Collectors.toList()); + } + public List getTypes() { return types; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index 6aa987fc0e932..ee35901124306 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -45,7 +45,9 @@ import java.util.List; import java.util.TimeZone; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; @@ -193,11 +195,11 @@ public void testDefaults() { public void testDefaultQueryDelay() { DatafeedConfig.Builder feedBuilder1 = new DatafeedConfig.Builder("datafeed1", "job1"); - feedBuilder1.setIndices(Arrays.asList("foo")); + feedBuilder1.setIndices(Collections.singletonList("foo")); DatafeedConfig.Builder feedBuilder2 = new DatafeedConfig.Builder("datafeed2", "job1"); - feedBuilder2.setIndices(Arrays.asList("foo")); + feedBuilder2.setIndices(Collections.singletonList("foo")); DatafeedConfig.Builder feedBuilder3 = new DatafeedConfig.Builder("datafeed3", "job2"); - feedBuilder3.setIndices(Arrays.asList("foo")); + feedBuilder3.setIndices(Collections.singletonList("foo")); DatafeedConfig feed1 = feedBuilder1.build(); DatafeedConfig feed2 = feedBuilder2.build(); DatafeedConfig feed3 = feedBuilder3.build(); @@ -208,19 +210,19 @@ public void testDefaultQueryDelay() { assertThat(feed1.getQueryDelay(), not(equalTo(feed3.getQueryDelay()))); } - public void testCheckValid_GivenNullIndices() throws IOException { + public void testCheckValid_GivenNullIndices() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); expectThrows(IllegalArgumentException.class, () -> conf.setIndices(null)); } - public void testCheckValid_GivenEmptyIndices() throws IOException { + public void testCheckValid_GivenEmptyIndices() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); conf.setIndices(Collections.emptyList()); ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, conf::build); assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[]"), e.getMessage()); } - public void testCheckValid_GivenIndicesContainsOnlyNulls() throws IOException { + public void testCheckValid_GivenIndicesContainsOnlyNulls() { List indices = new ArrayList<>(); indices.add(null); indices.add(null); @@ -230,7 +232,7 @@ public void testCheckValid_GivenIndicesContainsOnlyNulls() throws IOException { assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[null, null]"), e.getMessage()); } - public void testCheckValid_GivenIndicesContainsOnlyEmptyStrings() throws IOException { + public void testCheckValid_GivenIndicesContainsOnlyEmptyStrings() { List indices = new ArrayList<>(); indices.add(""); indices.add(""); @@ -240,27 +242,27 @@ public void testCheckValid_GivenIndicesContainsOnlyEmptyStrings() throws IOExcep assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[, ]"), e.getMessage()); } - public void testCheckValid_GivenNegativeQueryDelay() throws IOException { + public void testCheckValid_GivenNegativeQueryDelay() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setQueryDelay(TimeValue.timeValueMillis(-10))); assertEquals("query_delay cannot be less than 0. Value = -10", e.getMessage()); } - public void testCheckValid_GivenZeroFrequency() throws IOException { + public void testCheckValid_GivenZeroFrequency() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(TimeValue.ZERO)); assertEquals("frequency cannot be less or equal than 0. Value = 0s", e.getMessage()); } - public void testCheckValid_GivenNegativeFrequency() throws IOException { + public void testCheckValid_GivenNegativeFrequency() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(TimeValue.timeValueMinutes(-1))); assertEquals("frequency cannot be less or equal than 0. Value = -1", e.getMessage()); } - public void testCheckValid_GivenNegativeScrollSize() throws IOException { + public void testCheckValid_GivenNegativeScrollSize() { DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1"); ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> conf.setScrollSize(-1000)); assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "scroll_size", -1000L), e.getMessage()); @@ -414,7 +416,7 @@ public void testDefaultFrequency_GivenNegative() { public void testDefaultFrequency_GivenNoAggregations() { DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("feed", "job"); - datafeedBuilder.setIndices(Arrays.asList("my_index")); + datafeedBuilder.setIndices(Collections.singletonList("my_index")); DatafeedConfig datafeed = datafeedBuilder.build(); assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(1))); @@ -491,6 +493,18 @@ public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Hour() assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(12))); } + public void testGetRemoteIndices() { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1"); + builder.setIndices(Collections.singletonList("local-index")); + + DatafeedConfig config = builder.build(); + assertThat(config.getRemoteIndices(), is(empty())); + + builder.setIndices(Arrays.asList("local-index", "remote-cluster:index1", "local-index2", "remote-cluster2:index1")); + config = builder.build(); + assertThat(config.getRemoteIndices(), containsInAnyOrder("remote-cluster:index1", "remote-cluster2:index1")); + } + public static String randomValidDatafeedId() { CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray()); return generator.ofCodePointsLength(random(), 10, 10); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index a1714a8e3f5db..0811d9ec0b742 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -56,6 +56,7 @@ import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.MlMetaIndex; +import org.elasticsearch.xpack.core.ml.action.CheckLicenseCssAction; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction; import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction; @@ -107,6 +108,7 @@ import org.elasticsearch.xpack.core.ml.notifications.AuditMessage; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.core.template.TemplateUtils; +import org.elasticsearch.xpack.ml.action.TransportCheckLicenseCcs; import org.elasticsearch.xpack.ml.action.TransportCloseJobAction; import org.elasticsearch.xpack.ml.action.TransportDeleteCalendarAction; import org.elasticsearch.xpack.ml.action.TransportDeleteCalendarEventAction; @@ -547,7 +549,8 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(UpdateCalendarJobAction.INSTANCE, TransportUpdateCalendarJobAction.class), new ActionHandler<>(GetCalendarEventsAction.INSTANCE, TransportGetCalendarEventsAction.class), new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class), - new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class) + new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class), + new ActionHandler<>(CheckLicenseCssAction.INSTANCE, TransportCheckLicenseCcs.class) ); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 4d0911f6691e1..2e5ccadc1cadb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.ml.action.CheckLicenseCssAction; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; @@ -110,10 +111,24 @@ private void scheduleNext() { private void triggerTasks() { LOGGER.info("triggering scheduled [ML] maintenance tasks"); + + ActionListener checkLicenseListener = ActionListener.wrap( + response -> LOGGER.info("Successfully completed [ML] maintenance tasks"), + e -> LOGGER.error("An error occurred during maintenance license checks", e) + ); + executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(), ActionListener.wrap( - response -> LOGGER.info("Successfully completed [ML] maintenance tasks"), - e -> LOGGER.error("An error occurred during maintenance tasks execution", e))); + response -> checkLicensesForCrossClusterSearch(checkLicenseListener), + e -> { + LOGGER.error("An error occurred during maintenance tasks execution deleting expired data", e); + checkLicensesForCrossClusterSearch(checkLicenseListener); + })); + scheduleNext(); } + + private void checkLicensesForCrossClusterSearch(ActionListener listener) { + executeAsyncWithOrigin(client, ML_ORIGIN, CheckLicenseCssAction.INSTANCE, new CheckLicenseCssAction.Request(), listener); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckLicenseCcs.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckLicenseCcs.java new file mode 100644 index 0000000000000..b3f0a0b6dc5b3 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckLicenseCcs.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.action; + +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.MlMetadata; +import org.elasticsearch.xpack.core.ml.action.CheckLicenseCssAction; +import org.elasticsearch.xpack.core.ml.action.CloseJobAction; +import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.ml.datafeed.CcsLicenseChecker; +import org.elasticsearch.xpack.ml.notifications.Auditor; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; + +public class TransportCheckLicenseCcs extends TransportMasterNodeAction { + + private final Auditor auditor; + private final Client client; + + @Inject + public TransportCheckLicenseCcs(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, Client client, + IndexNameExpressionResolver indexNameExpressionResolver, Auditor auditor) { + super(settings, CheckLicenseCssAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, CheckLicenseCssAction.Request::new); + this.auditor = auditor; + this.client = client; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected CheckLicenseCssAction.Response newResponse() { + return new CheckLicenseCssAction.Response(); + } + + @Override + protected void masterOperation(CheckLicenseCssAction.Request request, ClusterState state, ActionListener listener) throws Exception { + checkoutCssLic(activeDatafeeds(state)); + } + + @Override + protected ClusterBlockException checkBlock(CheckLicenseCssAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + private List activeDatafeeds(ClusterState state) { + MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); + PersistentTasksCustomMetaData persistentTasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); + + return mlMetadata.getDatafeeds().values().stream() + .filter(config -> MlMetadata.getDatafeedTask(config.getId(), persistentTasks) != null) + .collect(Collectors.toList()); + } + + private void checkoutCssLic(List activeDatafeeds) { + for (DatafeedConfig datafeed: activeDatafeeds) { + List remoteClusterNames = CcsLicenseChecker.remoteClusterNames(datafeed.getIndices()); + if (remoteClusterNames.isEmpty() == false) { + CcsLicenseChecker remoteLicenseChecker = new CcsLicenseChecker(client); + remoteLicenseChecker.checkRemoteClusterLicenses(remoteClusterNames, + ActionListener.wrap(response -> checkLicense(response, datafeed.getJobId(), datafeed.getId()), + e -> logger.error("Error checking remote datafeed cluster licenses", e))); + } + } + } + + private void checkLicense(CcsLicenseChecker.LicenseViolation licenseCheck, String jobId, String datafeedId) { + if (licenseCheck.isViolated()) { + String message = "[" + jobId + "] Stoppping datafeed and closing job because Machine Learning is not licenced " + + "on the remote cluster [" + licenseCheck.get().getClusterName() + "] used in cross cluster search. " + + CcsLicenseChecker.buildErrorMessage(licenseCheck.get()); + logger.info(message); + auditor.warning(jobId, message); + + closeJob(jobId, datafeedId); + } + } + + private void closeJob(String jobId, String datafeedId) { + executeAsyncWithOrigin(client, ML_ORIGIN, StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId), + ActionListener.wrap( + response -> { + executeAsyncWithOrigin(client, ML_ORIGIN, CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId), + ActionListener.wrap( + closeJobResponse -> {}, + e -> logger.error( + new ParameterizedMessage("[{}] An error occurred closing job", jobId), e))); + }, + e -> logger.error( + new ParameterizedMessage("[{}] An error occurred stopping datafeed [{}]", jobId, datafeedId), e))); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index bed83ed82c1c9..3e7bd61a16792 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -43,10 +43,12 @@ import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.datafeed.CcsLicenseChecker; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import java.util.ArrayList; import java.util.Map; import java.util.function.Predicate; @@ -111,7 +113,8 @@ protected void masterOperation(StartDatafeedAction.Request request, ClusterState ActionListener listener) { StartDatafeedAction.DatafeedParams params = request.getParams(); if (licenseState.isMachineLearningAllowed()) { - ActionListener> finalListener = + + ActionListener> waitForTaskListener = new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { @@ -135,16 +138,48 @@ public void onFailure(Exception e) { validate(params.getDatafeedId(), mlMetadata, tasks); DatafeedConfig datafeed = mlMetadata.getDatafeed(params.getDatafeedId()); Job job = mlMetadata.getJobs().get(datafeed.getJobId()); - DataExtractorFactory.create(client, datafeed, job, ActionListener.wrap( - dataExtractorFactory -> - persistentTasksService.sendStartRequest(MLMetadataField.datafeedTaskId(params.getDatafeedId()), - StartDatafeedAction.TASK_NAME, params, finalListener) - , listener::onFailure)); + + if (CcsLicenseChecker.containsRemoteIndex(datafeed.getIndices())) { + CcsLicenseChecker remoteLicenseChecker = new CcsLicenseChecker(client); + remoteLicenseChecker.checkRemoteClusterLicenses(CcsLicenseChecker.remoteClusterNames(datafeed.getIndices()), + ActionListener.wrap( + response -> { + if (response.isViolated()) { + String message = "Cannot start datafeed [" + datafeed.getId() + "] as it is configured to use " + + "indices on a remote cluster [" + response.get().getClusterName() + + "] that is not licensed for Machine Learning. " + + CcsLicenseChecker.buildErrorMessage(response.get()); + + listener.onFailure(new ElasticsearchException(message)); + } else { + createDataExtractor(job, datafeed, params, waitForTaskListener); + } + }, + e -> { + String message = "Cannot start datafeed [" + datafeed.getId() + "] as it is configured to use " + + "indices on a remote cluster" + datafeed.getRemoteIndices() + + " and the license type could not be verified"; + ElasticsearchException wrappedException = new ElasticsearchException(message, e); + listener.onFailure(wrappedException); + })); + } else { + createDataExtractor(job, datafeed, params, waitForTaskListener); + } } else { listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING)); } } + private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeedAction.DatafeedParams params, + ActionListener> + listener) { + DataExtractorFactory.create(client, datafeed, job, ActionListener.wrap( + dataExtractorFactory -> + persistentTasksService.sendStartRequest(MLMetadataField.datafeedTaskId(params.getDatafeedId()), + StartDatafeedAction.TASK_NAME, params, listener) + , listener::onFailure)); + } + @Override protected ClusterBlockException checkBlock(StartDatafeedAction.Request request, ClusterState state) { // We only delegate here to PersistentTasksService, but if there is a metadata writeblock, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseChecker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseChecker.java new file mode 100644 index 0000000000000..6d61d3501f52d --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseChecker.java @@ -0,0 +1,182 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.indices.InvalidIndexNameException; +import org.elasticsearch.license.License; +import org.elasticsearch.license.XPackInfoResponse; +import org.elasticsearch.xpack.core.action.XPackInfoAction; +import org.elasticsearch.xpack.core.action.XPackInfoRequest; + +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * ML datafeeds can use cross cluster search to access data in a remote cluster. + * The remote cluster should be licenced for ML this class performs that check + * using the _xpack endpoint. + */ +public class CcsLicenseChecker { + + private final Client client; + + public static class RemoteClusterLicenseInfo { + private String clusterName; + private XPackInfoResponse.LicenseInfo licenseInfo; + + RemoteClusterLicenseInfo(String clusterName, XPackInfoResponse.LicenseInfo licenseInfo) { + this.clusterName = clusterName; + this.licenseInfo = licenseInfo; + } + + public String getClusterName() { + return clusterName; + } + + public XPackInfoResponse.LicenseInfo getLicenseInfo() { + return licenseInfo; + } + } + + public class LicenseViolation { + private final RemoteClusterLicenseInfo licenseInfo; + + private LicenseViolation(@Nullable RemoteClusterLicenseInfo licenseInfo) { + this.licenseInfo = licenseInfo; + } + + public boolean isViolated() { + return licenseInfo != null; + } + + public RemoteClusterLicenseInfo get() { + return licenseInfo; + } + } + + public CcsLicenseChecker(Client client) { + this.client = client; + } + + /** + * Check each cluster is licensed for ML. + * This function terminates early when the first cluster that is not licensed + * is found or an error occurs. + * + * @param clusterNames List of remote cluster names + * @param listener Response listener + */ + public void checkRemoteClusterLicenses(List clusterNames, ActionListener listener) { + final Iterator itr = clusterNames.iterator(); + if (itr.hasNext() == false) { + listener.onResponse(new LicenseViolation(null)); + return; + } + + final AtomicReference clusterName = new AtomicReference<>(itr.next()); + + ActionListener infoListener = new ActionListener() { + @Override + public void onResponse(XPackInfoResponse xPackInfoResponse) { + if (licenseSupportsML(xPackInfoResponse.getLicenseInfo()) == false) { + listener.onResponse(new LicenseViolation( + new RemoteClusterLicenseInfo(clusterName.get(), xPackInfoResponse.getLicenseInfo()))); + return; + } + + if (itr.hasNext()) { + clusterName.set(itr.next()); + remoteClusterLicense(clusterName.get(), this); + } else { + listener.onResponse(new LicenseViolation(null)); + } + } + + @Override + public void onFailure(Exception e) { + String message = "Could not determine the X-Pack licence type for cluster [" + clusterName.get() + "]"; + if (e instanceof InvalidIndexNameException) { + // This is likely to be because x-pack is not installed in the target cluster + message += ". Is X-Pack installed on the target cluster?"; + } + listener.onFailure(new ElasticsearchException(message, e)); + } + }; + + remoteClusterLicense(clusterName.get(), infoListener); + } + + private void remoteClusterLicense(String clusterName, ActionListener listener) { + Client remoteClusterClient = client.getRemoteClusterClient(clusterName); + ThreadContext threadContext = remoteClusterClient.threadPool().getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we stash any context here since this is an internal execution and should not leak any + // existing context information. + threadContext.markAsSystemContext(); + + XPackInfoRequest request = new XPackInfoRequest(); + request.setCategories(EnumSet.of(XPackInfoRequest.Category.LICENSE)); + remoteClusterClient.execute(XPackInfoAction.INSTANCE, request, listener); + } + } + + static boolean licenseSupportsML(XPackInfoResponse.LicenseInfo licenseInfo) { + License.OperationMode mode = License.OperationMode.resolve(licenseInfo.getMode()); + return licenseInfo.getStatus() == License.Status.ACTIVE && + (mode == License.OperationMode.PLATINUM || mode == License.OperationMode.TRIAL); + } + + private static boolean isRemoteIndex(String index) { + return index.indexOf(':') != -1; + } + + public static boolean containsRemoteIndex(List indices) { + return indices.stream().anyMatch(CcsLicenseChecker::isRemoteIndex); + } + + /** + * Extract the list of remote cluster names from the list of indices. + * @param indices List of indices. Remote cluster indices are prefixed + * with {@code cluster-name:} + * @return Every cluster name found in {@code indices} + */ + public static List remoteClusterNames(List indices) { + return indices.stream() + .filter(CcsLicenseChecker::isRemoteIndex) + .map(index -> index.substring(0, index.indexOf(':'))) + .distinct() + .collect(Collectors.toList()); + } + + public static String buildErrorMessage(RemoteClusterLicenseInfo clusterLicenseInfo) { + StringBuilder error = new StringBuilder(); + if (clusterLicenseInfo.licenseInfo.getStatus() != License.Status.ACTIVE) { + error.append("The license on cluster [").append(clusterLicenseInfo.clusterName) + .append("] is not active. "); + } else { + License.OperationMode mode = License.OperationMode.resolve(clusterLicenseInfo.licenseInfo.getMode()); + if (mode != License.OperationMode.PLATINUM && mode != License.OperationMode.TRIAL) { + error.append("The license mode on cluster [").append(clusterLicenseInfo.clusterName) + .append("] with mode [").append(mode). + append("] does not enable Machine Learning. "); + } + } + + error.append(Strings.toString(clusterLicenseInfo.licenseInfo)); + return error.toString(); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java index 4d2b77561296e..2b80f07a25e72 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java @@ -5,11 +5,14 @@ */ package org.elasticsearch.xpack.ml; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.action.XPackInfoAction; +import org.elasticsearch.xpack.core.ml.action.CheckLicenseCssAction; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.junit.After; import org.junit.Before; @@ -21,6 +24,7 @@ import static org.elasticsearch.mock.orig.Mockito.verify; import static org.mockito.Matchers.any; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -42,6 +46,14 @@ public void stop() throws InterruptedException { } public void testScheduledTriggering() throws InterruptedException { + doAnswer(invocationMock -> { + @SuppressWarnings("raw_types") + ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; + listener.onResponse(null); + return null; + }).when(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); + + int triggerCount = randomIntBetween(2, 4); CountDownLatch latch = new CountDownLatch(triggerCount); try (MlDailyMaintenanceService service = createService(latch, client)) { @@ -50,6 +62,7 @@ public void testScheduledTriggering() throws InterruptedException { } verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); + verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(CheckLicenseCssAction.INSTANCE), any(), any()); } private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseCheckerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseCheckerTests.java new file mode 100644 index 0000000000000..397d37751836e --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseCheckerTests.java @@ -0,0 +1,188 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.datafeed; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.license.License; +import org.elasticsearch.license.XPackInfoResponse; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.action.XPackInfoAction; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CcsLicenseCheckerTests extends ESTestCase { + + public void testIsRemoteIndex() { + List indices = Arrays.asList("local-index1", "local-index2"); + assertFalse(CcsLicenseChecker.containsRemoteIndex(indices)); + indices = Arrays.asList("local-index1", "remote-cluster:remote-index2"); + assertTrue(CcsLicenseChecker.containsRemoteIndex(indices)); + } + + public void testRemoteClusterNames() { + List indices = Arrays.asList("local-index1", "local-index2"); + assertThat(CcsLicenseChecker.remoteClusterNames(indices), empty()); + indices = Arrays.asList("local-index1", "remote-cluster1:remote-index2"); + assertThat(CcsLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1")); + indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1"); + assertThat(CcsLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); + indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1", "remote-cluster2:index2"); + assertThat(CcsLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); + } + + public void testLicenseSupportsML() { + XPackInfoResponse.LicenseInfo licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", + License.Status.ACTIVE, randomNonNegativeLong()); + assertTrue(CcsLicenseChecker.licenseSupportsML(licenseInfo)); + + licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", License.Status.EXPIRED, randomNonNegativeLong()); + assertFalse(CcsLicenseChecker.licenseSupportsML(licenseInfo)); + + licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "GOLD", "GOLD", License.Status.ACTIVE, randomNonNegativeLong()); + assertFalse(CcsLicenseChecker.licenseSupportsML(licenseInfo)); + + licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", License.Status.ACTIVE, randomNonNegativeLong()); + assertTrue(CcsLicenseChecker.licenseSupportsML(licenseInfo)); + } + + public void testCheckRemoteClusterLicenses_givenValidLicenses() { + final AtomicInteger index = new AtomicInteger(0); + final List responses = new ArrayList<>(); + + Client client = createMockClient(); + doAnswer(invocationMock -> { + @SuppressWarnings("raw_types") + ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; + listener.onResponse(responses.get(index.getAndIncrement())); + return null; + }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); + + + List remoteClusterNames = Arrays.asList("valid1", "valid2", "valid3"); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + + CcsLicenseChecker licenseChecker = new CcsLicenseChecker(client); + AtomicReference licCheckResponse = new AtomicReference<>(); + + licenseChecker.checkRemoteClusterLicenses(remoteClusterNames, + new ActionListener() { + @Override + public void onResponse(CcsLicenseChecker.LicenseViolation response) { + licCheckResponse.set(response); + } + + @Override + public void onFailure(Exception e) { + fail(e.getMessage()); + } + }); + + verify(client, times(3)).execute(same(XPackInfoAction.INSTANCE), any(), any()); + assertNotNull(licCheckResponse.get()); + assertFalse(licCheckResponse.get().isViolated()); + assertNull(licCheckResponse.get().get()); + } + + public void testCheckRemoteClusterLicenses_givenInvalidLicense() { + final AtomicInteger index = new AtomicInteger(0); + List remoteClusterNames = Arrays.asList("good", "cluster-with-basic-license", "good2"); + final List responses = new ArrayList<>(); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createBasicLicenseResponse(), null)); + responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); + + Client client = createMockClient(); + doAnswer(invocationMock -> { + @SuppressWarnings("raw_types") + ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; + listener.onResponse(responses.get(index.getAndIncrement())); + return null; + }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); + + CcsLicenseChecker licenseChecker = new CcsLicenseChecker(client); + AtomicReference licCheckResponse = new AtomicReference<>(); + + licenseChecker.checkRemoteClusterLicenses(remoteClusterNames, + new ActionListener() { + @Override + public void onResponse(CcsLicenseChecker.LicenseViolation response) { + licCheckResponse.set(response); + } + + @Override + public void onFailure(Exception e) { + fail(e.getMessage()); + } + }); + + verify(client, times(2)).execute(same(XPackInfoAction.INSTANCE), any(), any()); + assertNotNull(licCheckResponse.get()); + assertTrue(licCheckResponse.get().isViolated()); + assertEquals("cluster-with-basic-license", licCheckResponse.get().get().getClusterName()); + assertEquals("BASIC", licCheckResponse.get().get().getLicenseInfo().getType()); + } + + public void testBuildErrorMessage() { + XPackInfoResponse.LicenseInfo platinumLicence = createPlatinumLicenseResponse(); + CcsLicenseChecker.RemoteClusterLicenseInfo info = new CcsLicenseChecker.RemoteClusterLicenseInfo("platinum-cluster", platinumLicence); + assertEquals(Strings.toString(platinumLicence), CcsLicenseChecker.buildErrorMessage(info)); + + XPackInfoResponse.LicenseInfo basicLicense = createBasicLicenseResponse(); + info = new CcsLicenseChecker.RemoteClusterLicenseInfo("basic-cluster", basicLicense); + String expected = "The license mode on cluster [basic-cluster] with mode [BASIC] does not enable Machine Learning. " + + Strings.toString(basicLicense); + assertEquals(expected, CcsLicenseChecker.buildErrorMessage(info)); + + XPackInfoResponse.LicenseInfo expiredLicense = createExpiredLicenseResponse(); + info = new CcsLicenseChecker.RemoteClusterLicenseInfo("expired-cluster", expiredLicense); + expected = "The license on cluster [expired-cluster] is not active. " + Strings.toString(expiredLicense); + assertEquals(expected, CcsLicenseChecker.buildErrorMessage(info)); + } + + private Client createMockClient() { + Client client = mock(Client.class); + ThreadPool threadPool = mock(ThreadPool.class); + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + return client; + } + + private XPackInfoResponse.LicenseInfo createPlatinumLicenseResponse() { + return new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", License.Status.ACTIVE, randomNonNegativeLong()); + } + + private XPackInfoResponse.LicenseInfo createBasicLicenseResponse() { + return new XPackInfoResponse.LicenseInfo("uid", "BASIC", "BASIC", License.Status.ACTIVE, randomNonNegativeLong()); + } + + private XPackInfoResponse.LicenseInfo createExpiredLicenseResponse() { + return new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", License.Status.EXPIRED, randomNonNegativeLong()); + } +} From c7b54f66348f4d33da42b9e65d6bde8a862e9711 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 1 Jun 2018 17:32:31 +0100 Subject: [PATCH 2/9] Rename classes --- ...ion.java => CheckRemoteLicenseAction.java} | 8 +-- .../xpack/ml/MachineLearning.java | 6 +-- .../xpack/ml/MlDailyMaintenanceService.java | 8 +-- ...=> TransportCheckRemoteLicenseAction.java} | 35 ++++++------- .../action/TransportStartDatafeedAction.java | 11 ++-- ...ecker.java => MlRemoteLicenseChecker.java} | 9 ++-- .../ml/MlDailyManagementServiceTests.java | 5 +- ....java => MlRemoteLicenseCheckerTests.java} | 50 +++++++++---------- 8 files changed, 63 insertions(+), 69 deletions(-) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/{CheckLicenseCssAction.java => CheckRemoteLicenseAction.java} (79%) rename x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/{TransportCheckLicenseCcs.java => TransportCheckRemoteLicenseAction.java} (72%) rename x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/{CcsLicenseChecker.java => MlRemoteLicenseChecker.java} (96%) rename x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/{CcsLicenseCheckerTests.java => MlRemoteLicenseCheckerTests.java} (76%) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckLicenseCssAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckRemoteLicenseAction.java similarity index 79% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckLicenseCssAction.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckRemoteLicenseAction.java index 225e39cc85749..92a2ee7c5e3c4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckLicenseCssAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckRemoteLicenseAction.java @@ -13,12 +13,12 @@ import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; -public class CheckLicenseCssAction extends Action { +public class CheckRemoteLicenseAction extends Action { - public static final CheckLicenseCssAction INSTANCE = new CheckLicenseCssAction(); + public static final CheckRemoteLicenseAction INSTANCE = new CheckRemoteLicenseAction(); public static final String NAME = "cluster:admin/xpack/ml/datafeed/checklic"; - private CheckLicenseCssAction() { + private CheckRemoteLicenseAction() { super(NAME); } @@ -49,7 +49,7 @@ public Response() { public static class RequestBuilder extends MasterNodeOperationRequestBuilder { - public RequestBuilder(ElasticsearchClient client, CheckLicenseCssAction action) { + public RequestBuilder(ElasticsearchClient client, CheckRemoteLicenseAction action) { super(client, action, new Request()); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 0811d9ec0b742..a5a68bded10a4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -56,7 +56,7 @@ import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.MlMetaIndex; -import org.elasticsearch.xpack.core.ml.action.CheckLicenseCssAction; +import org.elasticsearch.xpack.core.ml.action.CheckRemoteLicenseAction; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction; import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction; @@ -108,7 +108,7 @@ import org.elasticsearch.xpack.core.ml.notifications.AuditMessage; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.core.template.TemplateUtils; -import org.elasticsearch.xpack.ml.action.TransportCheckLicenseCcs; +import org.elasticsearch.xpack.ml.action.TransportCheckRemoteLicenseAction; import org.elasticsearch.xpack.ml.action.TransportCloseJobAction; import org.elasticsearch.xpack.ml.action.TransportDeleteCalendarAction; import org.elasticsearch.xpack.ml.action.TransportDeleteCalendarEventAction; @@ -550,7 +550,7 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(GetCalendarEventsAction.INSTANCE, TransportGetCalendarEventsAction.class), new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class), new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class), - new ActionHandler<>(CheckLicenseCssAction.INSTANCE, TransportCheckLicenseCcs.class) + new ActionHandler<>(CheckRemoteLicenseAction.INSTANCE, TransportCheckRemoteLicenseAction.class) ); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 2e5ccadc1cadb..dc51bf3c67055 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -15,7 +15,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.action.CheckLicenseCssAction; +import org.elasticsearch.xpack.core.ml.action.CheckRemoteLicenseAction; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; @@ -112,7 +112,7 @@ private void scheduleNext() { private void triggerTasks() { LOGGER.info("triggering scheduled [ML] maintenance tasks"); - ActionListener checkLicenseListener = ActionListener.wrap( + ActionListener checkLicenseListener = ActionListener.wrap( response -> LOGGER.info("Successfully completed [ML] maintenance tasks"), e -> LOGGER.error("An error occurred during maintenance license checks", e) ); @@ -128,7 +128,7 @@ private void triggerTasks() { scheduleNext(); } - private void checkLicensesForCrossClusterSearch(ActionListener listener) { - executeAsyncWithOrigin(client, ML_ORIGIN, CheckLicenseCssAction.INSTANCE, new CheckLicenseCssAction.Request(), listener); + private void checkLicensesForCrossClusterSearch(ActionListener listener) { + executeAsyncWithOrigin(client, ML_ORIGIN, CheckRemoteLicenseAction.INSTANCE, new CheckRemoteLicenseAction.Request(), listener); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckLicenseCcs.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckRemoteLicenseAction.java similarity index 72% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckLicenseCcs.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckRemoteLicenseAction.java index b3f0a0b6dc5b3..e9d920325dcd8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckLicenseCcs.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckRemoteLicenseAction.java @@ -22,33 +22,30 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlMetadata; -import org.elasticsearch.xpack.core.ml.action.CheckLicenseCssAction; +import org.elasticsearch.xpack.core.ml.action.CheckRemoteLicenseAction; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; -import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.ml.datafeed.CcsLicenseChecker; +import org.elasticsearch.xpack.ml.datafeed.MlRemoteLicenseChecker; import org.elasticsearch.xpack.ml.notifications.Auditor; -import java.util.ArrayList; import java.util.List; -import java.util.Set; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; -public class TransportCheckLicenseCcs extends TransportMasterNodeAction { +public class TransportCheckRemoteLicenseAction extends TransportMasterNodeAction { private final Auditor auditor; private final Client client; @Inject - public TransportCheckLicenseCcs(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, Client client, - IndexNameExpressionResolver indexNameExpressionResolver, Auditor auditor) { - super(settings, CheckLicenseCssAction.NAME, transportService, clusterService, threadPool, actionFilters, - indexNameExpressionResolver, CheckLicenseCssAction.Request::new); + public TransportCheckRemoteLicenseAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, Client client, + IndexNameExpressionResolver indexNameExpressionResolver, Auditor auditor) { + super(settings, CheckRemoteLicenseAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, CheckRemoteLicenseAction.Request::new); this.auditor = auditor; this.client = client; } @@ -59,17 +56,17 @@ protected String executor() { } @Override - protected CheckLicenseCssAction.Response newResponse() { - return new CheckLicenseCssAction.Response(); + protected CheckRemoteLicenseAction.Response newResponse() { + return new CheckRemoteLicenseAction.Response(); } @Override - protected void masterOperation(CheckLicenseCssAction.Request request, ClusterState state, ActionListener listener) throws Exception { + protected void masterOperation(CheckRemoteLicenseAction.Request request, ClusterState state, ActionListener listener) throws Exception { checkoutCssLic(activeDatafeeds(state)); } @Override - protected ClusterBlockException checkBlock(CheckLicenseCssAction.Request request, ClusterState state) { + protected ClusterBlockException checkBlock(CheckRemoteLicenseAction.Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); } @@ -84,9 +81,9 @@ private List activeDatafeeds(ClusterState state) { private void checkoutCssLic(List activeDatafeeds) { for (DatafeedConfig datafeed: activeDatafeeds) { - List remoteClusterNames = CcsLicenseChecker.remoteClusterNames(datafeed.getIndices()); + List remoteClusterNames = MlRemoteLicenseChecker.remoteClusterNames(datafeed.getIndices()); if (remoteClusterNames.isEmpty() == false) { - CcsLicenseChecker remoteLicenseChecker = new CcsLicenseChecker(client); + MlRemoteLicenseChecker remoteLicenseChecker = new MlRemoteLicenseChecker(client); remoteLicenseChecker.checkRemoteClusterLicenses(remoteClusterNames, ActionListener.wrap(response -> checkLicense(response, datafeed.getJobId(), datafeed.getId()), e -> logger.error("Error checking remote datafeed cluster licenses", e))); @@ -94,11 +91,11 @@ private void checkoutCssLic(List activeDatafeeds) { } } - private void checkLicense(CcsLicenseChecker.LicenseViolation licenseCheck, String jobId, String datafeedId) { + private void checkLicense(MlRemoteLicenseChecker.LicenseViolation licenseCheck, String jobId, String datafeedId) { if (licenseCheck.isViolated()) { String message = "[" + jobId + "] Stoppping datafeed and closing job because Machine Learning is not licenced " + "on the remote cluster [" + licenseCheck.get().getClusterName() + "] used in cross cluster search. " - + CcsLicenseChecker.buildErrorMessage(licenseCheck.get()); + + MlRemoteLicenseChecker.buildErrorMessage(licenseCheck.get()); logger.info(message); auditor.warning(jobId, message); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 3e7bd61a16792..8b02a17895c74 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -43,12 +43,11 @@ import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.datafeed.CcsLicenseChecker; +import org.elasticsearch.xpack.ml.datafeed.MlRemoteLicenseChecker; import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; -import java.util.ArrayList; import java.util.Map; import java.util.function.Predicate; @@ -139,16 +138,16 @@ public void onFailure(Exception e) { DatafeedConfig datafeed = mlMetadata.getDatafeed(params.getDatafeedId()); Job job = mlMetadata.getJobs().get(datafeed.getJobId()); - if (CcsLicenseChecker.containsRemoteIndex(datafeed.getIndices())) { - CcsLicenseChecker remoteLicenseChecker = new CcsLicenseChecker(client); - remoteLicenseChecker.checkRemoteClusterLicenses(CcsLicenseChecker.remoteClusterNames(datafeed.getIndices()), + if (MlRemoteLicenseChecker.containsRemoteIndex(datafeed.getIndices())) { + MlRemoteLicenseChecker remoteLicenseChecker = new MlRemoteLicenseChecker(client); + remoteLicenseChecker.checkRemoteClusterLicenses(MlRemoteLicenseChecker.remoteClusterNames(datafeed.getIndices()), ActionListener.wrap( response -> { if (response.isViolated()) { String message = "Cannot start datafeed [" + datafeed.getId() + "] as it is configured to use " + "indices on a remote cluster [" + response.get().getClusterName() + "] that is not licensed for Machine Learning. " - + CcsLicenseChecker.buildErrorMessage(response.get()); + + MlRemoteLicenseChecker.buildErrorMessage(response.get()); listener.onFailure(new ElasticsearchException(message)); } else { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseChecker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java similarity index 96% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseChecker.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java index 6d61d3501f52d..58e5472d0368a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseChecker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java @@ -21,7 +21,6 @@ import java.util.EnumSet; import java.util.Iterator; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -30,7 +29,7 @@ * The remote cluster should be licenced for ML this class performs that check * using the _xpack endpoint. */ -public class CcsLicenseChecker { +public class MlRemoteLicenseChecker { private final Client client; @@ -68,7 +67,7 @@ public RemoteClusterLicenseInfo get() { } } - public CcsLicenseChecker(Client client) { + public MlRemoteLicenseChecker(Client client) { this.client = client; } @@ -145,7 +144,7 @@ private static boolean isRemoteIndex(String index) { } public static boolean containsRemoteIndex(List indices) { - return indices.stream().anyMatch(CcsLicenseChecker::isRemoteIndex); + return indices.stream().anyMatch(MlRemoteLicenseChecker::isRemoteIndex); } /** @@ -156,7 +155,7 @@ public static boolean containsRemoteIndex(List indices) { */ public static List remoteClusterNames(List indices) { return indices.stream() - .filter(CcsLicenseChecker::isRemoteIndex) + .filter(MlRemoteLicenseChecker::isRemoteIndex) .map(index -> index.substring(0, index.indexOf(':'))) .distinct() .collect(Collectors.toList()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java index 2b80f07a25e72..de9d7ddb09ec7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java @@ -11,8 +11,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.action.XPackInfoAction; -import org.elasticsearch.xpack.core.ml.action.CheckLicenseCssAction; +import org.elasticsearch.xpack.core.ml.action.CheckRemoteLicenseAction; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.junit.After; import org.junit.Before; @@ -62,7 +61,7 @@ public void testScheduledTriggering() throws InterruptedException { } verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); - verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(CheckLicenseCssAction.INSTANCE), any(), any()); + verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(CheckRemoteLicenseAction.INSTANCE), any(), any()); } private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseCheckerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java similarity index 76% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseCheckerTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java index 397d37751836e..f6785a8494a84 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/CcsLicenseCheckerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java @@ -34,39 +34,39 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class CcsLicenseCheckerTests extends ESTestCase { +public class MlRemoteLicenseCheckerTests extends ESTestCase { public void testIsRemoteIndex() { List indices = Arrays.asList("local-index1", "local-index2"); - assertFalse(CcsLicenseChecker.containsRemoteIndex(indices)); + assertFalse(MlRemoteLicenseChecker.containsRemoteIndex(indices)); indices = Arrays.asList("local-index1", "remote-cluster:remote-index2"); - assertTrue(CcsLicenseChecker.containsRemoteIndex(indices)); + assertTrue(MlRemoteLicenseChecker.containsRemoteIndex(indices)); } public void testRemoteClusterNames() { List indices = Arrays.asList("local-index1", "local-index2"); - assertThat(CcsLicenseChecker.remoteClusterNames(indices), empty()); + assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), empty()); indices = Arrays.asList("local-index1", "remote-cluster1:remote-index2"); - assertThat(CcsLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1")); + assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1")); indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1"); - assertThat(CcsLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); + assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); indices = Arrays.asList("remote-cluster1:index2", "index1", "remote-cluster2:index1", "remote-cluster2:index2"); - assertThat(CcsLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); + assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), contains("remote-cluster1", "remote-cluster2")); } public void testLicenseSupportsML() { XPackInfoResponse.LicenseInfo licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", License.Status.ACTIVE, randomNonNegativeLong()); - assertTrue(CcsLicenseChecker.licenseSupportsML(licenseInfo)); + assertTrue(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "trial", "trial", License.Status.EXPIRED, randomNonNegativeLong()); - assertFalse(CcsLicenseChecker.licenseSupportsML(licenseInfo)); + assertFalse(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "GOLD", "GOLD", License.Status.ACTIVE, randomNonNegativeLong()); - assertFalse(CcsLicenseChecker.licenseSupportsML(licenseInfo)); + assertFalse(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); licenseInfo = new XPackInfoResponse.LicenseInfo("uid", "PLATINUM", "PLATINUM", License.Status.ACTIVE, randomNonNegativeLong()); - assertTrue(CcsLicenseChecker.licenseSupportsML(licenseInfo)); + assertTrue(MlRemoteLicenseChecker.licenseSupportsML(licenseInfo)); } public void testCheckRemoteClusterLicenses_givenValidLicenses() { @@ -87,13 +87,13 @@ public void testCheckRemoteClusterLicenses_givenValidLicenses() { responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); responses.add(new XPackInfoResponse(null, createPlatinumLicenseResponse(), null)); - CcsLicenseChecker licenseChecker = new CcsLicenseChecker(client); - AtomicReference licCheckResponse = new AtomicReference<>(); + MlRemoteLicenseChecker licenseChecker = new MlRemoteLicenseChecker(client); + AtomicReference licCheckResponse = new AtomicReference<>(); licenseChecker.checkRemoteClusterLicenses(remoteClusterNames, - new ActionListener() { + new ActionListener() { @Override - public void onResponse(CcsLicenseChecker.LicenseViolation response) { + public void onResponse(MlRemoteLicenseChecker.LicenseViolation response) { licCheckResponse.set(response); } @@ -125,13 +125,13 @@ public void testCheckRemoteClusterLicenses_givenInvalidLicense() { return null; }).when(client).execute(same(XPackInfoAction.INSTANCE), any(), any()); - CcsLicenseChecker licenseChecker = new CcsLicenseChecker(client); - AtomicReference licCheckResponse = new AtomicReference<>(); + MlRemoteLicenseChecker licenseChecker = new MlRemoteLicenseChecker(client); + AtomicReference licCheckResponse = new AtomicReference<>(); licenseChecker.checkRemoteClusterLicenses(remoteClusterNames, - new ActionListener() { + new ActionListener() { @Override - public void onResponse(CcsLicenseChecker.LicenseViolation response) { + public void onResponse(MlRemoteLicenseChecker.LicenseViolation response) { licCheckResponse.set(response); } @@ -150,19 +150,19 @@ public void onFailure(Exception e) { public void testBuildErrorMessage() { XPackInfoResponse.LicenseInfo platinumLicence = createPlatinumLicenseResponse(); - CcsLicenseChecker.RemoteClusterLicenseInfo info = new CcsLicenseChecker.RemoteClusterLicenseInfo("platinum-cluster", platinumLicence); - assertEquals(Strings.toString(platinumLicence), CcsLicenseChecker.buildErrorMessage(info)); + MlRemoteLicenseChecker.RemoteClusterLicenseInfo info = new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("platinum-cluster", platinumLicence); + assertEquals(Strings.toString(platinumLicence), MlRemoteLicenseChecker.buildErrorMessage(info)); XPackInfoResponse.LicenseInfo basicLicense = createBasicLicenseResponse(); - info = new CcsLicenseChecker.RemoteClusterLicenseInfo("basic-cluster", basicLicense); + info = new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("basic-cluster", basicLicense); String expected = "The license mode on cluster [basic-cluster] with mode [BASIC] does not enable Machine Learning. " + Strings.toString(basicLicense); - assertEquals(expected, CcsLicenseChecker.buildErrorMessage(info)); + assertEquals(expected, MlRemoteLicenseChecker.buildErrorMessage(info)); XPackInfoResponse.LicenseInfo expiredLicense = createExpiredLicenseResponse(); - info = new CcsLicenseChecker.RemoteClusterLicenseInfo("expired-cluster", expiredLicense); + info = new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("expired-cluster", expiredLicense); expected = "The license on cluster [expired-cluster] is not active. " + Strings.toString(expiredLicense); - assertEquals(expected, CcsLicenseChecker.buildErrorMessage(info)); + assertEquals(expected, MlRemoteLicenseChecker.buildErrorMessage(info)); } private Client createMockClient() { From b6cc9dfeeee4de59ef5ac0b784a776efade7d378 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 4 Jun 2018 16:32:21 +0100 Subject: [PATCH 3/9] Rewrite mangled English is error messages --- .../action/TransportStartDatafeedAction.java | 104 ++++++++++-------- .../ml/datafeed/MlRemoteLicenseChecker.java | 7 +- .../process/autodetect/AutodetectProcess.java | 2 +- 3 files changed, 62 insertions(+), 51 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 8b02a17895c74..dbd2e0a885d97 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -48,6 +48,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; +import java.util.List; import java.util.Map; import java.util.function.Predicate; @@ -115,21 +116,21 @@ protected void masterOperation(StartDatafeedAction.Request request, ClusterState ActionListener> waitForTaskListener = new ActionListener>() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - waitForDatafeedStarted(persistentTask.getId(), params, listener); - } + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { + waitForDatafeedStarted(persistentTask.getId(), params, listener); + } - @Override - public void onFailure(Exception e) { - if (e instanceof ResourceAlreadyExistsException) { - logger.debug("datafeed already started", e); - e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() + - "] because it has already been started", RestStatus.CONFLICT); - } - listener.onFailure(e); - } - }; + @Override + public void onFailure(Exception e) { + if (e instanceof ResourceAlreadyExistsException) { + logger.debug("datafeed already started", e); + e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() + + "] because it has already been started", RestStatus.CONFLICT); + } + listener.onFailure(e); + } + }; // Verify data extractor factory can be created, then start persistent task MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); @@ -144,23 +145,13 @@ public void onFailure(Exception e) { ActionListener.wrap( response -> { if (response.isViolated()) { - String message = "Cannot start datafeed [" + datafeed.getId() + "] as it is configured to use " - + "indices on a remote cluster [" + response.get().getClusterName() - + "] that is not licensed for Machine Learning. " - + MlRemoteLicenseChecker.buildErrorMessage(response.get()); - - listener.onFailure(new ElasticsearchException(message)); + listener.onFailure(createUnlicensedError(datafeed.getId(), response)); } else { createDataExtractor(job, datafeed, params, waitForTaskListener); } }, - e -> { - String message = "Cannot start datafeed [" + datafeed.getId() + "] as it is configured to use " - + "indices on a remote cluster" + datafeed.getRemoteIndices() - + " and the license type could not be verified"; - ElasticsearchException wrappedException = new ElasticsearchException(message, e); - listener.onFailure(wrappedException); - })); + e -> listener.onFailure(createUnknownLicenseError(datafeed.getId(), datafeed.getRemoteIndices(), e)) + )); } else { createDataExtractor(job, datafeed, params, waitForTaskListener); } @@ -192,28 +183,28 @@ private void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedP DatafeedPredicate predicate = new DatafeedPredicate(); persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, params.getTimeout(), new PersistentTasksService.WaitForPersistentTaskListener() { - @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { - if (predicate.exception != null) { - // We want to return to the caller without leaving an unassigned persistent task, to match - // what would have happened if the error had been detected in the "fast fail" validation - cancelDatafeedStart(persistentTask, predicate.exception, listener); - } else { - listener.onResponse(new StartDatafeedAction.Response(true)); - } - } + @Override + public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { + if (predicate.exception != null) { + // We want to return to the caller without leaving an unassigned persistent task, to match + // what would have happened if the error had been detected in the "fast fail" validation + cancelDatafeedStart(persistentTask, predicate.exception, listener); + } else { + listener.onResponse(new StartDatafeedAction.Response(true)); + } + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } - @Override - public void onTimeout(TimeValue timeout) { - listener.onFailure(new ElasticsearchException("Starting datafeed [" - + params.getDatafeedId() + "] timed out after [" + timeout + "]")); - } - }); + @Override + public void onTimeout(TimeValue timeout) { + listener.onFailure(new ElasticsearchException("Starting datafeed [" + + params.getDatafeedId() + "] timed out after [" + timeout + "]")); + } + }); } private void cancelDatafeedStart(PersistentTasksCustomMetaData.PersistentTask persistentTask, @@ -237,6 +228,25 @@ public void onFailure(Exception e) { ); } + private ElasticsearchStatusException createUnlicensedError(String datafeedId, + MlRemoteLicenseChecker.LicenseViolation licenseViolation) { + String message = "Cannot start datafeed [" + datafeedId + "] as it is configured to use " + + "indices on a remote cluster [" + licenseViolation.get().getClusterName() + + "] that is not licensed for Machine Learning. " + + MlRemoteLicenseChecker.buildErrorMessage(licenseViolation.get()); + + return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST); + } + + private ElasticsearchStatusException createUnknownLicenseError(String datafeedId, List remoteIndices, + Exception cause) { + String message = "Cannot start datafeed [" + datafeedId + "] as it is configured to use" + + " indices on a remote cluster " + remoteIndices + + " but the license type could not be verified"; + + return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause); + } + public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor { private final DatafeedManager datafeedManager; private final IndexNameExpressionResolver resolver; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java index 58e5472d0368a..3a0c36a62592a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java @@ -169,9 +169,10 @@ public static String buildErrorMessage(RemoteClusterLicenseInfo clusterLicenseIn } else { License.OperationMode mode = License.OperationMode.resolve(clusterLicenseInfo.licenseInfo.getMode()); if (mode != License.OperationMode.PLATINUM && mode != License.OperationMode.TRIAL) { - error.append("The license mode on cluster [").append(clusterLicenseInfo.clusterName) - .append("] with mode [").append(mode). - append("] does not enable Machine Learning. "); + error.append("The license mode [").append(mode) + .append("] on cluster [").append(mode) + .append(clusterLicenseInfo.clusterName) + .append("] does not enable Machine Learning. "); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java index 049880b1ac224..21be815d561a8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcess.java @@ -117,7 +117,7 @@ void writeUpdateDetectorRulesMessage(int detectorIndex, List rule /** * Ask the job to start persisting model state in the background - * @throws IOException + * @throws IOException If writing the request fails */ void persistJob() throws IOException; From eca969aa4bcf9623ca44158f665dd2a94bbaf5c0 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 8 Jun 2018 14:30:26 +0100 Subject: [PATCH 4/9] Remove daily licence check --- .../ml/action/CheckRemoteLicenseAction.java | 57 --------- .../xpack/ml/MachineLearning.java | 5 +- .../xpack/ml/MlDailyMaintenanceService.java | 19 +-- .../TransportCheckRemoteLicenseAction.java | 119 ------------------ .../ml/MlDailyManagementServiceTests.java | 12 -- 5 files changed, 3 insertions(+), 209 deletions(-) delete mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckRemoteLicenseAction.java delete mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckRemoteLicenseAction.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckRemoteLicenseAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckRemoteLicenseAction.java deleted file mode 100644 index 92a2ee7c5e3c4..0000000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/CheckRemoteLicenseAction.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.core.ml.action; - -import org.elasticsearch.action.Action; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.AcknowledgedRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder; -import org.elasticsearch.client.ElasticsearchClient; - -public class CheckRemoteLicenseAction extends Action { - - public static final CheckRemoteLicenseAction INSTANCE = new CheckRemoteLicenseAction(); - public static final String NAME = "cluster:admin/xpack/ml/datafeed/checklic"; - - private CheckRemoteLicenseAction() { - super(NAME); - } - - @Override - public RequestBuilder newRequestBuilder(ElasticsearchClient client) { - return new RequestBuilder(client, this); - } - - @Override - public Response newResponse() { - return new Response(); - } - - public static class Request extends AcknowledgedRequest { - - @Override - public ActionRequestValidationException validate() { - return null; - } - } - - public static class Response extends AcknowledgedResponse { - - public Response() { - super(); - } - } - - public static class RequestBuilder extends MasterNodeOperationRequestBuilder { - - public RequestBuilder(ElasticsearchClient client, CheckRemoteLicenseAction action) { - super(client, action, new Request()); - } - } - -} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index a5a68bded10a4..a1714a8e3f5db 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -56,7 +56,6 @@ import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.MlMetaIndex; -import org.elasticsearch.xpack.core.ml.action.CheckRemoteLicenseAction; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction; import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction; @@ -108,7 +107,6 @@ import org.elasticsearch.xpack.core.ml.notifications.AuditMessage; import org.elasticsearch.xpack.core.ml.notifications.AuditorField; import org.elasticsearch.xpack.core.template.TemplateUtils; -import org.elasticsearch.xpack.ml.action.TransportCheckRemoteLicenseAction; import org.elasticsearch.xpack.ml.action.TransportCloseJobAction; import org.elasticsearch.xpack.ml.action.TransportDeleteCalendarAction; import org.elasticsearch.xpack.ml.action.TransportDeleteCalendarEventAction; @@ -549,8 +547,7 @@ public List getRestHandlers(Settings settings, RestController restC new ActionHandler<>(UpdateCalendarJobAction.INSTANCE, TransportUpdateCalendarJobAction.class), new ActionHandler<>(GetCalendarEventsAction.INSTANCE, TransportGetCalendarEventsAction.class), new ActionHandler<>(PostCalendarEventsAction.INSTANCE, TransportPostCalendarEventsAction.class), - new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class), - new ActionHandler<>(CheckRemoteLicenseAction.INSTANCE, TransportCheckRemoteLicenseAction.class) + new ActionHandler<>(PersistJobAction.INSTANCE, TransportPersistJobAction.class) ); } @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index dc51bf3c67055..4d0911f6691e1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -15,7 +15,6 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.action.CheckRemoteLicenseAction; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; @@ -111,24 +110,10 @@ private void scheduleNext() { private void triggerTasks() { LOGGER.info("triggering scheduled [ML] maintenance tasks"); - - ActionListener checkLicenseListener = ActionListener.wrap( - response -> LOGGER.info("Successfully completed [ML] maintenance tasks"), - e -> LOGGER.error("An error occurred during maintenance license checks", e) - ); - executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(), ActionListener.wrap( - response -> checkLicensesForCrossClusterSearch(checkLicenseListener), - e -> { - LOGGER.error("An error occurred during maintenance tasks execution deleting expired data", e); - checkLicensesForCrossClusterSearch(checkLicenseListener); - })); - + response -> LOGGER.info("Successfully completed [ML] maintenance tasks"), + e -> LOGGER.error("An error occurred during maintenance tasks execution", e))); scheduleNext(); } - - private void checkLicensesForCrossClusterSearch(ActionListener listener) { - executeAsyncWithOrigin(client, ML_ORIGIN, CheckRemoteLicenseAction.INSTANCE, new CheckRemoteLicenseAction.Request(), listener); - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckRemoteLicenseAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckRemoteLicenseAction.java deleted file mode 100644 index e9d920325dcd8..0000000000000 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCheckRemoteLicenseAction.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.ml.action; - -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MlMetadata; -import org.elasticsearch.xpack.core.ml.action.CheckRemoteLicenseAction; -import org.elasticsearch.xpack.core.ml.action.CloseJobAction; -import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.ml.datafeed.MlRemoteLicenseChecker; -import org.elasticsearch.xpack.ml.notifications.Auditor; - -import java.util.List; -import java.util.stream.Collectors; - -import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; -import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; - -public class TransportCheckRemoteLicenseAction extends TransportMasterNodeAction { - - private final Auditor auditor; - private final Client client; - - @Inject - public TransportCheckRemoteLicenseAction(Settings settings, TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, Client client, - IndexNameExpressionResolver indexNameExpressionResolver, Auditor auditor) { - super(settings, CheckRemoteLicenseAction.NAME, transportService, clusterService, threadPool, actionFilters, - indexNameExpressionResolver, CheckRemoteLicenseAction.Request::new); - this.auditor = auditor; - this.client = client; - } - - @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected CheckRemoteLicenseAction.Response newResponse() { - return new CheckRemoteLicenseAction.Response(); - } - - @Override - protected void masterOperation(CheckRemoteLicenseAction.Request request, ClusterState state, ActionListener listener) throws Exception { - checkoutCssLic(activeDatafeeds(state)); - } - - @Override - protected ClusterBlockException checkBlock(CheckRemoteLicenseAction.Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); - } - - private List activeDatafeeds(ClusterState state) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(state); - PersistentTasksCustomMetaData persistentTasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - - return mlMetadata.getDatafeeds().values().stream() - .filter(config -> MlMetadata.getDatafeedTask(config.getId(), persistentTasks) != null) - .collect(Collectors.toList()); - } - - private void checkoutCssLic(List activeDatafeeds) { - for (DatafeedConfig datafeed: activeDatafeeds) { - List remoteClusterNames = MlRemoteLicenseChecker.remoteClusterNames(datafeed.getIndices()); - if (remoteClusterNames.isEmpty() == false) { - MlRemoteLicenseChecker remoteLicenseChecker = new MlRemoteLicenseChecker(client); - remoteLicenseChecker.checkRemoteClusterLicenses(remoteClusterNames, - ActionListener.wrap(response -> checkLicense(response, datafeed.getJobId(), datafeed.getId()), - e -> logger.error("Error checking remote datafeed cluster licenses", e))); - } - } - } - - private void checkLicense(MlRemoteLicenseChecker.LicenseViolation licenseCheck, String jobId, String datafeedId) { - if (licenseCheck.isViolated()) { - String message = "[" + jobId + "] Stoppping datafeed and closing job because Machine Learning is not licenced " - + "on the remote cluster [" + licenseCheck.get().getClusterName() + "] used in cross cluster search. " - + MlRemoteLicenseChecker.buildErrorMessage(licenseCheck.get()); - logger.info(message); - auditor.warning(jobId, message); - - closeJob(jobId, datafeedId); - } - } - - private void closeJob(String jobId, String datafeedId) { - executeAsyncWithOrigin(client, ML_ORIGIN, StopDatafeedAction.INSTANCE, new StopDatafeedAction.Request(datafeedId), - ActionListener.wrap( - response -> { - executeAsyncWithOrigin(client, ML_ORIGIN, CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId), - ActionListener.wrap( - closeJobResponse -> {}, - e -> logger.error( - new ParameterizedMessage("[{}] An error occurred closing job", jobId), e))); - }, - e -> logger.error( - new ParameterizedMessage("[{}] An error occurred stopping datafeed [{}]", jobId, datafeedId), e))); - } -} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java index de9d7ddb09ec7..4d2b77561296e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyManagementServiceTests.java @@ -5,13 +5,11 @@ */ package org.elasticsearch.xpack.ml; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.action.CheckRemoteLicenseAction; import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction; import org.junit.After; import org.junit.Before; @@ -23,7 +21,6 @@ import static org.elasticsearch.mock.orig.Mockito.verify; import static org.mockito.Matchers.any; import static org.mockito.Matchers.same; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -45,14 +42,6 @@ public void stop() throws InterruptedException { } public void testScheduledTriggering() throws InterruptedException { - doAnswer(invocationMock -> { - @SuppressWarnings("raw_types") - ActionListener listener = (ActionListener) invocationMock.getArguments()[2]; - listener.onResponse(null); - return null; - }).when(client).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); - - int triggerCount = randomIntBetween(2, 4); CountDownLatch latch = new CountDownLatch(triggerCount); try (MlDailyMaintenanceService service = createService(latch, client)) { @@ -61,7 +50,6 @@ public void testScheduledTriggering() throws InterruptedException { } verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(DeleteExpiredDataAction.INSTANCE), any(), any()); - verify(client, Mockito.atLeast(triggerCount - 1)).execute(same(CheckRemoteLicenseAction.INSTANCE), any(), any()); } private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) { From 2f8edfc502a3ac26e7a85ad51395086ad36b6a32 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Fri, 8 Jun 2018 15:05:08 +0100 Subject: [PATCH 5/9] Fix tests --- .../elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java | 2 +- .../xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java index 3a0c36a62592a..4e6355ee5d27c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java @@ -170,7 +170,7 @@ public static String buildErrorMessage(RemoteClusterLicenseInfo clusterLicenseIn License.OperationMode mode = License.OperationMode.resolve(clusterLicenseInfo.licenseInfo.getMode()); if (mode != License.OperationMode.PLATINUM && mode != License.OperationMode.TRIAL) { error.append("The license mode [").append(mode) - .append("] on cluster [").append(mode) + .append("] on cluster [") .append(clusterLicenseInfo.clusterName) .append("] does not enable Machine Learning. "); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java index f6785a8494a84..153d5654a269c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java @@ -155,7 +155,7 @@ public void testBuildErrorMessage() { XPackInfoResponse.LicenseInfo basicLicense = createBasicLicenseResponse(); info = new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("basic-cluster", basicLicense); - String expected = "The license mode on cluster [basic-cluster] with mode [BASIC] does not enable Machine Learning. " + String expected = "The license mode [BASIC] on cluster [basic-cluster] does not enable Machine Learning. " + Strings.toString(basicLicense); assertEquals(expected, MlRemoteLicenseChecker.buildErrorMessage(info)); From 64fe7668bb2fe0b2c750e3f5895ce414712351bc Mon Sep 17 00:00:00 2001 From: David Kyle Date: Mon, 11 Jun 2018 14:31:34 +0100 Subject: [PATCH 6/9] Catch correct exception from x-pack-less cluster --- .../core/ml/datafeed/DatafeedConfig.java | 10 -------- .../core/ml/datafeed/DatafeedConfigTests.java | 12 --------- .../action/TransportStartDatafeedAction.java | 5 ++-- .../ml/datafeed/DatafeedNodeSelector.java | 6 +---- .../ml/datafeed/MlRemoteLicenseChecker.java | 25 +++++++++++++------ .../datafeed/MlRemoteLicenseCheckerTests.java | 11 ++++++++ 6 files changed, 32 insertions(+), 37 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index 737069e1b20e1..d06c911e13cfe 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -44,7 +44,6 @@ import java.util.Objects; import java.util.Random; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * Datafeed configuration options. Describes where to proactively pull input @@ -222,15 +221,6 @@ public List getIndices() { return indices; } - /** - * Get any indices used for cross cluster search - * that are on a remote cluster. - * @return List of remote cluster indices - */ - public List getRemoteIndices() { - return indices.stream().filter(index -> index.indexOf(':') != -1).collect(Collectors.toList()); - } - public List getTypes() { return types; } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index ee35901124306..7e3dbc56d5bc8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -493,18 +493,6 @@ public void testDefaultFrequency_GivenAggregationsWithHistogramInterval_1_Hour() assertEquals(TimeValue.timeValueHours(1), datafeed.defaultFrequency(TimeValue.timeValueHours(12))); } - public void testGetRemoteIndices() { - DatafeedConfig.Builder builder = new DatafeedConfig.Builder("datafeed1", "job1"); - builder.setIndices(Collections.singletonList("local-index")); - - DatafeedConfig config = builder.build(); - assertThat(config.getRemoteIndices(), is(empty())); - - builder.setIndices(Arrays.asList("local-index", "remote-cluster:index1", "local-index2", "remote-cluster2:index1")); - config = builder.build(); - assertThat(config.getRemoteIndices(), containsInAnyOrder("remote-cluster:index1", "remote-cluster2:index1")); - } - public static String randomValidDatafeedId() { CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray()); return generator.ofCodePointsLength(random(), 10, 10); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index dbd2e0a885d97..113a1b59ccaf6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -150,7 +150,8 @@ public void onFailure(Exception e) { createDataExtractor(job, datafeed, params, waitForTaskListener); } }, - e -> listener.onFailure(createUnknownLicenseError(datafeed.getId(), datafeed.getRemoteIndices(), e)) + e -> listener.onFailure(createUnknownLicenseError(datafeed.getId(), + MlRemoteLicenseChecker.remoteIndices(datafeed.getIndices()), e)) )); } else { createDataExtractor(job, datafeed, params, waitForTaskListener); @@ -244,7 +245,7 @@ private ElasticsearchStatusException createUnknownLicenseError(String datafeedId + " indices on a remote cluster " + remoteIndices + " but the license type could not be verified"; - return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause); + return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, new Exception(cause.getMessage())); } public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index 37f9715d09464..0eb57ab79be5d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -91,7 +91,7 @@ private AssignmentFailure verifyIndicesActive(DatafeedConfig datafeed) { List indices = datafeed.getIndices(); for (String index : indices) { - if (isRemoteIndex(index)) { + if (MlRemoteLicenseChecker.isRemoteIndex(index)) { // We cannot verify remote indices continue; } @@ -122,10 +122,6 @@ private AssignmentFailure verifyIndicesActive(DatafeedConfig datafeed) { return null; } - private boolean isRemoteIndex(String index) { - return index.indexOf(':') != -1; - } - private static class AssignmentFailure { private final String reason; private final boolean isCriticalForTaskCreation; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java index 4e6355ee5d27c..0d93df993d8a5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java @@ -12,9 +12,9 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.license.License; import org.elasticsearch.license.XPackInfoResponse; +import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.action.XPackInfoAction; import org.elasticsearch.xpack.core.action.XPackInfoRequest; @@ -27,15 +27,15 @@ /** * ML datafeeds can use cross cluster search to access data in a remote cluster. * The remote cluster should be licenced for ML this class performs that check - * using the _xpack endpoint. + * using the _xpack (info) endpoint. */ public class MlRemoteLicenseChecker { private final Client client; public static class RemoteClusterLicenseInfo { - private String clusterName; - private XPackInfoResponse.LicenseInfo licenseInfo; + private final String clusterName; + private final XPackInfoResponse.LicenseInfo licenseInfo; RemoteClusterLicenseInfo(String clusterName, XPackInfoResponse.LicenseInfo licenseInfo) { this.clusterName = clusterName; @@ -73,8 +73,8 @@ public MlRemoteLicenseChecker(Client client) { /** * Check each cluster is licensed for ML. - * This function terminates early when the first cluster that is not licensed - * is found or an error occurs. + * This function evaluates lazily and will terminate when the first cluster + * that is not licensed is found or an error occurs. * * @param clusterNames List of remote cluster names * @param listener Response listener @@ -108,7 +108,7 @@ public void onResponse(XPackInfoResponse xPackInfoResponse) { @Override public void onFailure(Exception e) { String message = "Could not determine the X-Pack licence type for cluster [" + clusterName.get() + "]"; - if (e instanceof InvalidIndexNameException) { + if (e instanceof ActionNotFoundTransportException) { // This is likely to be because x-pack is not installed in the target cluster message += ". Is X-Pack installed on the target cluster?"; } @@ -139,7 +139,7 @@ static boolean licenseSupportsML(XPackInfoResponse.LicenseInfo licenseInfo) { (mode == License.OperationMode.PLATINUM || mode == License.OperationMode.TRIAL); } - private static boolean isRemoteIndex(String index) { + public static boolean isRemoteIndex(String index) { return index.indexOf(':') != -1; } @@ -147,6 +147,15 @@ public static boolean containsRemoteIndex(List indices) { return indices.stream().anyMatch(MlRemoteLicenseChecker::isRemoteIndex); } + /** + * Get any remote indices used in cross cluster search. + * Remote indices are of the form {@code cluster_name:index_name} + * @return List of remote cluster indices + */ + public static List remoteIndices(List indices) { + return indices.stream().filter(index -> index.indexOf(':') != -1).collect(Collectors.toList()); + } + /** * Extract the list of remote cluster names from the list of indices. * @param indices List of indices. Remote cluster indices are prefixed diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java index 153d5654a269c..263e10a603f36 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java @@ -16,15 +16,19 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.action.XPackInfoAction; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.same; @@ -43,6 +47,13 @@ public void testIsRemoteIndex() { assertTrue(MlRemoteLicenseChecker.containsRemoteIndex(indices)); } + public void testRemoteIndices() { + List indices = Collections.singletonList("local-index"); + assertThat(MlRemoteLicenseChecker.remoteIndices(indices), is(empty())); + indices = Arrays.asList("local-index", "remote-cluster:index1", "local-index2", "remote-cluster2:index1"); + assertThat(MlRemoteLicenseChecker.remoteIndices(indices), containsInAnyOrder("remote-cluster:index1", "remote-cluster2:index1")); + } + public void testRemoteClusterNames() { List indices = Arrays.asList("local-index1", "local-index2"); assertThat(MlRemoteLicenseChecker.remoteClusterNames(indices), empty()); From 7666d3e5899222018a5f3df968cbee431e7ee342 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 12 Jun 2018 12:52:46 +0100 Subject: [PATCH 7/9] Fix checkstyle --- .../xpack/ml/action/TransportStartDatafeedAction.java | 6 ++++-- .../xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 113a1b59ccaf6..3d261864ab409 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -117,7 +117,8 @@ protected void masterOperation(StartDatafeedAction.Request request, ClusterState ActionListener> waitForTaskListener = new ActionListener>() { @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { + public void onResponse(PersistentTasksCustomMetaData.PersistentTask + persistentTask) { waitForDatafeedStarted(persistentTask.getId(), params, listener); } @@ -185,7 +186,8 @@ private void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedP persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, params.getTimeout(), new PersistentTasksService.WaitForPersistentTaskListener() { @Override - public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { + public void onResponse(PersistentTasksCustomMetaData.PersistentTask + persistentTask) { if (predicate.exception != null) { // We want to return to the caller without leaving an unassigned persistent task, to match // what would have happened if the error had been detected in the "fast fail" validation diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java index 263e10a603f36..47d4d30a7c6e4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseCheckerTests.java @@ -161,7 +161,8 @@ public void onFailure(Exception e) { public void testBuildErrorMessage() { XPackInfoResponse.LicenseInfo platinumLicence = createPlatinumLicenseResponse(); - MlRemoteLicenseChecker.RemoteClusterLicenseInfo info = new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("platinum-cluster", platinumLicence); + MlRemoteLicenseChecker.RemoteClusterLicenseInfo info = + new MlRemoteLicenseChecker.RemoteClusterLicenseInfo("platinum-cluster", platinumLicence); assertEquals(Strings.toString(platinumLicence), MlRemoteLicenseChecker.buildErrorMessage(info)); XPackInfoResponse.LicenseInfo basicLicense = createBasicLicenseResponse(); From 8aeff95270682250385dce7c9f1549a912458082 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 12 Jun 2018 13:45:01 +0100 Subject: [PATCH 8/9] Address review comments --- .../xpack/ml/datafeed/MlRemoteLicenseChecker.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java index 0d93df993d8a5..b55713f6d0ab7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/MlRemoteLicenseChecker.java @@ -15,6 +15,7 @@ import org.elasticsearch.license.License; import org.elasticsearch.license.XPackInfoResponse; import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.xpack.core.action.XPackInfoAction; import org.elasticsearch.xpack.core.action.XPackInfoRequest; @@ -140,7 +141,7 @@ static boolean licenseSupportsML(XPackInfoResponse.LicenseInfo licenseInfo) { } public static boolean isRemoteIndex(String index) { - return index.indexOf(':') != -1; + return index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) != -1; } public static boolean containsRemoteIndex(List indices) { @@ -153,7 +154,7 @@ public static boolean containsRemoteIndex(List indices) { * @return List of remote cluster indices */ public static List remoteIndices(List indices) { - return indices.stream().filter(index -> index.indexOf(':') != -1).collect(Collectors.toList()); + return indices.stream().filter(MlRemoteLicenseChecker::isRemoteIndex).collect(Collectors.toList()); } /** @@ -165,7 +166,7 @@ public static List remoteIndices(List indices) { public static List remoteClusterNames(List indices) { return indices.stream() .filter(MlRemoteLicenseChecker::isRemoteIndex) - .map(index -> index.substring(0, index.indexOf(':'))) + .map(index -> index.substring(0, index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR))) .distinct() .collect(Collectors.toList()); } From d1a3f1f46cbe54c7b6a9da67a7c788590460faa4 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 13 Jun 2018 09:52:29 +0100 Subject: [PATCH 9/9] Remove unused imports --- .../xpack/core/ml/datafeed/DatafeedConfigTests.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index 7e3dbc56d5bc8..d59ef16dfdf2c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -40,14 +40,11 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.TimeZone; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is;