Skip to content

Commit

Permalink
This closes #3148
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed May 27, 2020
2 parents d3d7d22 + 9a7620f commit 0c3ced6
Show file tree
Hide file tree
Showing 19 changed files with 386 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;

/**
* An operation failed because an address exists on the server.
*/
public final class ActiveMQDivertDoesNotExistException extends ActiveMQException {

public ActiveMQDivertDoesNotExistException() {
super(ActiveMQExceptionType.DIVERT_DOES_NOT_EXIST);
}

public ActiveMQDivertDoesNotExistException(String msg) {
super(ActiveMQExceptionType.DIVERT_DOES_NOT_EXIST, msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,12 @@ public ActiveMQException createException(String msg) {
public ActiveMQException createException(String msg) {
return new ActiveMQReplicationTimeooutException(msg);
}
},
DIVERT_DOES_NOT_EXIST(221) {
@Override
public ActiveMQException createException(String msg) {
return new ActiveMQDivertDoesNotExistException(msg);
}
};
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2662,4 +2662,12 @@ static void browseMessagesFailure(String queueName) {
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601726, value = "User {0} failed to browse messages from queue {1}", format = Message.Format.MESSAGE_FORMAT)
void browseMessagesFailure(String user, String queueName);

static void updateDivert(Object source, Object... args) {
LOGGER.updateDivert(getCaller(), source, arrayToString(args));
}

@LogMessage(level = Logger.Level.INFO)
@Message(id = 601727, value = "User {0} is updating a divert on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void updateDivert(String user, Object source, Object... args);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,17 @@ void createDivert(@Parameter(name = "name", desc = "Name of the divert") String
@Parameter(name = "transformerPropertiesAsJSON", desc = "Configuration properties of the divert's transformer in JSON form") String transformerPropertiesAsJSON,
@Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;

/**
* update a divert
*/
@Operation(desc = "Update a divert", impact = MBeanOperationInfo.ACTION)
void updateDivert(@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "forwardingAddress", desc = "Address to divert to") String forwardingAddress,
@Parameter(name = "filterString", desc = "Filter of the divert") String filterString,
@Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName,
@Parameter(name = "transformerProperties", desc = "Configuration properties of the divert's transformer") Map<String, String> transformerProperties,
@Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;

@Operation(desc = "Destroy a Divert", impact = MBeanOperationInfo.ACTION)
void destroyDivert(@Parameter(name = "name", desc = "Name of the divert") String name) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
Expand Down Expand Up @@ -3415,6 +3416,38 @@ public void createDivert(final String name,
}
}

@Override
public void updateDivert(final String name,
final String forwardingAddress,
final String filterString,
final String transformerClassName,
final Map<String, String> transformerProperties,
final String routingType) throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.updateDivert(this.server, name, forwardingAddress, filterString,
transformerClassName, transformerProperties, routingType);
}
checkStarted();

clearIO();

try {
TransformerConfiguration transformerConfiguration = transformerClassName == null ? null :
new TransformerConfiguration(transformerClassName).setProperties(transformerProperties);

DivertConfiguration config = new DivertConfiguration().setName(name).setForwardingAddress(forwardingAddress).
setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).
setRoutingType(ComponentConfigurationRoutingType.valueOf(routingType));

final Divert divert = server.updateDivert(config);
if (divert == null) {
throw ActiveMQMessageBundle.BUNDLE.divertDoesNotExist(config.getName());
}
} finally {
blockOnIO();
}
}

