Skip to content

Commit

Permalink
Remove connection credentials double decoding configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
SilviaGeorgievaLyoteva committed Aug 30, 2022
1 parent c80fd7c commit c3286d9
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,34 +136,17 @@ public interface Connection extends Jsonifiable.WithFieldSelectorAndPredicate<Js
* Returns the username part of the URI of this {@code Connection}.
*
* @return the username.
* @deprecated since 2.4.0 use {@link #getUsername(boolean)} instead.
*/
Optional<String> getUsername();

/**
* Returns the username part of the URI of this {@code Connection}.
*
* @param shouldUriDecode whether the username should be URI-decoded.
* @return the username.
*/
Optional<String> getUsername(boolean shouldUriDecode);
Optional<String> getUsername();

/**
* Returns the password part of the URI of this {@code Connection}.
*
* @return the password.
* @deprecated since 2.4.0 use {@link #getPassword(boolean)} instead.
*/
Optional<String> getPassword();

/**
* Returns the password part of the URI of this {@code Connection}.
*
* @param shouldUriDecode whether the password should be URI-decoded.
* @return the password.
*/
Optional<String> getPassword(boolean shouldUriDecode);

/**
* Returns the hostname part of the URI of this {@code Connection}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,33 +322,12 @@ public String getProtocol() {

@Override
public Optional<String> getUsername() {
return getUsername(true);
}

@Override
public Optional<String> getUsername(final boolean shouldUriDecode) {
final Optional<String> username = uri.getUserName();
return shouldUriDecode ? username.map(ImmutableConnection::tryDecodeUriComponent) : username;
return uri.getUserName();
}

@Override
public Optional<String> getPassword() {
return getPassword(true);
}

@Override
public Optional<String> getPassword(final boolean shouldUriDecode) {
final Optional<String> password = uri.getPassword();
return shouldUriDecode ? password.map(ImmutableConnection::tryDecodeUriComponent) : password;
}

private static String tryDecodeUriComponent(final String string) {
try {
final String withoutPlus = string.replace("+", "%2B");
return URLDecoder.decode(withoutPlus, "UTF-8");
} catch (final IllegalArgumentException | UnsupportedEncodingException e) {
return string;
}
return uri.getPassword();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.eclipse.ditto.connectivity.service.config.Amqp10Config;
import org.eclipse.ditto.connectivity.service.config.ClientConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectionConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.messaging.BaseClientActor;
import org.eclipse.ditto.connectivity.service.messaging.BaseClientData;
import org.eclipse.ditto.connectivity.service.messaging.amqp.status.ConnectionFailureStatusReport;
Expand Down Expand Up @@ -183,7 +182,7 @@ public static Props props(final Connection connection, final ActorRef commandFor
final ActorRef connectionActor, final Config configOverwrites, final ActorSystem actorSystem,
final DittoHeaders dittoHeaders) {

return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem, configOverwrites),
return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem),
commandForwarderActor, connectionActor, configOverwrites, dittoHeaders);
}

Expand All @@ -201,23 +200,18 @@ static Props propsForTest(final Connection connection, @Nullable final ActorRef
final ActorRef connectionActor, final JmsConnectionFactory jmsConnectionFactory,
final ActorSystem actorSystem) {

return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem, ConfigFactory.empty()),
return Props.create(AmqpClientActor.class, validateConnection(connection, actorSystem),
jmsConnectionFactory, commandForwarderActor, connectionActor, DittoHeaders.empty());
}

private static Connection validateConnection(final Connection connection, final ActorSystem actorSystem,
final Config configOverwrites) {
private static Connection validateConnection(final Connection connection, final ActorSystem actorSystem) {
try {
final Config withOverwrites = configOverwrites.withFallback(actorSystem.settings().config());
final ConnectivityConfig connectivityConfig = ConnectivityConfig.of(withOverwrites);

final String connectionUri = ConnectionBasedJmsConnectionFactory.buildAmqpConnectionUri(connection,
final String connectionUri = ConnectionBasedJmsConnectionFactory.buildAmqpConnectionUri(connection,
connection.getId().toString(),
// fake established tunnel state for uri validation
() -> SshTunnelState.from(connection).established(22222),
Map.of(),
SaslPlainCredentialsSupplier.of(actorSystem),
connectivityConfig.getConnectionConfig().doubleDecodingEnabled());
SaslPlainCredentialsSupplier.of(actorSystem));
ProviderFactory.create(URI.create(connectionUri));
// it is safe to pass an empty map as default config as only default values are loaded via that config
// of which we can be certain that they are always valid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,15 @@ private AmqpSpecificConfig(final Map<String, String> amqpParameters,
* @param connection the connection.
* @param defaultConfig the default config values.
* @param plainCredentialsSupplier supplier of username-password credentials.
* @param doubleDecodingEnabled whether the username and password should be double decoded.
* @return the AMQP specific config.
*/
public static AmqpSpecificConfig withDefault(final String clientId,
final Connection connection,
final Map<String, String> defaultConfig,
final PlainCredentialsSupplier plainCredentialsSupplier,
final boolean doubleDecodingEnabled) {
final PlainCredentialsSupplier plainCredentialsSupplier) {

final var amqpParameters = new LinkedHashMap<>(filterForAmqpParameters(defaultConfig));
final Optional<UserPasswordCredentials> credentialsOptional = plainCredentialsSupplier.get(connection, doubleDecodingEnabled);
final Optional<UserPasswordCredentials> credentialsOptional = plainCredentialsSupplier.get(connection);
addSaslMechanisms(amqpParameters, credentialsOptional.isPresent());
addTransportParameters(amqpParameters, connection);
addSpecificConfigParameters(amqpParameters, connection, AmqpSpecificConfig::isPermittedAmqpConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,14 @@ public final class ConnectionBasedJmsConnectionFactory implements JmsConnectionF
private final Map<String, String> defaultConfig;
private final Supplier<SshTunnelState> sshTunnelConfigSupplier;
private final PlainCredentialsSupplier credentialsSupplier;
private final boolean doubleEncodingEnabled;

private ConnectionBasedJmsConnectionFactory(final Map<String, String> defaultConfig,
final Supplier<SshTunnelState> sshTunnelConfigSupplier,
final PlainCredentialsSupplier credentialsSupplier,
final boolean doubleEncodingEnabled) {
final PlainCredentialsSupplier credentialsSupplier) {

this.defaultConfig = checkNotNull(defaultConfig, "defaultConfig");
this.sshTunnelConfigSupplier = checkNotNull(sshTunnelConfigSupplier, "sshTunnelConfigSupplier");
this.credentialsSupplier = credentialsSupplier;
this.doubleEncodingEnabled = doubleEncodingEnabled;
}

/**
Expand All @@ -74,8 +71,7 @@ public static ConnectionBasedJmsConnectionFactory getInstance(final Map<String,
final boolean doubleEncodingEnabled) {

final PlainCredentialsSupplier credentialsSupplier = SaslPlainCredentialsSupplier.of(actorSystem);
return new ConnectionBasedJmsConnectionFactory(defaultConfig, sshTunnelConfigSupplier, credentialsSupplier,
doubleEncodingEnabled);
return new ConnectionBasedJmsConnectionFactory(defaultConfig, sshTunnelConfigSupplier, credentialsSupplier);
}

@Override
Expand All @@ -101,19 +97,18 @@ public JmsConnection createConnection(final Connection connection, final Excepti

private String buildAmqpConnectionUri(final Connection connection, final String clientId) {
return buildAmqpConnectionUri(connection, clientId, sshTunnelConfigSupplier, defaultConfig,
credentialsSupplier, doubleEncodingEnabled);
credentialsSupplier);
}

public static String buildAmqpConnectionUri(final Connection connection,
final String clientId,
final Supplier<SshTunnelState> sshTunnelConfigSupplier,
final Map<String, String> defaultConfig,
final PlainCredentialsSupplier plainCredentialsSupplier,
final boolean doubleDecodingEnabled) {
final PlainCredentialsSupplier plainCredentialsSupplier) {

final URI uri = sshTunnelConfigSupplier.get().getURI(connection);
final var amqpSpecificConfig = AmqpSpecificConfig.withDefault(clientId, connection, defaultConfig,
plainCredentialsSupplier, doubleDecodingEnabled);
plainCredentialsSupplier);
final var connectionUri = amqpSpecificConfig.render(uri.toString());
LOGGER.debug("[{}] URI: {}", clientId, connectionUri);
return connectionUri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ public interface PlainCredentialsSupplier {
* Get the username-password credentials of a connection.
*
* @param connection the connection.
* @param doubleDecodingEnabled whether double decoding of usernames and passwords is enabled.
* @return the optional credentials.
*/
Optional<UserPasswordCredentials> get(Connection connection, boolean doubleDecodingEnabled);
Optional<UserPasswordCredentials> get(Connection connection);

/**
* Remove userinfo from a connection URI.
Expand All @@ -50,8 +49,8 @@ default String getUriWithoutUserinfo(final String uri) {
* @return the URI.
*/
static PlainCredentialsSupplier fromUri() {
return (connection, doubleDecodingEnabled) -> connection.getUsername(doubleDecodingEnabled).flatMap(username ->
connection.getPassword(doubleDecodingEnabled)
return (connection) -> connection.getUsername().flatMap(username ->
connection.getPassword()
.map(password -> UserPasswordCredentials.newInstance(username, password)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ public static PlainCredentialsSupplier of(final ActorSystem actorSystem) {
}

@Override
public Optional<UserPasswordCredentials> get(final Connection connection, final boolean doubleDecodingEnabled) {
public Optional<UserPasswordCredentials> get(final Connection connection) {
final Optional<Credentials> optionalCredentials = connection.getCredentials();
if (optionalCredentials.isPresent()) {
final var credentials = optionalCredentials.get();
final var requestSigning = credentials.accept(amqpConnectionSigningExtension);
return requestSigning.createSignedCredentials().or(() -> FROM_URI.get(connection, doubleDecodingEnabled));
return requestSigning.createSignedCredentials().or(() -> FROM_URI.get(connection));
}
return FROM_URI.get(connection, doubleDecodingEnabled);
return FROM_URI.get(connection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static KafkaAuthenticationSpecificConfig getInstance(final boolean double

@Override
public boolean isApplicable(final Connection connection) {
return connection.getUsername(false).isPresent() && connection.getPassword(false).isPresent();
return connection.getUsername().isPresent() && connection.getPassword().isPresent();
}

@Override
Expand Down Expand Up @@ -91,8 +91,8 @@ private static boolean containsValidSaslMechanismConfiguration(final Connection
@Override
public Map<String, String> apply(final Connection connection) {

final Optional<String> username = connection.getUsername(doubleDecodingEnabled);
final Optional<String> password = connection.getPassword(doubleDecodingEnabled);
final Optional<String> username = connection.getUsername();
final Optional<String> password = connection.getPassword();
// chose to not use isApplicable() but directly check username and password since we need to Optional#get them.
if (isValid(connection) && username.isPresent() && password.isPresent()) {
final String saslMechanism = getSaslMechanismOrDefault(connection).toUpperCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ private static Optional<SimpleAuthCredentials> getSimpleAuthCredentials(
final var doubleDecodingEnabled = isDoubleDecodingEnabled(hiveMqttClientProperties);
final var mqttConnection = hiveMqttClientProperties.getMqttConnection();

return mqttConnection.getUsername(doubleDecodingEnabled)
.flatMap(username -> mqttConnection.getPassword(doubleDecodingEnabled)
return mqttConnection.getUsername()
.flatMap(username -> mqttConnection.getPassword()
.map(pw -> pw.getBytes(StandardCharsets.UTF_8))
.map(pwBytes -> new SimpleAuthCredentials(username, pwBytes))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ private void interpretStagedCommand(final StagedCommand command) {
@Override
protected Receive matchAnyAfterInitialization() {
return ReceiveBuilder.create()
.match(RetryOpenConnection.class, this::retryOpenConnectionWithAdaptedEntity)
// .match(RetryOpenConnection.class, this::retryOpenConnectionWithAdaptedEntity)
// CreateSubscription is a ThingSearchCommand, but it is created in InboundDispatchingSink from an
// adaptable and directly sent to this actor:
.match(CreateSubscription.class, this::startThingSearchSession)
Expand Down Expand Up @@ -1198,7 +1198,7 @@ private void restoreOpenConnection() {
private void retryOpenConnectionWithAdaptedEntity(final RetryOpenConnection retryOpenConnection) {
stopClientActors();
if (entity != null) {
final Optional<String> passwordOptional = entity.getPassword(false);
final Optional<String> passwordOptional = entity.getPassword();
if (passwordOptional.isPresent()) {
final String oldUri = entity.getUri();
final URI uri;
Expand All @@ -1211,7 +1211,7 @@ private void retryOpenConnectionWithAdaptedEntity(final RetryOpenConnection retr
}
final var oldUserNameAndPassword = uri.getRawUserInfo();
final var newUserNameAndPassword =
entity.getUsername(false).orElseThrow() + ":" + passwordOptional.orElseThrow();
entity.getUsername().orElseThrow() + ":" + passwordOptional.orElseThrow();
final var newUri = entity.getUri().replace(oldUserNameAndPassword, newUserNameAndPassword);
if (newUri.equals(oldUri)) {
handleOpenConnectionError(retryOpenConnection.error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,46 +33,45 @@ public final class AmqpSpecificConfigTest {
@Test
public void decodeDoublyEncodedUsernameAndPasswordDouble() {
final var uri = "amqps://%2525u%2525s%2525e%2525r:%2525p%2525a%2525%252Bs%2525s@localhost:1234/";
testDecoding(uri, "%25u%25s%25e%25r", "%25p%25a%25%2Bs%25s", true);
testDecoding(uri, "%25u%25s%25e%25r", "%25p%25a%25%2Bs%25s");
}

@Test
public void decodeDoublyEncodedUsernameAndPasswordSingle() {
final var uri = "amqps://%2525u%2525s%2525e%2525r:%2525p%2525a%2525%252Bs%2525s@localhost:1234/";
testDecoding(uri, "%2525u%2525s%2525e%2525r", "%2525p%2525a%2525%252Bs%2525s", false);
testDecoding(uri, "%2525u%2525s%2525e%2525r", "%2525p%2525a%2525%252Bs%2525s");
}


@Test
public void decodeSinglyEncodedUsernameAndPasswordContainingPercentageSign() {
final var uri = "amqps://%25u%25s%25e%25r:%25p%25a%25%2Bs%25s@localhost:1234/";
testDecoding(uri, "%25u%25s%25e%25r", "%25p%25a%25%2Bs%25s", false);
testDecoding(uri, "%25u%25s%25e%25r", "%25p%25a%25%2Bs%25s");
}

@Test
public void decodeSinglyEncodedUsernameAndPasswordDouble() {
final var uri = "amqps://user:pa%2Bss@localhost:1234/";
testDecoding(uri, "user", "pa%2Bss", true);
testDecoding(uri, "user", "pa%2Bss");
}

@Test
public void decodeSinglyEncodedUsernameAndPasswordSingle() {
final var uri = "amqps://user:pa%2Bss@localhost:1234/";
testDecoding(uri, "user", "pa%2Bss", false);
testDecoding(uri, "user", "pa%2Bss");
}

private static void testDecoding(final String uri,
final String expectedUsername,
final String expectedPassword,
final boolean doubleDecodingEnabled) {
final String expectedPassword) {

final var connection = TestConstants.createConnection()
.toBuilder()
.uri(uri)
.build();

final var underTest = AmqpSpecificConfig.withDefault("CID", connection, Map.of(),
PlainCredentialsSupplier.fromUri(), doubleDecodingEnabled);
PlainCredentialsSupplier.fromUri());

assertThat(underTest.render(uri))
.isEqualTo("failover:(amqps://localhost:1234/?amqp.saslMechanisms=PLAIN)" +
Expand All @@ -90,7 +89,7 @@ public void appendDefaultParameters() {
final var defaultConfig = AmqpSpecificConfig.toDefaultConfig(amqp10Config);

final var underTest = AmqpSpecificConfig.withDefault("CID", connection, defaultConfig,
PlainCredentialsSupplier.fromUri(), true);
PlainCredentialsSupplier.fromUri());

assertThat(underTest.render("amqps://localhost:1234/"))
.isEqualTo("failover:(amqps://localhost:1234/?amqp.saslMechanisms=PLAIN)" +
Expand All @@ -107,7 +106,7 @@ public void appendDefaultParameters() {
public void withoutFailover() {
final var connection = TestConstants.createConnection().toBuilder().failoverEnabled(false).build();
final var underTest = AmqpSpecificConfig.withDefault("CID", connection, Map.of(),
PlainCredentialsSupplier.fromUri(), true);
PlainCredentialsSupplier.fromUri());
assertThat(underTest.render("amqps://localhost:1234/"))
.isEqualTo("amqps://localhost:1234/?amqp.saslMechanisms=PLAIN&jms.clientID=CID" +
"&jms.username=username&jms.password=password");
Expand All @@ -117,10 +116,10 @@ public void withoutFailover() {
public void withPlainCredentials() {
final UserPasswordCredentials credentials = UserPasswordCredentials.newInstance("foo", "bar");
final PlainCredentialsSupplier plainCredentialsSupplier =
(connection, doubleDecodingEnabled) -> Optional.of(credentials);
(connection) -> Optional.of(credentials);
final Connection connection = TestConstants.createConnection();
final AmqpSpecificConfig underTest = AmqpSpecificConfig.withDefault("CID", connection, Map.of(),
plainCredentialsSupplier, true);
plainCredentialsSupplier);
assertThat(underTest.render("amqps://localhost:1234/"))
.isEqualTo("failover:(amqps://localhost:1234/?amqp.saslMechanisms=PLAIN)" +
"?jms.clientID=CID&jms.username=foo&jms.password=bar" +
Expand Down
Loading

0 comments on commit c3286d9

Please sign in to comment.