Skip to content

Commit

Permalink
Add integration test for restarting kibana with different task claime…
Browse files Browse the repository at this point in the history
…r config
  • Loading branch information
ymao1 committed Jun 18, 2024
1 parent 63ceab2 commit 52e2222
Show file tree
Hide file tree
Showing 2 changed files with 255 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,8 @@
import deepmerge from 'deepmerge';
import { createTestServers, createRootWithCorePlugins } from '@kbn/core-test-helpers-kbn-server';

export async function setupTestServers(settings = {}) {
const { startES } = createTestServers({
adjustTimeout: (t) => jest.setTimeout(t),
settings: {
es: {
license: 'trial',
},
},
});

const esServer = await startES();

const root = createRootWithCorePlugins(
function createRoot(settings = {}) {
return createRootWithCorePlugins(
deepmerge(
{
logging: {
Expand All @@ -32,13 +21,35 @@ export async function setupTestServers(settings = {}) {
name: 'plugins.taskManager',
level: 'all',
},
{
name: 'plugins.taskManager.metrics-debugger',
level: 'warn',
},
{
name: 'plugins.taskManager.metrics-subscribe-debugger',
level: 'warn',
},
],
},
},
settings
),
{ oss: false }
);
}
export async function setupTestServers(settings = {}) {
const { startES } = createTestServers({
adjustTimeout: (t) => jest.setTimeout(t),
settings: {
es: {
license: 'trial',
},
},
});

const esServer = await startES();

const root = createRoot(settings);

await root.preboot();
const coreSetup = await root.setup();
Expand All @@ -54,3 +65,20 @@ export async function setupTestServers(settings = {}) {
},
};
}

export async function setupKibanaServer(settings = {}) {
const root = createRoot(settings);

await root.preboot();
const coreSetup = await root.setup();
const coreStart = await root.start();

return {
kibanaServer: {
root,
coreSetup,
coreStart,
stop: async () => await root.shutdown(),
},
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { v4 as uuidV4 } from 'uuid';
import { schema } from '@kbn/config-schema';
import { TaskStatus } from '../task';
import type { TaskClaimingOpts } from '../queries/task_claiming';
import { injectTask, setupTestServers, retry } from './lib';
import { setupKibanaServer } from './lib/setup_test_servers';

const mockTaskTypeRunFn = jest.fn();
const mockCreateTaskRunner = jest.fn();
const mockTaskType = {
title: '',
description: '',
stateSchemaByVersion: {
1: {
up: (state: Record<string, unknown>) => ({ ...state, baz: state.baz || '' }),
schema: schema.object({
foo: schema.string(),
bar: schema.string(),
baz: schema.string(),
}),
},
},
createTaskRunner: mockCreateTaskRunner.mockImplementation(() => ({
run: mockTaskTypeRunFn,
})),
};
const { TaskClaiming: TaskClaimingMock } = jest.requireMock('../queries/task_claiming');
jest.mock('../queries/task_claiming', () => {
const actual = jest.requireActual('../queries/task_claiming');
return {
...actual,
TaskClaiming: jest.fn().mockImplementation((opts: TaskClaimingOpts) => {
// We need to register here because once the class is instantiated, adding
// definitions won't get claimed because of "partitionIntoClaimingBatches".
opts.definitions.registerTaskDefinitions({
fooType: mockTaskType,
});
return new actual.TaskClaiming(opts);
}),
};
});

describe('switch task claiming strategies', () => {
beforeEach(() => {
jest.clearAllMocks();
});

it('should switch from default to mget and still claim tasks', async () => {
const setupResultDefault = await setupTestServers();
const esServer = setupResultDefault.esServer;
let kibanaServer = setupResultDefault.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];

expect(taskClaimingOpts.strategy).toBe('default');

mockTaskTypeRunFn.mockImplementation(() => {
return { state: {} };
});

// inject a task to run and ensure it is claimed and run
const id1 = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: id1,
taskType: 'fooType',
params: { foo: true },
state: { foo: 'test', bar: 'test', baz: 'test' },
stateVersion: 4,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});

await retry(async () => {
expect(mockTaskTypeRunFn).toHaveBeenCalledTimes(1);
});

if (kibanaServer) {
await kibanaServer.stop();
}

const setupResultMget = await setupKibanaServer({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
},
},
});
kibanaServer = setupResultMget.kibanaServer;

taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('unsafe_mget');

// inject a task to run and ensure it is claimed and run
const id2 = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: id2,
taskType: 'fooType',
params: { foo: true },
state: { foo: 'test', bar: 'test', baz: 'test' },
stateVersion: 4,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});

await retry(async () => {
expect(mockTaskTypeRunFn).toHaveBeenCalledTimes(2);
});

if (kibanaServer) {
await kibanaServer.stop();
}
if (esServer) {
await esServer.stop();
}
});

it('should switch from mget to default and still claim tasks', async () => {
const setupResultMget = await setupTestServers({
xpack: {
task_manager: {
claim_strategy: 'unsafe_mget',
},
},
});
const esServer = setupResultMget.esServer;
let kibanaServer = setupResultMget.kibanaServer;
let taskClaimingOpts: TaskClaimingOpts = TaskClaimingMock.mock.calls[0][0];

expect(taskClaimingOpts.strategy).toBe('unsafe_mget');

mockTaskTypeRunFn.mockImplementation(() => {
return { state: {} };
});

// inject a task to run and ensure it is claimed and run
const id1 = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: id1,
taskType: 'fooType',
params: { foo: true },
state: { foo: 'test', bar: 'test', baz: 'test' },
stateVersion: 4,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});

await retry(async () => {
expect(mockTaskTypeRunFn).toHaveBeenCalledTimes(1);
});

if (kibanaServer) {
await kibanaServer.stop();
}

const setupResultDefault = await setupKibanaServer();
kibanaServer = setupResultDefault.kibanaServer;

taskClaimingOpts = TaskClaimingMock.mock.calls[1][0];
expect(taskClaimingOpts.strategy).toBe('default');

// inject a task to run and ensure it is claimed and run
const id2 = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: id2,
taskType: 'fooType',
params: { foo: true },
state: { foo: 'test', bar: 'test', baz: 'test' },
stateVersion: 4,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});

await retry(async () => {
expect(mockTaskTypeRunFn).toHaveBeenCalledTimes(2);
});

if (kibanaServer) {
await kibanaServer.stop();
}
if (esServer) {
await esServer.stop();
}
});
});

0 comments on commit 52e2222

Please sign in to comment.