Skip to content

Commit

Permalink
make connection decoding migration configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Aug 1, 2022
1 parent 48c3196 commit 8c404fe
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,17 @@ public interface ConnectionConfig extends WithSupervisorConfig, WithActivityChec

/**
* Whether usernames and passwords in connection URIs should be double decoded, when creating the connection.
*
* @return whether double decoding in enabled.
*/
boolean doubleDecodingEnabled();

/**
* @return whether connections that fail with single decoded passwords should be retried again with additional
* decoding and in case of success are automatically adapted.
*/
boolean doubleDecodingMigrationEnabled();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code ConnectionConfig}.
Expand Down Expand Up @@ -224,7 +231,13 @@ enum ConnectionConfigValue implements KnownConfigValue {
/**
* Whether double decoding of usernames and passwords in connection URIs is enabled.
*/
DOUBLE_DECODING_ENABLED("double-decoding-enabled", true);
DOUBLE_DECODING_ENABLED("double-decoding-enabled", true),

/**
* Indicates whether connections that fail with single decoded passwords should be retried again with additional
* decoding and in case of success are automatically adapted.
*/
DOUBLE_DECODING_MIGRATION_ENABLED("double-decoding-migration.enabled", true);

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public final class DefaultConnectionConfig implements ConnectionConfig {
private final Duration priorityUpdateInterval;
private final boolean allClientActorsOnOneNode;
private final boolean doubleDecodingEnabled;
private final boolean doubleDecodingMigrationEnabled;

private DefaultConnectionConfig(final ConfigWithFallback config) {
clientActorAskTimeout =
Expand Down Expand Up @@ -90,6 +91,7 @@ private DefaultConnectionConfig(final ConfigWithFallback config) {
priorityUpdateInterval =
config.getNonNegativeAndNonZeroDurationOrThrow(ConnectionConfigValue.PRIORITY_UPDATE_INTERVAL);
doubleDecodingEnabled = config.getBoolean(ConnectionConfigValue.DOUBLE_DECODING_ENABLED.getConfigPath());
doubleDecodingMigrationEnabled = config.getBoolean(ConnectionConfigValue.DOUBLE_DECODING_MIGRATION_ENABLED.getConfigPath());

}

Expand Down Expand Up @@ -222,6 +224,11 @@ public boolean doubleDecodingEnabled() {
return doubleDecodingEnabled;
}

@Override
public boolean doubleDecodingMigrationEnabled() {
return doubleDecodingMigrationEnabled;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down Expand Up @@ -252,7 +259,8 @@ public boolean equals(final Object o) {
Objects.equals(ackLabelDeclareInterval, that.ackLabelDeclareInterval) &&
Objects.equals(priorityUpdateInterval, that.priorityUpdateInterval) &&
allClientActorsOnOneNode == that.allClientActorsOnOneNode &&
doubleDecodingEnabled == that.doubleDecodingEnabled;
doubleDecodingEnabled == that.doubleDecodingEnabled &&
doubleDecodingMigrationEnabled == that.doubleDecodingMigrationEnabled;
}

@Override
Expand All @@ -261,7 +269,7 @@ public int hashCode() {
blockedHostnames, blockedSubnets, blockedHostRegex, supervisorConfig, snapshotConfig,
acknowledgementConfig, cleanupConfig, maxNumberOfTargets, maxNumberOfSources, activityCheckConfig,
amqp10Config, amqp091Config, mqttConfig, kafkaConfig, httpPushConfig, ackLabelDeclareInterval,
priorityUpdateInterval, allClientActorsOnOneNode, doubleDecodingEnabled);
priorityUpdateInterval, allClientActorsOnOneNode, doubleDecodingEnabled, doubleDecodingMigrationEnabled);
}

@Override
Expand Down Expand Up @@ -289,6 +297,7 @@ public String toString() {
", priorityUpdateInterval=" + priorityUpdateInterval +
", allClientActorsOnOneNode=" + allClientActorsOnOneNode +
", doubleDecodingEnabled=" + doubleDecodingEnabled +
", doubleDecodingMigrationEnabled=" + doubleDecodingMigrationEnabled +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ public final class ConnectionPersistenceActor

private final UpdatedConnectionTester updatedConnectionTester;

private final boolean automaticConnectionDecodingMigrationEnabled;

ConnectionPersistenceActor(final ConnectionId connectionId,
final ActorRef proxyActor,
final ActorRef pubSubMediator,
Expand All @@ -214,6 +216,7 @@ public final class ConnectionPersistenceActor
connectivityConfig = getConnectivityConfigWithOverwrites(connectivityConfigOverwrites);
this.commandValidator = getCommandValidator(customCommandValidator);
final ConnectionConfig connectionConfig = this.connectivityConfig.getConnectionConfig();
automaticConnectionDecodingMigrationEnabled = connectionConfig.doubleDecodingMigrationEnabled();
this.allClientActorsOnOneNode = allClientActorsOnOneNode.orElse(connectionConfig.areAllClientActorsOnOneNode());
connectionPriorityProvider = connectionPriorityProviderFactory.newProvider(self(), log);
clientActorAskTimeout = connectionConfig.getClientActorAskTimeout();
Expand Down Expand Up @@ -802,7 +805,7 @@ private void openConnection(final StagedCommand command, final boolean ignoreErr
startAndAskClientActors(openConnection, getClientCount())
.thenAccept(successConsumer)
.exceptionally(error -> {
if (retry) {
if (retry && automaticConnectionDecodingMigrationEnabled) {
self().tell(new RetryOpenConnection(openConnection, error, ignoreErrors, command.getSender()),
ActorRef.noSender());
} else {
Expand Down

0 comments on commit 8c404fe

Please sign in to comment.