
METRON-1460: Create a complementary non-split-join enrichment topology #940

Closed · wants to merge 24 commits into base: master
Conversation

cestella (Member) commented Feb 22, 2018

Contributor Comments

There are some deficiencies in the split/join topology.

It's hard to reason about:

  • Understanding the latency of enriching a message requires looking at multiple bolts that each give summary statistics
  • The join bolt's cache is really hard to reason about when performance tuning
  • During spikes in traffic, you can overload the join bolt's cache and drop messages if you aren't careful
  • In general, it's hard to associate a cache size and a duration kept in cache with throughput and latency
  • There are a lot of network hops per message
  • Right now we are stuck at 2 stages of transformations (enrichment and threat intel). It's very possible that you might want Stellar enrichments to depend on the output of other Stellar enrichments. In order to implement this in split/join, you'd have to create a cycle in the Storm topology

I propose that we move to a model where we do enrichments in a single bolt in parallel using a static threadpool (i.e. multiple executors in the same worker process would share the threadpool). In all other ways, this would be backwards compatible: a transparent drop-in for the existing enrichment topology.
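
For illustration, here is a minimal sketch of the shared static threadpool idea (hypothetical class and names, not the actual PR code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // One fixed threadpool per worker JVM, shared by every executor in that process.
    public class EnrichmentThreadPoolSketch {
      private static volatile ExecutorService pool;

      public static ExecutorService getInstance(int numThreads) {
        if (pool == null) {
          synchronized (EnrichmentThreadPoolSketch.class) {
            if (pool == null) {
              pool = Executors.newFixedThreadPool(numThreads);
            }
          }
        }
        return pool;
      }
    }
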
There are some pros/cons about this too:

  • Pro
    • Easier to reason about from an individual message perspective
    • Architecturally decoupled from Storm
    • This sets us up if we want to consider other streaming technologies
    • Fewer bolts
      • spout -> enrichment bolt -> threatintel bolt -> output bolt
    • Way fewer network hops per message
      currently 2n+1 where n is the number of enrichments used, e.g. 3 enrichments means 7 hops (if using Stellar subgroups, each subgroup is a hop)
    • Easier to reason about from a performance perspective
    • We trade cache size and eviction timeout for threadpool size
    • We set ourselves up to have Stellar subgroups with dependencies,
      i.e. Stellar subgroups that depend on the output of other subgroups.
      If we do this, we can shrink the topology to just spout -> enrichment/threat intel -> output
  • Con
    • We can no longer tune Stellar enrichments independently of HBase enrichments
      • To be fair, with enrichments moving to Stellar, this is the case in the split/join approach too
    • No idea about performance

What I propose is to submit a PR that will deliver an alternative, completely backwards compatible topology for enrichment that you can use by adjusting the start_enrichment_topology.sh script to use remote-unified.yaml instead of remote.yaml. If we live with it for a while and have some good experiences with it, maybe we can consider retiring the old enrichment topology.

To test this, spin up Vagrant and edit $METRON_HOME/bin/start_enrichment_topology.sh to use remote-unified.yaml instead of remote.yaml. Restart enrichment and you should see a topology that looks something like:
[image: unified enrichment topology]

Pull Request Checklist

Thank you for submitting a contribution to Apache Metron.
Please refer to our Development Guidelines for the complete guide to follow for contributions.
Please refer also to our Build Verification Guidelines for complete smoke testing guides.

In order to streamline the review of the contribution, we ask that you follow these guidelines and double-check the following:

For all changes:

  • Is there a JIRA ticket associated with this PR? If not, one needs to be created at the Metron Jira.
  • Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
  • Has your PR been rebased against the latest commit within the target branch (typically master)?

For code changes:

  • Have you included steps to reproduce the behavior or problem that is being changed or addressed?

  • Have you included steps or a guide to how the change may be verified and tested manually?

  • Have you ensured that the full suite of tests and checks has been executed in the root metron folder via:

    mvn -q clean integration-test install && dev-utilities/build-utils/verify_licenses.sh 
    
  • Have you written or updated unit tests and/or integration tests to verify your changes?

  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?

  • Have you verified the basic functionality of the build by building and running locally with the Vagrant full-dev environment or the equivalent?

For documentation related changes:

  • Have you ensured that the format looks appropriate for the output in which it is rendered by building and verifying the site-book? If not, then run the following commands and verify the changes via site-book/target/site/index.html:

    cd site-book
    mvn site
    

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
It is also recommended that travis-ci be set up for your personal repository so that your branches are built there before you submit a pull request.

import java.util.List;
import java.util.Map;
import java.util.UUID;

ottobackwards (Contributor) commented Feb 22, 2018

This needs some docs/comments - something in here to help with review and maintenance of what is going on in here.

cestella (Author) commented Feb 22, 2018

Yeah, good call.

cestella (Author) commented Feb 22, 2018

Ok, I went through and did more rigorous documentation throughout the new classes. Let me know if it makes sense or if there are still issues.

cestella changed the title from "Single bolt split join poc" to "METRON-1460: Create a complementary non-split-join enrichment topology" on Feb 22, 2018

mraliagha commented Feb 23, 2018

Is there any document somewhere that shows how the previous approach was implemented? I would like to understand the previous architecture in detail, because some of the pros/cons didn't make sense to me. Maybe I can help predict what the impact will be. Thanks.

cestella (Author) commented Feb 23, 2018

The current architecture is described in this diagram: [image: Enrichment Architecture]

In short, for each message each splitter will:

  • inspect the configs for the sensor
  • for each sensor, extract the fields required for enrichment and send them to the appropriate enrichment bolt (e.g. hbase, geo, stellar)
    • If one enrichment enriches k fields, then k messages will be sent to the enrichment bolt
    • In the case of Stellar, each Stellar subgroup will be a separate message
    • The original message is sent directly to the join bolt
  • The enrichment bolts do the enrichment and send the resulting fields and values on to the join bolt
  • The join bolt will asynchronously collect the subresults and join them with the original message
    • The join bolt has an LRU cache to hold subresults until all results arrive (illustrated in the sketch below)
    • Tuning performance involves tuning this cache (max size and time until eviction)
    • Tuning this can be complex because the cache has to be large enough to handle spikes in traffic
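
As a rough illustration of the join side just described (a toy sketch, not the actual Metron join bolt; names are hypothetical):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    // Subresults accumulate under the message id until all expected pieces
    // arrive; if the cache is too small (or evicts too soon) during a traffic
    // spike, subresults are lost and the message is dropped.
    public class JoinCacheSketch {
      static class Partial {
        final Map<String, Object> fields = new HashMap<>();
        int piecesSeen = 0;
      }

      private final Cache<String, Partial> pending = CacheBuilder.newBuilder()
          .maximumSize(100_000)                    // max size: must absorb spikes
          .expireAfterWrite(10, TimeUnit.SECONDS)  // time until eviction
          .build();

      public void onSubresult(String msgId, Map<String, Object> fields, int piecesExpected) {
        Partial p = pending.asMap().computeIfAbsent(msgId, k -> new Partial());
        synchronized (p) {
          p.fields.putAll(fields);
          if (++p.piecesSeen == piecesExpected) {
            pending.invalidate(msgId);
            emitJoined(msgId, p.fields);  // all subresults arrived: emit the joined message
          }
        }
      }

      private void emitJoined(String msgId, Map<String, Object> joined) { /* emit downstream */ }
    }
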
merrimanr (Contributor) commented Feb 26, 2018

I tested this in full dev and it worked as expected. +1

nickwallen (Contributor) commented Feb 27, 2018

I'd hold off on merging this until we can get it tested at some decent scale. Unless it already has been? Otherwise, I don't see a need to merge this until we know it actually addresses a problem.

cestella (Author) commented Feb 28, 2018

@nickwallen Sounds good. When scale tests are done, can we make sure that we also include #944?

mraliagha commented Mar 1, 2018

@cestella Thanks, Casey. Wouldn't this solution still be hard to tune? Thread pool tuning, and probably the contention between these threads and normal Storm workers, still makes tuning hard for a production platform with tons of feeds/topologies. Storm resource management is very basic at this stage when it comes to absorbing spikes, and having a separate thread pool transfers the complexity from one place to another.

cestella (Author) commented Mar 2, 2018

@mraliagha It's definitely a tradeoff. This is why this is offered as a complement to the original split/join topology. Keep in mind, also, that this architecture enables use-cases that the other would prevent or make extremely difficult and/or network intensive, such as multi-level Stellar statements rather than the 2 levels we have now. We are undergoing some preliminary testing in-lab right now, which @nickwallen alluded to, to compare the two approaches under at least synthetic load, and will report back.

Ultimately this boils down to efficiencies gained by avoiding network hops and whether that's going to provide an outsized impact, I think.

ottobackwards (Contributor) commented Mar 2, 2018

If we integrated Storm with YARN this would also be a problem, as our resource management may be at odds with YARN's. I think?

What would be nice is if Storm could manage the pool and we could just use it.

ottobackwards (Contributor) commented Mar 2, 2018

Have we thought about sending a mail to the Storm dev list and asking if anyone has done this? Potential issues?

cestella (Author) commented Mar 2, 2018

@ottobackwards I haven't sent an email to the Storm team, but I did run the PR past a Storm committer that I know and asked his opinion prior to submitting the PR. The general answer was something to the effect of "The overall goal should be to reduce the network shuffle unless it's really required." Also, the notion of using an external threadpool didn't seem to be fundamentally offensive.

arunmahadevan commented Mar 2, 2018

Managing threadpools within a bolt isn't fundamentally wrong; we have seen some use cases where this is done. However, we have been putting effort into reducing the overall number of threads created internally within Storm, since the thread context switches were causing performance bottlenecks. I assume the threadpool threads are mostly IO/network bound, so it should not cause too much harm.

Do you need multiple threads because the enrichments involve external DB lookups and are time-consuming? Maybe you could compare the performance of maintaining a thread pool vs. increasing the bolt's parallelism to achieve a similar effect.

Another option might be to prefetch the enrichment data and load it into each bolt so that you might not need separate threads to do the enrichment.

If you are able to manage without threads, that would be preferable. Even otherwise it's not that bad, as long as you don't create too many threads and they are cleaned up properly. (We have had some cases where the internal threads were causing workers to hang.)

cestella (Author) commented Mar 2, 2018

@arunmahadevan Thanks for chiming in, Arun. I would say that most of the enrichment work is I/O bound, and we try to avoid it whenever possible with a time-evicted LRU cache in front of the enrichments. We don't always know a priori what enrichments users are doing, per se, as their individual enrichments may be expressed via Stellar. The threads here are entirely managed via a fixed threadpool service, and the threadpool is shared across all of the executors running in-process on the worker, so we keep the total thread count down.
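
A minimal sketch of the cache-in-front-of-enrichment idea (hypothetical code; the real cache key is more involved than a plain string):

    import java.util.concurrent.TimeUnit;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import org.json.simple.JSONObject;

    // A time-evicted LRU cache in front of the (I/O bound) enrichment lookup,
    // so repeated values skip the HBase/geo/Stellar work entirely.
    public class EnrichmentCacheSketch {
      private final LoadingCache<String, JSONObject> cache = CacheBuilder.newBuilder()
          .maximumSize(100_000)
          .expireAfterWrite(10, TimeUnit.MINUTES)
          .build(new CacheLoader<String, JSONObject>() {
            @Override
            public JSONObject load(String key) {
              return doEnrichment(key);  // the expensive I/O happens only on a miss
            }
          });

      JSONObject enrich(String key) {
        return cache.getUnchecked(key);
      }

      JSONObject doEnrichment(String key) { /* e.g. an HBase get */ return new JSONObject(); }
    }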

cestella (Author) commented Mar 3, 2018

Just FYI, as part of the performance experimentation in the lab here, we found that one major impediment to scale was the Guava cache in this topology when the cache becomes non-trivial in size (e.g. 10k+ entries). Swapping in Caffeine (shout-out to @ben-manes, Caffeine rocks :) immediately had a substantial effect. I created #947 to migrate the split/join infrastructure to use Caffeine as well and will look at the performance impact of that change. I wanted to separate that work from here, as it may be that Guava performance is fine outside of an explicit threadpool like we have here.
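
For reference, the swap is nearly mechanical; a sketch (assuming a cache shaped like the hypothetical one above):

    import java.util.concurrent.TimeUnit;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.LoadingCache;
    import org.json.simple.JSONObject;

    public class CaffeineSwapSketch {
      // Caffeine's builder mirrors Guava's, so the change is mostly the imports,
      // the builder class, and the loader (a lambda instead of a CacheLoader subclass).
      private final LoadingCache<String, JSONObject> cache = Caffeine.newBuilder()
          .maximumSize(100_000)
          .expireAfterWrite(10, TimeUnit.MINUTES)
          .build(this::doEnrichment);

      JSONObject enrich(String key) { return cache.get(key); }

      JSONObject doEnrichment(String key) { /* e.g. an HBase get */ return new JSONObject(); }
    }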

cestella (Author) commented Mar 5, 2018

I ran this up with Vagrant and ensured:

  • Normal Stellar still works in field transformations as well as enrichments
  • New enrichments can be swapped in and out live
  • New threat intel can be swapped in and out live

Are there any other pending issues here beyond a report of the performance impact?

nickwallen (Contributor) commented Mar 5, 2018

I completed some fairly extensive performance testing comparing this new Unified topology against the existing Split-Join implementation. The difference was dramatic.

  • The Unified topology performed roughly 3.4 times faster than Split-Join.

Both topologies in this side-by-side test included the same fixes, including the Guava cache problem fixed in #947. The tests included two enrichments:

  • GeoIP enrichment; geo := GEO_GET(ip_dst_addr)
  • Compute-only Stellar enrichment; local := IN_SUBNET(ip_dst_addr, '192.168.0.0/24')

The number one driver of performance is the cache hit rate, which is heavily dependent on what your data looks like. With these enrichments, that's driven by how varied ip_dst_addr is in the data.

I tested both of these topologies with different sets of data intended to either increase or decrease that cache hit rate. The differences between the two topologies were fairly consistent across the different data sets.

When running these topologies, reasonably well-tuned, on the same data, I was able to consistently maintain 70,000 events per second with the Split/Join topology. In the same environment, I was able to maintain 312,000 events per second using the Unified topology.

The raw throughput numbers are relative and depend on how much hardware you are willing to throw at the problem. I was running on 3 nodes dedicated to running the Enrichment topology only. But with the same data, on the same hardware, the difference was 3.4 times. That's big.

Pushing as much as you can into a single executor and avoiding network hops is definitely the way to go here.

ben-manes commented Mar 5, 2018

Do you know what the hit rates were, for the same data set, between Guava and Caffeine? The caches use different policies, so it is always interesting to see how they handle given workloads. As we continue to refine our adaptive version of W-TinyLFU, it's handy to know what types of workloads to investigate. (P.S. We have a simulator for re-running persisted traces, if useful for your tuning.)

cestella (Author) commented Mar 5, 2018

We were being purposefully unkind to the cache in these tests. The load simulation chose an IP address at random, so each IP had an equal probability of being selected; whereas in real traffic, we expect a coherent working set. Not sure of the exact hit rates, though.

ben-manes commented Mar 5, 2018

That makes sense. A uniform distribution will, of course, degrade all policies to random replacement, so the test is then about how well the implementations handle concurrency. Most often caches exhibit a Zipfian distribution (the 80-20 rule), so our bias towards frequency is a net gain. We have observed a few rare cases where frequency is a poor signal and LRU is optimal, and we are exploring adaptive techniques to dynamically tune the cache based on the workload's characteristics. These cases don't seem to occur in many real-world scenarios that we know of, but it is always nice to know what users are experiencing and how much better (or worse) we perform than a standard LRU cache.

cestella (Author) commented Mar 5, 2018

The interesting thing we found was that Guava seems to do poorly when the number of items in the cache gets large. When we scaled the test down (830 distinct IP addresses chosen randomly and sent in at a rate of 200k events per second with a cache size of 100), the cache kept up; but when we scaled the test up (300k distinct IP addresses chosen randomly and sent in at a rate of 200k events per second with a cache size of 100k), it didn't.

ben-manes commented Mar 5, 2018

Guava defaults to a concurrencyLevel of 4, given its age and a desire not to abuse memory in low-concurrency situations. You probably want to increase it to 64 in a heavy workload, which gives a ~4x throughput gain on reads. It won't scale much higher, since it has internal bottlenecks, and I could never get patches reviewed to fix those.
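
For example (a sketch with the values suggested above):

    import java.util.concurrent.TimeUnit;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class ConcurrencyLevelSketch {
      // Raising concurrencyLevel increases the number of internal segments,
      // reducing lock contention under heavily concurrent access (default is 4).
      static final Cache<String, Object> CACHE = CacheBuilder.newBuilder()
          .concurrencyLevel(64)
          .maximumSize(100_000)
          .expireAfterWrite(10, TimeUnit.MINUTES)
          .build();
    }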

I've only ever noticed overall throughput depend on the number of threads, and never realized there was a capacity constraint on its performance. One should expect some, due to the older hash table design resulting in more collisions, whereas CHMv8 does much better there. Still, I would have expected it to even out enough, unless you have a bad hash code?

cestella (Author) commented Mar 5, 2018

We actually did increase the concurrency level for Guava to 64; that is what confused us as well. The hash code is mostly standard and should be evenly distributed (the key is pretty much a POJO).

ben-manes commented Mar 5, 2018

Internally Guava uses a ConcurrentLinkedQueue and an AtomicInteger to record its size, per segment. When a read occurs, it records that in the queue and then drains it under the segment's lock (via tryLock) to replay the events. This is similar to Caffeine, which uses optimized structures instead. I intended the CLQ & counter as baseline scaffolding for replacement, as it is an obvious bottleneck, but I could never get it replaced despite advocating for it. The penalty of draining the buffers is amortized, but unfortunately this buffer isn't capped.

Since there would be a higher hit rate with a larger cache, the reads would be recorded more often. Perhaps the contention there and the penalty of draining the queue are more observable than a cache miss. That's still surprising, since a cache miss usually means more expensive I/O. Is the loader doing expensive work in your case?

Caffeine gets around this problem by using more optimal buffers and by being lossy (on reads only) if it can't keep up. By default it delegates the amortized maintenance work to a ForkJoinPool to avoid user-facing latencies, since you'll want those variances to be tight. Much of that could be back-ported onto Guava for a nice boost.
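
To illustrate the pattern being described, a toy sketch of a lossy read buffer drained under tryLock (not Caffeine's actual implementation):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.locks.ReentrantLock;

    // Reads are recorded into a bounded buffer and replayed against the eviction
    // policy under tryLock; when the buffer is full, the read event is simply
    // dropped (lossy) instead of blocking the reader.
    public class ReadBufferSketch<K> {
      private final ArrayBlockingQueue<K> readBuffer = new ArrayBlockingQueue<>(1024);
      private final ReentrantLock evictionLock = new ReentrantLock();

      public void recordRead(K key) {
        readBuffer.offer(key);          // returns false when full: no blocking, event lost
        if (evictionLock.tryLock()) {   // one thread amortizes the drain for everyone
          try {
            K k;
            while ((k = readBuffer.poll()) != null) {
              touchPolicy(k);           // replay the access against the LRU/LFU policy
            }
          } finally {
            evictionLock.unlock();
          }
        }
      }

      private void touchPolicy(K key) { /* update recency/frequency metadata */ }
    }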

cestella (Author) commented Mar 5, 2018

In this case, the loader isn't doing anything terribly expensive, though it may in a real scenario (incurring an HBase get or some more expensive computation).

ben-manes commented Mar 5, 2018

Interesting. Then I guess at the larger size the read path, rather than the writes, must be triggering the bottleneck. Perhaps it is incurring a lot more GC overhead, causing more collections? The CLQ additions require allocating a new queue node. That and the cache entry probably get promoted to old gen due to the high churn rate, causing everything to slow down. Probably isn't too interesting to investigate vs. swapping libraries :)

cestella (Author) commented Mar 5, 2018

I actually suspect GC as well. We switched the garbage collector to G1GC and saw throughput gains, but not nearly the kinds of gains we got with a drop-in replacement of Guava with Caffeine.

ben-manes commented Mar 5, 2018

Caffeine doesn't allocate on read, so that would make sense. I saw a 25x boost (compared to current) when porting the buffers to Guava.

cestella (Author) commented Mar 5, 2018

Ahhh, that makes sense. I bet we were getting killed by small allocations in the caching layer.

nickwallen (Contributor) left a review comment

I like the end result, but I am having a hard time understanding the new enrichment abstractions, what each of them is responsible for, and how they work together.

Can you describe the new abstractions/classes in a couple sentences?

* @param tuple the Storm tuple to extract the message from
* @return the extracted message as a JSONObject
*/
public JSONObject generateMessage(Tuple tuple) {

nickwallen (Contributor) commented Mar 5, 2018

Isn't this what the MessageGetStrategy was intended to solve? Can we reuse that logic here instead?

cestella (Author) commented Mar 6, 2018

Yep, good catch.

cestella (Author) commented Mar 6, 2018

done

*
*/
public interface Strategy {
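/** @return the error type to report when this phase of enrichment fails (javadoc sketch added for clarity; wording assumed, not from the PR) */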
Constants.ErrorType getErrorType();

nickwallen (Contributor) commented Mar 5, 2018

Can we javadoc each method? This seems like an important interface.

cestella (Author) commented Mar 5, 2018

Sure thing.

cestella (Author) commented Mar 6, 2018

done

import java.util.Map;
import java.util.concurrent.Executor;

public enum EnrichmentStrategies implements Strategy {

nickwallen (Contributor) commented Mar 5, 2018

I don't understand the purpose of this class. Why have an EnrichmentStrategy, a ThreatIntelStrategy, and EnrichmentStrategies?

cestella (Author) commented Mar 5, 2018

This is a strategy pattern using an enum. The purpose of this class is to resolve the specific strategies possible. It's broadly in line with other strategy patterns (e.g. Extractors, MessageGetters).
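
For readers unfamiliar with the idiom, a generic sketch of an enum-based strategy pattern (illustrative names only, not the Metron classes):

    // Each constant supplies its own behavior; callers resolve a strategy by
    // constant (e.g. GreetingStrategies.FORMAL.greet("Ann")) with no if/else ladder.
    interface GreetingStrategy {
      String greet(String name);
    }

    enum GreetingStrategies implements GreetingStrategy {
      FORMAL {
        @Override
        public String greet(String name) { return "Hello, " + name + "."; }
      },
      CASUAL {
        @Override
        public String greet(String name) { return "hey " + name; }
      }
    }

Adding a new strategy is just adding a constant, and the enum doubles as the registry of all available strategies.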

cestella (Author) commented Mar 6, 2018

I might be answering a question that you're not asking, but this bit of awkwardness arises because we have merged the concepts of threat intel and enrichment, which really differ only in post-processing. The approach presented here, in contrast to the inheritance-based approach in the bolts, allows for an abstraction through composition, whereby we localize all the interactions with the sensor enrichment config in a strategy rather than bind the abstraction to Storm, our distributed processing engine. That is the rationale behind this approach, at least.

cestella (Author) commented Mar 6, 2018

I decided that this is too onerous an abstraction and rethought it a bit. Give it another look, please.

cestella (Author) commented Mar 6, 2018

@nickwallen Ok, I refactored the abstraction to separate some concerns, rename things a bit, and collapse some of the more onerous abstractions. Also updated javadocs.

Can you give it another look and see what you think? We probably should also give it another smoketest in the lab to make sure I didn't do something dumb.

To specifically give you what you asked for, though, these are the abstractions (a rough sketch of how they fit together follows the list):

  • UnifiedEnrichmentBolt provides the actual bolt, the Storm interface that uses the ParallelEnricher to enrich messages
  • ParallelEnricher enriches messages using a threadpool, given an enrichment strategy
  • EnrichmentStrategy is an interface for interacting with a specific phase of enrichment (either the threat intel phase or the enrichment phase)
  • EnrichmentStrategies is the set of enrichment strategies available
  • ConcurrencyContext is the concurrency context (e.g. the cache and thread pool)
  • WorkerPoolStrategies is the strategy pattern for how to create the threadpool
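
A toy sketch of the overall flow these pieces implement (illustrative names and signatures, not the actual Metron API):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // The bolt hands a message to a parallel enricher, which fans each enrichment
    // task out to the shared pool and joins the futures in-process: no network
    // hop between the "split" and the "join".
    public class ParallelEnricherSketch {
      private static final ExecutorService POOL = Executors.newFixedThreadPool(8);

      public Map<String, Object> enrich(Map<String, Object> message) throws Exception {
        CompletableFuture<Map<String, Object>> geo =
            CompletableFuture.supplyAsync(() -> geoEnrich(message), POOL);
        CompletableFuture<Map<String, Object>> hbase =
            CompletableFuture.supplyAsync(() -> hbaseEnrich(message), POOL);
        Map<String, Object> out = new HashMap<>(message);
        out.putAll(geo.get());    // in-process join of the enrichment subresults
        out.putAll(hbase.get());
        return out;
      }

      private Map<String, Object> geoEnrich(Map<String, Object> m) { return new HashMap<>(); }
      private Map<String, Object> hbaseEnrich(Map<String, Object> m) { return new HashMap<>(); }
    }
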
ottobackwards (Contributor) commented Mar 6, 2018

This should have the equivalent diagram and documentation (I believe as shown above) to the original split/join strategy.

ottobackwards (Contributor) commented Mar 6, 2018

Maybe the issue has to do with our keys and their distribution as the size gets larger? Maybe when we get larger sizes we get more collisions and end up calling equals() more, or something.

nickwallen (Contributor) commented Mar 6, 2018

That's great, @cestella. Many thanks. I will run it up in the lab. No problem.

cestella added some commits Mar 6, 2018

cestella (Author) commented Mar 6, 2018

Ok, README is updated with the new topology diagram. Let me know if there's anything else.

nickwallen (Contributor) left a review comment

I am just running through the last bit of tests.

}
}
String sensorType = MessageUtils.getSensorType(message);
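// Record when splitting began for this message; downstream consumers use these timestamps for latency telemetry. (comment added for clarity; assumed intent)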
message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());

nickwallen (Contributor) commented Mar 6, 2018

Should we be adding splitter timestamps in here?

cestella (Author) commented Mar 6, 2018

Well, I felt that we would still want to track splitting time, since that's (possibly) non-trivial.

nickwallen (Contributor) commented Mar 6, 2018

I would expect to see this in the split/join bolts. When this is used in the unified topology, we're still going to report a split/join time?

cestella (Author) commented Mar 7, 2018

Well, split time and enrichment time are bound up together here, since enrichment + join really happen in one place. I know that some people are using these statistics, and I wanted to match the old topology as much as made sense. If you think they aren't useful at all, though, I can certainly take them out or just replace them with an overall time. What do you think?

nickwallen (Contributor) commented Mar 6, 2018

+1 The unified topology works great.

asfgit closed this in 1d95b83 on Mar 7, 2018
