Skip to content

Commit

Permalink
ARTEMIS-4204 connectors added via management are not durable
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Apr 6, 2023
1 parent 6d3dbc4 commit 6851e7d
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
Expand Down Expand Up @@ -3969,6 +3970,7 @@ public void addConnector(String name, String url) throws Exception {

try {
server.getConfiguration().addConnectorConfiguration(name, url);
storageManager.storeConnector(new PersistedConnector(name, url));
} finally {
blockOnIO();
}
Expand All @@ -3985,6 +3987,7 @@ public void removeConnector(String name) throws Exception {

try {
server.getConfiguration().getConnectorConfigurations().remove(name);
storageManager.deleteConnector(name);
} finally {
blockOnIO();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
Expand Down Expand Up @@ -369,6 +370,12 @@ JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInf

List<PersistedBridgeConfiguration> recoverBridgeConfigurations();

void storeConnector(PersistedConnector persistedConnector) throws Exception;

void deleteConnector(String connectorName) throws Exception;

List<PersistedConnector> recoverConnectors();

void storeUser(PersistedUser persistedUser) throws Exception;

void deleteUser(String username) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.persistence.config;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.utils.BufferHelper;

public class PersistedConnector implements EncodingSupport {

private long storeId;

private String url;

private String name;

public PersistedConnector() {
}

public PersistedConnector(String name, String url) {
this.name = name;
this.url = url;
}

public void setStoreId(long id) {
this.storeId = id;
}

public long getStoreId() {
return storeId;
}

public void setUrl(String url) {
this.url = url;
}

public String getUrl() {
return url;
}

public void setName(String name) {
this.name = name;
}

public String getName() {
return name;
}

@Override
public int getEncodeSize() {
int size = 0;
size += BufferHelper.sizeOfString(name);
size += BufferHelper.sizeOfString(url);
return size;
}

@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeString(name);
buffer.writeString(url);
}

@Override
public void decode(ActiveMQBuffer buffer) {
name = buffer.readString();
url = buffer.readString();
}

@Override
public String toString() {
return "PersistedConnector [storeId=" + storeId +
", name=" +
name +
", url=" +
url +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
Expand Down Expand Up @@ -225,6 +226,8 @@ public Configuration getConfig() {

protected final Map<String, PersistedBridgeConfiguration> mapPersistedBridgeConfigurations = new ConcurrentHashMap<>();

protected final Map<String, PersistedConnector> mapPersistedConnectors = new ConcurrentHashMap<>();

protected final Map<String, PersistedUser> mapPersistedUsers = new ConcurrentHashMap<>();

protected final Map<String, PersistedRole> mapPersistedRoles = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -796,6 +799,32 @@ public List<PersistedBridgeConfiguration> recoverBridgeConfigurations() {
return new ArrayList<>(mapPersistedBridgeConfigurations.values());
}

@Override
public void storeConnector(PersistedConnector persistedConnector) throws Exception {
deleteConnector(persistedConnector.getName());
try (ArtemisCloseable lock = closeableReadLock()) {
final long id = idGenerator.generateID();
persistedConnector.setStoreId(id);
bindingsJournal.appendAddRecord(id, JournalRecordIds.CONNECTOR_RECORD, persistedConnector, true);
mapPersistedConnectors.put(persistedConnector.getName(), persistedConnector);
}
}

@Override
public void deleteConnector(String connectorName) throws Exception {
PersistedConnector oldConnector = mapPersistedConnectors.remove(connectorName);
if (oldConnector != null) {
try (ArtemisCloseable lock = closeableReadLock()) {
bindingsJournal.tryAppendDeleteRecord(oldConnector.getStoreId(), this::recordNotFoundCallback, false);
}
}
}

@Override
public List<PersistedConnector> recoverConnectors() {
return new ArrayList<>(mapPersistedConnectors.values());
}

@Override
public void storeUser(PersistedUser persistedUser) throws Exception {
deleteUser(persistedUser.getUsername());
Expand Down Expand Up @@ -1628,6 +1657,9 @@ public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> qu
mapPersistedKeyValuePairs.put(keyValuePair.getMapId(), persistedKeyValuePairs);
}
persistedKeyValuePairs.put(keyValuePair.getKey(), keyValuePair);
} else if (rec == JournalRecordIds.CONNECTOR_RECORD) {
PersistedConnector connector = newConnectorEncoding(id, buffer);
mapPersistedConnectors.put(connector.getName(), connector);
} else {
// unlikely to happen
ActiveMQServerLogger.LOGGER.invalidRecordType(rec, new Exception("invalid record type " + rec));
Expand Down Expand Up @@ -2096,6 +2128,13 @@ static PersistedBridgeConfiguration newBridgeEncoding(long id, ActiveMQBuffer bu
return persistedBridgeConfiguration;
}

static PersistedConnector newConnectorEncoding(long id, ActiveMQBuffer buffer) {
PersistedConnector persistedBridgeConfiguration = new PersistedConnector();
persistedBridgeConfiguration.decode(buffer);
persistedBridgeConfiguration.setStoreId(id);
return persistedBridgeConfiguration;
}

static PersistedUser newUserEncoding(long id, ActiveMQBuffer buffer) {
PersistedUser persistedUser = new PersistedUser();
persistedUser.decode(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,6 @@ public final class JournalRecordIds {
public static final byte ADD_MESSAGE_BODY = 49;

public static final byte KEY_VALUE_PAIR_RECORD = 50;

public static final byte CONNECTOR_RECORD = 51;
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
Expand Down Expand Up @@ -481,6 +482,19 @@ public List<PersistedBridgeConfiguration> recoverBridgeConfigurations() {
return null;
}

@Override
public void storeConnector(PersistedConnector persistedConnector) throws Exception {
}

@Override
public void deleteConnector(String connectorName) throws Exception {
}

@Override
public List<PersistedConnector> recoverConnectors() {
return null;
}

@Override
public void storeUser(PersistedUser persistedUser) throws Exception {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
Expand Down Expand Up @@ -3480,6 +3481,8 @@ public void run() {

postOffice.startAddressQueueScanner();

recoverStoredConnectors();

recoverStoredBridges();
}

Expand Down Expand Up @@ -4296,6 +4299,14 @@ private void recoverStoredBridges() throws Exception {
}
}

private void recoverStoredConnectors() throws Exception {
if (storageManager.recoverConnectors() != null) {
for (PersistedConnector persistedConnector : storageManager.recoverConnectors()) {
getConfiguration().addConnectorConfiguration(persistedConnector.getName(), persistedConnector.getUrl());
}
}
}

private void deployGroupingHandlerConfiguration(final GroupingHandlerConfiguration config) throws Exception {
if (config != null) {
GroupingHandler groupingHandler1;
Expand Down Expand Up @@ -4627,6 +4638,7 @@ private void deployReloadableConfigFromConfiguration() throws Exception {
destroyBridge(existingBridge.getConfiguration().getParentName());
}
}
recoverStoredConnectors();
recoverStoredBridges();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
Expand Down Expand Up @@ -637,6 +638,19 @@ public List<PersistedBridgeConfiguration> recoverBridgeConfigurations() {
return null;
}

@Override
public void storeConnector(PersistedConnector persistedConnector) throws Exception {
}

@Override
public void deleteConnector(String connectorName) throws Exception {
}

@Override
public List<PersistedConnector> recoverConnectors() {
return null;
}

@Override
public void storeUser(PersistedUser persistedUser) throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedConnector;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
Expand Down Expand Up @@ -731,6 +732,19 @@ public List<PersistedBridgeConfiguration> recoverBridgeConfigurations() {
return null;
}

@Override
public void storeConnector(PersistedConnector persistedConnector) throws Exception {
}

@Override
public void deleteConnector(String connectorName) throws Exception {
}

@Override
public List<PersistedConnector> recoverConnectors() {
return null;
}

@Override
public void storeUser(PersistedUser persistedUser) throws Exception {
manager.storeUser(persistedUser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ public void testChangeQueueFilterOnRestart() throws Exception {
}

@Test
public void bridgeConfigChagesPersist() throws Exception {

public void bridgeConfigChangesPersist() throws Exception {
server = createServer(true);
server.start();

Expand All @@ -163,8 +162,8 @@ public void bridgeConfigChagesPersist() throws Exception {
String queue = "Q1";
String forward = "Q2";

session.createQueue(new QueueConfiguration("Q1").setAddress("Q1").setRoutingType(RoutingType.ANYCAST).setAutoDelete(false));
session.createQueue(new QueueConfiguration("Q2").setAddress("Q2").setRoutingType(RoutingType.ANYCAST).setAutoDelete(false));
session.createQueue(new QueueConfiguration(queue).setAddress(queue).setRoutingType(RoutingType.ANYCAST).setAutoDelete(false));
session.createQueue(new QueueConfiguration(forward).setAddress(forward).setRoutingType(RoutingType.ANYCAST).setAutoDelete(false));
session.close();

BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName(bridgeName)
Expand All @@ -179,16 +178,26 @@ public void bridgeConfigChagesPersist() throws Exception {
server.getActiveMQServerControl().addConnector("connector2", "tcp://localhost:61616");
server.getActiveMQServerControl().createBridge(bridgeConfiguration.toJSON());

Assert.assertEquals(2, server.getConfiguration().getConnectorConfigurations().size());
Assert.assertEquals(2, server.getActiveMQServerControl().getBridgeNames().length);
server.stop();

// clear the in-memory connector configurations to force a reload from disk
server.getConfiguration().getConnectorConfigurations().clear();

server.start();
Assert.assertEquals(2, server.getConfiguration().getConnectorConfigurations().size());
Assert.assertEquals(2, server.getActiveMQServerControl().getBridgeNames().length);

server.getActiveMQServerControl().destroyBridge(bridgeName);
server.getActiveMQServerControl().removeConnector("connector1");
server.getActiveMQServerControl().removeConnector("connector2");
Assert.assertEquals(0, server.getActiveMQServerControl().getBridgeNames().length);
Assert.assertEquals(0, server.getConfiguration().getConnectorConfigurations().size());
server.stop();
server.start();
Assert.assertEquals(0, server.getActiveMQServerControl().getBridgeNames().length);
Assert.assertEquals(0, server.getConfiguration().getConnectorConfigurations().size());
server.stop();

}
Expand Down

0 comments on commit 6851e7d

Please sign in to comment.