Skip to content

Commit

Permalink
Merge 3b413bc into 197612b
Browse files Browse the repository at this point in the history
  • Loading branch information
wochinge committed Oct 29, 2019
2 parents 197612b + 3b413bc commit 19407c0
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 32 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -21,6 +21,8 @@ Removed

Fixed
-----
- Fixed ``Connection reset by peer`` errors and bot response delays when using the
RabbitMQ event broker.

[1.4.2] - 2019-10-28
^^^^^^^^^^^^^^^^^^^^
Expand Down
171 changes: 139 additions & 32 deletions rasa/core/brokers/pika.py
@@ -1,16 +1,23 @@
import json
import logging
import typing
from typing import Dict, Optional, Text, Union
import os
from collections import deque
from threading import Thread
from typing import Dict, Optional, Text, Union, Deque, Callable

import time

import rasa.core.brokers.utils as rasa_broker_utils
from rasa.constants import ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES
from rasa.core.brokers.event_channel import EventChannel
from rasa.utils.endpoints import EndpointConfig

if typing.TYPE_CHECKING:
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
from pika.adapters.blocking_connection import BlockingChannel
from pika import SelectConnection, BlockingConnection
from pika.channel import Channel
from pika.connection import Parameters, Connection

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,6 +47,36 @@ def initialise_pika_connection(

import pika

parameters = _get_pika_parameters(
host, username, password, port, connection_attempts, retry_delay_in_seconds
)
return pika.BlockingConnection(parameters)


def _get_pika_parameters(
host: Text,
username: Text,
password: Text,
port: Union[Text, int] = 5672,
connection_attempts: int = 20,
retry_delay_in_seconds: Union[int, float] = 5,
) -> "Parameters":
"""Create Pika `Parameters`.
Args:
host: Pika host
username: username for authentication with Pika host
password: password for authentication with Pika host
port: port of the Pika host
connection_attempts: number of channel attempts before giving up
retry_delay_in_seconds: delay in seconds between channel attempts
Returns:
Pika `Paramaters` which can be used to create a new connection to a broker.
"""

import pika

if host.startswith("amqp"):
# user supplied a amqp url containing all the info
parameters = pika.URLParameters(host)
Expand All @@ -60,7 +97,34 @@ def initialise_pika_connection(
retry_delay=retry_delay_in_seconds,
ssl_options=rasa_broker_utils.create_rabbitmq_ssl_options(host),
)
return pika.BlockingConnection(parameters)

return parameters


def initialise_pika_select_connection(
parameters: "Parameters",
on_open_callback: Callable[["SelectConnection"], None],
on_open_error_callback: Callable[["SelectConnection", Text], None],
) -> "SelectConnection":
"""Create a non-blocking Pika `SelectConnection`.
Args:
parameters: Parameters which should be used to connect.
on_open_callback: Callback which is called when the connection was established.
on_open_error_callback: Callback which is called when connecting to the broker
failed.
Returns:
An callback based connection to the RabbitMQ event broker.
"""

import pika

return pika.SelectConnection(
parameters,
on_open_callback=on_open_callback,
on_open_error_callback=on_open_error_callback,
)


def initialise_pika_channel(
Expand Down Expand Up @@ -106,7 +170,7 @@ def _declare_pika_channel_with_queue(
return channel


def close_pika_channel(channel: "BlockingChannel") -> None:
def close_pika_channel(channel: "Channel") -> None:
"""Attempt to close Pika channel."""

from pika.exceptions import AMQPError
Expand All @@ -118,7 +182,7 @@ def close_pika_channel(channel: "BlockingChannel") -> None:
logger.exception("Failed to close Pika channel.")


def close_pika_connection(connection: "BlockingConnection") -> None:
def close_pika_connection(connection: "Connection") -> None:
"""Attempt to close Pika connection."""

from pika.exceptions import AMQPError
Expand All @@ -138,7 +202,9 @@ def __init__(
password: Text,
port: Union[int, Text] = 5672,
queue: Text = "rasa_core_events",
loglevel: Union[Text, int] = logging.WARNING,
loglevel: Union[Text, int] = os.environ.get(
ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES
),
):
logging.getLogger("pika").setLevel(loglevel)

Expand All @@ -147,28 +213,17 @@ def __init__(
self.username = username
self.password = password
self.port = port
self.channel = None # delay opening channel until first event
self.channel: Optional["Channel"] = None

# List to store unpublished messages which hopefully will be published later
self._unpublished_messages: Deque[Text] = deque()
self._run_pika()

def __del__(self) -> None:
if self.channel:
close_pika_channel(self.channel)
close_pika_connection(self.channel.connection)

def _open_channel(
self,
connection_attempts: int = 20,
retry_delay_in_seconds: Union[int, float] = 5,
) -> "BlockingChannel":
return initialise_pika_channel(
self.host,
self.queue,
self.username,
self.password,
self.port,
connection_attempts,
retry_delay_in_seconds,
)

@classmethod
def from_endpoint_config(
cls, broker_config: Optional["EndpointConfig"]
Expand All @@ -178,7 +233,50 @@ def from_endpoint_config(

return cls(broker_config.url, **broker_config.kwargs)

def publish(self, event: Dict, retries=60, retry_delay_in_seconds: int = 5) -> None:
def _run_pika(self) -> None:
parameters = _get_pika_parameters(
self.host, self.username, self.password, self.port
)
self._pika_connection = initialise_pika_select_connection(
parameters, self._on_open_connection, self._on_open_connection_error
)
# Run Pika io loop in extra thread so it's not blocking
self._run_pika_io_loop_in_thread()

def _on_open_connection(self, connection: "SelectConnection") -> None:
logger.debug(f"RabbitMQ connection to '{self.host}' was established.")
connection.channel(on_open_callback=self._on_channel_open)

def _on_open_connection_error(self, _, error: Text) -> None:
logger.warning(
f"Connecting to '{self.host}' failed with error '{error}'. Trying again."
)

def _on_channel_open(self, channel: "Channel") -> None:
logger.debug("RabbitMQ channel was opened.")
channel.queue_declare(self.queue, durable=True)

self.channel = channel

while self._unpublished_messages:
# Send unpublished messages
message = self._unpublished_messages.popleft()
self._publish(message)
logger.debug(
f"Published message from queue of unpublished messages. "
f"Remaining unpublished messages: {len(self._unpublished_messages)}."
)

def _run_pika_io_loop_in_thread(self) -> None:
thread = Thread(target=self._run_pika_io_loop, daemon=True)
thread.start()

def _run_pika_io_loop(self) -> None:
self._pika_connection.ioloop.start()

def publish(
self, event: Dict, retries: int = 60, retry_delay_in_seconds: int = 5
) -> None:
"""Publish `event` into Pika queue.
Perform `retries` publish attempts with `retry_delay_in_seconds` between them.
Expand Down Expand Up @@ -207,12 +305,21 @@ def publish(self, event: Dict, retries=60, retry_delay_in_seconds: int = 5) -> N
)

def _publish(self, body: Text) -> None:
if not self.channel:
self.channel = self._open_channel(connection_attempts=1)

self.channel.basic_publish("", self.queue, body)

logger.debug(
"Published Pika events to queue '{}' on host "
"'{}':\n{}".format(self.queue, self.host, body)
)
if self._pika_connection.is_closed:
# Try to reset connection
self._run_pika()
elif not self.channel:
logger.warning(
f"RabbitMQ channel has not been assigned. Adding message to "
f"list of unpublished messages and trying to publish them "
f"later. Current number of unpublished messages is "
f"{len(self._unpublished_messages)}."
)
self._unpublished_messages.append(body)
else:
self.channel.basic_publish("", self.queue, body)

logger.debug(
f"Published Pika events to queue '{self.queue}' on host "
f"'{self.host}':\n{body}"
)

0 comments on commit 19407c0

Please sign in to comment.