[ML] Improve DF analytics audits and logging #53179

Merged

Changes from 1 commit
@@ -65,9 +65,14 @@ public final class Messages {
public static final String DATA_FRAME_ANALYTICS_AUDIT_ESTIMATED_MEMORY_USAGE = "Estimated memory usage for this analytics to be [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_CREATING_DEST_INDEX = "Creating destination index [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_REUSING_DEST_INDEX = "Using existing destination index [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING = "Finished reindexing to destination index [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING = "Started reindexing to destination index [{0}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING =
"Finished reindexing to destination index [{0}], took [{1}]";
public static final String DATA_FRAME_ANALYTICS_AUDIT_FINISHED_ANALYSIS = "Finished analysis";
public static final String DATA_FRAME_ANALYTICS_AUDIT_RESTORING_STATE = "Restoring from previous model state";
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_LOADING_DATA = "Started loading data";
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_ANALYZING = "Started analyzing";
public static final String DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS = "Started writing results";

public static final String FILTER_CANNOT_DELETE = "Cannot delete filter [{0}] currently used by jobs {1}";
public static final String FILTER_CONTAINS_TOO_MANY_ITEMS = "Filter [{0}] contains too many items; up to [{1}] items are allowed";
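Since these audit templates use MessageFormat-style {0}/{1} placeholders, the new two-argument FINISHED_REINDEXING message presumably renders along the following lines. This is an illustrative sketch assuming Messages.getMessage delegates to java.text.MessageFormat; the index name and duration value below are hypothetical, not taken from the PR.

import java.text.MessageFormat;

public class AuditMessagePreview {
    public static void main(String[] args) {
        // Template copied from DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING above
        String template = "Finished reindexing to destination index [{0}], took [{1}]";
        // Hypothetical destination index name and TimeValue-style duration string
        String rendered = MessageFormat.format(template, "my-dest-index", "1.2s");
        System.out.println(rendered);
        // Prints: Finished reindexing to destination index [my-dest-index], took [1.2s]
    }
}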
@@ -119,7 +119,11 @@ public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws
"Starting analytics on node",
"Started analytics",
expectedDestIndexAuditMessage(),
"Started reindexing to destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
}
@@ -160,7 +164,11 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Excepti
"Starting analytics on node",
"Started analytics",
expectedDestIndexAuditMessage(),
"Started reindexing to destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
}
@@ -223,7 +231,11 @@ public <T> void testWithOnlyTrainingRowsAndTrainingPercentIsFifty(String jobId,
"Starting analytics on node",
"Started analytics",
expectedDestIndexAuditMessage(),
"Started reindexing to destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
assertEvaluation(dependentVariable, dependentVariableValues, "ml." + predictedClassField);
}
@@ -233,6 +233,8 @@ protected static void assertThatAuditMessagesMatch(String configId, String... ex
// Since calls to write the AbstractAuditor are sent and forgot (async) we could have returned from the start,
// finished the job (as this is a very short analytics job), all without the audit being fully written.
assertBusy(() -> assertTrue(indexExists(NotificationsIndex.NOTIFICATIONS_INDEX)));
client().admin().indices().prepareRefresh(".ml-notifications*").get();
Contributor:
Is this needed, taking into account that fetchAllAuditMessages also issues a refresh request?

Contributor Author:
Ah, good point. That comment got me confused. I'll remove it.
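For context, the redundancy being discussed: fetchAllAuditMessages already issues a refresh against the notifications index before searching, so the explicit prepareRefresh call above is unnecessary. A minimal sketch of that refresh-then-search shape, reusing the client APIs visible in this test class (an assumption-laden outline, not the exact implementation in the PR):

// Sketch only: assumes the surrounding test class's imports and client() helper,
// and that audit documents expose their text under a "message" field.
private static List<String> fetchAllAuditMessagesSketch(String jobId) {
    // Refresh first so fire-and-forget (async) audit writes become searchable
    client().admin().indices().prepareRefresh(NotificationsIndex.NOTIFICATIONS_INDEX).get();

    SearchRequest searchRequest = new SearchRequestBuilder(client(), SearchAction.INSTANCE)
        .setIndices(NotificationsIndex.NOTIFICATIONS_INDEX)
        .addSort("timestamp", SortOrder.ASC)
        .setQuery(QueryBuilders.termQuery("job_id", jobId))
        .setSize(100)
        .request();
    SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet();

    return Arrays.stream(searchResponse.getHits().getHits())
        .map(hit -> (String) hit.getSourceAsMap().get("message"))
        .collect(Collectors.toList());
}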


@SuppressWarnings("unchecked")
Matcher<String>[] itemMatchers = Arrays.stream(expectedAuditMessagePrefixes).map(Matchers::startsWith).toArray(Matcher[]::new);
assertBusy(() -> {
@@ -252,6 +254,7 @@ private static List<String> fetchAllAuditMessages(String dataFrameAnalyticsId) {
.setIndices(NotificationsIndex.NOTIFICATIONS_INDEX)
.addSort("timestamp", SortOrder.ASC)
.setQuery(QueryBuilders.termQuery("job_id", dataFrameAnalyticsId))
.setSize(100)
.request();
SearchResponse searchResponse = client().execute(SearchAction.INSTANCE, searchRequest).actionGet();

@@ -103,7 +103,11 @@ public void testSingleNumericFeatureAndMixedTrainingAndNonTrainingRows() throws
"Starting analytics on node",
"Started analytics",
"Creating destination index [" + destIndex + "]",
"Started reindexing to destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
}

@@ -142,7 +146,11 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsHundred() throws Excepti
"Starting analytics on node",
"Started analytics",
"Creating destination index [" + destIndex + "]",
"Started reindexing to destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
}

@@ -196,7 +204,11 @@ public void testWithOnlyTrainingRowsAndTrainingPercentIsFifty() throws Exception
"Starting analytics on node",
"Started analytics",
"Creating destination index [" + destIndex + "]",
"Started reindexing to destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
}

@@ -126,7 +126,11 @@ public void testOutlierDetectionWithFewDocuments() throws Exception {
"Starting analytics on node",
"Started analytics",
"Creating destination index [test-outlier-detection-with-few-docs-results]",
"Started reindexing to destination index [test-outlier-detection-with-few-docs-results]",
"Finished reindexing to destination index [test-outlier-detection-with-few-docs-results]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
}

@@ -181,7 +185,11 @@ public void testOutlierDetectionWithEnoughDocumentsToScroll() throws Exception {
"Starting analytics on node",
"Started analytics",
"Creating destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
"Started reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
"Finished reindexing to destination index [test-outlier-detection-with-enough-docs-to-scroll-results]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
}

@@ -262,7 +270,11 @@ public void testOutlierDetectionWithMoreFieldsThanDocValueFieldLimit() throws Ex
"Starting analytics on node",
"Started analytics",
"Creating destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
"Started reindexing to destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
"Finished reindexing to destination index [test-outlier-detection-with-more-fields-than-docvalue-limit-results]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
}

@@ -387,7 +399,11 @@ public void testOutlierDetectionWithMultipleSourceIndices() throws Exception {
"Starting analytics on node",
"Started analytics",
"Creating destination index [test-outlier-detection-with-multiple-source-indices-results]",
"Started reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]",
"Finished reindexing to destination index [test-outlier-detection-with-multiple-source-indices-results]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
}

@@ -445,7 +461,11 @@ public void testOutlierDetectionWithPreExistingDestIndex() throws Exception {
"Starting analytics on node",
"Started analytics",
"Using existing destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
"Started reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
"Finished reindexing to destination index [test-outlier-detection-with-pre-existing-dest-index-results]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
}

@@ -699,7 +719,11 @@ public void testOutlierDetectionWithCustomParams() throws Exception {
"Starting analytics on node",
"Started analytics",
"Creating destination index [test-outlier-detection-with-custom-params-results]",
"Started reindexing to destination index [test-outlier-detection-with-custom-params-results]",
"Finished reindexing to destination index [test-outlier-detection-with-custom-params-results]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
}
}
@@ -213,7 +213,8 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
task.setReindexingFinished();
auditor.info(
config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex()));
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_FINISHED_REINDEXING, config.getDest().getIndex(),
reindexResponse.getTook()));
startAnalytics(task, config);
},
error -> task.setFailed(ExceptionsHelper.unwrapCause(error).getMessage())
@@ -233,9 +234,12 @@ private void reindexDataframeAndStartAnalysis(DataFrameAnalyticsTask task, DataF
final ThreadContext threadContext = client.threadPool().getThreadContext();
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
try (ThreadContext.StoredContext ignore = threadContext.stashWithOrigin(ML_ORIGIN)) {
LOGGER.info("[{}] Started reindexing", config.getId());
Task reindexTask = client.executeLocally(ReindexAction.INSTANCE, reindexRequest,
new ContextPreservingActionListener<>(supplier, reindexCompletedListener));
task.setReindexingTaskId(reindexTask.getId());
auditor.info(config.getId(),
Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_REINDEXING, config.getDest().getIndex()));
}
},
reindexCompletedListener::onFailure
@@ -147,6 +147,9 @@ private BytesReference getModelState(DataFrameAnalyticsConfig config) {
}

private void processData(DataFrameAnalyticsTask task, ProcessContext processContext, BytesReference state) {
LOGGER.info("[{}] Started loading data", processContext.config.getId());
auditor.info(processContext.config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_LOADING_DATA));

DataFrameAnalyticsConfig config = processContext.config;
DataFrameDataExtractor dataExtractor = processContext.dataExtractor.get();
AnalyticsProcess<AnalyticsResult> process = processContext.process.get();
@@ -159,6 +162,9 @@ private void processData(DataFrameAnalyticsTask task, ProcessContext processCont

restoreState(task, config, state, process);

LOGGER.info("[{}] Started analyzing", processContext.config.getId());
auditor.info(processContext.config.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_ANALYZING));

LOGGER.info("[{}] Waiting for result processor to complete", config.getId());
resultProcessor.awaitForCompletion();
processContext.setFailureReason(resultProcessor.getFailure());
@@ -26,6 +26,7 @@
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelDefinition;
import org.elasticsearch.xpack.core.ml.inference.TrainedModelInput;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.core.security.user.XPackUser;
@@ -120,6 +121,10 @@ public void process(AnalyticsProcess<AnalyticsResult> process) {
AnalyticsResult result = iterator.next();
processResult(result, resultsJoiner);
if (result.getRowResults() != null) {
if (processedRows == 0) {
LOGGER.info("[{}] Started writing results", analytics.getId());
auditor.info(analytics.getId(), Messages.getMessage(Messages.DATA_FRAME_ANALYTICS_AUDIT_STARTED_WRITING_RESULTS));
}
processedRows++;
updateResultsProgress(processedRows >= totalRows ? 100 : (int) (processedRows * 100.0 / totalRows));
}