@Override
public void destroyDivert(final String name) throws Exception {
if (AuditLogger.isEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;

import java.util.Collections;
import java.util.Map;

import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.transformer.RegisteredTransformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.logs.AuditLogger;

public class DivertControlImpl extends AbstractControl implements DivertControl {
Expand All @@ -37,8 +40,6 @@ public class DivertControlImpl extends AbstractControl implements DivertControl

private final Divert divert;

private final DivertConfiguration configuration;

private final String internalNamingPrefix;

// Static --------------------------------------------------------
Expand All @@ -49,11 +50,9 @@ public class DivertControlImpl extends AbstractControl implements DivertControl

public DivertControlImpl(final Divert divert,
final StorageManager storageManager,
final DivertConfiguration configuration,
final String internalNamingPrefix) throws Exception {
super(DivertControl.class, storageManager);
this.divert = divert;
this.configuration = configuration;
this.internalNamingPrefix = internalNamingPrefix;
}

Expand All @@ -64,7 +63,7 @@ public String getAddress() {
}
clearIO();
try {
return configuration.getAddress();
return divert.getAddress().toString();
} finally {
blockOnIO();
}
Expand All @@ -77,7 +76,8 @@ public String getFilter() {
}
clearIO();
try {
return configuration.getFilterString();
Filter filter = divert.getFilter();
return filter != null ? filter.getFilterString().toString() : null;
} finally {
blockOnIO();
}
Expand All @@ -90,7 +90,7 @@ public String getForwardingAddress() {
}
clearIO();
try {
return configuration.getForwardingAddress();
return divert.getForwardAddress().toString();
} finally {
blockOnIO();
}
Expand All @@ -116,7 +116,9 @@ public String getTransformerClassName() {
}
clearIO();
try {
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getClassName();
Transformer transformer = divert.getTransformer();
return transformer != null ? (transformer instanceof RegisteredTransformer ?
((RegisteredTransformer)transformer).getTransformer() : transformer).getClass().getName() : null;
} finally {
blockOnIO();
}
Expand All @@ -137,7 +139,9 @@ public Map<String, String> getTransformerProperties() {
}
clearIO();
try {
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getProperties();
Transformer transformer = divert.getTransformer();
return transformer != null && transformer instanceof RegisteredTransformer ?
((RegisteredTransformer)transformer).getProperties() : Collections.emptyMap();
} finally {
blockOnIO();
}
Expand All @@ -150,7 +154,7 @@ public String getRoutingType() {
}
clearIO();
try {
return configuration.getRoutingType().toString();
return divert.getRoutingType().toString();
} finally {
blockOnIO();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ public class DivertBinding implements Binding {

private final Divert divert;

private final Filter filter;

private final SimpleString uniqueName;

private final SimpleString routingName;
Expand All @@ -48,8 +46,6 @@ public DivertBinding(final long id, final SimpleString address, final Divert div

this.divert = divert;

filter = divert.getFilter();

uniqueName = divert.getUniqueName();

routingName = divert.getRoutingName();
Expand All @@ -64,7 +60,7 @@ public long getID() {

@Override
public Filter getFilter() {
return filter;
return divert.getFilter();
}

@Override
Expand Down Expand Up @@ -129,7 +125,7 @@ public String toString() {
", divert=" +
divert +
", filter=" +
filter +
divert.getFilter() +
", uniqueName=" +
uniqueName +
", routingName=" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQDivertDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateMetaDataException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
Expand Down Expand Up @@ -488,4 +489,7 @@ IllegalStateException invalidRoutingTypeUpdate(String queueName,

@Message(id = 229230, value = "Failed to bind acceptor {0} to {1}", format = Message.Format.MESSAGE_FORMAT)
IllegalStateException failedToBind(String acceptor, String hostPort, @Cause Exception e);

@Message(id = 229231, value = "Divert Does Not Exist: {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQDivertDoesNotExistException divertDoesNotExist(String divert);
}
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,9 @@ void destroyQueue(SimpleString queueName,

FederationManager getFederationManager();

void deployDivert(DivertConfiguration config) throws Exception;
Divert deployDivert(DivertConfiguration config) throws Exception;

Divert updateDivert(DivertConfiguration config) throws Exception;

void destroyDivert(SimpleString name) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

public interface Divert extends Bindable {

SimpleString getAddress();

Filter getFilter();

boolean isExclusive();
Expand All @@ -33,4 +35,14 @@ public interface Divert extends Bindable {
Transformer getTransformer();

SimpleString getForwardAddress();

ComponentConfigurationRoutingType getRoutingType();

void setFilter(Filter filter);

void setTransformer(Transformer transformer);

void setForwardAddress(SimpleString forwardAddress);

void setRoutingType(ComponentConfigurationRoutingType routingType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2544,29 +2544,29 @@ public FederationManager getFederationManager() {


@Override
public void deployDivert(DivertConfiguration config) throws Exception {
public Divert deployDivert(DivertConfiguration config) throws Exception {
if (config.getName() == null) {
throw ActiveMQMessageBundle.BUNDLE.divertWithNoName();
}

if (config.getAddress() == null) {
ActiveMQServerLogger.LOGGER.divertWithNoAddress();

return;
return null;
}

if (config.getForwardingAddress() == null) {
ActiveMQServerLogger.LOGGER.divertWithNoForwardingAddress();

return;
return null;
}

SimpleString sName = new SimpleString(config.getName());

if (postOffice.getBinding(sName) != null) {
ActiveMQServerLogger.LOGGER.divertBindingAlreadyExists(sName);

return;
return null;
}

SimpleString sAddress = new SimpleString(config.getAddress());
Expand All @@ -2575,13 +2575,53 @@ public void deployDivert(DivertConfiguration config) throws Exception {

Filter filter = FilterImpl.createFilter(config.getFilterString());

Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()), sName, new SimpleString(config.getRoutingName()), config.isExclusive(), filter, transformer, postOffice, storageManager, config.getRoutingType());
Divert divert = new DivertImpl(sName, sAddress, new SimpleString(config.getForwardingAddress()),
new SimpleString(config.getRoutingName()), config.isExclusive(),
filter, transformer, postOffice, storageManager, config.getRoutingType());

Binding binding = new DivertBinding(storageManager.generateID(), sAddress, divert);

postOffice.addBinding(binding);

managementService.registerDivert(divert, config);
managementService.registerDivert(divert);

return divert;
}

@Override
public Divert updateDivert(DivertConfiguration config) throws Exception {
final DivertBinding divertBinding = (DivertBinding) postOffice.getBinding(SimpleString.toSimpleString(config.getName()));
if (divertBinding == null) {
return null;
}

final Divert divert = divertBinding.getDivert();

Filter filter = FilterImpl.createFilter(config.getFilterString());
if (filter != null && !filter.equals(divert.getFilter())) {
divert.setFilter(filter);
}

if (config.getTransformerConfiguration() != null) {
getServiceRegistry().removeDivertTransformer(divert.getUniqueName().toString());
Transformer transformer = getServiceRegistry().getDivertTransformer(
config.getName(), config.getTransformerConfiguration());
divert.setTransformer(transformer);
}

if (config.getForwardingAddress() != null) {
SimpleString forwardAddress = SimpleString.toSimpleString(config.getForwardingAddress());

if (!forwardAddress.equals(config)) {
divert.setForwardAddress(forwardAddress);
}
}

if (config.getRoutingType() != null && divert.getRoutingType() != config.getRoutingType()) {
divert.setRoutingType(config.getRoutingType());
}

return divert;
}

@Override
Expand Down

0 comments on commit 0c3ced6

Please sign in to comment.