diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index 8adb3f753b409..1c1027702a448 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.test.SecuritySettingsSourceField; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner; @@ -22,7 +23,9 @@ import org.junit.After; import java.io.IOException; +import java.util.Collections; import java.util.Locale; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; @@ -111,6 +114,21 @@ public void testGetJobs_GivenMultipleJobs() throws Exception { assertThat(implicitAll, containsString("\"job_id\":\"given-multiple-jobs-job-3\"")); } + // tests the _xpack/usage endpoint + public void testUsage() throws IOException { + createFarequoteJob("job-1"); + createFarequoteJob("job-2"); + Map usage = entityAsMap(client().performRequest(new Request("GET", "_xpack/usage"))); + assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage)); + assertEquals(2, XContentMapValues.extractValue("ml.jobs.closed.count", usage)); + Response openResponse = client().performRequest(new Request("POST", MachineLearning.BASE_PATH + "anomaly_detectors/job-1/_open")); + assertEquals(Collections.singletonMap("opened", true), entityAsMap(openResponse)); + usage = entityAsMap(client().performRequest(new Request("GET", "_xpack/usage"))); + assertEquals(2, XContentMapValues.extractValue("ml.jobs._all.count", usage)); + assertEquals(1, XContentMapValues.extractValue("ml.jobs.closed.count", usage)); + assertEquals(1, XContentMapValues.extractValue("ml.jobs.opened.count", usage)); + } + private Response createFarequoteJob(String jobId) throws IOException { Request request = new Request("PUT", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId); request.setJsonEntity( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 41f50024ac344..e52d566aa5fee 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -165,6 +165,7 @@ import org.elasticsearch.xpack.ml.datafeed.DatafeedManager; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.JobManagerHolder; import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier; import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizer; import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizerFactory; @@ -372,7 +373,8 @@ public Collection createComponents(Client client, ClusterService cluster NamedXContentRegistry xContentRegistry, Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) { if (enabled == false || transportClientMode || tribeNode || tribeNodeClient) { - return emptyList(); + // special holder for @link(MachineLearningFeatureSetUsage) which needs access to job manager, empty if ML is disabled + return Collections.singletonList(new JobManagerHolder()); } Auditor auditor = new Auditor(client, clusterService.getNodeName()); @@ -382,6 +384,9 @@ public Collection createComponents(Client client, ClusterService cluster UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(client, clusterService, threadPool); JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, threadPool, client, notifier); + // special holder for @link(MachineLearningFeatureSetUsage) which needs access to job manager if ML is enabled + JobManagerHolder jobManagerHolder = new JobManagerHolder(jobManager); + JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client); JobResultsPersister jobResultsPersister = new JobResultsPersister(client); @@ -440,6 +445,7 @@ public Collection createComponents(Client client, ClusterService cluster jobConfigProvider, datafeedConfigProvider, jobManager, + jobManagerHolder, autodetectProcessManager, new MlInitializationService(settings, threadPool, clusterService, client), jobDataCountsPersister, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index 9eec0d848f072..e10ffbaf377ef 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -26,12 +26,12 @@ import org.elasticsearch.xpack.core.XPackSettings; import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; +import org.elasticsearch.xpack.ml.job.JobManagerHolder; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.NativeControllerHolder; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; @@ -48,6 +48,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; public class MachineLearningFeatureSet implements XPackFeatureSet { @@ -61,15 +62,17 @@ public class MachineLearningFeatureSet implements XPackFeatureSet { private final XPackLicenseState licenseState; private final ClusterService clusterService; private final Client client; + private final JobManagerHolder jobManagerHolder; private final Map nativeCodeInfo; @Inject public MachineLearningFeatureSet(Environment environment, ClusterService clusterService, Client client, - @Nullable XPackLicenseState licenseState) { + @Nullable XPackLicenseState licenseState, JobManagerHolder jobManagerHolder) { this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(environment.settings()); this.clusterService = Objects.requireNonNull(clusterService); this.client = Objects.requireNonNull(client); this.licenseState = licenseState; + this.jobManagerHolder = jobManagerHolder; Map nativeCodeInfo = NativeController.UNKNOWN_NATIVE_CODE_INFO; // Don't try to get the native code version if ML is disabled - it causes too much controversy // if ML has been disabled because of some OS incompatibility. Also don't try to get the native @@ -135,7 +138,7 @@ public Map nativeCodeInfo() { @Override public void usage(ActionListener listener) { ClusterState state = clusterService.state(); - new Retriever(client, MlMetadata.getMlMetadata(state), available(), enabled(), mlNodeCount(state)).execute(listener); + new Retriever(client, jobManagerHolder, available(), enabled(), mlNodeCount(state)).execute(listener); } private int mlNodeCount(final ClusterState clusterState) { @@ -156,16 +159,16 @@ private int mlNodeCount(final ClusterState clusterState) { public static class Retriever { private final Client client; - private final MlMetadata mlMetadata; + private final JobManagerHolder jobManagerHolder; private final boolean available; private final boolean enabled; private Map jobsUsage; private Map datafeedsUsage; private int nodeCount; - public Retriever(Client client, MlMetadata mlMetadata, boolean available, boolean enabled, int nodeCount) { + public Retriever(Client client, JobManagerHolder jobManagerHolder, boolean available, boolean enabled, int nodeCount) { this.client = Objects.requireNonNull(client); - this.mlMetadata = mlMetadata; + this.jobManagerHolder = jobManagerHolder; this.available = available; this.enabled = enabled; this.jobsUsage = new LinkedHashMap<>(); @@ -174,7 +177,8 @@ public Retriever(Client client, MlMetadata mlMetadata, boolean available, boolea } public void execute(ActionListener listener) { - if (enabled == false) { + // empty holder means either ML disabled or transport client mode + if (jobManagerHolder.isEmpty()) { listener.onResponse( new MachineLearningFeatureSetUsage(available, enabled, Collections.emptyMap(), Collections.emptyMap(), 0)); return; @@ -194,20 +198,19 @@ public void execute(ActionListener listener) { GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(MetaData.ALL); ActionListener jobStatsListener = ActionListener.wrap( response -> { - addJobsUsage(response); - GetDatafeedsStatsAction.Request datafeedStatsRequest = - new GetDatafeedsStatsAction.Request(GetDatafeedsStatsAction.ALL); - client.execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest, - datafeedStatsListener); - }, - listener::onFailure - ); + jobManagerHolder.getJobManager().expandJobs(MetaData.ALL, true, ActionListener.wrap(jobs -> { + addJobsUsage(response, jobs.results()); + GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request( + GetDatafeedsStatsAction.ALL); + client.execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest, datafeedStatsListener); + }, listener::onFailure)); + }, listener::onFailure); // Step 0. Kick off the chain of callbacks by requesting jobs stats client.execute(GetJobsStatsAction.INSTANCE, jobStatsRequest, jobStatsListener); } - private void addJobsUsage(GetJobsStatsAction.Response response) { + private void addJobsUsage(GetJobsStatsAction.Response response, List jobs) { StatsAccumulator allJobsDetectorsStats = new StatsAccumulator(); StatsAccumulator allJobsModelSizeStats = new StatsAccumulator(); ForecastStats allJobsForecastStats = new ForecastStats(); @@ -217,11 +220,11 @@ private void addJobsUsage(GetJobsStatsAction.Response response) { Map modelSizeStatsByState = new HashMap<>(); Map forecastStatsByState = new HashMap<>(); - Map jobs = mlMetadata.getJobs(); List jobsStats = response.getResponse().results(); + Map jobMap = jobs.stream().collect(Collectors.toMap(Job::getId, item -> item)); for (GetJobsStatsAction.Response.JobStats jobStats : jobsStats) { ModelSizeStats modelSizeStats = jobStats.getModelSizeStats(); - int detectorsCount = jobs.get(jobStats.getJobId()).getAnalysisConfig() + int detectorsCount = jobMap.get(jobStats.getJobId()).getAnalysisConfig() .getDetectors().size(); double modelSize = modelSizeStats == null ? 0.0 : jobStats.getModelSizeStats().getModelBytes(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManagerHolder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManagerHolder.java new file mode 100644 index 0000000000000..cf54f2852275d --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManagerHolder.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.job; + +import org.elasticsearch.ElasticsearchException; + +public class JobManagerHolder { + + private final JobManager instance; + + /** + * Create an empty holder which also means that no job manager gets created. + */ + public JobManagerHolder() { + this.instance = null; + } + + /** + * Create a holder that allows lazy creation of a job manager. + * + */ + public JobManagerHolder(JobManager jobManager) { + this.instance = jobManager; + } + + public boolean isEmpty() { + return instance == null; + } + + /** + * Get the instance of the held JobManager. + * + * @return job manager instance + * @throws ElasticsearchException if holder has been created with the empty constructor + */ + public JobManager getJobManager() { + if (instance == null) { + throw new ElasticsearchException("Tried to get job manager although Machine Learning is disabled"); + } + return instance; + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java index 6a2f25f6a3046..2f81a1f8fab71 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java @@ -31,7 +31,6 @@ import org.elasticsearch.xpack.core.XPackField; import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage; import org.elasticsearch.xpack.core.ml.MachineLearningField; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; @@ -46,6 +45,8 @@ import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStatsTests; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; +import org.elasticsearch.xpack.ml.job.JobManager; +import org.elasticsearch.xpack.ml.job.JobManagerHolder; import org.junit.Before; import java.util.Arrays; @@ -62,6 +63,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.Is.is; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -72,6 +74,8 @@ public class MachineLearningFeatureSetTests extends ESTestCase { private Settings commonSettings; private ClusterService clusterService; private Client client; + private JobManager jobManager; + private JobManagerHolder jobManagerHolder; private XPackLicenseState licenseState; @Before @@ -82,7 +86,11 @@ public void init() { .build(); clusterService = mock(ClusterService.class); client = mock(Client.class); + jobManager = mock(JobManager.class); + jobManagerHolder = new JobManagerHolder(jobManager); licenseState = mock(XPackLicenseState.class); + ClusterState clusterState = new ClusterState.Builder(ClusterState.EMPTY_STATE).build(); + when(clusterService.state()).thenReturn(clusterState); givenJobs(Collections.emptyList(), Collections.emptyList()); givenDatafeeds(Collections.emptyList()); } @@ -104,7 +112,7 @@ public void testIsRunningOnMlPlatform() { public void testAvailable() throws Exception { MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(TestEnvironment.newEnvironment(commonSettings), clusterService, - client, licenseState); + client, licenseState, jobManagerHolder); boolean available = randomBoolean(); when(licenseState.isMachineLearningAllowed()).thenReturn(available); assertThat(featureSet.available(), is(available)); @@ -129,7 +137,7 @@ public void testEnabled() throws Exception { } boolean expected = enabled || useDefault; MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(TestEnvironment.newEnvironment(settings.build()), - clusterService, client, licenseState); + clusterService, client, licenseState, jobManagerHolder); assertThat(featureSet.enabled(), is(expected)); PlainActionFuture future = new PlainActionFuture<>(); featureSet.usage(future); @@ -163,7 +171,7 @@ public void testUsage() throws Exception { )); MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(TestEnvironment.newEnvironment(settings.build()), - clusterService, client, licenseState); + clusterService, client, licenseState, jobManagerHolder); PlainActionFuture future = new PlainActionFuture<>(); featureSet.usage(future); XPackFeatureSet.Usage mlUsage = future.get(); @@ -232,6 +240,28 @@ public void testUsage() throws Exception { } } + public void testUsageDisabledML() throws Exception { + when(licenseState.isMachineLearningAllowed()).thenReturn(true); + Settings.Builder settings = Settings.builder().put(commonSettings); + settings.put("xpack.ml.enabled", false); + + JobManagerHolder emptyJobManagerHolder = new JobManagerHolder(); + MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(TestEnvironment.newEnvironment(settings.build()), + clusterService, client, licenseState, emptyJobManagerHolder); + PlainActionFuture future = new PlainActionFuture<>(); + featureSet.usage(future); + XPackFeatureSet.Usage mlUsage = future.get(); + BytesStreamOutput out = new BytesStreamOutput(); + mlUsage.writeTo(out); + XPackFeatureSet.Usage serializedUsage = new MachineLearningFeatureSetUsage(out.bytes().streamInput()); + + for (XPackFeatureSet.Usage usage : Arrays.asList(mlUsage, serializedUsage)) { + assertThat(usage, is(notNullValue())); + assertThat(usage.name(), is(XPackField.MACHINE_LEARNING)); + assertThat(usage.enabled(), is(false)); + } + } + public void testNodeCount() throws Exception { when(licenseState.isMachineLearningAllowed()).thenReturn(true); int nodeCount = randomIntBetween(1, 3); @@ -239,7 +269,7 @@ public void testNodeCount() throws Exception { Settings.Builder settings = Settings.builder().put(commonSettings); settings.put("xpack.ml.enabled", true); MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(TestEnvironment.newEnvironment(settings.build()), - clusterService, client, licenseState); + clusterService, client, licenseState, jobManagerHolder); PlainActionFuture future = new PlainActionFuture<>(); featureSet.usage(future); @@ -282,7 +312,7 @@ public void testUsageGivenMlMetadataNotInstalled() throws Exception { when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); MachineLearningFeatureSet featureSet = new MachineLearningFeatureSet(TestEnvironment.newEnvironment(settings.build()), - clusterService, client, licenseState); + clusterService, client, licenseState, jobManagerHolder); PlainActionFuture future = new PlainActionFuture<>(); featureSet.usage(future); XPackFeatureSet.Usage usage = future.get(); @@ -319,15 +349,14 @@ public void testUsageGivenMlMetadataNotInstalled() throws Exception { } private void givenJobs(List jobs, List jobsStats) { - MlMetadata.Builder mlMetadataBuilder = new MlMetadata.Builder(); - for (Job job : jobs) { - mlMetadataBuilder.putJob(job, false); - } - ClusterState clusterState = new ClusterState.Builder(ClusterState.EMPTY_STATE) - .metaData(new MetaData.Builder() - .putCustom(MlMetadata.TYPE, mlMetadataBuilder.build())) - .build(); - when(clusterService.state()).thenReturn(clusterState); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener> jobListener = + (ActionListener>) invocationOnMock.getArguments()[2]; + jobListener.onResponse( + new QueryPage<>(jobs, jobs.size(), Job.RESULTS_FIELD)); + return Void.TYPE; + }).when(jobManager).expandJobs(eq(MetaData.ALL), eq(true), any(ActionListener.class)); doAnswer(invocationOnMock -> { ActionListener listener =