Skip to content
Permalink
Browse files

Merge pull request #2637 from OpenNMS/naicisum/jira/NMS-12215

NMS-12215: Add enhancements to DNS Resolver and Circuit Breaker
  • Loading branch information...
j-white committed Aug 12, 2019
2 parents e003576 + 5be891a commit c1a94bcc163cf572899197f3f9eccc958af97d71
@@ -93,6 +93,12 @@
private int maxTtlSeconds = -1;
private int negativeTtlSeconds = -1;

// Circuit breaker settings, consumed by init() when it builds the CircuitBreakerConfig.
// When false, init() transitions the breaker to the disabled state instead of the closed state.
private boolean breakerEnabled = true;
// Handed to CircuitBreakerConfig.failureRateThreshold(...).
private int breakerFailureRateThreshold = 80;
// In seconds: wrapped in Duration.ofSeconds(...) for waitDurationInOpenState(...).
private int breakerWaitDurationInOpenState = 15;
// Handed to ringBufferSizeInHalfOpenState(...).
private int breakerRingBufferSizeInHalfOpenState = 10;
// Handed to ringBufferSizeInClosedState(...).
private int breakerRingBufferSizeInClosedState = 100;

private List<NettyResolverContext> contexts;
private Iterator<NettyResolverContext> iterator;
private DnsCache cache;
@@ -135,10 +141,10 @@ public void init() {

// Build the circuit breaker configuration from the configurable breaker* properties
final CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(80)
.waitDurationInOpenState(Duration.ofSeconds(15))
.ringBufferSizeInHalfOpenState(10)
.ringBufferSizeInClosedState(100)
.failureRateThreshold(breakerFailureRateThreshold)
.waitDurationInOpenState(Duration.ofSeconds(breakerWaitDurationInOpenState))
.ringBufferSizeInHalfOpenState(breakerRingBufferSizeInHalfOpenState)
.ringBufferSizeInClosedState(breakerRingBufferSizeInClosedState)
.recordExceptions(DnsNameResolverTimeoutException.class)
.build();
circuitBreaker = CircuitBreaker.of("nettyDnsResolver", circuitBreakerConfig);
@@ -162,6 +168,12 @@ public void init() {
.onCallNotPermitted(e -> {
lookupsRejectedByCircuitBreaker.mark();
});

if (breakerEnabled) {
circuitBreaker.transitionToClosedState();
} else {
circuitBreaker.transitionToDisabledState();
}
}

public void destroy() {
@@ -197,6 +209,46 @@ public void destroy() {
}).toCompletableFuture();
}

/**
 * Reports whether the circuit breaker is configured to be active.
 *
 * @return the configured flag; {@code init()} moves the breaker to the closed state when it is
 *         {@code true} and to the disabled state otherwise
 */
public boolean getBreakerEnabled() {
    return this.breakerEnabled;
}

/**
 * Enables or disables the circuit breaker.
 * <p>
 * The flag is consulted by {@code init()} to pick the breaker's state, so it should be set
 * before {@code init()} is called.
 *
 * @param breakerEnabled {@code true} to use the circuit breaker, {@code false} to disable it
 */
public void setBreakerEnabled(boolean breakerEnabled) {
this.breakerEnabled = breakerEnabled;
}

/**
 * Returns the failure rate threshold that {@code init()} passes to
 * {@code CircuitBreakerConfig.failureRateThreshold(...)}.
 *
 * @return the configured failure rate threshold
 */
public int getBreakerFailureRateThreshold() {
    return this.breakerFailureRateThreshold;
}

/**
 * Sets the failure rate threshold used when {@code init()} builds the circuit breaker
 * configuration.
 *
 * @param breakerFailureRateThreshold value passed to {@code failureRateThreshold(...)}
 */
public void setBreakerFailureRateThreshold(int breakerFailureRateThreshold) {
this.breakerFailureRateThreshold = breakerFailureRateThreshold;
}

/**
 * Returns the wait duration for the open state.
 *
 * @return the duration in seconds; {@code init()} converts it with {@code Duration.ofSeconds(...)}
 */
public int getBreakerWaitDurationInOpenState() {
    return this.breakerWaitDurationInOpenState;
}

/**
 * Sets the wait duration for the open state.
 *
 * @param breakerWaitDurationInOpenState duration in seconds; {@code init()} converts it with
 *                                       {@code Duration.ofSeconds(...)}
 */
