feat: Add rebalance integration testing via pytest and setup integration testing on CI #53
Conversation
Looking for some feedback on the following:
import yaml
…
def manage_consumer(
I'm not sure I understand the purpose of this function. At the moment it starts and then, after a random delay, dies. Is that the intended behaviour? There's no guarantee it will process the messages assigned to it. I think Mark had a good idea in his PR, where he passes in the number of expected messages, and can track whether the consumer has processed them or not.
When the function starts, it executes the Rust binary in a new child process. Then the thread itself sleeps for a random delay. This gives the consumer a chance to start up, receive assigned partitions, and process messages. Finally, a SIGINT is sent to the process and the consumer shuts down gracefully.
It is true that we aren't explicitly checking that all messages are processed here. It is possible that the consumer starts and dies before it can process any messages (though this is unlikely given the min_sleep and max_sleep parameters). The purpose of the test is to check for duplicate messages during rebalancing, not necessarily that all messages have been processed.
process = subprocess.Popen(
    [consumer_path, "-c", config_file_path], stderr=subprocess.STDOUT, stdout=log_file
)
random.seed(random_seed)
You don't need to set the seed multiple times. Doing it once before you generate other random values is enough.
One of the outcomes of seeding here is that all (8) consumers restart at the same time for every iteration. FWIW, I think this is also a good test to have, because we'd want to simulate both rolling deploys and deploys which shut down multiple consumers at the same time.
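To make the trade-off concrete: seeding once gives each consumer a different (but reproducible) delay, while re-seeding before every draw makes all consumers draw the same delay, i.e. a simultaneous shutdown. A small sketch (the seed value and delay bounds are illustrative):

```python
import random

SEED = 1

# Seeding once: each consumer gets a different but reproducible delay,
# which simulates a rolling restart.
random.seed(SEED)
rolling = [random.uniform(5, 20) for _ in range(8)]

# Re-seeding before every draw: every consumer gets the same delay,
# which simulates shutting down all consumers at once.
simultaneous = [random.seed(SEED) or random.uniform(5, 20) for _ in range(8)]

assert len(set(simultaneous)) == 1  # all identical
assert len(set(rolling)) > 1        # spread out across the interval
```

Both behaviours are deterministic for a fixed seed, so either variant can be replayed when a run fails.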
min_restart_duration = 5
max_restart_duration = 20
topic_name = "task-worker"
curr_time = int(time.time())
How do we pin the time/seed when trying to reproduce a failure? We could read from an environment variable like TEST_SEED 🤷
What if we just have the seed as a CLI argument? And in our GHA it's always set to 1 or something.
Oops, never mind, I see we already went with an env var.
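The env-var approach discussed above can be sketched in a few lines. The variable name TEST_SEED comes from the thread; the clock-based fallback and the printed replay hint are assumptions about how the test wires it up:

```python
import os
import time

# Prefer TEST_SEED when set (e.g. pinned in CI); otherwise derive a seed
# from the clock and print it so a failing run can be replayed later.
seed = int(os.environ.get("TEST_SEED", time.time()))
print(f"seeded with {seed}; rerun with TEST_SEED={seed} to reproduce")
```

Printing the effective seed on every run matters: without it, a one-off CI failure with a clock-derived seed cannot be reproduced.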
This integration test no longer relies on Sentry to produce test messages, for two reasons:
markstory left a comment:
Looking good. I think we should move forward with this. We can refine and expand it incrementally.
After a long investigation, @john-z-yang and I have found which change was causing CI to fail this integration test. Strangely enough, this test passed consistently locally, which made investigating it troublesome. When this test was executed on Ubuntu, the consumer would write duplicate messages (typically 0-2 before and after the rebalance). Reverting this change: 2b9064c#diff-316edec7494fa460e0a4bdc62a604da6b4caa7933db418d023bbfd27c6277c4dR64 fixed the problem. This is because when using …
        res
    }
}
handle_os_signals(event_sender.clone());
I don't think this is working the way we want now: how do handle_events and handle_consumer_client signal shutdown if they error out? I think if those threads just stop running, the consumer won't shut down.
Do you mind elaborating on this? I'm having a hard time coming up with a scenario where those two threads panic.
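One way to address the concern above, regardless of whether the panic scenario is likely, is to make the shutdown notification unconditional on the worker's exit path, so the main loop learns about a dead thread instead of hanging. A Python analogue of that idea (in the Rust version this would be sending on event_sender from a guard or around catch_unwind; all names here are illustrative):

```python
import queue
import threading


def spawn_worker(name, body, events):
    """Run `body` on a thread that always reports its exit to the event
    loop, so an errored-out worker still triggers a shutdown, not a hang."""
    def target():
        try:
            body()
        except Exception:
            pass  # a real implementation would log the error here
        finally:
            events.put(("shutdown", name))  # sent even if body() raised
    thread = threading.Thread(target=target, name=name)
    thread.start()
    return thread


events = queue.Queue()

def handle_events():
    raise RuntimeError("worker errored out")

spawn_worker("handle_events", handle_events, events).join()
assert events.get(timeout=1) == ("shutdown", "handle_events")
```

The `finally` block is the point: the event loop sees a shutdown event whether the worker returned normally or errored out.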
Overview
This PR is mainly responsible for adding an end-to-end test validating that consumer rebalances do not produce duplicate messages in sqlite. More importantly, this PR introduces Python dependencies (pytest) into this project, as it is much simpler/quicker to write these end-to-end tests in Python.
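The invariant the test enforces (no message written to sqlite more than once across a rebalance) boils down to a single grouped query over the results table. A sketch of that check; the table and column names are assumptions:

```python
import sqlite3


def assert_no_duplicates(db_path, table="messages", key="id"):
    """Fail the test if any message key was written more than once,
    which would indicate duplicate processing across a rebalance."""
    conn = sqlite3.connect(db_path)
    try:
        dupes = conn.execute(
            f"SELECT {key}, COUNT(*) FROM {table} "
            f"GROUP BY {key} HAVING COUNT(*) > 1"
        ).fetchall()
    finally:
        conn.close()
    assert not dupes, f"duplicate messages found: {dupes}"
```

A pytest case would run the producers and the restarting consumers, then call this once against the sqlite file the consumers wrote to.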
Testing
Run
make integration-test