diff --git a/packages/kbn-es/src/cluster.js b/packages/kbn-es/src/cluster.js index 9ea1817e0f49b9..2983ec474b9e01 100644 --- a/packages/kbn-es/src/cluster.js +++ b/packages/kbn-es/src/cluster.js @@ -33,6 +33,19 @@ const { createCliError } = require('./errors'); const { promisify } = require('util'); const treeKillAsync = promisify(require('tree-kill')); +// listen to data on stream until map returns anything but undefined +const first = (stream, map) => + new Promise(resolve => { + const onData = data => { + const result = map(data); + if (result !== undefined) { + resolve(result); + stream.removeListener('data', onData); + } + }; + stream.on('data', onData); + }); + exports.Cluster = class Cluster { constructor(log = defaultLog) { this._log = log; @@ -158,14 +171,15 @@ exports.Cluster = class Cluster { this._exec(installPath, options); await Promise.race([ - // await the "started" log message - new Promise(resolve => { - this._process.stdout.on('data', data => { + // wait for native realm to be setup and es to be started + Promise.all([ + first(this._process.stdout, data => { if (/started/.test(data)) { - resolve(); + return true; } - }); - }), + }), + this._nativeRealmSetup, + ]), // await the outcome of the process in case it exits before starting this._outcome.then(() => { @@ -185,6 +199,12 @@ exports.Cluster = class Cluster { async run(installPath, options = {}) { this._exec(installPath, options); + // log native realm setup errors so they aren't uncaught + this._nativeRealmSetup.catch(error => { + this._log.error(error); + this.stop(); + }); + // await the final outcome of the process await this._outcome; } @@ -241,42 +261,43 @@ exports.Cluster = class Cluster { stdio: ['ignore', 'pipe', 'pipe'], }); + // parse log output to find http port + const httpPort = first(this._process.stdout, data => { + const match = data.toString('utf8').match(/HttpServer.+publish_address {[0-9.]+:([0-9]+)/); + + if (match) { + return match[1]; + } + }); + + // once the http port is available setup the native realm + this._nativeRealmSetup = httpPort.then(async port => { + const nativeRealm = new NativeRealm(options.password, port, this._log); + await nativeRealm.setPasswords(options); + }); + + // parse and forward es stdout to the log this._process.stdout.on('data', data => { const lines = parseEsLog(data.toString()); lines.forEach(line => { this._log.info(line.formattedMessage); - - // once we have the port we can stop checking for it - if (this.httpPort) { - return; - } - - const httpAddressMatch = line.message.match( - /HttpServer.+publish_address {[0-9.]+:([0-9]+)/ - ); - - if (httpAddressMatch) { - this.httpPort = httpAddressMatch[1]; - new NativeRealm(options.password, this.httpPort, this._log).setPasswords(options); - } }); }); + // forward es stderr to the log this._process.stderr.on('data', data => this._log.error(chalk.red(data.toString()))); - this._outcome = new Promise((resolve, reject) => { - this._process.once('exit', code => { - if (this._stopCalled) { - resolve(); - return; - } - // JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat them as errors - if (code > 0 && !(code === 143 || code === 130)) { - reject(createCliError(`ES exited with code ${code}`)); - } else { - resolve(); - } - }); + // observe the exit code of the process and reflect in _outcome promies + const exitCode = new Promise(resolve => this._process.once('exit', resolve)); + this._outcome = exitCode.then(code => { + if (this._stopCalled) { + return; + } + + // JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat them as errors + if (code > 0 && !(code === 143 || code === 130)) { + throw createCliError(`ES exited with code ${code}`); + } }); } }; diff --git a/packages/kbn-es/src/integration_tests/__fixtures__/es_bin.js b/packages/kbn-es/src/integration_tests/__fixtures__/es_bin.js index d07d5e24a641f1..6c49371fd7d407 100644 --- a/packages/kbn-es/src/integration_tests/__fixtures__/es_bin.js +++ b/packages/kbn-es/src/integration_tests/__fixtures__/es_bin.js @@ -19,10 +19,61 @@ * under the License. */ +const { createServer } = require('http'); +const { format: formatUrl } = require('url'); const { exitCode, start } = JSON.parse(process.argv[2]); -if (start) { - console.log('started'); +process.exitCode = exitCode; + +if (!start) { + return; } -process.exitCode = exitCode; +let serverUrl; +const server = createServer((req, res) => { + const url = new URL(req.url, serverUrl); + const send = (code, body) => { + res.writeHead(code, { 'content-type': 'application/json' }); + res.end(JSON.stringify(body)); + }; + + if (url.pathname === '/_xpack') { + return send(400, { + error: { + reason: 'foo bar', + }, + }); + } + + return send(404, { + error: { + reason: 'not found', + }, + }); +}); + +// setup server auto close after 1 second of silence +let serverCloseTimer; +const delayServerClose = () => { + clearTimeout(serverCloseTimer); + serverCloseTimer = setTimeout(() => server.close(), 1000); +}; +server.on('request', delayServerClose); +server.on('listening', delayServerClose); + +server.listen(0, '127.0.0.1', function() { + const { port, address: hostname } = server.address(); + serverUrl = new URL( + formatUrl({ + protocol: 'http:', + port, + hostname, + }) + ); + + console.log( + `[o.e.h.AbstractHttpServerTransport] [computer] publish_address {127.0.0.1:${port}}, bound_addresses {[::1]:${port}}, {127.0.0.1:${port}}` + ); + + console.log('started'); +}); diff --git a/packages/kbn-es/src/utils/native_realm.js b/packages/kbn-es/src/utils/native_realm.js index fa3bda4752d595..be1e79b1ce4af9 100644 --- a/packages/kbn-es/src/utils/native_realm.js +++ b/packages/kbn-es/src/utils/native_realm.js @@ -1,82 +1,83 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -const { Client } = require('@elastic/elasticsearch'); -const chalk = require('chalk'); - -const { log: defaultLog } = require('./log'); - -exports.NativeRealm = class NativeRealm { - constructor(elasticPassword, port, log = defaultLog) { - this._client = new Client({ node: `http://elastic:${elasticPassword}@localhost:${port}` }); - this._elasticPassword = elasticPassword; - this._log = log; - } - - async setPassword(username, password = this._elasticPassword) { - this._log.info(`setting ${chalk.bold(username)} password to ${chalk.bold(password)}`); - - try { - await this._client.security.changePassword({ - username, - refresh: 'wait_for', - body: { - password, - }, - }); - } catch (e) { - this._log.error( - chalk.red(`unable to set password for ${chalk.bold(username)}: ${e.message}`) - ); - } - } - - async setPasswords(options) { - if (!(await this.isSecurityEnabled())) { - this._log.info('security is not enabled, unable to set native realm passwords'); - return; - } - - (await this.getReservedUsers()).forEach(user => { - this.setPassword(user, options[`password.${user}`]); - }); - } - - async getReservedUsers() { - const users = await this._client.security.getUser(); - - return Object.keys(users.body).reduce((acc, user) => { - if (users.body[user].metadata._reserved === true) { - acc.push(user); - } - return acc; - }, []); - } - - async isSecurityEnabled() { - try { - const { - body: { features }, - } = await this._client.xpack.info({ categories: 'features' }); - return features.security && features.security.enabled && features.security.available; - } catch (e) { - return false; - } - } -}; +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +const { Client } = require('@elastic/elasticsearch'); +const chalk = require('chalk'); + +const { log: defaultLog } = require('./log'); + +exports.NativeRealm = class NativeRealm { + constructor(elasticPassword, port, log = defaultLog) { + this._client = new Client({ node: `http://elastic:${elasticPassword}@localhost:${port}` }); + this._elasticPassword = elasticPassword; + this._log = log; + } + + async setPassword(username, password = this._elasticPassword) { + this._log.info(`setting ${chalk.bold(username)} password to ${chalk.bold(password)}`); + + await this._client.security.changePassword({ + username, + refresh: 'wait_for', + body: { + password, + }, + }); + } + + async setPasswords(options) { + if (!(await this.isSecurityEnabled())) { + this._log.info('security is not enabled, unable to set native realm passwords'); + return; + } + + const reservedUsers = await this.getReservedUsers(); + await Promise.all( + reservedUsers.map(async user => { + await this.setPassword(user, options[`password.${user}`]); + }) + ); + } + + async getReservedUsers() { + const users = await this._client.security.getUser(); + + return Object.keys(users.body).reduce((acc, user) => { + if (users.body[user].metadata._reserved === true) { + acc.push(user); + } + return acc; + }, []); + } + + async isSecurityEnabled() { + try { + const { + body: { features }, + } = await this._client.xpack.info({ categories: 'features' }); + return features.security && features.security.enabled && features.security.available; + } catch (error) { + if (error.meta && error.meta.statusCode === 400) { + return false; + } + + throw error; + } + } +}; diff --git a/packages/kbn-es/src/utils/native_realm.test.js b/packages/kbn-es/src/utils/native_realm.test.js index 7dccb5eeef5b6f..d5a036df70b449 100644 --- a/packages/kbn-es/src/utils/native_realm.test.js +++ b/packages/kbn-es/src/utils/native_realm.test.js @@ -79,12 +79,31 @@ describe('isSecurityEnabled', () => { expect(await nativeRealm.isSecurityEnabled()).toBe(false); }); - test('logs exception and returns false', async () => { + test('returns false if 400 error returned', async () => { mockClient.xpack.info.mockImplementation(() => { - throw new Error('ResponseError'); + const error = new Error('ResponseError'); + error.meta = { + statusCode: 400, + }; + throw error; }); + expect(await nativeRealm.isSecurityEnabled()).toBe(false); }); + + test('rejects if unexpected error is thrown', async () => { + mockClient.xpack.info.mockImplementation(() => { + const error = new Error('ResponseError'); + error.meta = { + statusCode: 500, + }; + throw error; + }); + + await expect(nativeRealm.isSecurityEnabled()).rejects.toThrowErrorMatchingInlineSnapshot( + `"ResponseError"` + ); + }); }); describe('setPasswords', () => { @@ -204,18 +223,13 @@ describe('setPassword', () => { }); }); - it('logs error', async () => { + it('rejects with errors', async () => { mockClient.security.changePassword.mockImplementation(() => { throw new Error('SomeError'); }); - await nativeRealm.setPassword('kibana', 'foo'); - expect(log.error.mock.calls).toMatchInlineSnapshot(` -Array [ - Array [ - "unable to set password for kibana: SomeError", - ], -] -`); + await expect( + nativeRealm.setPassword('kibana', 'foo') + ).rejects.toThrowErrorMatchingInlineSnapshot(`"SomeError"`); }); });