Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Celery broker #12630

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open

Celery broker #12630

wants to merge 6 commits into from

Conversation

taytzehao
Copy link

Proposed changes:

  • Added celery broker . Jira ticket OSS-705. The broker is able to take in all key value pair arguments for apply async.

Status (please check what you already did):

  • added some tests for the functionality
  • updated the documentation
  • updated the changelog (please check changelog for instructions)
  • reformat files using black (please check Readme for instructions)

@taytzehao taytzehao requested a review from a team as a code owner July 6, 2023 16:21
@CLAassistant
Copy link

CLAassistant commented Jul 6, 2023

CLA assistant check
All committers have signed the CLA.

Copy link
Contributor

@radovanZRasa radovanZRasa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @taytzehao,
Thank you for your generous contribution! 💯

task_instance: Celery = Celery("rasa_event_broker", broker=broker_url)
task_callable = partial(task_instance.send_task, task_name)
else:
task_instance = rasa.shared.utils.common.class_from_module_path(task_name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems redundant as Celery requires URL to be present.
If broker_url is not present we need to issue RasaException to signal to the user that configuration they provided is not valid.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@radovanZRasa, the idea here would be to allow users to have the choice to either use send_task or apply_async. If the user just uses apply_async, they would not need to pass it the broker_url. I did find that passing in the broker_url could be a bit of a hassle from the user's side as the user would need to pass it into the endpoint.yaml file via env variables. This is because the broker_url should contain sensitive auth info

If the user uses apply_async, they can use the broker_url that is already set functionally nearby their function. What do you think?



def test_celery_from_endpoint_config():
cfg = read_endpoint_config(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although we do have several YAML test configs from broker and other parts of Rasa we plan to move away from them as chasing them down is not as fast as having these as pytest fixtures.
Suggestion is to change this to:

@pytest.fixture
def celery_endpoint_config_string() -> str:
    return """
        event_broker:
          type: celery
          broker_url: rediss://:password@localhost:6379/0
          task_name: tasks.event_logger
          countdown: 2
          priority: 1
    """

@pytest.fixture
def celery_endpoint_config(celery_endpoint_config_string: str) -> EndpointConfig:
   yaml_content = read_yaml(celery_endpoint_config_string)
   return EndpointConfig.from_dict(yaml_content)
   
 def test_celery_from_endpoint_config(celery_endpoint_config: EndpointConfig):
   actual = await EventBroker.create(celery_endpoint_config)

   assert isinstance(actual, CeleryEventBroker)

def __init__(
self,
broker_url: Optional[Text] = None,
task_name: Text = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be renamed to task_queue_name.

self._task = self._get_task(broker_url, task_name)
self._task_kwargs: Dict[Text, Any] = kwargs

def _get_task(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to obfuscate send_task method from Celery.
You can just implement publish method as:

def self.celery_instance.send_task(self.task_queue_name, [event], **self._task_kwargs)

This method can then be used to validate config and initialise client and thus renamed to match the purpose.

@taytzehao
Copy link
Author

@radovanZRasa , please review

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants