Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase stability when initializing the Elasticsearch index for the event log #57465

Merged
merged 13 commits into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions x-pack/plugins/event_log/server/es/cluster_client_adapter.mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { IClusterClientAdapter } from './cluster_client_adapter';

const createClusterClientMock = () => {
const mock: jest.Mocked<IClusterClientAdapter> = {
indexDocument: jest.fn(),
doesIlmPolicyExist: jest.fn(),
createIlmPolicy: jest.fn(),
doesIndexTemplateExist: jest.fn(),
createIndexTemplate: jest.fn(),
doesAliasExist: jest.fn(),
createIndex: jest.fn(),
};
return mock;
};

export const clusterClientAdapterMock = {
create: createClusterClientMock,
};
196 changes: 196 additions & 0 deletions x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { ClusterClient, Logger } from '../../../../../src/core/server';
import { elasticsearchServiceMock, loggingServiceMock } from '../../../../../src/core/server/mocks';
import { ClusterClientAdapter, IClusterClientAdapter } from './cluster_client_adapter';

type EsClusterClient = Pick<jest.Mocked<ClusterClient>, 'callAsInternalUser' | 'asScoped'>;

let logger: Logger;
let clusterClient: EsClusterClient;
let clusterClientAdapter: IClusterClientAdapter;

beforeEach(() => {
logger = loggingServiceMock.createLogger();
clusterClient = elasticsearchServiceMock.createClusterClient();
clusterClientAdapter = new ClusterClientAdapter({
logger,
clusterClient,
});
});

describe('indexDocument', () => {
test('should call cluster client with given doc', async () => {
await clusterClientAdapter.indexDocument({ args: true });
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('index', {
args: true,
});
});

test('should throw error when cluster client throws an error', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.indexDocument({ args: true })
).rejects.toThrowErrorMatchingInlineSnapshot(`"Fail"`);
});
});

describe('doesIlmPolicyExist', () => {
const notFoundError = new Error('Not found') as any;
notFoundError.statusCode = 404;

test('should call cluster with proper arguments', async () => {
await clusterClientAdapter.doesIlmPolicyExist('foo');
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('transport.request', {
method: 'GET',
path: '_ilm/policy/foo',
});
});

test('should return false when 404 error is returned by Elasticsearch', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(notFoundError);
await expect(clusterClientAdapter.doesIlmPolicyExist('foo')).resolves.toEqual(false);
});

test('should throw error when error is not 404', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.doesIlmPolicyExist('foo')
).rejects.toThrowErrorMatchingInlineSnapshot(`"error checking existance of ilm policy: Fail"`);
});

test('should return true when no error is thrown', async () => {
await expect(clusterClientAdapter.doesIlmPolicyExist('foo')).resolves.toEqual(true);
});
});

describe('createIlmPolicy', () => {
test('should call cluster client with given policy', async () => {
clusterClient.callAsInternalUser.mockResolvedValue({ success: true });
await clusterClientAdapter.createIlmPolicy('foo', { args: true });
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('transport.request', {
method: 'PUT',
path: '_ilm/policy/foo',
body: { args: true },
});
});

test('should throw error when call cluster client throws', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.createIlmPolicy('foo', { args: true })
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating ilm policy: Fail"`);
});
});

describe('doesIndexTemplateExist', () => {
test('should call cluster with proper arguments', async () => {
await clusterClientAdapter.doesIndexTemplateExist('foo');
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.existsTemplate', {
name: 'foo',
});
});

test('should return true when call cluster returns true', async () => {
clusterClient.callAsInternalUser.mockResolvedValue(true);
await expect(clusterClientAdapter.doesIndexTemplateExist('foo')).resolves.toEqual(true);
});

test('should return false when call cluster returns false', async () => {
clusterClient.callAsInternalUser.mockResolvedValue(false);
await expect(clusterClientAdapter.doesIndexTemplateExist('foo')).resolves.toEqual(false);
});

test('should throw error when call cluster throws an error', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.doesIndexTemplateExist('foo')
).rejects.toThrowErrorMatchingInlineSnapshot(
`"error checking existance of index template: Fail"`
);
});
});

describe('createIndexTemplate', () => {
test('should call cluster with given template', async () => {
await clusterClientAdapter.createIndexTemplate('foo', { args: true });
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.putTemplate', {
name: 'foo',
create: true,
body: { args: true },
});
});

test(`should throw error if index template still doesn't exist after error is thrown`, async () => {
clusterClient.callAsInternalUser.mockRejectedValueOnce(new Error('Fail'));
clusterClient.callAsInternalUser.mockResolvedValueOnce(false);
await expect(
clusterClientAdapter.createIndexTemplate('foo', { args: true })
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating index template: Fail"`);
});

test('should not throw error if index template exists after error is thrown', async () => {
clusterClient.callAsInternalUser.mockRejectedValueOnce(new Error('Fail'));
clusterClient.callAsInternalUser.mockResolvedValueOnce(true);
await clusterClientAdapter.createIndexTemplate('foo', { args: true });
});
});

