Skip to content

Commit

Permalink
Increase stability when initializing the Elasticsearch index for the …
Browse files Browse the repository at this point in the history
…event log (#57465)

* Fix ILM policy creation

* Handle errors thrown in scenario multiple Kibana instances are started at the same time

* Fix tests and cleanup

* Start adding tests

* Refactor tests, add index template failure test

* Create cluster client adapter to facilitate testing and isolation

* Fix places calling callEs still

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
mikecote and elasticmachine committed Feb 14, 2020
1 parent 34ae99b commit 918c0de
Show file tree
Hide file tree
Showing 13 changed files with 495 additions and 185 deletions.
24 changes: 24 additions & 0 deletions x-pack/plugins/event_log/server/es/cluster_client_adapter.mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { IClusterClientAdapter } from './cluster_client_adapter';

const createClusterClientMock = () => {
const mock: jest.Mocked<IClusterClientAdapter> = {
indexDocument: jest.fn(),
doesIlmPolicyExist: jest.fn(),
createIlmPolicy: jest.fn(),
doesIndexTemplateExist: jest.fn(),
createIndexTemplate: jest.fn(),
doesAliasExist: jest.fn(),
createIndex: jest.fn(),
};
return mock;
};

export const clusterClientAdapterMock = {
create: createClusterClientMock,
};
196 changes: 196 additions & 0 deletions x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { ClusterClient, Logger } from '../../../../../src/core/server';
import { elasticsearchServiceMock, loggingServiceMock } from '../../../../../src/core/server/mocks';
import { ClusterClientAdapter, IClusterClientAdapter } from './cluster_client_adapter';

type EsClusterClient = Pick<jest.Mocked<ClusterClient>, 'callAsInternalUser' | 'asScoped'>;

let logger: Logger;
let clusterClient: EsClusterClient;
let clusterClientAdapter: IClusterClientAdapter;

beforeEach(() => {
logger = loggingServiceMock.createLogger();
clusterClient = elasticsearchServiceMock.createClusterClient();
clusterClientAdapter = new ClusterClientAdapter({
logger,
clusterClient,
});
});

describe('indexDocument', () => {
test('should call cluster client with given doc', async () => {
await clusterClientAdapter.indexDocument({ args: true });
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('index', {
args: true,
});
});

test('should throw error when cluster client throws an error', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.indexDocument({ args: true })
).rejects.toThrowErrorMatchingInlineSnapshot(`"Fail"`);
});
});

describe('doesIlmPolicyExist', () => {
const notFoundError = new Error('Not found') as any;
notFoundError.statusCode = 404;

test('should call cluster with proper arguments', async () => {
await clusterClientAdapter.doesIlmPolicyExist('foo');
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('transport.request', {
method: 'GET',
path: '_ilm/policy/foo',
});
});

test('should return false when 404 error is returned by Elasticsearch', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(notFoundError);
await expect(clusterClientAdapter.doesIlmPolicyExist('foo')).resolves.toEqual(false);
});

test('should throw error when error is not 404', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.doesIlmPolicyExist('foo')
).rejects.toThrowErrorMatchingInlineSnapshot(`"error checking existance of ilm policy: Fail"`);
});

test('should return true when no error is thrown', async () => {
await expect(clusterClientAdapter.doesIlmPolicyExist('foo')).resolves.toEqual(true);
});
});

describe('createIlmPolicy', () => {
test('should call cluster client with given policy', async () => {
clusterClient.callAsInternalUser.mockResolvedValue({ success: true });
await clusterClientAdapter.createIlmPolicy('foo', { args: true });
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('transport.request', {
method: 'PUT',
path: '_ilm/policy/foo',
body: { args: true },
});
});

test('should throw error when call cluster client throws', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.createIlmPolicy('foo', { args: true })
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating ilm policy: Fail"`);
});
});

describe('doesIndexTemplateExist', () => {
test('should call cluster with proper arguments', async () => {
await clusterClientAdapter.doesIndexTemplateExist('foo');
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.existsTemplate', {
name: 'foo',
});
});

test('should return true when call cluster returns true', async () => {
clusterClient.callAsInternalUser.mockResolvedValue(true);
await expect(clusterClientAdapter.doesIndexTemplateExist('foo')).resolves.toEqual(true);
});

test('should return false when call cluster returns false', async () => {
clusterClient.callAsInternalUser.mockResolvedValue(false);
await expect(clusterClientAdapter.doesIndexTemplateExist('foo')).resolves.toEqual(false);
});

test('should throw error when call cluster throws an error', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.doesIndexTemplateExist('foo')
).rejects.toThrowErrorMatchingInlineSnapshot(
`"error checking existance of index template: Fail"`
);
});
});

describe('createIndexTemplate', () => {
test('should call cluster with given template', async () => {
await clusterClientAdapter.createIndexTemplate('foo', { args: true });
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.putTemplate', {
name: 'foo',
create: true,
body: { args: true },
});
});

test(`should throw error if index template still doesn't exist after error is thrown`, async () => {
clusterClient.callAsInternalUser.mockRejectedValueOnce(new Error('Fail'));
clusterClient.callAsInternalUser.mockResolvedValueOnce(false);
await expect(
clusterClientAdapter.createIndexTemplate('foo', { args: true })
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating index template: Fail"`);
});

test('should not throw error if index template exists after error is thrown', async () => {
clusterClient.callAsInternalUser.mockRejectedValueOnce(new Error('Fail'));
clusterClient.callAsInternalUser.mockResolvedValueOnce(true);
await clusterClientAdapter.createIndexTemplate('foo', { args: true });
});
});

