Skip to content

Commit

Permalink
Double decode connection usernames and passwords depending on config
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Mar 24, 2022
1 parent 4a13900 commit 23e5477
Show file tree
Hide file tree
Showing 22 changed files with 258 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkArgument;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -826,8 +824,8 @@ private ConnectionUri(final String theUriString) {
final String userInfo = uri.getUserInfo();
if (userInfo != null && userInfo.contains(USERNAME_PASSWORD_SEPARATOR)) {
final int separatorIndex = userInfo.indexOf(USERNAME_PASSWORD_SEPARATOR);
userName = tryDecodeUriComponent(userInfo.substring(0, separatorIndex));
password = tryDecodeUriComponent(userInfo.substring(separatorIndex + 1));
userName = userInfo.substring(0, separatorIndex);
password = userInfo.substring(separatorIndex + 1);
} else {
userName = null;
password = null;
Expand All @@ -837,15 +835,6 @@ private ConnectionUri(final String theUriString) {
uriStringWithMaskedPassword = createUriStringWithMaskedPassword();
}

private static String tryDecodeUriComponent(final String string) {
try {
final String withoutPlus = string.replace("+", "%2B");
return URLDecoder.decode(withoutPlus, "UTF-8");
} catch (final IllegalArgumentException | UnsupportedEncodingException e) {
return string;
}
}

private String createUriStringWithMaskedPassword() {
return MessageFormat.format(MASKED_URI_PATTERN, protocol, getUserCredentialsOrEmptyString(), hostname, port,
getPathOrEmptyString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.junit.Ignore;
import org.junit.Test;

import nl.jqno.equalsverifier.EqualsVerifier;
Expand Down Expand Up @@ -471,7 +470,7 @@ public void parsePasswordWithPlusSignEncoded() {
public void parsePasswordWithPlusSignDoubleEncoded() {
final ImmutableConnection.ConnectionUri underTest =
ImmutableConnection.ConnectionUri.of("amqps://foo:bar%252Bbaz@hono.eclipse.org:5671/vhost");
assertThat(underTest.getPassword()).contains("bar+baz");
assertThat(underTest.getPassword()).contains("bar%2Bbaz");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.eclipse.ditto.connectivity.service.config.Amqp10Config;
import org.eclipse.ditto.connectivity.service.config.ClientConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectionConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.messaging.BaseClientActor;
import org.eclipse.ditto.connectivity.service.messaging.BaseClientData;
import org.eclipse.ditto.connectivity.service.messaging.amqp.status.ConnectionFailureStatusReport;
Expand Down Expand Up @@ -121,17 +122,17 @@ public final class AmqpClientActor extends BaseClientActor implements ExceptionL
*/
@SuppressWarnings("unused")
private AmqpClientActor(final Connection connection,
ActorRef proxyActor,
final ActorRef proxyActor,
final ActorRef connectionActor,
final Config connectivityConfigOverwrites,
final DittoHeaders dittoHeaders) {

super(connection, proxyActor, connectionActor, dittoHeaders, connectivityConfigOverwrites);
final ConnectionConfig connectionConfig = connectivityConfig().getConnectionConfig();
final Amqp10Config amqp10Config = connectionConfig.getAmqp10Config();
this.jmsConnectionFactory =
jmsConnectionFactory =
ConnectionBasedJmsConnectionFactory.getInstance(AmqpSpecificConfig.toDefaultConfig(amqp10Config),
this::getSshTunnelState, getContext().getSystem());
this::getSshTunnelState, getContext().getSystem(), connectionConfig.doubleDecodingEnabled());
connectionListener = new StatusReportingListener(getSelf(), logger, connectionLogger);
consumerByNamePrefix = new HashMap<>();
recoverSessionOnSessionClosed = isRecoverSessionOnSessionClosedEnabled(connection);
Expand Down Expand Up @@ -181,8 +182,8 @@ public static Props props(final Connection connection, final ActorRef proxyActor
final ActorRef connectionActor, final Config configOverwrites, final ActorSystem actorSystem,
final DittoHeaders dittoHeaders) {

return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem), proxyActor,
connectionActor, configOverwrites, dittoHeaders);
return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem, configOverwrites),
proxyActor, connectionActor, configOverwrites, dittoHeaders);
}

/**
Expand All @@ -199,18 +200,23 @@ static Props propsForTest(final Connection connection, @Nullable final ActorRef
final ActorRef connectionActor, final JmsConnectionFactory jmsConnectionFactory,
final ActorSystem actorSystem) {

return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem),
return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem, ConfigFactory.empty()),
jmsConnectionFactory, proxyActor, connectionActor, DittoHeaders.empty());
}

private static Connection validateConnection(final Connection connection, final ActorSystem actorSystem) {
private static Connection validateConnection(final Connection connection, final ActorSystem actorSystem,
final Config configOverwrites) {
try {
final Config withOverwrites = configOverwrites.withFallback(actorSystem.settings().config());
final ConnectivityConfig connectivityConfig = ConnectivityConfig.of(withOverwrites);

final String connectionUri = ConnectionBasedJmsConnectionFactory.buildAmqpConnectionUri(connection,
connection.getId().toString(),
// fake established tunnel state for uri validation
() -> SshTunnelState.from(connection).established(22222),
Map.of(),
SaslPlainCredentialsSupplier.of(actorSystem));
SaslPlainCredentialsSupplier.of(actorSystem),
connectivityConfig.getConnectionConfig().doubleDecodingEnabled());
ProviderFactory.create(URI.create(connectionUri));
// it is safe to pass an empty map as default config as only default values are loaded via that config
// of which we can be certain that they are always valid
Expand Down Expand Up @@ -250,7 +256,7 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectedSt
protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inAnyState() {
return super.inAnyState()
.event(ConnectionRestoredStatusReport.class,
(report, currentData) -> this.handleConnectionRestored(currentData))
(report, currentData) -> handleConnectionRestored(currentData))
.event(ConnectionFailureStatusReport.class, this::handleConnectionFailure)
.event(ConsumerClosedStatusReport.class, this::handleConsumerClosed)
.event(ProducerClosedStatusReport.class, this::handleProducerClosed)
Expand All @@ -263,7 +269,7 @@ protected CompletionStage<Status.Status> doTestConnection(final TestConnection t
final Connection connectionToBeTested = testConnectionCommand.getConnection();
final ClientConfig clientConfig = connectivityConfig().getClientConfig();
return Patterns.ask(getTestConnectionHandler(connectionToBeTested),
jmsConnect(getSender(), connectionToBeTested), clientConfig.getTestingTimeout())
jmsConnect(getSender(), connectionToBeTested), clientConfig.getTestingTimeout())
// compose the disconnect because otherwise the actor hierarchy might be stopped too fast
.thenCompose(response -> {
logger.withCorrelationId(testConnectionCommand)
Expand All @@ -274,7 +280,7 @@ protected CompletionStage<Status.Status> doTestConnection(final TestConnection t
final JmsDisconnect jmsDisconnect = new JmsDisconnect(ActorRef.noSender(),
connectedJmsConnection, true);
return Patterns.ask(getDisconnectConnectionHandler(connectionToBeTested), jmsDisconnect,
clientConfig.getTestingTimeout())
clientConfig.getTestingTimeout())
// replace jmsDisconnected message with original response
.thenApply(jmsDisconnected -> response);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import static org.apache.qpid.jms.provider.failover.FailoverProviderFactory.FAILOVER_OPTION_PREFIX;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Collections;
Expand Down Expand Up @@ -50,9 +52,11 @@ public final class AmqpSpecificConfig {
private final boolean failoverEnabled;
private final PlainCredentialsSupplier plainCredentialsSupplier;

private AmqpSpecificConfig(final Map<String, String> amqpParameters, final Map<String, String> jmsParameters,
private AmqpSpecificConfig(final Map<String, String> amqpParameters,
final Map<String, String> jmsParameters,
final boolean failoverEnabled,
final PlainCredentialsSupplier plainCredentialsSupplier) {

this.amqpParameters = Collections.unmodifiableMap(new LinkedHashMap<>(amqpParameters));
this.jmsParameters = Collections.unmodifiableMap(new LinkedHashMap<>(jmsParameters));
this.failoverEnabled = failoverEnabled;
Expand All @@ -66,11 +70,15 @@ private AmqpSpecificConfig(final Map<String, String> amqpParameters, final Map<S
* @param connection the connection.
* @param defaultConfig the default config values.
* @param plainCredentialsSupplier supplier of username-password credentials.
* @param doubleDecodingEnabled whether the username and password should be double decoded.
* @return the AMQP specific config.
*/
public static AmqpSpecificConfig withDefault(final String clientId, final Connection connection,
public static AmqpSpecificConfig withDefault(final String clientId,
final Connection connection,
final Map<String, String> defaultConfig,
final PlainCredentialsSupplier plainCredentialsSupplier) {
final PlainCredentialsSupplier plainCredentialsSupplier,
final boolean doubleDecodingEnabled) {

final var amqpParameters = new LinkedHashMap<>(filterForAmqpParameters(defaultConfig));
final Optional<UserPasswordCredentials> credentialsOptional = plainCredentialsSupplier.get(connection);
addSaslMechanisms(amqpParameters, credentialsOptional.isPresent());
Expand All @@ -79,7 +87,7 @@ public static AmqpSpecificConfig withDefault(final String clientId, final Connec

final var jmsParameters = new LinkedHashMap<>(filterForJmsParameters(defaultConfig));
addParameter(jmsParameters, CLIENT_ID, clientId);
credentialsOptional.ifPresent(credentials -> addCredentials(jmsParameters, credentials));
credentialsOptional.ifPresent(credentials -> addCredentials(jmsParameters, credentials, doubleDecodingEnabled));
addFailoverParameters(jmsParameters, connection);
addSpecificConfigParameters(jmsParameters, connection, AmqpSpecificConfig::isPermittedJmsConfig);

Expand Down Expand Up @@ -153,11 +161,26 @@ private static void addSaslMechanisms(final LinkedHashMap<String, String> parame
}

private static void addCredentials(final LinkedHashMap<String, String> parameters,
final UserPasswordCredentials credentials) {
addParameter(parameters, USERNAME, credentials.getUsername());
addParameter(parameters, PASSWORD, credentials.getPassword());
final UserPasswordCredentials credentials, final boolean doubleDecodingEnabled) {

final String username =
doubleDecodingEnabled ? tryDecodeUriComponent(credentials.getUsername()) : credentials.getUsername();
final String password =
doubleDecodingEnabled ? tryDecodeUriComponent(credentials.getPassword()) : credentials.getPassword();
addParameter(parameters, USERNAME, username);
addParameter(parameters, PASSWORD, password);
}

private static String tryDecodeUriComponent(final String string) {
try {
final String withoutPlus = string.replace("+", "%2B");
return URLDecoder.decode(withoutPlus, "UTF-8");
} catch (final IllegalArgumentException | UnsupportedEncodingException e) {
return string;
}
}


private static void addTransportParameters(final LinkedHashMap<String, String> parameters,
final Connection connection) {
if (isSecuredConnection(connection) && !connection.isValidateCertificates()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,17 @@ public final class ConnectionBasedJmsConnectionFactory implements JmsConnectionF
private final Map<String, String> defaultConfig;
private final Supplier<SshTunnelState> sshTunnelConfigSupplier;
private final PlainCredentialsSupplier credentialsSupplier;
private final boolean doubleEncodingEnabled;

private ConnectionBasedJmsConnectionFactory(final Map<String, String> defaultConfig,
final Supplier<SshTunnelState> sshTunnelConfigSupplier,
final PlainCredentialsSupplier credentialsSupplier) {
final PlainCredentialsSupplier credentialsSupplier,
final boolean doubleEncodingEnabled) {

this.defaultConfig = checkNotNull(defaultConfig, "defaultConfig");
this.sshTunnelConfigSupplier = checkNotNull(sshTunnelConfigSupplier, "sshTunnelConfigSupplier");
this.credentialsSupplier = credentialsSupplier;
this.doubleEncodingEnabled = doubleEncodingEnabled;
}

/**
Expand All @@ -65,9 +69,13 @@ private ConnectionBasedJmsConnectionFactory(final Map<String, String> defaultCon
* @return the instance.
*/
public static ConnectionBasedJmsConnectionFactory getInstance(final Map<String, String> defaultConfig,
final Supplier<SshTunnelState> sshTunnelConfigSupplier, final ActorSystem actorSystem) {
final Supplier<SshTunnelState> sshTunnelConfigSupplier,
final ActorSystem actorSystem,
final boolean doubleEncodingEnabled) {

final PlainCredentialsSupplier credentialsSupplier = SaslPlainCredentialsSupplier.of(actorSystem);
return new ConnectionBasedJmsConnectionFactory(defaultConfig, sshTunnelConfigSupplier, credentialsSupplier);
return new ConnectionBasedJmsConnectionFactory(defaultConfig, sshTunnelConfigSupplier, credentialsSupplier,
doubleEncodingEnabled);
}

@Override
Expand All @@ -93,15 +101,19 @@ public JmsConnection createConnection(final Connection connection, final Excepti

private String buildAmqpConnectionUri(final Connection connection, final String clientId) {
return buildAmqpConnectionUri(connection, clientId, sshTunnelConfigSupplier, defaultConfig,
credentialsSupplier);
credentialsSupplier, doubleEncodingEnabled);
}

public static String buildAmqpConnectionUri(final Connection connection, final String clientId,
final Supplier<SshTunnelState> sshTunnelConfigSupplier, final Map<String, String> defaultConfig,
final PlainCredentialsSupplier plainCredentialsSupplier) {
public static String buildAmqpConnectionUri(final Connection connection,
final String clientId,
final Supplier<SshTunnelState> sshTunnelConfigSupplier,
final Map<String, String> defaultConfig,
final PlainCredentialsSupplier plainCredentialsSupplier,
final boolean doubleDecodingEnabled) {

final URI uri = sshTunnelConfigSupplier.get().getURI(connection);
final var amqpSpecificConfig = AmqpSpecificConfig.withDefault(clientId, connection, defaultConfig,
plainCredentialsSupplier);
plainCredentialsSupplier, doubleDecodingEnabled);
final var connectionUri = amqpSpecificConfig.render(uri.toString());
LOGGER.debug("[{}] URI: {}", clientId, connectionUri);
return connectionUri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -39,8 +41,10 @@ final class KafkaAuthenticationSpecificConfig implements KafkaSpecificConfig {
private static final Map<String, String> SASL_MECHANISMS_WITH_LOGIN_MODULE = new HashMap<>();

@Nullable private static KafkaAuthenticationSpecificConfig instance;
private final boolean doubleDecodingEnabled;

private KafkaAuthenticationSpecificConfig() {
private KafkaAuthenticationSpecificConfig(final boolean doubleDecodingEnabled) {
this.doubleDecodingEnabled = doubleDecodingEnabled;
SASL_MECHANISMS_WITH_LOGIN_MODULE.put(PLAIN_SASL_MECHANISM,
"org.apache.kafka.common.security.plain.PlainLoginModule");
SASL_MECHANISMS_WITH_LOGIN_MODULE.put("SCRAM-SHA-256",
Expand All @@ -49,10 +53,10 @@ private KafkaAuthenticationSpecificConfig() {
"org.apache.kafka.common.security.scram.ScramLoginModule");
}

public static KafkaAuthenticationSpecificConfig getInstance() {
public static KafkaAuthenticationSpecificConfig getInstance(final boolean doubleDecodingEnabled) {
KafkaAuthenticationSpecificConfig result = instance;
if (null == result) {
result = new KafkaAuthenticationSpecificConfig();
result = new KafkaAuthenticationSpecificConfig(doubleDecodingEnabled);
instance = result;
}
return result;
Expand Down Expand Up @@ -89,8 +93,10 @@ private static boolean containsValidSaslMechanismConfiguration(final Connection
@Override
public Map<String, String> apply(final Connection connection) {

final Optional<String> username = connection.getUsername();
final Optional<String> password = connection.getPassword();
final Optional<String> username =
connection.getUsername().map(u -> doubleDecodingEnabled ? tryDecodeUriComponent(u) : u);
final Optional<String> password =
connection.getPassword().map(p -> doubleDecodingEnabled ? tryDecodeUriComponent(p) : p);
// chose to not use isApplicable() but directly check username and password since we need to Optional#get them.
if (isValid(connection) && username.isPresent() && password.isPresent()) {
final String saslMechanism = getSaslMechanismOrDefault(connection).toUpperCase();
Expand All @@ -103,6 +109,15 @@ public Map<String, String> apply(final Connection connection) {
return Collections.emptyMap();
}

private static String tryDecodeUriComponent(final String string) {
try {
final String withoutPlus = string.replace("+", "%2B");
return URLDecoder.decode(withoutPlus, StandardCharsets.UTF_8);
} catch (final IllegalArgumentException e) {
return string;
}
}

private static String getJaasConfig(final String loginModule, final String username, final String password) {
return String.format(JAAS_CONFIG_TEMPLATE, loginModule, username, password);
}
Expand Down
Loading

0 comments on commit 23e5477

Please sign in to comment.