public void setBreakerWaitDurationInOpenState(int breakerWaitDurationInOpenState) {
this.breakerWaitDurationInOpenState = breakerWaitDurationInOpenState;
}

/**
 * Returns the ring buffer size that {@code init()} passes to
 * {@code ringBufferSizeInHalfOpenState(...)}.
 *
 * @return the configured ring buffer size for the half-open state
 */
public int getBreakerRingBufferSizeInHalfOpenState() {
    return this.breakerRingBufferSizeInHalfOpenState;
}

/**
 * Sets the ring buffer size used for the half-open state when {@code init()} builds the
 * circuit breaker configuration.
 *
 * @param breakerRingBufferSizeInHalfOpenState value passed to
 *                                             {@code ringBufferSizeInHalfOpenState(...)}
 */
public void setBreakerRingBufferSizeInHalfOpenState(int breakerRingBufferSizeInHalfOpenState) {
this.breakerRingBufferSizeInHalfOpenState = breakerRingBufferSizeInHalfOpenState;
}

/**
 * Returns the ring buffer size that {@code init()} passes to
 * {@code ringBufferSizeInClosedState(...)}.
 *
 * @return the configured ring buffer size for the closed state
 */
public int getBreakerRingBufferSizeInClosedState() {
    return this.breakerRingBufferSizeInClosedState;
}

/**
 * Sets the ring buffer size used for the closed state when {@code init()} builds the
 * circuit breaker configuration.
 *
 * @param breakerRingBufferSizeInClosedState value passed to
 *                                           {@code ringBufferSizeInClosedState(...)}
 */
public void setBreakerRingBufferSizeInClosedState(int breakerRingBufferSizeInClosedState) {
this.breakerRingBufferSizeInClosedState = breakerRingBufferSizeInClosedState;
}

/**
 * Returns the current value of {@code numContexts}.
 * NOTE(review): presumably the number of NettyResolverContext instances to create - confirm.
 *
 * @return the configured {@code numContexts} value
 */
public int getNumContexts() {
    return this.numContexts;
}
@@ -18,6 +18,12 @@
<cm:property name="min-ttl-seconds" value="60" />
<cm:property name="max-ttl-seconds" value="-1" />
<cm:property name="negative-ttl-seconds" value="300" />
<!-- CircuitBreaker Settings -->
<cm:property name="breaker-enabled" value="true" />
<cm:property name="breaker-failure-rate-threshold" value="80" />
<cm:property name="breaker-wait-duration-in-open-state" value="15" />
<cm:property name="breaker-ring-buffer-size-in-half-open-state" value="10" />
<cm:property name="breaker-ring-buffer-size-in-closed-state" value="100" />
</cm:default-properties>
</cm:property-placeholder>

@@ -52,6 +58,11 @@
<property name="maxTtlSeconds" value="${max-ttl-seconds}"/>
<property name="negativeTtlSeconds" value="${negative-ttl-seconds}"/>

<property name="breakerEnabled" value="${breaker-enabled}"/>
<property name="breakerFailureRateThreshold" value="${breaker-failure-rate-threshold}"/>
<property name="breakerWaitDurationInOpenState" value="${breaker-wait-duration-in-open-state}"/>
<property name="breakerRingBufferSizeInHalfOpenState" value="${breaker-ring-buffer-size-in-half-open-state}"/>
<property name="breakerRingBufferSizeInClosedState" value="${breaker-ring-buffer-size-in-closed-state}"/>
</bean>

<service ref="dnsResolver" interface="org.opennms.netmgt.dnsresolver.api.DnsResolver">
@@ -83,4 +83,40 @@ public void canTriggerOpenCircuit() throws InterruptedException, TimeoutExceptio
assertThat(e.getCause(), is(instanceOf(CallNotPermittedException.class)));
}
}

/**
 * Verifies that a disabled circuit breaker never rejects lookups: after far more failed
 * lookups than the closed-state ring buffer holds, a further lookup must still fail with
 * Netty's timeout exception rather than a CallNotPermittedException.
 */