describe('doesAliasExist', () => {
test('should call cluster with proper arguments', async () => {
await clusterClientAdapter.doesAliasExist('foo');
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.existsAlias', {
name: 'foo',
});
});

test('should return true when call cluster returns true', async () => {
clusterClient.callAsInternalUser.mockResolvedValueOnce(true);
await expect(clusterClientAdapter.doesAliasExist('foo')).resolves.toEqual(true);
});

test('should return false when call cluster returns false', async () => {
clusterClient.callAsInternalUser.mockResolvedValueOnce(false);
await expect(clusterClientAdapter.doesAliasExist('foo')).resolves.toEqual(false);
});

test('should throw error when call cluster throws an error', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.doesAliasExist('foo')
).rejects.toThrowErrorMatchingInlineSnapshot(
`"error checking existance of initial index: Fail"`
);
});
});

describe('createIndex', () => {
test('should call cluster with proper arguments', async () => {
await clusterClientAdapter.createIndex('foo');
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.create', {
index: 'foo',
});
});

test('should throw error when not getting an error of type resource_already_exists_exception', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.createIndex('foo')
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating initial index: Fail"`);
});

test(`shouldn't throw when an error of type resource_already_exists_exception is thrown`, async () => {
const err = new Error('Already exists') as any;
err.body = {
error: {
type: 'resource_already_exists_exception',
},
};
clusterClient.callAsInternalUser.mockRejectedValue(err);
await clusterClientAdapter.createIndex('foo');
});
});
126 changes: 126 additions & 0 deletions x-pack/plugins/event_log/server/es/cluster_client_adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { Logger, ClusterClient } from '../../../../../src/core/server';

export type EsClusterClient = Pick<ClusterClient, 'callAsInternalUser' | 'asScoped'>;
export type IClusterClientAdapter = PublicMethodsOf<ClusterClientAdapter>;

export interface ConstructorOpts {
logger: Logger;
clusterClient: EsClusterClient;
}

export class ClusterClientAdapter {
private readonly logger: Logger;
private readonly clusterClient: EsClusterClient;

constructor(opts: ConstructorOpts) {
this.logger = opts.logger;
this.clusterClient = opts.clusterClient;
}

public async indexDocument(doc: any): Promise<void> {
await this.callEs('index', doc);
}

public async doesIlmPolicyExist(policyName: string): Promise<boolean> {
const request = {
method: 'GET',
path: `_ilm/policy/${policyName}`,
};
try {
await this.callEs('transport.request', request);
} catch (err) {
if (err.statusCode === 404) return false;
throw new Error(`error checking existance of ilm policy: ${err.message}`);
}
return true;
}

public async createIlmPolicy(policyName: string, policy: any): Promise<void> {
const request = {
method: 'PUT',
path: `_ilm/policy/${policyName}`,
body: policy,
};
try {
await this.callEs('transport.request', request);
} catch (err) {
throw new Error(`error creating ilm policy: ${err.message}`);
}
}

public async doesIndexTemplateExist(name: string): Promise<boolean> {
let result;
try {
result = await this.callEs('indices.existsTemplate', { name });
} catch (err) {
throw new Error(`error checking existance of index template: ${err.message}`);
}
return result as boolean;
}

public async createIndexTemplate(name: string, template: any): Promise<void> {
const addTemplateParams = {
name,
create: true,
body: template,
};
try {
await this.callEs('indices.putTemplate', addTemplateParams);
} catch (err) {
// The error message doesn't have a type attribute we can look to guarantee it's due
// to the template already existing (only long message) so we'll check ourselves to see
// if the template now exists. This scenario would happen if you startup multiple Kibana
// instances at the same time.
const existsNow = await this.doesIndexTemplateExist(name);
if (!existsNow) {
throw new Error(`error creating index template: ${err.message}`);
}
}
}

public async doesAliasExist(name: string): Promise<boolean> {
let result;
try {
result = await this.callEs('indices.existsAlias', { name });
} catch (err) {
throw new Error(`error checking existance of initial index: ${err.message}`);
}
return result as boolean;
}

public async createIndex(name: string): Promise<void> {
try {
await this.callEs('indices.create', { index: name });
} catch (err) {
if (err.body?.error?.type !== 'resource_already_exists_exception') {
throw new Error(`error creating initial index: ${err.message}`);
}
}
}

private async callEs(operation: string, body?: any): Promise<any> {
try {
this.debug(`callEs(${operation}) calls:`, body);
const result = await this.clusterClient.callAsInternalUser(operation, body);
this.debug(`callEs(${operation}) result:`, result);
return result;
} catch (err) {
this.debug(`callEs(${operation}) error:`, {
message: err.message,
statusCode: err.statusCode,
});
throw err;
}
}

private debug(message: string, object?: any) {
const objectString = object == null ? '' : JSON.stringify(object);
this.logger.debug(`esContext: ${message} ${objectString}`);
}
}
Loading

0 comments on commit 918c0de

Please sign in to comment.