Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Wait for autodetect to be ready in the datafeed #37349

Merged
merged 1 commit into from Jan 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -432,7 +432,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
auditor, System::currentTimeMillis);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor);
System::currentTimeMillis, auditor, autodetectProcessManager);
this.datafeedManager.set(datafeedManager);
MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager,
autodetectProcessManager);
Expand Down Expand Up @@ -473,7 +473,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
return Arrays.asList(
new TransportOpenJobAction.OpenJobPersistentTasksExecutor(settings, clusterService, autodetectProcessManager.get(),
memoryTracker.get(), client),
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor( datafeedManager.get())
new TransportStartDatafeedAction.StartDatafeedPersistentTasksExecutor(datafeedManager.get())
);
}

Expand Down
Expand Up @@ -27,9 +27,11 @@
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.util.ArrayList;
Expand Down Expand Up @@ -62,16 +64,18 @@ public class DatafeedManager {
private final ConcurrentMap<Long, Holder> runningDatafeedsOnThisNode = new ConcurrentHashMap<>();
private final DatafeedJobBuilder datafeedJobBuilder;
private final TaskRunner taskRunner = new TaskRunner();
private final AutodetectProcessManager autodetectProcessManager;
private volatile boolean isolated;

public DatafeedManager(ThreadPool threadPool, Client client, ClusterService clusterService, DatafeedJobBuilder datafeedJobBuilder,
Supplier<Long> currentTimeSupplier, Auditor auditor) {
Supplier<Long> currentTimeSupplier, Auditor auditor, AutodetectProcessManager autodetectProcessManager) {
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.threadPool = threadPool;
this.currentTimeSupplier = Objects.requireNonNull(currentTimeSupplier);
this.auditor = Objects.requireNonNull(auditor);
this.datafeedJobBuilder = Objects.requireNonNull(datafeedJobBuilder);
this.autodetectProcessManager = autodetectProcessManager;
clusterService.addListener(taskRunner);
}

Expand Down Expand Up @@ -256,6 +260,21 @@ private JobState getJobState(PersistentTasksCustomMetaData tasks, TransportStart
return MlTasks.getJobStateModifiedForReassignments(getJobId(datafeedTask), tasks);
}

private boolean jobHasOpenAutodetectCommunicator(PersistentTasksCustomMetaData tasks,
TransportStartDatafeedAction.DatafeedTask datafeedTask) {
PersistentTasksCustomMetaData.PersistentTask<?> jobTask = MlTasks.getJobTask(getJobId(datafeedTask), tasks);
if (jobTask == null) {
return false;
}

JobTaskState state = (JobTaskState) jobTask.getState();
if (state == null || state.isStatusStale(jobTask)) {
return false;
}

return autodetectProcessManager.hasOpenAutodetectCommunicator(jobTask.getAllocationId());
}

private TimeValue computeNextDelay(long next) {
return new TimeValue(Math.max(1, next - currentTimeSupplier.get()));
}
Expand Down Expand Up @@ -446,7 +465,7 @@ private class TaskRunner implements ClusterStateListener {
private void runWhenJobIsOpened(TransportStartDatafeedAction.DatafeedTask datafeedTask) {
ClusterState clusterState = clusterService.state();
PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
if (getJobState(tasks, datafeedTask) == JobState.OPENED) {
if (getJobState(tasks, datafeedTask) == JobState.OPENED && jobHasOpenAutodetectCommunicator(tasks, datafeedTask)) {
runTask(datafeedTask);
} else {
logger.info("Datafeed [{}] is waiting for job [{}] to be opened",
Expand Down Expand Up @@ -485,10 +504,10 @@ public void clusterChanged(ClusterChangedEvent event) {
continue;
}
JobState jobState = getJobState(currentTasks, datafeedTask);
if (jobState == JobState.OPENED) {
runTask(datafeedTask);
} else if (jobState == JobState.OPENING) {
if (jobState == JobState.OPENING || jobHasOpenAutodetectCommunicator(currentTasks, datafeedTask) == false) {
remainingTasks.add(datafeedTask);
} else if (jobState == JobState.OPENED) {
runTask(datafeedTask);
} else {
logger.warn("Datafeed [{}] is stopping because job [{}] state is [{}]",
datafeedTask.getDatafeedId(), getJobId(datafeedTask), jobState);
Expand Down
Expand Up @@ -212,6 +212,13 @@ public void killAllProcessesOnThisNode() {
*/
public void persistJob(JobTask jobTask, Consumer<Exception> handler) {
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = String.format(Locale.ROOT, "Cannot persist because job [%s] does not have a corresponding autodetect process",
jobTask.getJobId());
logger.debug(message);
handler.accept(ExceptionsHelper.conflictStatusException(message));
return;
}
communicator.persistJob((aVoid, e) -> handler.accept(e));
}

Expand Down Expand Up @@ -239,7 +246,8 @@ public void processData(JobTask jobTask, AnalysisRegistry analysisRegistry, Inpu
XContentType xContentType, DataLoadParams params, BiConsumer<DataCounts, Exception> handler) {
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() + "] is not open");
throw ExceptionsHelper.conflictStatusException("Cannot process data because job [" + jobTask.getJobId() +
"] does not have a corresponding autodetect process");
}
communicator.writeToJob(input, analysisRegistry, xContentType, params, handler);
}
Expand All @@ -257,7 +265,8 @@ public void flushJob(JobTask jobTask, FlushJobParams params, ActionListener<Flus
logger.debug("Flushing job {}", jobTask.getJobId());
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] is not open", jobTask.getJobId());
String message = String.format(Locale.ROOT, "Cannot flush because job [%s] does not have a corresponding autodetect process",
jobTask.getJobId());
logger.debug(message);
handler.onFailure(ExceptionsHelper.conflictStatusException(message));
return;
Expand Down Expand Up @@ -307,7 +316,8 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Excepti
logger.debug("Forecasting job {}", jobId);
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobId);
String message = String.format(Locale.ROOT,
"Cannot forecast because job [%s] does not have a corresponding autodetect process", jobId);
logger.debug(message);
handler.accept(ExceptionsHelper.conflictStatusException(message));
return;
Expand All @@ -327,7 +337,8 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Excepti
public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams, Consumer<Exception> handler) {
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open";
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message probably made sense once but it doesn't anymore. I'd suggest
Cannot update the job config because job...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll create a new PR to change that.

"] does not have a corresponding autodetect process";
logger.debug(message);
handler.accept(ExceptionsHelper.conflictStatusException(message));
return;
Expand Down Expand Up @@ -663,6 +674,14 @@ private AutodetectCommunicator getOpenAutodetectCommunicator(JobTask jobTask) {
return null;
}

public boolean hasOpenAutodetectCommunicator(long jobAllocationId) {
ProcessContext processContext = processByAllocation.get(jobAllocationId);
if (processContext != null && processContext.getState() == ProcessContext.ProcessStateName.RUNNING) {
return processContext.getAutodetectCommunicator() != null;
}
return false;
}

public Optional<Duration> jobOpenTime(JobTask jobTask) {
AutodetectCommunicator communicator = getAutodetectCommunicator(jobTask);
if (communicator == null) {
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedAction.DatafeedTask;
import org.elasticsearch.xpack.ml.action.TransportStartDatafeedActionTests;
import org.elasticsearch.xpack.ml.job.persistence.MockClientBuilder;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
Expand All @@ -48,6 +49,7 @@
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static org.elasticsearch.xpack.ml.action.TransportOpenJobActionTests.addJobTask;
Expand All @@ -74,13 +76,14 @@ public class DatafeedManagerTests extends ESTestCase {
private long currentTime = 120000;
private Auditor auditor;
private ArgumentCaptor<ClusterStateListener> capturedClusterStateListener = ArgumentCaptor.forClass(ClusterStateListener.class);
private AtomicBoolean hasOpenAutodetectCommunicator;

@Before
@SuppressWarnings("unchecked")
public void setUpTests() {
Job.Builder job = createDatafeedJob().setCreateTime(new Date());

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask(job.getId(), "node_id", JobState.OPENED, tasksBuilder);
PersistentTasksCustomMetaData tasks = tasksBuilder.build();
DiscoveryNodes nodes = DiscoveryNodes.builder()
Expand Down Expand Up @@ -128,7 +131,12 @@ public void setUpTests() {
return null;
}).when(datafeedJobBuilder).build(any(), any());

datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor);
hasOpenAutodetectCommunicator = new AtomicBoolean(true);
AutodetectProcessManager autodetectProcessManager = mock(AutodetectProcessManager.class);
doAnswer(invocation -> hasOpenAutodetectCommunicator.get()).when(autodetectProcessManager).hasOpenAutodetectCommunicator(anyLong());

datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder, () -> currentTime, auditor,
autodetectProcessManager);

verify(clusterService).addListener(capturedClusterStateListener.capture());
}
Expand Down Expand Up @@ -259,7 +267,7 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() {
// Verify datafeed has not started running yet as job is still opening
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENING, tasksBuilder);
addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state())
Expand All @@ -270,15 +278,52 @@ public void testDatafeedTaskWaitsUntilJobIsOpened() {
// Still no run
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

tasksBuilder = PersistentTasksCustomMetaData.builder();
tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder jobOpenedCs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));

capturedClusterStateListener.getValue().clusterChanged(
new ClusterChangedEvent("_source", jobOpenedCs.build(), anotherJobCs.build()));

// Now it should run as the job state chanded to OPENED
// Now it should run as the job state changed to OPENED
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
}

public void testDatafeedTaskWaitsUntilAutodetectCommunicatorIsOpen() {

hasOpenAutodetectCommunicator.set(false);

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder cs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));
when(clusterService.state()).thenReturn(cs.build());

Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedManager.run(task, handler);

// Verify datafeed has not started running yet as job doesn't have an open autodetect communicator
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", JobState.OPENED, tasksBuilder);
addJobTask("another_job", "node_id", JobState.OPENED, tasksBuilder);
ClusterState.Builder anotherJobCs = ClusterState.builder(clusterService.state())
.metaData(new MetaData.Builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()));

capturedClusterStateListener.getValue().clusterChanged(new ClusterChangedEvent("_source", anotherJobCs.build(), cs.build()));

// Still no run
verify(threadPool, never()).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);

hasOpenAutodetectCommunicator.set(true);

capturedClusterStateListener.getValue().clusterChanged(
new ClusterChangedEvent("_source", cs.build(), anotherJobCs.build()));

// Now it should run as the autodetect communicator is open
verify(threadPool, times(1)).executor(MachineLearning.DATAFEED_THREAD_POOL_NAME);
}

Expand Down