Skip to content

Commit

Permalink
Merge 17acb62 into 197612b
Browse files Browse the repository at this point in the history
  • Loading branch information
wochinge committed Oct 28, 2019
2 parents 197612b + 17acb62 commit 7d97c73
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 32 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -21,6 +21,8 @@ Removed

Fixed
-----
- Fixed ``Connection reset by peer`` errors and bot response delays when using the
Rabbit MQ event broker.

[1.4.2] - 2019-10-28
^^^^^^^^^^^^^^^^^^^^
Expand Down
134 changes: 102 additions & 32 deletions rasa/core/brokers/pika.py
@@ -1,16 +1,22 @@
import json
import logging
import typing
from typing import Dict, Optional, Text, Union
import os
from threading import Thread
from typing import Dict, Optional, Text, Union, List

import time

import rasa.core.brokers.utils as rasa_broker_utils
from rasa.constants import ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES
from rasa.core.brokers.event_channel import EventChannel
from rasa.utils.endpoints import EndpointConfig

if typing.TYPE_CHECKING:
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
from pika.adapters.blocking_connection import BlockingChannel
from pika import SelectConnection, BlockingConnection
from pika.channel import Channel
from pika.connection import Parameters, Connection

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,6 +46,22 @@ def initialise_pika_connection(

import pika

parameters = _get_pika_parameters(
host, username, password, port, connection_attempts, retry_delay_in_seconds
)
return pika.BlockingConnection(parameters)


def _get_pika_parameters(
host: Text,
username: Text,
password: Text,
port: Union[Text, int] = 5672,
connection_attempts: int = 20,
retry_delay_in_seconds: Union[int, float] = 5,
) -> "Parameters":
import pika

if host.startswith("amqp"):
# user supplied a amqp url containing all the info
parameters = pika.URLParameters(host)
Expand All @@ -60,7 +82,8 @@ def initialise_pika_connection(
retry_delay=retry_delay_in_seconds,
ssl_options=rasa_broker_utils.create_rabbitmq_ssl_options(host),
)
return pika.BlockingConnection(parameters)

return parameters


def initialise_pika_channel(
Expand Down Expand Up @@ -106,7 +129,7 @@ def _declare_pika_channel_with_queue(
return channel


def close_pika_channel(channel: "BlockingChannel") -> None:
def close_pika_channel(channel: "Channel") -> None:
"""Attempt to close Pika channel."""

from pika.exceptions import AMQPError
Expand All @@ -118,7 +141,7 @@ def close_pika_channel(channel: "BlockingChannel") -> None:
logger.exception("Failed to close Pika channel.")


def close_pika_connection(connection: "BlockingConnection") -> None:
def close_pika_connection(connection: "Connection") -> None:
"""Attempt to close Pika connection."""

from pika.exceptions import AMQPError
Expand All @@ -138,7 +161,9 @@ def __init__(
password: Text,
port: Union[int, Text] = 5672,
queue: Text = "rasa_core_events",
loglevel: Union[Text, int] = logging.WARNING,
loglevel: Union[Text, int] = os.environ.get(
ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES
),
):
logging.getLogger("pika").setLevel(loglevel)

Expand All @@ -147,28 +172,17 @@ def __init__(
self.username = username
self.password = password
self.port = port
self.channel = None # delay opening channel until first event
self.channel: Optional["Channel"] = None

# List to store unpublished messages which hopefully will be published later
self._unpublished_messages: List[Text] = []
self._run_pika()

def __del__(self) -> None:
if self.channel:
close_pika_channel(self.channel)
close_pika_connection(self.channel.connection)

def _open_channel(
self,
connection_attempts: int = 20,
retry_delay_in_seconds: Union[int, float] = 5,
) -> "BlockingChannel":
return initialise_pika_channel(
self.host,
self.queue,
self.username,
self.password,
self.port,
connection_attempts,
retry_delay_in_seconds,
)

@classmethod
def from_endpoint_config(
cls, broker_config: Optional["EndpointConfig"]
Expand All @@ -178,7 +192,53 @@ def from_endpoint_config(

return cls(broker_config.url, **broker_config.kwargs)

def publish(self, event: Dict, retries=60, retry_delay_in_seconds: int = 5) -> None:
def _run_pika(self) -> None:
self._pika_connection = self._get_connection()
# Run Pika io loop in extra thread so it's not blocking
self._run_pika_io_loop_in_thread()

def _get_connection(self) -> "Connection":
import pika

parameters = _get_pika_parameters(
self.host, self.username, self.password, self.port
)
return pika.SelectConnection(
parameters,
on_open_callback=self._on_open_connection,
on_open_error_callback=self._on_open_connection_error,
)

def _on_open_connection(self, connection: "SelectConnection") -> None:
logger.debug(f"Rabbit MQ connection to '{self.host}' was established.")
connection.channel(on_open_callback=self._on_channel_open)

def _on_open_connection_error(self, _, error: Text) -> None:
logger.warning(
f"Connecting to '{self.host}' failed with error '{error}. Trying again."
)

def _on_channel_open(self, channel: "Channel") -> None:
logger.debug("Rabbit MQ channel was opened.")
channel.queue_declare(self.queue, durable=True)

self.channel = channel

while len(self._unpublished_messages) > 0:
# Send unpublished messages
message = self._unpublished_messages.pop()
self._publish(message)

def _run_pika_io_loop_in_thread(self) -> None:
thread = Thread(target=self._run_pika_io_loop, daemon=True)
thread.start()

def _run_pika_io_loop(self) -> None:
self._pika_connection.ioloop.start()

def publish(
self, event: Dict, retries: int = 60, retry_delay_in_seconds: int = 5
) -> None:
"""Publish `event` into Pika queue.
Perform `retries` publish attempts with `retry_delay_in_seconds` between them.
Expand Down Expand Up @@ -207,12 +267,22 @@ def publish(self, event: Dict, retries=60, retry_delay_in_seconds: int = 5) -> N
)

def _publish(self, body: Text) -> None:
if not self.channel:
self.channel = self._open_channel(connection_attempts=1)

self.channel.basic_publish("", self.queue, body)

logger.debug(
"Published Pika events to queue '{}' on host "
"'{}':\n{}".format(self.queue, self.host, body)
)
if self._pika_connection.is_closed:
# Try to reset connection
self._pika_connection = self._get_connection()
self._run_pika_io_loop_in_thread()
elif not self.channel:
logger.warning(
"Rabbit MQ channel was not assigned yet. Adding message to "
"list of unpublished messages and trying to publish them "
"later."
)
self._unpublished_messages.append(body)

else:
self.channel.basic_publish("", self.queue, body)

logger.debug(
"Published Pika events to queue '{}' on host "
"'{}':\n{}".format(self.queue, self.host, body)
)

0 comments on commit 7d97c73

Please sign in to comment.