Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Commit

Permalink
Release 0.29 backports (#3120)
Browse files Browse the repository at this point in the history
* Turn down noisy per reconcilation per address logging (#3085)

* Restrict the number of active promises when creating/deleting entities on the brokers

Fix #3090

* Restrict the number of active promises when creating/deleting entities on the routers (#3095)

Fix #3090

* Fix #3086: Adjust node max_old_space_size to a percentage of available container memory (#3099)

* Correct address status equality check (#3094)

* Fix #3089: Correct address status equality check

* address review comments
* made same_allocation same implementation symmetric

* Fix #3092: Turn off EventEmitter#maxListeners check for router management connection (#3093)

* Ensure that addresses are not synced until addresses have been defined

As proposed in #3101

* Fix unit tests

* Start the watcher after the listeners are in place

Signed-off-by: Vanessa <vbusch@redhat.com>

* Improve api server address create performance (#3114)

This change improves api server address create performance by almost an order of magnitude when 1000 addresses are defined.
The changes moves the validation of spec.address to standard-controller for the standard address space. For the brokered address space, the validation remains in the api-server, as it would require adding write-back capability to the agent. Future refactoring/consolidation of agent/standard-controller should incorporate this validation.

Fixes #3111
  • Loading branch information
Ulf Lilleengen committed Aug 23, 2019
1 parent 857c622 commit a7c4455
Show file tree
Hide file tree
Showing 16 changed files with 314 additions and 57 deletions.
1 change: 1 addition & 0 deletions agent/bin/agent.js
Expand Up @@ -68,6 +68,7 @@ function start(env) {
ragent.start_listening(env);
ragent.listen_health({HEALTH_PORT:8888});
}
address_source.start();

process.on('SIGTERM', function () {
log.info('Shutdown started');
Expand Down
18 changes: 18 additions & 0 deletions agent/bin/launch_node.sh
@@ -1,5 +1,8 @@
#!/usr/bin/env bash

OLD_SPACE_SIZE_PERCENT=${OLD_SPACE_SIZE_PERCENT:-80}
CGROUP_FILE='/sys/fs/cgroup/memory/memory.limit_in_bytes'

# Ensure that we have a suitable node interpreter. This ought to be declared declaratively in the package.json, within an engines section,
# but we currently don't install the package so this wouldn't be enforced.
REQUIRED_NODE_MAJOR=6
Expand All @@ -13,4 +16,19 @@ if [[ ! -z "${_NODE_OPTIONS}" ]]; then
set -- ${_NODE_OPTIONS} "${@}"
fi

# Set max_old_space_size w.r.t the container's memory, unless caller has supplied --max_old_space_size
for arg; do
if [[ "${arg}" =~ ^--max_old_space_size.* ]]; then
OLD_SPACE_SIZE_PERCENT=0
break
fi
done

if [[ -f "${CGROUP_FILE}" && "${OLD_SPACE_SIZE_PERCENT}" -gt 0 ]]; then
CONTAINTER_BYTES=$(cat ${CGROUP_FILE})
MAX_OLD_SPACE_SIZE=$(( ${CONTAINTER_BYTES} / 100 * ${OLD_SPACE_SIZE_PERCENT} ))
MAX_OLD_SPACE_SIZE_MI=$(( ${MAX_OLD_SPACE_SIZE} / ( 1024 * 1024 ) ))
set -- "--max_old_space_size=${MAX_OLD_SPACE_SIZE_MI}" "${@}"
fi

exec node "${@}"
11 changes: 7 additions & 4 deletions agent/lib/broker_controller.js
Expand Up @@ -22,6 +22,7 @@ var rhea = require('rhea');
var artemis = require('./artemis.js');
var myevents = require('./events.js');
var myutils = require('./utils.js');
var plimit = require('p-limit');

function BrokerController(event_sink) {
events.EventEmitter.call(this);
Expand Down Expand Up @@ -393,8 +394,9 @@ BrokerController.prototype.delete_address_and_settings = function (a) {
};

BrokerController.prototype.delete_addresses = function (addresses) {
var self = this;
return Promise.all(addresses.map(function (a) { return self.delete_address(a); }));
var limit = plimit(250);
let delete_fn = limit.bind(null, this.delete_address.bind(this));
return Promise.all(addresses.map(delete_fn));
};

BrokerController.prototype.create_address = function (a) {
Expand Down Expand Up @@ -442,8 +444,9 @@ BrokerController.prototype.create_address_and_settings = function (a, settings)
};

BrokerController.prototype.create_addresses = function (addresses) {
var self = this;
return Promise.all(addresses.map(function (a) { return self.create_address(a); }));
var limit = plimit(250);
let create_fn = limit.bind(null, this.create_address.bind(this));
return Promise.all(addresses.map(create_fn));
};

BrokerController.prototype.check_broker_addresses = function () {
Expand Down
28 changes: 26 additions & 2 deletions agent/lib/internal_address_source.js
Expand Up @@ -64,6 +64,11 @@ function ready (addr) {
}

function same_allocation(a, b) {
if (a === b) {
return true;
} else if (a == null || b == null || a.length !== b.length) {
return false;
}
for (var i in a) {
var equal = false;
for (var j in b) {
Expand All @@ -79,6 +84,21 @@ function same_allocation(a, b) {
return true;
}

function same_messages(a, b) {
if (a === b) {
return true;
} else if (a == null || b == null || a.length !== b.length) {
return false;
}

for (var i in a) {
if (!b.includes(a[i])) {
return false;
}
}
return true;
}

function same_address_definition(a, b) {
if (a.address === b.address && a.type === b.type && same_allocation(a.allocated_to, b.allocated_to)) {
log.info('allocation changed for %s %s: %s <-> %s', a.type, a.address, JSON.stringify(a.allocated_to), JSON.stringify(b.allocated_to));
Expand All @@ -88,7 +108,7 @@ function same_address_definition(a, b) {

function same_address_status(a, b) {
if (a === undefined) return b === undefined;
return a.isReady === b.isReady && a.phase === b.phase && a.message === b.message;
return a.isReady === b.isReady && a.phase === b.phase && same_messages(a.messages, b.messages);
}

function same_address_definition_and_status(a, b) {
Expand Down Expand Up @@ -122,8 +142,12 @@ function AddressSource(config) {
if (config.INFRA_UUID) {
selector += ",infraUuid=" + config.INFRA_UUID;
}
var options = myutils.merge({selector: selector}, this.config);
this.selector = selector;
events.EventEmitter.call(this);
}

AddressSource.prototype.start = function() {
var options = myutils.merge({selector: this.selector}, this.config);
this.watcher = kubernetes.watch('configmaps', options);
this.watcher.on('updated', this.updated.bind(this));
this.readiness = {};
Expand Down
2 changes: 2 additions & 0 deletions agent/lib/qdr.js
Expand Up @@ -54,6 +54,8 @@ var Router = function (connection, router, agent) {
self.health_check();
}, interval);
}

this.connection.setMaxListeners(0);
this.connection.on('receiver_open', this.ready.bind(this));
this.connection.on('disconnected', this.disconnected.bind(this));
this.connection.on('sender_error', this.on_sender_error.bind(this));
Expand Down
11 changes: 9 additions & 2 deletions agent/lib/ragent.js
Expand Up @@ -63,6 +63,7 @@ function Ragent() {
this.configure_handlers();
this.status = new events.EventEmitter();
this.disable_connectivity = false;
this.addresses_initialised = false;
}

Ragent.prototype.subscribe = function (context) {
Expand Down Expand Up @@ -156,6 +157,7 @@ function is_valid_address_definition(def) {
}

Ragent.prototype.sync_addresses = function (updated) {
this.addresses_initialised = true;
this.addresses = updated.filter(is_valid_address_definition);
log.debug('addresses updated: %j', this.addresses);
this.addresses_updated();
Expand Down Expand Up @@ -342,7 +344,9 @@ Ragent.prototype.configure_handlers = function () {
router.retrieve_listeners();
router.retrieve_connectors();
router.on('synchronized', self.on_synchronized.bind(self));
router.sync_addresses(self.addresses);
if (self.addresses_initialised) {
router.sync_addresses(self.addresses);
}
router.on('listeners_updated', self.connected_routers_updated.bind(self));//advertise only once have listeners
router.on('connectors_updated', self.check_router_connectors.bind(self));
router.on('provisioned', self.check_router_connectors.bind(self));
Expand All @@ -351,7 +355,9 @@ Ragent.prototype.configure_handlers = function () {
var broker = broker_controller.create_controller(context.connection, self.event_sink);
self.connected_brokers[broker.id] = broker;
log.info('broker %s connected', broker.id);
self.sync_broker(broker);
if (self.addresses_initialised) {
self.sync_broker(broker);
}
broker.on('synchronized', self.on_synchronized.bind(self));
context.connection.on('disconnected', self.on_broker_disconnect.bind(self));
context.connection.on('connection_close', self.on_broker_disconnect.bind(self));
Expand Down Expand Up @@ -398,6 +404,7 @@ Ragent.prototype.listen = function (options) {
Ragent.prototype.subscribe_to_addresses = function (env) {
var address_source = new AddressSource(env);
address_source.on('addresses_ready', this.sync_addresses.bind(this));
address_source.start();
return address_source.watcher;
};

Expand Down
14 changes: 8 additions & 6 deletions agent/lib/router_config.js
Expand Up @@ -19,6 +19,7 @@ var util = require('util');
var qdr = require('./qdr.js');
var myutils = require('./utils.js');
var log = require('./log.js').logger();
var plimit = require('p-limit');

const ID_QUALIFIER = 'ragent-';
const MAX_RETRIES = 3;
Expand Down Expand Up @@ -268,14 +269,15 @@ function ensure_elements(entity, desired, router, collected) {
let missing = delta.added.concat(delta.modified);

if (stale.length || missing.length) {
let delete_fn = delete_config_element.bind(null, router, entity);
let create_fn = create_config_element.bind(null, router, entity);
var limit = plimit(250);
let delete_fn = limit.bind(null, delete_config_element.bind(null, router, entity));
let create_fn = limit.bind(null, create_config_element.bind(null, router, entity));
return Promise.all(stale.map(delete_fn)).then(
function (deletions) {
report(entity, stale, deletions, actual, 'deleted')
report(entity, stale, deletions, actual, 'deleted');
return Promise.all(missing.map(create_fn)).then(
function (creations) {
report(entity, missing, creations, actual, 'created')
report(entity, missing, creations, actual, 'created');
return false;//recheck when changed
}
).catch(function (error) {
Expand Down Expand Up @@ -346,7 +348,7 @@ function desired_address_config(high_level_address_definitions) {
if (def.type === 'queue') {
config.add_address({prefix:def.address, distribution:'balanced', waypoint:true});
if (def.allocated_to) {
log.info("Constructing config for queue " + def.address + " allocated to: " + JSON.stringify(def.allocated_to));
log.debug("Constructing config for queue %s allocated to: %j", def.address, def.allocated_to);
for (var j in def.allocated_to) {
var brokerStatus = def.allocated_to[j];
if (brokerStatus.state === 'Active') {
Expand All @@ -358,7 +360,7 @@ function desired_address_config(high_level_address_definitions) {
}
}
} else {
log.info("Constructing old config for queue " + def.address);
log.debug("Constructing old config for queue %s", def.address);
config.add_autolink_pair({addr:def.address, containerId: def.address});
}
} else if (def.type === 'topic') {
Expand Down
1 change: 1 addition & 0 deletions agent/package.json
Expand Up @@ -11,6 +11,7 @@
"dependencies": {
"debug": "^3.1.*",
"openid-client": "^2.4.5",
"p-limit": "^2.2.0",
"rhea": ">=0.2.11",
"simple-oauth2": "^2.2.1",
"ws": "*"
Expand Down
11 changes: 11 additions & 0 deletions agent/test/internal_address_source.js
Expand Up @@ -46,6 +46,7 @@ describe('configmap backed address source', function() {
configmaps.add_address_definition({address:'bar', type:'topic'}, undefined, '1234');
configmaps.add_address_definition({address:'baz', type:'queue'}, undefined, "4321");
var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', INFRA_UUID: '1234'});
source.start();
source.watcher.close();//prevents watching
source.on('addresses_defined', function (addresses) {
assert.equal(addresses.length, 2);
Expand All @@ -62,6 +63,7 @@ describe('configmap backed address source', function() {
configmaps.add_address_definition({address:'bar', type:'topic'}, undefined, '1234', {'enmasse.io/broker-id':'broker-2'});
configmaps.add_address_definition({address:'baz', type:'anycast'}, undefined, '1234');
var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', INFRA_UUID: '1234'});
source.start();
source.watcher.close();//prevents watching
source.on('addresses_defined', function (addresses) {
assert.equal(addresses.length, 3);
Expand All @@ -80,6 +82,7 @@ describe('configmap backed address source', function() {
});
it('watches for changes', function(done) {
var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', INFRA_UUID: '1234'});
source.start();
source.once('addresses_defined', function () {
setTimeout(function () {
configmaps.add_address_definition({address:'foo', type:'queue'}, undefined, '1234');
Expand Down Expand Up @@ -113,6 +116,7 @@ describe('configmap backed address source', function() {
}
};
var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', INFRA_UUID: '1234'});
source.start();
source.once('addresses_defined', function () {
setTimeout(function () {
configmaps.add_address_definition({address:'foo', type:'queue'}, undefined, '1234');
Expand All @@ -139,6 +143,7 @@ describe('configmap backed address source', function() {
configmaps.add_address_definition({address:'foo', type:'queue'}, undefined, '1234');
configmaps.add_address_definition({address:'bar', type:'topic'}, undefined, '1234');
var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', INFRA_UUID: '1234'});
source.start();
source.watcher.close();
source.on('addresses_defined', function (addresses) {
source.check_status({foo:{propagated:100}}).then(function () {
Expand All @@ -152,6 +157,7 @@ describe('configmap backed address source', function() {
configmaps.add_address_definition({address:'foo', type:'queue'});
configmaps.add_address_definition({address:'bar', type:'topic'});
var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default'});
source.start();
source.once('addresses_defined', function (addresses) {
source.check_status({foo:{propagated:100}}).then(function () {
configmaps.remove_resource_by_name('configmaps', 'foo');
Expand Down Expand Up @@ -190,6 +196,7 @@ describe('configmap backed address source', function() {
configmaps.add_address_plan({plan_name:'bar', address_type:'topic'});
configmaps.add_address_plan({plan_name:'standard', address_type:'anycast', display_name:'display me', shortDescription:'abcdefg', longDescription:'hijklmn'});
var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', ADDRESS_SPACE_PLAN: 'space'});
source.start();
source.watcher.close();
source.get_address_types().then(function (types) {
var queue = remove_by_name(types, 'queue');
Expand Down Expand Up @@ -223,6 +230,7 @@ describe('configmap backed address source', function() {
configmaps.add_address_plan({plan_name:'large', address_type:'queue', displayOrder:12});
configmaps.add_address_plan({plan_name:'small', address_type:'queue', displayOrder:10});
var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', ADDRESS_SPACE_PLAN: 'space'});
source.start();
source.watcher.close();
source.get_address_types().then(function (types) {
assert.equal(types[0].name, 'queue');
Expand Down Expand Up @@ -250,6 +258,7 @@ describe('configmap backed address source', function() {
configmaps.add_address_plan({plan_name:'large', address_type:'queue', displayOrder:12});
configmaps.add_address_plan({plan_name:'small', address_type:'queue', displayOrder:0});
var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', ADDRESS_SPACE_PLAN: 'space'});
source.start();
source.watcher.close();
source.get_address_types().then(function (types) {
assert.equal(types[0].name, 'queue');
Expand Down Expand Up @@ -311,6 +320,7 @@ describe('configmap backed address source', function() {
configmaps.add_config_map('baz', {type:'address-config'}, {'config.json': '{bad:x[!'});
configmaps.add_address_definition({address:'bar', type:'topic'});
var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default'});
source.start();
source.watcher.close();//prevents watching
source.on('addresses_defined', function (addresses) {
assert.equal(addresses.length, 2);
Expand All @@ -331,6 +341,7 @@ describe('configmap backed address source', function() {
configmaps.add_address_plan({plan_name:'bar', address_type:'topic'});
configmaps.add_address_plan({plan_name:'standard', address_type:'anycast', display_name:'display me', shortDescription:'abcdefg', longDescription:'hijklmn'});
var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', ADDRESS_SPACE_PLAN: 'space'});
source.start();
var plans = source.get_address_types();
source.once('addresses_defined', function (addresses) {
configmaps.add_address_definition({address:'foo', type:'queue'}, 'address-config-foo');
Expand Down
2 changes: 2 additions & 0 deletions agent/test/ragent.js
Expand Up @@ -806,6 +806,7 @@ describe('cooperating ragent group', function() {
routers = routers.concat(fill(3, new_router));
//inform ragent instances of each other
groups.forEach(function (g) {
g.ragent.sync_addresses([]);
podserver.add_resource('pods', g.get_pod_definition());
});
groups.forEach(function(g) {
Expand All @@ -826,6 +827,7 @@ describe('cooperating ragent group', function() {
});
//inform ragent instances of each other
groups.forEach(function (g) {
g.ragent.sync_addresses([]);
podserver.add_resource('pods', g.get_pod_definition());
});
groups.forEach(function(g) {
Expand Down
17 changes: 12 additions & 5 deletions api-server/src/main/java/io/enmasse/api/v1/AddressApiHelper.java
Expand Up @@ -68,11 +68,18 @@ public AddressList getAllAddressesWithLabels(final Map<String, String> labels) t

private void validateAddress(AddressSpace addressSpace, Address address) {
AddressResolver addressResolver = getAddressResolver(addressSpace);
Set<Address> existingAddresses = addressSpaceApi.withAddressSpace(addressSpace).listAddresses(address.getMetadata().getNamespace());
addressResolver.validate(address);
for (Address existing : existingAddresses) {
if (address.getSpec().getAddress().equals(existing.getSpec().getAddress()) && !address.getMetadata().getName().equals(existing.getMetadata().getName())) {
throw new BadRequestException("Address '" + address.getSpec().getAddress() + "' already exists with resource name '" + existing.getMetadata().getName() + "'");

/*
Brokered address space has no operator that manipulates address phase and readiness, so we need to perform validation at API server.
For the standard address space, the validation is done in AddressController#onUpdate in order to avoid slowing down the request.
*/
if (addressSpace.getSpec().getType().equals("brokered")) {
Set<Address> existingAddresses = addressSpaceApi.withAddressSpace(addressSpace).listAddresses(address.getMetadata().getNamespace());
addressResolver.validate(address);
for (Address existing : existingAddresses) {
if (address.getSpec().getAddress().equals(existing.getSpec().getAddress()) && !address.getMetadata().getName().equals(existing.getMetadata().getName())) {
throw new BadRequestException("Address '" + address.getSpec().getAddress() + "' already exists with resource name '" + existing.getMetadata().getName() + "'");
}
}
}
}
Expand Down
Expand Up @@ -398,7 +398,7 @@ public void testSchemaApi(String apiVersion, VertxTestContext context) throws Th
JsonObject data = buffer.toJsonObject();
System.out.println(data.toString());
assertTrue(data.containsKey("items"));
assertEquals(1, data.getJsonArray("items").size());
assertEquals(2, data.getJsonArray("items").size());
});
});

Expand Down

0 comments on commit a7c4455

Please sign in to comment.