Skip to content

Commit

Permalink
Merge 393cb4a into 135226b
Browse files Browse the repository at this point in the history
  • Loading branch information
ricwo committed Oct 4, 2019
2 parents 135226b + 393cb4a commit 84bb6d9
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 17 deletions.
12 changes: 10 additions & 2 deletions CHANGELOG.rst
Expand Up @@ -9,12 +9,20 @@ This project adheres to `Semantic Versioning`_ starting with version 1.0.

[Unreleased 1.3.8]
^^^^^^^^^^^^^^^^^^
Added
-----
- Added the ability to configure the number of Sanic worker processes in the HTTP
server (``rasa.server``) and input channel server
(``rasa.core.agent.handle_channels()``). The number of workers can be set using the
environment variable ``SANIC_WORKERS`` (default: 1). A value of >1 is allowed only in
combination with ``RedisLockStore`` as the lock store.

Fixed
-----
- Fixed error ``Object of type 'MaxHistoryTrackerFeaturizer' is not JSON serializable``
when running ``rasa train core``
- Default channel ``send_`` methods no longer support kwargs as they caused issues in incompatible channelss
when running ``rasa train core``.
- Default channel ``send_`` methods no longer support kwargs as they caused issues
in incompatible channels.

[1.3.7] - 2019-09-27
^^^^^^^^^^^^^^^^^^^^
Expand Down
5 changes: 2 additions & 3 deletions examples/nlg_server/nlg_server.py
Expand Up @@ -7,13 +7,12 @@
from rasa.core.domain import Domain
from rasa.core.nlg import TemplatedNaturalLanguageGenerator
from rasa.core.trackers import DialogueStateTracker
from rasa.constants import ENV_SANIC_BACKLOG, DEFAULT_SANIC_WORKERS

logger = logging.getLogger(__name__)

DEFAULT_SERVER_PORT = 5056

DEFAULT_SANIC_WORKERS = 1


def create_argument_parser():
"""Parse all the command line arguments for the nlg server script."""
Expand Down Expand Up @@ -75,7 +74,7 @@ async def nlg(request):
host="0.0.0.0",
port=port,
workers=workers,
backlog=int(os.environ.get("SANIC_BACKLOG", "100")),
backlog=int(os.environ.get(ENV_SANIC_BACKLOG, "100")),
)


Expand Down
4 changes: 4 additions & 0 deletions rasa/constants.py
Expand Up @@ -41,3 +41,7 @@
DEFAULT_LOG_LEVEL_LIBRARIES = "ERROR"
ENV_LOG_LEVEL = "LOG_LEVEL"
ENV_LOG_LEVEL_LIBRARIES = "LOG_LEVEL_LIBRARIES"

DEFAULT_SANIC_WORKERS = 1
ENV_SANIC_WORKERS = "SANIC_WORKERS"
ENV_SANIC_BACKLOG = "SANIC_BACKLOG"
6 changes: 4 additions & 2 deletions rasa/core/agent.py
Expand Up @@ -11,7 +11,8 @@

import rasa
import rasa.utils.io
from rasa.constants import DEFAULT_DOMAIN_PATH, LEGACY_DOCS_BASE_URL
import rasa.core.utils
from rasa.constants import DEFAULT_DOMAIN_PATH, LEGACY_DOCS_BASE_URL, ENV_SANIC_BACKLOG
from rasa.core import constants, jobs, training
from rasa.core.channels.channel import InputChannel, OutputChannel, UserMessage
from rasa.core.constants import DEFAULT_REQUEST_TIMEOUT
Expand Down Expand Up @@ -713,7 +714,8 @@ def handle_channels(
app.run(
host="0.0.0.0",
port=http_port,
backlog=int(os.environ.get("SANIC_BACKLOG", "100")),
backlog=int(os.environ.get(ENV_SANIC_BACKLOG, "100")),
workers=rasa.core.utils.number_of_sanic_workers(self.lock_store),
)

# this might seem unnecessary (as run does not return until the server
Expand Down
15 changes: 9 additions & 6 deletions rasa/core/run.py
Expand Up @@ -9,10 +9,10 @@

import rasa.core.utils
import rasa.utils
import rasa.utils.io
import rasa.utils.common
from rasa import model
from rasa import server
import rasa.utils.io
from rasa import model, server
from rasa.constants import ENV_SANIC_BACKLOG
from rasa.core import agent, channels, constants
from rasa.core.agent import Agent
from rasa.core.channels import console
Expand Down Expand Up @@ -184,9 +184,10 @@ def serve_application(
"before_server_start",
)

async def clear_model_files(app: Sanic, _loop: Text) -> None:
# noinspection PyUnresolvedReferences
async def clear_model_files(_app: Sanic, _loop: Text) -> None:
if app.agent.model_directory:
shutil.rmtree(app.agent.model_directory)
shutil.rmtree(_app.agent.model_directory)

app.register_listener(clear_model_files, "after_server_stop")

Expand All @@ -196,7 +197,8 @@ async def clear_model_files(app: Sanic, _loop: Text) -> None:
host="0.0.0.0",
port=port,
ssl=ssl_context,
backlog=int(os.environ.get("SANIC_BACKLOG", "100")),
backlog=int(os.environ.get(ENV_SANIC_BACKLOG, "100")),
workers=rasa.core.utils.number_of_sanic_workers(endpoints.lock_store),
)