describe('doesAliasExist', () => {
test('should call cluster with proper arguments', async () => {
await clusterClientAdapter.doesAliasExist('foo');
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.existsAlias', {
name: 'foo',
});
});

test('should return true when call cluster returns true', async () => {
clusterClient.callAsInternalUser.mockResolvedValueOnce(true);
await expect(clusterClientAdapter.doesAliasExist('foo')).resolves.toEqual(true);
});

test('should return false when call cluster returns false', async () => {
clusterClient.callAsInternalUser.mockResolvedValueOnce(false);
await expect(clusterClientAdapter.doesAliasExist('foo')).resolves.toEqual(false);
});

test('should throw error when call cluster throws an error', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.doesAliasExist('foo')
).rejects.toThrowErrorMatchingInlineSnapshot(
`"error checking existance of initial index: Fail"`
);
});
});

describe('createIndex', () => {
test('should call cluster with proper arguments', async () => {
await clusterClientAdapter.createIndex('foo');
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.create', {
index: 'foo',
});
});

test('should throw error when not getting an error of type resource_already_exists_exception', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.createIndex('foo')
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating initial index: Fail"`);
});

test(`shouldn't throw when an error of type resource_already_exists_exception is thrown`, async () => {
const err = new Error('Already exists') as any;
err.body = {
error: {
type: 'resource_already_exists_exception',
},
};
clusterClient.callAsInternalUser.mockRejectedValue(err);
await clusterClientAdapter.createIndex('foo');
});
});
126 changes: 126 additions & 0 deletions x-pack/plugins/event_log/server/es/cluster_client_adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { Logger, ClusterClient } from '../../../../../src/core/server';

export type EsClusterClient = Pick<ClusterClient, 'callAsInternalUser' | 'asScoped'>;
export type IClusterClientAdapter = PublicMethodsOf<ClusterClientAdapter>;

export interface ConstructorOpts {
logger: Logger;
clusterClient: EsClusterClient;
}

export class ClusterClientAdapter {
private readonly logger: Logger;
private readonly clusterClient: EsClusterClient;

constructor(opts: ConstructorOpts) {
this.logger = opts.logger;
this.clusterClient = opts.clusterClient;
}

public async indexDocument(doc: any): Promise<void> {
await this.callEs('index', doc);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting that we don't need a result here!

}

public async doesIlmPolicyExist(policyName: string): Promise<boolean> {
const request = {
method: 'GET',
path: `_ilm/policy/${policyName}`,
};
try {
await this.callEs('transport.request', request);
} catch (err) {
if (err.statusCode === 404) return false;
throw new Error(`error checking existance of ilm policy: ${err.message}`);
}
return true;
}

public async createIlmPolicy(policyName: string, policy: any): Promise<void> {
const request = {
method: 'PUT',
path: `_ilm/policy/${policyName}`,
body: policy,
};
try {
await this.callEs('transport.request', request);
} catch (err) {
throw new Error(`error creating ilm policy: ${err.message}`);
}
}

public async doesIndexTemplateExist(name: string): Promise<boolean> {
let result;
try {
result = await this.callEs('indices.existsTemplate', { name });
} catch (err) {
throw new Error(`error checking existance of index template: ${err.message}`);
}
return result as boolean;
}

public async createIndexTemplate(name: string, template: any): Promise<void> {
const addTemplateParams = {
name,
create: true,
body: template,
};
try {
await this.callEs('indices.putTemplate', addTemplateParams);
} catch (err) {
// The error message doesn't have a type attribute we can look to guarantee it's due
// to the template already existing (only long message) so we'll check ourselves to see
// if the template now exists. This scenario would happen if you startup multiple Kibana
// instances at the same time.
const existsNow = await this.doesIndexTemplateExist(name);
if (!existsNow) {
throw new Error(`error creating index template: ${err.message}`);
}
}
}

public async doesAliasExist(name: string): Promise<boolean> {
let result;
try {
result = await this.callEs('indices.existsAlias', { name });
} catch (err) {
throw new Error(`error checking existance of initial index: ${err.message}`);
}
return result as boolean;
}

public async createIndex(name: string): Promise<void> {
try {
await this.callEs('indices.create', { index: name });
} catch (err) {
if (err.body?.error?.type !== 'resource_already_exists_exception') {
throw new Error(`error creating initial index: ${err.message}`);
}
}
}

private async callEs(operation: string, body?: any): Promise<any> {
try {
this.debug(`callEs(${operation}) calls:`, body);
const result = await this.clusterClient.callAsInternalUser(operation, body);
this.debug(`callEs(${operation}) result:`, result);
return result;
} catch (err) {
this.debug(`callEs(${operation}) error:`, {
message: err.message,
statusCode: err.statusCode,
});
throw err;
}
}

private debug(message: string, object?: any) {
const objectString = object == null ? '' : JSON.stringify(object);
this.logger.debug(`esContext: ${message} ${objectString}`);
}
}
Loading