Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kbn-es] await native realm setup, error if there are failures #36290

Merged
merged 14 commits into from May 10, 2019
87 changes: 54 additions & 33 deletions packages/kbn-es/src/cluster.js
Expand Up @@ -33,6 +33,19 @@ const { createCliError } = require('./errors');
const { promisify } = require('util');
const treeKillAsync = promisify(require('tree-kill'));

// listen to data on stream until map returns anything but undefined
const first = (stream, map) =>
new Promise(resolve => {
const onData = data => {
const result = map(data);
if (result !== undefined) {
resolve(result);
stream.removeListener('data', onData);
}
};
stream.on('data', onData);
});

exports.Cluster = class Cluster {
constructor(log = defaultLog) {
this._log = log;
Expand Down Expand Up @@ -158,14 +171,15 @@ exports.Cluster = class Cluster {
this._exec(installPath, options);

await Promise.race([
// await the "started" log message
new Promise(resolve => {
this._process.stdout.on('data', data => {
// wait for native realm to be setup and es to be started
Promise.all([
first(this._process.stdout, data => {
if (/started/.test(data)) {
resolve();
return true;
}
});
}),
}),
this._nativeRealmSetup,
]),

// await the outcome of the process in case it exits before starting
this._outcome.then(() => {
Expand All @@ -185,6 +199,12 @@ exports.Cluster = class Cluster {
async run(installPath, options = {}) {
this._exec(installPath, options);

// log native realm setup errors so they aren't uncaught
this._nativeRealmSetup.catch(error => {
this._log.error(error);
this.stop();
});

// await the final outcome of the process
await this._outcome;
}
Expand Down Expand Up @@ -241,42 +261,43 @@ exports.Cluster = class Cluster {
stdio: ['ignore', 'pipe', 'pipe'],
});

// parse log output to find http port
const httpPort = first(this._process.stdout, data => {
const match = data.toString('utf8').match(/HttpServer.+publish_address {[0-9.]+:([0-9]+)/);

if (match) {
return match[1];
}
});

// once the http port is available setup the native realm
this._nativeRealmSetup = httpPort.then(async port => {
const nativeRealm = new NativeRealm(options.password, port, this._log);
await nativeRealm.setPasswords(options);
});

// parse and forward es stdout to the log
this._process.stdout.on('data', data => {
const lines = parseEsLog(data.toString());
lines.forEach(line => {
this._log.info(line.formattedMessage);

// once we have the port we can stop checking for it
if (this.httpPort) {
return;
}

const httpAddressMatch = line.message.match(
/HttpServer.+publish_address {[0-9.]+:([0-9]+)/
);

if (httpAddressMatch) {
this.httpPort = httpAddressMatch[1];
new NativeRealm(options.password, this.httpPort, this._log).setPasswords(options);
}
});
});

// forward es stderr to the log
this._process.stderr.on('data', data => this._log.error(chalk.red(data.toString())));

this._outcome = new Promise((resolve, reject) => {
this._process.once('exit', code => {
if (this._stopCalled) {
resolve();
return;
}
// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat them as errors
if (code > 0 && !(code === 143 || code === 130)) {
reject(createCliError(`ES exited with code ${code}`));
} else {
resolve();
}
});
// observe the exit code of the process and reflect in _outcome promies
const exitCode = new Promise(resolve => this._process.once('exit', resolve));
this._outcome = exitCode.then(code => {
if (this._stopCalled) {
return;
}

// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat them as errors
if (code > 0 && !(code === 143 || code === 130)) {
throw createCliError(`ES exited with code ${code}`);
}
});
}
};
46 changes: 43 additions & 3 deletions packages/kbn-es/src/integration_tests/__fixtures__/es_bin.js
Expand Up @@ -19,10 +19,50 @@
* under the License.
*/

const { createServer } = require('http');
const { exitCode, start } = JSON.parse(process.argv[2]);

if (start) {
console.log('started');
process.exitCode = exitCode;

if (!start) {
return;
}

process.exitCode = exitCode;
let serverUrl;
const server = createServer((req, res) => {
const url = new URL(req.url, serverUrl);
const send = (code, body) => {
res.writeHead(code, { 'content-type': 'application/json' });
res.end(JSON.stringify(body));
};

if (url.pathname === '/_xpack') {
return send(400, {
error: {
reason: 'foo bar',
},
});
}

return send(404, {
error: {
reason: 'not found',
},
});
});

// setup server auto close after 1 second of silence
let serverCloseTimer;
const delayServerClose = () => {
clearTimeout(serverCloseTimer);
serverCloseTimer = setTimeout(() => server.close(), 1000);
};
server.on('request', delayServerClose);
server.on('listening', delayServerClose);

server.listen(0, '127.0.0.1', function() {
serverUrl = new URL('http://127.0.0.1');
serverUrl.port = server.address().port;
console.log(`HttpServer publish_address {${serverUrl.hostname}:${serverUrl.port}}`);
spalger marked this conversation as resolved.
Show resolved Hide resolved
console.log('started');
});
165 changes: 83 additions & 82 deletions packages/kbn-es/src/utils/native_realm.js
@@ -1,82 +1,83 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const { Client } = require('@elastic/elasticsearch');
const chalk = require('chalk');

const { log: defaultLog } = require('./log');

exports.NativeRealm = class NativeRealm {
constructor(elasticPassword, port, log = defaultLog) {
this._client = new Client({ node: `http://elastic:${elasticPassword}@localhost:${port}` });
this._elasticPassword = elasticPassword;
this._log = log;
}

async setPassword(username, password = this._elasticPassword) {
this._log.info(`setting ${chalk.bold(username)} password to ${chalk.bold(password)}`);

try {
await this._client.security.changePassword({
username,
refresh: 'wait_for',
body: {
password,
},
});
} catch (e) {
this._log.error(
chalk.red(`unable to set password for ${chalk.bold(username)}: ${e.message}`)
);
}
}

async setPasswords(options) {
if (!(await this.isSecurityEnabled())) {
this._log.info('security is not enabled, unable to set native realm passwords');
return;
}

(await this.getReservedUsers()).forEach(user => {
this.setPassword(user, options[`password.${user}`]);
});
}

async getReservedUsers() {
const users = await this._client.security.getUser();

return Object.keys(users.body).reduce((acc, user) => {
if (users.body[user].metadata._reserved === true) {
acc.push(user);
}
return acc;
}, []);
}

async isSecurityEnabled() {
try {
const {
body: { features },
} = await this._client.xpack.info({ categories: 'features' });
return features.security && features.security.enabled && features.security.available;
} catch (e) {
return false;
}
}
};
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const { Client } = require('@elastic/elasticsearch');
const chalk = require('chalk');

const { log: defaultLog } = require('./log');

exports.NativeRealm = class NativeRealm {
constructor(elasticPassword, port, log = defaultLog) {
this._client = new Client({ node: `http://elastic:${elasticPassword}@localhost:${port}` });
this._elasticPassword = elasticPassword;
this._log = log;
}

async setPassword(username, password = this._elasticPassword) {
this._log.info(`setting ${chalk.bold(username)} password to ${chalk.bold(password)}`);

await this._client.security.changePassword({
username,
refresh: 'wait_for',
body: {
password,
},
});
}

async setPasswords(options) {
if (!(await this.isSecurityEnabled())) {
this._log.info('security is not enabled, unable to set native realm passwords');
return;
}

const reservedUsers = await this.getReservedUsers();
await Promise.all(
reservedUsers.map(async user => {
await this.setPassword(user, options[`password.${user}`]);
})
);
}

async getReservedUsers() {
const users = await this._client.security.getUser();

return Object.keys(users.body).reduce((acc, user) => {
if (users.body[user].metadata._reserved === true) {
acc.push(user);
}
return acc;
}, []);
}

async isSecurityEnabled() {
try {
const {
body: { features },
} = await this._client.xpack.info({ categories: 'features' });
return features.security && features.security.enabled && features.security.available;
} catch (error) {
if (error.meta && error.meta.statusCode === 400) {
return false;
}

throw error;
}
}
};