
Forward kafka cluster options to clients #21275


Merged: 34 commits merged into getsentry:master on Nov 17, 2020

Conversation

@alexef (Contributor) commented Oct 10, 2020

Fixes: #14123. Related to: getsentry/snuba#1406

@alexef alexef requested a review from a team as a code owner October 10, 2020 06:22
@dashed dashed requested a review from BYK October 10, 2020 21:37
@BYK (Member) left a comment

Other than the syntax errors, this seems fine to me.

Deferring to @mattrobenolt and @tkaemming as our resident Kafka experts.

@BYK (Member) left a comment

Looks legit to me. Still waiting to hear back from @mattrobenolt and @tkaemming before merging though.

Thanks a lot for looking into this @alexef!

@fpacifici (Contributor) left a comment

Hi @alexef, happy to see you here as well :D

Thanks for doing this on Sentry as well.
There are a couple of things to figure out.

  1. The order of the overrides; please see the inline comment.
  2. There is a potential backward-compatibility issue I would like @BYK to chime in on, because it impacts self-hosted users. Today we already apply the Kafka connection parameters from the settings to producers (https://github.com/getsentry/sentry/blob/master/src/sentry/utils/kafka.py#L34-L35), which means self-hosted users may have already overridden their connection parameters. With this PR, at the next release, those overridden parameters will quietly start being applied to consumers as well. Most likely this is not an issue, but it is impossible to guarantee that nobody overrode a parameter that is fine for producers but breaks everything when applied to their consumers as well. @BYK, what do you think? Is this something we have to watch out for or communicate in the onpremise installation?

@BYK (Member) commented Oct 19, 2020

> @BYK what do you think? Is this something we have to watch out for or communicate in the onpremise installation?

@fpacifici - good point. That said, we pre-set this value in on-premise and don't provide support for external Kafka instances. What's the worst-case outcome if anyone modified these settings for their setup?

@fpacifici (Contributor) commented

@BYK the worst-case scenario is very hard to estimate, because the number of possible interactions to rule out between parameters, when applied to both producers and consumers, is huge. It is much less likely that somebody overrode a parameter for the producer that would not work for the consumer, since in the end it is the same broker, but it is not impossible.
I also think it is quite unlikely that a user overrode the producer config at all with something they rely on to make things work, since today that config is not applied to the consumer; otherwise they would have needed the override for the consumers as well.
So we would likely not run into issues here.

To be 100% sure we would have to separate the producer config from the consumer config, which is somewhat wrong anyway because a lot of it is supposed to be shared.
Also, the current situation (before this PR) makes it hard to enforce a list of allowed parameters, and prevents us from enforcing a contract with users about what can be changed (to avoid running into these issues again).

I would suggest we do the following:

  • Define only one config (producer/consumer) in the settings. The product can apply specific overrides on top, so the settings config would contain only connection-related parameters, which tend to be common.
  • Have a list of allowed settings in the product (like the one I asked @alexef to build in the snuba version, thanks), so we can be sure of what can be overridden from the settings.
  • Enforce that list, but have an additional setting to disable this behavior, so that if somebody already added an override to the producer settings they can easily unblock themselves by disabling the enforced config (we would need to communicate this in the next release). Hopefully this scenario is unlikely; again, changing fundamental parameters of the Kafka connection on the producer only seems unlikely to me, though it is not impossible.

@BYK do you think it would be acceptable? At least we would be making it clear and explicit when an unsupported override is happening.
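The allowlist-plus-killswitch idea above could be sketched roughly as follows. This is a hypothetical illustration, not the actual Sentry code; the names `ALLOWED_OVERRIDES` and `validate_overrides`, and the sample key set, are invented for the example:

```python
# Hypothetical sketch: enforce a list of allowed Kafka setting overrides,
# with an `enforce` flag acting as the killswitch described above.
ALLOWED_OVERRIDES = frozenset([
    "bootstrap.servers",
    "compression.type",
    "message.max.bytes",
    "socket.timeout.ms",
])


def validate_overrides(options, enforce=True):
    """Reject configuration keys that are not explicitly supported.

    With enforce=False, unknown keys pass through untouched, so users who
    already rely on an unsupported override can unblock themselves.
    """
    for key in options:
        if key not in ALLOWED_OVERRIDES and enforce:
            raise ValueError(
                "The `%s` configuration key is not supported." % key
            )
    return options
```

Raising with an explicit key name makes an unsupported override fail loudly and explicitly, which is the point of the contract being discussed.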

@BYK (Member) commented Oct 20, 2020

@fpacifici your plan sounds like it covers all the cases. It also seems like a longer-term plan, though. I think:

  1. We can land this patch as it is, since it fixes an issue that people are having, and the potential fallout from this fix is near zero based on our usage numbers and patterns.
  2. We can/should implement your suggestion of a whitelist of options if this is important, but without the enforcement killswitch, for the sake of simplicity and speed of implementation, given that the cases it would cover are very rare.

What do you think?

@fpacifici (Contributor) commented Oct 21, 2020

@BYK, unfortunately I found a case, in a consumer we use internally, where the settings for the producer may not be safe for the consumer.
At this point I think the safest option is to split the config between producer and consumer. This would not be too complex for this PR and would also address the backward-compatibility issue.

What if we changed the structure of KAFKA_CLUSTERS to:

KAFKA_CLUSTERS = {
    "default": {
        "common": {
            "bootstrap.servers": "127.0.0.1:9092",
            # .....
        },
        "producers": {
            "compression.type": "lz4",
            "message.max.bytes": 50000000,  # 50MB, default is 1MB
        },
        "consumers": {},
        # Old parameters go here for backward compatibility. These are
        # deprecated and will be removed over time.
        "bootstrap.servers": "127.0.0.1:9092",
    }
}

We would do it like in snuba; the priority of the parameters being loaded would be:

  1. hardcoded application overrides
  2. legacy parameters (for backwards compatibility)
  3. specific parameters (producer/consumer)
  4. common parameters

I would suggest we build two simple methods, to load the consumer and the producer config, that apply the overrides according to the priority above. Replacing the direct references to settings.KAFKA_CLUSTERS with calls to those methods will then be pretty easy, since there are only a few references in the code base.

@alexef What do you think about this?
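The priority scheme described above could be sketched as follows. This is an illustrative sketch only; `build_cluster_options` is a hypothetical helper, not the actual `sentry.utils.kafka_config` API:

```python
# Hypothetical sketch of merging Kafka cluster options with the priority
# described above: application overrides > legacy > section-specific > common.
def build_cluster_options(cluster, section, application_overrides=None):
    common = cluster.get("common", {})
    specific = cluster.get(section, {})  # "producers" or "consumers"
    # Legacy flat keys live directly at the top level of the cluster dict,
    # next to the new "common"/"producers"/"consumers" sections.
    legacy = {
        key: value
        for key, value in cluster.items()
        if key not in ("common", "producers", "consumers")
    }
    options = {}
    options.update(common)                        # lowest priority
    options.update(specific)
    options.update(legacy)
    options.update(application_overrides or {})   # highest priority
    return options
```

Applying the layers in ascending priority order with `dict.update` keeps the precedence rules in one obvious place.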

@alexef (Contributor, author) commented Oct 23, 2020

Updated. I couldn't find any code overrides for configs, which is why I didn't add them to the config getter.

@fpacifici let me know what you think

@alexef alexef requested a review from a team October 23, 2020 13:42
@fpacifici (Contributor) left a comment

Thanks, this goes in the right direction.
A few comments inline.
Could you also write a short test for your config loader methods? This does not need to be as exhaustive as the one you built for snuba.

@alexef (Contributor, author) commented Oct 24, 2020

Up for another check :)

@BYK (Member) commented Nov 12, 2020

/gcbrun

@fpacifici (Contributor) commented

I found the issue that causes the cloudbuild to fail:

07:58:38 [ERROR] celery.app.trace: Task sentry.tasks.store.save_event[6c0d8572-1805-4c7a-ab2d-e5bbb37348aa] raised unexpected: 
ValueError('The `socket.timeout.ms` configuration key is not supported.',) (data={u'internal': False, u'traceback': u'Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 412, in trace_task  
    R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/sentry_sdk/integrations/celery.py", line 186, in _inner
    reraise(*exc_info)
File "/usr/local/lib/python2.7/site-packages/sentry_sdk/integrations/celery.py", line 181, in _inner
    return f(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/sentry/tasks/base.py", line 48, in _wrapped
    result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/sentry/tasks/store.py", line 870, in save_event
    _do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs)
File "/usr/local/lib/python2.7/site-packages/sentry/tasks/store.py", line 784, in _do_save_event
    project_id, assume_normalized=True, start_time=start_time, cache_key=cache_key
File "/usr/local/lib/python2.7/site-packages/sentry/utils/metrics.py", line 193, in inner
    return f(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/sentry/event_manager.py", line 478, in save
    _eventstream_insert_many(jobs)
File "/usr/local/lib/python2.7/site-packages/sentry/utils/metrics.py", line 193, in inner
    return f(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/sentry/event_manager.py", line 768, in _eventstream_insert_many
    skip_consume=job.get("raw", False),
File "/usr/local/lib/python2.7/site-packages/sentry/utils/services.py", line 105, in <lambda>
    context[key] = (lambda f: lambda *a, **k: getattr(self, f)(*a, **k))(key)
File "/usr/local/lib/python2.7/site-packages/sentry/eventstream/snuba.py", line 132, in insert
    headers={"Received-Timestamp": six.text_type(received_timestamp)},
File "/usr/local/lib/python2.7/site-packages/sentry/eventstream/kafka/backend.py", line 52, in _send
    self.producer.poll(0.0)
File "/usr/local/lib/python2.7/site-packages/django/utils/functional.py", line 35, in __get__
    res = instance.__dict__[self.name] = self.func(instance)
File "/usr/local/lib/python2.7/site-packages/sentry/eventstream/kafka/backend.py", line 25, in producer
    return kafka.producers.get(settings.KAFKA_EVENTS)
File "/usr/local/lib/python2.7/site-packages/sentry/utils/kafka.py", line 36, in get
    cluster_options = get_kafka_producer_cluster_options(cluster_name)
File "/usr/local/lib/python2.7/site-packages/sentry/utils/kafka_config.py", line 65, in get_kafka_producer_cluster_options
    return _get_kafka_cluster_options(cluster_name, PRODUCERS_SECTION, with_legacy=True)
File "/usr/local/lib/python2.7/site-packages/sentry/utils/kafka_config.py", line 54, in _get_kafka_cluster_options
    configuration_key=configuration_key\nValueError: The `socket.timeout.ms` configuration key is not supported.\n', u'name': 'sentry.tasks.store.save_event', u'args': '()', u'kwargs': "{'event_id': '87921e4de05e79d26f28ff33660dca96', 'project_id': 1L, 'start_time': 1605254317.0, 'cache_key': u'e:87921e4de05e79d26f28ff33660dca96:1', 'data': None}", u'description': u'raised unexpected', u'hostname': '7b4af7b6291f', u'id': '6c0d8572-1805-4c7a-ab2d-e5bbb37348aa', u'exc': "ValueError('The `socket.timeout.ms` configuration key is not supported.',)"})

That's because the PR applies the list of allowed parameters not only to the new configuration (correct) but also to the legacy configuration (incorrect). The legacy configuration should not be validated, since we have no control over what was added so far.
Also, socket.timeout.ms should be in the allowed list.
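A minimal sketch of the fix being described: validate only the new-style sections and pass legacy options through unchecked. The helper name `merge_options` and its signature are hypothetical, not the actual PR code:

```python
# Hypothetical sketch: new-style options are validated against the allowed
# set, legacy options are passed through unvalidated because we have no
# control over what self-hosted users already put there.
def merge_options(options, new_options, legacy_options, allowed):
    for key in new_options:
        if key not in allowed:
            raise ValueError(
                "The `%s` configuration key is not supported." % key
            )
    options.update(new_options)
    options.update(legacy_options)  # deliberately unvalidated
    return options
```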

This issue shows up on cloud build (fortunately it got caught there) because it runs the onpremise version of sentry and that one overrides that parameter by default: https://github.com/getsentry/onpremise/blob/master/sentry/sentry.conf.example.py#L119

If you want to reproduce this:

  1. Install self-hosted Sentry as explained in https://github.com/getsentry/onpremise
  2. Execute the tests: ./tests.sh. It should work.
  3. Replace the docker image for Sentry with the one from your cloud build in this file: https://github.com/getsentry/onpremise/blob/master/.env#L6. It has to be replaced with us.gcr.io/sentryio/sentry:00f4a3978a6ed00cb0a4ea82b343422129976f54. I picked it from the logs of your build, but you can technically build the image locally if you prefer.
  4. Find the logs of the celery worker, which is the one that throws (and is not shown in the logs of the cloud build): docker logs sentry_onpremise_worker_1

@BYK The celery worker logs are not shown in cloudbuild upon failure, which means every error happening in Sentry during ingestion is hidden. That covers all the steps from the ingestion consumer (after relay) to the eventstream (where we produce to Kafka for snuba).

@BYK (Member) commented Nov 13, 2020

Quite sure the failure is due to this config:

https://github.com/getsentry/onpremise/blob/066bf262aac7b50cdc870b3f4e41d378cc2193db/sentry/sentry.conf.example.py#L116-L125

I think if we cannot make this backward-compatible with these existing configurations out in the wild, we should rename this config.

@BYK (Member) commented Nov 13, 2020

Not sure if we can do something clever here for auto-migration/compatibility: https://github.com/getsentry/sentry/blob/master/docker/sentry.conf.py

@fpacifici (Contributor) commented

@BYK, let me clarify: there is nothing wrong with socket.timeout.ms. This PR is supposed to be backward compatible and support it without changes to the field; right now, though, it applies the same validation to the legacy fields that it applies to the new ones. This is incorrect, and @alexef should be able to fix that behavior quickly.

I tagged you because of the missing logs in the cloudbuild results. I think we should expose the celery worker logs as well, because that is where a big chunk of the ingestion logic in the sentry code base happens.

@BYK (Member) commented Nov 13, 2020

@fpacifici - sorry, I wrote my comment before seeing yours. I get the issue now. I've also submitted a PR to make the logs available.

@fpacifici (Contributor) commented
thanks

@fpacifici (Contributor) commented

/gcbuild

@fpacifici (Contributor) commented

/gcbrun

@fpacifici (Contributor) commented

/gcbrun

@fpacifici (Contributor) left a comment

Just one small issue in kafka_config, but it is fairly important; it took me a while to figure out why it was working in a certain way. Everything else passes; I will merge tomorrow once it is fixed.

Comment on lines 41 to 46

    if with_legacy and legacy_options:
        # producer uses all legacy_options
        options.update(legacy_options)
    elif "bootstrap.servers" in legacy_options:
        # legacy override of bootstrap.servers should be preserved
        options["bootstrap.servers"] = legacy_options["bootstrap.servers"]
@fpacifici (Contributor) commented

This is going to be very hard to grasp; we need to change how it is structured.
Basically what you are trying to do is pick all the legacy options for producers (if there are legacy options) and, for consumers, pick only bootstrap.servers from the legacy options.
Why not rewrite it as:

    # Rename with_legacy into only_bootstrap and invert
    if legacy_options:
        if only_bootstrap:
            assert "bootstrap.servers" in legacy_options
            options["bootstrap.servers"] = legacy_options["bootstrap.servers"]
        else:
            options.update(legacy_options)
    else:
        # same as before.

That way it is clear that picking only bootstrap.servers from the legacy options is not a way to override other options; it is meant to build the legacy options while discarding everything that is not legacy.

@alexef (Contributor, author) commented

Yup, your version is way more readable; it ended up like this after many rewrites.

updated

@BYK (Member) commented Nov 17, 2020

/gcbrun

@BYK (Member) commented Nov 17, 2020

Thanks a lot @alexef for following along for all this time!

@BYK (Member) commented Nov 17, 2020

/gcbrun

@alexef (Contributor, author) commented Nov 17, 2020

@BYK I can't wait to see this merged so we can upgrade our onprem installation.

The biggest challenge was keeping my dev environment usable: Python 2.7 on Mac does not play well, but this should be addressed soon.

@fpacifici (Contributor) left a comment

Thank you, this will be very useful.

@fpacifici fpacifici merged commit 24d86a0 into getsentry:master Nov 17, 2020
@alexef alexef deleted the forward-kafka-optins branch November 17, 2020 18:19
@github-actions github-actions bot locked and limited conversation to collaborators Dec 17, 2020
Successfully merging this pull request may close these issues.

SASL & SSL Support for kafka config