This PR introduces some improvements to how Kafka Streams state stores are used in the vulnerability-analyzer.
### Automatic cleanup of the `completed-scans-table` state store

The `completed-scans-table` store is where we aggregate results from all applicable scanners, per `ScanKey`. The intention is to only forward a result event to `dtrack.vuln-analysis.result` once all scanners have completed their work. The store was prone to accumulating lots of data, for the following reasons:

1. `KTable`s do not support manual deletions or TTL policies, so entries never expire and stay around forever.
2. The default `segment.bytes` for changelog topics is 256 MiB. Kafka can only delete records of "inactive" segments, meaning that partitions in the changelog topic must accumulate more than 256 MiB of data before compaction can kick in.

Both of the above are amplified by the fact that `ScanKey`s are unbounded, as there is no finite set of keys.

We worked around (1) by introducing a processor that emits tombstone records for `ScanKey`s for which no update to the `completed-scans-table` was observed for over an hour. This approach is recommended by Confluent. While this does keep the table from growing indefinitely, we could still run into situations where lots of unique keys accumulate over the course of an hour. This is unfortunate because we don't need the aggregated scanner results anymore once they are "complete" and have been forwarded.
This PR adds a custom `Processor` that replaces the previously used `KTable`. It includes logic to delete completed aggregates from the store, but it also still performs TTL enforcement to regularly clean up old records. This change is backward-compatible, because it reuses the existing state store.
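For illustration, here is a minimal sketch of what such a processor can look like with the Kafka Streams `Processor` API. The domain types (`ScanKey`, `ScanResult`, `ScanResultAggregate`) and their `merge`/`isComplete` methods are placeholders standing in for the PR's actual types, not the real implementation:

```java
import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

// Sketch of an aggregating processor with delete-on-completion and TTL.
// ScanKey, ScanResult, and ScanResultAggregate are placeholder types.
class ScanResultAggregationProcessor implements Processor<ScanKey, ScanResult, ScanKey, ScanResultAggregate> {

    private static final Duration TTL = Duration.ofHours(1);

    private ProcessorContext<ScanKey, ScanResultAggregate> context;
    private KeyValueStore<ScanKey, ValueAndTimestamp<ScanResultAggregate>> store;

    @Override
    public void init(final ProcessorContext<ScanKey, ScanResultAggregate> context) {
        this.context = context;
        this.store = context.getStateStore("completed-scans-table");
        // Periodically evict entries that haven't been updated within the TTL.
        context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, this::evictExpired);
    }

    @Override
    public void process(final Record<ScanKey, ScanResult> record) {
        final ValueAndTimestamp<ScanResultAggregate> existing = store.get(record.key());
        final ScanResultAggregate aggregate = existing == null
                ? ScanResultAggregate.of(record.value())
                : existing.value().merge(record.value());

        if (aggregate.isComplete()) {
            // All applicable scanners reported: forward the result and delete
            // the aggregate right away, instead of waiting for the TTL.
            context.forward(record.withValue(aggregate));
            store.delete(record.key());
        } else {
            store.put(record.key(), ValueAndTimestamp.make(aggregate, record.timestamp()));
        }
    }

    private void evictExpired(final long now) {
        final Instant cutoff = Instant.ofEpochMilli(now).minus(TTL);
        try (KeyValueIterator<ScanKey, ValueAndTimestamp<ScanResultAggregate>> iterator = store.all()) {
            while (iterator.hasNext()) {
                final KeyValue<ScanKey, ValueAndTimestamp<ScanResultAggregate>> entry = iterator.next();
                if (Instant.ofEpochMilli(entry.value.timestamp()).isBefore(cutoff)) {
                    store.delete(entry.key);
                }
            }
        }
    }
}
```

Deleting aggregates as soon as they are complete keeps the store (and its changelog) from holding data we no longer need, while the punctuator still catches scans that never complete.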
### Configurable state stores

It is now possible to configure whether in-memory or persistent state stores should be used. Furthermore, certain RocksDB customizations (e.g. the compaction style and compression type) are configurable. These settings are currently applied globally to all state stores. In the future we can scope them to individual state stores, too.
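RocksDB tuning in Kafka Streams is typically wired up via a `RocksDBConfigSetter`. A minimal sketch of how the compaction style and compression type could be applied globally — the class itself is illustrative, not the PR's actual implementation:

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
import org.rocksdb.Options;

// Illustrative config setter; Kafka Streams instantiates it when the
// rocksdb.config.setter property points at this class.
public class CustomRocksDbConfigSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Applied to every RocksDB-backed store; could be scoped per storeName later.
        options.setCompactionStyle(CompactionStyle.UNIVERSAL);
        options.setCompressionType(CompressionType.LZ4_COMPRESSION);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // No custom native objects (e.g. caches or filters) to release here.
    }
}
```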
To switch all state stores to RocksDB and enable compression, the following properties may be used:
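For example (the property keys shown here are hypothetical placeholders, not necessarily the application's actual configuration keys):

```properties
# Hypothetical property names; refer to the application's configuration reference for the actual keys.
state-store.type=rocks_db
state-store.rocks-db.compaction-style=universal
state-store.rocks-db.compression-type=lz4_compression
```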
### Smaller changelog topics

When using state stores, Kafka Streams will, by default, create changelog topics with `segment.bytes` set to 256 MiB, which is more data than necessary in many cases and artificially increases restoration times. We now have a centralized default configuration for changelog topics that applies a few stricter limits:
See `vulnerability-analyzer/src/main/java/org/hyades/util/StateStoreUtil.java`, lines 54 to 59 at 33bee86.
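A sketch of what such a centralized default could look like, using Kafka's `TopicConfig` constants; the method name and the compaction-related values are assumptions derived from the description below:

```java
import java.util.Map;

import org.apache.kafka.common.config.TopicConfig;

// Sketch of centralized changelog topic defaults; not the actual StateStoreUtil code.
public final class StateStoreUtil {

    public static Map<String, String> defaultChangelogTopicConfig() {
        return Map.of(
                // Smaller segments become "inactive" much sooner than with the
                // 256 MiB default, allowing compaction to kick in earlier.
                TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(64 * 1024 * 1024),
                // Make records eligible for compaction (almost) immediately
                // after they've been written.
                TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, "1",
                TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1");
    }

    private StateStoreUtil() {
    }
}
```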
Log segments are limited to 64 MiB, and records are signaled to be eligible for compaction immediately after they've been written. This should help keep topics smaller and allow compaction to kick in sooner.
This config has also been added to `TOPICS.md`.

For some topics it may even be practical to reduce `segment.bytes` further, but I am not yet sure what the actual performance impact on the broker side would be.