[ML] Enabling mml estimation in data recognizer module setup (#64900) (#65141)

* [ML] Enabling mml estimation in data recognizer module setup

* small refactor

* adding functional tests

* increasing uptime test timeout

* tiny refactor

* checking for default setting

* testing flaky uptime test

* catching errors in mml estimation

* lowering timeout

* ensuring data is present for ML tests

* adding await

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>

jgowdyelastic and elasticmachine committed May 4, 2020
1 parent 8664ccc commit 9bc4ea0
Showing 5 changed files with 136 additions and 50 deletions.
@@ -172,7 +172,6 @@ export const Page: FC<PageProps> = ({ moduleId, existingGroupIds }) => {
startDatafeed: startDatafeedAfterSave,
...(jobOverridesPayload !== null ? { jobOverrides: jobOverridesPayload } : {}),
...resultTimeRange,
estimateModelMemory: false,
});
const { datafeeds: datafeedsResponse, jobs: jobsResponse, kibana: kibanaResponse } = response;
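With `estimateModelMemory: false` removed from the client call, the recognizer wizard now relies on the server-side default, which this PR sets to true. The sketch below illustrates the resulting setup request body; the field names mirror the request bodies in the functional tests further down, but the `ModuleSetupPayload` interface itself is only an illustration, not the real Kibana type.

```ts
// Illustrative sketch only: field names follow the functional test request bodies below,
// but this interface is an assumption, not the real Kibana type.
interface ModuleSetupPayload {
  prefix?: string;
  indexPatternName: string;
  startDatafeed?: boolean;
  start?: number;
  end?: number;
  // Omitting this flag now means "estimate": the server-side default introduced here is true.
  estimateModelMemory?: boolean;
}

const payload: ModuleSetupPayload = {
  prefix: 'pf2_',
  indexPatternName: 'kibana_sample_data_logs',
  startDatafeed: false,
  // estimateModelMemory is intentionally left out, so each job's model_memory_limit
  // is estimated from the data instead of using the module's hard-coded default.
};
```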

106 changes: 59 additions & 47 deletions x-pack/plugins/ml/server/models/data_recognizer/data_recognizer.ts
@@ -110,7 +110,7 @@ export class DataRecognizer {
/**
* List of the module jobs that require model memory estimation
*/
jobsForModelMemoryEstimation: ModuleJob[] = [];
jobsForModelMemoryEstimation: Array<{ job: ModuleJob; query: any }> = [];

constructor(
private callAsCurrentUser: APICaller,
@@ -374,7 +374,7 @@ export class DataRecognizer {
end?: number,
jobOverrides?: JobOverride | JobOverride[],
datafeedOverrides?: DatafeedOverride | DatafeedOverride[],
estimateModelMemory?: boolean
estimateModelMemory: boolean = true
) {
// load the config from disk
const moduleConfig = await this.getModule(moduleId, jobPrefix);
@@ -416,7 +416,10 @@ export class DataRecognizer {
savedObjects: [] as KibanaObjectResponse[],
};

this.jobsForModelMemoryEstimation = moduleConfig.jobs;
this.jobsForModelMemoryEstimation = moduleConfig.jobs.map(job => ({
job,
query: moduleConfig.datafeeds.find(d => d.config.job_id === job.id)?.config.query ?? null,
}));

this.applyJobConfigOverrides(moduleConfig, jobOverrides, jobPrefix);
this.applyDatafeedConfigOverrides(moduleConfig, datafeedOverrides, jobPrefix);
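The property change above pairs every module job with the query of its own datafeed instead of carrying the jobs alone; the per-job query is later used for model memory estimation. A minimal sketch of that pairing, with simplified stand-in types rather than the real `ModuleJob` and datafeed definitions:

```ts
// Simplified stand-ins for the real Kibana types.
interface SketchJob { id: string; config: { data_description: { time_field: string } }; }
interface SketchDatafeed { config: { job_id: string; query: object }; }

function pairJobsWithQueries(jobs: SketchJob[], datafeeds: SketchDatafeed[]) {
  return jobs.map(job => ({
    job,
    // Each job gets the query of the datafeed that references it; null when no datafeed matches.
    query: datafeeds.find(d => d.config.job_id === job.id)?.config.query ?? null,
  }));
}
```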
@@ -958,7 +961,7 @@
*/
async updateModelMemoryLimits(
moduleConfig: Module,
estimateMML: boolean = false,
estimateMML: boolean,
start?: number,
end?: number
) {
@@ -967,53 +970,57 @@
}

if (estimateMML && this.jobsForModelMemoryEstimation.length > 0) {
const calculateModelMemoryLimit = calculateModelMemoryLimitProvider(this.callAsCurrentUser);
const query = moduleConfig.query ?? null;

// Checks if all jobs in the module have the same time field configured
const isSameTimeFields = this.jobsForModelMemoryEstimation.every(
job =>
job.config.data_description.time_field ===
this.jobsForModelMemoryEstimation[0].config.data_description.time_field
);
try {
const calculateModelMemoryLimit = calculateModelMemoryLimitProvider(this.callAsCurrentUser);

if (isSameTimeFields && (start === undefined || end === undefined)) {
// In case the time range is not provided and the time field is the same
// set the fallback range for all jobs
const { start: fallbackStart, end: fallbackEnd } = await this.getFallbackTimeRange(
this.jobsForModelMemoryEstimation[0].config.data_description.time_field,
query
// Checks if all jobs in the module have the same time field configured
const firstJobTimeField = this.jobsForModelMemoryEstimation[0].job.config.data_description
.time_field;
const isSameTimeFields = this.jobsForModelMemoryEstimation.every(
({ job }) => job.config.data_description.time_field === firstJobTimeField
);
start = fallbackStart;
end = fallbackEnd;
}

for (const job of this.jobsForModelMemoryEstimation) {
let earliestMs = start;
let latestMs = end;
if (earliestMs === undefined || latestMs === undefined) {
const timeFieldRange = await this.getFallbackTimeRange(
if (isSameTimeFields && (start === undefined || end === undefined)) {
// In case the time range is not provided and the time field is the same
// set the fallback range for all jobs
// as there may not be a common query, we use a match_all
const {
start: fallbackStart,
end: fallbackEnd,
} = await this.getFallbackTimeRange(firstJobTimeField, { match_all: {} });
start = fallbackStart;
end = fallbackEnd;
}

for (const { job, query } of this.jobsForModelMemoryEstimation) {
let earliestMs = start;
let latestMs = end;
if (earliestMs === undefined || latestMs === undefined) {
const timeFieldRange = await this.getFallbackTimeRange(
job.config.data_description.time_field,
query
);
earliestMs = timeFieldRange.start;
latestMs = timeFieldRange.end;
}

const { modelMemoryLimit } = await calculateModelMemoryLimit(
job.config.analysis_config,
this.indexPatternName,
query,
job.config.data_description.time_field,
query
earliestMs,
latestMs
);
earliestMs = timeFieldRange.start;
latestMs = timeFieldRange.end;
}

const { modelMemoryLimit } = await calculateModelMemoryLimit(
job.config.analysis_config,
this.indexPatternName,
query,
job.config.data_description.time_field,
earliestMs,
latestMs
);
if (!job.config.analysis_limits) {
job.config.analysis_limits = {} as AnalysisLimits;
}

if (!job.config.analysis_limits) {
job.config.analysis_limits = {} as AnalysisLimits;
job.config.analysis_limits.model_memory_limit = modelMemoryLimit;
}

job.config.analysis_limits.model_memory_limit = modelMemoryLimit;
} catch (error) {
mlLog.warn(`Data recognizer could not estimate model memory limit ${error}`);
}
}
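To summarize the estimation flow above: when every job in the module shares one time field and no time range was supplied, a single fallback range is fetched with a match_all query and reused for all jobs; otherwise each job resolves its own range from its datafeed query. Any failure only logs a warning, so module setup still succeeds with the default model_memory_limit from the module config. A simplified sketch follows, with assumed helper signatures rather than the real Kibana ones.

```ts
// Simplified sketch; the types and the getFallbackTimeRange signature are assumptions.
type TimeRange = { start: number; end: number };
type JobForEstimation = { timeField: string; query: object | null };

async function resolveEstimationRanges(
  jobs: JobForEstimation[],
  getFallbackTimeRange: (timeField: string, query: object) => Promise<TimeRange>,
  start?: number,
  end?: number
): Promise<TimeRange[]> {
  if (jobs.length === 0) {
    return [];
  }
  const sameTimeField = jobs.every(j => j.timeField === jobs[0].timeField);
  if (sameTimeField && (start === undefined || end === undefined)) {
    // No common query is guaranteed across datafeeds, hence match_all for the shared lookup.
    const shared = await getFallbackTimeRange(jobs[0].timeField, { match_all: {} });
    return jobs.map(() => shared);
  }
  return Promise.all(
    jobs.map(job =>
      start !== undefined && end !== undefined
        ? Promise.resolve({ start, end })
        : getFallbackTimeRange(job.timeField, job.query ?? { match_all: {} })
    )
  );
}
```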

@@ -1098,10 +1105,15 @@
if (generalOverrides.some(override => !!override.analysis_limits?.model_memory_limit)) {
this.jobsForModelMemoryEstimation = [];
} else {
this.jobsForModelMemoryEstimation = moduleConfig.jobs.filter(job => {
const override = jobSpecificOverrides.find(o => `${jobPrefix}${o.job_id}` === job.id);
return override?.analysis_limits?.model_memory_limit === undefined;
});
this.jobsForModelMemoryEstimation = moduleConfig.jobs
.filter(job => {
const override = jobSpecificOverrides.find(o => `${jobPrefix}${o.job_id}` === job.id);
return override?.analysis_limits?.model_memory_limit === undefined;
})
.map(job => ({
job,
query: moduleConfig.datafeeds.find(d => d.config.job_id === job.id)?.config.query || null,
}));
}

function processArrayValues(source: any, update: any) {
74 changes: 73 additions & 1 deletion x-pack/test/api_integration/apis/ml/modules/setup_module.ts
@@ -9,6 +9,7 @@ import expect from '@kbn/expect';
import { FtrProviderContext } from '../../../ftr_provider_context';

import { JOB_STATE, DATAFEED_STATE } from '../../../../../plugins/ml/common/constants/states';
import { Job } from '../../../../../plugins/ml/common/types/anomaly_detection_jobs';
import { USER } from '../../../../functional/services/machine_learning/security_common';

const COMMON_HEADERS = {
@@ -23,7 +24,8 @@ export default ({ getService }: FtrProviderContext) => {

const testDataListPositive = [
{
testTitleSuffix: 'for sample logs dataset with prefix and startDatafeed false',
testTitleSuffix:
'for sample logs dataset with prefix, startDatafeed false and estimateModelMemory false',
sourceDataArchive: 'ml/sample_logs',
indexPattern: { name: 'kibana_sample_data_logs', timeField: '@timestamp' },
module: 'sample_data_weblogs',
@@ -32,6 +34,7 @@ export default ({ getService }: FtrProviderContext) => {
prefix: 'pf1_',
indexPatternName: 'kibana_sample_data_logs',
startDatafeed: false,
estimateModelMemory: false,
},
expected: {
responseCode: 200,
@@ -40,16 +43,55 @@
jobId: 'pf1_low_request_rate',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '10mb',
},
{
jobId: 'pf1_response_code_rates',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '10mb',
},
{
jobId: 'pf1_url_scanning',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '10mb',
},
],
},
},
{
testTitleSuffix:
'for sample logs dataset with prefix, startDatafeed false and estimateModelMemory true',
sourceDataArchive: 'ml/sample_logs',
indexPattern: { name: 'kibana_sample_data_logs', timeField: '@timestamp' },
module: 'sample_data_weblogs',
user: USER.ML_POWERUSER,
requestBody: {
prefix: 'pf2_',
indexPatternName: 'kibana_sample_data_logs',
startDatafeed: false,
},
expected: {
responseCode: 200,
jobs: [
{
jobId: 'pf2_low_request_rate',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '11mb',
},
{
jobId: 'pf2_response_code_rates',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '11mb',
},
{
jobId: 'pf2_url_scanning',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '16mb',
},
],
},
@@ -197,6 +239,36 @@ export default ({ getService }: FtrProviderContext) => {
await ml.api.waitForJobState(job.jobId, job.jobState);
await ml.api.waitForDatafeedState(datafeedId, job.datafeedState);
}

// compare model memory limits for created jobs
const expectedModelMemoryLimits = testData.expected.jobs
.map(j => ({
id: j.jobId,
modelMemoryLimit: j.modelMemoryLimit,
}))
.sort(compareById);

const {
body: { jobs },
}: {
body: {
jobs: Job[];
};
} = await ml.api.getAnomalyDetectionJob(testData.expected.jobs.map(j => j.jobId).join());

const actualModelMemoryLimits = jobs
.map(j => ({
id: j.job_id,
modelMemoryLimit: j.analysis_limits!.model_memory_limit,
}))
.sort(compareById);

expect(actualModelMemoryLimits).to.eql(
expectedModelMemoryLimits,
`Expected job model memory limits '${JSON.stringify(
expectedModelMemoryLimits
)}' (got '${JSON.stringify(actualModelMemoryLimits)}')`
);
});

// TODO in future updates: add creation validations for created saved objects
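The assertions added above compare per-job model memory limits: with `estimateModelMemory: false` the jobs keep the module's default of 10mb, while with estimation enabled the limits become data-driven (11mb and 16mb for the sample logs dataset). Both sides are sorted with a `compareById` helper that sits outside this hunk; a minimal implementation consistent with how it is used here might look like this (an assumption, not necessarily the helper in the test file):

```ts
// Assumed shape of the compareById helper referenced above: sorts objects by their `id` field.
function compareById(a: { id: string }, b: { id: string }): number {
  return a.id.localeCompare(b.id);
}
```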
3 changes: 3 additions & 0 deletions x-pack/test/functional/apps/uptime/ml_anomaly.ts
@@ -9,6 +9,8 @@ import { FtrProviderContext } from '../../ftr_provider_context';
export default ({ getService }: FtrProviderContext) => {
const uptime = getService('uptime');
const log = getService('log');
const esArchiver = getService('esArchiver');
const archive = 'uptime/full_heartbeat';

describe('uptime ml anomaly', function() {
this.tags(['skipFirefox']);
@@ -17,6 +19,7 @@ export default ({ getService }: FtrProviderContext) => {
const monitorId = '0000-intermittent';

before(async () => {
await esArchiver.loadIfNeeded(archive);
if (!(await uptime.navigation.checkIfOnMonitorPage(monitorId))) {
await uptime.navigation.loadDataAndGoToMonitorPage(dateStart, dateEnd, monitorId);
}
2 changes: 1 addition & 1 deletion x-pack/test/functional/services/uptime/ml_anomaly.ts
@@ -32,7 +32,7 @@ export function UptimeMLAnomalyProvider({ getService }: FtrProviderContext) {

async createMLJob() {
await testSubjects.click('uptimeMLCreateJobBtn');
return retry.tryForTime(10000, async () => {
return retry.tryForTime(30000, async () => {
await testSubjects.existOrFail('uptimeMLJobSuccessfullyCreated');
log.info('Job successfully created');
});
