Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Commit

Permalink
Release 0.26 agent fixes (#3108)
Browse files Browse the repository at this point in the history
* Backport address allocation equality fix

Backport of commit 228fa4e

* Turn down noisy per reconcilation per address logging (#3085)

* Restrict the number of active promises when creating/deleting entities on the routers (#3095)

Fix #3090

* Fix #3086: Adjust node max_old_space_size to a percentage of available container memory (#3099)

* Correct address status equality check (#3094)

* Fix #3089: Correct address status equality check

* address review comments
* made same_allocation same implementation symmetric

* Fix #3092: Turn off EventEmitter#maxListeners check for router management connection (#3093)

* Restrict the number of active promises when creating/deleting entities on the brokers

Fix #3090

* Ensure that addresses are not synced until addresses have been defined

As proposed in #3101

* Fix unit tests

* Start the watcher after the listeners are in place

Signed-off-by: Vanessa <vbusch@redhat.com>

* Fix test

Signed-off-by: Vanessa <vbusch@redhat.com>
  • Loading branch information
Ulf Lilleengen committed Aug 21, 2019
1 parent 1cba083 commit 3f2e925
Show file tree
Hide file tree
Showing 11 changed files with 166 additions and 20 deletions.
6 changes: 6 additions & 0 deletions agent/bin/agent.js
Expand Up @@ -15,6 +15,7 @@
*/
'use strict';

var v8 = require('v8');
var log = require("../lib/log.js").logger();
var AddressSource = require('../lib/internal_address_source.js');
var BrokerAddressSettings = require('../lib/broker_address_settings.js');
Expand Down Expand Up @@ -59,6 +60,11 @@ function start(env) {
ragent.start_listening(env);
ragent.listen_health({HEALTH_PORT:8888});
}
address_source.start();

setInterval(() => {
log.info("Heap statistics : %j", v8.getHeapStatistics());
}, 60000);

process.on('SIGTERM', function () {
log.info('Shutdown started');
Expand Down
18 changes: 18 additions & 0 deletions agent/bin/launch_node.sh
@@ -1,5 +1,8 @@
#!/usr/bin/env bash

OLD_SPACE_SIZE_PERCENT=${OLD_SPACE_SIZE_PERCENT:-80}
CGROUP_FILE='/sys/fs/cgroup/memory/memory.limit_in_bytes'

# Ensure that we have a suitable node interpreter. This ought to be declared declaratively in the package.json, within an engines section,
# but we currently don't install the package so this wouldn't be enforced.
REQUIRED_NODE_MAJOR=6
Expand All @@ -13,4 +16,19 @@ if [[ ! -z "${_NODE_OPTIONS}" ]]; then
set -- ${_NODE_OPTIONS} "${@}"
fi

# Set max_old_space_size w.r.t the container's memory, unless caller has supplied --max_old_space_size
for arg; do
if [[ "${arg}" =~ ^--max_old_space_size.* ]]; then
OLD_SPACE_SIZE_PERCENT=0
break
fi
done

if [[ -f "${CGROUP_FILE}" && "${OLD_SPACE_SIZE_PERCENT}" -gt 0 ]]; then
CONTAINTER_BYTES=$(cat ${CGROUP_FILE})
MAX_OLD_SPACE_SIZE=$(( ${CONTAINTER_BYTES} / 100 * ${OLD_SPACE_SIZE_PERCENT} ))
MAX_OLD_SPACE_SIZE_MI=$(( ${MAX_OLD_SPACE_SIZE} / ( 1024 * 1024 ) ))
set -- "--max_old_space_size=${MAX_OLD_SPACE_SIZE_MI}" "${@}"
fi

exec node "${@}"
11 changes: 7 additions & 4 deletions agent/lib/broker_controller.js
Expand Up @@ -22,6 +22,7 @@ var rhea = require('rhea');
var artemis = require('./artemis.js');
var myevents = require('./events.js');
var myutils = require('./utils.js');
var plimit = require('p-limit');

function BrokerController(event_sink) {
events.EventEmitter.call(this);
Expand Down Expand Up @@ -391,8 +392,9 @@ BrokerController.prototype.delete_address_and_settings = function (a) {
};

BrokerController.prototype.delete_addresses = function (addresses) {
var self = this;
return Promise.all(addresses.map(function (a) { return self.delete_address(a); }));
var limit = plimit(250);
let delete_fn = limit.bind(null, this.delete_address.bind(this));
return Promise.all(addresses.map(delete_fn));
};

BrokerController.prototype.create_address = function (a) {
Expand Down Expand Up @@ -440,8 +442,9 @@ BrokerController.prototype.create_address_and_settings = function (a, settings)
};

BrokerController.prototype.create_addresses = function (addresses) {
var self = this;
return Promise.all(addresses.map(function (a) { return self.create_address(a); }));
var limit = plimit(250);
let create_fn = limit.bind(null, this.create_address.bind(this));
return Promise.all(addresses.map(create_fn));
};

BrokerController.prototype.check_broker_addresses = function () {
Expand Down
36 changes: 30 additions & 6 deletions agent/lib/internal_address_source.js
Expand Up @@ -64,11 +64,16 @@ function ready (addr) {
}

function same_allocation(a, b) {
if (a === b) {
return true;
} else if (a == null || b == null || a.length !== b.length) {
return false;
}
for (var i in a) {
var equal = false;
for (var j in b) {
if (a[i].containerId == b[j].containerId && a[i].clusterId == b[j].clusterId && a[i].state == b[j].state) {
equal = false;
if (a[i].containerId === b[j].containerId && a[i].clusterId === b[j].clusterId && a[i].state === b[j].state) {
equal = true;
break;
}
}
Expand All @@ -79,16 +84,31 @@ function same_allocation(a, b) {
return true;
}

function same_messages(a, b) {
if (a === b) {
return true;
} else if (a == null || b == null || a.length !== b.length) {
return false;
}

for (var i in a) {
if (!b.includes(a[i])) {
return false;
}
}
return true;
}

function same_address_definition(a, b) {
if (a.address === b.address && a.type === b.type && same_allocation(a.allocated_to, b.allocated_to)) {
if (a.address === b.address && a.type === b.type && !same_allocation(a.allocated_to, b.allocated_to)) {
log.info('allocation changed for %s %s: %s <-> %s', a.type, a.address, JSON.stringify(a.allocated_to), JSON.stringify(b.allocated_to));
}
return a.address === b.address && a.type === b.type && a.allocated_to === b.allocated_to;
return a.address === b.address && a.type === b.type && same_allocation(a.allocated_to, b.allocated_to);
}

function same_address_status(a, b) {
if (a === undefined) return b === undefined;
return a.isReady === b.isReady && a.phase === b.phase && a.message === b.message;
return a.isReady === b.isReady && a.phase === b.phase && same_messages(a.messages, b.messages);
}

function same_address_definition_and_status(a, b) {
Expand Down Expand Up @@ -122,8 +142,12 @@ function AddressSource(config) {
if (config.INFRA_UUID) {
selector += ",infraUuid=" + config.INFRA_UUID;
}
var options = myutils.merge({selector: selector}, this.config);
this.selector = selector;
events.EventEmitter.call(this);
}

AddressSource.prototype.start = function() {
var options = myutils.merge({selector: this.selector}, this.config);
this.watcher = kubernetes.watch('configmaps', options);
this.watcher.on('updated', this.updated.bind(this));
this.readiness = {};
Expand Down
2 changes: 2 additions & 0 deletions agent/lib/qdr.js
Expand Up @@ -54,6 +54,8 @@ var Router = function (connection, router, agent) {
self.health_check();
}, interval);
}

this.connection.setMaxListeners(0);
this.connection.on('receiver_open', this.ready.bind(this));
this.connection.on('disconnected', this.disconnected.bind(this));
this.connection.on('sender_error', this.on_sender_error.bind(this));
Expand Down
11 changes: 9 additions & 2 deletions agent/lib/ragent.js
Expand Up @@ -63,6 +63,7 @@ function Ragent() {
this.configure_handlers();
this.status = new events.EventEmitter();
this.disable_connectivity = false;
this.addresses_initialised = false;
}

Ragent.prototype.subscribe = function (context) {
Expand Down Expand Up @@ -156,6 +157,7 @@ function is_valid_address_definition(def) {
}

Ragent.prototype.sync_addresses = function (updated) {
this.addresses_initialised = true;
this.addresses = updated.filter(is_valid_address_definition);
log.debug('addresses updated: %j', this.addresses);
this.addresses_updated();
Expand Down Expand Up @@ -342,7 +344,9 @@ Ragent.prototype.configure_handlers = function () {
router.retrieve_listeners();
router.retrieve_connectors();
router.on('synchronized', self.on_synchronized.bind(self));
router.sync_addresses(self.addresses);
if (self.addresses_initialised) {
router.sync_addresses(self.addresses);
}
router.on('listeners_updated', self.connected_routers_updated.bind(self));//advertise only once have listeners
router.on('connectors_updated', self.check_router_connectors.bind(self));
router.on('provisioned', self.check_router_connectors.bind(self));
Expand All @@ -351,7 +355,9 @@ Ragent.prototype.configure_handlers = function () {
var broker = broker_controller.create_controller(context.connection, self.event_sink);
self.connected_brokers[broker.id] = broker;
log.info('broker %s connected', broker.id);
self.sync_broker(broker);
if (self.addresses_initialised) {
self.sync_broker(broker);
}
broker.on('synchronized', self.on_synchronized.bind(self));
context.connection.on('disconnected', self.on_broker_disconnect.bind(self));
context.connection.on('connection_close', self.on_broker_disconnect.bind(self));
Expand Down Expand Up @@ -398,6 +404,7 @@ Ragent.prototype.listen = function (options) {
Ragent.prototype.subscribe_to_addresses = function (env) {
var address_source = new AddressSource(env);
address_source.on('addresses_ready', this.sync_addresses.bind(this));
address_source.start();
return address_source.watcher;
};

Expand Down
14 changes: 8 additions & 6 deletions agent/lib/router_config.js
Expand Up @@ -19,6 +19,7 @@ var util = require('util');
var qdr = require('./qdr.js');
var myutils = require('./utils.js');
var log = require('./log.js').logger();
var plimit = require('p-limit');

const ID_QUALIFIER = 'ragent-';
const MAX_RETRIES = 3;
Expand Down Expand Up @@ -268,14 +269,15 @@ function ensure_elements(entity, desired, router, collected) {
let missing = delta.added.concat(delta.modified);

if (stale.length || missing.length) {
let delete_fn = delete_config_element.bind(null, router, entity);
let create_fn = create_config_element.bind(null, router, entity);
var limit = plimit(250);
let delete_fn = limit.bind(null, delete_config_element.bind(null, router, entity));
let create_fn = limit.bind(null, create_config_element.bind(null, router, entity));
return Promise.all(stale.map(delete_fn)).then(
function (deletions) {
report(entity, stale, deletions, actual, 'deleted')
report(entity, stale, deletions, actual, 'deleted');
return Promise.all(missing.map(create_fn)).then(
function (creations) {
report(entity, missing, creations, actual, 'created')
report(entity, missing, creations, actual, 'created');
return false;//recheck when changed
}
).catch(function (error) {
Expand Down Expand Up @@ -346,7 +348,7 @@ function desired_address_config(high_level_address_definitions) {
if (def.type === 'queue') {
config.add_address({prefix:def.address, distribution:'balanced', waypoint:true});
if (def.allocated_to) {
log.info("Constructing config for queue " + def.address + " allocated to: " + JSON.stringify(def.allocated_to));
log.debug("Constructing config for queue %s allocated to: %j", def.address, def.allocated_to);
for (var j in def.allocated_to) {
var brokerStatus = def.allocated_to[j];
if (brokerStatus.state === 'Active') {
Expand All @@ -358,7 +360,7 @@ function desired_address_config(high_level_address_definitions) {
}
}
} else {
log.info("Constructing old config for queue " + def.address);
log.debug("Constructing old config for queue %s", def.address);
config.add_autolink_pair({addr:def.address, containerId: def.address});
}
} else if (def.type === 'topic') {
Expand Down
70 changes: 69 additions & 1 deletion agent/npm-shrinkwrap.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions agent/package.json
Expand Up @@ -10,6 +10,7 @@
},
"dependencies": {
"debug": "^3.1.*",
"p-limit": "^2.2.0",
"rhea": ">=0.2.11",
"keycloak-connect": "3.4.3",
"ws": "*"
Expand Down

0 comments on commit 3f2e925

Please sign in to comment.