Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(engine): Streams startup on hopeit server start #125

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/examples/client-example/api/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@
},
"engine_version": {
"type": "string",
"default": "0.14.0"
"default": "0.14.1"
}
},
"x-module-name": "hopeit.server.config",
Expand Down
2 changes: 1 addition & 1 deletion apps/examples/simple-example/api/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -2020,7 +2020,7 @@
},
"engine_version": {
"type": "string",
"default": "0.14.0"
"default": "0.14.1"
}
},
"x-module-name": "hopeit.server.config",
Expand Down
13 changes: 10 additions & 3 deletions docs/source/release-notes.rst
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
Release Notes
=============

Version 0.14.1
______________
- Reworked web server startup:
- Fixed automatic stream and services start on server initialization
- Removed using of `loop.run_until_complete` in favour of aiohttp `on_startup` hooks


Version 0.14.0
______________
- Support for web.StreamResponse
- Added read() method to PreprocessFileHook to be used by libraries reading the file in chunks.
(Support is limited to read binary mode).
- Support for web.StreamResponse
- Added read() method to PreprocessFileHook to be used by libraries reading the file in chunks.
(Support is limited to read binary mode).


Version 0.13.0
Expand Down
2 changes: 2 additions & 0 deletions engine/requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ coverage-badge
mypy
wheel
aiofiles
nest-asyncio

