[ML] Enabling mml estimation in data recognizer module setup #64900

Merged
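This change enables model memory limit (MML) estimation by default during data recognizer module setup: the server now estimates an MML per job from the source data (keeping the module's configured limits if estimation fails), and the client no longer forces `estimateModelMemory: false`. A minimal sketch of a setup request exercising the new flag — the field names mirror the API integration test further down, while the endpoint path is an assumption and not part of this diff:

```ts
// Sketch only: request body for ML module setup with the flag touched by this PR.
// `prefix`, `indexPatternName`, `startDatafeed` and `estimateModelMemory` mirror the
// integration test below; the URL is assumed, not taken from this diff.
const setupRequest = {
  prefix: 'pf2_',
  indexPatternName: 'kibana_sample_data_logs',
  startDatafeed: false,
  // Omit this flag (or set it to true) to get per-job MML estimation;
  // set it to false to keep the limits defined in the module itself
  // (10mb for the sample_data_weblogs jobs, per the tests below).
  estimateModelMemory: true,
};

// Assumed endpoint path, for illustration only:
// await supertest
//   .post('/api/ml/modules/setup/sample_data_weblogs')
//   .send(setupRequest)
//   .expect(200);
```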
@@ -172,7 +172,6 @@ export const Page: FC<PageProps> = ({ moduleId, existingGroupIds }) => {
startDatafeed: startDatafeedAfterSave,
...(jobOverridesPayload !== null ? { jobOverrides: jobOverridesPayload } : {}),
...resultTimeRange,
estimateModelMemory: false,
});
const { datafeeds: datafeedsResponse, jobs: jobsResponse, kibana: kibanaResponse } = response;

106 changes: 59 additions & 47 deletions x-pack/plugins/ml/server/models/data_recognizer/data_recognizer.ts
@@ -110,7 +110,7 @@ export class DataRecognizer {
/**
* List of the module jobs that require model memory estimation
*/
jobsForModelMemoryEstimation: ModuleJob[] = [];
jobsForModelMemoryEstimation: Array<{ job: ModuleJob; query: any }> = [];

constructor(
private callAsCurrentUser: APICaller,
@@ -374,7 +374,7 @@
end?: number,
jobOverrides?: JobOverride | JobOverride[],
datafeedOverrides?: DatafeedOverride | DatafeedOverride[],
estimateModelMemory?: boolean
estimateModelMemory: boolean = true
) {
// load the config from disk
const moduleConfig = await this.getModule(moduleId, jobPrefix);
@@ -416,7 +416,10 @@
savedObjects: [] as KibanaObjectResponse[],
};

this.jobsForModelMemoryEstimation = moduleConfig.jobs;
this.jobsForModelMemoryEstimation = moduleConfig.jobs.map(job => ({
job,
query: moduleConfig.datafeeds.find(d => d.config.job_id === job.id)?.config.query ?? null,
}));

this.applyJobConfigOverrides(moduleConfig, jobOverrides, jobPrefix);
this.applyDatafeedConfigOverrides(moduleConfig, datafeedOverrides, jobPrefix);
@@ -958,7 +961,7 @@
*/
async updateModelMemoryLimits(
moduleConfig: Module,
estimateMML: boolean = false,
estimateMML: boolean,
start?: number,
end?: number
) {
@@ -967,53 +970,57 @@
     }
 
     if (estimateMML && this.jobsForModelMemoryEstimation.length > 0) {
-      const calculateModelMemoryLimit = calculateModelMemoryLimitProvider(this.callAsCurrentUser);
-      const query = moduleConfig.query ?? null;
-
-      // Checks if all jobs in the module have the same time field configured
-      const isSameTimeFields = this.jobsForModelMemoryEstimation.every(
-        job =>
-          job.config.data_description.time_field ===
-          this.jobsForModelMemoryEstimation[0].config.data_description.time_field
-      );
-
-      if (isSameTimeFields && (start === undefined || end === undefined)) {
-        // In case of time range is not provided and the time field is the same
-        // set the fallback range for all jobs
-        const { start: fallbackStart, end: fallbackEnd } = await this.getFallbackTimeRange(
-          this.jobsForModelMemoryEstimation[0].config.data_description.time_field,
-          query
-        );
-        start = fallbackStart;
-        end = fallbackEnd;
-      }
-
-      for (const job of this.jobsForModelMemoryEstimation) {
-        let earliestMs = start;
-        let latestMs = end;
-        if (earliestMs === undefined || latestMs === undefined) {
-          const timeFieldRange = await this.getFallbackTimeRange(
-            job.config.data_description.time_field,
-            query
-          );
-          earliestMs = timeFieldRange.start;
-          latestMs = timeFieldRange.end;
-        }
-
-        const { modelMemoryLimit } = await calculateModelMemoryLimit(
-          job.config.analysis_config,
-          this.indexPatternName,
-          query,
-          job.config.data_description.time_field,
-          earliestMs,
-          latestMs
-        );
-
-        if (!job.config.analysis_limits) {
-          job.config.analysis_limits = {} as AnalysisLimits;
-        }
-
-        job.config.analysis_limits.model_memory_limit = modelMemoryLimit;
-      }
+      try {
+        const calculateModelMemoryLimit = calculateModelMemoryLimitProvider(this.callAsCurrentUser);
+
+        // Checks if all jobs in the module have the same time field configured
+        const firstJobTimeField = this.jobsForModelMemoryEstimation[0].job.config.data_description
+          .time_field;
+        const isSameTimeFields = this.jobsForModelMemoryEstimation.every(
+          ({ job }) => job.config.data_description.time_field === firstJobTimeField
+        );
+
+        if (isSameTimeFields && (start === undefined || end === undefined)) {
+          // In case of time range is not provided and the time field is the same
+          // set the fallback range for all jobs
+          // as there may not be a common query, we use a match_all
+          const {
+            start: fallbackStart,
+            end: fallbackEnd,
+          } = await this.getFallbackTimeRange(firstJobTimeField, { match_all: {} });
+          start = fallbackStart;
+          end = fallbackEnd;
+        }
+
+        for (const { job, query } of this.jobsForModelMemoryEstimation) {
+          let earliestMs = start;
+          let latestMs = end;
+          if (earliestMs === undefined || latestMs === undefined) {
+            const timeFieldRange = await this.getFallbackTimeRange(
+              job.config.data_description.time_field,
+              query
+            );
+            earliestMs = timeFieldRange.start;
+            latestMs = timeFieldRange.end;
+          }
+
+          const { modelMemoryLimit } = await calculateModelMemoryLimit(
+            job.config.analysis_config,
+            this.indexPatternName,
+            query,
+            job.config.data_description.time_field,
+            earliestMs,
+            latestMs
+          );
+
+          if (!job.config.analysis_limits) {
+            job.config.analysis_limits = {} as AnalysisLimits;
+          }
+
+          job.config.analysis_limits.model_memory_limit = modelMemoryLimit;
+        }
+      } catch (error) {
+        mlLog.warn(`Data recognizer could not estimate model memory limit ${error}`);
+      }
     }

@@ -1098,10 +1105,15 @@
if (generalOverrides.some(override => !!override.analysis_limits?.model_memory_limit)) {
this.jobsForModelMemoryEstimation = [];
} else {
this.jobsForModelMemoryEstimation = moduleConfig.jobs.filter(job => {
const override = jobSpecificOverrides.find(o => `${jobPrefix}${o.job_id}` === job.id);
return override?.analysis_limits?.model_memory_limit === undefined;
});
this.jobsForModelMemoryEstimation = moduleConfig.jobs
.filter(job => {
const override = jobSpecificOverrides.find(o => `${jobPrefix}${o.job_id}` === job.id);
return override?.analysis_limits?.model_memory_limit === undefined;
})
.map(job => ({
job,
query: moduleConfig.datafeeds.find(d => d.config.job_id === job.id)?.config.query || null,
}));
}

function processArrayValues(source: any, update: any) {
74 changes: 73 additions & 1 deletion x-pack/test/api_integration/apis/ml/modules/setup_module.ts
@@ -9,6 +9,7 @@ import expect from '@kbn/expect';
import { FtrProviderContext } from '../../../ftr_provider_context';

import { JOB_STATE, DATAFEED_STATE } from '../../../../../plugins/ml/common/constants/states';
import { Job } from '../../../../../plugins/ml/common/types/anomaly_detection_jobs';
import { USER } from '../../../../functional/services/machine_learning/security_common';

const COMMON_HEADERS = {
@@ -23,7 +24,8 @@ export default ({ getService }: FtrProviderContext) => {

const testDataListPositive = [
{
testTitleSuffix: 'for sample logs dataset with prefix and startDatafeed false',
testTitleSuffix:
'for sample logs dataset with prefix, startDatafeed false and estimateModelMemory false',
sourceDataArchive: 'ml/sample_logs',
indexPattern: { name: 'kibana_sample_data_logs', timeField: '@timestamp' },
module: 'sample_data_weblogs',
@@ -32,6 +34,7 @@
prefix: 'pf1_',
indexPatternName: 'kibana_sample_data_logs',
startDatafeed: false,
estimateModelMemory: false,
},
expected: {
responseCode: 200,
@@ -40,16 +43,55 @@
jobId: 'pf1_low_request_rate',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '10mb',
},
{
jobId: 'pf1_response_code_rates',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '10mb',
},
{
jobId: 'pf1_url_scanning',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '10mb',
},
],
},
},
{
testTitleSuffix:
'for sample logs dataset with prefix, startDatafeed false and estimateModelMemory true',
sourceDataArchive: 'ml/sample_logs',
indexPattern: { name: 'kibana_sample_data_logs', timeField: '@timestamp' },
module: 'sample_data_weblogs',
user: USER.ML_POWERUSER,
requestBody: {
prefix: 'pf2_',
indexPatternName: 'kibana_sample_data_logs',
startDatafeed: false,
},
expected: {
responseCode: 200,
jobs: [
{
jobId: 'pf2_low_request_rate',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '11mb',
},
{
jobId: 'pf2_response_code_rates',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '11mb',
},
{
jobId: 'pf2_url_scanning',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '16mb',
},
],
},
@@ -197,6 +239,36 @@
await ml.api.waitForJobState(job.jobId, job.jobState);
await ml.api.waitForDatafeedState(datafeedId, job.datafeedState);
}

// compare model memory limits for created jobs
const expectedModelMemoryLimits = testData.expected.jobs
.map(j => ({
id: j.jobId,
modelMemoryLimit: j.modelMemoryLimit,
}))
.sort(compareById);

const {
body: { jobs },
}: {
body: {
jobs: Job[];
};
Comment on lines +253 to +256

Contributor: It would be better to provide a return type for the getAnomalyDetectionJob method.

Member (author): I agree, but that would touch other files which use it, so it should be done in a follow-up refactor.

} = await ml.api.getAnomalyDetectionJob(testData.expected.jobs.map(j => j.jobId).join());

const actualModelMemoryLimits = jobs
.map(j => ({
id: j.job_id,
modelMemoryLimit: j.analysis_limits!.model_memory_limit,
}))
.sort(compareById);

expect(actualModelMemoryLimits).to.eql(
expectedModelMemoryLimits,
`Expected job model memory limits '${JSON.stringify(
expectedModelMemoryLimits
)}' (got '${JSON.stringify(actualModelMemoryLimits)}')`
);
});

// TODO in future updates: add creation validations for created saved objects
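Following up on the review note above about typing the `getAnomalyDetectionJob` helper, here is a minimal sketch of what such a return type could look like, based only on the fields the test reads (`job_id` and `analysis_limits.model_memory_limit`). The interface names and the `count` field are assumptions, not part of this PR:

```ts
// Hypothetical sketch — not part of this diff. Shapes are inferred from what the
// test above destructures; the real follow-up refactor would reuse the existing Job type.
interface AnalysisLimits {
  model_memory_limit?: string;
}

interface AnomalyDetectionJob {
  job_id: string;
  analysis_limits?: AnalysisLimits;
}

interface GetAnomalyDetectionJobResponse {
  count: number;
  jobs: AnomalyDetectionJob[];
}

// With a typed helper such as
//   getAnomalyDetectionJob(jobIds: string): Promise<{ body: GetAnomalyDetectionJobResponse }>
// the inline `body: { jobs: Job[] }` annotation in the test above would no longer be needed.
```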
3 changes: 3 additions & 0 deletions x-pack/test/functional/apps/uptime/ml_anomaly.ts
@@ -9,6 +9,8 @@ import { FtrProviderContext } from '../../ftr_provider_context';
export default ({ getService }: FtrProviderContext) => {
const uptime = getService('uptime');
const log = getService('log');
const esArchiver = getService('esArchiver');
const archive = 'uptime/full_heartbeat';

describe('uptime ml anomaly', function() {
this.tags(['skipFirefox']);
@@ -17,6 +19,7 @@
const monitorId = '0000-intermittent';

before(async () => {
await esArchiver.loadIfNeeded(archive);
if (!(await uptime.navigation.checkIfOnMonitorPage(monitorId))) {
await uptime.navigation.loadDataAndGoToMonitorPage(dateStart, dateEnd, monitorId);
}
2 changes: 1 addition & 1 deletion x-pack/test/functional/services/uptime/ml_anomaly.ts
@@ -32,7 +32,7 @@ export function UptimeMLAnomalyProvider({ getService }: FtrProviderContext) {

async createMLJob() {
await testSubjects.click('uptimeMLCreateJobBtn');
return retry.tryForTime(10000, async () => {
return retry.tryForTime(30000, async () => {
await testSubjects.existOrFail('uptimeMLJobSuccessfullyCreated');
log.info('Job successfully created');
});