-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SWATCH-1622 Resolve incoming Event conflicts and amend differences #3172
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm really happy to remove the clean up events, great work!
src/main/java/org/candlepin/subscriptions/event/EventConflictResolver.java
Show resolved
Hide resolved
...ch-metrics/src/main/java/com/redhat/swatch/metrics/service/PrometheusMeteringController.java
Show resolved
Hide resolved
@@ -22,10 +22,16 @@ | |||
|
|||
import static org.hibernate.jpa.HibernateHints.HINT_FETCH_SIZE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The "deleteStaleEvents" method needs to be removed as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DONE - next push.
src/main/java/org/candlepin/subscriptions/event/EventController.java
Outdated
Show resolved
Hide resolved
src/main/java/org/candlepin/subscriptions/event/EventController.java
Outdated
Show resolved
Hide resolved
* @param keys the {@link EventKey} to match on. | ||
* @return a list of conflicting events | ||
*/ | ||
default List<EventRecord> findConflictingEvents(List<EventKey> keys) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My main concern here is not about locating huge data in memory since I don't expect too many conflicting events, but about the performance and the number of memory allocations.
With the solution in place, we're doing:
1.- List<Event> eventsToResolve
: keep in memory the batch of events we receive from the topic or the admin API (let's say we process 500 events)
2.- Map<EventKey, List<EventRecord>> allConflicting
: run a single query to find all the events by event key (I don't expect too much data, but in the worst scenario, we might need to allocate unlimited events by event key which is a risk).
3.- List<EventRecord> resolvedEvents
: the actual events to be persisted.
4.- loop over the eventsToResolve
list and for every item:
4.1.- create again the EventKey
class
4.2.- check whether the event key exists in allConflicting
4.2.1.1.- if not exists, just add the event into resolvedEvents
4.2.2.1.- if exists, Map<String, Double> deductions
: aggregate the metrics with the same event key,
4.2.2.2.- if no deductions
, just add the event into resolvedEvents
4.2.2.3.- if there are deductions
, add the new event with the deduction for the metric into resolvedEvents
4.2.2.4.- if the event has other metrics with no deductions, add new events only for these metrics into resolvedEvents
Note that from the steps 1, 2, 3; we're keeping almost the same data duplicated in memory, plus the performance penalty of creating the event key.
I suggested changing the above by:
1.- as suggested by another comment, we can directly use Map<EventKey, List<Event>> eventsToResolve
as input
2.- instead of using Map<EventKey, List<EventRecord>> allConflicting
which can keep unlimited data in memory, modify the query to return Map<EventKey, Map<String, Double>> eventsByDeduction
. Note that I tried to do this and it worked fine with something like:
select org_id, event_type, event_source, instance_id, timestamp, measurements->>'uom' as uom, sum(cast(measurements->>'value' as double precision)) as total_value
from (
select org_id, event_type, event_source, instance_id, timestamp, jsonb_array_elements(data->'measurements') as measurements
from events
where (org_id, event_type, event_source, instance_id, timestamp) in (%s)
) a
group by org_id, event_type, event_source, instance_id, timestamp, measurements->>'uom'
This way, the logic within the step 4. is much simpler and performant since we don't need to perform any aggregation (the database is doing it for us).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chatted with Kevin about this suggestion a little while ago and we both agreed that at some point we may need to a little more than just aggregation with conflicting events (i.e event_source prioritization). This is a valid concern, though, I'm not sure it would be very common to have many conflicting events. I DO think it would be nice to have this solution in our back pocket should we need it.
I'm happy to have a chat if you feel strongly for this change though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
src/main/java/org/candlepin/subscriptions/event/EventConflictResolver.java
Outdated
Show resolved
Hide resolved
src/main/java/org/candlepin/subscriptions/event/EventConflictResolver.java
Outdated
Show resolved
Hide resolved
/retest |
2eebb70
to
12ee162
Compare
@Sgitario I made the changes in multiple commits. I'm happy to squash them down when you finish your next round of reviewing. |
/retest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm now.
Though, there are some failing tests that need fixing.
Tests are clean locally. Trying to figure out why they are failing for the PR. |
The failures may be unrelated, I will take a look on those |
/retest |
3 similar comments
/retest |
/retest |
/retest |
Can confirm auto opt-in after triggering metering, doesn't happen anymore.
|
/retest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pending the opt-in issue, everything else looks good. Updating the tests in-progress
To add some context around why this opt-in isn't happening for this failing test... When this test makes a metering request to the metering service, there are no new metrics pulled from prometheus and results in no event messages getting put on the event topic. Therefore, the Event processing logic on the worker service does not get triggered which does not trigger opt-in. This worked prior to these changes because a clean up event would always be sent, regardless of whether or not metrics were returned by prometheus, resulting in the opt-in logic getting hit on the worker service. Unless we are able to get prometheus to actually return metrics during the metering request, this test will fail. As I understand it, this is only able to be done in the stage environment and not in EE. |
@san7ket Please confirm based on last weeks discussion:
I will add the logic to prevent the negative measurement value via the API (already on the message ingestion side) and we will let the NEW test fail until this change goes in. |
/retest |
1 similar comment
/retest |
/retest |
/retest |
@san7ket The failure appears to be a test issue, but I need to confirm on Monday to make sure that it isn't a regression from my latest changes. |
When processing incoming Events from either the topic or Admin API, we now check if there are any events that are in conflict, and resolve them when required. A conflict occurs when there is an existing event that matches the EventKey (org_id, event_type, event_source, instance_id, timestamp) of the incoming event. In the case of a match, conflict resolution must occur. Example Conflict/Resolution scenarios include: 1. No Conflicts - Incoming event is valid and no resolution required. - An EventRecord is created from the incoming Event. 2. Conflicting event with equal measurements - Incoming Event is ignored. - Existing EventRecord remains as is with no updates. 3. Conflicting event with different measurements - EventRecord added with a negative accumulative measurement value (resetting the current the total for the timestamp to 0). - Incoming EventRecord measurements reflect the values at this timestamp. - e.g. An incoming event has a measurement of 10 cores. An existing EventRecord matching the tuple has a measurement of 20 cores. An EventRecord would be created with a cores value of -10 and another EventRecord created to apply the new value of 20 cores. NOTES: 1. Removed the CleanupEvent We no longer need to send/process cleanup events since the EventConflictResolver will address any existing events so that they are reflected when tallied.
When there is an error processing a batch of incoming events, an attempt is made to process them one by one before failing completely. Conflicts will need to be resolved in this case as well.
Amendment type is null for initial events, and is set to DEDUCTION when a deduction was required to resolve the event conflicts.
This change serves two purposes: 1. Any incoming event must have an instance_id in order to be processed as a service instance event. Do not attempt to process the event in this case. 2. During the transition away from cleanup events, checking for the existance of the instance_id will keep the logs a little quieter when a cleanup event is picked up. Skipped when detected.
Needed to activate the 'test-inventory' profile since the worker profile requires the inventory database. Without this profile the connection to the inventory DB was not being configured to match the test container DB.
Jira issue: SWATCH-1622
Description
When processing incoming Events from either the topic or Admin API, we now check if there are any events that are in conflict, and resolve them when required.
A conflict occurs when there is an existing event that matches the EventKey (org_id, event_type, event_source, instance_id, timestamp) of the incoming event. In the case of a match, conflict resolution must occur.
Example Conflict/Resolution scenarios include:
NOTES:
We no longer need to send/process cleanup events since the EventConflictResolver will address any existing events so that they are reflected when tallied.
How To Test
The following demonstrates how to test this functionality using smqe-tools, however, if you'd prefer testing with a client tool such as
curl
can be accomplished by adding the events using the internal API:POST /v1/internal/rpc/tally/events
Deploy the following components
Set up smqe-tools
Follow the HOWTO steps to install smqe-tools.
Make sure that you:
Set up the correct paths to the services you will be deploying
Activate the virtual env (ensure correct env path)
Set the proper dynaconf env
Enable some utility functions
Create the file
event_amendment_utils.py
in the smqe-tools check out directory. This file provides a couple of utility functions to wrapping logic for this test process.Test Amendment logic
Launch the python interpreter
Run the following commands.
You should see that there were 2 events added for a single host instance.
add_event
call, but this time be sure to sub the uuid in forinstance_id
so that apply a new cores value of 10.0 to the host for the current hour.You should see that the existing Instance-hour event was considered to be a duplicate, and is not changed. However, the new Cores measurement triggers an amendment. This amendment results in 1 deduction event for the current total (-1), and a new event representing the new incoming value (10.0).
You should see that the existing Instance-hour event was considered to be a duplicate, and is not changed. However, the new Cores measurement triggers an amendment. This amendment results in another deduction event for the current value (-10.0), and a new event representing the new incoming value (20.0).
You should see that the existing Instance-hour event was considered to be a duplicate, and is not changed. However, the new Cores measurement triggers an amendment. This amendment results in another deduction event for the current value (-20.0), and a new event representing the new incoming value (1.0).