Add consume mode #34
Conversation
mostly just minor nits
// TODO: Not too sure how relevant it is for consuming from the bus
//clientOptions.OperationTimeout = p.readTimeout
is this comment still relevant? I can't see the vars `clientOptions` nor `p`, so it looks like maybe that line would be outdated now anyway?
I think it's still relevant; something to ask when I'm next speaking to a pulsar expert: 6b31f27
// this is set when a retryable error happened
errRemoteWriteRetryable := false
blockingSampleCh := make(chan pulsar.ReceivedSample)
I'm not sure I understand the purpose of the `blockingSampleCh`...
So when a retryable error happens on remote write then:
- `errRemoteWriteRetryable` gets set to `true` on :182
- on the next iteration `sample()` gets called again on :131
- because `errRemoteWriteRetryable` is `true`, `sample()` returns `blockingSampleCh`
- the `select` at the beginning of the loop will then wait for `blockingSampleCh` to yield an object, or the ticker to tick, or the `ctx` to get cancelled - most likely the next event will then be the ticker ticking, causing a retry

So is the desired function of `blockingSampleCh` to introduce a short wait time between retries? If that's the case, then wouldn't we only want to sleep before sending the next sample of the tenant for which the error occurred (since limits are often tenant-specific)? In the current implementation, wouldn't that sleep block all tenants?
Actually, even if it's true that in the case of a retryable error we're blocking the forwarding of data for all tenants, instead of just blocking it for the ones which yielded the error, I don't think this issue is critical enough to block the merging of this PR. So I'll just approve anyway to not unnecessarily block you.
Very good point you are raising here. Currently, when an error for only a single tenant happens on the remote write end of the adapter, we would still receive messages from pulsar for that tenant, as they all come from the same queue and we have no way to tell the tenants apart.
I think there would be more intelligent ways of handling that, e.g.:
- `Nack` messages for blocked tenants, but that would mean we need a good resumption strategy to avoid out-of-order samples.
- Having a different queue per tenant in pulsar.

For now I would like to keep that simple behaviour and figure out the best way forward based on real world error cases.
I have improved the comment in code to reflect that.
* pulsar-client-go v0.2.0
* prometheus v2.22.0

Signed-off-by: Christian Simon <simon@swine.de>
In the consume mode the adapter subscribes to a pulsar topic and writes out the metrics as remote_write requests to cortex.

TODOs:
- [ ] Verify error behaviour and ensure those cases have tests
- [ ] Provide integration tests (esp. for tenant ID forwarding)

Signed-off-by: Christian Simon <simon@swine.de>
Signed-off-by: Christian Simon <simon@swine.de>
Only check once checkPeriod is reached or more samples than MaxBatchSize have been received. Signed-off-by: Christian Simon <simon@swine.de>