From f64a67148bdfffa5ca2e60b92e83c64f14748874 Mon Sep 17 00:00:00 2001 From: Pablo Canto Date: Wed, 21 Feb 2024 12:53:30 -0300 Subject: [PATCH] SETUP EventType support --- apps/examples/client-example/api/openapi.json | 5 +- apps/examples/simple-example/api/openapi.json | 5 +- .../simple-example/config/app-config.json | 4 + .../service/something_generator.py | 26 +- .../src/simple_example/setup_something.py | 24 + engine/src/hopeit/app/config.py | 4 +- engine/src/hopeit/server/web.py | 637 +++++++++++------- plugins/ops/apps-visualizer/api/openapi.json | 5 +- 8 files changed, 460 insertions(+), 250 deletions(-) create mode 100644 apps/examples/simple-example/src/simple_example/setup_something.py diff --git a/apps/examples/client-example/api/openapi.json b/apps/examples/client-example/api/openapi.json index d249a6a7..bbe6647d 100644 --- a/apps/examples/client-example/api/openapi.json +++ b/apps/examples/client-example/api/openapi.json @@ -635,7 +635,8 @@ "POST", "STREAM", "SERVICE", - "MULTIPART" + "MULTIPART", + "SETUP" ], "x-enum-name": "EventType", "x-module-name": "hopeit.app.config" @@ -704,7 +705,7 @@ } }, "x-module-name": "hopeit.app.config", - "description": "\n Event Descriptor: configures event implementation\n\n :field: type, EventType: type of event i.e.: GET, POST, MULTIPART, STREAM, SERVICE\n :field: plug_mode, EventPlugMode: defines whether an event defined in a plugin is created in the\n current app (ON_APP) or it will be created in the original plugin (STANDALONE, default)\n :field: route, optional str: custom route for endpoint. If not specified route will be derived\n from `/api/app_name/app_version/event_name`\n :field: impl, optional str: custom event implementation Python module. If not specified, module\n with same same as event will be imported.\n :field: connections, list of EventConnection: specifies dependencies on other apps/endpoints,\n that can be used by client plugins to call events on external apps\n :field: read_stream, optional ReadStreamDescriptor: specifies source stream to read from.\n Valid only for STREAM events.\n :field: write_stream, optional WriteStreamDescriptor: for any type of events, resultant dataobjects will\n be published to the specified stream.\n :field: auth, list of AuthType: supported authentication schemas for this event. If not specified\n application default will be used.\n :field: setting_keys, list of str: by default EventContext will have access to the settings section\n with the same name of the event using `settings = context.settings(datatype=MySettingsType)`.\n In case additional sections are needed to be accessed from\n EventContext, then a list of setting keys, including the name of the event if needed,\n can be specified here. Then access to a `custom` key can be done using\n `custom_settings = context.settings(key=\"customer\", datatype=MyCustomSettingsType)`\n :field: dataobjects, list of str: list of full qualified dataobject types that this event can process.\n When not specified, the engine will inspect the module implementation and find all datatypes supported\n as payload in the functions defined as `__steps__`. In case of generic functions that support\n `payload: DataObject` argument, then a list of full qualified datatypes must be specified here.\n :field: group, str: group name, if none is assigned it is automatically assigned as 'DEFAULT'.\n " + "description": "\n Event Descriptor: configures event implementation\n\n :field: type, EventType: type of event i.e.: GET, POST, MULTIPART, STREAM, SERVICE, SETUP\n :field: plug_mode, EventPlugMode: defines whether an event defined in a plugin is created in the\n current app (ON_APP) or it will be created in the original plugin (STANDALONE, default)\n :field: route, optional str: custom route for endpoint. If not specified route will be derived\n from `/api/app_name/app_version/event_name`\n :field: impl, optional str: custom event implementation Python module. If not specified, module\n with same same as event will be imported.\n :field: connections, list of EventConnection: specifies dependencies on other apps/endpoints,\n that can be used by client plugins to call events on external apps\n :field: read_stream, optional ReadStreamDescriptor: specifies source stream to read from.\n Valid only for STREAM events.\n :field: write_stream, optional WriteStreamDescriptor: for any type of events, resultant dataobjects will\n be published to the specified stream.\n :field: auth, list of AuthType: supported authentication schemas for this event. If not specified\n application default will be used.\n :field: setting_keys, list of str: by default EventContext will have access to the settings section\n with the same name of the event using `settings = context.settings(datatype=MySettingsType)`.\n In case additional sections are needed to be accessed from\n EventContext, then a list of setting keys, including the name of the event if needed,\n can be specified here. Then access to a `custom` key can be done using\n `custom_settings = context.settings(key=\"customer\", datatype=MyCustomSettingsType)`\n :field: dataobjects, list of str: list of full qualified dataobject types that this event can process.\n When not specified, the engine will inspect the module implementation and find all datatypes supported\n as payload in the functions defined as `__steps__`. In case of generic functions that support\n `payload: DataObject` argument, then a list of full qualified datatypes must be specified here.\n :field: group, str: group name, if none is assigned it is automatically assigned as 'DEFAULT'.\n " }, "EventConnection": { "type": "object", diff --git a/apps/examples/simple-example/api/openapi.json b/apps/examples/simple-example/api/openapi.json index 442fb29e..5103e9ea 100644 --- a/apps/examples/simple-example/api/openapi.json +++ b/apps/examples/simple-example/api/openapi.json @@ -1933,7 +1933,8 @@ "POST", "STREAM", "SERVICE", - "MULTIPART" + "MULTIPART", + "SETUP" ], "x-enum-name": "EventType", "x-module-name": "hopeit.app.config" @@ -2002,7 +2003,7 @@ } }, "x-module-name": "hopeit.app.config", - "description": "\n Event Descriptor: configures event implementation\n\n :field: type, EventType: type of event i.e.: GET, POST, MULTIPART, STREAM, SERVICE\n :field: plug_mode, EventPlugMode: defines whether an event defined in a plugin is created in the\n current app (ON_APP) or it will be created in the original plugin (STANDALONE, default)\n :field: route, optional str: custom route for endpoint. If not specified route will be derived\n from `/api/app_name/app_version/event_name`\n :field: impl, optional str: custom event implementation Python module. If not specified, module\n with same same as event will be imported.\n :field: connections, list of EventConnection: specifies dependencies on other apps/endpoints,\n that can be used by client plugins to call events on external apps\n :field: read_stream, optional ReadStreamDescriptor: specifies source stream to read from.\n Valid only for STREAM events.\n :field: write_stream, optional WriteStreamDescriptor: for any type of events, resultant dataobjects will\n be published to the specified stream.\n :field: auth, list of AuthType: supported authentication schemas for this event. If not specified\n application default will be used.\n :field: setting_keys, list of str: by default EventContext will have access to the settings section\n with the same name of the event using `settings = context.settings(datatype=MySettingsType)`.\n In case additional sections are needed to be accessed from\n EventContext, then a list of setting keys, including the name of the event if needed,\n can be specified here. Then access to a `custom` key can be done using\n `custom_settings = context.settings(key=\"customer\", datatype=MyCustomSettingsType)`\n :field: dataobjects, list of str: list of full qualified dataobject types that this event can process.\n When not specified, the engine will inspect the module implementation and find all datatypes supported\n as payload in the functions defined as `__steps__`. In case of generic functions that support\n `payload: DataObject` argument, then a list of full qualified datatypes must be specified here.\n :field: group, str: group name, if none is assigned it is automatically assigned as 'DEFAULT'.\n " + "description": "\n Event Descriptor: configures event implementation\n\n :field: type, EventType: type of event i.e.: GET, POST, MULTIPART, STREAM, SERVICE, SETUP\n :field: plug_mode, EventPlugMode: defines whether an event defined in a plugin is created in the\n current app (ON_APP) or it will be created in the original plugin (STANDALONE, default)\n :field: route, optional str: custom route for endpoint. If not specified route will be derived\n from `/api/app_name/app_version/event_name`\n :field: impl, optional str: custom event implementation Python module. If not specified, module\n with same same as event will be imported.\n :field: connections, list of EventConnection: specifies dependencies on other apps/endpoints,\n that can be used by client plugins to call events on external apps\n :field: read_stream, optional ReadStreamDescriptor: specifies source stream to read from.\n Valid only for STREAM events.\n :field: write_stream, optional WriteStreamDescriptor: for any type of events, resultant dataobjects will\n be published to the specified stream.\n :field: auth, list of AuthType: supported authentication schemas for this event. If not specified\n application default will be used.\n :field: setting_keys, list of str: by default EventContext will have access to the settings section\n with the same name of the event using `settings = context.settings(datatype=MySettingsType)`.\n In case additional sections are needed to be accessed from\n EventContext, then a list of setting keys, including the name of the event if needed,\n can be specified here. Then access to a `custom` key can be done using\n `custom_settings = context.settings(key=\"customer\", datatype=MyCustomSettingsType)`\n :field: dataobjects, list of str: list of full qualified dataobject types that this event can process.\n When not specified, the engine will inspect the module implementation and find all datatypes supported\n as payload in the functions defined as `__steps__`. In case of generic functions that support\n `payload: DataObject` argument, then a list of full qualified datatypes must be specified here.\n :field: group, str: group name, if none is assigned it is automatically assigned as 'DEFAULT'.\n " }, "EventConnection": { "type": "object", diff --git a/apps/examples/simple-example/config/app-config.json b/apps/examples/simple-example/config/app-config.json index d3b65eb8..1d90dc47 100644 --- a/apps/examples/simple-example/config/app-config.json +++ b/apps/examples/simple-example/config/app-config.json @@ -66,6 +66,10 @@ } }, "events": { + "setup_something": { + "type": "SETUP", + "setting_keys": ["fs_storage"] + }, "list_somethings": { "type": "GET", "setting_keys": ["fs_storage"] diff --git a/apps/examples/simple-example/src/simple_example/service/something_generator.py b/apps/examples/simple-example/src/simple_example/service/something_generator.py index b4afe76f..ea97a7ad 100644 --- a/apps/examples/simple-example/src/simple_example/service/something_generator.py +++ b/apps/examples/simple-example/src/simple_example/service/something_generator.py @@ -3,22 +3,29 @@ -------------------------------------------------------------------- Creates and publish Something object every 10 seconds """ + import asyncio import random - +import os from hopeit.app.context import EventContext from hopeit.app.events import Spawn, service_running from hopeit.app.logger import app_extra_logger from model import Something, User, SomethingParams -__steps__ = ['create_something'] +__steps__ = ["create_something"] logger, extra = app_extra_logger() async def __service__(context: EventContext) -> Spawn[SomethingParams]: i = 1 + if not os.path.exists("/tmp/hopeit.initializead"): + raise RuntimeError( + "Missing /tmp/hopeit.initializead file. " + "Service will not start until run setup_something." + ) + os.remove("/tmp/hopeit.initializead") while service_running(context): logger.info(context, f"Generating something event {i}...") yield SomethingParams(f"id{i}", f"user{i}") @@ -27,13 +34,14 @@ async def __service__(context: EventContext) -> Spawn[SomethingParams]: logger.info(context, "Service seamlessly exit") -async def create_something(payload: SomethingParams, context: EventContext) -> Something: - logger.info(context, "Creating something...", extra=extra( - payload_id=payload.id, user=payload.user - )) - result = Something( - id=payload.id, - user=User(id=payload.user, name=payload.user) +async def create_something( + payload: SomethingParams, context: EventContext +) -> Something: + logger.info( + context, + "Creating something...", + extra=extra(payload_id=payload.id, user=payload.user), ) + result = Something(id=payload.id, user=User(id=payload.user, name=payload.user)) await asyncio.sleep(random.random() * 5.0) return result diff --git a/apps/examples/simple-example/src/simple_example/setup_something.py b/apps/examples/simple-example/src/simple_example/setup_something.py new file mode 100644 index 00000000..2334918b --- /dev/null +++ b/apps/examples/simple-example/src/simple_example/setup_something.py @@ -0,0 +1,24 @@ +""" +Simple Example: Setup Something +-------------------------------------------------------------------- +Setup run before initialize endpoints streams and services +""" + +from pathlib import Path + +from hopeit.app.context import EventContext +from hopeit.app.logger import app_extra_logger + + +__steps__ = ["run_once"] + + +logger, extra = app_extra_logger() + + +async def run_once(payload: None, context: EventContext): + """ + Load objects that match the given wildcard + """ + logger.info(context, "Setup done") + Path("/tmp/hopeit.initializead").touch() diff --git a/engine/src/hopeit/app/config.py b/engine/src/hopeit/app/config.py index 14d98d77..bb6ee51b 100644 --- a/engine/src/hopeit/app/config.py +++ b/engine/src/hopeit/app/config.py @@ -61,12 +61,14 @@ class EventType(str, Enum): STREAM: event triggered read events from stream. Can be started and stopped. SERVICE: event executed on demand or continuously. Long lived. Can be started and stopped. MULTIPART: event triggered from api postform-multipart request via endpoint. + SETUP: event that is executed once when service is starting """ GET = 'GET' POST = 'POST' STREAM = 'STREAM' SERVICE = 'SERVICE' MULTIPART = 'MULTIPART' + SETUP = 'SETUP' class StreamQueue: @@ -299,7 +301,7 @@ class EventDescriptor: """ Event Descriptor: configures event implementation - :field: type, EventType: type of event i.e.: GET, POST, MULTIPART, STREAM, SERVICE + :field: type, EventType: type of event i.e.: GET, POST, MULTIPART, STREAM, SERVICE, SETUP :field: plug_mode, EventPlugMode: defines whether an event defined in a plugin is created in the current app (ON_APP) or it will be created in the original plugin (STANDALONE, default) :field: route, optional str: custom route for endpoint. If not specified route will be derived diff --git a/engine/src/hopeit/server/web.py b/engine/src/hopeit/server/web.py index d4d9c206..674b1a0e 100644 --- a/engine/src/hopeit/server/web.py +++ b/engine/src/hopeit/server/web.py @@ -1,12 +1,13 @@ """ Webserver module based on aiohttp to handle web/api requests """ + # flake8: noqa # pylint: disable=wrong-import-position, wrong-import-order from collections import namedtuple import aiohttp -setattr(aiohttp.http, 'SERVER_SOFTWARE', '') +setattr(aiohttp.http, "SERVER_SOFTWARE", "") import argparse import asyncio @@ -17,9 +18,7 @@ import uuid from datetime import datetime, timezone from functools import partial -from typing import ( - Any, Callable, Coroutine, Dict, List, Optional, Type, Tuple, Union -) +from typing import Any, Callable, Coroutine, Dict, List, Optional, Type, Tuple, Union import aiohttp_cors # type: ignore from aiohttp import web @@ -28,24 +27,33 @@ from stringcase import snakecase, titlecase # type: ignore from hopeit.app.config import ( - AppConfig, EventDescriptor, EventPlugMode, EventSettings, EventType, parse_app_config_json + AppConfig, + EventDescriptor, + EventPlugMode, + EventSettings, + EventType, + parse_app_config_json, ) from hopeit.app.context import ( - EventContext, NoopMultiparReader, PostprocessHook, PreprocessHook + EventContext, + NoopMultiparReader, + PostprocessHook, + PreprocessHook, ) from hopeit.app.errors import BadRequest, Unauthorized from hopeit.dataobjects import DataObject, EventPayload, EventPayloadType from hopeit.dataobjects.payload import Payload from hopeit.server import api, runtime from hopeit.server.api import app_route_name, OPEN_API_DEFAULTS -from hopeit.server.config import ( - AuthType, ServerConfig, parse_server_config_json -) +from hopeit.server.config import AuthType, ServerConfig, parse_server_config_json from hopeit.server.engine import AppEngine from hopeit.server.errors import ErrorInfo from hopeit.server.events import get_event_settings from hopeit.server.logger import ( - EngineLoggerWrapper, combined, engine_logger, extra_logger + EngineLoggerWrapper, + combined, + engine_logger, + extra_logger, ) from hopeit.server.metrics import metrics from hopeit.server.names import route_name @@ -53,13 +61,15 @@ from hopeit.toolkit import auth -__all__ = ['parse_args', - 'prepare_engine', - 'serve', - 'server_startup_hook', - 'app_startup_hook', - 'stream_startup_hook', - 'stop_server'] +__all__ = [ + "parse_args", + "prepare_engine", + "serve", + "server_startup_hook", + "app_startup_hook", + "stream_startup_hook", + "stop_server", +] logger: EngineLoggerWrapper = logging.getLogger(__name__) # type: ignore extra = extra_logger() @@ -70,8 +80,14 @@ auth_info_default = {} -def prepare_engine(*, config_files: List[str], api_file: Optional[str], api_auto: List[str], - enabled_groups: List[str], start_streams: bool): +def prepare_engine( + *, + config_files: List[str], + api_file: Optional[str], + api_auto: List[str], + enabled_groups: List[str], + start_streams: bool, +): """ Load configuration files and add hooks to setup engine server and apps, start streams and services. @@ -80,11 +96,9 @@ def prepare_engine(*, config_files: List[str], api_file: Optional[str], api_auto server_config: ServerConfig = _load_engine_config(config_files[0]) # Add startup hook to start engine - web_server.on_startup.append( - partial(server_startup_hook, server_config) - ) + web_server.on_startup.append(partial(server_startup_hook, server_config)) if server_config.auth.domain: - auth_info_default['domain'] = server_config.auth.domain + auth_info_default["domain"] = server_config.auth.domain if api_file is not None: api.load_api_file(api_file) @@ -92,7 +106,7 @@ def prepare_engine(*, config_files: List[str], api_file: Optional[str], api_auto if api_file is None and api_auto: if len(api_auto) < 3: - api_auto.extend(OPEN_API_DEFAULTS[len(api_auto) - 3:]) + api_auto.extend(OPEN_API_DEFAULTS[len(api_auto) - 3 :]) else: api_auto = api_auto[:3] api.init_auto_api(api_auto[0], api_auto[1], api_auto[2]) @@ -109,16 +123,12 @@ def prepare_engine(*, config_files: List[str], api_file: Optional[str], api_auto api.register_apps(apps_config) api.enable_swagger(server_config, web_server) for config in apps_config: - web_server.on_startup.append( - partial(app_startup_hook, config, enabled_groups) - ) + web_server.on_startup.append(partial(app_startup_hook, config, enabled_groups)) # Add hooks to start streams and service if start_streams: for config in apps_config: - web_server.on_startup.append( - partial(stream_startup_hook, config) - ) + web_server.on_startup.append(partial(stream_startup_hook, config)) web_server.on_shutdown.append(_shutdown_hook) logger.debug(__name__, "Performing forced garbage collection...") @@ -150,7 +160,9 @@ async def stop_server(): await web_server.cleanup() -async def app_startup_hook(config: AppConfig, enabled_groups: List[str], *args, **kwargs): +async def app_startup_hook( + config: AppConfig, enabled_groups: List[str], *args, **kwargs +): """ Start Hopeit app specified by config @@ -158,22 +170,31 @@ async def app_startup_hook(config: AppConfig, enabled_groups: List[str], *args, :param enabled_groups: list of event groups names to enable. If empty, all events will be enabled. """ - app_engine = await runtime.server.start_app(app_config=config, enabled_groups=enabled_groups) - cors_origin = aiohttp_cors.setup(web_server, defaults={ - config.engine.cors_origin: aiohttp_cors.ResourceOptions( - allow_credentials=True, - expose_headers="*", - allow_headers="*", + app_engine = await runtime.server.start_app( + app_config=config, enabled_groups=enabled_groups + ) + cors_origin = ( + aiohttp_cors.setup( + web_server, + defaults={ + config.engine.cors_origin: aiohttp_cors.ResourceOptions( + allow_credentials=True, + expose_headers="*", + allow_headers="*", + ) + }, ) - }) if config.engine.cors_origin else None + if config.engine.cors_origin + else None + ) - _setup_app_event_routes(app_engine) + await _setup_app_event_routes(app_engine) for plugin in config.plugins: plugin_engine = runtime.server.app_engine(app_key=plugin.app_key()) - _setup_app_event_routes(app_engine, plugin_engine) + await _setup_app_event_routes(app_engine, plugin_engine) if cors_origin: app = app_engine.app_config.app - _enable_cors(route_name('api', app.name, app.version), cors_origin) + _enable_cors(route_name("api", app.name, app.version), cors_origin) async def stream_startup_hook(app_config: AppConfig, *args, **kwargs): @@ -187,22 +208,25 @@ async def stream_startup_hook(app_config: AppConfig, *args, **kwargs): if event_info.type == EventType.STREAM: assert event_info.read_stream logger.info( - __name__, f"STREAM start event_name={event_name} read_stream={event_info.read_stream.name}") + __name__, + f"STREAM start event_name={event_name} read_stream={event_info.read_stream.name}", + ) asyncio.create_task(app_engine.read_stream(event_name=event_name)) elif event_info.type == EventType.SERVICE: - logger.info( - __name__, f"SERVICE start event_name={event_name}") + logger.info(__name__, f"SERVICE start event_name={event_name}") asyncio.create_task(app_engine.service_loop(event_name=event_name)) def _effective_events(app_engine: AppEngine, plugin: Optional[AppEngine] = None): if plugin is None: return { - k: v for k, v in app_engine.effective_events.items() + k: v + for k, v in app_engine.effective_events.items() if v.plug_mode == EventPlugMode.STANDALONE } return { - k: v for k, v in plugin.effective_events.items() + k: v + for k, v in plugin.effective_events.items() if v.plug_mode == EventPlugMode.ON_APP } @@ -229,8 +253,7 @@ def _enable_cors(prefix: str, cors: CorsConfig): cors.add(route) -def _setup_app_event_routes(app_engine: AppEngine, - plugin: Optional[AppEngine] = None): +async def _setup_app_event_routes(app_engine: AppEngine, plugin: Optional[AppEngine] = None): """ Setup http routes for existing events in app, in existing web_server global instance. @@ -248,23 +271,38 @@ def _setup_app_event_routes(app_engine: AppEngine, """ for event_name, event_info in _effective_events(app_engine, plugin).items(): if event_info.type == EventType.POST: - web_server.add_routes([ - _create_post_event_route( - app_engine, plugin=plugin, event_name=event_name, event_info=event_info - ) - ]) + web_server.add_routes( + [ + _create_post_event_route( + app_engine, + plugin=plugin, + event_name=event_name, + event_info=event_info, + ) + ] + ) elif event_info.type == EventType.GET: - web_server.add_routes([ - _create_get_event_route( - app_engine, plugin=plugin, event_name=event_name, event_info=event_info - ) - ]) + web_server.add_routes( + [ + _create_get_event_route( + app_engine, + plugin=plugin, + event_name=event_name, + event_info=event_info, + ) + ] + ) elif event_info.type == EventType.MULTIPART: - web_server.add_routes([ - _create_multipart_event_route( - app_engine, plugin=plugin, event_name=event_name, event_info=event_info - ) - ]) + web_server.add_routes( + [ + _create_multipart_event_route( + app_engine, + plugin=plugin, + event_name=event_name, + event_info=event_info, + ) + ] + ) elif event_info.type == EventType.STREAM and plugin is None: web_server.add_routes( _create_event_management_routes( @@ -277,8 +315,12 @@ def _setup_app_event_routes(app_engine: AppEngine, app_engine, event_name=event_name, event_info=event_info ) ) + elif event_info.type == EventType.SETUP: + await _execute_setup_event(app_engine, plugin, event_name) else: - raise ValueError(f"Invalid event_type:{event_info.type} for event:{event_name}") + raise ValueError( + f"Invalid event_type:{event_info.type} for event:{event_name}" + ) def _auth_types(app_engine: AppEngine, event_name: str): @@ -290,97 +332,143 @@ def _auth_types(app_engine: AppEngine, event_name: str): def _create_post_event_route( - app_engine: AppEngine, *, - plugin: Optional[AppEngine] = None, - event_name: str, - event_info: EventDescriptor) -> web.RouteDef: + app_engine: AppEngine, + *, + plugin: Optional[AppEngine] = None, + event_name: str, + event_info: EventDescriptor, +) -> web.RouteDef: """ Creates route for handling POST event """ - datatype = find_datatype_handler(app_config=app_engine.app_config, event_name=event_name, event_info=event_info) - route = app_route_name(app_engine.app_config.app, event_name=event_name, - plugin=None if plugin is None else plugin.app_config.app, - override_route_name=event_info.route) + datatype = find_datatype_handler( + app_config=app_engine.app_config, event_name=event_name, event_info=event_info + ) + route = app_route_name( + app_engine.app_config.app, + event_name=event_name, + plugin=None if plugin is None else plugin.app_config.app, + override_route_name=event_info.route, + ) logger.info(__name__, f"POST path={route} input={str(datatype)}") impl = plugin if plugin else app_engine - handler = partial(_handle_post_invocation, app_engine, impl, - event_name, datatype, _auth_types(impl, event_name)) - setattr(handler, '__closure__', None) - setattr(handler, '__code__', _handle_post_invocation.__code__) - api_handler = api.add_route('post', route, handler) + handler = partial( + _handle_post_invocation, + app_engine, + impl, + event_name, + datatype, + _auth_types(impl, event_name), + ) + setattr(handler, "__closure__", None) + setattr(handler, "__code__", _handle_post_invocation.__code__) + api_handler = api.add_route("post", route, handler) return web.post(route, api_handler) def _create_get_event_route( - app_engine: AppEngine, *, - plugin: Optional[AppEngine] = None, - event_name: str, - event_info: EventDescriptor) -> web.RouteDef: + app_engine: AppEngine, + *, + plugin: Optional[AppEngine] = None, + event_name: str, + event_info: EventDescriptor, +) -> web.RouteDef: """ Creates route for handling GET requests """ - route = app_route_name(app_engine.app_config.app, event_name=event_name, - plugin=None if plugin is None else plugin.app_config.app, - override_route_name=event_info.route) + route = app_route_name( + app_engine.app_config.app, + event_name=event_name, + plugin=None if plugin is None else plugin.app_config.app, + override_route_name=event_info.route, + ) logger.info(__name__, f"GET path={route}") impl = plugin if plugin else app_engine - handler = partial(_handle_get_invocation, app_engine, impl, event_name, _auth_types(impl, event_name)) - setattr(handler, '__closure__', None) - setattr(handler, '__code__', _handle_get_invocation.__code__) - api_handler = api.add_route('get', route, handler) + handler = partial( + _handle_get_invocation, + app_engine, + impl, + event_name, + _auth_types(impl, event_name), + ) + setattr(handler, "__closure__", None) + setattr(handler, "__code__", _handle_get_invocation.__code__) + api_handler = api.add_route("get", route, handler) return web.get(route, api_handler) def _create_multipart_event_route( - app_engine: AppEngine, *, - plugin: Optional[AppEngine] = None, - event_name: str, - event_info: EventDescriptor) -> web.RouteDef: + app_engine: AppEngine, + *, + plugin: Optional[AppEngine] = None, + event_name: str, + event_info: EventDescriptor, +) -> web.RouteDef: """ Creates route for handling MULTIPART event """ - datatype = find_datatype_handler(app_config=app_engine.app_config, event_name=event_name, event_info=event_info) - route = app_route_name(app_engine.app_config.app, event_name=event_name, - plugin=None if plugin is None else plugin.app_config.app, - override_route_name=event_info.route) + datatype = find_datatype_handler( + app_config=app_engine.app_config, event_name=event_name, event_info=event_info + ) + route = app_route_name( + app_engine.app_config.app, + event_name=event_name, + plugin=None if plugin is None else plugin.app_config.app, + override_route_name=event_info.route, + ) logger.info(__name__, f"MULTIPART path={route} input={str(datatype)}") impl = plugin if plugin else app_engine - handler = partial(_handle_multipart_invocation, app_engine, impl, - event_name, datatype, _auth_types(impl, event_name)) - setattr(handler, '__closure__', None) - setattr(handler, '__code__', _handle_multipart_invocation.__code__) - api_handler = api.add_route('post', route, handler) + handler = partial( + _handle_multipart_invocation, + app_engine, + impl, + event_name, + datatype, + _auth_types(impl, event_name), + ) + setattr(handler, "__closure__", None) + setattr(handler, "__code__", _handle_multipart_invocation.__code__) + api_handler = api.add_route("post", route, handler) return web.post(route, api_handler) def _create_event_management_routes( - app_engine: AppEngine, *, - event_name: str, - event_info: EventDescriptor) -> List[web.RouteDef]: + app_engine: AppEngine, *, event_name: str, event_info: EventDescriptor +) -> List[web.RouteDef]: """ Create routes to start and stop processing of STREAM events """ - evt = event_name.replace('.', '/').replace('$', '/') - base_route = app_route_name(app_engine.app_config.app, event_name=evt, - prefix='mgmt', override_route_name=event_info.route) - logger.info(__name__, f"{event_info.type.value.upper()} path={base_route}/[start|stop]") + evt = event_name.replace(".", "/").replace("$", "/") + base_route = app_route_name( + app_engine.app_config.app, + event_name=evt, + prefix="mgmt", + override_route_name=event_info.route, + ) + logger.info( + __name__, f"{event_info.type.value.upper()} path={base_route}/[start|stop]" + ) handler: Optional[partial[Coroutine[Any, Any, Response]]] = None if event_info.type == EventType.STREAM: handler = partial(_handle_stream_start_invocation, app_engine, event_name) elif event_info.type == EventType.SERVICE: handler = partial(_handle_service_start_invocation, app_engine, event_name) - assert handler is not None, f"No handler for event={event_name} type={event_info.type}" + assert ( + handler is not None + ), f"No handler for event={event_name} type={event_info.type}" return [ - web.get(base_route + '/start', handler), + web.get(base_route + "/start", handler), web.get( - base_route + '/stop', - partial(_handle_event_stop_invocation, app_engine, event_name) - ) + base_route + "/stop", + partial(_handle_event_stop_invocation, app_engine, event_name), + ), ] -def _response(*, track_ids: Dict[str, str], key: str, payload: EventPayload, hook: PostprocessHook) -> ResponseType: +def _response( + *, track_ids: Dict[str, str], key: str, payload: EventPayload, hook: PostprocessHook +) -> ResponseType: """ Creates a web response object from a given payload (body), header track ids and applies a postprocess hook @@ -388,12 +476,12 @@ def _response(*, track_ids: Dict[str, str], key: str, payload: EventPayload, hoo response: ResponseType headers = { **hook.headers, - **{f"X-{re.sub(' ', '-', titlecase(k))}": v for k, v in track_ids.items()} + **{f"X-{re.sub(' ', '-', titlecase(k))}": v for k, v in track_ids.items()}, } if hook.file_response is not None: response = web.FileResponse( path=hook.file_response, - headers={'Content-Type': hook.content_type, **headers} + headers={"Content-Type": hook.content_type, **headers}, ) elif hook.stream_response is not None: response = hook.stream_response.resp @@ -403,9 +491,7 @@ def _response(*, track_ids: Dict[str, str], key: str, payload: EventPayload, hoo ) body = serializer(payload, key=key) response = web.Response( - body=body, - headers=headers, - content_type=hook.content_type + body=body, headers=headers, content_type=hook.content_type ) for name, cookie in hook.cookies.items(): value, args, kwargs = cookie @@ -418,55 +504,78 @@ def _response(*, track_ids: Dict[str, str], key: str, payload: EventPayload, hoo def _response_info(response: ResponseType): - return extra(prefix='response.', status=str(response.status)) + return extra(prefix="response.", status=str(response.status)) def _track_ids(request: web.Request) -> Dict[str, str]: return { - 'track.operation_id': str(uuid.uuid4()), - 'track.request_id': str(uuid.uuid4()), - 'track.request_ts': datetime.now(tz=timezone.utc).isoformat(), + "track.operation_id": str(uuid.uuid4()), + "track.request_id": str(uuid.uuid4()), + "track.request_ts": datetime.now(tz=timezone.utc).isoformat(), **{ "track." + snakecase(k[8:].lower()): v - for k, v in request.headers.items() if k.lower().startswith('x-track-') - } + for k, v in request.headers.items() + if k.lower().startswith("x-track-") + }, } -def _failed_response(context: Optional[EventContext], - e: Exception) -> web.Response: +def _failed_response(context: Optional[EventContext], e: Exception) -> web.Response: if context: logger.error(context, e) logger.failed(context) else: logger.error(__name__, e) info = ErrorInfo.from_exception(e) - return web.Response( - status=500, - body=Payload.to_json(info) - ) + return web.Response(status=500, body=Payload.to_json(info)) -def _ignored_response(context: Optional[EventContext], - status: int, - e: BaseException) -> web.Response: +def _ignored_response( + context: Optional[EventContext], status: int, e: BaseException +) -> web.Response: if context: logger.error(context, e) logger.ignored(context) else: logger.error(__name__, e) info = ErrorInfo.from_exception(e) - return web.Response( - status=status, - body=Payload.to_json(info) + return web.Response(status=status, body=Payload.to_json(info)) + + +async def _execute_setup_event( + app_engine: AppEngine, + plugin: Optional[AppEngine], + event_name: str, +) -> EventContext: + """ + Executes event of SETUP type, on server start + """ + event_settings = get_event_settings(app_engine.settings, event_name) + context = EventContext( + app_config=app_engine.app_config, + plugin_config=app_engine.app_config if plugin is None else plugin.app_config, + event_name=event_name, + settings=event_settings, + track_ids={}, + auth_info=auth_info_default, ) + logger.start(context) + if plugin is None: + res = await app_engine.execute(context=context, query_args=None, payload=None) + else: + res = await plugin.execute(context=context, query_args=None, payload=None) + logger.done(context, extra=metrics(context)) + + return res + def _request_start(app_engine: AppEngine, plugin: AppEngine, event_name: str, event_settings: EventSettings, - request: web.Request) -> EventContext: + request: web.Request, +) -> EventContext: """ Extracts context and track information from a request and logs start of event """ @@ -498,22 +607,26 @@ def _ignore_auth(request: web.Request, context: EventContext) -> str: AuthType.BASIC: _extract_auth_header, AuthType.BEARER: _extract_auth_header, AuthType.REFRESH: _extract_refresh_cookie, - AuthType.UNSECURED: _ignore_auth + AuthType.UNSECURED: _ignore_auth, } -def _extract_authorization(auth_methods: List[AuthType], request: web.Request, context: EventContext): +def _extract_authorization( + auth_methods: List[AuthType], request: web.Request, context: EventContext +): for auth_type in auth_methods: auth_header = AUTH_HEADER_EXTRACTORS[auth_type](request, context) if auth_header is not None: return auth_header - return 'Unsecured -' + return "Unsecured -" -def _validate_authorization(app_config: AppConfig, - context: EventContext, - auth_types: List[AuthType], - request: web.Request): +def _validate_authorization( + app_config: AppConfig, + context: EventContext, + auth_types: List[AuthType], + request: web.Request, +): """ Validates Authorization header from request to provide valid credentials for the methods supported in event configuration. @@ -530,11 +643,11 @@ def _validate_authorization(app_config: AppConfig, except ValueError as e: raise BadRequest("Malformed Authorization") from e - context.auth_info['allowed'] = False + context.auth_info["allowed"] = False for auth_type in auth_types: if method.upper() == auth_type.name.upper(): auth.validate_auth_method(auth_type, data, context) - if context.auth_info.get('allowed'): + if context.auth_info.get("allowed"): return None raise Unauthorized(method) @@ -548,47 +661,49 @@ def _text_response(result: str, *args, **kwargs) -> str: CONTENT_TYPE_BODY_SER: Dict[str, Callable[..., str]] = { - 'application/json': _application_json_response, - 'text/html': _text_response, - 'text/plain': _text_response + "application/json": _application_json_response, + "text/html": _text_response, + "text/plain": _text_response, } async def _request_execute( - app_engine: AppEngine, - event_name: str, - context: EventContext, - query_args: Dict[str, Any], - payload: Optional[EventPayloadType], - preprocess_hook: PreprocessHook, - request: web.Request) -> ResponseType: + app_engine: AppEngine, + event_name: str, + context: EventContext, + query_args: Dict[str, Any], + payload: Optional[EventPayloadType], + preprocess_hook: PreprocessHook, + request: web.Request, +) -> ResponseType: """ Executes request using engine event handler """ response_hook = PostprocessHook(request) result = await app_engine.preprocess( - context=context, query_args=query_args, payload=payload, request=preprocess_hook) + context=context, query_args=query_args, payload=payload, request=preprocess_hook + ) if (preprocess_hook.status is None) or (preprocess_hook.status == 200): - result = await app_engine.execute(context=context, query_args=query_args, payload=result) - result = await app_engine.postprocess(context=context, payload=result, response=response_hook) + result = await app_engine.execute( + context=context, query_args=query_args, payload=result + ) + result = await app_engine.postprocess( + context=context, payload=result, response=response_hook + ) else: response_hook.set_status(preprocess_hook.status) response = _response( - track_ids=context.track_ids, - key=event_name, - payload=result, - hook=response_hook + track_ids=context.track_ids, key=event_name, payload=result, hook=response_hook ) - logger.done(context, extra=combined( - _response_info(response), metrics(context) - )) + logger.done(context, extra=combined(_response_info(response), metrics(context))) return response async def _request_process_payload( - context: EventContext, - datatype: Optional[Type[EventPayloadType]], - request: web.Request) -> Tuple[Optional[EventPayloadType], Optional[bytes]]: + context: EventContext, + datatype: Optional[Type[EventPayloadType]], + request: web.Request, +) -> Tuple[Optional[EventPayloadType], Optional[bytes]]: """ Extract payload from request. Returns payload if parsing succeeded. Raises BadRequest if payload fails to parse @@ -604,12 +719,13 @@ async def _request_process_payload( async def _handle_post_invocation( - app_engine: AppEngine, - impl: AppEngine, - event_name: str, - datatype: Optional[Type[DataObject]], - auth_types: List[AuthType], - request: web.Request) -> ResponseType: + app_engine: AppEngine, + impl: AppEngine, + event_name: str, + datatype: Optional[Type[DataObject]], + auth_types: List[AuthType], + request: web.Request, +) -> ResponseType: """ Handler to execute POST calls """ @@ -619,10 +735,20 @@ async def _handle_post_invocation( context = _request_start(app_engine, impl, event_name, event_settings, request) query_args = dict(request.query) _validate_authorization(app_engine.app_config, context, auth_types, request) - payload, payload_raw = await _request_process_payload(context, datatype, request) - hook: PreprocessHook[NoopMultiparReader] = PreprocessHook(headers=request.headers, payload_raw=payload_raw) + payload, payload_raw = await _request_process_payload( + context, datatype, request + ) + hook: PreprocessHook[NoopMultiparReader] = PreprocessHook( + headers=request.headers, payload_raw=payload_raw + ) return await _request_execute( - impl, event_name, context, query_args, payload, preprocess_hook=hook, request=request + impl, + event_name, + context, + query_args, + payload, + preprocess_hook=hook, + request=request, ) except Unauthorized as e: return _ignored_response(context, 401, e) @@ -633,11 +759,12 @@ async def _handle_post_invocation( async def _handle_get_invocation( - app_engine: AppEngine, - impl: AppEngine, - event_name: str, - auth_types: List[AuthType], - request: web.Request) -> ResponseType: + app_engine: AppEngine, + impl: AppEngine, + event_name: str, + auth_types: List[AuthType], + request: web.Request, +) -> ResponseType: """ Handler to execute GET calls """ @@ -647,15 +774,20 @@ async def _handle_get_invocation( context = _request_start(app_engine, impl, event_name, event_settings, request) _validate_authorization(app_engine.app_config, context, auth_types, request) query_args = dict(request.query) - payload = query_args.get('payload') + payload = query_args.get("payload") if payload is not None: - del query_args['payload'] - hook: PreprocessHook[NoopMultiparReader] = PreprocessHook(headers=request.headers) + del query_args["payload"] + hook: PreprocessHook[NoopMultiparReader] = PreprocessHook( + headers=request.headers + ) return await _request_execute( - impl, event_name, context, - query_args, payload=payload, + impl, + event_name, + context, + query_args, + payload=payload, preprocess_hook=hook, - request=request + request=request, ) except Unauthorized as e: return _ignored_response(context, 401, e) @@ -666,12 +798,13 @@ async def _handle_get_invocation( async def _handle_multipart_invocation( - app_engine: AppEngine, - impl: AppEngine, - event_name: str, - datatype: Optional[Type[DataObject]], - auth_types: List[AuthType], - request: web.Request) -> ResponseType: + app_engine: AppEngine, + impl: AppEngine, + event_name: str, + datatype: Optional[Type[DataObject]], + auth_types: List[AuthType], + request: web.Request, +) -> ResponseType: """ Handler to execute POST calls """ @@ -681,14 +814,17 @@ async def _handle_multipart_invocation( context = _request_start(app_engine, impl, event_name, event_settings, request) query_args = dict(request.query) _validate_authorization(app_engine.app_config, context, auth_types, request) - hook = PreprocessHook( # type: ignore + hook = PreprocessHook( # type: ignore headers=request.headers, multipart_reader=await request.multipart() # type: ignore ) return await _request_execute( - impl, event_name, context, - query_args, payload=None, + impl, + event_name, + context, + query_args, + payload=None, preprocess_hook=hook, - request=request + request=request, ) except Unauthorized as e: return _ignored_response(context, 401, e) @@ -699,9 +835,8 @@ async def _handle_multipart_invocation( async def _handle_stream_start_invocation( - app_engine: AppEngine, - event_name: str, - request: web.Request) -> web.Response: + app_engine: AppEngine, event_name: str, request: web.Request +) -> web.Response: """ Handles call to stream processing event `start` endpoint, spawning an async job that listens continuosly to event streams @@ -715,9 +850,8 @@ async def _handle_stream_start_invocation( async def _handle_service_start_invocation( - app_engine: AppEngine, - event_name: str, - request: web.Request) -> web.Response: + app_engine: AppEngine, event_name: str, request: web.Request +) -> web.Response: """ Handles call to service event `start` endpoint, spawning an async job that listens continuosly __service__ @@ -731,9 +865,8 @@ async def _handle_service_start_invocation( async def _handle_event_stop_invocation( - app_engine: AppEngine, - event_name: str, - request: web.Request) -> web.Response: + app_engine: AppEngine, event_name: str, request: web.Request +) -> web.Response: """ Signals engine for stopping an event. Used to stop reading stream processing events. @@ -747,8 +880,19 @@ async def _handle_event_stop_invocation( return web.Response(status=500, body=str(e)) -ParsedArgs = namedtuple("ParsedArgs", ["host", "port", "path", "start_streams", - "config_files", "api_file", "api_auto", "enabled_groups"]) +ParsedArgs = namedtuple( + "ParsedArgs", + [ + "host", + "port", + "path", + "start_streams", + "config_files", + "api_file", + "api_auto", + "enabled_groups", + ], +) def parse_args(args) -> ParsedArgs: @@ -760,7 +904,7 @@ def parse_args(args) -> ParsedArgs: --start-streams, optional True if to auto start all events of STREAM type --config-files, is a comma-separated list of hopeit apps config files relative or full paths --api-file, optional path to openapi.json file with at least openapi and info sections - --api-auto, optional when api_file is not defined, specify a semicolons-separated + --api-auto, optional when api_file is not defined, specify a semicolons-separated `version;title;description` to define API General Info and enable OpenAPI --enabled-groups, optional list of group label to be started Example:: @@ -773,20 +917,26 @@ def parse_args(args) -> ParsedArgs: """ parser = argparse.ArgumentParser(description="hopeit.py engine") - parser.add_argument('--host') - parser.add_argument('--path') - parser.add_argument('--port') - parser.add_argument('--start-streams', action='store_true') - parser.add_argument('--config-files') - parser.add_argument('--api-file') - parser.add_argument('--api-auto') - parser.add_argument('--enabled-groups') + parser.add_argument("--host") + parser.add_argument("--path") + parser.add_argument("--port") + parser.add_argument("--start-streams", action="store_true") + parser.add_argument("--config-files") + parser.add_argument("--api-file") + parser.add_argument("--api-auto") + parser.add_argument("--enabled-groups") parsed_args = parser.parse_args(args=args) - port = int(parsed_args.port) if parsed_args.port else 8020 if parsed_args.path is None else None - config_files = parsed_args.config_files.split(',') - enabled_groups = parsed_args.enabled_groups.split(',') if parsed_args.enabled_groups else [] - api_auto = [] if parsed_args.api_auto is None else parsed_args.api_auto.split(';') + port = ( + int(parsed_args.port) + if parsed_args.port + else 8020 if parsed_args.path is None else None + ) + config_files = parsed_args.config_files.split(",") + enabled_groups = ( + parsed_args.enabled_groups.split(",") if parsed_args.enabled_groups else [] + ) + api_auto = [] if parsed_args.api_auto is None else parsed_args.api_auto.split(";") return ParsedArgs( host=parsed_args.host, @@ -796,12 +946,17 @@ def parse_args(args) -> ParsedArgs: config_files=config_files, api_file=parsed_args.api_file, api_auto=api_auto, - enabled_groups=enabled_groups + enabled_groups=enabled_groups, ) -def init_web_server(config_files: List[str], api_file: str, api_auto: List[str], enabled_groups: List[str], - start_streams: bool) -> web.Application: +def init_web_server( + config_files: List[str], + api_file: str, + api_auto: List[str], + enabled_groups: List[str], + start_streams: bool, +) -> web.Application: """ Init Web Server """ @@ -812,28 +967,42 @@ def init_web_server(config_files: List[str], api_file: str, api_auto: List[str], api_file=api_file, api_auto=api_auto, start_streams=start_streams, - enabled_groups=enabled_groups + enabled_groups=enabled_groups, ) return web_server -def serve(host: str, path: str, port: int, config_files:List[str], api_file: str, api_auto: List[str], - start_streams:bool, enabled_groups:List[str]): +def serve( + host: str, + path: str, + port: int, + config_files: List[str], + api_file: str, + api_auto: List[str], + start_streams: bool, + enabled_groups: List[str], +): """ Serve hopeit.engine """ - web_app = init_web_server(config_files, api_file, api_auto, enabled_groups, start_streams) - logger.info(__name__, f"Starting web server host: {host} port: {port} socket: {path}...") + web_app = init_web_server( + config_files, api_file, api_auto, enabled_groups, start_streams + ) + logger.info( + __name__, f"Starting web server host: {host} port: {port} socket: {path}..." + ) web.run_app(web_app, host=host, path=path, port=port) if __name__ == "__main__": sys_args = parse_args(sys.argv[1:]) - serve(host=sys_args.host, + serve( + host=sys_args.host, path=sys_args.path, port=sys_args.port, config_files=sys_args.config_files, api_file=sys_args.api_file, api_auto=sys_args.api_auto, start_streams=sys_args.start_streams, - enabled_groups=sys_args.enabled_groups) + enabled_groups=sys_args.enabled_groups, + ) \ No newline at end of file diff --git a/plugins/ops/apps-visualizer/api/openapi.json b/plugins/ops/apps-visualizer/api/openapi.json index f84aadae..971e1031 100644 --- a/plugins/ops/apps-visualizer/api/openapi.json +++ b/plugins/ops/apps-visualizer/api/openapi.json @@ -634,7 +634,8 @@ "POST", "STREAM", "SERVICE", - "MULTIPART" + "MULTIPART", + "SETUP" ], "x-enum-name": "EventType", "x-module-name": "hopeit.app.config" @@ -703,7 +704,7 @@ } }, "x-module-name": "hopeit.app.config", - "description": "\n Event Descriptor: configures event implementation\n\n :field: type, EventType: type of event i.e.: GET, POST, MULTIPART, STREAM, SERVICE\n :field: plug_mode, EventPlugMode: defines whether an event defined in a plugin is created in the\n current app (ON_APP) or it will be created in the original plugin (STANDALONE, default)\n :field: route, optional str: custom route for endpoint. If not specified route will be derived\n from `/api/app_name/app_version/event_name`\n :field: impl, optional str: custom event implementation Python module. If not specified, module\n with same same as event will be imported.\n :field: connections, list of EventConnection: specifies dependencies on other apps/endpoints,\n that can be used by client plugins to call events on external apps\n :field: read_stream, optional ReadStreamDescriptor: specifies source stream to read from.\n Valid only for STREAM events.\n :field: write_stream, optional WriteStreamDescriptor: for any type of events, resultant dataobjects will\n be published to the specified stream.\n :field: auth, list of AuthType: supported authentication schemas for this event. If not specified\n application default will be used.\n :field: setting_keys, list of str: by default EventContext will have access to the settings section\n with the same name of the event using `settings = context.settings(datatype=MySettingsType)`.\n In case additional sections are needed to be accessed from\n EventContext, then a list of setting keys, including the name of the event if needed,\n can be specified here. Then access to a `custom` key can be done using\n `custom_settings = context.settings(key=\"customer\", datatype=MyCustomSettingsType)`\n :field: dataobjects, list of str: list of full qualified dataobject types that this event can process.\n When not specified, the engine will inspect the module implementation and find all datatypes supported\n as payload in the functions defined as `__steps__`. In case of generic functions that support\n `payload: DataObject` argument, then a list of full qualified datatypes must be specified here.\n :field: group, str: group name, if none is assigned it is automatically assigned as 'DEFAULT'.\n " + "description": "\n Event Descriptor: configures event implementation\n\n :field: type, EventType: type of event i.e.: GET, POST, MULTIPART, STREAM, SERVICE, SETUP\n :field: plug_mode, EventPlugMode: defines whether an event defined in a plugin is created in the\n current app (ON_APP) or it will be created in the original plugin (STANDALONE, default)\n :field: route, optional str: custom route for endpoint. If not specified route will be derived\n from `/api/app_name/app_version/event_name`\n :field: impl, optional str: custom event implementation Python module. If not specified, module\n with same same as event will be imported.\n :field: connections, list of EventConnection: specifies dependencies on other apps/endpoints,\n that can be used by client plugins to call events on external apps\n :field: read_stream, optional ReadStreamDescriptor: specifies source stream to read from.\n Valid only for STREAM events.\n :field: write_stream, optional WriteStreamDescriptor: for any type of events, resultant dataobjects will\n be published to the specified stream.\n :field: auth, list of AuthType: supported authentication schemas for this event. If not specified\n application default will be used.\n :field: setting_keys, list of str: by default EventContext will have access to the settings section\n with the same name of the event using `settings = context.settings(datatype=MySettingsType)`.\n In case additional sections are needed to be accessed from\n EventContext, then a list of setting keys, including the name of the event if needed,\n can be specified here. Then access to a `custom` key can be done using\n `custom_settings = context.settings(key=\"customer\", datatype=MyCustomSettingsType)`\n :field: dataobjects, list of str: list of full qualified dataobject types that this event can process.\n When not specified, the engine will inspect the module implementation and find all datatypes supported\n as payload in the functions defined as `__steps__`. In case of generic functions that support\n `payload: DataObject` argument, then a list of full qualified datatypes must be specified here.\n :field: group, str: group name, if none is assigned it is automatically assigned as 'DEFAULT'.\n " }, "EventConnection": { "type": "object",