5 changes: 4 additions & 1 deletion engine/src/hopeit/cli/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ def run(config_files: str, api_file: str, host: str, port: int, path: str, start
"""
Runs web server hosting apps specified in config files.
"""
web.main(host, port, path, start_streams, config_files.split(','), api_file)
web.prepare_engine(
config_files=config_files.split(','), api_file=api_file, start_streams=start_streams
)
web.serve(host=host, path=path, port=port)


cli = click.CommandCollection(sources=[server])
Expand Down
10 changes: 8 additions & 2 deletions engine/src/hopeit/server/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ async def stop(self):
Stops and clean handlers
"""
logger.info(__name__, f"Stopping app={self.app_key}...")
for event_name in self._running.keys():
await self.stop_event(event_name)
for event_name, running in self._running.items():
if running.locked():
await self.stop_event(event_name)
if self.stream_manager:
await asyncio.sleep(self.app_config.engine.read_stream_timeout + 5)
await self.stream_manager.close()
Expand Down Expand Up @@ -529,6 +530,9 @@ async def service_loop(self, *,
logger.info(__name__, "Finished service.", extra=extra(prefix='service.', **log_info))
return last_result

def is_running(self, event_name) -> bool:
return self._running[event_name].locked()

async def stop_event(self, event_name: str):
"""
Sets running state to stopped for a continuous-running event.
Expand All @@ -538,6 +542,8 @@ async def stop_event(self, event_name: str):
"""
if self._running[event_name].locked():
self._running[event_name].release()
else:
raise RuntimeError(f"Cannot stop non running event: {event_name}.")

@staticmethod
def _config_effective_events(app_config: AppConfig) -> Dict[str, EventDescriptor]:
Expand Down
2 changes: 1 addition & 1 deletion engine/src/hopeit/server/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import sys

ENGINE_NAME = "hopeit.engine"
ENGINE_VERSION = "0.14.0"
ENGINE_VERSION = "0.14.1"

# Major.Minor version to be used in App versions and Api endpoints for Apps/Plugins
APPS_API_VERSION = '.'.join(ENGINE_VERSION.split('.')[0:2])
Expand Down
124 changes: 83 additions & 41 deletions engine/src/hopeit/server/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
# flake8: noqa
# pylint: disable=wrong-import-position, wrong-import-order
from collections import namedtuple
import aiohttp

setattr(aiohttp.http, 'SERVER_SOFTWARE', '')
Expand All @@ -17,7 +18,7 @@
from datetime import datetime, timezone
from functools import partial
from typing import (
Any, Callable, Coroutine, Dict, List, Optional, Tuple, Type, Union
Any, Callable, Coroutine, Dict, List, Optional, Type, Union
)

import aiohttp_cors # type: ignore
Expand Down Expand Up @@ -52,9 +53,11 @@
from hopeit.toolkit import auth

__all__ = ['parse_args',
'main',
'start_server',
'start_app',
'prepare_engine',
'serve',
'server_startup_hook',
'app_startup_hook',
'stream_startup_hook',
'stop_server']

logger: EngineLoggerWrapper = logging.getLogger(__name__) # type: ignore
Expand All @@ -67,13 +70,18 @@
auth_info_default = {}


def main(host: Optional[str], port: Optional[int], path: Optional[str], start_streams: bool,
config_files: List[str], api_file: Optional[str]):
loop = asyncio.get_event_loop()

def prepare_engine(*, config_files: List[str], api_file: Optional[str], start_streams: bool):
"""
Load configuration files and add hooks to setup engine server and apps,
start streams and services.
"""
logger.info("Loading engine config file=%s...", config_files[0]) # type: ignore
server_config = _load_engine_config(config_files[0])
loop.run_until_complete(start_server(server_config))

# Add startup hook to start engine
web_server.on_startup.append(
partial(server_startup_hook, server_config)
)
if server_config.auth.domain:
auth_info_default['domain'] = server_config.auth.domain
if api_file is not None:
Expand All @@ -87,22 +95,36 @@ def main(host: Optional[str], port: Optional[int], path: Optional[str], start_st
config.server = server_config
apps_config.append(config)

# Register and add startup hooks to start configured apps
api.register_apps(apps_config)
api.enable_swagger(server_config, web_server)
for config in apps_config:
loop.run_until_complete(start_app(config, start_streams))
web_server.on_startup.append(
partial(app_startup_hook, config)
)

# Add hooks to start streams and service
if start_streams:
for config in apps_config:
web_server.on_startup.append(
partial(stream_startup_hook, config)
)

logger.debug(__name__, "Performing forced garbage collection...")
gc.collect()
web.run_app(web_server, path=path, port=port, host=host)


def serve(*, host: str, path: str, port: int):
logger.info(__name__, f"Starting web server host: {host} port: {port} socket: {path}...")
web.run_app(web_server, host=host, path=path, port=port)


def init_logger():
global logger
logger = engine_logger()


async def start_server(config: ServerConfig):
async def server_startup_hook(config: ServerConfig, *args, **kwargs):
"""
Start engine engine
"""
Expand All @@ -118,7 +140,7 @@ async def stop_server():
web_server = web.Application()


async def start_app(config: AppConfig, start_streams: bool = False):
async def app_startup_hook(config: AppConfig, *args, **kwargs):
"""
Start Hopeit app specified by config

Expand All @@ -141,8 +163,25 @@ async def start_app(config: AppConfig, start_streams: bool = False):
if cors_origin:
app = app_engine.app_config.app
_enable_cors(route_name('api', app.name, app.version), cors_origin)
if start_streams:
_start_streams(app_engine)


async def stream_startup_hook(app_config: AppConfig, *args, **kwargs):
"""
Start all stream event types configured in app.

:param app_key: already started app_key
"""
app_engine = runtime.server.app_engines[app_config.app_key()]
for event_name, event_info in app_engine.effective_events.items():
if event_info.type == EventType.STREAM:
assert event_info.read_stream
logger.info(
__name__, f"STREAM start event_name={event_name} read_stream={event_info.read_stream.name}")
asyncio.create_task(app_engine.read_stream(event_name=event_name))
elif event_info.type == EventType.SERVICE:
logger.info(
__name__, f"SERVICE start event_name={event_name}")
asyncio.create_task(app_engine.service_loop(event_name=event_name))


def _effective_events(app_engine: AppEngine, plugin: Optional[AppEngine] = None):
Expand Down Expand Up @@ -648,24 +687,6 @@ async def _handle_multipart_invocation(
except Exception as e: # pylint: disable=broad-except
return _failed_response(context, e)

def _start_streams(app_engine: AppEngine):
"""
Start all stream event types configured in app.

:param app_engine: already started instance of AppEngine
"""
for event_name, event_info in app_engine.effective_events.items():
if event_info.type == EventType.STREAM:
assert event_info.read_stream
logger.info(
__name__, f"STREAM start event_name={event_name} read_stream={event_info.read_stream.name}")
asyncio.create_task(app_engine.read_stream(event_name=event_name))
elif event_info.type == EventType.SERVICE:
logger.info(
__name__, f"SERVICE start event_name={event_name}")
asyncio.create_task(app_engine.service_loop(event_name=event_name))



async def _handle_stream_start_invocation(
app_engine: AppEngine,
Expand All @@ -677,6 +698,8 @@ async def _handle_stream_start_invocation(
in the background.
"""
assert request
if app_engine.is_running(event_name):
return web.Response(status=500, body=f"Stream already running: {event_name}")
asyncio.create_task(app_engine.read_stream(event_name=event_name))
return web.Response()

Expand All @@ -691,6 +714,8 @@ async def _handle_service_start_invocation(
generator in the background
"""
assert request
if app_engine.is_running(event_name):
return web.Response(status=500, body=f"Service already running: {event_name}")
asyncio.create_task(app_engine.service_loop(event_name=event_name))
return web.Response()

Expand All @@ -703,13 +728,19 @@ async def _handle_event_stop_invocation(
Signals engine for stopping an event.
Used to stop reading stream processing events.
"""
assert request
await app_engine.stop_event(event_name)
logger.info(__name__, f"Event stop signaled event_name={event_name}...")
return web.Response()
try:
assert request
await app_engine.stop_event(event_name)
logger.info(__name__, f"Event stop signaled event_name={event_name}...")
return web.Response()
except RuntimeError as e:
return web.Response(status=500, body=str(e))


ParsedArgs = namedtuple("ParsedArgs", ["host", "port", "path", "start_streams", "config_files", "api_file"])

def parse_args(args) -> Tuple[Optional[str], Optional[int], Optional[str], bool, List[str], Optional[str]]:

def parse_args(args) -> ParsedArgs:
"""
Parse command line arguments:
param: args: in form of --arg=value
Expand Down Expand Up @@ -739,10 +770,21 @@ def parse_args(args) -> Tuple[Optional[str], Optional[int], Optional[str], bool,
port = int(parsed_args.port) if parsed_args.port else 8020 if parsed_args.path is None else None
config_files = parsed_args.config_files.split(',')

return parsed_args.host, port, parsed_args.path, bool(parsed_args.start_streams), \
config_files, parsed_args.api_file
return ParsedArgs(
host=parsed_args.host,
port=port,
path=parsed_args.path,
start_streams=bool(parsed_args.start_streams),
config_files=config_files,
api_file=parsed_args.api_file
)


if __name__ == "__main__":
sys_args = parse_args(sys.argv[1:])
main(*sys_args)
prepare_engine(
config_files=sys_args.config_files,
api_file=sys_args.api_file,
start_streams=sys_args.start_streams
)
serve(host=sys_args.host, path=sys_args.path, port=sys_args.port)
10 changes: 6 additions & 4 deletions engine/test/integration/server/test_it_web.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import hopeit.server.web
from hopeit.server import api
from hopeit.server.web import start_server, stop_server, start_app
from hopeit.server.web import server_startup_hook, stop_server, app_startup_hook, stream_startup_hook

from mock_engine import MockStreamManager, MockEventHandler
from mock_app import MockResult, mock_app_config # type: ignore # noqa: F401
Expand Down Expand Up @@ -487,9 +487,11 @@ async def call_stop_service(client):

async def start_test_server(
mock_app_config, mock_plugin_config, streams=None): # noqa: F811
await start_server(mock_app_config.server)
await start_app(mock_plugin_config)
await start_app(mock_app_config, start_streams=streams)
await server_startup_hook(mock_app_config.server)
await app_startup_hook(mock_plugin_config)
await app_startup_hook(mock_app_config)
if streams:
await stream_startup_hook(mock_app_config)
print('Test engine started.', hopeit.server.web.web_server)
await asyncio.sleep(5)

Expand Down
Loading