Permalink
Browse files

HZN-1374: Sink API: Queue messages outside of the heap (#2158)

* hzn-1374: h2 data store

* hzn-1374: integration of offHeapStore with sink api

* HZN-1374: refactor marshalling/unmarshalling. handle review comments

* HZN-1374: Update doc

* HZN-1374: Documentation sprinkles.

* HZN-1374: Update doc and logger
  • Loading branch information...
cgorantla authored and j-white committed Oct 9, 2018
1 parent d11d7b7 commit d6ff23a09c7e5360cb9e8d0a17506d83e6597d5e
Showing with 1,418 additions and 27 deletions.
  1. +7 −0 container/features/src/main/resources/features.xml
  2. +69 −0 core/ipc/sink/api/src/main/java/org/opennms/core/ipc/sink/api/OffHeapQueue.java
  3. +13 −2 core/ipc/sink/api/src/main/java/org/opennms/core/ipc/sink/api/SinkModule.java
  4. +38 −0 core/ipc/sink/api/src/main/java/org/opennms/core/ipc/sink/api/WriteFailedException.java
  5. +16 −0 core/ipc/sink/common/pom.xml
  6. +124 −7 core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/common/AsyncDispatcherImpl.java
  7. +52 −0 core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/offheap/Activator.java
  8. +102 −0 core/ipc/sink/common/src/main/java/org/opennms/core/ipc/sink/offheap/OffHeapServiceLoader.java
  9. +10 −0 core/ipc/sink/common/src/test/java/org/opennms/core/ipc/sink/aggregation/AggregationTest.java
  10. +236 −0 core/ipc/sink/common/src/test/java/org/opennms/core/ipc/sink/offheap/AsyncDispatcherOffHeapTest.java
  11. +44 −0 core/ipc/sink/common/src/test/java/org/opennms/core/ipc/sink/offheap/MockMessage.java
  12. +99 −0 core/ipc/sink/common/src/test/java/org/opennms/core/ipc/sink/offheap/MockModule.java
  13. +10 −0 core/ipc/sink/mock-impl/src/main/java/org/opennms/core/ipc/sink/mock/MockSinkModule.java
  14. +83 −0 core/ipc/sink/off-heap/pom.xml
  15. +196 −0 core/ipc/sink/off-heap/src/main/java/org/opennms/core/ipc/sink/offheap/H2OffHeapStore.java
  16. +23 −0 core/ipc/sink/off-heap/src/main/resources/OSGI-INF/blueprint/blueprint.xml
  17. +129 −0 core/ipc/sink/off-heap/src/test/java/org/opennms/core/ipc/sink/offheap/H2OffHeapStoreTest.java
  18. +1 −0 core/ipc/sink/pom.xml
  19. +13 −0 core/ipc/sink/xml/src/main/java/org/opennms/core/ipc/sink/xml/AbstractXmlSinkModule.java
  20. +6 −0 features/events/daemon/src/main/java/org/opennms/netmgt/eventd/sink/EventModule.java
  21. +9 −0 features/events/syslog/src/main/java/org/opennms/netmgt/syslogd/SyslogSinkModule.java
  22. +2 −2 features/events/syslog/src/main/java/org/opennms/netmgt/syslogd/api/SyslogMessageDTO.java
  23. +28 −0 features/events/traps/src/main/java/org/opennms/netmgt/trapd/TrapInformationWrapper.java
  24. +35 −13 features/events/traps/src/main/java/org/opennms/netmgt/trapd/TrapSinkModule.java
  25. +6 −0 features/minion/repository/pom.xml
  26. +21 −3 features/telemetry/common/src/main/java/org/opennms/netmgt/telemetry/ipc/TelemetrySinkModule.java
  27. +5 −0 opennms-base-assembly/pom.xml
  28. +1 −0 opennms-doc/guide-admin/src/asciidoc/index.adoc
  29. +39 −0 opennms-doc/guide-admin/src/asciidoc/text/minion/offheap.doc
  30. +1 −0 pom.xml
@@ -605,6 +605,13 @@
<bundle>mvn:org.opennms.core.ipc.sink/org.opennms.core.ipc.sink.common/${project.version}</bundle>
<bundle>mvn:org.opennms.core.ipc.sink/org.opennms.core.ipc.sink.xml/${project.version}</bundle>
</feature>
<feature name="opennms-core-ipc-sink-offheap" description="OpenNMS :: Core :: IPC :: Sink :: OffHeap" version="${project.version}">
<feature>opennms-core-ipc-sink-api</feature>
<bundle>mvn:org.opennms.core.ipc.sink/org.opennms.core.ipc.sink.offheap/${project.version}</bundle>
<bundle>mvn:com.h2database/h2/${h2databaseVersion}</bundle>
<feature>dropwizard-metrics</feature>
</feature>
<!-- CAMEL -->
<feature name="opennms-core-ipc-sink-camel-common" description="OpenNMS :: Core :: IPC :: Sink :: Camel :: Common" version="${project.version}">
@@ -0,0 +1,69 @@
/*******************************************************************************
* This file is part of OpenNMS(R).
*
* Copyright (C) 2018 The OpenNMS Group, Inc.
* OpenNMS(R) is Copyright (C) 1999-2018 The OpenNMS Group, Inc.
*
* OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.
*
* OpenNMS(R) is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* OpenNMS(R) is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with OpenNMS(R). If not, see:
* http://www.gnu.org/licenses/
*
* For more information contact:
* OpenNMS(R) Licensing <license@opennms.org>
* http://www.opennms.org/
* http://www.opennms.com/
*******************************************************************************/
package org.opennms.core.ipc.sink.api;
import java.util.AbstractMap;
public interface OffHeapQueue {
/**
* Write Message for a Sink Module and a key that is unique to message.
*
* @param message sink message in byte array.
* @param moduleName Sink Module Name.
* @param key unique id for the sink message.
*/
boolean writeMessage(byte[] message, String moduleName, String key) throws WriteFailedException;
/**
*
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
* @return key, value pair where key is the uuid and value is sink message.
* @throws InterruptedException if interrupted while waiting
*/
AbstractMap.SimpleImmutableEntry<String, byte[]> readNextMessage(String moduleName) throws InterruptedException;
/**
*
* @return size of OffHeap in bytes.
*/
public long getSize();
/**
*
* @param moduleName, Sink module Name.
* @return number of messages per module.
*/
public int getNumOfMessages(String moduleName);
}
@@ -55,15 +55,26 @@
int getNumConsumerThreads();
/**
* Marshals the message to a byte array.
* Marshals the aggregated message to a byte array.
*/
byte[] marshal(T message);
/**
* Unmarshals the message from a byte array.
* Unmarshals the aggregated message from a byte array.
*/
T unmarshal(byte[] message);
/**
* Marshals single message to a byte array.
*/
byte[] marshalSingleMessage(S message);
/**
* Unmarshals single message from a byte array.
*/
S unmarshalSingleMessage(byte[] message);
/**
* Defines how messages should be combined, and when they
* should be "released".
@@ -0,0 +1,38 @@
/*******************************************************************************
* This file is part of OpenNMS(R).
*
* Copyright (C) 2018 The OpenNMS Group, Inc.
* OpenNMS(R) is Copyright (C) 1999-2018 The OpenNMS Group, Inc.
*
* OpenNMS(R) is a registered trademark of The OpenNMS Group, Inc.
*
* OpenNMS(R) is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License,
* or (at your option) any later version.
*
* OpenNMS(R) is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with OpenNMS(R). If not, see:
* http://www.gnu.org/licenses/
*
* For more information contact:
* OpenNMS(R) Licensing <license@opennms.org>
* http://www.opennms.org/
* http://www.opennms.com/
*******************************************************************************/
package org.opennms.core.ipc.sink.api;
public class WriteFailedException extends Exception {
private static final long serialVersionUID = -6945401599648861750L;
public WriteFailedException(String message) {
super(message);
}
}
@@ -21,6 +21,7 @@
<Bundle-RequiredExecutionEnvironment>JavaSE-1.8</Bundle-RequiredExecutionEnvironment>
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Bundle-Version>${project.version}</Bundle-Version>
<Bundle-Activator>org.opennms.core.ipc.sink.offheap.Activator</Bundle-Activator>
</instructions>
</configuration>
</plugin>
@@ -63,6 +64,15 @@
<groupId>org.opennms</groupId>
<artifactId>opennms-util</artifactId>
</dependency>
<dependency>
<groupId>org.opennms.core</groupId>
<artifactId>org.opennms.core.soa</artifactId>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.swrve</groupId>
<artifactId>rate-limited-logger</artifactId>
@@ -87,6 +97,12 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.opennms.core.ipc.sink</groupId>
<artifactId>org.opennms.core.ipc.sink.offheap</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
@@ -28,21 +28,32 @@
package org.opennms.core.ipc.sink.common;
import java.util.AbstractMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.joda.time.Duration;
import org.opennms.core.concurrent.LogPreservingThreadFactory;
import org.opennms.core.ipc.sink.api.AsyncDispatcher;
import org.opennms.core.ipc.sink.api.AsyncPolicy;
import org.opennms.core.ipc.sink.api.Message;
import org.opennms.core.ipc.sink.api.OffHeapQueue;
import org.opennms.core.ipc.sink.api.SinkModule;
import org.opennms.core.ipc.sink.api.SyncDispatcher;
import org.opennms.core.ipc.sink.api.WriteFailedException;
import org.opennms.core.ipc.sink.offheap.OffHeapServiceLoader;
import org.opennms.core.utils.SystemInfoUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,12 +63,18 @@
import com.codahale.metrics.MetricRegistry;
import com.swrve.ratelimitedlogger.RateLimitedLog;
public class AsyncDispatcherImpl<W, S extends Message, T extends Message> implements AsyncDispatcher<S> {
public class AsyncDispatcherImpl<W, S extends Message, T extends Message> implements AsyncDispatcher<S> {
private static final Logger LOG = LoggerFactory.getLogger(AsyncDispatcherImpl.class);
private final SyncDispatcher<S> syncDispatcher;
private OffHeapAdapter offHeapAdapter;
private ExecutorService offHeapAdapterExecutor = Executors.newSingleThreadExecutor();
private final AsyncPolicy asyncPolicy;
private OffHeapQueue offHeapQueue;
private SinkModule<S,T> sinkModule;
private DispatcherState<W,S,T> state;
private boolean useOffHeap = false;
final RateLimitedLog rateLimittedLogger = RateLimitedLog
.withRateLimit(LOG)
.maxRate(5).every(Duration.standardSeconds(30))
@@ -66,11 +83,22 @@
final LinkedBlockingQueue<Runnable> queue;
final ExecutorService executor;
public AsyncDispatcherImpl(DispatcherState<W,S,T> state, AsyncPolicy asyncPolicy, SyncDispatcher<S> syncDispatcher) {
public AsyncDispatcherImpl(DispatcherState<W, S, T> state, AsyncPolicy asyncPolicy,
SyncDispatcher<S> syncDispatcher) {
Objects.requireNonNull(state);
Objects.requireNonNull(asyncPolicy);
this.syncDispatcher = Objects.requireNonNull(syncDispatcher);
this.asyncPolicy = asyncPolicy;
this.state = state;
sinkModule = state.getModule();
if (OffHeapServiceLoader.isOffHeapEnabled()) {
offHeapQueue = OffHeapServiceLoader.getOffHeapQueue();
if (offHeapQueue != null) {
useOffHeap = true;
LOG.info("Offheap storage enabled for sink module, {}", sinkModule.getId());
}
}
final RejectedExecutionHandler rejectedExecutionHandler;
if (asyncPolicy.isBlockWhenFull()) {
// This queue ensures that calling thread is blocked when the queue is full
@@ -129,6 +157,7 @@ public Integer getValue() {
*
* If the implementation is changed, make sure that that executor is built accordingly.
*/
private static class OfferBlockingQueue<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 1L;
@@ -141,7 +170,7 @@ public boolean offer(E e) {
try {
put(e);
return true;
} catch(InterruptedException ie) {
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
return false;
@@ -150,6 +179,22 @@ public boolean offer(E e) {
@Override
public CompletableFuture<S> send(S message) {
// Check if OffHeap is enabled and if local queue is full or if OffHeap not Empty then write message to OffHeap.
if (useOffHeap && (asyncPolicy.getQueueSize() == getQueueSize() ||
((offHeapAdapter != null) && !offHeapAdapter.isOffHeapEmpty()))) {
// Start drain thread before the first write to OffHeapQueue.
if (offHeapAdapter == null) {
this.offHeapAdapter = new OffHeapAdapter();
offHeapAdapterExecutor.execute(offHeapAdapter);
LOG.info("started drain thread for {}", sinkModule.getId());
}
try {
return offHeapAdapter.writeMessage(message);
} catch (WriteFailedException e) {
rateLimittedLogger.error("OffHeap write failed ", e);
}
}
try {
return CompletableFuture.supplyAsync(() -> {
syncDispatcher.send(message);
@@ -161,7 +206,7 @@ public boolean offer(E e) {
return future;
}
}
@Override
public int getQueueSize() {
return queue.size();
@@ -171,5 +216,77 @@ public int getQueueSize() {
public void close() throws Exception {
syncDispatcher.close();
executor.shutdown();
if (offHeapAdapter != null) {
offHeapAdapter.shutdown();
offHeapAdapterExecutor.shutdown();
}
}
/** This adapter encapsulates write/read sink messages to OffHeapQueue. **/
private class OffHeapAdapter implements Runnable {
private Map<String, CompletableFuture<S>> offHeapFutureMap = new ConcurrentHashMap<>();
private final CountDownLatch firstWrite = new CountDownLatch(1);
private final AtomicBoolean closed = new AtomicBoolean(false);
public OffHeapAdapter() {
state.getMetrics().register(MetricRegistry.name(state.getModule().getId(), "offheap-messages"), new Gauge<Integer>() {
@Override
public Integer getValue() {
return offHeapQueue.getNumOfMessages(sinkModule.getId());
}
});
}
/** This is drain thread which polls data from OffHeapQueue, when data is available, it will push the data to the executor queue.
* It also retrieves the future from the map and completes the future.**/
@Override
public void run() {
while (!closed.get()) {
try {
// Wait till atleast one write call to OffHeapQueue.
firstWrite.await();
//retrieve key,value entry from top of queue.
AbstractMap.SimpleImmutableEntry<String, byte[]> keyValue = offHeapQueue
.readNextMessage(sinkModule.getId());
if (keyValue != null) {
queue.put(() -> {
S message = sinkModule.unmarshalSingleMessage(keyValue.getValue());
syncDispatcher.send(message);
CompletableFuture<S> future = offHeapFutureMap.get(keyValue.getKey());
future.complete(message);
offHeapFutureMap.remove(keyValue.getKey());
});
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while retrieving OffHeap Message for {} ", sinkModule.getId(), e);
}
}
}
/** writeMessage will marshal sink message and write to OffHeapQueue and return a future that is cached in map. **/
public CompletableFuture<S> writeMessage(S message) throws WriteFailedException {
final CompletableFuture<S> future = new CompletableFuture<>();
byte[] bytes = sinkModule.marshalSingleMessage(message);
String uuid = UUID.randomUUID().toString();
offHeapFutureMap.put(uuid, future);
offHeapQueue.writeMessage(bytes, sinkModule.getId(), uuid);
firstWrite.countDown();
return future;
}
public boolean isOffHeapEmpty() {
return offHeapFutureMap.isEmpty();
}
public void shutdown() {
firstWrite.countDown();
closed.set(true);
}
}
}
Oops, something went wrong.

0 comments on commit d6ff23a

Please sign in to comment.