@Test
public void canDisableCircuitBreaker() throws InterruptedException, TimeoutException {
    // Create the resolver
    EventForwarder eventForwarder = mock(EventForwarder.class);
    NettyDnsResolver dnsResolver = new NettyDnsResolver(eventForwarder, new MetricRegistry());
    // Use a non-routable address as the target - we want the queries to fail due to timeouts
    dnsResolver.setNameservers(InetAddressUtils.str(InetAddressUtils.UNPINGABLE_ADDRESS));
    // The flag is read by init(), so it has to be set beforehand
    dnsResolver.setBreakerEnabled(false);
    dnsResolver.init();

    // Trigger enough failing requests that an enabled circuit breaker would have opened
    final int N = 2 * dnsResolver.getCircuitBreaker().getCircuitBreakerConfig().getRingBufferSizeInClosedState();
    final CompletableFuture<?>[] futures = new CompletableFuture<?>[N];
    for (int i = 0; i < N; i++) {
        futures[i] = dnsResolver.reverseLookup(InetAddressUtils.addr("fe80::"));
    }

    // Wait for the requests to complete
    try {
        CompletableFuture.allOf(futures)
                // This should not take longer than the query timeout
                .get(2 * dnsResolver.getQueryTimeoutMillis(), TimeUnit.MILLISECONDS);
        fail("Expected an ExecutionException to be thrown");
    } catch (ExecutionException e) {
        // pass
    }

    // The circuit breaker should be disabled and netty should return a DnsNameResolverTimeoutException
    try {
        // Bound the wait so a lookup that never completes fails the test instead of hanging it
        dnsResolver.reverseLookup(InetAddressUtils.addr("fe80::"))
                .get(2 * dnsResolver.getQueryTimeoutMillis(), TimeUnit.MILLISECONDS);
        fail("Expected an DnsNameResolverTimeoutException to be thrown");
    } catch (ExecutionException e) {
        assertThat(e.getCause(), is(instanceOf(io.netty.resolver.dns.DnsNameResolverTimeoutException.class)));
    }
}
}
@@ -118,6 +118,8 @@

private long clockSkewEventRate = 0;

// Passed to each RecordEnricher this parser creates; lookups are enabled by default.
private boolean dnsLookupsEnabled = true;

private LoadingCache<InetAddress, Optional<Instant>> eventCache;

private ExecutorService executor;
@@ -213,6 +215,14 @@ public void setClockSkewEventRate(final long clockSkewEventRate) {
});
}

/**
 * Reports whether DNS lookups are enabled for record enrichment.
 *
 * @return the flag that is handed to every newly created {@code RecordEnricher}
 */
public boolean getDnsLookupsEnabled() {
    return this.dnsLookupsEnabled;
}

/**
 * Enables or disables DNS lookups during record enrichment.
 *
 * @param dnsLookupsEnabled {@code false} to have record enrichers created afterwards skip
 *                          DNS lookups
 */
public void setDnsLookupsEnabled(boolean dnsLookupsEnabled) {
this.dnsLookupsEnabled = dnsLookupsEnabled;
}

/**
 * Returns the current value of {@code threads}.
 *
 * @return the configured thread count
 */
