Skip to content

Commit

Permalink
Implement config for toggling whether messages whose enrichment faile…
Browse files Browse the repository at this point in the history
…d should be published anyways

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Feb 21, 2022
1 parent 5980ae7 commit 09e297a
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 6 deletions.
Expand Up @@ -34,13 +34,15 @@ public final class DefaultMappingConfig implements MappingConfig {
private final int bufferSize;
private final int parallelism;
private final int maxPoolSize;
private final boolean publishFailedEnrichments;
private final JavaScriptConfig javaScriptConfig;
private final MapperLimitsConfig mapperLimitsConfig;

private DefaultMappingConfig(final ScopedConfig config) {
bufferSize = config.getNonNegativeIntOrThrow(MappingConfigValue.BUFFER_SIZE);
parallelism = config.getPositiveIntOrThrow(MappingConfigValue.PARALLELISM);
maxPoolSize = config.getPositiveIntOrThrow(MappingConfigValue.MAX_POOL_SIZE);
publishFailedEnrichments = config.getBoolean(MappingConfigValue.PUBLISH_FAILED_ENRICHMENTS.getConfigPath());
mapperLimitsConfig = DefaultMapperLimitsConfig.of(config);
javaScriptConfig = DefaultJavaScriptConfig.of(config);
}
Expand Down Expand Up @@ -74,6 +76,11 @@ public int getMaxPoolSize() {
return maxPoolSize;
}

@Override
public boolean getPublishFailedEnrichments() {
return publishFailedEnrichments;
}

@Override
public JavaScriptConfig getJavaScriptConfig() {
return javaScriptConfig;
Expand All @@ -96,13 +103,14 @@ public boolean equals(final Object o) {
return bufferSize == that.bufferSize &&
parallelism == that.parallelism &&
maxPoolSize == that.maxPoolSize &&
publishFailedEnrichments == that.publishFailedEnrichments &&
Objects.equals(javaScriptConfig, that.javaScriptConfig) &&
Objects.equals(mapperLimitsConfig, that.mapperLimitsConfig);
}

@Override
public int hashCode() {
return Objects.hash(bufferSize, parallelism, maxPoolSize, javaScriptConfig, mapperLimitsConfig);
return Objects.hash(bufferSize, parallelism, maxPoolSize, publishFailedEnrichments, javaScriptConfig, mapperLimitsConfig);
}

@Override
Expand All @@ -111,6 +119,7 @@ public String toString() {
"bufferSize=" + bufferSize +
", parallelism=" + parallelism +
", maxPoolSize=" + maxPoolSize +
", publishFailedEnrichments=" + publishFailedEnrichments +
", javaScriptConfig=" + javaScriptConfig +
", mapperLimitsConfig=" + mapperLimitsConfig +
"]";
Expand Down
Expand Up @@ -46,6 +46,11 @@ public interface MappingConfig {
*/
int getMaxPoolSize();

/**
* @return whether messages with failed enrichments should be published.
*/
boolean getPublishFailedEnrichments();

/**
* Returns the config of the JavaScript message mapping.
*
Expand Down Expand Up @@ -79,7 +84,12 @@ enum MappingConfigValue implements KnownConfigValue {
/**
* The maximum parallelism used for mapping inbound and outbound messages in mapping processor actor.
*/
MAX_POOL_SIZE("max-pool-size", 5);
MAX_POOL_SIZE("max-pool-size", 5),

/**
* If messages with failed enrichments should be published.
*/
PUBLISH_FAILED_ENRICHMENTS("publish-failed-enrichments", false);

private final String path;
private final Object defaultValue;
Expand Down
Expand Up @@ -415,6 +415,7 @@ private static Optional<EntityId> extractEntityId(Signal<?> signal) {
}

// Called inside future; must be thread-safe
@Nullable
private OutboundSignalWithSender recoverFromEnrichmentError(final OutboundSignalWithSender outboundSignal,
final Target target, final Throwable error) {

Expand All @@ -441,7 +442,11 @@ private OutboundSignalWithSender recoverFromEnrichmentError(final OutboundSignal
ConnectionFailure.internal(getSelf(), dittoRuntimeException, "Signal enrichment failed");
clientActor.tell(connectionFailure, getSelf());
}
return outboundSignal.setTargets(Collections.singletonList(target));
if (mappingConfig.getPublishFailedEnrichments()) {
return outboundSignal.setTargets(Collections.singletonList(target));
} else {
return null;
}
}

private void logEnrichmentFailure(final OutboundSignal outboundSignal, final DittoRuntimeException error) {
Expand Down
4 changes: 4 additions & 0 deletions connectivity/service/src/main/resources/connectivity.conf
Expand Up @@ -563,6 +563,10 @@ ditto {
max-pool-size = 5
max-pool-size = ${?CONNECTIVITY_MESSAGE_MAPPING_MAX_POOL_SIZE}

# Whether messages with failed enrichments should be published.
publish-failed-enrichments = false
publish-failed-enrichments = ${?CONNECTIVITY_MESSAGE_MAPPING_PUBLISH_FAILED_ENRICHMENTS}

javascript {
# the maximum script size in bytes of a mapping script to run
# prevents loading big JS dependencies into the script (e.g. jQuery which has ~250kB)
Expand Down
Expand Up @@ -19,7 +19,6 @@
import org.assertj.core.api.JUnitSoftAssertions;
import org.eclipse.ditto.connectivity.service.config.javascript.JavaScriptConfig;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -63,7 +62,7 @@ public void toStringContainsExpected() {

softly.assertThat(underTest.toString())
.contains(underTest.getClass().getSimpleName())
.contains("javaScriptConfig", "mapperLimitsConfig", "bufferSize", "parallelism", "maxPoolSize");
.contains("javaScriptConfig", "mapperLimitsConfig", "publishFailedEnrichments", "bufferSize", "parallelism", "maxPoolSize");
}

@Test
Expand All @@ -79,8 +78,12 @@ public void underTestReturnsValuesOfConfigFile() {
.isEqualTo(67890);

softly.assertThat(underTest.getMaxPoolSize())
.describedAs(MappingConfig.MappingConfigValue.PARALLELISM.getConfigPath())
.describedAs(MappingConfig.MappingConfigValue.MAX_POOL_SIZE.getConfigPath())
.isEqualTo(37);

softly.assertThat(underTest.getPublishFailedEnrichments())
.describedAs(MappingConfig.MappingConfigValue.PUBLISH_FAILED_ENRICHMENTS.getConfigPath())
.isEqualTo(true);
}

}
2 changes: 2 additions & 0 deletions connectivity/service/src/test/resources/mapping-test.conf
Expand Up @@ -6,6 +6,8 @@ mapping {

max-pool-size = 37

publish-failed-enrichments = true

javascript {
maxScriptSizeBytes = 42000
maxScriptExecutionTime = 815ms
Expand Down

0 comments on commit 09e297a

Please sign in to comment.