From d9aee4d6fa83c490217d0890033b81248732a18b Mon Sep 17 00:00:00 2001 From: Antony Mayi Date: Tue, 30 Aug 2022 10:35:13 +0000 Subject: [PATCH] serving tutorial --- .flake8 | 2 +- constraints.txt | 35 ++++--- docs/application.rst | 18 ++-- docs/conf.py | 1 + docs/io.rst | 2 +- docs/platform.rst | 5 +- docs/runner.rst | 2 +- docs/tutorials/titanic/lifecycle.rst | 5 + docs/tutorials/titanic/pipeline.rst | 4 +- docs/tutorials/titanic/serving.rst | 103 +++++++++++++++++++- forml/application/_descriptor.py | 8 +- forml/application/_strategy.py | 3 +- forml/io/_input/_producer.py | 2 +- forml/io/layout/_codec.py | 4 +- forml/io/layout/_external.py | 6 +- forml/project/_component/__init__.py | 75 ++++++++++---- forml/provider/gateway/rest.py | 67 ++++++++++--- forml/provider/runner/dask.py | 11 ++- forml/provider/runner/graphviz.py | 32 +++--- forml/provider/runner/pyfunc.py | 3 +- forml/runtime/_agent.py | 13 ++- forml/runtime/_perf.py | 2 +- forml/runtime/_service/__init__.py | 35 +++++-- forml/runtime/_service/dispatch.py | 2 +- forml/runtime/_service/prediction.py | 3 + forml/setup/{_conf/__init__.py => _conf.py} | 24 +++-- forml/setup/_logging.py | 10 +- forml/setup/_run/__init__.py | 11 ++- forml/setup/{_conf => }/config.toml | 3 +- forml/setup/{_conf => }/logging.ini | 32 +++--- setup.py | 4 +- tests/provider/registry/test_mlflow.py | 2 +- tutorials/config.toml | 8 ++ tutorials/titanic/application.py | 13 ++- 34 files changed, 399 insertions(+), 151 deletions(-) rename forml/setup/{_conf/__init__.py => _conf.py} (97%) rename forml/setup/{_conf => }/config.toml (97%) rename forml/setup/{_conf => }/logging.ini (88%) diff --git a/.flake8 b/.flake8 index f17de4c8..2f8def62 100644 --- a/.flake8 +++ b/.flake8 @@ -2,6 +2,6 @@ show-source = true enable-extensions=G max-line-length = 120 -ignore = B019,E731,W504,I001,W503 +ignore = B019,B024,E731,W504,I001,W503 exclude = .git,__pycache__,.eggs,*.egg min_python_version = 3.9.0 diff --git a/constraints.txt b/constraints.txt index 88219018..606ebd82 100644 --- a/constraints.txt +++ b/constraints.txt @@ -10,7 +10,7 @@ alembic==1.8.1 # via mlflow anyio==3.6.1 # via starlette -astroid==2.11.7 +astroid==2.12.4 # via pylint attrs==22.1.0 # via @@ -31,7 +31,7 @@ certifi==2022.6.15 # via requests cfgv==3.3.1 # via pre-commit -charset-normalizer==2.1.0 +charset-normalizer==2.1.1 # via requests click==8.1.3 # via @@ -50,15 +50,15 @@ cloudpickle==2.1.0 # mlflow coverage==6.4.4 # via pytest-cov -dask==2022.8.0 +dask==2022.8.1 # via forml (setup.py) -databricks-cli==0.17.1 +databricks-cli==0.17.3 # via mlflow defusedxml==0.7.1 # via nbconvert dill==0.3.5.1 # via pylint -distlib==0.3.5 +distlib==0.3.6 # via virtualenv docker==5.0.3 # via mlflow @@ -70,7 +70,6 @@ entrypoints==0.4 # via # jupyter-client # mlflow - # nbconvert fastjsonschema==2.16.1 # via nbformat filelock==3.8.0 @@ -80,7 +79,7 @@ flake8==5.0.4 # flake8-bugbear # flake8-colors # flake8-typing-imports -flake8-bugbear==22.7.1 +flake8-bugbear==22.8.23 # via forml (setup.py) flake8-colors==0.1.9 # via forml (setup.py) @@ -100,7 +99,7 @@ gitpython==3.1.27 # via mlflow graphviz==0.20.1 # via forml (setup.py) -greenlet==1.1.2 +greenlet==1.1.3 # via sqlalchemy gunicorn==20.1.0 # via mlflow @@ -132,9 +131,9 @@ jinja2==3.1.2 # sphinx joblib==1.1.0 # via scikit-learn -jsonschema==4.12.1 +jsonschema==4.14.0 # via nbformat -jupyter-client==7.3.4 +jupyter-client==7.3.5 # via nbclient jupyter-core==4.11.1 # via @@ -164,7 +163,7 @@ mccabe==0.7.0 # via # flake8 # pylint -mistune==0.8.4 +mistune==2.0.4 # via nbconvert 
mlflow==1.28.0 # via forml (setup.py) @@ -172,9 +171,9 @@ mypy-extensions==0.4.3 # via # black # typing-inspect -nbclient==0.6.6 +nbclient==0.6.7 # via nbconvert -nbconvert==6.5.3 +nbconvert==7.0.0 # via nbsphinx nbformat==5.4.0 # via @@ -258,7 +257,7 @@ pyhive==0.6.5 # via forml (setup.py) pyjwt==2.4.0 # via databricks-cli -pylint==2.14.5 +pylint==2.15.0 # via forml (setup.py) pyparsing==3.0.9 # via packaging @@ -301,7 +300,7 @@ requests==2.28.1 # sphinx scikit-learn==1.1.2 # via forml (setup.py) -scipy==1.9.0 +scipy==1.9.1 # via # mlflow # scikit-learn @@ -402,9 +401,9 @@ typing-extensions==4.3.0 # typing-inspect typing-inspect==0.8.0 # via libcst -urllib3==1.26.11 +urllib3==1.26.12 # via requests -uvicorn==0.18.2 +uvicorn==0.18.3 # via forml (setup.py) virtualenv==20.16.3 # via pre-commit @@ -412,7 +411,7 @@ webencodings==0.5.1 # via # bleach # tinycss2 -websocket-client==1.3.3 +websocket-client==1.4.0 # via docker werkzeug==2.2.2 # via flask diff --git a/docs/application.rst b/docs/application.rst index 640afcc2..3c9ca187 100644 --- a/docs/application.rst +++ b/docs/application.rst @@ -31,6 +31,14 @@ ML problem, *applications* aim to expose it (by means of :ref:`gateway provider `) in a domain-specific form suitable for integration with the *actual* decision making process. +ForML :ref:`platform ` persists :ref:`published applications ` +within a special :ref:`application inventory ` where they are picked from at runtime +by the :ref:`serving engine `. + + +.. _application-prjrelation: +.. rubric:: Project-Application Relationship + As shown in the diagram below, relationships between projects and applications can have any possible cardinality. Projects might not be associated with any application (not exposed for serving - e.g. *Project B*), on the other hand an application can possibly span multiple projects @@ -65,10 +73,6 @@ It makes sense to manage an application (descriptor) in the scope of some partic they form a 1:1 relationship (perhaps the most typical scenario). More complex applications might need to be maintained separately though. -ForML :ref:`platform ` persists :ref:`published applications ` -within a special :ref:`application inventory ` where they are picked from at runtime -by the :ref:`serving engine `. - .. _application-dispatch: Request Dispatching @@ -130,10 +134,10 @@ Another powerful way an application exerts control over the serving process is a :meth:`selection ` of the specific :ref:`model generation ` to be used for serving each particular request. -Applications can base the selection logic on the following available details: +Applications can base the selection logic on the following available facts: -* actual content of the :ref:`model registry ` (all existing model generations - to choose from) +* actual content of the :ref:`model registry ` (any existing model generation to choose + from) * custom metadata stored in the application *context* (e.g. 
as part of the query :meth:`receiving `) * various serving :class:`metrics ` provided by the system diff --git a/docs/conf.py b/docs/conf.py index 8d46d34e..50fe2f63 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -95,6 +95,7 @@ _target_blacklist = { 'py:class': ( '_Actor', + 'applications.Starlette', r'asset\.Generation', r'^dsl\.Operable', r'^dsl\.Ordering\.(?:Direction|Term)', diff --git a/docs/io.rst b/docs/io.rst index 74618087..0d2e780c 100644 --- a/docs/io.rst +++ b/docs/io.rst @@ -293,7 +293,7 @@ The two encoder/decoder matching functions bellow currently support the followin encodings/flavours: +-------------------------+----------------------------+------------------------------------------+ -| Encoding | Example | Implementation | +| Content-type | Example | Implementation | +=========================+============================+==========================================+ | ``application/json; | ``[{column -> value}, | | | format=pandas-records`` | ... , {column -> value}]`` | | diff --git a/docs/platform.rst b/docs/platform.rst index f9c3aa2e..99168920 100644 --- a/docs/platform.rst +++ b/docs/platform.rst @@ -57,7 +57,7 @@ will try to locate and merge the :file:`config.toml` file instances in the follo Following is the default content of the ForML platform configuration file: -.. literalinclude:: ../forml/setup/_conf/config.toml +.. literalinclude:: ../forml/setup/config.toml :caption: config.toml (default) :linenos: :language: toml @@ -168,9 +168,10 @@ command-line interface - see the integrated help for more details: Lifecycle Management for Datascience Projects. Options: - -C, --config PATH Additional config file. + -C, --config FILE Additional config file. -L, --loglevel [debug|info|warning|error] Global loglevel to use. + --logfile FILE Logfile path. --help Show this message and exit. Commands: diff --git a/docs/runner.rst b/docs/runner.rst index 43fafdb3..5bf70d96 100644 --- a/docs/runner.rst +++ b/docs/runner.rst @@ -40,7 +40,7 @@ Runner API ---------- .. autoclass:: forml.runtime.Runner - :members: _run + :members: run .. _runner-providers: diff --git a/docs/tutorials/titanic/lifecycle.rst b/docs/tutorials/titanic/lifecycle.rst index 46733018..8a1d59b7 100644 --- a/docs/tutorials/titanic/lifecycle.rst +++ b/docs/tutorials/titanic/lifecycle.rst @@ -135,3 +135,8 @@ free to change the directory to another location before executing the commands. .. image:: ../../_static/images/titanic-apply.png :target: ../../_static/images/titanic-apply.png + +Now, after exploring two of the :ref:`execution mechanisms ` (namely the +:ref:`interactive ` mode demonstrated during the :doc:`exploratory +analysis ` and the :ref:`command-line driven ` batch processing shown +in this chapter), we can proceed to the final :doc:`deployment and serving `. diff --git a/docs/tutorials/titanic/pipeline.rst b/docs/tutorials/titanic/pipeline.rst index 4dd2ac6d..d75ace38 100644 --- a/docs/tutorials/titanic/pipeline.rst +++ b/docs/tutorials/titanic/pipeline.rst @@ -51,8 +51,8 @@ Custom Preprocessing Operators In addition to the ``Imputer`` operator we've created in scope of our :doc:`exploration `, let's improve our preprocessing with a couple more operators. We stick to the -simple ``@wrap`` technique for implementing :ref:`actors ` and eventually -:ref:`operators `. +simple ``@wrap`` technique for implementing :ref:`actors ` and :ref:`operators +` eventually. 
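Before moving on to the concrete operators below, here is a generic reminder of what the ``@wrap``
pattern looks like. This is an illustrative sketch only - the decorator spelling follows the
``forml.pipeline.wrap`` API as recalled from the exploration chapter and should be treated as an
assumption rather than a definitive reference (the actual tutorial operators follow in the next
sections):

.. code-block:: python

    import pandas

    from forml.pipeline import wrap


    @wrap.Operator.mapper  # promote the decorated actor into a simple mapper operator
    @wrap.Actor.apply      # turn the plain function into a stateless apply-mode actor
    def Uppercase(features: pandas.DataFrame, *, column: str) -> pandas.DataFrame:
        """Hypothetical mapper converting the given column to uppercase."""
        features[column] = features[column].str.upper()
        return features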
ParseTitle ^^^^^^^^^^ diff --git a/docs/tutorials/titanic/serving.rst b/docs/tutorials/titanic/serving.rst index 68fe1718..f05c4637 100644 --- a/docs/tutorials/titanic/serving.rst +++ b/docs/tutorials/titanic/serving.rst @@ -17,7 +17,108 @@ Deployment and Serving ====================== -TODO +With all the :ref:`lifecycle actions ` simply :doc:`at our fingertips `, +let's now focus on deploying the project as a :ref:`ForML application ` making it +available for :ref:`serving `. + + +Creating and Publishing the Application Descriptor +-------------------------------------------------- + +ForML applications are :ref:`implemented ` in form of a *descriptor* +instance defined within a *Python module*. Due to the potentially non-exclusive nature of the +:ref:`project-application relationship `, this module might in general +need to be maintained out of the project scope but for the sake of this tutorial let's assume a +direct 1:1 relationship so that we can keep it along with the project sources in a module called +:file:`application.py`. Our top-level project structure is then going to look as follows: + +.. code-block:: console + + $ ls -1p forml-tutorial-titanic + application.py + notebooks/ + setup.py + tests/ + titanic/ + + +For simplicity, we choose to :ref:`implement the application ` in +the most basic (yet full-featured) manner by reusing the existing :class:`application.Generic +` descriptor and passing its instance to :func:`application.setup() +` for registration. Note this simplistic setup requires the application +name to match the project name (in order to select the relevant assets from the :ref:`model +registry `): + +.. literalinclude:: ../../../tutorials/titanic/application.py + :caption: application.py + :linenos: + :language: python + :start-after: # under the License. + +That's all it takes to implement a simple application descriptor. It can now be deployed by +the means of publishing into a :ref:`platform-configured ` application +:ref:`inventory `: + +.. code-block:: console + + $ forml application put application.py + $ forml application list + forml-example-titanic + Serving ------- + +Easiest way to expose our model for serving is to spin up a particular :ref:`serving gateway +` provider linked through the :ref:`platform configuration ` to +the same :ref:`inventory ` and :ref:`registry ` holding our application and +models respectively. + +The configured gateway (the :class:`rest.Gateway ` in our case) +can be started simply using the CLI: + +.. code-block:: console + + $ forml application serve + INFO: Started server process [568798] + INFO: Waiting for application startup. + INFO: Application startup complete. + INFO: Uvicorn running on http://127.0.0.1:8080 (Press CTRL+C to quit) + +Note the gateway is capable of serving any application in the linked inventory. + +Let's explore the capabilities using manual ``curl`` queries: + +#. Querying a non-existent application ``foobarbaz``: + + .. code-block:: console + + $ curl -X POST http://127.0.0.1:8080/foobarbaz + Application foobarbaz not found in Dispatch-registry + +#. Querying our ``forml-titanic-example`` application using a *JSON* encoded payload: + + .. code-block:: console + + $ curl -X POST -H 'Content-Type: application/json' -d '[{"Pclass":1, "Name":"Foo", "Sex": "male", "Age": 34, "SibSp": 3, "Parch": 2, "Ticket": "13", "Fare": 10.1, "Cabin": "123", "Embarked": "S"}]' http://127.0.0.1:8080/forml-example-titanic + [{"c0":0.3459976655}] + +#. 
Making the same query but requesting the result to be encoded as CSV: + + .. code-block:: console + + $ curl -X POST -H 'Content-Type: application/json' -H 'Accept: text/csv' -d '[{"Pclass":1,"Name":"Foo", "Sex": "male", "Age": 34, "SibSp": 3, "Parch": 2, "Ticket": "13", "Fare": 10.1, "Cabin": "123", "Embarked": "S"}]' http://127.0.0.1:8080/forml-example-titanic + c0 + 0.34599766550668526 + +#. Making the same query but sending the payload in the *pandas-split* (JSON) format and requesting + the result as (JSON) *values*: + + .. code-block:: console + + $ curl -X POST -H 'Content-Type: application/json; format=pandas-split' -H 'Accept: application/json; format=pandas-values' -d '{"columns": ["Pclass", "Name", "Sex", "Age", "SibSp", "Parch", "Ticket", "Fare", "Cabin", "Embarked"], "data": [[1, "Foo", "male", 34, 3, 2, 13, 10.1, "123", "S"]]}' http://127.0.0.1:8080/forml-example-titanic + [[0.3459976655]] + +That concludes this Titanic Challenge tutorial, from here you can continue to the other +:ref:`available tutorials ` or browse the general :doc:`ForML documentation +<../../index>`. diff --git a/forml/application/_descriptor.py b/forml/application/_descriptor.py index 557e65a5..d9821076 100644 --- a/forml/application/_descriptor.py +++ b/forml/application/_descriptor.py @@ -194,15 +194,17 @@ def respond( class Generic(Descriptor): """Generic application descriptor for basic serving scenarios. - It simply runs the directly decoded request payload through the model/generation selected using + It simply runs the directly decoded (using the :func:`available decoders + `) request payload through the model/generation selected using the provided :class:`application.Selector ` and returns the directly - encoded outcomes as the response. + encoded (using the :func:`available encoders `) outcomes as the + response. Args: name: The (unique) name for this application registration/lookup. selector: Implementation of a particular model-selection strategy (defaults to :class:`application.Latest ` selector expecting the - project name to be matching the application name). + project name to be *matching* the application name). Examples: >>> APP = application.Generic('forml-example-titanic') diff --git a/forml/application/_strategy.py b/forml/application/_strategy.py index 035d8fbb..2f4476dd 100644 --- a/forml/application/_strategy.py +++ b/forml/application/_strategy.py @@ -43,7 +43,8 @@ def select(self, registry: 'asset.Directory', context: typing.Any, stats: 'runti Args: registry: Model registry to select the model from. - context: Optional metadata carried over from decode. + context: Optional metadata carried over from the :meth:`application.Descriptor.receive + `. stats: Application specific serving metrics. 
Returns: diff --git a/forml/io/_input/_producer.py b/forml/io/_input/_producer.py index c2a95877..584d0cd6 100644 --- a/forml/io/_input/_producer.py +++ b/forml/io/_input/_producer.py @@ -84,7 +84,7 @@ def __call__(self, statement: 'dsl.Statement', entry: typing.Optional['layout.En complete, indices = self._match_entry(statement.schema, entry.schema) if not complete: # here we would go into augmentation mode - when implemented - raise forml.InvalidError('Augmentation not yet supported') + raise forml.MissingError('Augmentation not supported - please provide all features') return entry.data.take_columns(indices) if indices else entry.data LOGGER.debug('Parsing ETL query') diff --git a/forml/io/layout/_codec.py b/forml/io/layout/_codec.py index cebb2584..7d9ebaac 100644 --- a/forml/io/layout/_codec.py +++ b/forml/io/layout/_codec.py @@ -84,13 +84,13 @@ def header(self) -> str: @classmethod @functools.lru_cache def parse(cls, value: str) -> typing.Sequence['layout.Encoding']: - """Parse the content type header value. + """Caching parser of the content type header values. Args: value: Comma separated list of content type values and their parameters. Returns: - Sequence of the ``Encoding`` instances ordered according to the provided priority. + Sequence of the :class:`Encoding` instances ordered according to the provided priority. Examples: >>> layout.Encoding.parse('image/GIF; q=0.6; a=x, text/html; q=1.0') diff --git a/forml/io/layout/_external.py b/forml/io/layout/_external.py index a506d8af..c8a193c3 100644 --- a/forml/io/layout/_external.py +++ b/forml/io/layout/_external.py @@ -28,14 +28,14 @@ class Entry(typing.NamedTuple): - """Internal representation of the decoded ``Request`` payload.""" + """Internal representation of the decoded :class:`Request` payload.""" schema: 'dsl.Source.Schema' data: 'layout.Tabular' class Outcome(typing.NamedTuple): - """Internal result payload representation to be encoded as ``Response``.""" + """Internal result payload representation to be encoded as :class:`Response`.""" schema: 'dsl.Source.Schema' data: 'layout.RowMajor' @@ -51,7 +51,7 @@ class Request(collections.namedtuple('Request', 'payload, encoding, params, acce payload: Raw encoded payload. encoding: Content type encoding instance. params: Optional application-level parameters. - accept: Content types request for the eventual ``Response``. + accept: Content types request for the eventual :class:`Response`. """ class Decoded(typing.NamedTuple): diff --git a/forml/project/_component/__init__.py b/forml/project/_component/__init__.py index 539fa10f..8582f27d 100644 --- a/forml/project/_component/__init__.py +++ b/forml/project/_component/__init__.py @@ -27,20 +27,50 @@ import typing import forml -from forml import flow +from forml import flow as flowmod from forml import setup as setupmod -from forml.io import dsl, layout +from forml.io import dsl as dslmod +from forml.io import layout from .. import _body from . import virtual if typing.TYPE_CHECKING: - from forml import evaluation, project + from forml import evaluation, flow, project # pylint: disable=reimported + from forml.io import dsl # pylint: disable=reimported LOGGER = logging.getLogger(__name__) -def setup(component: typing.Any) -> None: # pylint: disable=unused-argument +@typing.overload +def setup(source: 'project.Source') -> None: + """Source component setup entrypoint. + + Args: + source: Source descriptor. 
+ """ + + +@typing.overload +def setup(pipeline: 'flow.Composable', schema: 'typing.Optional[dsl.Source.Schema]' = None) -> None: + """Pipeline component setup entrypoint. + + Args: + pipeline: Workflow expression. + schema: Optional schema of the pipeline output. + """ + + +@typing.overload +def setup(evaluation: 'project.Evaluation') -> None: + """Evaluation component setup entrypoint. + + Args: + evaluation: Evaluation descriptor. + """ + + +def setup(component) -> None: # pylint: disable=unused-argument """Interface for registering principal component instances. This function is expected to be called exactly once from within every component module passing @@ -50,7 +80,10 @@ def setup(component: typing.Any) -> None: # pylint: disable=unused-argument loader context* (outside the context this is effectively no-op). Args: - component: Principal component instance to be registered. + source: Source descriptor. + pipeline: Workflow expression. + schema: Optional schema of the pipeline output. + evaluation: Evaluation descriptor. """ LOGGER.debug('Principal component setup attempted outside of a loader context: %s', component) @@ -84,33 +117,33 @@ class Source(typing.NamedTuple): """ Labels = typing.Union[ - dsl.Feature, - typing.Sequence[dsl.Feature], - flow.Builder[flow.Actor[layout.Tabular, None, tuple[layout.RowMajor, layout.RowMajor]]], + dslmod.Feature, + typing.Sequence[dslmod.Feature], + flowmod.Builder[flowmod.Actor[layout.Tabular, None, tuple[layout.RowMajor, layout.RowMajor]]], ] """Label type - either a single column, multiple columns or a generic label extracting actor - (with two output ports). + (with two output ports) builder. """ class Extract(collections.namedtuple('Extract', 'train, apply, labels, ordinal')): """Combo of select statements for the different modes.""" - train: dsl.Statement - apply: dsl.Statement + train: 'dsl.Statement' + apply: 'dsl.Statement' labels: typing.Optional['project.Source.Labels'] - ordinal: typing.Optional[dsl.Operable] + ordinal: typing.Optional['dsl.Operable'] def __new__( cls, - train: dsl.Source, - apply: dsl.Source, + train: 'dsl.Source', + apply: 'dsl.Source', labels: typing.Optional['project.Source.Labels'], - ordinal: typing.Optional[dsl.Operable], + ordinal: typing.Optional['dsl.Operable'], ): train = train.statement apply = apply.statement - if labels is not None and not isinstance(labels, flow.Builder): - if isinstance(labels, dsl.Feature): + if labels is not None and not isinstance(labels, flowmod.Builder): + if isinstance(labels, dslmod.Feature): lseq = [labels] else: lseq = labels = tuple(labels) @@ -119,16 +152,16 @@ def __new__( if train.schema != apply.schema: raise forml.InvalidError('Train-apply schema mismatch') if ordinal: - ordinal = dsl.Operable.ensure_is(ordinal) + ordinal = dslmod.Operable.ensure_is(ordinal) return super().__new__(cls, train, apply, labels, ordinal) @classmethod def query( cls, - features: dsl.Source, + features: 'dsl.Source', labels: typing.Optional['project.Source.Labels'] = None, - apply: typing.Optional[dsl.Source] = None, - ordinal: typing.Optional[dsl.Operable] = None, + apply: typing.Optional['dsl.Source'] = None, + ordinal: typing.Optional['dsl.Operable'] = None, ) -> 'project.Source': """Factory method for creating a new Source descriptor instance with the given *extraction* parameters. 
diff --git a/forml/provider/gateway/rest.py b/forml/provider/gateway/rest.py index 5fa4945f..134f6b6d 100644 --- a/forml/provider/gateway/rest.py +++ b/forml/provider/gateway/rest.py @@ -88,12 +88,53 @@ async def __endpoint(self, _: reqmod.Request) -> respmod.Response: class Gateway(runtime.Gateway, alias='rest'): - """Rest frontend. - - TODO: URL format doc (/application/...), HTTP methods... - """ - - DEFAULTS = {'headers': [('server', f'ForML {forml.__version__}')]} + """Gateway(inventory: typing.Optional[asset.Inventory] = None, registry: typing.Optional[asset.Registry] = None, feeds: typing.Optional[io.Importer] = None, processes: typing.Optional[int] = None, loop: typing.Optional[asyncio.AbstractEventLoop] = None, server: typing.Callable[[applications.Starlette, ...], None] = uvicorn.run, **options) + + Serving gateway implemented as a RESTful API. + + The frontend provides the following HTTP endpoints: + + ================== ====== ================================================================== + Path Method Description + ================== ====== ================================================================== + ``/stats`` GET Retrieve the Engine-provided performance :class:`metrics report + `. + ``/`` POST Prediction request for the given :ref:`application `. + The entire request *body* is passed to the :ref:`Engine ` + as the :class:`layout.Request.payload ` + with the declared ``content-type`` indicated via the ``.encoding`` + and any potential query parameters bundled within the ``.params``. + ================== ====== ================================================================== + + Args: + inventory: Inventory of applications to be served (default as per platform config). + registry: Model registry of project artifacts to be served (default as per platform config). + feeds: Feeds to be used for potential feature augmentation (default as per platform config). + processes: Process pool size for each model sandbox. + loop: Explicit event loop instance. + server: Serving loop main function accepting the provided `application instance + `_ (defaults to `uvicorn.run + `_). + options: Additional serving loop keyword arguments (i.e. `Uvicorn settings + `_). + + The provider can be enabled using the following :ref:`platform configuration `: + + .. code-block:: toml + :caption: config.toml + + [GATEWAY.http] + provider = "rest" + port = 8080 + processes = 3 + + Important: + Select the ``rest`` :ref:`extras to install ` ForML together with the + Starlette/Uvicorn support. 
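To complement the endpoint table above, the following sketch exercises both documented routes
using plain ``requests`` calls. The host/port assume the ``[GATEWAY.http]`` configuration shown in
this docstring, and the payload/response mirror the ``curl`` examples from the serving tutorial
elsewhere in this patch:

.. code-block:: python

    import requests

    row = {
        "Pclass": 1, "Name": "Foo", "Sex": "male", "Age": 34, "SibSp": 3,
        "Parch": 2, "Ticket": "13", "Fare": 10.1, "Cabin": "123", "Embarked": "S",
    }

    # POST /<application> - the JSON body becomes the layout.Request payload.
    response = requests.post(
        "http://127.0.0.1:8080/forml-example-titanic", json=[row], timeout=10
    )
    print(response.json())  # e.g. [{"c0": 0.3459976655}]

    # GET /stats - the engine-provided metrics report (still a placeholder runtime.Stats).
    print(requests.get("http://127.0.0.1:8080/stats", timeout=10).text)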
+ """ # pylint: disable=line-too-long # noqa: E501 + + OPTIONS = {'headers': [('server', f'ForML {forml.__version__}')]} + """Default server loop options.""" def __init__( self, @@ -103,17 +144,17 @@ def __init__( processes: typing.Optional[int] = None, loop: typing.Optional[asyncio.AbstractEventLoop] = None, server: typing.Callable[[applications.Starlette, ...], None] = uvicorn.run, - **kwargs, + **options, ): - super().__init__(inventory, registry, feeds, processes=processes, loop=loop) - self._server: typing.Callable[[applications.Starlette, ...], None] = server - self._kwargs = self.DEFAULTS | kwargs + super().__init__(inventory, registry, feeds, processes=processes, loop=loop, server=server, options=options) + @classmethod def run( - self, + cls, apply: typing.Callable[[str, layout.Request], typing.Awaitable[layout.Response]], stats: typing.Callable[[], typing.Awaitable[runtime.Stats]], + **kwargs, ) -> None: routes = [Apply(apply), Stats(stats)] - app = applications.Starlette(routes=routes, debug=True) - self._server(app, **self._kwargs) + app = applications.Starlette(routes=routes, debug=False) + kwargs['server'](app, **(cls.OPTIONS | kwargs['options'])) diff --git a/forml/provider/runner/dask.py b/forml/provider/runner/dask.py index 9c3bd09a..bb045a6d 100644 --- a/forml/provider/runner/dask.py +++ b/forml/provider/runner/dask.py @@ -115,10 +115,11 @@ def __init__( sink: typing.Optional[io.Sink] = None, scheduler: typing.Optional[str] = None, ): - super().__init__(instance, feed, sink) - self._scheduler: str = scheduler or self.SCHEDULER + super().__init__(instance, feed, sink, scheduler=scheduler) - def _run(self, symbols: typing.Collection[flow.Symbol]) -> None: - dag = self.Dag(symbols) + @classmethod + def run(cls, symbols: typing.Collection[flow.Symbol], **kwargs) -> None: + dag = cls.Dag(symbols) LOGGER.debug('Dask DAG: %s', dag) - importlib.import_module(f'{dask.__name__}.{self._scheduler}').get(dag, dag.output) + scheduler = kwargs.get('scheduler') or cls.SCHEDULER + importlib.import_module(f'{dask.__name__}.{scheduler}').get(dag, dag.output) diff --git a/forml/provider/runner/graphviz.py b/forml/provider/runner/graphviz.py index 37bf8539..1eaf149d 100644 --- a/forml/provider/runner/graphviz.py +++ b/forml/provider/runner/graphviz.py @@ -38,16 +38,16 @@ class Runner(runtime.Runner, alias='graphviz'): For better readability, the runner is using the following shapes to plot the different objects: - ============= =========================================== - Shape Meaning - ============= =========================================== - Square box Actor in train mode. - Round box Actor in apply mode. - Ellipse System actor for output port selection. - Cylinder System actor for state persistence. - Solid edge Data transfer. - Dotted edge State transfer. - ============= =========================================== + =========== ======================================= + Shape Meaning + =========== ======================================= + Square box Actor in train mode. + Round box Actor in apply mode. + Ellipse System actor for output port selection. + Cylinder System actor for state persistence. + Solid edge Data transfer. + Dotted edge State transfer. + =========== ======================================= Args: filepath: Target path for producing the DOT file. 
@@ -86,13 +86,11 @@ def __init__( view: bool = True, **options: typing.Any, ): - super().__init__(instance, feed, sink) - self._filepath: pathlib.Path = pathlib.Path(filepath or self.FILEPATH) - self._view: bool = view - self._options: typing.Mapping[str, typing.Any] = self.OPTIONS | options + super().__init__(instance, feed, sink, filepath=filepath, view=view, options=options) - def _run(self, symbols: typing.Collection[flow.Symbol]) -> None: - dot: grviz.Digraph = grviz.Digraph(**self._options) + @classmethod + def run(cls, symbols: typing.Collection[flow.Symbol], **kwargs) -> None: + dot: grviz.Digraph = grviz.Digraph(**(cls.OPTIONS | kwargs['options'])) for sym in symbols: nodekw = dict(shape='ellipse') outkw = dict(style='solid') @@ -111,4 +109,4 @@ def _run(self, symbols: typing.Collection[flow.Symbol]) -> None: ): inkw.update(style='dotted') dot.edge(repr(id(arg)), repr(id(sym.instruction)), label=repr(idx), **inkw) - dot.render(self._filepath, view=self._view) + dot.render(pathlib.Path(kwargs['filepath'] or cls.FILEPATH), view=kwargs['view']) diff --git a/forml/provider/runner/pyfunc.py b/forml/provider/runner/pyfunc.py index 242af062..0b61ae93 100644 --- a/forml/provider/runner/pyfunc.py +++ b/forml/provider/runner/pyfunc.py @@ -283,7 +283,8 @@ def train(self, lower: typing.Optional[dsl.Native] = None, upper: typing.Optiona def tune(self, lower: typing.Optional[dsl.Native] = None, upper: typing.Optional[dsl.Native] = None) -> None: raise forml.InvalidError('Invalid runner mode') - def _run(self, symbols: typing.Collection[flow.Symbol]) -> None: + @classmethod + def run(cls, symbols: typing.Collection[flow.Symbol], **kwargs) -> None: Expression(symbols)(None) def call(self, entry: layout.Entry) -> layout.Outcome: diff --git a/forml/runtime/_agent.py b/forml/runtime/_agent.py index 2b9f0785..cfa301d7 100644 --- a/forml/runtime/_agent.py +++ b/forml/runtime/_agent.py @@ -43,13 +43,14 @@ class Runner(provider.Service, default=setup.Runner.default, path=setup.Runner.p The public API allows to perform all the standard actions of the :doc:`ForML lifecycles `. - All that needs to be supplied by the provider is the abstract :meth:`_run` method. + All that needs to be supplied by the provider is the abstract :meth:`run` method. Args: instance: Particular instance of the persistent artifacts to be executed. feed: Optional input feed instance to retrieve the data from (falls back to the default configured feed). sink: Output sink instance (no output is produced if omitted). + kwargs: Additional keyword arguments for the :meth:`run` method. """ _METRIC_SCHEMA = dsl.Schema.from_fields(dsl.Field(dsl.Float(), name='Metric')) @@ -59,11 +60,12 @@ def __init__( instance: typing.Optional['asset.Instance'] = None, feed: typing.Optional['io.Feed'] = None, sink: typing.Optional['io.Sink'] = None, - **_, + **kwargs, ): self._instance: 'asset.Instance' = instance or assetmod.Instance() self._feed: 'io.Feed' = feed or iomod.Feed() self._sink: typing.Optional['io.Sink'] = sink + self._kwargs: typing.Mapping[str, typing.Any] = kwargs def train(self, lower: typing.Optional[dsl.Native] = None, upper: typing.Optional[dsl.Native] = None) -> None: """Run the training code. @@ -181,14 +183,17 @@ def _exec(self, segment: 'flow.Segment', assets: typing.Optional['asset.State'] Returns: Optional return value. 
""" - return self._run(flowmod.compile(segment, assets)) + return self.run(flowmod.compile(segment, assets), **self._kwargs) + @classmethod @abc.abstractmethod - def _run(self, symbols: typing.Collection['flow.Symbol']) -> None: + def run(cls, symbols: typing.Collection['flow.Symbol'], **kwargs) -> None: """Actual run action implementation using the specific provider execution technology. Args: symbols: Collection of portable symbols representing the workflow task graph to be executed as produced by the :func:`flow.compile() ` function. + kwargs: Custom keyword arguments provided via the constructor. """ + raise NotImplementedError() diff --git a/forml/runtime/_perf.py b/forml/runtime/_perf.py index dfdef06a..16906153 100644 --- a/forml/runtime/_perf.py +++ b/forml/runtime/_perf.py @@ -21,7 +21,7 @@ class Stats(typing.NamedTuple): - """Runtime performance metrics. + """Runtime performance metrics report. Todo: Complete the Stats concept. diff --git a/forml/runtime/_service/__init__.py b/forml/runtime/_service/__init__.py index 557dce1c..b75c8301 100644 --- a/forml/runtime/_service/__init__.py +++ b/forml/runtime/_service/__init__.py @@ -21,7 +21,9 @@ import logging import typing -from forml import io, provider, setup +from forml import io, provider +from forml import runtime as runmod +from forml import setup from forml.io import asset, layout from .. import _perf @@ -30,7 +32,7 @@ if typing.TYPE_CHECKING: import asyncio - from forml import runtime + from forml import runtime # pylint: disable=reimported LOGGER = logging.getLogger(__name__) @@ -56,7 +58,14 @@ def shutdown(self): self._dealer.shutdown() async def stats(self) -> 'runtime.Stats': - """Get the collected stats report.""" + """Get the collected stats report. + + Returns: + Performance metrics report. + + Todo: Implement true stats collection. + """ + return runmod.Stats() async def apply(self, application: str, request: layout.Request) -> layout.Response: """Engine predict entrypoint. @@ -77,11 +86,12 @@ class Gateway(provider.Service, default=setup.Gateway.default, path=setup.Gatewa """Top-level serving gateway abstraction. Args: - inventory: Inventory of applications to be served. - registry: Model registry of project artifacts to be served. - feeds: Feeds to be used for potential feature augmentation. + inventory: Inventory of applications to be served (default as per platform config). + registry: Model registry of project artifacts to be served (default as per platform config). + feeds: Feeds to be used for potential feature augmentation (default as per platform config). processes: Process pool size for each model sandbox. - loop: Explicit even loop instance. + loop: Explicit event loop instance. + kwargs: Additional serving loop keyword arguments passed to the :meth:`run` method. 
""" def __init__( @@ -91,7 +101,7 @@ def __init__( feeds: typing.Optional['io.Importer'] = None, processes: typing.Optional[int] = None, loop: typing.Optional['asyncio.AbstractEventLoop'] = None, - **_, + **kwargs, ): if not inventory: inventory = asset.Inventory() @@ -100,19 +110,22 @@ def __init__( if not feeds: feeds = io.Importer(io.Feed()) self._engine: Engine = Engine(inventory, registry, feeds, processes=processes, loop=loop) + self._kwargs: typing.Mapping[str, typing.Any] = kwargs def __enter__(self): - self.run(self._engine.apply, self._engine.stats) + self.run(self._engine.apply, self._engine.stats, **self._kwargs) return self def __exit__(self, exc_type, exc_val, exc_tb): self._engine.shutdown() + @classmethod @abc.abstractmethod def run( - self, + cls, apply: typing.Callable[[str, 'layout.Request'], typing.Awaitable['layout.Response']], stats: typing.Callable[[], typing.Awaitable['runtime.Stats']], + **kwargs, ) -> None: """Serving loop implementation. @@ -121,7 +134,9 @@ def run( The handler expects two parameters - the target *application name* and the *prediction request*. stats: Stats producer callback provided by the engine. + kwargs: Additional keyword arguments provided via the constructor. """ + raise NotImplementedError() def main(self) -> None: """Frontend main method.""" diff --git a/forml/runtime/_service/dispatch.py b/forml/runtime/_service/dispatch.py index c994dda0..7726a3b7 100644 --- a/forml/runtime/_service/dispatch.py +++ b/forml/runtime/_service/dispatch.py @@ -79,7 +79,7 @@ class Frozen(asset.Registry): ERROR = TypeError('Frozen registry is immutable') def __init__(self, registry: asset.Registry): - super().__init__() + super().__init__(staging=asset.TMPDIR) # staging is irrelevant - only to avoid warning self._registry: asset.Registry = registry def __hash__(self): diff --git a/forml/runtime/_service/prediction.py b/forml/runtime/_service/prediction.py index 01283f0e..ccb46768 100644 --- a/forml/runtime/_service/prediction.py +++ b/forml/runtime/_service/prediction.py @@ -26,6 +26,7 @@ from concurrent import futures from multiprocessing import context +import forml from forml import io from forml.io import asset, dsl, layout from forml.provider.runner import pyfunc @@ -129,6 +130,8 @@ def run(self) -> None: continue try: self._results.put_nowait(task.success(self._runner.call(task.entry))) + except forml.AnyError as err: + self._results.put_nowait(task.failure(err)) except Exception as err: self._results.put_nowait(task.failure(err)) self._stopped.set() diff --git a/forml/setup/_conf/__init__.py b/forml/setup/_conf.py similarity index 97% rename from forml/setup/_conf/__init__.py rename to forml/setup/_conf.py index fd17c27a..1fcfde00 100644 --- a/forml/setup/_conf/__init__.py +++ b/forml/setup/_conf.py @@ -26,6 +26,7 @@ import tempfile import types import typing +from logging import handlers import tomli @@ -264,6 +265,7 @@ def _lookup(cls, reference: typing.Iterable[str]) -> typing.Sequence[Section]: return tuple(sorted(cls(r) for r in reference)) +SECTION_LOGGING = 'LOGGING' SECTION_PLATFORM = 'PLATFORM' SECTION_REGISTRY = 'REGISTRY' SECTION_FEED = 'FEED' @@ -272,7 +274,8 @@ def _lookup(cls, reference: typing.Iterable[str]) -> typing.Sequence[Section]: SECTION_INVENTORY = 'INVENTORY' SECTION_GATEWAY = 'GATEWAY' SECTION_TESTING = 'TESTING' -OPT_LOGCFG = 'logcfg' +OPT_CONFIG = 'config' +OPT_FACILITY = 'facility' OPT_TMPDIR = 'tmpdir' OPT_PROVIDER = 'provider' OPT_PRIORITY = 'priority' @@ -287,9 +290,21 @@ def _lookup(cls, reference: typing.Iterable[str]) 
-> typing.Sequence[Section]: OPT_APPLY = 'apply' OPT_EVAL = 'eval' +APPNAME = 'forml' +SYSDIR = pathlib.Path('/etc') / APPNAME +USRDIR = pathlib.Path(os.getenv(f'{APPNAME.upper()}_HOME', pathlib.Path.home() / f'.{APPNAME}')) +PATH = pathlib.Path(__file__).parent, SYSDIR, USRDIR +APPCFG = 'config.toml' +PRJNAME = re.sub(r'\.[^.]*$', '', pathlib.Path(sys.argv[0]).name) + DEFAULTS = { # all static defaults should go rather to the ./config.toml (in this package) OPT_TMPDIR: tempfile.gettempdir(), + SECTION_LOGGING: { + OPT_CONFIG: 'logging.ini', + OPT_FACILITY: handlers.SysLogHandler.LOG_USER, + OPT_PATH: f'./{PRJNAME}.log', + }, SECTION_REGISTRY: {OPT_PATH: [registry.__name__]}, SECTION_RUNNER: {OPT_PATH: [runner.__name__]}, SECTION_FEED: {OPT_PATH: [feed.__name__]}, @@ -298,13 +313,6 @@ def _lookup(cls, reference: typing.Iterable[str]) -> typing.Sequence[Section]: SECTION_GATEWAY: {OPT_PATH: [gateway.__name__]}, } -APPNAME = 'forml' -SYSDIR = pathlib.Path('/etc') / APPNAME -USRDIR = pathlib.Path(os.getenv(f'{APPNAME.upper()}_HOME', pathlib.Path.home() / f'.{APPNAME}')) -PATH = pathlib.Path(__file__).parent, SYSDIR, USRDIR -APPCFG = 'config.toml' -PRJNAME = re.sub(r'\.[^.]*$', '', pathlib.Path(sys.argv[0]).name) - CONFIG = Config(DEFAULTS, *(p / APPCFG for p in PATH)) tmpdir = CONFIG[OPT_TMPDIR] diff --git a/forml/setup/_logging.py b/forml/setup/_logging.py index 5e56a99f..0197f008 100644 --- a/forml/setup/_logging.py +++ b/forml/setup/_logging.py @@ -23,21 +23,23 @@ import logging as logmod import pathlib import typing -from logging import config, handlers +from logging import config from . import _conf LOGGER = logmod.getLogger(__name__) -DEFAULTS = dict(prj_name=_conf.PRJNAME, log_facility=handlers.SysLogHandler.LOG_USER, log_path=f'./{_conf.PRJNAME}.log') def logging(*path: pathlib.Path, **defaults: typing.Any): """Setup logger according to the params.""" - parser = configparser.ConfigParser(DEFAULTS | defaults) + parser = configparser.ConfigParser(_conf.CONFIG[_conf.SECTION_LOGGING] | defaults) tried = set() used = parser.read( p - for p in ((b / _conf.CONFIG[_conf.OPT_LOGCFG]).resolve() for b in itertools.chain(_conf.PATH, path)) + for p in ( + (b / _conf.CONFIG[_conf.SECTION_LOGGING][_conf.OPT_CONFIG]).resolve() + for b in itertools.chain(_conf.PATH, path) + ) if not (p in tried or tried.add(p)) ) config.fileConfig(parser, disable_existing_loggers=True) diff --git a/forml/setup/_run/__init__.py b/forml/setup/_run/__init__.py index 6e1c09ea..2d4520ad 100644 --- a/forml/setup/_run/__init__.py +++ b/forml/setup/_run/__init__.py @@ -37,6 +37,7 @@ class Scope(typing.NamedTuple): config: typing.Optional[str] loglevel: typing.Optional[str] + logfile: typing.Optional[str] @staticmethod def print(listing: typing.Iterable[typing.Any]) -> None: @@ -55,23 +56,29 @@ def print(listing: typing.Iterable[typing.Any]) -> None: @click.group(name='forml') -@click.option('--config', '-C', type=click.Path(exists=True, file_okay=True), help='Additional config file.') +@click.option( + '--config', '-C', type=click.Path(exists=True, file_okay=True, dir_okay=False), help='Additional config file.' 
+) @click.option( '--loglevel', '-L', type=click.Choice(['debug', 'info', 'warning', 'error'], case_sensitive=False), help='Global loglevel to use.', ) +@click.option('--logfile', type=click.Path(file_okay=True, dir_okay=False, writable=True), help='Logfile path.') @click.pass_context def group( context: core.Context, config: typing.Optional[str], loglevel: typing.Optional[str], # pylint: disable=unused-argument + logfile: typing.Optional[str], ): """Lifecycle Management for Datascience Projects.""" if config: _conf.CONFIG.read(config) - context.obj = Scope(config, loglevel) + if logfile: + _conf.CONFIG.update({_conf.SECTION_LOGGING: {_conf.OPT_PATH: logfile}}) + context.obj = Scope(config, loglevel, logfile) group.add_command(model.group) diff --git a/forml/setup/_conf/config.toml b/forml/setup/config.toml similarity index 97% rename from forml/setup/_conf/config.toml rename to forml/setup/config.toml index 84457e63..7c229768 100644 --- a/forml/setup/_conf/config.toml +++ b/forml/setup/config.toml @@ -15,7 +15,8 @@ # specific language governing permissions and limitations # under the License. -logcfg = "logging.ini" +[LOGGING] +config = "logging.ini" [RUNNER] default = "dask" diff --git a/forml/setup/_conf/logging.ini b/forml/setup/logging.ini similarity index 88% rename from forml/setup/_conf/logging.ini rename to forml/setup/logging.ini index 863aec6e..2d5d5e0f 100644 --- a/forml/setup/_conf/logging.ini +++ b/forml/setup/logging.ini @@ -16,7 +16,7 @@ # under the License. [loggers] -keys=root, forml, application, cli, conf, extension, flow, io, pipeline, project, runtime, testing +keys=root, forml, application, evaluation, flow, io, pipeline, project, provider, runtime, setup, testing [formatters] keys=simple, verbose @@ -42,7 +42,7 @@ args=(sys.stderr,) class=handlers.RotatingFileHandler level=DEBUG formatter=verbose -args=('%(log_path)s', 'a', 1048576, 4) +args=('%(path)s', 'a', 1048576, 4) [logger_root] level=INFO @@ -61,22 +61,10 @@ handlers=console, file qualname=forml.application propagate=0 -[logger_cli] +[logger_evaluation] level=INFO handlers=console, file -qualname=forml.cli -propagate=0 - -[logger_conf] -level=INFO -handlers=console, file -qualname=forml.conf -propagate=0 - -[logger_extension] -level=INFO -handlers=console, file -qualname=forml.extension +qualname=forml.evaluation propagate=0 [logger_flow] @@ -103,12 +91,24 @@ handlers=console, file qualname=forml.project propagate=0 +[logger_provider] +level=INFO +handlers=console, file +qualname=forml.provider +propagate=0 + [logger_runtime] level=INFO handlers=console, file qualname=forml.runtime propagate=0 +[logger_setup] +level=INFO +handlers=console, file +qualname=forml.setup +propagate=0 + [logger_testing] level=INFO handlers=console, file diff --git a/setup.py b/setup.py index 32165d5b..7ca65327 100644 --- a/setup.py +++ b/setup.py @@ -68,7 +68,7 @@ maintainer_email='forml-dev@googlegroups.com', license='Apache License 2.0', packages=setuptools.find_packages(include=['forml*'], where=os.path.dirname(__file__)), - package_data={'forml.conf': ['config.toml', 'logging.ini']}, + package_data={'forml.setup': ['config.toml', 'logging.ini']}, setup_requires=['setuptools', 'wheel', 'tomli'], install_requires=[ 'click', @@ -98,7 +98,7 @@ }, python_requires='>=3.9', classifiers=[ - 'Development Status :: 3 - Alpha', + 'Development Status :: 4 - Beta', 'Environment :: Console', 'Intended Audience :: Developers', 'Intended Audience :: Science/Research', diff --git a/tests/provider/registry/test_mlflow.py 
b/tests/provider/registry/test_mlflow.py index 8722fe46..631c7e65 100644 --- a/tests/provider/registry/test_mlflow.py +++ b/tests/provider/registry/test_mlflow.py @@ -77,7 +77,7 @@ def constructor( uri = f'http://{cls.MLFLOW_HOST}:{cls.MLFLOW_PORT}' while server.poll() is None: # wait for successful startup try: - if requests.head(uri).headers.get('server') == cls.GUNISRV_HEADER: + if requests.head(uri, timeout=5).headers.get('server') == cls.GUNISRV_HEADER: break except exceptions.RequestException: pass diff --git a/tutorials/config.toml b/tutorials/config.toml index 77708d39..a3738bbf 100644 --- a/tutorials/config.toml +++ b/tutorials/config.toml @@ -51,3 +51,11 @@ default = "tutorial" [INVENTORY.tutorial] provider = "posix" path = "/tmp/forml-tutorial/inventory" + +[GATEWAY] +default = "http" + +[GATEWAY.http] +provider = "rest" +port = 8080 +processes = 2 diff --git a/tutorials/titanic/application.py b/tutorials/titanic/application.py index dedb5d9a..513c62a6 100644 --- a/tutorials/titanic/application.py +++ b/tutorials/titanic/application.py @@ -14,8 +14,19 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +""" +Titanic application descriptor. -"""Titanic application descriptor.""" +Using the basic ready-to-use application.Generic descriptor provides +the following features: + +* loading the *latest* model generation of the project *matching* + the application name +* attempting to decode the payload using any of the available decoders + based on the *declared content-type* +* returning the predictions encoded using any of the available encoders + based on the *requested content-type* +""" from forml import application
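The ``application.py`` hunk above is truncated right after its import line. Based on the
``application.Generic`` docstring example and the serving tutorial earlier in this patch, the
remaining registration presumably boils down to the following sketch (illustrative, not a verbatim
copy of the tutorial file):

.. code-block:: python

    # application.py (illustrative sketch)
    from forml import application

    # Generic descriptor with the default Latest selector - requires the application
    # name to match the project name within the model registry.
    application.setup(application.Generic('forml-example-titanic'))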