public int getThreads() {
    return this.threads;
}
@@ -233,7 +243,7 @@ public void setThreads(int threads) {
final CompletableFuture<TelemetryMessage> future = new CompletableFuture<>();
final Timer.Context timerContext = recordEnrichmentTimer.time();
// Trigger record enrichment (performing DNS reverse lookups for example)
final RecordEnricher recordEnricher = new RecordEnricher(dnsResolver);
final RecordEnricher recordEnricher = new RecordEnricher(dnsResolver, getDnsLookupsEnabled());
recordEnricher.enrich(record).whenComplete((enrichment, ex) -> {
timerContext.close();
if (ex != null) {
@@ -29,6 +29,7 @@
package org.opennms.netmgt.telemetry.protocols.netflow.parser;

import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -61,12 +62,20 @@
private static final Logger LOG = LoggerFactory.getLogger(RecordEnricher.class);

private final DnsResolver dnsResolver;
// Fixed at construction; when false, enrich() returns an empty enrichment without using the resolver.
private final boolean dnsLookupsEnabled;

public RecordEnricher(DnsResolver dnsResolver) {
public RecordEnricher(DnsResolver dnsResolver, boolean dnsLookupsEnabled) {
this.dnsResolver = Objects.requireNonNull(dnsResolver);
this.dnsLookupsEnabled = dnsLookupsEnabled;
}

public CompletableFuture<RecordEnrichment> enrich(Iterable<Value<?>> record) {
if (!this.dnsLookupsEnabled) {
final CompletableFuture<RecordEnrichment> emptyFuture = new CompletableFuture<>();
final RecordEnrichment emptyEnrichment = new DefaultRecordEnrichment(Collections.<InetAddress, String>emptyMap());
emptyFuture.complete(emptyEnrichment);
return emptyFuture;
}
final IpAddressCapturingVisitor ipAddressCapturingVisitor = new IpAddressCapturingVisitor();
for (final Value<?> value : record) {
value.visit(ipAddressCapturingVisitor);
@@ -100,7 +109,6 @@ public RecordEnricher(DnsResolver dnsResolver) {
final RecordEnrichment enrichment = new DefaultRecordEnrichment(hostnamesByAddress);
future.complete(enrichment);
});

return future;
}

@@ -127,7 +135,6 @@ public RecordEnricher(DnsResolver dnsResolver) {
// Captures the address carried by an IPv4 value by adding it to the collected addresses.
@Override
public void accept(IPv4AddressValue value) {
addresses.add(value.getValue());

}

@Override
@@ -57,19 +57,28 @@
*/
@Test
public void canEnrichFlow() throws InvalidPacketException, ExecutionException, InterruptedException, UnknownHostException {
enrichFlow(CompletableFuture.completedFuture(Optional.of("test")), Optional.of("test"));
enrichFlow(CompletableFuture.completedFuture(Optional.empty()), Optional.empty());
enrichFlow(CompletableFuture.completedFuture(Optional.of("test")), Optional.of("test"), true);
enrichFlow(CompletableFuture.completedFuture(Optional.empty()), Optional.empty(), true);

CompletableFuture exceptionalFuture = new CompletableFuture();
exceptionalFuture.completeExceptionally(new RuntimeException());
enrichFlow(exceptionalFuture, Optional.empty());
enrichFlow(exceptionalFuture, Optional.empty(), true);
}

private void enrichFlow(CompletableFuture reverseLookupFuture, Optional<String> expectedValue) throws InvalidPacketException, ExecutionException, InterruptedException, UnknownHostException {
/**
 * Runs the enrichment helper with DNS lookups disabled and expects an empty result both when
 * the resolver's future holds a hostname and when it completed exceptionally.
 */
@Test
public void canDisableEnrichFlow() throws InvalidPacketException, ExecutionException, InterruptedException, UnknownHostException {
    enrichFlow(CompletableFuture.completedFuture(Optional.of("test")), Optional.empty(), false);

    // Parameterized instead of raw, so the compiler checks what the future carries
    final CompletableFuture<Optional<String>> exceptionalFuture = new CompletableFuture<>();
    exceptionalFuture.completeExceptionally(new RuntimeException());
    enrichFlow(exceptionalFuture, Optional.empty(), false);
}

private void enrichFlow(CompletableFuture reverseLookupFuture, Optional<String> expectedValue, boolean dnsLookupsEnabled) throws InvalidPacketException, ExecutionException, InterruptedException, UnknownHostException {
DnsResolver dnsResolver = mock(DnsResolver.class);
when(dnsResolver.reverseLookup(any())).thenReturn(reverseLookupFuture);

RecordEnricher enricher = new RecordEnricher(dnsResolver);
RecordEnricher enricher = new RecordEnricher(dnsResolver, dnsLookupsEnabled);

final Packet packet = getSampleNf5Packet();
final List<CompletableFuture<RecordEnrichment>> enrichmentFutures = packet.getRecords().map(enricher::enrich)
@@ -72,7 +72,9 @@
private final SampleDatagramEnricher enricher;

private int threads = DEFAULT_NUM_THREADS;


// Handed to the SampleDatagramEnricher when it is created; lookups are enabled by default.
private boolean dnsLookupsEnabled = true;

private ExecutorService executor;

public SFlowUdpParser(final String name,
@@ -94,7 +96,7 @@ public Thread newThread(Runnable r) {
}
};

enricher = new SampleDatagramEnricher(dnsResolver);
enricher = new SampleDatagramEnricher(dnsResolver, getDnsLookupsEnabled());
}

@Override
@@ -163,6 +165,14 @@ public boolean handles(final ByteBuffer buffer) {
return future;
}

/**
 * Reports whether DNS lookups are enabled for this parser.
 *
 * @return the current value of the {@code dnsLookupsEnabled} flag
 */
public boolean getDnsLookupsEnabled() {
    return this.dnsLookupsEnabled;
}

/**
 * Sets the flag that controls whether DNS lookups are performed during enrichment.
 * <p>
 * NOTE(review): the {@code enricher} field is final and is created with the value of
 * {@code getDnsLookupsEnabled()} at that moment, and SampleDatagramEnricher stores the flag in
 * a final field. A call to this setter made after the enricher exists therefore does not appear
 * to change its behaviour - confirm, and consider creating the enricher lazily.
 *
 * @param dnsLookupsEnabled {@code true} to enable DNS lookups, {@code false} to disable them
 */
public void setDnsLookupsEnabled(boolean dnsLookupsEnabled) {
this.dnsLookupsEnabled = dnsLookupsEnabled;
}

@Override
public String getName() {
return name;
@@ -29,6 +29,7 @@
package org.opennms.netmgt.telemetry.protocols.sflow.parser;

import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -53,12 +54,20 @@
public class SampleDatagramEnricher {

private final DnsResolver dnsResolver;
private final boolean dnsLookupsEnabled;

public SampleDatagramEnricher(DnsResolver dnsResolver) {
public SampleDatagramEnricher(DnsResolver dnsResolver, boolean dnsLookupsEnabled) {
this.dnsResolver = Objects.requireNonNull(dnsResolver);
this.dnsLookupsEnabled = dnsLookupsEnabled;
}

public CompletableFuture<SampleDatagramEnrichment> enrich(SampleDatagram datagram) {
if (!this.dnsLookupsEnabled) {
final CompletableFuture<SampleDatagramEnrichment> emptyFuture = new CompletableFuture<>();
final SampleDatagramEnrichment emptyEnrichment = new DefaultSampleDatagramEnrichment(Collections.<InetAddress, String>emptyMap());
emptyFuture.complete(emptyEnrichment);
return emptyFuture;
}
final Set<InetAddress> addressesToReverseLookup = new HashSet<>();
datagram.visit(new SampleDatagramVisitor() {
@Override
@@ -344,6 +344,11 @@ include::text/workarounds/snmp.adoc[]
== IFTTT Integration
include::text/ifttt/ifttt-integration.adoc[]


[[ga-dnsresolver]]
== DNS Resolver
include::text/dnsresolver/introduction.adoc[]

[[ga-telemetryd]]
== Telemetry Daemon
include::text/telemetryd/introduction.adoc[]
@@ -0,0 +1,45 @@

// Allow GitHub image rendering
:imagesdir: ../../images

The DNS Resolver is used internally by OpenNMS modules and functions to provide lookup functionality as required.

=== Modules that use DNS Resolution
* <<ga-telemetryd, TelemetryD>>


=== Configuring DNS Resolution
In order to customize the DNS servers that are queried, the following commands can be used:
[source]
----
$ ssh -p 8201 admin@localhost
...
admin@minion()> config:edit org.opennms.features.dnsresolver.netty
admin@minion()> property-set nameservers 8.8.8.8,4.2.2.2:53,[::1]:5353
admin@minion()> property-set query-timeout-millis 5000
admin@minion()> config:update
----

If no nameservers are set (or set to an empty string), the servers configured by the system running the JVM will be used.

The resolved host names are cached for their TTL as specified in the returned DNS records.
TTL handling can be customized by setting the `min-ttl-seconds`, `max-ttl-seconds` and `negative-ttl-seconds` properties in the configuration above.

=== Configuring Circuit Breaker
A circuit breaker helps prevent your DNS infrastructure from being flooded with requests when multiple failures occur. It is enabled by default, but can be disabled by setting `breaker-enabled` to `false`.

Additional parameters can be modified to tune the functionality of the circuit breaker (`breaker-wait-duration-in-open-state` is specified in seconds):
[source]
----
$ ssh -p 8201 admin@localhost
...
admin@minion()> config:edit org.opennms.features.dnsresolver.netty
admin@minion()> property-set breaker-enabled true
admin@minion()> property-set breaker-failure-rate-threshold 80
admin@minion()> property-set breaker-wait-duration-in-open-state 15
admin@minion()> property-set breaker-ring-buffer-size-in-half-open-state 10
admin@minion()> property-set breaker-ring-buffer-size-in-closed-state 100
admin@minion()> config:update
----

NOTE: If the circuit breaker is disabled, the lookup statistics `lookupsSuccessful` and `lookupsFailed` are no longer tracked.

0 comments on commit c1a94bc

Please sign in to comment.
You can’t perform that action at this time.