Skip to content

Commit

Permalink
eclipse-ditto#605 extract connection ids retrieval into separate acto…
Browse files Browse the repository at this point in the history
…r, add necessary config, add actor test, add unit for new retrieve command/response

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos authored and VadimGue committed Feb 18, 2021
1 parent 9f62048 commit 93741c5
Show file tree
Hide file tree
Showing 18 changed files with 732 additions and 88 deletions.
Expand Up @@ -186,7 +186,7 @@ This command returns the ids of all connections.

```json
{
"targetActorSelection": "/user/connectivityRoot/reconnect/singleton",
"targetActorSelection": "/user/connectivityRoot/connectionIdsRetrieval/singleton",
"headers": {
"aggregate": false
},
Expand Down
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.connectivity.config;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.services.utils.config.KnownConfigValue;

/**
* Provides configuration settings for Connectivity service's behaviour for retrieval of connection ids.
*/
@Immutable
public interface ConnectionIdsRetrievalConfig {

/**
* Returns the number of events to read from the event journal with one query.
*
* @return the number of events to read with one query.
*/
int getReadJournalBatchSize();

/**
* Returns the number of entries to read from the snapshot collection with one query.
*
* @return the number of entries to read with one query.
*/
int getReadSnapshotBatchSize();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code ReconnectConfig}.
*/
enum ConnectionIdsRetrievalConfigValue implements KnownConfigValue {

/**
* The number of events to read in one query.
*/
READ_JOURNAL_BATCH_SIZE("read-journal-batch-size", 500),

/**
* The number of snapshots to read in one query.
*/
READ_SNAPSHOT_BATCH_SIZE("read-snapshot-batch-size", 50);

private final String path;
private final Object defaultValue;

// enum constructors are always private.
ConnectionIdsRetrievalConfigValue(final String thePath, final Object theDefaultValue) {
path = thePath;
defaultValue = theDefaultValue;
}

@Override
public Object getDefaultValue() {
return defaultValue;
}

@Override
public String getConfigPath() {
return path;
}

}

}
Expand Up @@ -44,6 +44,13 @@ public interface ConnectivityConfig extends ServiceSpecificConfig, WithHealthChe
*/
ReconnectConfig getReconnectConfig();

/**
* Returns the config for Connectivity service's behaviour for retrieval of connection ids.
*
* @return the config.
*/
ConnectionIdsRetrievalConfig getConnectionIdsRetrievalConfig();

/**
* Returns the config for the Connectivity service's client.
*
Expand Down
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.connectivity.config;

import java.util.Objects;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.services.utils.config.ConfigWithFallback;
import org.eclipse.ditto.services.utils.config.ScopedConfig;

import com.typesafe.config.Config;

/**
* This class is the default implementation of the config for the retrieval of connection ids.
*/
@Immutable
public final class DefaultConnectionIdsRetrievalConfig implements ConnectionIdsRetrievalConfig {

private static final String CONFIG_PATH = "connection-ids-retrieval";

private final int readJournalBatchSize;
private final int readSnapshotBatchSize;

private DefaultConnectionIdsRetrievalConfig(final ScopedConfig config) {
readJournalBatchSize = config.getInt(ConnectionIdsRetrievalConfigValue.READ_JOURNAL_BATCH_SIZE.getConfigPath());
readSnapshotBatchSize =
config.getInt(ConnectionIdsRetrievalConfigValue.READ_SNAPSHOT_BATCH_SIZE.getConfigPath());
}

/**
* Returns an instance of {@code DefaultReconnectConfig} based on the settings of the specified Config.
*
* @param config is supposed to provide the settings of the JavaScript mapping config at {@value #CONFIG_PATH}.
* @return the instance.
* @throws org.eclipse.ditto.services.utils.config.DittoConfigError if {@code config} is invalid.
*/
public static DefaultConnectionIdsRetrievalConfig of(final Config config) {
final ConfigWithFallback reconnectScopedConfig =
ConfigWithFallback.newInstance(config, CONFIG_PATH, ConnectionIdsRetrievalConfigValue.values());

return new DefaultConnectionIdsRetrievalConfig(reconnectScopedConfig);
}

@Override
public int getReadJournalBatchSize() {
return readJournalBatchSize;
}

@Override
public int getReadSnapshotBatchSize() {
return readSnapshotBatchSize;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final DefaultConnectionIdsRetrievalConfig that = (DefaultConnectionIdsRetrievalConfig) o;
return readJournalBatchSize == that.readJournalBatchSize && readSnapshotBatchSize == that.readSnapshotBatchSize;
}

@Override
public int hashCode() {
return Objects.hash(readJournalBatchSize, readSnapshotBatchSize);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"readJournalBatchSize=" + readJournalBatchSize +
", readSnapshotBatchSize=" + readSnapshotBatchSize +
"]";
}
}
Expand Up @@ -34,13 +34,11 @@ public final class DefaultReconnectConfig implements ReconnectConfig {
private final Duration interval;
private final RateConfig rateConfig;
private final int readJournalBatchSize;
private final int readSnapBatchSize;

private DefaultReconnectConfig(final ScopedConfig config, final RateConfig theRateConfig) {
initialDelay = config.getDuration(ReconnectConfigValue.INITIAL_DELAY.getConfigPath());
interval = config.getDuration(ReconnectConfigValue.INTERVAL.getConfigPath());
readJournalBatchSize = config.getInt(ReconnectConfigValue.READ_JOURNAL_BATCH_SIZE.getConfigPath());
readSnapBatchSize = config.getInt(ReconnectConfigValue.READ_SNAP_BATCH_SIZE.getConfigPath());
rateConfig = theRateConfig;
}

Expand Down Expand Up @@ -73,11 +71,6 @@ public int getReadJournalBatchSize() {
return readJournalBatchSize;
}

@Override
public int getReadSnapBatchSize() {
return readSnapBatchSize;
}

@Override
public RateConfig getRateConfig() {
return rateConfig;
Expand All @@ -95,13 +88,12 @@ public boolean equals(final Object o) {
return Objects.equals(initialDelay, that.initialDelay) &&
Objects.equals(interval, that.interval) &&
readJournalBatchSize == that.readJournalBatchSize &&
readSnapBatchSize == that.readSnapBatchSize &&
Objects.equals(rateConfig, that.rateConfig);
}

@Override
public int hashCode() {
return Objects.hash(initialDelay, interval, readJournalBatchSize, readSnapBatchSize, rateConfig);
return Objects.hash(initialDelay, interval, readJournalBatchSize, rateConfig);
}

@Override
Expand All @@ -110,7 +102,6 @@ public String toString() {
"initialDelay=" + initialDelay +
", interval=" + interval +
", readJournalBatchSize=" + readJournalBatchSize +
", readSnapBatchSize=" + readSnapBatchSize +
", rateConfig=" + rateConfig +
"]";
}
Expand Down
Expand Up @@ -52,6 +52,7 @@ public final class DittoConnectivityConfig implements ConnectivityConfig {
private final HealthCheckConfig healthCheckConfig;
private final ConnectionConfig connectionConfig;
private final ReconnectConfig reconnectConfig;
private final ConnectionIdsRetrievalConfig connectionIdsRetrievalConfig;
private final ClientConfig clientConfig;
private final ProtocolConfig protocolConfig;
private final MonitoringConfig monitoringConfig;
Expand All @@ -67,6 +68,7 @@ private DittoConnectivityConfig(final ScopedConfig dittoScopedConfig) {
protocolConfig = DefaultProtocolConfig.of(dittoScopedConfig);
connectionConfig = DefaultConnectionConfig.of(serviceSpecificConfig);
reconnectConfig = DefaultReconnectConfig.of(serviceSpecificConfig);
connectionIdsRetrievalConfig = DefaultConnectionIdsRetrievalConfig.of(serviceSpecificConfig);
clientConfig = DefaultClientConfig.of(serviceSpecificConfig);
monitoringConfig = DefaultMonitoringConfig.of(serviceSpecificConfig);
mappingConfig = DefaultMappingConfig.of(serviceSpecificConfig);
Expand Down Expand Up @@ -96,6 +98,11 @@ public ReconnectConfig getReconnectConfig() {
return reconnectConfig;
}

@Override
public ConnectionIdsRetrievalConfig getConnectionIdsRetrievalConfig() {
return connectionIdsRetrievalConfig;
}

@Override
public ClientConfig getClientConfig() {
return clientConfig;
Expand Down Expand Up @@ -177,6 +184,7 @@ public boolean equals(@Nullable final Object o) {
Objects.equals(healthCheckConfig, that.healthCheckConfig) &&
Objects.equals(connectionConfig, that.connectionConfig) &&
Objects.equals(reconnectConfig, that.reconnectConfig) &&
Objects.equals(connectionIdsRetrievalConfig, that.connectionIdsRetrievalConfig) &&
Objects.equals(clientConfig, that.clientConfig) &&
Objects.equals(protocolConfig, that.protocolConfig) &&
Objects.equals(monitoringConfig, that.monitoringConfig) &&
Expand All @@ -188,7 +196,7 @@ public boolean equals(@Nullable final Object o) {
@Override
public int hashCode() {
return Objects.hash(serviceSpecificConfig, persistenceOperationsConfig, mongoDbConfig, healthCheckConfig,
connectionConfig, reconnectConfig, clientConfig, protocolConfig,
connectionConfig, reconnectConfig, connectionIdsRetrievalConfig, clientConfig, protocolConfig,
monitoringConfig, mappingConfig, signalEnrichmentConfig, acknowledgementConfig);
}

Expand All @@ -201,6 +209,7 @@ public String toString() {
", healthCheckConfig=" + healthCheckConfig +
", connectionConfig=" + connectionConfig +
", reconnectConfig=" + reconnectConfig +
", connectionIdsRetrievalConfig=" + connectionIdsRetrievalConfig +
", clientConfig=" + clientConfig +
", protocolConfig=" + protocolConfig +
", monitoringConfig=" + monitoringConfig +
Expand Down
Expand Up @@ -45,13 +45,6 @@ public interface ReconnectConfig {
*/
int getReadJournalBatchSize();

/**
* Returns the number of entries to read from the snap collection with one query.
*
* @return the number of entries to read with one query.
*/
int getReadSnapBatchSize();

/**
* Returns the config for recovery throttling.
*
Expand All @@ -78,12 +71,7 @@ enum ReconnectConfigValue implements KnownConfigValue {
/**
* The number of events to read in one query.
*/
READ_JOURNAL_BATCH_SIZE("read-journal-batch-size", 500),

/**
* The number of events to read in one query.
*/
READ_SNAP_BATCH_SIZE("read-snap-batch-size", 50);
READ_JOURNAL_BATCH_SIZE("read-journal-batch-size", 500);

private final String path;
private final Object defaultValue;
Expand Down
@@ -0,0 +1,83 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.connectivity.config;

import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf;
import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable;

import org.assertj.core.api.JUnitSoftAssertions;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import nl.jqno.equalsverifier.EqualsVerifier;

/**
* Unit test for {@link org.eclipse.ditto.services.connectivity.messaging.config.DefaultConnectionIdsRetrievalConfig}.
*/
public final class DefaultConnectionIdsRetrievalConfigTest {

private static Config connectionIdsConf;

@Rule
public final JUnitSoftAssertions softly = new JUnitSoftAssertions();

@BeforeClass
public static void initTestFixture() {
connectionIdsConf = ConfigFactory.load("connections-ids-retrieval-test");
}

@Test
public void assertImmutability() {
assertInstancesOf(DefaultConnectionIdsRetrievalConfig.class,
areImmutable());
}

@Test
public void testHashCodeAndEquals() {
EqualsVerifier.forClass(DefaultConnectionIdsRetrievalConfig.class)
.usingGetClass()
.verify();
}

@Test
public void underTestReturnsDefaultValuesIfBaseConfigWasEmpty() {
final ConnectionIdsRetrievalConfig underTest = DefaultConnectionIdsRetrievalConfig.of(ConfigFactory.empty());

softly.assertThat(underTest.getReadJournalBatchSize())
.as(ConnectionIdsRetrievalConfig.ConnectionIdsRetrievalConfigValue.READ_JOURNAL_BATCH_SIZE.getConfigPath())
.isEqualTo(
ConnectionIdsRetrievalConfig.ConnectionIdsRetrievalConfigValue.READ_JOURNAL_BATCH_SIZE.getDefaultValue());

softly.assertThat(underTest.getReadSnapshotBatchSize())
.as(ConnectionIdsRetrievalConfig.ConnectionIdsRetrievalConfigValue.READ_SNAPSHOT_BATCH_SIZE.getConfigPath())
.isEqualTo(
ConnectionIdsRetrievalConfig.ConnectionIdsRetrievalConfigValue.READ_SNAPSHOT_BATCH_SIZE.getDefaultValue());
}

@Test
public void underTestReturnsValuesOfConfigFile() {
final ConnectionIdsRetrievalConfig underTest = DefaultConnectionIdsRetrievalConfig.of(connectionIdsConf);

softly.assertThat(underTest.getReadJournalBatchSize())
.as(ConnectionIdsRetrievalConfig.ConnectionIdsRetrievalConfigValue.READ_JOURNAL_BATCH_SIZE.getConfigPath())
.isEqualTo(42);

softly.assertThat(underTest.getReadSnapshotBatchSize())
.as(ConnectionIdsRetrievalConfig.ConnectionIdsRetrievalConfigValue.READ_SNAPSHOT_BATCH_SIZE.getConfigPath())
.isEqualTo(21);
}
}

0 comments on commit 93741c5

Please sign in to comment.