Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

METRON-1505 Intermittent Profiler Integration Test Failure #977

Closed
wants to merge 3 commits into from

Conversation

nickwallen
Copy link
Contributor

@nickwallen nickwallen commented Apr 2, 2018

Problem

The integration tests were failing intermittently when Storm unexpectedly expired messages generated by the integration tests. When Storm expired these messages they were never received by the Profiler bolts, which caused the integration tests to fail.

Root Cause

Storm's event window mechanism was not configured correctly to use the timestamp extracted from the telemetry message. Storm was instead defaulting to system time.

If the time when the downstream ProfileBuilderBolt processed a message differed significantly enough from when the upstream ProfileSplitterBolt processed the message, the message would be errantly expired by Storm.

This is why the problem could only be replicated when run in Travis, a resource constrained environment. When run on any other environment, the system time when these two events occur will not differ enough for Storm to mistakenly expire the test messages.

This bug shows itself when you have significantly out-of-order messages and when flushing expired profiles, which is what the integration tests rely on.

The Fix

The simple fix was to ensure that Storm uses the correct event timestamp field. Doing this highlighted another problem. Storm does not work correctly when using tick tuples along with an event timestamp field. Storm will attempt to extract an event timestamp from the tick tuple, which will not exist and cause the entire topology to fail.

This meant that I could not use tick tuples. To work around this, I created a separate thread that flushes the expired profiles regularly. The separate thread introduces thread safety concerns, so I also needed to perform some locking.

Changes

Most of these changes were done in separate commits to making review easier.

  1. Added a separate thread to the ProfileBuilderBolt to flush expired profiles regularly. This is the core fix to the integration test bug.

  2. Corrected the key generated to cache ProfileBuilder objects. This previously relied on the underlying ProfileConfig.toString method which was error prone and slow. It now uses the hash key.

  3. Reduced the number of Profiler integration tests. There is now one integration test that tests event time processing and another that tests the same profile using processing time.

    Previously there were a number of different profiles that were tested. This was necessary before as the integration tests were the only effective way to test different profile logic. Since then, significant refactoring has occurred which allowed the same logic to be tested in unit tests rather than in integration tests.

    This allowed me to clean-up these tests which reduces run time and complexity in the integration tests.

  4. Added some simple debug logging to HBaseBolt.

Pull Request Checklist

  • Is there a JIRA ticket associated with this PR? If not one needs to be created at Metron Jira.
  • Does your PR title start with METRON-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
  • Has your PR been rebased against the latest commit within the target branch (typically master)?
  • Have you included steps to reproduce the behavior or problem that is being changed or addressed?
  • Have you included steps or a guide to how the change may be verified and tested manually?
  • Have you ensured that the full suite of tests and checks have been executed in the root metron folder
  • Have you written or updated unit tests and or integration tests to verify your changes?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • Have you verified the basic functionality of the build by building and running locally with Vagrant full-dev environment or the equivalent?

private String cacheKey(ProfileConfig profile, String entity) {
return format("%s:%s", profile, entity);
private int cacheKey(ProfileConfig profile, String entity) {
return new HashCodeBuilder(17, 37)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cache key needs to ensure that when the user changes a profile definition, even slightly, that a different ProfileBuilder is used. Reusing the same ProfileBuilder would create inconsistent results.

Instead of using ProfileConfig.toString() as part of the cache key, it now uses the hash code from the profile and the entity. I think this is less error prone and more performant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not as familiar with this functionality - How do we cut over/end an existing profile when a profile definition is changed? Is there any continuity in the calculations or is it an immediate start over from scratch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That state is maintained in a ProfileBuilder stored in this cache. If the profile definition changes, the cache key would change, which would force it to start using a different ProfileBuilder instance.

Say I had a v1.0 of the profile that has been running and now I make changes, so I'll call that version 2.0 of the profile. We'd have aProfileBuilder that handles v1.0 of the profile definition and another that handles v2.0 of the profile.

The v1.0 instance will stop receiving messages because that profile definition no longer exists. The TTL for the profile will lapse and the profile will be marked as "expired". Then periodically this timer thread will trigger a flush of all expired profiles. The state that was in v1.0 will then be flushed and stored.

The v2.0 instance will start receiving messages and building its state. This instance will remain "active" because it is receiving messages. This active profile will flush when the period expires and its state will be stored.

It is not safe to mix state when a profile definition is changed by a user. You don't know how the profile was changed and whether the change was compatible or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation @nickwallen, makes sense.


if(notification.wasEvicted()) {

// the expired profile was NOT flushed in time
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A profile being removed from the expired cache is only 'bad' when it is evicted. When an eviction occurs, we get a WARN. Otherwise, only a DEBUG is used. This makes the logging much more useful when troubleshooting.


expiredFlushTimer = createTimer("flush-expired-profiles-timer");
expiredFlushTimer.scheduleRecurring(0, toSeconds(profileTimeToLiveMillis), () -> flushExpired());
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the timer thread that flushes expired profiles regularly.


// flush the active profiles
List<ProfileMeasurement> measurements;
synchronized(messageDistributor) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access to the messageDistributor has to be synchronized now. It is not thread safe and it could be called from either the timer thread or when tuples are received.

List<ProfileMeasurement> measurements = messageDistributor.flushExpired();
emitMeasurements(measurements);
List<ProfileMeasurement> measurements;
synchronized (messageDistributor) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access to the messageDistributor has to be synchronized now. It is not thread safe and it could be called from either the timer thread or when tuples are received.

messageDistributor.distribute(message, timestamp, route, getStellarContext());
synchronized (messageDistributor) {
messageDistributor.distribute(message, timestamp, route, getStellarContext());
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access to the messageDistributor has to be synchronized now. It is not thread safe and it could be called from either the timer thread or when tuples are received.

*/
@Test
public void testExample4() throws Exception {
public void testProcessingTime() throws Exception {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Integration tests now only have a testProcessingTime and testEventTime.

@mmiklavc
Copy link
Contributor

mmiklavc commented Apr 6, 2018

Can't see anything wrong with the logic here @nickwallen. +1 by inspection. Thanks for the fix.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
2 participants