diff --git a/agent/bin/agent.js b/agent/bin/agent.js index 3ee961855bf..8cc4d8cb046 100644 --- a/agent/bin/agent.js +++ b/agent/bin/agent.js @@ -68,6 +68,7 @@ function start(env) { ragent.start_listening(env); ragent.listen_health({HEALTH_PORT:8888}); } + address_source.start(); process.on('SIGTERM', function () { log.info('Shutdown started'); diff --git a/agent/bin/launch_node.sh b/agent/bin/launch_node.sh index 23f4f752ea2..ad257fdd2dc 100755 --- a/agent/bin/launch_node.sh +++ b/agent/bin/launch_node.sh @@ -1,5 +1,8 @@ #!/usr/bin/env bash +OLD_SPACE_SIZE_PERCENT=${OLD_SPACE_SIZE_PERCENT:-80} +CGROUP_FILE='/sys/fs/cgroup/memory/memory.limit_in_bytes' + # Ensure that we have a suitable node interpreter. This ought to be declared declaratively in the package.json, within an engines section, # but we currently don't install the package so this wouldn't be enforced. REQUIRED_NODE_MAJOR=6 @@ -13,4 +16,19 @@ if [[ ! -z "${_NODE_OPTIONS}" ]]; then set -- ${_NODE_OPTIONS} "${@}" fi +# Set max_old_space_size w.r.t the container's memory, unless caller has supplied --max_old_space_size +for arg; do + if [[ "${arg}" =~ ^--max_old_space_size.* ]]; then + OLD_SPACE_SIZE_PERCENT=0 + break + fi +done + +if [[ -f "${CGROUP_FILE}" && "${OLD_SPACE_SIZE_PERCENT}" -gt 0 ]]; then + CONTAINTER_BYTES=$(cat ${CGROUP_FILE}) + MAX_OLD_SPACE_SIZE=$(( ${CONTAINTER_BYTES} / 100 * ${OLD_SPACE_SIZE_PERCENT} )) + MAX_OLD_SPACE_SIZE_MI=$(( ${MAX_OLD_SPACE_SIZE} / ( 1024 * 1024 ) )) + set -- "--max_old_space_size=${MAX_OLD_SPACE_SIZE_MI}" "${@}" +fi + exec node "${@}" diff --git a/agent/lib/broker_controller.js b/agent/lib/broker_controller.js index 7a2f607aa63..0e0643005eb 100644 --- a/agent/lib/broker_controller.js +++ b/agent/lib/broker_controller.js @@ -22,6 +22,7 @@ var rhea = require('rhea'); var artemis = require('./artemis.js'); var myevents = require('./events.js'); var myutils = require('./utils.js'); +var plimit = require('p-limit'); function BrokerController(event_sink) { events.EventEmitter.call(this); @@ -393,8 +394,9 @@ BrokerController.prototype.delete_address_and_settings = function (a) { }; BrokerController.prototype.delete_addresses = function (addresses) { - var self = this; - return Promise.all(addresses.map(function (a) { return self.delete_address(a); })); + var limit = plimit(250); + let delete_fn = limit.bind(null, this.delete_address.bind(this)); + return Promise.all(addresses.map(delete_fn)); }; BrokerController.prototype.create_address = function (a) { @@ -442,8 +444,9 @@ BrokerController.prototype.create_address_and_settings = function (a, settings) }; BrokerController.prototype.create_addresses = function (addresses) { - var self = this; - return Promise.all(addresses.map(function (a) { return self.create_address(a); })); + var limit = plimit(250); + let create_fn = limit.bind(null, this.create_address.bind(this)); + return Promise.all(addresses.map(create_fn)); }; BrokerController.prototype.check_broker_addresses = function () { diff --git a/agent/lib/internal_address_source.js b/agent/lib/internal_address_source.js index f7e6f9fde79..c7105ddde9f 100644 --- a/agent/lib/internal_address_source.js +++ b/agent/lib/internal_address_source.js @@ -64,6 +64,11 @@ function ready (addr) { } function same_allocation(a, b) { + if (a === b) { + return true; + } else if (a == null || b == null || a.length !== b.length) { + return false; + } for (var i in a) { var equal = false; for (var j in b) { @@ -79,6 +84,21 @@ function same_allocation(a, b) { return true; } +function same_messages(a, b) { + if (a === b) { + return true; + } else if (a == null || b == null || a.length !== b.length) { + return false; + } + + for (var i in a) { + if (!b.includes(a[i])) { + return false; + } + } + return true; +} + function same_address_definition(a, b) { if (a.address === b.address && a.type === b.type && same_allocation(a.allocated_to, b.allocated_to)) { log.info('allocation changed for %s %s: %s <-> %s', a.type, a.address, JSON.stringify(a.allocated_to), JSON.stringify(b.allocated_to)); @@ -88,7 +108,7 @@ function same_address_definition(a, b) { function same_address_status(a, b) { if (a === undefined) return b === undefined; - return a.isReady === b.isReady && a.phase === b.phase && a.message === b.message; + return a.isReady === b.isReady && a.phase === b.phase && same_messages(a.messages, b.messages); } function same_address_definition_and_status(a, b) { @@ -122,8 +142,12 @@ function AddressSource(config) { if (config.INFRA_UUID) { selector += ",infraUuid=" + config.INFRA_UUID; } - var options = myutils.merge({selector: selector}, this.config); + this.selector = selector; events.EventEmitter.call(this); +} + +AddressSource.prototype.start = function() { + var options = myutils.merge({selector: this.selector}, this.config); this.watcher = kubernetes.watch('configmaps', options); this.watcher.on('updated', this.updated.bind(this)); this.readiness = {}; diff --git a/agent/lib/qdr.js b/agent/lib/qdr.js index d5972c87929..adf9fa1f04c 100644 --- a/agent/lib/qdr.js +++ b/agent/lib/qdr.js @@ -54,6 +54,8 @@ var Router = function (connection, router, agent) { self.health_check(); }, interval); } + + this.connection.setMaxListeners(0); this.connection.on('receiver_open', this.ready.bind(this)); this.connection.on('disconnected', this.disconnected.bind(this)); this.connection.on('sender_error', this.on_sender_error.bind(this)); diff --git a/agent/lib/ragent.js b/agent/lib/ragent.js index a5a63d3f91f..9ca9dd39d7b 100644 --- a/agent/lib/ragent.js +++ b/agent/lib/ragent.js @@ -63,6 +63,7 @@ function Ragent() { this.configure_handlers(); this.status = new events.EventEmitter(); this.disable_connectivity = false; + this.addresses_initialised = false; } Ragent.prototype.subscribe = function (context) { @@ -156,6 +157,7 @@ function is_valid_address_definition(def) { } Ragent.prototype.sync_addresses = function (updated) { + this.addresses_initialised = true; this.addresses = updated.filter(is_valid_address_definition); log.debug('addresses updated: %j', this.addresses); this.addresses_updated(); @@ -342,7 +344,9 @@ Ragent.prototype.configure_handlers = function () { router.retrieve_listeners(); router.retrieve_connectors(); router.on('synchronized', self.on_synchronized.bind(self)); - router.sync_addresses(self.addresses); + if (self.addresses_initialised) { + router.sync_addresses(self.addresses); + } router.on('listeners_updated', self.connected_routers_updated.bind(self));//advertise only once have listeners router.on('connectors_updated', self.check_router_connectors.bind(self)); router.on('provisioned', self.check_router_connectors.bind(self)); @@ -351,7 +355,9 @@ Ragent.prototype.configure_handlers = function () { var broker = broker_controller.create_controller(context.connection, self.event_sink); self.connected_brokers[broker.id] = broker; log.info('broker %s connected', broker.id); - self.sync_broker(broker); + if (self.addresses_initialised) { + self.sync_broker(broker); + } broker.on('synchronized', self.on_synchronized.bind(self)); context.connection.on('disconnected', self.on_broker_disconnect.bind(self)); context.connection.on('connection_close', self.on_broker_disconnect.bind(self)); @@ -398,6 +404,7 @@ Ragent.prototype.listen = function (options) { Ragent.prototype.subscribe_to_addresses = function (env) { var address_source = new AddressSource(env); address_source.on('addresses_ready', this.sync_addresses.bind(this)); + address_source.start(); return address_source.watcher; }; diff --git a/agent/lib/router_config.js b/agent/lib/router_config.js index 2c945a63523..502d067a733 100644 --- a/agent/lib/router_config.js +++ b/agent/lib/router_config.js @@ -19,6 +19,7 @@ var util = require('util'); var qdr = require('./qdr.js'); var myutils = require('./utils.js'); var log = require('./log.js').logger(); +var plimit = require('p-limit'); const ID_QUALIFIER = 'ragent-'; const MAX_RETRIES = 3; @@ -268,14 +269,15 @@ function ensure_elements(entity, desired, router, collected) { let missing = delta.added.concat(delta.modified); if (stale.length || missing.length) { - let delete_fn = delete_config_element.bind(null, router, entity); - let create_fn = create_config_element.bind(null, router, entity); + var limit = plimit(250); + let delete_fn = limit.bind(null, delete_config_element.bind(null, router, entity)); + let create_fn = limit.bind(null, create_config_element.bind(null, router, entity)); return Promise.all(stale.map(delete_fn)).then( function (deletions) { - report(entity, stale, deletions, actual, 'deleted') + report(entity, stale, deletions, actual, 'deleted'); return Promise.all(missing.map(create_fn)).then( function (creations) { - report(entity, missing, creations, actual, 'created') + report(entity, missing, creations, actual, 'created'); return false;//recheck when changed } ).catch(function (error) { @@ -346,7 +348,7 @@ function desired_address_config(high_level_address_definitions) { if (def.type === 'queue') { config.add_address({prefix:def.address, distribution:'balanced', waypoint:true}); if (def.allocated_to) { - log.info("Constructing config for queue " + def.address + " allocated to: " + JSON.stringify(def.allocated_to)); + log.debug("Constructing config for queue %s allocated to: %j", def.address, def.allocated_to); for (var j in def.allocated_to) { var brokerStatus = def.allocated_to[j]; if (brokerStatus.state === 'Active') { @@ -358,7 +360,7 @@ function desired_address_config(high_level_address_definitions) { } } } else { - log.info("Constructing old config for queue " + def.address); + log.debug("Constructing old config for queue %s", def.address); config.add_autolink_pair({addr:def.address, containerId: def.address}); } } else if (def.type === 'topic') { diff --git a/agent/package.json b/agent/package.json index 5c916b534d9..c1f06b5ab59 100644 --- a/agent/package.json +++ b/agent/package.json @@ -11,6 +11,7 @@ "dependencies": { "debug": "^3.1.*", "openid-client": "^2.4.5", + "p-limit": "^2.2.0", "rhea": ">=0.2.11", "simple-oauth2": "^2.2.1", "ws": "*" diff --git a/agent/test/internal_address_source.js b/agent/test/internal_address_source.js index dbcaff5e62c..f698788731f 100644 --- a/agent/test/internal_address_source.js +++ b/agent/test/internal_address_source.js @@ -46,6 +46,7 @@ describe('configmap backed address source', function() { configmaps.add_address_definition({address:'bar', type:'topic'}, undefined, '1234'); configmaps.add_address_definition({address:'baz', type:'queue'}, undefined, "4321"); var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', INFRA_UUID: '1234'}); + source.start(); source.watcher.close();//prevents watching source.on('addresses_defined', function (addresses) { assert.equal(addresses.length, 2); @@ -62,6 +63,7 @@ describe('configmap backed address source', function() { configmaps.add_address_definition({address:'bar', type:'topic'}, undefined, '1234', {'enmasse.io/broker-id':'broker-2'}); configmaps.add_address_definition({address:'baz', type:'anycast'}, undefined, '1234'); var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', INFRA_UUID: '1234'}); + source.start(); source.watcher.close();//prevents watching source.on('addresses_defined', function (addresses) { assert.equal(addresses.length, 3); @@ -80,6 +82,7 @@ describe('configmap backed address source', function() { }); it('watches for changes', function(done) { var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', INFRA_UUID: '1234'}); + source.start(); source.once('addresses_defined', function () { setTimeout(function () { configmaps.add_address_definition({address:'foo', type:'queue'}, undefined, '1234'); @@ -113,6 +116,7 @@ describe('configmap backed address source', function() { } }; var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', INFRA_UUID: '1234'}); + source.start(); source.once('addresses_defined', function () { setTimeout(function () { configmaps.add_address_definition({address:'foo', type:'queue'}, undefined, '1234'); @@ -139,6 +143,7 @@ describe('configmap backed address source', function() { configmaps.add_address_definition({address:'foo', type:'queue'}, undefined, '1234'); configmaps.add_address_definition({address:'bar', type:'topic'}, undefined, '1234'); var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', INFRA_UUID: '1234'}); + source.start(); source.watcher.close(); source.on('addresses_defined', function (addresses) { source.check_status({foo:{propagated:100}}).then(function () { @@ -152,6 +157,7 @@ describe('configmap backed address source', function() { configmaps.add_address_definition({address:'foo', type:'queue'}); configmaps.add_address_definition({address:'bar', type:'topic'}); var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default'}); + source.start(); source.once('addresses_defined', function (addresses) { source.check_status({foo:{propagated:100}}).then(function () { configmaps.remove_resource_by_name('configmaps', 'foo'); @@ -190,6 +196,7 @@ describe('configmap backed address source', function() { configmaps.add_address_plan({plan_name:'bar', address_type:'topic'}); configmaps.add_address_plan({plan_name:'standard', address_type:'anycast', display_name:'display me', shortDescription:'abcdefg', longDescription:'hijklmn'}); var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', ADDRESS_SPACE_PLAN: 'space'}); + source.start(); source.watcher.close(); source.get_address_types().then(function (types) { var queue = remove_by_name(types, 'queue'); @@ -223,6 +230,7 @@ describe('configmap backed address source', function() { configmaps.add_address_plan({plan_name:'large', address_type:'queue', displayOrder:12}); configmaps.add_address_plan({plan_name:'small', address_type:'queue', displayOrder:10}); var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', ADDRESS_SPACE_PLAN: 'space'}); + source.start(); source.watcher.close(); source.get_address_types().then(function (types) { assert.equal(types[0].name, 'queue'); @@ -250,6 +258,7 @@ describe('configmap backed address source', function() { configmaps.add_address_plan({plan_name:'large', address_type:'queue', displayOrder:12}); configmaps.add_address_plan({plan_name:'small', address_type:'queue', displayOrder:0}); var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', ADDRESS_SPACE_PLAN: 'space'}); + source.start(); source.watcher.close(); source.get_address_types().then(function (types) { assert.equal(types[0].name, 'queue'); @@ -311,6 +320,7 @@ describe('configmap backed address source', function() { configmaps.add_config_map('baz', {type:'address-config'}, {'config.json': '{bad:x[!'}); configmaps.add_address_definition({address:'bar', type:'topic'}); var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default'}); + source.start(); source.watcher.close();//prevents watching source.on('addresses_defined', function (addresses) { assert.equal(addresses.length, 2); @@ -331,6 +341,7 @@ describe('configmap backed address source', function() { configmaps.add_address_plan({plan_name:'bar', address_type:'topic'}); configmaps.add_address_plan({plan_name:'standard', address_type:'anycast', display_name:'display me', shortDescription:'abcdefg', longDescription:'hijklmn'}); var source = new AddressSource({port:configmaps.port, host:'localhost', token:'foo', namespace:'default', ADDRESS_SPACE_PLAN: 'space'}); + source.start(); var plans = source.get_address_types(); source.once('addresses_defined', function (addresses) { configmaps.add_address_definition({address:'foo', type:'queue'}, 'address-config-foo'); diff --git a/agent/test/ragent.js b/agent/test/ragent.js index 38171197af6..f36ef13d883 100644 --- a/agent/test/ragent.js +++ b/agent/test/ragent.js @@ -806,6 +806,7 @@ describe('cooperating ragent group', function() { routers = routers.concat(fill(3, new_router)); //inform ragent instances of each other groups.forEach(function (g) { + g.ragent.sync_addresses([]); podserver.add_resource('pods', g.get_pod_definition()); }); groups.forEach(function(g) { @@ -826,6 +827,7 @@ describe('cooperating ragent group', function() { }); //inform ragent instances of each other groups.forEach(function (g) { + g.ragent.sync_addresses([]); podserver.add_resource('pods', g.get_pod_definition()); }); groups.forEach(function(g) { diff --git a/api-server/src/main/java/io/enmasse/api/v1/AddressApiHelper.java b/api-server/src/main/java/io/enmasse/api/v1/AddressApiHelper.java index 5228d9d7439..5939a2d468c 100644 --- a/api-server/src/main/java/io/enmasse/api/v1/AddressApiHelper.java +++ b/api-server/src/main/java/io/enmasse/api/v1/AddressApiHelper.java @@ -68,11 +68,18 @@ public AddressList getAllAddressesWithLabels(final Map labels) t private void validateAddress(AddressSpace addressSpace, Address address) { AddressResolver addressResolver = getAddressResolver(addressSpace); - Set
existingAddresses = addressSpaceApi.withAddressSpace(addressSpace).listAddresses(address.getMetadata().getNamespace()); - addressResolver.validate(address); - for (Address existing : existingAddresses) { - if (address.getSpec().getAddress().equals(existing.getSpec().getAddress()) && !address.getMetadata().getName().equals(existing.getMetadata().getName())) { - throw new BadRequestException("Address '" + address.getSpec().getAddress() + "' already exists with resource name '" + existing.getMetadata().getName() + "'"); + + /* + Brokered address space has no operator that manipulates address phase and readiness, so we need to perform validation at API server. + For the standard address space, the validation is done in AddressController#onUpdate in order to avoid slowing down the request. + */ + if (addressSpace.getSpec().getType().equals("brokered")) { + Set
existingAddresses = addressSpaceApi.withAddressSpace(addressSpace).listAddresses(address.getMetadata().getNamespace()); + addressResolver.validate(address); + for (Address existing : existingAddresses) { + if (address.getSpec().getAddress().equals(existing.getSpec().getAddress()) && !address.getMetadata().getName().equals(existing.getMetadata().getName())) { + throw new BadRequestException("Address '" + address.getSpec().getAddress() + "' already exists with resource name '" + existing.getMetadata().getName() + "'"); + } } } } diff --git a/api-server/src/test/java/io/enmasse/api/server/HTTPServerTest.java b/api-server/src/test/java/io/enmasse/api/server/HTTPServerTest.java index 7d29fada32e..43238e0517d 100644 --- a/api-server/src/test/java/io/enmasse/api/server/HTTPServerTest.java +++ b/api-server/src/test/java/io/enmasse/api/server/HTTPServerTest.java @@ -398,7 +398,7 @@ public void testSchemaApi(String apiVersion, VertxTestContext context) throws Th JsonObject data = buffer.toJsonObject(); System.out.println(data.toString()); assertTrue(data.containsKey("items")); - assertEquals(1, data.getJsonArray("items").size()); + assertEquals(2, data.getJsonArray("items").size()); }); }); diff --git a/api-server/src/test/java/io/enmasse/api/v1/AddressApiHelperTest.java b/api-server/src/test/java/io/enmasse/api/v1/AddressApiHelperTest.java index 7f229d33a5a..d0ad80fe8cb 100644 --- a/api-server/src/test/java/io/enmasse/api/v1/AddressApiHelperTest.java +++ b/api-server/src/test/java/io/enmasse/api/v1/AddressApiHelperTest.java @@ -56,7 +56,7 @@ public void setup() { AddressSpace addressSpace = mock(AddressSpace.class); when(addressSpace.getSpec()).thenReturn(mock(AddressSpaceSpec.class)); when(addressSpace.getMetadata()).thenReturn(mock(ObjectMeta.class)); - when(addressSpace.getSpec().getType()).thenReturn("type1"); + when(addressSpace.getSpec().getType()).thenReturn("brokered"); AddressSpaceApi addressSpaceApi = mock(AddressSpaceApi.class); addressApi = mock(AddressApi.class); securityContext = mock(SecurityContext.class); @@ -68,14 +68,19 @@ public void setup() { } @Test - public void testCreateAddressResourceNameAlreadyExists() throws Exception { - when(addressApi.listAddresses(any())).thenReturn(Collections.singleton(createAddress("q1", "q1"))); - Address invalidAddress = createAddress("someOtherName", "q1"); - Throwable exception = assertThrows(BadRequestException.class, () -> helper.createAddress("test", invalidAddress)); - assertEquals("Address 'q1' already exists with resource name 'q1'", exception.getMessage()); - verify(addressApi, never()).createAddress(any(Address.class)); + public void testReplaceAddressWithInvalidAddress() throws Exception { + Set
addresses = new HashSet<>(); + addresses.add(createAddress("q1", "q1")); + addresses.add(createAddress("q2", "q2")); + when(addressApi.listAddresses(any())).thenReturn(addresses); + when(addressApi.replaceAddress(any())).thenReturn(true); + Address invalidAddress = createAddress("q1", "q2"); + Throwable exception = assertThrows(BadRequestException.class, () -> helper.replaceAddress("test", invalidAddress)); + assertEquals("Address 'q2' already exists with resource name 'q2'", exception.getMessage()); + verify(addressApi, never()).replaceAddress(any(Address.class)); } + @Test public void testCreateAddress() throws Exception { when(addressApi.listAddresses(any())).thenReturn(Collections.emptySet()); @@ -144,18 +149,6 @@ public void testReplaceAddressNotFound() throws Exception { assertEquals("Address q1 not found", exception.getMessage()); } - @Test - public void testReplaceAddressWithInvalidAddress() throws Exception { - Set
addresses = new HashSet<>(); - addresses.add(createAddress("q1", "q1")); - addresses.add(createAddress("q2", "q2")); - when(addressApi.listAddresses(any())).thenReturn(addresses); - when(addressApi.replaceAddress(any())).thenReturn(true); - Address invalidAddress = createAddress("q1", "q2"); - Throwable exception = assertThrows(BadRequestException.class, () -> helper.replaceAddress("test", invalidAddress)); - assertEquals("Address 'q2' already exists with resource name 'q2'", exception.getMessage()); - verify(addressApi, never()).replaceAddress(any(Address.class)); - } @Test public void testDeleteAddress() throws Exception { @@ -180,17 +173,6 @@ public void testDeleteAddressReturningFalse() throws Exception { assertNull(helper.deleteAddress("ns", "test", address.getMetadata().getName())); } - @Test - public void testDuplicateAddresses() throws Exception { - when(addressApi.listAddresses(any())).thenReturn(Sets.newSet(createAddress("q1"), createAddress("q2"))); - - try { - helper.createAddress("test", createAddress("q3", "q1")); - fail("Expected exception for duplicate address"); - } catch (BadRequestException e) { - assertThat(e.getMessage(), is("Address 'q1' already exists with resource name 'q1'")); - } - } @Test public void testParseLabelSelector() throws Exception { diff --git a/k8s-api-testutil/src/main/java/io/enmasse/k8s/api/TestSchemaApi.java b/k8s-api-testutil/src/main/java/io/enmasse/k8s/api/TestSchemaApi.java index 7456971198d..19b0dea09eb 100644 --- a/k8s-api-testutil/src/main/java/io/enmasse/k8s/api/TestSchemaApi.java +++ b/k8s-api-testutil/src/main/java/io/enmasse/k8s/api/TestSchemaApi.java @@ -26,7 +26,28 @@ public class TestSchemaApi implements SchemaApi { public Schema getSchema() { return new SchemaBuilder() - .withAddressSpaceTypes(Collections.singletonList( + .withAddressSpaceTypes(Arrays.asList( + new AddressSpaceTypeBuilder() + .withName("brokered") + .withDescription("Test Type") + .withAddressTypes(Collections.singletonList( + new AddressTypeBuilder() + .withName("queue") + .withPlans(Arrays.asList( + new AddressPlanBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName("plan1") + .build()) + .withAddressType("queue") + .withRequiredResources(Arrays.asList( + new ResourceRequestBuilder() + .withName("broker") + .withCredit(0.1) + .build())) + .build() + )) + .build())) + .build(), new AddressSpaceTypeBuilder() .withName("type1") .withDescription("Test Type") @@ -88,7 +109,19 @@ public Schema getSchema() { .withVersion("1.0") .build()) .build())) - .withPlans(Collections.singletonList( + .withPlans(Arrays.asList( + new AddressSpacePlanBuilder() + .withMetadata(new ObjectMetaBuilder() + .addToAnnotations(AnnotationKeys.DEFINED_BY, "infra") + .withName("myplan") + .build()) + .withAddressSpaceType("brokered") + .withResources(Arrays.asList(new ResourceAllowanceBuilder() + .withName("broker") + .withMax(1.0) + .build())) + .withAddressPlans(Arrays.asList("plan1")) + .build(), new AddressSpacePlanBuilder() .withMetadata(new ObjectMetaBuilder() .addToAnnotations(AnnotationKeys.DEFINED_BY, "infra") diff --git a/standard-controller/src/main/java/io/enmasse/controller/standard/AddressController.java b/standard-controller/src/main/java/io/enmasse/controller/standard/AddressController.java index d533f558a7b..dda594ee130 100644 --- a/standard-controller/src/main/java/io/enmasse/controller/standard/AddressController.java +++ b/standard-controller/src/main/java/io/enmasse/controller/standard/AddressController.java @@ -99,10 +99,9 @@ public void onUpdate(List
addressList) throws Exception { } AddressSpaceResolver addressSpaceResolver = new AddressSpaceResolver(schema); - Set
addressSet = new LinkedHashSet<>(addressList); - final Map previousStatus = addressSet.stream() - .collect(Collectors.toMap(a -> a.getSpec().getAddress(), + final Map previousStatus = addressList.stream() + .collect(Collectors.toMap(a -> a.getMetadata().getName(), a -> new ProvisionState(a.getStatus(), a.getAnnotation(AnnotationKeys.APPLIED_PLAN)))); AddressSpacePlan addressSpacePlan = addressSpaceType.findAddressSpacePlan(options.getAddressSpacePlanName()).orElseThrow(() -> new RuntimeException("Unable to handle updates: address space plan " + options.getAddressSpacePlanName() + " not found!")); @@ -113,14 +112,35 @@ public void onUpdate(List
addressList) throws Exception { AddressProvisioner provisioner = new AddressProvisioner(addressSpaceResolver, addressResolver, addressSpacePlan, clusterGenerator, kubernetes, eventLogger, options.getInfraUuid(), brokerIdGenerator); + Map validAddresses = new HashMap<>(); List readyPhases = Arrays.asList(Configuring, Active); for (Address address : addressList) { address.getStatus().clearMessages(); if (readyPhases.contains(address.getStatus().getPhase())) { address.getStatus().setReady(true); } + + Address existing = validAddresses.get(address.getSpec().getAddress()); + if (existing != null) { + if (!address.getStatus().getPhase().equals(Pending) && existing.getStatus().getPhase().equals(Pending)) { + // If existing address is pending, and we are not pending, we take priority + String errorMessage = String.format("Address '%s' already exists with resource name '%s'", address.getSpec().getAddress(), address.getMetadata().getName()); + existing.getStatus().setPhase(Pending); + existing.getStatus().appendMessage(errorMessage); + validAddresses.put(address.getSpec().getAddress(), address); + } else { + // Existing address has already been accepted, or we are both pending, existing takes priority. + String errorMessage = String.format("Address '%s' already exists with resource name '%s'", address.getSpec().getAddress(), existing.getMetadata().getName()); + address.getStatus().setPhase(Pending); + address.getStatus().appendMessage(errorMessage); + } + } else { + validAddresses.put(address.getSpec().getAddress(), address); + } } + Set
addressSet = new LinkedHashSet<>(validAddresses.values()); + Map countByPhase = countPhases(addressSet); log.info("Total: {}, Active: {}, Configuring: {}, Pending: {}, Terminating: {}, Failed: {}", addressSet.size(), countByPhase.get(Active), countByPhase.get(Configuring), countByPhase.get(Pending), countByPhase.get(Terminating), countByPhase.get(Failed)); @@ -176,8 +196,8 @@ public void onUpdate(List
addressList) throws Exception { long upgradeClusters = System.nanoTime(); int staleCount = 0; - for (Address address : addressSet) { - ProvisionState previous = previousStatus.get(address.getSpec().getAddress()); + for (Address address : addressList) { + ProvisionState previous = previousStatus.get(address.getMetadata().getName()); ProvisionState current = new ProvisionState(address.getStatus(), address.getAnnotation(AnnotationKeys.APPLIED_PLAN)); if (!current.equals(previous)) { try { diff --git a/standard-controller/src/test/java/io/enmasse/controller/standard/AddressControllerTest.java b/standard-controller/src/test/java/io/enmasse/controller/standard/AddressControllerTest.java index 71c610d67d2..bce688d7ef1 100644 --- a/standard-controller/src/test/java/io/enmasse/controller/standard/AddressControllerTest.java +++ b/standard-controller/src/test/java/io/enmasse/controller/standard/AddressControllerTest.java @@ -120,6 +120,150 @@ public void testAddressGarbageCollection() throws Exception { verify(mockApi).deleteAddress(eq(terminating)); } + @Test + public void testDuplicatePendingPendingAddresses() throws Exception { + + Address a1 = new AddressBuilder() + .withNewMetadata() + .withName("myspace.a1") + .endMetadata() + .withNewSpec() + .withAddress("a") + .withType("anycast") + .withPlan("small-anycast") + .endSpec() + .build(); + + Address a2 = new AddressBuilder() + .withNewMetadata() + .withName("myspace.a2") + .endMetadata() + .withNewSpec() + .withAddress("a") + .withType("anycast") + .withPlan("small-anycast") + .endSpec() + .build(); + + controller.onUpdate(Arrays.asList(a1, a2)); + + assertEquals(Phase.Configuring, a1.getStatus().getPhase()); + + assertEquals(Phase.Pending, a2.getStatus().getPhase()); + assertEquals("Address 'a' already exists with resource name 'myspace.a1'", a2.getStatus().getMessages().get(0)); + } + + @Test + public void testDuplicatePendingActiveAddresses() throws Exception { + + Address a1 = new AddressBuilder() + .withNewMetadata() + .withName("myspace.a1") + .endMetadata() + .withNewSpec() + .withAddress("a") + .withType("anycast") + .withPlan("small-anycast") + .endSpec() + .build(); + + Address a2 = new AddressBuilder() + .withNewMetadata() + .withName("myspace.a2") + .endMetadata() + .withNewSpec() + .withAddress("a") + .withType("anycast") + .withPlan("small-anycast") + .endSpec() + .editOrNewStatus() + .withPhase(Phase.Active) + .endStatus() + .build(); + + controller.onUpdate(Arrays.asList(a1, a2)); + + assertEquals(Phase.Pending, a1.getStatus().getPhase()); + assertEquals("Address 'a' already exists with resource name 'myspace.a2'", a1.getStatus().getMessages().get(0)); + + assertEquals(Phase.Active, a2.getStatus().getPhase()); + } + + @Test + public void testDuplicateActivePendingAddresses() throws Exception { + + Address a1 = new AddressBuilder() + .withNewMetadata() + .withName("myspace.a1") + .endMetadata() + .withNewSpec() + .withAddress("a") + .withType("anycast") + .withPlan("small-anycast") + .endSpec() + .editOrNewStatus() + .withPhase(Phase.Active) + .endStatus() + .build(); + + Address a2 = new AddressBuilder() + .withNewMetadata() + .withName("myspace.a2") + .endMetadata() + .withNewSpec() + .withAddress("a") + .withType("anycast") + .withPlan("small-anycast") + .endSpec() + .build(); + + controller.onUpdate(Arrays.asList(a1, a2)); + + assertEquals(Phase.Active, a1.getStatus().getPhase()); + + assertEquals(Phase.Pending, a2.getStatus().getPhase()); + assertEquals("Address 'a' already exists with resource name 'myspace.a1'", a2.getStatus().getMessages().get(0)); + } + + @Test + public void testDuplicateActiveActiveAddresses() throws Exception { + + Address a1 = new AddressBuilder() + .withNewMetadata() + .withName("myspace.a1") + .endMetadata() + .withNewSpec() + .withAddress("a") + .withType("anycast") + .withPlan("small-anycast") + .endSpec() + .editOrNewStatus() + .withPhase(Phase.Active) + .endStatus() + .build(); + + Address a2 = new AddressBuilder() + .withNewMetadata() + .withName("myspace.a2") + .endMetadata() + .withNewSpec() + .withAddress("a") + .withType("anycast") + .withPlan("small-anycast") + .endSpec() + .editOrNewStatus() + .withPhase(Phase.Active) + .endStatus() + .build(); + + controller.onUpdate(Arrays.asList(a1, a2)); + + assertEquals(Phase.Active, a1.getStatus().getPhase()); + + assertEquals(Phase.Pending, a2.getStatus().getPhase()); + assertEquals("Address 'a' already exists with resource name 'myspace.a1'", a2.getStatus().getMessages().get(0)); + } + @Test public void testDeleteUnusedClusters() throws Exception { Address alive = new AddressBuilder()