Skip to content

Commit

Permalink
Fix RabbitMQ url parsing (#12325)
Browse files Browse the repository at this point in the history
* fix url parsing of path and query params, add integration test

* add changelog entry, fix regression bug to allow credentials in URL

* fix issue with pruning conflict when tests were run in parallel

* apply review suggestion

* make method private
  • Loading branch information
ancalita committed Apr 27, 2023
1 parent 319811f commit b6268c3
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 6 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ test-integration:
# OMP_NUM_THREADS can improve overall performance using one thread by process (on tensorflow), avoiding overload
# TF_CPP_MIN_LOG_LEVEL=2 sets C code log level for tensorflow to error suppressing lower log events
ifeq (,$(wildcard tests_deployment/.env))
OMP_NUM_THREADS=1 TF_CPP_MIN_LOG_LEVEL=2 poetry run pytest $(INTEGRATION_TEST_FOLDER) -n $(JOBS) -m $(INTEGRATION_TEST_PYTEST_MARKERS)
OMP_NUM_THREADS=1 TF_CPP_MIN_LOG_LEVEL=2 poetry run pytest $(INTEGRATION_TEST_FOLDER) -n $(JOBS) -m $(INTEGRATION_TEST_PYTEST_MARKERS) --dist loadgroup
else
set -o allexport; source tests_deployment/.env && OMP_NUM_THREADS=1 TF_CPP_MIN_LOG_LEVEL=2 poetry run pytest $(INTEGRATION_TEST_FOLDER) -n $(JOBS) -m $(INTEGRATION_TEST_PYTEST_MARKERS) && set +o allexport
set -o allexport; source tests_deployment/.env && OMP_NUM_THREADS=1 TF_CPP_MIN_LOG_LEVEL=2 poetry run pytest $(INTEGRATION_TEST_FOLDER) -n $(JOBS) -m $(INTEGRATION_TEST_PYTEST_MARKERS) --dist loadgroup && set +o allexport
endif

test-cli: PYTEST_MARKER=category_cli and (not flaky)
Expand Down
2 changes: 2 additions & 0 deletions changelog/12325.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix parsing of RabbitMQ URL provided in `endpoints.yml` file to include vhost path and query parameters.
Re-allows inclusion of credentials in the URL as a regression fix (this was supported in 2.x).
28 changes: 24 additions & 4 deletions rasa/core/brokers/pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,35 @@ async def connect(self) -> None:

self._exchange = await self._set_up_exchange(channel)

async def _connect(self) -> aio_pika.abc.AbstractRobustConnection:
def _configure_url(self) -> Optional[Text]:
"""Configures the URL to connect to RabbitMQ."""
url = None
# The `url` parameter will take precedence over parameters like `login` or
# `password`.

if self.host.startswith("amqp"):

parsed_host = urlparse(self.host)

amqp_user = f"{self.username}:{self.password}"
url = f"{parsed_host.scheme}://{amqp_user}@{parsed_host.netloc}:{self.port}"
if amqp_user not in parsed_host.netloc:
url = f"{parsed_host.scheme}://{amqp_user}@{parsed_host.netloc}"
else:
url = f"{parsed_host.scheme}://{parsed_host.netloc}"

if str(self.port) not in url:
url = f"{url}:{self.port}"

if parsed_host.path:
url = f"{url}{parsed_host.path}"

if parsed_host.query:
url = f"{url}?{parsed_host.query}"

return url

async def _connect(self) -> aio_pika.abc.AbstractRobustConnection:
# The `url` parameter will take precedence over parameters like `login` or
# `password`.
url = self._configure_url()

ssl_options = _create_rabbitmq_ssl_options(self.host)
logger.info("Connecting to RabbitMQ ...")
Expand Down
30 changes: 30 additions & 0 deletions tests/core/test_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,3 +377,33 @@ async def test_sql_connection_error(monkeypatch: MonkeyPatch):
)
with pytest.raises(ConnectionException):
await EventBroker.create(cfg)


@pytest.mark.parametrize(
"host,expected_url",
[
("localhost", None),
("amqp://localhost", "amqp://test_user:test_pass@localhost:5672"),
(
"amqp://test_user:test_pass@localhost",
"amqp://test_user:test_pass@localhost:5672",
),
(
"amqp://test_user:test_pass@localhost/myvhost?connection_timeout=10",
"amqp://test_user:test_pass@localhost:5672/myvhost?connection_timeout=10",
),
("amqp://localhost:5672", "amqp://test_user:test_pass@localhost:5672"),
(
"amqp://test_user:test_pass@localhost:5672/myvhost?connection_timeout=10",
"amqp://test_user:test_pass@localhost:5672/myvhost?connection_timeout=10",
),
],
)
def test_pika_event_broker_configure_url(
host: Text, expected_url: Optional[Text]
) -> None:
username = "test_user"
password = "test_pass"
broker = PikaEventBroker(host=host, username=username, password=password)
url = broker._configure_url()
assert url == expected_url
51 changes: 51 additions & 0 deletions tests/integration_tests/core/brokers/test_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from typing import Text

import docker
import pytest
import randomname
from pytest import LogCaptureFixture

from rasa.core.brokers.pika import PikaEventBroker, RABBITMQ_EXCHANGE
Expand Down Expand Up @@ -29,6 +31,7 @@ async def test_pika_event_broker_connect():
await broker.close()


@pytest.mark.xdist_group("rabbitmq")
async def test_pika_event_broker_publish_after_restart(
docker_client: docker.DockerClient,
caplog: LogCaptureFixture,
Expand Down Expand Up @@ -97,3 +100,51 @@ async def test_pika_event_broker_publish_after_restart(

rabbitmq_container.stop()
rabbitmq_container.remove()


@pytest.mark.xdist_group("rabbitmq")
@pytest.mark.parametrize("host_component", ["localhost", "myuser:mypassword@localhost"])
async def test_pika_event_broker_connect_with_path_and_query_params_in_url(
docker_client: docker.DockerClient,
host_component: Text,
) -> None:
username = "myuser"
password = "mypassword"
vhost = "myvhost"
hostname = "my-rabbitmq"

environment = {
"RABBITMQ_DEFAULT_USER": username,
"RABBITMQ_DEFAULT_PASS": password,
"RABBITMQ_DEFAULT_VHOST": vhost,
}

rabbitmq_container = docker_client.containers.run(
image="rabbitmq:3-management",
detach=True,
environment=environment,
name=f"rabbitmq_{randomname.generate(5)}",
hostname=hostname,
ports={f"{RABBITMQ_PORT}/tcp": RABBITMQ_PORT, "15672/tcp": 15672},
)
rabbitmq_container.reload()
assert rabbitmq_container.status == "running"

query_param = "heartbeat=5"

broker = PikaEventBroker(
host=f"amqp://{host_component}/{vhost}?{query_param}",
username=username,
password=password,
port=RABBITMQ_PORT,
queues=[RABBITMQ_DEFAULT_QUEUE],
)

try:
await broker.connect()
assert broker.is_ready()
finally:
await broker.close()

rabbitmq_container.stop()
rabbitmq_container.remove()

0 comments on commit b6268c3

Please sign in to comment.