Expand All @@ -214,6 +216,7 @@ async def load_agent_on_start(
(hence the `app` and `loop` arguments)."""
import rasa.core.brokers.utils as broker_utils

# noinspection PyBroadException
try:
with model.get_model(model_path) as unpacked_model:
_, nlu_model = model.get_model_subdirectories(unpacked_model)
Expand Down
77 changes: 73 additions & 4 deletions rasa/core/utils.py
Expand Up @@ -2,26 +2,38 @@
import argparse
import json
import logging
import os
import re
import sys
from asyncio import Future
from hashlib import md5, sha1
from io import StringIO
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, TYPE_CHECKING, Text, Tuple, Callable
from typing import Union
from typing import (
Any,
Dict,
List,
Optional,
Set,
TYPE_CHECKING,
Text,
Tuple,
Callable,
Union,
)

import aiohttp
from aiohttp import InvalidURL
from sanic import Sanic
from sanic.views import CompositionView

import rasa.utils.io as io_utils
from rasa.constants import ENV_SANIC_WORKERS, DEFAULT_SANIC_WORKERS

# backwards compatibility 1.0.x
# noinspection PyUnresolvedReferences
from rasa.utils.endpoints import concat_url
from rasa.utils.endpoints import read_endpoint_config
from rasa.core.lock_store import LockStore, RedisLockStore
from rasa.utils.endpoints import EndpointConfig, read_endpoint_config

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -457,3 +469,60 @@ def handler(fut: Future) -> None:
)

return handler


def _lock_store_is_redis_lock_store(
lock_store: Union[EndpointConfig, LockStore, None]
) -> bool:
# determine whether `lock_store` is associated with a `RedisLockStore`
if isinstance(lock_store, LockStore):
if isinstance(lock_store, RedisLockStore):
return True
return False

# `lock_store` is `None` or `EndpointConfig`
return lock_store is not None and lock_store.type == "redis"


def number_of_sanic_workers(lock_store: Union[EndpointConfig, LockStore]) -> int:
"""Get the number of Sanic workers to use in `app.run()`.
If the environment variable constants.ENV_SANIC_WORKERS is set and is not equal to
1, that value will only be permitted if the used lock store supports shared
resources across multiple workers (e.g. ``RedisLockStore``).
"""

def _log_and_get_default_number_of_workers():
logger.debug(
f"Using the default number of Sanic workers ({DEFAULT_SANIC_WORKERS})."
)
return DEFAULT_SANIC_WORKERS

try:
env_value = int(os.environ.get(ENV_SANIC_WORKERS, DEFAULT_SANIC_WORKERS))
except ValueError:
logger.error(
f"Cannot convert environment variable `{ENV_SANIC_WORKERS}` "
f"to int ('{os.environ[ENV_SANIC_WORKERS]}')."
)
return _log_and_get_default_number_of_workers()

if env_value == DEFAULT_SANIC_WORKERS:
return _log_and_get_default_number_of_workers()

if env_value < 1:
logger.debug(
f"Cannot set number of Sanic workers to the desired value "
f"({env_value}). The number of workers must be at least 1."
)
return _log_and_get_default_number_of_workers()

if _lock_store_is_redis_lock_store(lock_store):
logger.debug(f"Using {env_value} Sanic workers.")
return env_value

logger.debug(
f"Unable to assign desired number of Sanic workers ({env_value}) as "
f"no `RedisLockStore` endpoint configuration has been found."
)
return _log_and_get_default_number_of_workers()
68 changes: 68 additions & 0 deletions tests/core/test_utils.py
@@ -1,9 +1,15 @@
import asyncio
import os
from typing import Optional, Text, Union

import pytest

import rasa.core.lock_store
import rasa.utils.io
from rasa.constants import ENV_SANIC_WORKERS
from rasa.core import utils
from rasa.core.lock_store import LockStore, RedisLockStore, InMemoryLockStore
from rasa.utils.endpoints import EndpointConfig


@pytest.fixture(scope="session")
Expand Down Expand Up @@ -91,3 +97,65 @@ def test_convert_bytes_to_string():

# string remains string
assert utils.convert_bytes_to_string(decoded_string) == decoded_string


@pytest.mark.parametrize(
"env_value,lock_store,expected",
[
(1, "redis", 1),
(4, "redis", 4),
(None, "redis", 1),
(0, "redis", 1),
(-4, "redis", 1),
("illegal value", "redis", 1),
(None, None, 1),
(None, "in_memory", 1),
(5, "in_memory", 1),
(2, None, 1),
(0, "in_memory", 1),
(3, RedisLockStore(), 3),
(2, InMemoryLockStore(), 1),
],
)
def test_get_number_of_sanic_workers(
env_value: Optional[Text],
lock_store: Union[LockStore, Text, None],
expected: Optional[int],
):
# remember pre-test value of SANIC_WORKERS env var
pre_test_value = os.environ.get(ENV_SANIC_WORKERS)

# set env var to desired value and make assertion
if env_value is not None:
os.environ[ENV_SANIC_WORKERS] = str(env_value)

# store_type may be string or LockStore object
# create EndpointConfig if it's a string, otherwise pass the object
if isinstance(lock_store, str):
lock_store = EndpointConfig(type=lock_store)

assert utils.number_of_sanic_workers(lock_store) == expected

# reset env var to pre-test value
os.environ.pop(ENV_SANIC_WORKERS, None)

if pre_test_value is not None:
os.environ[ENV_SANIC_WORKERS] = pre_test_value


@pytest.mark.parametrize(
"lock_store,expected",
[
(EndpointConfig(type="redis"), True),
(RedisLockStore(), True),
(EndpointConfig(type="in_memory"), False),
(EndpointConfig(type="random_store"), False),
(None, False),
(InMemoryLockStore(), False),
],
)
def test_lock_store_is_redis_lock_store(
lock_store: Union[EndpointConfig, LockStore, None], expected: bool
):
# noinspection PyProtectedMember
assert rasa.core.utils._lock_store_is_redis_lock_store(lock_store) == expected

0 comments on commit 84bb6d9

Please sign in to comment.