New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HZN-1374: Sink API: Queue messages outside of the heap #2158
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this should work. Thanks for putting this together.
I think it we may want to have this enabled by default with some reasonable size such as 10MB and then reduce the current default limits of the number of messages kept on the heap.
<feature name="opennms-core-ipc-sink-offheap" description="OpenNMS :: Core :: IPC :: Sink :: OffHeap" version="${project.version}"> | ||
<feature>opennms-core-ipc-sink-api</feature> | ||
<bundle>mvn:org.opennms.core.ipc.sink/org.opennms.core.ipc.sink.offheap/${project.version}</bundle> | ||
<bundle>mvn:com.h2database/h2/1.4.197</bundle> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move this version reference and the one from the pom to a property in the root pom? i.e. h2databaseVersion
/** | ||
* Marshals single message to a byte array. | ||
*/ | ||
default byte[] marshalSingleMessage(S message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it makes sense to provide a default impl. here - all of the implementation should define how the messages and marshaled/unmarshaled.
.../sink/common/src/test/java/org/opennms/core/ipc/sink/offheap/AsyncDispatcherOffHeapTest.java
Show resolved
Hide resolved
import org.opennms.core.ipc.sink.common.ThreadLockingSyncDispatcher; | ||
import org.osgi.service.cm.ConfigurationAdmin; | ||
|
||
@RunWith(MockitoJUnitRunner.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't seem to need this runner here.
return new TrapInformationWrapper(trapDTO); | ||
} | ||
|
||
private XmlHandler<TrapDTO> getXmlHandler() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we find a way to hide the XML handling in the AbstractXmlSinkModule
?
if (message.getTrapInformation() != null) { | ||
return message.getTrapInformation().getTrapAddress(); | ||
} else { | ||
return message.getTrapDTO().getTrapAddress(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we maybe just call message.getTrapAddress()
instead, and hide this logic in the TrapInformationWrapper
itself?
@@ -69,6 +70,9 @@ | |||
private byte[] rawMessage; | |||
@XmlElement(name = "trap-identity") | |||
private TrapIdentityDTO trapIdentity; | |||
@XmlAttribute(name = "trap-address") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid adding these fields to this message, they are already in the log.
@XmlRootElement(name = "syslog-message") | ||
@XmlAccessorType(XmlAccessType.FIELD) | ||
public class SyslogMessageDTO { | ||
|
||
@XmlAttribute(name = "source-address") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we avoid adding these fields to this message, they are already in the log.
|
||
[source, sh] | ||
---- | ||
echo 'offHeapSize=1000000000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we enhance this to support units MB
and GB
units? i.e. a user should be able to set the size to 2GB without having to do the math for the byte conversion.
220badc
to
dd4edf1
Compare
dd4edf1
to
66088bd
Compare
Many thanks @j-white for the review. All the issues are addressed now. bamboo went green. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Works well for me. The overhead of the H2 store is pretty high though, I was able to store about 300k Syslog messages in 1GB of memory.
A couple future enhancements I can think off:
- Make better use of off-heap memory - maybe another storage solution, or perhaps we can leverage comrpession i.e. LZ4
- Use a PriorityQueue in the AsyncDispatcherImpl to allow us to immediately start writing to the on-heap queue once it has started draining again
core/ipc/sink/api/src/main/java/org/opennms/core/ipc/sink/api/OffHeapQueue.java
Show resolved
Hide resolved
try { | ||
return offHeapAdapter.writeMessage(message); | ||
} catch (WriteFailedException e) { | ||
LOG.error("OffHeap write failed ", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the off heap storage is full, this will flood the log with error messages and stack traces. Perhaps we can use a rate limited logger like we do with: https://github.com/OpenNMS/opennms/blob/opennms-22.0.4-1/features/newts/src/main/java/org/opennms/netmgt/newts/NewtsWriter.java#L184?
On Oct 4, 2018, at 13:46, Jesse White ***@***.***> wrote:
We strategy should continue retrying until connectivity is available.
Is that Craig's Irish influence on you? A "wee strategy..."
:)
|
Provide offheap option for sink messages (traps/syslog/..) on minion.
Offheap queue will extend buffering of sink messages which will help in case of connectivity issues with broker ( activemq/kafka).
Our continuous integration system will test and verify your changes.