Skip to content

Commit

Permalink
Removed doubleDecodingEnabled and automaticConnectionDecodingMigratio…
Browse files Browse the repository at this point in the history
…nEnable
  • Loading branch information
SilviaGeorgievaLyoteva committed Sep 1, 2022
1 parent 5ec85bb commit 627ca6b
Show file tree
Hide file tree
Showing 14 changed files with 23 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,6 @@ public interface ConnectionConfig extends WithSupervisorConfig, WithActivityChec
*/
boolean areAllClientActorsOnOneNode();

/**
* Whether usernames and passwords in connection URIs should be double decoded, when creating the connection.
*
* @return whether double decoding in enabled.
*/
boolean doubleDecodingEnabled();

/**
* @return whether connections that fail with single decoded passwords should be retried again with additional
* decoding and in case of success are automatically adapted.
*/
boolean doubleDecodingMigrationEnabled();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code ConnectionConfig}.
Expand Down Expand Up @@ -226,18 +213,7 @@ enum ConnectionConfigValue implements KnownConfigValue {
/**
* How often the priority of a connection is getting updated.
*/
PRIORITY_UPDATE_INTERVAL("priority-update-interval", Duration.ofHours(24L)),

/**
* Whether double decoding of usernames and passwords in connection URIs is enabled.
*/
DOUBLE_DECODING_ENABLED("double-decoding-enabled", true),

/**
* Indicates whether connections that fail with single decoded passwords should be retried again with additional
* decoding and in case of success are automatically adapted.
*/
DOUBLE_DECODING_MIGRATION_ENABLED("double-decoding-migration.enabled", true);
PRIORITY_UPDATE_INTERVAL("priority-update-interval", Duration.ofHours(24L));

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ public final class DefaultConnectionConfig implements ConnectionConfig {
private final Duration ackLabelDeclareInterval;
private final Duration priorityUpdateInterval;
private final boolean allClientActorsOnOneNode;
private final boolean doubleDecodingEnabled;
private final boolean doubleDecodingMigrationEnabled;

private DefaultConnectionConfig(final ConfigWithFallback config) {
clientActorAskTimeout =
Expand Down Expand Up @@ -90,10 +88,6 @@ private DefaultConnectionConfig(final ConfigWithFallback config) {
config.getBoolean(ConnectionConfigValue.ALL_CLIENT_ACTORS_ON_ONE_NODE.getConfigPath());
priorityUpdateInterval =
config.getNonNegativeAndNonZeroDurationOrThrow(ConnectionConfigValue.PRIORITY_UPDATE_INTERVAL);
doubleDecodingEnabled = config.getBoolean(ConnectionConfigValue.DOUBLE_DECODING_ENABLED.getConfigPath());
doubleDecodingMigrationEnabled =
config.getBoolean(ConnectionConfigValue.DOUBLE_DECODING_MIGRATION_ENABLED.getConfigPath());

}

/**
Expand Down Expand Up @@ -220,16 +214,6 @@ public CleanupConfig getCleanupConfig() {
return cleanupConfig;
}

@Override
public boolean doubleDecodingEnabled() {
return doubleDecodingEnabled;
}

@Override
public boolean doubleDecodingMigrationEnabled() {
return doubleDecodingMigrationEnabled;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down Expand Up @@ -259,9 +243,7 @@ public boolean equals(final Object o) {
Objects.equals(maxNumberOfSources, that.maxNumberOfSources) &&
Objects.equals(ackLabelDeclareInterval, that.ackLabelDeclareInterval) &&
Objects.equals(priorityUpdateInterval, that.priorityUpdateInterval) &&
allClientActorsOnOneNode == that.allClientActorsOnOneNode &&
doubleDecodingEnabled == that.doubleDecodingEnabled &&
doubleDecodingMigrationEnabled == that.doubleDecodingMigrationEnabled;
allClientActorsOnOneNode == that.allClientActorsOnOneNode;
}

@Override
Expand All @@ -270,8 +252,7 @@ public int hashCode() {
blockedHostnames, blockedSubnets, blockedHostRegex, supervisorConfig, snapshotConfig,
acknowledgementConfig, cleanupConfig, maxNumberOfTargets, maxNumberOfSources, activityCheckConfig,
amqp10Config, amqp091Config, mqttConfig, kafkaConfig, httpPushConfig, ackLabelDeclareInterval,
priorityUpdateInterval, allClientActorsOnOneNode, doubleDecodingEnabled,
doubleDecodingMigrationEnabled);
priorityUpdateInterval, allClientActorsOnOneNode);
}

@Override
Expand All @@ -298,8 +279,6 @@ public String toString() {
", ackLabelDeclareInterval=" + ackLabelDeclareInterval +
", priorityUpdateInterval=" + priorityUpdateInterval +
", allClientActorsOnOneNode=" + allClientActorsOnOneNode +
", doubleDecodingEnabled=" + doubleDecodingEnabled +
", doubleDecodingMigrationEnabled=" + doubleDecodingMigrationEnabled +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private AmqpClientActor(final Connection connection,
final Amqp10Config amqp10Config = connectionConfig.getAmqp10Config();
jmsConnectionFactory =
ConnectionBasedJmsConnectionFactory.getInstance(AmqpSpecificConfig.toDefaultConfig(amqp10Config),
this::getSshTunnelState, getContext().getSystem(), connectionConfig.doubleDecodingEnabled());
this::getSshTunnelState, getContext().getSystem());
connectionListener = new StatusReportingListener(getSelf(), logger, connectionLogger);
consumerByNamePrefix = new HashMap<>();
recoverSessionOnSessionClosed = isRecoverSessionOnSessionClosedEnabled(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ private ConnectionBasedJmsConnectionFactory(final Map<String, String> defaultCon
*/
public static ConnectionBasedJmsConnectionFactory getInstance(final Map<String, String> defaultConfig,
final Supplier<SshTunnelState> sshTunnelConfigSupplier,
final ActorSystem actorSystem,
final boolean doubleEncodingEnabled) {
final ActorSystem actorSystem) {

final PlainCredentialsSupplier credentialsSupplier = SaslPlainCredentialsSupplier.of(actorSystem);
return new ConnectionBasedJmsConnectionFactory(defaultConfig, sshTunnelConfigSupplier, credentialsSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ final class KafkaAuthenticationSpecificConfig implements KafkaSpecificConfig {
private static final Map<String, String> SASL_MECHANISMS_WITH_LOGIN_MODULE = new HashMap<>();

@Nullable private static KafkaAuthenticationSpecificConfig instance;
private final boolean doubleDecodingEnabled;

private KafkaAuthenticationSpecificConfig(final boolean doubleDecodingEnabled) {
this.doubleDecodingEnabled = doubleDecodingEnabled;
private KafkaAuthenticationSpecificConfig() {
SASL_MECHANISMS_WITH_LOGIN_MODULE.put(PLAIN_SASL_MECHANISM,
"org.apache.kafka.common.security.plain.PlainLoginModule");
SASL_MECHANISMS_WITH_LOGIN_MODULE.put("SCRAM-SHA-256",
Expand All @@ -51,10 +49,10 @@ private KafkaAuthenticationSpecificConfig(final boolean doubleDecodingEnabled) {
"org.apache.kafka.common.security.scram.ScramLoginModule");
}

public static KafkaAuthenticationSpecificConfig getInstance(final boolean doubleDecodingEnabled) {
public static KafkaAuthenticationSpecificConfig getInstance() {
KafkaAuthenticationSpecificConfig result = instance;
if (null == result) {
result = new KafkaAuthenticationSpecificConfig(doubleDecodingEnabled);
result = new KafkaAuthenticationSpecificConfig();
instance = result;
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ private KafkaClientActor(final Connection connection,
super(connection, commandForwarderActor, connectionActor, dittoHeaders, connectivityConfigOverwrites);
kafkaConfig = connectivityConfig().getConnectionConfig().getKafkaConfig();
kafkaConsumerActors = new ArrayList<>();
propertiesFactory = PropertiesFactory.newInstance(connection, kafkaConfig, getClientId(connection.getId()),
connectivityConfig().getConnectionConfig().doubleDecodingEnabled());
propertiesFactory = PropertiesFactory.newInstance(connection, kafkaConfig, getClientId(connection.getId()));
this.publisherActorFactory = publisherActorFactory;
pendingStatusReportsFromStreams = new HashSet<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ public final class KafkaValidator extends AbstractProtocolValidator {

private final Collection<KafkaSpecificConfig> specificConfigs;

private KafkaValidator(final boolean doubleDecodingEnabled) {
private KafkaValidator() {
super();
specificConfigs = List.of(KafkaAuthenticationSpecificConfig.getInstance(doubleDecodingEnabled),
specificConfigs = List.of(KafkaAuthenticationSpecificConfig.getInstance(),
KafkaBootstrapServerSpecificConfig.getInstance(),
KafkaConsumerGroupSpecificConfig.getInstance(),
KafkaConsumerOffsetResetSpecificConfig.getInstance());
Expand All @@ -73,13 +73,12 @@ private KafkaValidator(final boolean doubleDecodingEnabled) {
/**
* Returns an instance of the Kafka validator.
*
* @param doubleDecodingEnabled whether username and password should get double decoded.
* @return the instance.
*/
public static KafkaValidator getInstance(final boolean doubleDecodingEnabled) {
public static KafkaValidator getInstance() {
KafkaValidator result = instance;
if (null == result) {
result = new KafkaValidator(doubleDecodingEnabled);
result = new KafkaValidator();
instance = result;
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,16 @@ final class PropertiesFactory {
private final KafkaConfig config;
private final String clientId;
private final String bootstrapServers;
private final boolean doubleDecodingEnabled;

private PropertiesFactory(final Connection connection,
final KafkaConfig config,
final String clientId,
final boolean doubleDecodingEnabled) {
final String clientId) {

this.connection = checkNotNull(connection, "connection");
this.config = checkNotNull(config, "config");
this.clientId = checkNotNull(clientId, "clientId");
bootstrapServers = KafkaBootstrapServerSpecificConfig.getInstance().getBootstrapServers(connection);
this.doubleDecodingEnabled = doubleDecodingEnabled;
commonSpecificConfigs = List.of(KafkaAuthenticationSpecificConfig.getInstance(doubleDecodingEnabled),
commonSpecificConfigs = List.of(KafkaAuthenticationSpecificConfig.getInstance(),
KafkaBootstrapServerSpecificConfig.getInstance());
consumerSpecificConfigs = getConsumerSpecificConfigs(commonSpecificConfigs);
producerSpecificConfigs = List.copyOf(commonSpecificConfigs);
Expand All @@ -87,10 +84,9 @@ private static Collection<KafkaSpecificConfig> getConsumerSpecificConfigs(
*/
static PropertiesFactory newInstance(final Connection connection,
final KafkaConfig config,
final String clientId,
final boolean doubleDecodingEnabled) {
final String clientId) {

return new PropertiesFactory(connection, config, clientId, doubleDecodingEnabled);
return new PropertiesFactory(connection, config, clientId);
}

/**
Expand Down Expand Up @@ -160,7 +156,7 @@ private Map<String, String> getSecurityProtocolProperties() {

private boolean isConnectionAuthenticated() {
final KafkaSpecificConfig authenticationSpecificConfig =
KafkaAuthenticationSpecificConfig.getInstance(doubleDecodingEnabled);
KafkaAuthenticationSpecificConfig.getInstance();
return authenticationSpecificConfig.isApplicable(connection);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ private static Optional<Mqtt3SimpleAuth> getMqtt3SimpleAuth(
private static Optional<SimpleAuthCredentials> getSimpleAuthCredentials(
final HiveMqttClientProperties hiveMqttClientProperties
) {
final var doubleDecodingEnabled = isDoubleDecodingEnabled(hiveMqttClientProperties);
final var mqttConnection = hiveMqttClientProperties.getMqttConnection();

return mqttConnection.getUsername()
Expand All @@ -211,11 +210,6 @@ private static Optional<SimpleAuthCredentials> getSimpleAuthCredentials(
);
}

private static boolean isDoubleDecodingEnabled(final HiveMqttClientProperties hiveMqttClientProperties) {
final var connectivityConfig = hiveMqttClientProperties.getConnectivityConfig();
final var connectionConfig = connectivityConfig.getConnectionConfig();
return connectionConfig.doubleDecodingEnabled();
}

/**
* Creates a {@link Mqtt5Client}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,6 @@ public final class ConnectionPersistenceActor

private final UpdatedConnectionTester updatedConnectionTester;

private final boolean automaticConnectionDecodingMigrationEnabled;

ConnectionPersistenceActor(final ConnectionId connectionId,
final ActorRef commandForwarderActor,
final ActorRef pubSubMediator,
Expand All @@ -218,7 +216,6 @@ public final class ConnectionPersistenceActor
connectivityConfig = getConnectivityConfigWithOverwrites(connectivityConfigOverwrites);
commandValidator = getCommandValidator();
final ConnectionConfig connectionConfig = connectivityConfig.getConnectionConfig();
automaticConnectionDecodingMigrationEnabled = connectionConfig.doubleDecodingMigrationEnabled();
this.allClientActorsOnOneNode = allClientActorsOnOneNode.orElse(connectionConfig.areAllClientActorsOnOneNode());
connectionPriorityProvider = ConnectionPriorityProviderFactory.get(actorSystem, dittoExtensionConfig)
.newProvider(self(), log);
Expand Down Expand Up @@ -848,7 +845,7 @@ private void openConnection(final StagedCommand command, final boolean ignoreErr
startAndAskClientActors(openConnection, getClientCount())
.thenAccept(successConsumer)
.exceptionally(error -> {
if (retry && automaticConnectionDecodingMigrationEnabled) {
if (retry) {
self().tell(new RetryOpenConnection(openConnection, error, ignoreErrors, command.getSender()),
ActorRef.noSender());
} else {
Expand Down Expand Up @@ -1205,7 +1202,7 @@ private ConnectivityCommandInterceptor getCommandValidator() {
AmqpValidator.newInstance(),
Mqtt3Validator.newInstance(mqttConfig),
Mqtt5Validator.newInstance(mqttConfig),
KafkaValidator.getInstance(connectivityConfig.getConnectionConfig().doubleDecodingEnabled()),
KafkaValidator.getInstance(),
HttpPushValidator.newInstance(connectivityConfig.getConnectionConfig().getHttpPushConfig()));

final DittoConnectivityCommandValidator dittoCommandValidator =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,6 @@ public void underTestReturnsValuesOfConfigFile() {
softly.assertThat(underTest.areAllClientActorsOnOneNode())
.as(ConnectionConfig.ConnectionConfigValue.ALL_CLIENT_ACTORS_ON_ONE_NODE.getConfigPath())
.isEqualTo(true);

softly.assertThat(underTest.doubleDecodingEnabled())
.as(ConnectionConfig.ConnectionConfigValue.DOUBLE_DECODING_ENABLED.getConfigPath())
.isEqualTo(false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static void initTestFixture() {

@Before
public void setUp() {
underTest = KafkaAuthenticationSpecificConfig.getInstance(true);
underTest = KafkaAuthenticationSpecificConfig.getInstance();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static void tearDown() {

@Before
public void setUp() {
underTest = KafkaValidator.getInstance(true);
underTest = KafkaValidator.getInstance();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public static void initTestFixture() {

@Before
public void setUp() {
underTest = PropertiesFactory.newInstance(connection, kafkaConfig, UUID.randomUUID().toString(), true);
underTest = PropertiesFactory.newInstance(connection, kafkaConfig, UUID.randomUUID().toString());
}

@Test
Expand Down

0 comments on commit 627ca6b

Please sign in to comment.