From bed7ba3f311b60f6bad21f10804b2e3fd13deacc Mon Sep 17 00:00:00 2001 From: leosmerling-hopeit <61629371+leosmerling-hopeit@users.noreply.github.com> Date: Wed, 20 Mar 2024 16:14:01 +0100 Subject: [PATCH] feature(plugin): dataframes plugin (#177) * Plugin structure Working @dataframe --- .../hopeit-engine-pypi-publishing.yml | 8 + .vscode/launch.json | 16 + Makefile | 9 +- README.md | 10 + apps/build/ci-static-apps.sh | 8 + apps/examples/client-example/api/openapi.json | 24 +- .../api/create_openapi_file.sh | 8 + .../dataframes-example/api/openapi.json | 1339 +++++++++++++++++ .../dataframes-example/config/app-config.json | 56 + apps/examples/dataframes-example/setup.py | 28 + .../src/dataframes_example/__init__.py | 0 .../dataframes_example/experiment_storage.py | 38 + .../src/dataframes_example/iris.py | 86 ++ .../src/dataframes_example/model_storage.py | 59 + .../src/dataframes_example/predict.py | 55 + .../src/dataframes_example/prepare_data.py | 49 + .../src/dataframes_example/settings.py | 18 + .../src/dataframes_example/setup_storage.py | 14 + .../src/dataframes_example/train_model.py | 171 +++ apps/examples/simple-example/api/openapi.json | 86 +- apps/examples/simple-example/setup.py | 1 - docs/source/release-notes.rst | 7 + engine/requirements-dev.txt | 9 +- engine/setup.py | 3 + engine/src/hopeit/server/serialization.py | 45 +- engine/src/hopeit/server/version.py | 2 +- engine/src/hopeit/server/web.py | 40 +- engine/test/unit/server/test_serialization.py | 287 +++- plugins/build/ci-static-plugins.sh | 13 + plugins/build/ci-test-plugins.sh | 6 + plugins/data/dataframes/README.md | 28 + .../data/dataframes/config/plugin-config.json | 23 + plugins/data/dataframes/setup.py | 57 + .../src/hopeit/dataframes/__init__.py | 109 ++ .../src/hopeit/dataframes/dataframe.py | 242 +++ .../src/hopeit/dataframes/dataframeobject.py | 184 +++ .../dataframes/src/hopeit/dataframes/py.typed | 0 .../dataframes/serialization/__init__.py | 0 .../dataframes/serialization/dataset.py | 28 + .../hopeit/dataframes/serialization/files.py | 116 ++ .../dataframes/serialization/settings.py | 14 + .../src/hopeit/dataframes/setup/__init__.py | 0 .../src/hopeit/dataframes/setup/dataframes.py | 52 + .../dataframes/test/integration/conftest.py | 103 ++ .../test/integration/test_dataframes_api.py | 63 + .../integration/test_dataframes_imports.py | 22 + plugins/ops/apps-visualizer/api/openapi.json | 22 +- plugins/storage/fs/test/unit/test_fs.py | 1 - .../src/hopeit/redis_streams/__init__.py | 12 +- 49 files changed, 3394 insertions(+), 177 deletions(-) create mode 100644 apps/examples/dataframes-example/api/create_openapi_file.sh create mode 100644 apps/examples/dataframes-example/api/openapi.json create mode 100644 apps/examples/dataframes-example/config/app-config.json create mode 100644 apps/examples/dataframes-example/setup.py create mode 100644 apps/examples/dataframes-example/src/dataframes_example/__init__.py create mode 100644 apps/examples/dataframes-example/src/dataframes_example/experiment_storage.py create mode 100644 apps/examples/dataframes-example/src/dataframes_example/iris.py create mode 100644 apps/examples/dataframes-example/src/dataframes_example/model_storage.py create mode 100644 apps/examples/dataframes-example/src/dataframes_example/predict.py create mode 100644 apps/examples/dataframes-example/src/dataframes_example/prepare_data.py create mode 100644 apps/examples/dataframes-example/src/dataframes_example/settings.py create mode 100644 
apps/examples/dataframes-example/src/dataframes_example/setup_storage.py create mode 100644 apps/examples/dataframes-example/src/dataframes_example/train_model.py create mode 100644 plugins/data/dataframes/README.md create mode 100644 plugins/data/dataframes/config/plugin-config.json create mode 100644 plugins/data/dataframes/setup.py create mode 100644 plugins/data/dataframes/src/hopeit/dataframes/__init__.py create mode 100644 plugins/data/dataframes/src/hopeit/dataframes/dataframe.py create mode 100644 plugins/data/dataframes/src/hopeit/dataframes/dataframeobject.py create mode 100644 plugins/data/dataframes/src/hopeit/dataframes/py.typed create mode 100644 plugins/data/dataframes/src/hopeit/dataframes/serialization/__init__.py create mode 100644 plugins/data/dataframes/src/hopeit/dataframes/serialization/dataset.py create mode 100644 plugins/data/dataframes/src/hopeit/dataframes/serialization/files.py create mode 100644 plugins/data/dataframes/src/hopeit/dataframes/serialization/settings.py create mode 100644 plugins/data/dataframes/src/hopeit/dataframes/setup/__init__.py create mode 100644 plugins/data/dataframes/src/hopeit/dataframes/setup/dataframes.py create mode 100644 plugins/data/dataframes/test/integration/conftest.py create mode 100644 plugins/data/dataframes/test/integration/test_dataframes_api.py create mode 100644 plugins/data/dataframes/test/integration/test_dataframes_imports.py diff --git a/.github/workflows/hopeit-engine-pypi-publishing.yml b/.github/workflows/hopeit-engine-pypi-publishing.yml index 04bcb484..646206f8 100644 --- a/.github/workflows/hopeit-engine-pypi-publishing.yml +++ b/.github/workflows/hopeit-engine-pypi-publishing.yml @@ -50,6 +50,9 @@ jobs: - name: make plugin apps-client run: | make PLUGINFOLDER=plugins/clients/apps-client/ dist-plugin + - name: make plugin dataframes + run: | + make PLUGINFOLDER=plugins/data/dataframes/ dist-plugin - name: Publish hopeit.engine on PyPI env: @@ -91,3 +94,8 @@ jobs: PYPI_API_TOKEN: ${{ secrets.PYPI_API_TOKEN }} run: | make PLUGINFOLDER=plugins/clients/apps-client pypi-plugin + - name: Publish plugin dataframes on PyPI + env: + PYPI_API_TOKEN: ${{ secrets.PYPI_API_TOKEN }} + run: | + make PLUGINFOLDER=plugins/data/dataframes pypi-plugin \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index 45c8e1d1..51a10d58 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -45,6 +45,22 @@ "cwd": "${workspaceFolder}", "console": "integratedTerminal" }, + { + "name": "dataframes-example", + "type": "python", + "request": "launch", + "module": "hopeit.server.web", + "env": { + }, + "args": [ + "--port=8040", + "--config-files=engine/config/dev-local.json,plugins/auth/basic-auth/config/plugin-config.json,plugins/ops/config-manager/config/plugin-config.json,plugins/data/dataframes/config/plugin-config.json,apps/examples/dataframes-example/config/app-config.json", + "--api-file=apps/examples/dataframes-example/api/openapi.json", + "--start-streams", + ], + "cwd": "${workspaceFolder}", + "console": "integratedTerminal" + }, { "name": "apps-visualizer", "type": "python", diff --git a/Makefile b/Makefile index 9e2cfec9..c30e60e4 100644 --- a/Makefile +++ b/Makefile @@ -60,6 +60,10 @@ install-plugin: cd $(PLUGINFOLDER) && \ pip install -U -e . +install-plugin-extras: + cd $(PLUGINFOLDER) && \ + pip install -U -e .[$(PLUGINEXTRAS)] + qa: test check echo "DONE." 
@@ -104,6 +108,7 @@ pypi-test-plugin: update-examples-api: install-examples bash apps/examples/simple-example/api/create_openapi_file.sh && \ bash apps/examples/client-example/api/create_openapi_file.sh && \ + bash apps/examples/dataframes-example/api/create_openapi_file.sh && \ bash plugins/ops/apps-visualizer/api/create_openapi_file.sh install-examples: @@ -116,8 +121,10 @@ install-examples: make PLUGINFOLDER=plugins/ops/apps-visualizer install-plugin && \ make PLUGINFOLDER=plugins/auth/basic-auth install-plugin && \ make PLUGINFOLDER=plugins/clients/apps-client install-plugin && \ + make PLUGINFOLDER=plugins/data/dataframes PLUGINEXTRAS=pyarrow install-plugin-extras && \ make APPFOLDER=apps/examples/simple-example install-app && \ - make APPFOLDER=apps/examples/client-example install-app + make APPFOLDER=apps/examples/client-example install-app && \ + make APPFOLDER=apps/examples/dataframes-example install-app run-simple-example: export PYTHONPATH=apps/examples/simple-example/src && \ diff --git a/README.md b/README.md index f2ffcb11..4ace74f1 100644 --- a/README.md +++ b/README.md @@ -96,6 +96,16 @@ Python library that exposes helpers to persist data objects to Redis. pip install hopeit.engine[redis-storage] ``` +**hopeit.dataframes** + +Plugin to support working with pandas dataframes as if they were objects, +supporting them as web request and response payloads and transferring them +through streams. + +``` +pip install hopeit.engine[dataframes] +``` + ### Motivation diff --git a/apps/build/ci-static-apps.sh b/apps/build/ci-static-apps.sh index e2b6857e..05c65fc4 100644 --- a/apps/build/ci-static-apps.sh +++ b/apps/build/ci-static-apps.sh @@ -27,6 +27,14 @@ code+=$? python3 -m pylint apps/examples/client-example/src/client_example/ code+=$? +echo "apps/dataframes-example" +export MYPYPATH=engine/src/:plugins/storage/fs/src/:plugins/data/dataframes/src/:apps/examples/simple-example/src/:apps/examples/dataframes-example/src/ && python3 -m mypy --namespace-packages -p dataframes_example +code+=$? +python3 -m flake8 --max-line-length=120 apps/examples/dataframes-example/src/ +code+=$? +python3 -m pylint apps/examples/dataframes-example/src/dataframes_example/ +code+=$?
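The README section above describes the core idea: typed dataclasses backed by pandas dataframes. As a minimal sketch of how that looks in practice, using the `@dataframe` decorator and `DataFrames` helpers this patch introduces (`MySample` is a hypothetical type used only for illustration; assumes the plugin is installed):

```python
from dataclasses import dataclass

import pandas as pd

from hopeit.dataframes import DataFrames, dataframe


@dataframe
@dataclass
class MySample:  # hypothetical type, for illustration only
    x: float
    y: float


# Wrap a plain pandas DataFrame into the typed dataframe object...
sample = DataFrames.from_df(
    MySample, pd.DataFrame({"x": [1.0, 2.0], "y": [3.0, 4.0]})
)

# ...and get the underlying pandas DataFrame back when needed
print(DataFrames.df(sample).describe())
```

The `dataframes-example` app added later in this patch exercises this same pattern end to end.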
+ if [ $code -gt 0 ] then echo "[FAILED] CI STATIC ANALYSIS" diff --git a/apps/examples/client-example/api/openapi.json b/apps/examples/client-example/api/openapi.json index c9fa11e0..0c85456b 100644 --- a/apps/examples/client-example/api/openapi.json +++ b/apps/examples/client-example/api/openapi.json @@ -1,12 +1,12 @@ { "openapi": "3.0.3", "info": { - "version": "0.23", + "version": "0.24", "title": "Client Example", "description": "Client Example" }, "paths": { - "/api/config-manager/0x23/runtime-apps-config": { + "/api/config-manager/0x24/runtime-apps-config": { "get": { "summary": "Config Manager: Runtime Apps Config", "description": "Returns the runtime config for the Apps running on this server", @@ -62,11 +62,11 @@ } }, "tags": [ - "config_manager.0x23" + "config_manager.0x24" ] } }, - "/api/config-manager/0x23/cluster-apps-config": { + "/api/config-manager/0x24/cluster-apps-config": { "get": { "summary": "Config Manager: Cluster Apps Config", "description": "Handle remote access to runtime configuration for a group of hosts", @@ -122,11 +122,11 @@ } }, "tags": [ - "config_manager.0x23" + "config_manager.0x24" ] } }, - "/api/client-example/0x23/call-unsecured": { + "/api/client-example/0x24/call-unsecured": { "get": { "summary": "Client Example: Call Unsecured", "description": "List all available Something objects connecting to simple-example app", @@ -196,11 +196,11 @@ } }, "tags": [ - "client_example.0x23" + "client_example.0x24" ] } }, - "/api/client-example/0x23/count-and-save": { + "/api/client-example/0x24/count-and-save": { "get": { "summary": "Client Example: Count Objects and Save new one", "description": "Count all available Something objects connecting to simple-example app", @@ -267,7 +267,7 @@ } }, "tags": [ - "client_example.0x23" + "client_example.0x24" ], "security": [ { @@ -276,7 +276,7 @@ ] } }, - "/api/client-example/0x23/handle-responses": { + "/api/client-example/0x24/handle-responses": { "get": { "summary": "Client Example: Handle Responses", "description": "Non default responses and UnhandledResponse exception\n\nTo manage different types of responses from the same endpoint we can use the `responses` parameter where we list the\nhttp response status codes expected and the corresponding data type for each one. 
In this example `app_call` expect\nand handle, 200 and 404 responses.\n\nAlso in the code you can see how to handle an expection of type `UnhandledResponse` and log as warining.", @@ -361,7 +361,7 @@ } }, "tags": [ - "client_example.0x23" + "client_example.0x24" ], "security": [ { @@ -835,7 +835,7 @@ }, "engine_version": { "type": "string", - "default": "0.23.0" + "default": "0.24.0" } }, "x-module-name": "hopeit.server.config", diff --git a/apps/examples/dataframes-example/api/create_openapi_file.sh b/apps/examples/dataframes-example/api/create_openapi_file.sh new file mode 100644 index 00000000..bf3aa4e8 --- /dev/null +++ b/apps/examples/dataframes-example/api/create_openapi_file.sh @@ -0,0 +1,8 @@ +#!/bin/bash +export PYTHONPATH=./apps/examples/dataframes-example/src && \ +hopeit_openapi create \ +--title="Dataframes Example" \ +--description="Dataframes Example" \ +--api-version="$(python -m hopeit.server.version APPS_API_VERSION)" \ +--config-files=engine/config/dev-local.json,plugins/auth/basic-auth/config/plugin-config.json,plugins/ops/config-manager/config/plugin-config.json,plugins/data/dataframes/config/plugin-config.json,apps/examples/dataframes-example/config/app-config.json \ +--output-file=apps/examples/dataframes-example/api/openapi.json diff --git a/apps/examples/dataframes-example/api/openapi.json b/apps/examples/dataframes-example/api/openapi.json new file mode 100644 index 00000000..b4f6912c --- /dev/null +++ b/apps/examples/dataframes-example/api/openapi.json @@ -0,0 +1,1339 @@ +{ + "openapi": "3.0.3", + "info": { + "version": "0.24", + "title": "Dataframes Example", + "description": "Dataframes Example" + }, + "paths": { + "/api/basic-auth/0x24/decode": { + "get": { + "summary": "Basic Auth: Decode", + "description": "Returns decoded auth info", + "parameters": [ + { + "name": "X-Track-Request-Id", + "in": "header", + "required": false, + "description": "Track information: Request-Id", + "schema": { + "type": "string" + } + }, + { + "name": "X-Track-Request-Ts", + "in": "header", + "required": false, + "description": "Track information: Request-Ts", + "schema": { + "type": "string", + "format": "date-time" + } + } + ], + "responses": { + "200": { + "description": "Information extracted from token", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ContextUserInfo" + } + } + } + } + }, + "tags": [ + "basic_auth.0x24" + ], + "security": [ + { + "auth.bearer": [] + } + ] + } + }, + "/api/config-manager/0x24/runtime-apps-config": { + "get": { + "summary": "Config Manager: Runtime Apps Config", + "description": "Returns the runtime config for the Apps running on this server", + "parameters": [ + { + "name": "url", + "in": "query", + "required": false, + "description": "URL used to reach this server, informative", + "schema": { + "type": "string" + } + }, + { + "name": "expand_events", + "in": "query", + "required": false, + "description": "Retrieve expanded effective events from event steps", + "schema": { + "type": "boolean" + } + }, + { + "name": "X-Track-Request-Id", + "in": "header", + "required": false, + "description": "Track information: Request-Id", + "schema": { + "type": "string" + } + }, + { + "name": "X-Track-Request-Ts", + "in": "header", + "required": false, + "description": "Track information: Request-Ts", + "schema": { + "type": "string", + "format": "date-time" + } + } + ], + "responses": { + "200": { + "description": "Config info about running apps in current process", + "content": { + "application/json": { + "schema": { 
+ "$ref": "#/components/schemas/RuntimeApps" + } + } + } + } + }, + "tags": [ + "config_manager.0x24" + ] + } + }, + "/api/config-manager/0x24/cluster-apps-config": { + "get": { + "summary": "Config Manager: Cluster Apps Config", + "description": "Handle remote access to runtime configuration for a group of hosts", + "parameters": [ + { + "name": "hosts", + "in": "query", + "required": true, + "description": "Comma-separated list of http://host:port strings", + "schema": { + "type": "string" + } + }, + { + "name": "expand_events", + "in": "query", + "required": true, + "description": "Extract effective events from event steps", + "schema": { + "type": "boolean" + } + }, + { + "name": "X-Track-Request-Id", + "in": "header", + "required": false, + "description": "Track information: Request-Id", + "schema": { + "type": "string" + } + }, + { + "name": "X-Track-Request-Ts", + "in": "header", + "required": false, + "description": "Track information: Request-Ts", + "schema": { + "type": "string", + "format": "date-time" + } + } + ], + "responses": { + "200": { + "description": "Combined config info about running apps in provided list of hosts", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/RuntimeApps" + } + } + } + } + }, + "tags": [ + "config_manager.0x24" + ] + } + }, + "/api/dataframes-example/0x24/prepare-data": { + "get": { + "summary": "Prepare Data", + "description": "Prepare Data", + "parameters": [ + { + "name": "X-Track-Request-Id", + "in": "header", + "required": false, + "description": "Track information: Request-Id", + "schema": { + "type": "string" + } + }, + { + "name": "X-Track-Request-Ts", + "in": "header", + "required": false, + "description": "Track information: Request-Ts", + "schema": { + "type": "string", + "format": "date-time" + } + } + ], + "responses": { + "200": { + "description": "InputData", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/InputData" + } + } + } + } + }, + "tags": [ + "dataframes_example.0x24" + ], + "security": [ + { + "auth.bearer": [] + } + ] + } + }, + "/api/dataframes-example/0x24/train-model": { + "post": { + "summary": "Training Pipeline", + "description": "Training Pipeline", + "parameters": [ + { + "name": "X-Track-Request-Id", + "in": "header", + "required": false, + "description": "Track information: Request-Id", + "schema": { + "type": "string" + } + }, + { + "name": "X-Track-Request-Ts", + "in": "header", + "required": false, + "description": "Track information: Request-Ts", + "schema": { + "type": "string", + "format": "date-time" + } + } + ], + "requestBody": { + "description": "Serialized dataframeobject with iris dataset", + "required": true, + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/InputData" + } + } + } + }, + "responses": { + "200": { + "description": "Iris", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/Iris" + } + } + } + } + }, + "tags": [ + "dataframes_example.0x24" + ], + "security": [ + { + "auth.bearer": [] + } + ] + } + }, + "/api/dataframes-example/0x24/predict": { + "post": { + "summary": "Predict", + "description": "Predict", + "parameters": [ + { + "name": "experiment_id", + "in": "query", + "required": true, + "description": "experiment_id", + "schema": { + "type": "string" + } + }, + { + "name": "X-Track-Request-Id", + "in": "header", + "required": false, + "description": "Track information: Request-Id", + "schema": { + "type": "string" + } + }, + { + "name": 
"X-Track-Request-Ts", + "in": "header", + "required": false, + "description": "Track information: Request-Ts", + "schema": { + "type": "string", + "format": "date-time" + } + } + ], + "requestBody": { + "description": "Batch of prediction requests", + "required": true, + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/IrisBatchPredictionRequest" + } + } + } + }, + "responses": { + "200": { + "description": "List", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/IrisFeatures" + } + } + } + } + } + }, + "tags": [ + "dataframes_example.0x24" + ], + "security": [ + { + "auth.bearer": [] + } + ] + } + }, + "/api/dataframes-example/0x24/basic-auth/0x24/login": { + "get": { + "summary": "Basic Auth: Login", + "description": "Handles users login using basic-auth\nand generate access tokens for external services invoking apps\nplugged in with basic-auth plugin.", + "parameters": [ + { + "name": "X-Track-Request-Id", + "in": "header", + "required": false, + "description": "Track information: Request-Id", + "schema": { + "type": "string" + } + }, + { + "name": "X-Track-Request-Ts", + "in": "header", + "required": false, + "description": "Track information: Request-Ts", + "schema": { + "type": "string", + "format": "date-time" + } + } + ], + "responses": { + "200": { + "description": "Authentication information to be used for further API calls", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AuthInfo" + } + } + } + }, + "401": { + "description": "Login failed, invalid credentials", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorInfo" + } + } + } + } + }, + "tags": [ + "dataframes_example.0x24" + ], + "security": [ + { + "auth.basic": [] + } + ] + } + }, + "/api/dataframes-example/0x24/basic-auth/0x24/refresh": { + "get": { + "summary": "Basic Auth: Refresh", + "description": "This event can be used for obtain new access token and update refresh token (http cookie),\nwith no need to re-login the user if there is a valid refresh token active.", + "parameters": [ + { + "name": "X-Track-Request-Id", + "in": "header", + "required": false, + "description": "Track information: Request-Id", + "schema": { + "type": "string" + } + }, + { + "name": "X-Track-Request-Ts", + "in": "header", + "required": false, + "description": "Track information: Request-Ts", + "schema": { + "type": "string", + "format": "date-time" + } + } + ], + "responses": { + "200": { + "description": "Refreshed authentication information to be used for further API calls", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/AuthInfo" + } + } + } + }, + "401": { + "description": "Login failed, invalid credentials. 
An http-cookie is expected", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorInfo" + } + } + } + } + }, + "tags": [ + "dataframes_example.0x24" + ], + "security": [ + { + "dataframes_example.0x24.refresh": [] + } + ] + } + }, + "/api/dataframes-example/0x24/basic-auth/0x24/logout": { + "get": { + "summary": "Basic Auth: Logout", + "description": "Invalidates previous refresh cookies.", + "parameters": [ + { + "name": "X-Track-Request-Id", + "in": "header", + "required": false, + "description": "Track information: Request-Id", + "schema": { + "type": "string" + } + }, + { + "name": "X-Track-Request-Ts", + "in": "header", + "required": false, + "description": "Track information: Request-Ts", + "schema": { + "type": "string", + "format": "date-time" + } + } + ], + "responses": { + "200": { + "description": "Logged out message.", + "content": { + "application/json": { + "schema": { + "type": "object", + "required": [ + "logout" + ], + "properties": { + "logout": { + "type": "string" + } + }, + "description": "logout string payload" + } + } + } + }, + "401": { + "description": "Login failed, invalid credentials or not logged in.", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ErrorInfo" + } + } + } + } + }, + "tags": [ + "dataframes_example.0x24" + ], + "security": [ + { + "dataframes_example.0x24.refresh": [] + } + ] + } + } + }, + "components": { + "schemas": { + "ErrorInfo": { + "type": "object", + "required": [ + "msg", + "tb" + ], + "properties": { + "msg": { + "type": "string" + }, + "tb": { + "type": "array", + "items": { + "type": "string" + } + } + }, + "x-module-name": "hopeit.server.errors", + "description": "\n Error information to be returned in failed responses\n " + }, + "ContextUserInfo": { + "type": "object", + "required": [ + "id", + "user", + "email" + ], + "properties": { + "id": { + "type": "string" + }, + "user": { + "type": "string" + }, + "email": { + "type": "string" + } + }, + "x-module-name": "hopeit.basic_auth", + "description": "\n User info that will be available in context during events execution\n " + }, + "AuthInfo": { + "type": "object", + "required": [ + "access_token", + "token_type", + "renew" + ], + "properties": { + "access_token": { + "type": "string" + }, + "token_type": { + "type": "string" + }, + "renew": { + "type": "integer" + } + }, + "x-module-name": "hopeit.basic_auth", + "description": "\n Minimal auth info that should be returned outside this app\n " + }, + "RuntimeAppInfo": { + "type": "object", + "required": [ + "servers", + "app_config", + "effective_events" + ], + "properties": { + "servers": { + "type": "array", + "items": { + "$ref": "#/components/schemas/ServerInfo" + } + }, + "app_config": { + "$ref": "#/components/schemas/AppConfig" + }, + "effective_events": { + "type": "object", + "additionalProperties": { + "$ref": "#/components/schemas/EventDescriptor" + } + } + }, + "x-module-name": "hopeit.config_manager", + "description": "\n Application config information associated to servers at runtime\n " + }, + "ServerInfo": { + "type": "object", + "required": [ + "host_name", + "pid" + ], + "properties": { + "host_name": { + "type": "string" + }, + "pid": { + "type": "string" + }, + "url": { + "type": "string", + "default": "in-process" + } + }, + "x-module-name": "hopeit.config_manager", + "description": "\n Server info associated with runtime apps\n " + }, + "AppConfig": { + "type": "object", + "required": [ + "app" + ], + "properties": { + "app": { + "$ref": 
"#/components/schemas/AppDescriptor" + }, + "engine": { + "$ref": "#/components/schemas/AppEngineConfig", + "default": { + "import_modules": null, + "read_stream_timeout": 1000, + "read_stream_interval": 1000, + "default_stream_compression": "lz4", + "default_stream_serialization": "json+base64", + "track_headers": [ + "track.request_id", + "track.request_ts" + ], + "cors_origin": null + } + }, + "app_connections": { + "type": "object", + "additionalProperties": { + "$ref": "#/components/schemas/AppConnection" + }, + "default": {} + }, + "env": { + "type": "object", + "additionalProperties": { + "type": "object", + "additionalProperties": { + "anyOf": [ + { + "type": "boolean" + }, + { + "type": "integer" + }, + { + "type": "number" + }, + { + "type": "string" + } + ] + } + }, + "default": {} + }, + "events": { + "type": "object", + "additionalProperties": { + "$ref": "#/components/schemas/EventDescriptor" + }, + "default": {} + }, + "server": { + "$ref": "#/components/schemas/ServerConfig" + }, + "plugins": { + "type": "array", + "items": { + "$ref": "#/components/schemas/AppDescriptor" + }, + "default": [] + }, + "settings": { + "type": "object", + "additionalProperties": { + "type": "object" + }, + "default": {} + }, + "effective_settings": { + "type": "object", + "additionalProperties": { + "type": "object" + } + } + }, + "x-module-name": "hopeit.app.config", + "description": "\n App Configuration container\n " + }, + "AppDescriptor": { + "type": "object", + "required": [ + "name", + "version" + ], + "properties": { + "name": { + "type": "string" + }, + "version": { + "type": "string" + } + }, + "x-module-name": "hopeit.app.config", + "description": "\n App descriptor\n " + }, + "AppEngineConfig": { + "type": "object", + "properties": { + "import_modules": { + "type": "array", + "items": { + "type": "string" + } + }, + "read_stream_timeout": { + "type": "integer", + "default": 1000 + }, + "read_stream_interval": { + "type": "integer", + "default": 1000 + }, + "default_stream_compression": { + "type": "string", + "enum": [ + "none", + "lz4", + "lz4:0", + "lz4:16", + "zip", + "zip:1", + "zip:9", + "gzip", + "gzip:1", + "gzip:9", + "bz2", + "bz2:1", + "bz2:9", + "lzma" + ], + "x-enum-name": "Compression", + "x-module-name": "hopeit.app.config", + "default": "lz4" + }, + "default_stream_serialization": { + "type": "string", + "enum": [ + "json", + "json+base64", + "pickle:3", + "pickle:4", + "pickle:5" + ], + "x-enum-name": "Serialization", + "x-module-name": "hopeit.app.config", + "default": "json+base64" + }, + "track_headers": { + "type": "array", + "items": { + "type": "string" + }, + "default": [] + }, + "cors_origin": { + "type": "string" + } + }, + "x-module-name": "hopeit.app.config", + "description": "\n Engine specific parameters shared among events\n\n :field import_modules: list of string with the python module names to import to find\n events and datatype implementations\n :field read_stream_timeout: timeout in milliseconds to block connection pool when waiting for stream events\n :field read_stream_interval: delay in milliseconds to wait before attempting a new batch. 
Use to prevent\n connection pool to be blocked constantly.\n :track_headers: list of required X-Track-* headers\n :cors_origin: allowed CORS origin for web server\n " + }, + "AppConnection": { + "type": "object", + "required": [ + "name", + "version" + ], + "properties": { + "name": { + "type": "string" + }, + "version": { + "type": "string" + }, + "client": { + "type": "string", + "default": "<>" + }, + "settings": { + "type": "string" + }, + "plugin_name": { + "type": "string" + }, + "plugin_version": { + "type": "string" + } + }, + "x-module-name": "hopeit.app.config", + "description": "\n AppConnections: metadata to initialize app client in order to connect\n and issue requests to other running apps\n\n :field: name, str: target app name to connect to\n :field: version, str: target app version\n :field: client, str: hopeit.app.client.Client class implementation, from available client plugins\n :field: settings, optional str: key under `settings` section of app config containing connection configuration,\n if not specified, plugin will lookup its default section usually the plugin name. But in case multiple\n clients need to be configured, this value can be overridden.\n " + }, + "EventDescriptor": { + "type": "object", + "required": [ + "type" + ], + "properties": { + "type": { + "type": "string", + "enum": [ + "GET", + "POST", + "STREAM", + "SERVICE", + "MULTIPART", + "SETUP" + ], + "x-enum-name": "EventType", + "x-module-name": "hopeit.app.config" + }, + "plug_mode": { + "type": "string", + "enum": [ + "Standalone", + "OnApp" + ], + "x-enum-name": "EventPlugMode", + "x-module-name": "hopeit.app.config", + "default": "Standalone" + }, + "route": { + "type": "string" + }, + "impl": { + "type": "string" + }, + "connections": { + "type": "array", + "items": { + "$ref": "#/components/schemas/EventConnection" + }, + "default": [] + }, + "read_stream": { + "$ref": "#/components/schemas/ReadStreamDescriptor" + }, + "write_stream": { + "$ref": "#/components/schemas/WriteStreamDescriptor" + }, + "auth": { + "type": "array", + "items": { + "type": "string", + "enum": [ + "Unsecured", + "Basic", + "Bearer", + "Refresh" + ], + "x-enum-name": "AuthType", + "x-module-name": "hopeit.server.config" + }, + "default": [] + }, + "setting_keys": { + "type": "array", + "items": { + "type": "string" + }, + "default": [] + }, + "dataobjects": { + "type": "array", + "items": { + "type": "string" + }, + "default": [] + }, + "group": { + "type": "string", + "default": "DEFAULT" + } + }, + "x-module-name": "hopeit.app.config", + "description": "\n Event Descriptor: configures event implementation\n\n :field: type, EventType: type of event i.e.: GET, POST, MULTIPART, STREAM, SERVICE, SETUP\n :field: plug_mode, EventPlugMode: defines whether an event defined in a plugin is created in the\n current app (ON_APP) or it will be created in the original plugin (STANDALONE, default)\n :field: route, optional str: custom route for endpoint. If not specified route will be derived\n from `/api/app_name/app_version/event_name`\n :field: impl, optional str: custom event implementation Python module. 
If not specified, module\n with same same as event will be imported.\n :field: connections, list of EventConnection: specifies dependencies on other apps/endpoints,\n that can be used by client plugins to call events on external apps\n :field: read_stream, optional ReadStreamDescriptor: specifies source stream to read from.\n Valid only for STREAM events.\n :field: write_stream, optional WriteStreamDescriptor: for any type of events, resultant dataobjects will\n be published to the specified stream.\n :field: auth, list of AuthType: supported authentication schemas for this event. If not specified\n application default will be used.\n :field: setting_keys, list of str: by default EventContext will have access to the settings section\n with the same name of the event using `settings = context.settings(datatype=MySettingsType)`.\n In case additional sections are needed to be accessed from\n EventContext, then a list of setting keys, including the name of the event if needed,\n can be specified here. Then access to a `custom` key can be done using\n `custom_settings = context.settings(key=\"customer\", datatype=MyCustomSettingsType)`\n :field: dataobjects, list of str: list of full qualified dataobject types that this event can process.\n When not specified, the engine will inspect the module implementation and find all datatypes supported\n as payload in the functions defined as `__steps__`. In case of generic functions that support\n `payload: DataObject` argument, then a list of full qualified datatypes must be specified here.\n :field: group, str: group name, if none is assigned it is automatically assigned as 'DEFAULT'.\n " + }, + "EventConnection": { + "type": "object", + "required": [ + "app_connection", + "event", + "type" + ], + "properties": { + "app_connection": { + "type": "string" + }, + "event": { + "type": "string" + }, + "type": { + "type": "string", + "enum": [ + "GET", + "POST" + ], + "x-enum-name": "EventConnectionType", + "x-module-name": "hopeit.app.config" + } + }, + "x-module-name": "hopeit.app.config", + "description": "\n EventConnection: describes dependencies on this event when calling\n event on apps configured in `app_connections` sections. Only events\n specified are allowed to be invoked using `hopeit.client`\n\n :field: app_connection, str: key of app entry used in app_connections sections\n :field: event, str: target event_name to be called\n :filed: type, EventConnectionType: a valid event connection type, i.e. GET or POST\n :field: route, optional str: custom route in case event is not attached to default `app/version/event`\n " + }, + "ReadStreamDescriptor": { + "type": "object", + "required": [ + "name", + "consumer_group" + ], + "properties": { + "name": { + "type": "string" + }, + "consumer_group": { + "type": "string" + }, + "queues": { + "type": "array", + "items": { + "type": "string" + }, + "default": [ + "AUTO" + ] + } + }, + "x-module-name": "hopeit.app.config", + "description": "\n Configuration to read streams\n\n :field stream_name: str, base stream name to read\n :consumer_group: str, consumer group to send to stream processing engine to keep track of\n next messag to consume\n :queues: List[str], list of queue names to poll from. 
Each queue act as separate stream\n with queue name used as stream name suffix, where `AUTO` queue name means to consume\n events when no queue where specified at publish time, allowing to consume message with different\n priorities without waiting for all events in the stream to be consumed.\n Queues specified in this entry will be consumed by this event\n on each poll cycle, on the order specified. If not present\n only AUTO queue will be consumed. Take into account that in applications using multiple\n queue names, in order to ensure all messages are consumed, all queue names should be listed\n here including AUTO, except that the app is intentionally designed for certain events to\n consume only from specific queues. This configuration is manual to allow consuming messages\n produced by external apps.\n " + }, + "WriteStreamDescriptor": { + "type": "object", + "required": [ + "name" + ], + "properties": { + "name": { + "type": "string" + }, + "queues": { + "type": "array", + "items": { + "type": "string" + }, + "default": [ + "AUTO" + ] + }, + "queue_strategy": { + "type": "string", + "enum": [ + "PROPAGATE", + "DROP" + ], + "x-enum-name": "StreamQueueStrategy", + "x-module-name": "hopeit.app.config", + "default": "DROP" + } + }, + "x-module-name": "hopeit.app.config", + "description": "\n Configuration to publish messages to a stream\n\n :field: name, str: stream name\n :field: queue, List[str], queue names to be used to publish to stream.\n Each queue act as separate stream with queue name used as stream name suffix,\n allowing to publish messages to i.e. a queue that will be consumed with priority,\n or to multiple queues that will be consumed by different readers.\n Queue suffix will be propagated through events, allowing an event in a defined queue\n and successive events in following steps to be consumed using same queue name.\n Notice that queue will be applied only to messages coming from default queue\n (where queue is not specified at intial message creation). Messages consumed\n from other queues will be published using same queue name as they have when consumed.\n :field queue_stategory: strategy to be used when consuming messages from a stream\n with a queue name and publishing to another stream. 
Default is `StreamQueueStrategy.DROP`,\n so in case of complex stream propagating queue names are configured,\n `StreamQueueStrategy.PROPAGATE` must be explicitly specified.\n " + }, + "ServerConfig": { + "type": "object", + "properties": { + "streams": { + "$ref": "#/components/schemas/StreamsConfig", + "default": { + "stream_manager": "hopeit.streams.NoStreamManager", + "connection_str": "<>", + "delay_auto_start_seconds": 3, + "initial_backoff_seconds": 1.0, + "max_backoff_seconds": 60.0, + "num_failures_open_circuit_breaker": 1 + } + }, + "logging": { + "$ref": "#/components/schemas/LoggingConfig", + "default": { + "log_level": "INFO", + "log_path": "logs/" + } + }, + "auth": { + "$ref": "#/components/schemas/AuthConfig", + "default": { + "secrets_location": ".secrets/", + "auth_passphrase": "", + "enabled": false, + "create_keys": false, + "domain": null, + "encryption_algorithm": "RS256", + "default_auth_methods": [ + "Unsecured" + ] + } + }, + "api": { + "$ref": "#/components/schemas/APIConfig", + "default": { + "docs_path": null + } + }, + "engine_version": { + "type": "string", + "default": "0.24.0" + } + }, + "x-module-name": "hopeit.server.config", + "description": "\n Server configuration\n " + }, + "StreamsConfig": { + "type": "object", + "properties": { + "stream_manager": { + "type": "string", + "default": "hopeit.streams.NoStreamManager" + }, + "connection_str": { + "type": "string", + "default": "<>" + }, + "delay_auto_start_seconds": { + "type": "integer", + "default": 3 + }, + "initial_backoff_seconds": { + "type": "number", + "default": 1.0 + }, + "max_backoff_seconds": { + "type": "number", + "default": 60.0 + }, + "num_failures_open_circuit_breaker": { + "type": "integer", + "default": 1 + } + }, + "x-module-name": "hopeit.server.config", + "description": "\n :field connection_str: str, url to connect to streams server: i.e. 
redis://localhost:6379\n if using redis stream manager plugin to connect locally\n " + }, + "LoggingConfig": { + "type": "object", + "properties": { + "log_level": { + "type": "string", + "default": "INFO" + }, + "log_path": { + "type": "string", + "default": "logs/" + } + }, + "x-module-name": "hopeit.server.config", + "description": "LoggingConfig(log_level: str = 'INFO', log_path: str = 'logs/')" + }, + "AuthConfig": { + "type": "object", + "required": [ + "secrets_location", + "auth_passphrase" + ], + "properties": { + "secrets_location": { + "type": "string" + }, + "auth_passphrase": { + "type": "string" + }, + "enabled": { + "type": "boolean", + "default": true + }, + "create_keys": { + "type": "boolean", + "default": false + }, + "domain": { + "type": "string" + }, + "encryption_algorithm": { + "type": "string", + "default": "RS256" + }, + "default_auth_methods": { + "type": "array", + "items": { + "type": "string", + "enum": [ + "Unsecured", + "Basic", + "Bearer", + "Refresh" + ], + "x-enum-name": "AuthType", + "x-module-name": "hopeit.server.config" + }, + "default": [] + } + }, + "x-module-name": "hopeit.server.config", + "description": "\n Server configuration to handle authorization tokens\n " + }, + "APIConfig": { + "type": "object", + "properties": { + "docs_path": { + "type": "string" + } + }, + "x-module-name": "hopeit.server.config", + "description": "\n Config for Open API docs page\n " + }, + "RuntimeApps": { + "type": "object", + "required": [ + "apps" + ], + "properties": { + "apps": { + "type": "object", + "additionalProperties": { + "$ref": "#/components/schemas/RuntimeAppInfo" + } + }, + "server_status": { + "type": "object", + "additionalProperties": { + "type": "string", + "enum": [ + "ALIVE", + "ERROR" + ], + "x-enum-name": "ServerStatus", + "x-module-name": "hopeit.config_manager" + }, + "default": {} + } + }, + "x-module-name": "hopeit.config_manager", + "description": "\n Combined App Config and Server Status information for running apps\n " + }, + "Iris": { + "type": "object", + "required": [ + "sepal_length", + "sepal_width", + "petal_length", + "petal_width", + "variety" + ], + "properties": { + "sepal_length": { + "type": "number" + }, + "sepal_width": { + "type": "number" + }, + "petal_length": { + "type": "number" + }, + "petal_width": { + "type": "number" + }, + "variety": { + "type": "integer" + } + }, + "x-module-name": "hopeit.dataframes.dataframe", + "description": "Iris_(sepal_length: float, sepal_width: float, petal_length: float, petal_width: float, variety: int)" + }, + "Dataset": { + "type": "object", + "required": [ + "protocol", + "partition_key", + "key", + "datatype" + ], + "properties": { + "protocol": { + "type": "string" + }, + "partition_key": { + "type": "string" + }, + "key": { + "type": "string" + }, + "datatype": { + "type": "string" + } + }, + "x-module-name": "hopeit.dataframes.serialization.dataset", + "description": "Dataset(protocol: str, partition_key: str, key: str, datatype: str)" + }, + "InputData": { + "type": "object", + "required": [ + "iris" + ], + "properties": { + "iris": { + "$ref": "#/components/schemas/Dataset" + } + }, + "x-module-name": "hopeit.dataframes.dataframeobject", + "description": "InputData_(iris: hopeit.dataframes.serialization.dataset.Dataset)" + }, + "IrisPredictionRequest": { + "type": "object", + "required": [ + "prediction_id", + "features" + ], + "properties": { + "prediction_id": { + "type": "string" + }, + "features": { + "$ref": "#/components/schemas/IrisFeatures" + } + }, + "x-module-name": 
"dataframes_example.iris", + "description": "IrisPredictionRequest(prediction_id: str, features: dataframes_example.iris.IrisFeatures)" + }, + "IrisFeatures": { + "type": "object", + "required": [ + "sepal_length", + "sepal_width", + "petal_length", + "petal_width" + ], + "properties": { + "sepal_length": { + "type": "number" + }, + "sepal_width": { + "type": "number" + }, + "petal_length": { + "type": "number" + }, + "petal_width": { + "type": "number" + } + }, + "x-module-name": "hopeit.dataframes.dataframe", + "description": "IrisFeatures_(sepal_length: float, sepal_width: float, petal_length: float, petal_width: float)" + }, + "IrisBatchPredictionRequest": { + "type": "object", + "required": [ + "items" + ], + "properties": { + "items": { + "type": "array", + "items": { + "$ref": "#/components/schemas/IrisPredictionRequest" + } + } + }, + "x-module-name": "dataframes_example.iris", + "description": "IrisBatchPredictionRequest(items: List[dataframes_example.iris.IrisPredictionRequest])" + } + }, + "securitySchemes": { + "auth.basic": { + "type": "http", + "scheme": "basic" + }, + "auth.bearer": { + "type": "http", + "scheme": "bearer" + }, + "dataframes_example.0x24.refresh": { + "type": "apiKey", + "in": "cookie", + "name": "dataframes_example.0x24.refresh" + } + } + }, + "security": [] +} \ No newline at end of file diff --git a/apps/examples/dataframes-example/config/app-config.json b/apps/examples/dataframes-example/config/app-config.json new file mode 100644 index 00000000..0c8afaa5 --- /dev/null +++ b/apps/examples/dataframes-example/config/app-config.json @@ -0,0 +1,56 @@ +{ + "app": { + "name": "dataframes-example", + "version": "${HOPEIT_APPS_API_VERSION}" + }, + "plugins": [ + { + "name": "dataframes", + "version": "${HOPEIT_APPS_API_VERSION}" + }, + { + "name": "basic-auth", + "version": "${HOPEIT_APPS_API_VERSION}" + } + ], + "engine": { + "track_headers": [], + "cors_origin": "*" + }, + "settings": { + "data_storage": { + "ingest_data_path": "apps/examples/dataframes-example/data/raw" + }, + "model_storage": { + "path": "apps/examples/dataframes-example/data/{auto}" + }, + "experiment_storage": { + "path": "apps/examples/dataframes-example/data/{auto}", + "partition_dateformat": "%Y/%m/%d/%H/" + } + }, + "events": { + "setup_storage": { + "type": "SETUP", + "setting_keys": [ + "model_storage", + "experiment_storage" + ] + }, + "prepare_data": { + "type": "GET", + "setting_keys": [ + "data_storage" + ] + }, + "train_model": { + "type": "POST", + "setting_keys": [ + "data_storage" + ] + }, + "predict": { + "type": "POST" + } + } +} \ No newline at end of file diff --git a/apps/examples/dataframes-example/setup.py b/apps/examples/dataframes-example/setup.py new file mode 100644 index 00000000..f93fb27b --- /dev/null +++ b/apps/examples/dataframes-example/setup.py @@ -0,0 +1,28 @@ +import setuptools + +version = {} +with open("../../../engine/src/hopeit/server/version.py") as fp: + exec(fp.read(), version) + +setuptools.setup( + name="dataframes_example", + version=version['ENGINE_VERSION'], + description="hopeit.engine dataframes example app", + package_dir={ + "": "src" + }, + packages=[ + "dataframes_example", + ], + include_package_data=True, + python_requires=">=3.8", + install_requires=[ + f"hopeit.engine[web,cli,redis-streams]=={version['ENGINE_VERSION']}", + f"hopeit.dataframes[pyarrow]=={version['ENGINE_VERSION']}", + f"scikit-learn", + ], + extras_require={ + }, + entry_points={ + } +) diff --git a/apps/examples/dataframes-example/src/dataframes_example/__init__.py 
b/apps/examples/dataframes-example/src/dataframes_example/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/apps/examples/dataframes-example/src/dataframes_example/experiment_storage.py b/apps/examples/dataframes-example/src/dataframes_example/experiment_storage.py new file mode 100644 index 00000000..ca4953be --- /dev/null +++ b/apps/examples/dataframes-example/src/dataframes_example/experiment_storage.py @@ -0,0 +1,38 @@ +"""Simple storage for training experiments using fs storage +""" + +from dataclasses import asdict +from typing import Optional + +from dataframes_example.iris import Experiment +from hopeit.app.context import EventContext +from hopeit.dataframes import DataFrames +from hopeit.fs_storage import FileStorage, FileStorageSettings +from hopeit.server.logger import engine_extra_logger + +logger, extra = engine_extra_logger() + +fs: Optional[FileStorage] = None + + +async def init_experiment_storage(context: EventContext): + """Initializes fs storage for experiments""" + global fs + if fs is None: + settings: FileStorageSettings = context.settings( + key="experiment_storage", datatype=FileStorageSettings + ) + logger.info( + context, + "Initializing experiment storage...", + extra=extra(**asdict(settings)), + ) + fs = FileStorage.with_settings(settings) + + +async def save_experiment(experiment: Experiment, context: EventContext) -> str: + assert fs is not None + return await fs.store( + key=experiment.experiment_id, + value=await DataFrames.serialize(experiment), + ) diff --git a/apps/examples/dataframes-example/src/dataframes_example/iris.py b/apps/examples/dataframes-example/src/dataframes_example/iris.py new file mode 100644 index 00000000..a37e2588 --- /dev/null +++ b/apps/examples/dataframes-example/src/dataframes_example/iris.py @@ -0,0 +1,86 @@ +"""Iris datasets schemas +""" + +from dataclasses import dataclass, field +from datetime import datetime +from typing import List, Optional + +from hopeit.dataframes import dataframe, dataframeobject +from hopeit.dataobjects import dataobject + + +@dataframe +@dataclass +class Iris: + sepal_length: float = field(metadata={"source_field_name": "sepal length (cm)"}) + sepal_width: float = field(metadata={"source_field_name": "sepal width (cm)"}) + petal_length: float = field(metadata={"source_field_name": "petal length (cm)"}) + petal_width: float = field(metadata={"source_field_name": "petal width (cm)"}) + variety: int = field(metadata={"source_field_name": "target"}) + + +@dataframe +@dataclass +class IrisFeatures: + sepal_length: float + sepal_width: float + petal_length: float + petal_width: float + + +@dataframe +@dataclass +class IrisLabels: + variety: int + + +@dataobject +@dataclass +class EvalMetrics: + accuracy_score: float + + +@dataframeobject +@dataclass +class InputData: + iris: Iris + + +@dataframeobject +@dataclass +class Experiment: + experiment_id: str + experiment_dt: datetime + input_data: Iris + train_features: Optional[IrisFeatures] = None + train_labels: Optional[IrisLabels] = None + test_features: Optional[IrisFeatures] = None + test_labels: Optional[IrisLabels] = None + model_location: Optional[str] = None + eval_metrics: Optional[EvalMetrics] = None + + +@dataobject(unsafe=True) +@dataclass +class IrisPredictionRequest: + prediction_id: str + features: IrisFeatures + + +@dataobject(unsafe=True) +@dataclass +class IrisBatchPredictionRequest: + items: List[IrisPredictionRequest] + + +@dataobject(unsafe=True) +@dataclass +class IrisPredictionResponse: + prediction_id: str + 
prediction: IrisLabels + + +@dataobject(unsafe=True) +@dataclass +class IrisBatchPredictionResponse: + items: List[IrisPredictionResponse] diff --git a/apps/examples/dataframes-example/src/dataframes_example/model_storage.py b/apps/examples/dataframes-example/src/dataframes_example/model_storage.py new file mode 100644 index 00000000..3cc5ca0d --- /dev/null +++ b/apps/examples/dataframes-example/src/dataframes_example/model_storage.py @@ -0,0 +1,59 @@ +"""Simple storage for trained models using fs-storage +""" + +import io +import os +import pickle +from dataclasses import asdict +from pathlib import Path +from typing import Optional, Tuple, TypeVar + +import aiofiles +from dataframes_example.settings import ModelStorage +from hopeit.app.context import EventContext +from hopeit.server.logger import engine_extra_logger + +logger, extra = engine_extra_logger() + +model_storage: Optional[ModelStorage] = None + +ModelT = TypeVar("ModelT") + + +async def init_model_storage(context: EventContext): + global model_storage + model_storage = context.settings(key="model_storage", datatype=ModelStorage) + assert model_storage is not None + logger.info( + context, + "Initializing model storage...", + extra=extra(**asdict(model_storage)), + ) + + +async def save_model(model: ModelT, experiment_id: str, context: EventContext) -> str: + model_path, model_location, model_location_str = _get_model_location(experiment_id) + + os.makedirs(model_path, exist_ok=True) + async with aiofiles.open(model_location, "wb") as f: + await f.write(pickle.dumps(model, protocol=5)) + + return model_location_str + + +async def load_model(model_location: str, context: EventContext) -> ModelT: + async with aiofiles.open(Path(model_location), "rb") as f: + buffer = io.BytesIO(await f.read()) + return pickle.load(buffer) + + +async def load_experiment_model(experiment_id: str, context: EventContext) -> ModelT: + _, _, model_location_str = _get_model_location(experiment_id) + return await load_model(model_location_str, context) + + +def _get_model_location(experiment_id: str) -> Tuple[Path, Path, str]: + assert model_storage is not None + model_path = Path(model_storage.path) + model_location = model_path / f"model_{experiment_id}.pkl5" + return model_path, model_location, model_location.as_posix() diff --git a/apps/examples/dataframes-example/src/dataframes_example/predict.py b/apps/examples/dataframes-example/src/dataframes_example/predict.py new file mode 100644 index 00000000..093bd83a --- /dev/null +++ b/apps/examples/dataframes-example/src/dataframes_example/predict.py @@ -0,0 +1,55 @@ +"""Endpoint to run predictions using trained model""" + +from typing import List + +from dataframes_example.iris import ( + IrisBatchPredictionRequest, + IrisBatchPredictionResponse, + IrisFeatures, + IrisLabels, + IrisPredictionResponse, +) +from dataframes_example.model_storage import load_experiment_model + +from hopeit.app.api import event_api +from hopeit.app.context import EventContext +from hopeit.dataframes import DataFrames + +from sklearn.tree import DecisionTreeClassifier # type: ignore + +__steps__ = ["predict"] + + +__api__ = event_api( + summary="Predict", + query_args=[("experiment_id", str)], + payload=(IrisBatchPredictionRequest, "Batch of prediction requests"), + responses={200: List[IrisFeatures]}, +) + + +async def predict( + request: IrisBatchPredictionRequest, context: EventContext, *, experiment_id: str +) -> IrisBatchPredictionResponse: + """Loads model and predicts based on request features""" + model:
DecisionTreeClassifier = await load_experiment_model(experiment_id, context) + + features = DataFrames.from_dataobjects( + IrisFeatures, (item.features for item in request.items) + ) + + model_predictions = model.predict(DataFrames.df(features)) + + predictions = DataFrames.from_array(IrisLabels, model_predictions) + + return IrisBatchPredictionResponse( + items=[ # type: ignore + IrisPredictionResponse( + prediction_id=request.prediction_id, + prediction=prediction, + ) + for request, prediction in zip( + request.items, DataFrames.to_dataobjects(predictions) + ) + ] + ) diff --git a/apps/examples/dataframes-example/src/dataframes_example/prepare_data.py b/apps/examples/dataframes-example/src/dataframes_example/prepare_data.py new file mode 100644 index 00000000..89322144 --- /dev/null +++ b/apps/examples/dataframes-example/src/dataframes_example/prepare_data.py @@ -0,0 +1,49 @@ +"""Prepares input data for training pipeline +""" + +from dataclasses import asdict, fields + +from dataframes_example.iris import InputData, Iris +from dataframes_example.settings import DataStorage +from hopeit.app.api import event_api +from hopeit.app.context import EventContext +from hopeit.app.logger import app_extra_logger +from hopeit.dataframes import DataFrames +from sklearn import datasets # type: ignore + +logger, extra = app_extra_logger() + +__steps__ = [ + "download_data", + "save_raw_data", +] + +__api__ = event_api(summary="Prepare Data", responses={200: InputData}) + + +def download_data(payload: None, context: EventContext) -> Iris: + """Downloads training data using scikit-learn""" + + logger.info(context, "Downloading input data...") + + raw = datasets.load_iris(as_frame=True) + + iris = DataFrames.from_df( + Iris, + raw.frame.rename( + columns={ + field.metadata["source_field_name"]: field.name + for field in fields(Iris) + } + ), + ) + + return iris + + +async def save_raw_data(iris: Iris, context: EventContext) -> InputData: + settings: DataStorage = context.settings(key="data_storage", datatype=DataStorage) + + logger.info(context, "Saving input data...", extra=extra(**asdict(settings))) + + return await DataFrames.serialize(InputData(iris=iris)) diff --git a/apps/examples/dataframes-example/src/dataframes_example/settings.py b/apps/examples/dataframes-example/src/dataframes_example/settings.py new file mode 100644 index 00000000..860a44cb --- /dev/null +++ b/apps/examples/dataframes-example/src/dataframes_example/settings.py @@ -0,0 +1,18 @@ +"""Dataframes example settings classes +""" + +from dataclasses import dataclass + +from hopeit.dataobjects import dataobject + + +@dataobject +@dataclass +class DataStorage: + ingest_data_path: str + + +@dataobject +@dataclass +class ModelStorage: + path: str diff --git a/apps/examples/dataframes-example/src/dataframes_example/setup_storage.py b/apps/examples/dataframes-example/src/dataframes_example/setup_storage.py new file mode 100644 index 00000000..cc64e9c5 --- /dev/null +++ b/apps/examples/dataframes-example/src/dataframes_example/setup_storage.py @@ -0,0 +1,14 @@ +"""Initial setup event for experiment and model storage +""" + +from dataframes_example import experiment_storage, model_storage +from hopeit.app.context import EventContext + +__steps__ = [ + "init_experiment_storage", +] + + +async def init_experiment_storage(payload: None, context: EventContext) -> None: + await experiment_storage.init_experiment_storage(context) + await model_storage.init_model_storage(context) diff --git
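The `SETUP` event above initializes both storages at server start; each storage module then reads its own section from app config via `context.settings`. A condensed sketch of that settings-access pattern (the `DataStorage` shape matches `settings.py` above; the event body is illustrative):

```python
from dataclasses import dataclass

from hopeit.app.context import EventContext
from hopeit.dataobjects import dataobject


@dataobject
@dataclass
class DataStorage:  # same shape as settings.py above
    ingest_data_path: str


async def read_settings(context: EventContext) -> str:
    # "data_storage" must be listed under the event's `setting_keys`
    # in app-config.json, since it differs from the event's own name
    settings = context.settings(key="data_storage", datatype=DataStorage)
    return settings.ingest_data_path
```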
a/apps/examples/dataframes-example/src/dataframes_example/train_model.py b/apps/examples/dataframes-example/src/dataframes_example/train_model.py new file mode 100644 index 00000000..f7d53716 --- /dev/null +++ b/apps/examples/dataframes-example/src/dataframes_example/train_model.py @@ -0,0 +1,171 @@ +"""Example training pipeline for the Iris dataset +""" + +# pylint: disable=invalid-name + +import uuid +from datetime import datetime, timezone + +import pandas as pd +from dataframes_example import experiment_storage, model_storage +from dataframes_example.iris import ( + EvalMetrics, + Experiment, + InputData, + Iris, + IrisFeatures, + IrisLabels, +) +from hopeit.app.api import event_api +from hopeit.app.context import EventContext, PostprocessHook +from hopeit.app.logger import app_extra_logger +from hopeit.dataframes import DataFrames +from hopeit.server.steps import SHUFFLE + +from sklearn.metrics import accuracy_score # type: ignore +from sklearn.model_selection import train_test_split # type: ignore +from sklearn.tree import DecisionTreeClassifier # type: ignore + +logger, extra = app_extra_logger() + +__steps__ = [ + "prepare_experiment", + SHUFFLE, + "prepare_datasets", + SHUFFLE, + "train_model", + SHUFFLE, + "evaluate_model", + SHUFFLE, + "save_experiment", +] + + +__api__ = event_api( + summary="Training Pipeline", + payload=(InputData, "Serialized dataframeobject with iris dataset"), + responses={200: Iris}, +) + + +def prepare_experiment(input_data: InputData, context: EventContext) -> Experiment: + experiment_id = str(uuid.uuid4()) + + logger.info( + context, "Setting up experiment", extra=extra(experiment_id=experiment_id) + ) + + return Experiment( + experiment_id=experiment_id, + experiment_dt=datetime.now(tz=timezone.utc), + input_data=input_data.iris, + ) + + +async def __postprocess__( + experiment: Experiment, context: EventContext, response: PostprocessHook +) -> Experiment: + return await DataFrames.serialize(experiment) + + +def prepare_datasets(experiment: Experiment, context: EventContext) -> Experiment: + """Split training and test datasets""" + logger.info( + context, + "Preparing feature and label datasets", + extra=extra(experiment_id=experiment.experiment_id), + ) + + X = DataFrames.from_dataframe(IrisFeatures, experiment.input_data) + y = DataFrames.from_dataframe(IrisLabels, experiment.input_data) + + X_train, X_test, y_train, y_test = train_test_split( + DataFrames.df(X), DataFrames.df(y), test_size=0.2, random_state=42 + ) + + experiment.train_features = DataFrames.from_df(IrisFeatures, X_train) + experiment.train_labels = DataFrames.from_df(IrisLabels, y_train) + experiment.test_features = DataFrames.from_df(IrisFeatures, X_test) + experiment.test_labels = DataFrames.from_df(IrisLabels, y_test) + + return experiment + + +async def train_model(experiment: Experiment, context: EventContext) -> Experiment: + """Trains DecisionTreeClassifier""" + + logger.info( + context, + "Training model...", + extra=extra(experiment_id=experiment.experiment_id), + ) + + clf = DecisionTreeClassifier(random_state=42) + clf.fit( + DataFrames.df(experiment.train_features), DataFrames.df(experiment.train_labels) + ) + + logger.info( + context, + "Saving model...", + extra=extra( + experiment_id=experiment.experiment_id, + ), + ) + experiment.model_location = await model_storage.save_model( + clf, experiment.experiment_id, context + ) + + return experiment + + +async def evaluate_model(experiment: Experiment, context: EventContext) -> Experiment: + """Evaluates trained model 
score using the test dataset""" + logger.info( + context, + "Loading model...", + extra=extra( + experiment_id=experiment.experiment_id, + model_location=experiment.model_location, + ), + ) + + assert experiment.model_location is not None + + clf: DecisionTreeClassifier = await model_storage.load_model( + experiment.model_location, context + ) + + logger.info( + context, + "Evaluating model...", + extra=extra(experiment_id=experiment.experiment_id), + ) + + y = clf.predict(DataFrames.df(experiment.test_features)) + pred_labels = IrisLabels(variety=pd.Series(y)) + accuracy = accuracy_score( + DataFrames.df(experiment.test_labels), DataFrames.df(pred_labels) + ) + + experiment.eval_metrics = EvalMetrics(accuracy_score=accuracy) + return experiment + + +async def save_experiment(experiment: Experiment, context: EventContext) -> Experiment: + """Saves experiment results""" + logger.info( + context, + "Saving experiment...", + extra=extra(experiment_id=experiment.experiment_id), + ) + + location = await experiment_storage.save_experiment(experiment, context) + + logger.info( + context, + "Experiment saved.", + extra=extra(experiment_id=experiment.experiment_id, location=location), + ) + + return experiment diff --git a/apps/examples/simple-example/api/openapi.json b/apps/examples/simple-example/api/openapi.json index 9fa91804..fa917442 100644 --- a/apps/examples/simple-example/api/openapi.json +++ b/apps/examples/simple-example/api/openapi.json @@ -1,12 +1,12 @@ { "openapi": "3.0.3", "info": { - "version": "0.23", + "version": "0.24", "title": "Simple Example", "description": "Simple Example" }, "paths": { - "/api/basic-auth/0x23/decode": { + "/api/basic-auth/0x24/decode": { "get": { "summary": "Basic Auth: Decode", "description": "Returns decoded auth info", @@ -44,7 +44,7 @@ } }, "tags": [ - "basic_auth.0x23" + "basic_auth.0x24" ], "security": [ { @@ -53,7 +53,7 @@ ] } }, - "/api/config-manager/0x23/runtime-apps-config": { + "/api/config-manager/0x24/runtime-apps-config": { "get": { "summary": "Config Manager: Runtime Apps Config", "description": "Returns the runtime config for the Apps running on this server", @@ -109,11 +109,11 @@ } }, "tags": [ - "config_manager.0x23" + "config_manager.0x24" ] } }, - "/api/config-manager/0x23/cluster-apps-config": { + "/api/config-manager/0x24/cluster-apps-config": { "get": { "summary": "Config Manager: Cluster Apps Config", "description": "Handle remote access to runtime configuration for a group of hosts", @@ -169,11 +169,11 @@ } }, "tags": [ - "config_manager.0x23" + "config_manager.0x24" ] } }, - "/api/simple-example/0x23/list-somethings": { + "/api/simple-example/0x24/list-somethings": { "get": { "summary": "Simple Example: List Objects", "description": "Lists all available Something objects", @@ -243,7 +243,7 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { @@ -252,7 +252,7 @@ ] } }, - "/api/simple-example/0x23/list-somethings-unsecured": { + "/api/simple-example/0x24/list-somethings-unsecured": { "get": { "summary": "Simple Example: List Objects Unsecured", "description": "Lists all available Something objects", @@ -322,11 +322,11 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ] } }, - "/api/simple-example/0x23/query-something": { + "/api/simple-example/0x24/query-something": { "get": { "summary": "Simple Example: Query Something", "description": "Loads Something from disk", @@ -412,7 +412,7 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { @@ -516,7 +516,7 @@ } }, 
"tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { @@ -525,7 +525,7 @@ ] } }, - "/api/simple-example/0x23/save-something": { + "/api/simple-example/0x24/save-something": { "post": { "summary": "Simple Example: Save Something", "description": "Creates and saves Something", @@ -622,7 +622,7 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { @@ -631,7 +631,7 @@ ] } }, - "/api/simple-example/0x23/download-something": { + "/api/simple-example/0x24/download-something": { "get": { "summary": "Simple Example: Download Something", "description": "Download image file. The PostprocessHook return the requested file as stream.", @@ -718,7 +718,7 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { @@ -727,7 +727,7 @@ ] } }, - "/api/simple-example/0x23/download-something-streamed": { + "/api/simple-example/0x24/download-something-streamed": { "get": { "summary": "Simple Example: Download Something Streamed", "description": "Download streamd created content as file.\nThe PostprocessHook return the requested resource as stream using `prepare_stream_response`.", @@ -795,11 +795,11 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ] } }, - "/api/simple-example/0x23/upload-something": { + "/api/simple-example/0x24/upload-something": { "post": { "summary": "Simple Example: Multipart Upload files", "description": "Upload files using Multipart form request", @@ -937,7 +937,7 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { @@ -946,7 +946,7 @@ ] } }, - "/api/simple-example/0x23/streams/something-event": { + "/api/simple-example/0x24/streams/something-event": { "post": { "summary": "Simple Example: Something Event", "description": "Submits a Something object to a stream to be processed asynchronously by process-events app event", @@ -1015,7 +1015,7 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { @@ -1024,7 +1024,7 @@ ] } }, - "/api/simple-example/0x23/collector/query-concurrently": { + "/api/simple-example/0x24/collector/query-concurrently": { "post": { "summary": "Simple Example: Query Concurrently", "description": "Loads 2 Something objects concurrently from disk and combine the results\nusing `collector` steps constructor (instantiating an `AsyncCollector`)", @@ -1096,7 +1096,7 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { @@ -1105,7 +1105,7 @@ ] } }, - "/api/simple-example/0x23/collector/collect-spawn": { + "/api/simple-example/0x24/collector/collect-spawn": { "post": { "summary": "Simple Example: Collect and Spawn", "description": "Loads 2 Something objects concurrently from disk, combine the results\nusing `collector` steps constructor (instantiating an `AsyncCollector`)\nthen spawn the items found individually into a stream", @@ -1183,7 +1183,7 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { @@ -1192,7 +1192,7 @@ ] } }, - "/api/simple-example/0x23/shuffle/spawn-event": { + "/api/simple-example/0x24/shuffle/spawn-event": { "post": { "summary": "Simple Example: Spawn Event", "description": "This example will spawn 3 data events, those are going to be send to a stream using SHUFFLE\nand processed in asynchronously / in parallel if multiple nodes are available", @@ -1270,7 +1270,7 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { @@ -1279,7 +1279,7 @@ ] } }, - "/api/simple-example/0x23/shuffle/parallelize-event": { + 
"/api/simple-example/0x24/shuffle/parallelize-event": { "post": { "summary": "Simple Example: Parallelize Event", "description": "This example will spawn 2 copies of payload data, those are going to be send to a stream using SHUFFLE\nand processed in asynchronously / in parallel if multiple nodes are available,\nthen submitted to other stream to be updated and saved", @@ -1357,7 +1357,7 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { @@ -1366,7 +1366,7 @@ ] } }, - "/api/simple-example/0x23/basic-auth/0x23/login": { + "/api/simple-example/0x24/basic-auth/0x24/login": { "get": { "summary": "Basic Auth: Login", "description": "Handles users login using basic-auth\nand generate access tokens for external services invoking apps\nplugged in with basic-auth plugin.", @@ -1434,7 +1434,7 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { @@ -1443,7 +1443,7 @@ ] } }, - "/api/simple-example/0x23/basic-auth/0x23/refresh": { + "/api/simple-example/0x24/basic-auth/0x24/refresh": { "get": { "summary": "Basic Auth: Refresh", "description": "This event can be used for obtain new access token and update refresh token (http cookie),\nwith no need to re-login the user if there is a valid refresh token active.", @@ -1511,16 +1511,16 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { - "simple_example.0x23.refresh": [] + "simple_example.0x24.refresh": [] } ] } }, - "/api/simple-example/0x23/basic-auth/0x23/logout": { + "/api/simple-example/0x24/basic-auth/0x24/logout": { "get": { "summary": "Basic Auth: Logout", "description": "Invalidates previous refresh cookies.", @@ -1597,11 +1597,11 @@ } }, "tags": [ - "simple_example.0x23" + "simple_example.0x24" ], "security": [ { - "simple_example.0x23.refresh": [] + "simple_example.0x24.refresh": [] } ] } @@ -2133,7 +2133,7 @@ }, "engine_version": { "type": "string", - "default": "0.23.0" + "default": "0.24.0" } }, "x-module-name": "hopeit.server.config", @@ -2406,10 +2406,10 @@ "type": "http", "scheme": "bearer" }, - "simple_example.0x23.refresh": { + "simple_example.0x24.refresh": { "type": "apiKey", "in": "cookie", - "name": "simple_example.0x23.refresh" + "name": "simple_example.0x24.refresh" } } }, diff --git a/apps/examples/simple-example/setup.py b/apps/examples/simple-example/setup.py index 437c3bfa..0f773f4e 100644 --- a/apps/examples/simple-example/setup.py +++ b/apps/examples/simple-example/setup.py @@ -19,7 +19,6 @@ python_requires=">=3.8", install_requires=[ f"hopeit.engine[web,cli,redis-streams,fs-storage]=={version['ENGINE_VERSION']}", - f"hopeit.fs-storage=={version['ENGINE_VERSION']}" ], extras_require={ }, diff --git a/docs/source/release-notes.rst b/docs/source/release-notes.rst index 4c70339d..969b0e04 100644 --- a/docs/source/release-notes.rst +++ b/docs/source/release-notes.rst @@ -1,6 +1,13 @@ Release Notes ============= +Version 0.24.0 +______________ +- Plugin: + + - dataframes (beta): new plugin to support workflows based on `pandas` + + Version 0.23.0 ______________ - Plugin: diff --git a/engine/requirements-dev.txt b/engine/requirements-dev.txt index 8cc92521..d35bd193 100644 --- a/engine/requirements-dev.txt +++ b/engine/requirements-dev.txt @@ -17,7 +17,12 @@ types-redis black isort -# Plugin dependencies +# Plugin test dependencies redis watchdog -uvloop \ No newline at end of file +uvloop + +# data/dataframes plugin tests +pandas +pandas-stubs +pyarrow diff --git a/engine/setup.py b/engine/setup.py index c21e9cd9..0cb66af4 100644 --- 
a/engine/setup.py +++ b/engine/setup.py @@ -136,6 +136,9 @@ def libversion(lib): ], "apps-client": [ f"hopeit.apps-client=={version['ENGINE_VERSION']}" + ], + "dataframes": [ + f"hopeit.dataframes=={version['ENGINE_VERSION']}" ] }, entry_points={ diff --git a/engine/src/hopeit/server/serialization.py b/engine/src/hopeit/server/serialization.py index 225d8f39..99cf17ff 100644 --- a/engine/src/hopeit/server/serialization.py +++ b/engine/src/hopeit/server/serialization.py @@ -1,41 +1,46 @@ """ Provides generic `serialize`, `deserialize` methods to handle payloads """ + import base64 import pickle from typing import Type from hopeit.app.config import Serialization, Compression -__all__ = ['serialize', 'deserialize'] +__all__ = ["serialize", "deserialize"] from hopeit.dataobjects import EventPayload, EventPayloadType from hopeit.dataobjects.payload import Payload from hopeit.server.compression import compress, decompress -def _ser_json_utf8(data: EventPayload, level: int) -> bytes: - return Payload.to_json(data).encode('utf-8') +async def _ser_json_utf8(data: EventPayload, level: int) -> bytes: + return Payload.to_json(data).encode("utf-8") -def _deser_json_utf8(data: bytes, datatype: Type[EventPayloadType]) -> EventPayload: - return Payload.from_json(data.decode('utf-8'), datatype) +async def _deser_json_utf8( + data: bytes, datatype: Type[EventPayloadType] +) -> EventPayload: + return Payload.from_json(data.decode("utf-8"), datatype) -def _ser_pickle(data: EventPayload, level: int) -> bytes: +async def _ser_pickle(data: EventPayload, level: int) -> bytes: return pickle.dumps(data, protocol=level) -def _deser_pickle(data: bytes, datatype: Type[EventPayloadType]) -> EventPayload: +async def _deser_pickle(data: bytes, datatype: Type[EventPayloadType]) -> EventPayload: return pickle.loads(data) -def _ser_json_base64(data: EventPayload, level: int) -> bytes: - return base64.b64encode(_ser_json_utf8(data, level)) +async def _ser_json_base64(data: EventPayload, level: int) -> bytes: + return base64.b64encode(await _ser_json_utf8(data, level)) -def _deser_json_base64(data: bytes, datatype: Type[EventPayloadType]) -> EventPayload: - return _deser_json_utf8(base64.b64decode(data), datatype) +async def _deser_json_base64( + data: bytes, datatype: Type[EventPayloadType] +) -> EventPayload: + return await _deser_json_utf8(base64.b64decode(data), datatype) _SERDESER = { @@ -43,18 +48,24 @@ def _deser_json_base64(data: bytes, datatype: Type[EventPayloadType]) -> EventPa Serialization.JSON_BASE64: (_ser_json_base64, 0, _deser_json_base64), Serialization.PICKLE3: (_ser_pickle, 3, _deser_pickle), Serialization.PICKLE4: (_ser_pickle, 4, _deser_pickle), - Serialization.PICKLE5: (_ser_pickle, 5, _deser_pickle) + Serialization.PICKLE5: (_ser_pickle, 5, _deser_pickle), } -def serialize(data: EventPayload, serialization: Serialization, compression: Compression) -> bytes: +async def serialize( + data: EventPayload, serialization: Serialization, compression: Compression +) -> bytes: algos = _SERDESER[serialization] - encoded = algos[0](data, level=algos[1]) + encoded = await algos[0](data, level=algos[1]) return compress(encoded, compression) -def deserialize(data: bytes, serialization: Serialization, compression: Compression, - datatype: Type[EventPayloadType]) -> EventPayload: +async def deserialize( + data: bytes, + serialization: Serialization, + compression: Compression, + datatype: Type[EventPayloadType], +) -> EventPayload: algos = _SERDESER[serialization] decomp = decompress(data, compression) - return 
algos[2](decomp, datatype) + return await algos[2](decomp, datatype) diff --git a/engine/src/hopeit/server/version.py b/engine/src/hopeit/server/version.py index 7116fde4..6c59318e 100644 --- a/engine/src/hopeit/server/version.py +++ b/engine/src/hopeit/server/version.py @@ -8,7 +8,7 @@ import sys ENGINE_NAME = "hopeit.engine" -ENGINE_VERSION = "0.23.0" +ENGINE_VERSION = "0.24.0" # Major.Minor version to be used in App versions and Api endpoints for Apps/Plugins APPS_API_VERSION = '.'.join(ENGINE_VERSION.split('.')[0:2]) diff --git a/engine/src/hopeit/server/web.py b/engine/src/hopeit/server/web.py index a527bda0..f0a66e5a 100644 --- a/engine/src/hopeit/server/web.py +++ b/engine/src/hopeit/server/web.py @@ -5,7 +5,9 @@ # flake8: noqa # pylint: disable=wrong-import-position, wrong-import-order from collections import namedtuple + import aiohttp +from hopeit.server.serialization import deserialize setattr(aiohttp.http, "SERVER_SOFTWARE", "") @@ -18,20 +20,21 @@ import uuid from datetime import datetime, timezone from functools import partial -from typing import Any, Callable, Coroutine, Dict, List, Optional, Type, Tuple, Union +from typing import Any, Callable, Coroutine, Dict, List, Optional, Tuple, Type, Union import aiohttp_cors # type: ignore from aiohttp import web from aiohttp.web_response import Response from aiohttp_cors import CorsConfig -from stringcase import snakecase, titlecase # type: ignore -from hopeit.app.config import ( +from hopeit.app.config import ( # pylint: disable=ungrouped-imports AppConfig, + Compression, EventDescriptor, EventPlugMode, EventSettings, EventType, + Serialization, parse_app_config_json, ) from hopeit.app.context import ( @@ -44,7 +47,6 @@ from hopeit.dataobjects import DataObject, EventPayload, EventPayloadType from hopeit.dataobjects.payload import Payload from hopeit.server import api, runtime -from hopeit.server.api import app_route_name, OPEN_API_DEFAULTS from hopeit.server.config import AuthType, ServerConfig, parse_server_config_json from hopeit.server.engine import AppEngine from hopeit.server.errors import ErrorInfo @@ -58,9 +60,10 @@ from hopeit.server.metrics import metrics from hopeit.server.names import route_name from hopeit.server.steps import find_datatype_handler - from hopeit.toolkit import auth +from stringcase import snakecase, titlecase # type: ignore + __all__ = [ "parse_args", "prepare_engine", @@ -106,7 +109,7 @@ def prepare_engine( if api_file is None and api_auto: if len(api_auto) < 3: - api_auto.extend(OPEN_API_DEFAULTS[len(api_auto) - 3 :]) + api_auto.extend(api.OPEN_API_DEFAULTS[len(api_auto) - 3 :]) else: api_auto = api_auto[:3] api.init_auto_api(api_auto[0], api_auto[1], api_auto[2]) @@ -346,7 +349,7 @@ def _create_post_event_route( datatype = find_datatype_handler( app_config=app_engine.app_config, event_name=event_name, event_info=event_info ) - route = app_route_name( + route = api.app_route_name( app_engine.app_config.app, event_name=event_name, plugin=None if plugin is None else plugin.app_config.app, @@ -378,7 +381,7 @@ def _create_get_event_route( """ Creates route for handling GET requests """ - route = app_route_name( + route = api.app_route_name( app_engine.app_config.app, event_name=event_name, plugin=None if plugin is None else plugin.app_config.app, @@ -412,7 +415,7 @@ def _create_multipart_event_route( datatype = find_datatype_handler( app_config=app_engine.app_config, event_name=event_name, event_info=event_info ) - route = app_route_name( + route = api.app_route_name( app_engine.app_config.app, 
event_name=event_name, plugin=None if plugin is None else plugin.app_config.app, @@ -441,7 +444,7 @@ def _create_event_management_routes( Create routes to start and stop processing of STREAM events """ evt = event_name.replace(".", "/").replace("$", "/") - base_route = app_route_name( + base_route = api.app_route_name( app_engine.app_config.app, event_name=evt, prefix="mgmt", @@ -561,12 +564,13 @@ async def _execute_setup_event( track_ids={}, auth_info=auth_info_default, ) - logger.start(context) + if plugin is None: await app_engine.execute(context=context, query_args=None, payload=None) else: await plugin.execute(context=context, query_args=None, payload=None) + logger.done(context, extra=metrics(context)) @@ -714,7 +718,17 @@ async def _request_process_payload( try: payload_raw = await request.read() if datatype is not None: - return Payload.from_json(payload_raw, datatype), payload_raw # type: ignore + try: + return Payload.from_json(payload_raw, datatype), payload_raw # type: ignore + except AttributeError: + # Attempts to deserialize using stream/dataobject serialization methods + # (that can be customized by plugins) in case sync `from_json` is not available. + return ( + await deserialize( # type: ignore + payload_raw, Serialization.JSON_UTF8, Compression.NONE, datatype + ), + payload_raw, + ) return None, payload_raw except ValueError as e: logger.error(context, e) @@ -988,6 +1002,8 @@ def serve( """ Serve hopeit.engine """ + init_logger() + web_app = init_web_server( config_files, api_file, api_auto, enabled_groups, start_streams ) diff --git a/engine/test/unit/server/test_serialization.py b/engine/test/unit/server/test_serialization.py index 5a8c80a8..86db5d69 100644 --- a/engine/test/unit/server/test_serialization.py +++ b/engine/test/unit/server/test_serialization.py @@ -6,8 +6,11 @@ from hopeit.app.config import Serialization, Compression from hopeit.dataobjects import dataobject from hopeit.server.serialization import serialize, deserialize +import pytest -pickle5_available = (sys.version_info.major > 3) or ((sys.version_info.major == 3) and (sys.version_info.minor >= 8)) +pickle5_available = (sys.version_info.major > 3) or ( + (sys.version_info.major == 3) and (sys.version_info.minor >= 8) +) @dataobject @@ -21,82 +24,226 @@ class Data: ser = { Serialization.JSON_UTF8: b'{"x": "data", "y": 42}', - Serialization.JSON_BASE64: b'eyJ4IjogImRhdGEiLCAieSI6IDQyfQ==', + Serialization.JSON_BASE64: b"eyJ4IjogImRhdGEiLCAieSI6IDQyfQ==", Serialization.PICKLE3: ( - b'\x80\x03ctest_serialization\nData\nq\x00)\x81q\x01}q\x02(X\x01\x00\x00\x00xq' - b'\x03X\x04\x00\x00\x00dataq\x04X\x01\x00\x00\x00yq\x05K*ub.'), + b"\x80\x03ctest_serialization\nData\nq\x00)\x81q\x01}q\x02(X\x01\x00\x00\x00xq" + b"\x03X\x04\x00\x00\x00dataq\x04X\x01\x00\x00\x00yq\x05K*ub." + ), Serialization.PICKLE4: ( - b'\x80\x04\x958\x00\x00\x00\x00\x00\x00\x00\x8c\x12test_serialization\x94' - b'\x8c\x04Data\x94\x93\x94)\x81\x94}\x94(\x8c\x01x\x94\x8c\x04data\x94\x8c\x01' - b'y\x94K*ub.'), + b"\x80\x04\x958\x00\x00\x00\x00\x00\x00\x00\x8c\x12test_serialization\x94" + b"\x8c\x04Data\x94\x93\x94)\x81\x94}\x94(\x8c\x01x\x94\x8c\x04data\x94\x8c\x01" + b"y\x94K*ub." 
+ ), Serialization.PICKLE5: ( - b'\x80\x05\x958\x00\x00\x00\x00\x00\x00\x00\x8c\x12test_serialization\x94' - b'\x8c\x04Data\x94\x93\x94)\x81\x94}\x94(\x8c\x01x\x94\x8c\x04data\x94\x8c\x01' - b'y\x94K*ub.') + b"\x80\x05\x958\x00\x00\x00\x00\x00\x00\x00\x8c\x12test_serialization\x94" + b"\x8c\x04Data\x94\x93\x94)\x81\x94}\x94(\x8c\x01x\x94\x8c\x04data\x94\x8c\x01" + b"y\x94K*ub." + ), } -def test_serialize(): +@pytest.mark.asyncio +async def test_serialize(): print(sys.version_info, sys.version) - assert serialize(data, Serialization.JSON_UTF8, Compression.NONE) == \ - ser[Serialization.JSON_UTF8] == data.to_json().encode() - assert serialize(data, Serialization.JSON_BASE64, Compression.NONE) == \ - ser[Serialization.JSON_BASE64] == base64.b64encode(data.to_json().encode()) - assert serialize(data, Serialization.PICKLE3, Compression.NONE) == \ - ser[Serialization.PICKLE3] == pickle.dumps(data, 3) - assert serialize(data, Serialization.PICKLE4, Compression.NONE) == \ - ser[Serialization.PICKLE4] == pickle.dumps(data, 4) + assert ( + await serialize(data, Serialization.JSON_UTF8, Compression.NONE) + == ser[Serialization.JSON_UTF8] + == data.to_json().encode() + ) + assert ( + await serialize(data, Serialization.JSON_BASE64, Compression.NONE) + == ser[Serialization.JSON_BASE64] + == base64.b64encode(data.to_json().encode()) + ) + assert ( + await serialize(data, Serialization.PICKLE3, Compression.NONE) + == ser[Serialization.PICKLE3] + == pickle.dumps(data, 3) + ) + assert ( + await serialize(data, Serialization.PICKLE4, Compression.NONE) + == ser[Serialization.PICKLE4] + == pickle.dumps(data, 4) + ) if pickle5_available: - assert serialize(data, Serialization.PICKLE5, Compression.NONE) == \ - ser[Serialization.PICKLE5] == pickle.dumps(data, 5) - - -def test_deserialize(): - assert deserialize(ser[Serialization.JSON_UTF8], Serialization.JSON_UTF8, Compression.NONE, Data) == data - assert deserialize(ser[Serialization.JSON_BASE64], Serialization.JSON_BASE64, Compression.NONE, Data) == data - assert deserialize(ser[Serialization.PICKLE3], Serialization.PICKLE3, Compression.NONE, Data) == data - assert deserialize(ser[Serialization.PICKLE4], Serialization.PICKLE4, Compression.NONE, Data) == data + assert ( + await serialize(data, Serialization.PICKLE5, Compression.NONE) + == ser[Serialization.PICKLE5] + == pickle.dumps(data, 5) + ) + + +@pytest.mark.asyncio +async def test_deserialize(): + assert ( + await deserialize( + ser[Serialization.JSON_UTF8], + Serialization.JSON_UTF8, + Compression.NONE, + Data, + ) + == data + ) + assert ( + await deserialize( + ser[Serialization.JSON_BASE64], + Serialization.JSON_BASE64, + Compression.NONE, + Data, + ) + == data + ) + assert ( + await deserialize( + ser[Serialization.PICKLE3], Serialization.PICKLE3, Compression.NONE, Data + ) + == data + ) + assert ( + await deserialize( + ser[Serialization.PICKLE4], Serialization.PICKLE4, Compression.NONE, Data + ) + == data + ) if pickle5_available: - assert deserialize(ser[Serialization.PICKLE5], Serialization.PICKLE5, Compression.NONE, Data) == data - - -def test_serialize_primitives(): - assert serialize("test", Serialization.JSON_UTF8, Compression.NONE) == b'{"value": "test"}' - assert deserialize(b'{"value": "test"}', Serialization.JSON_UTF8, Compression.NONE, str) == "test" - - assert serialize("test", Serialization.JSON_BASE64, Compression.NONE) == b'eyJ2YWx1ZSI6ICJ0ZXN0In0=' - assert deserialize(b'eyJ2YWx1ZSI6ICJ0ZXN0In0=', Serialization.JSON_BASE64, Compression.NONE, str) == "test" - - assert 
serialize("test", Serialization.PICKLE3, Compression.NONE) == b'\x80\x03X\x04\x00\x00\x00testq\x00.' - assert deserialize(b'\x80\x03X\x04\x00\x00\x00testq\x00.', Serialization.PICKLE3, Compression.NONE, str) == "test" - - assert serialize("test", Serialization.PICKLE4, Compression.NONE) == \ - b'\x80\x04\x95\x08\x00\x00\x00\x00\x00\x00\x00\x8c\x04test\x94.' - assert deserialize(b'\x80\x04\x95\x08\x00\x00\x00\x00\x00\x00\x00\x8c\x04test\x94.', - Serialization.PICKLE4, Compression.NONE, str) == "test" + assert ( + await deserialize( + ser[Serialization.PICKLE5], + Serialization.PICKLE5, + Compression.NONE, + Data, + ) + == data + ) + + +@pytest.mark.asyncio +async def test_serialize_primitives(): + assert ( + await serialize("test", Serialization.JSON_UTF8, Compression.NONE) + == b'{"value": "test"}' + ) + assert ( + await deserialize( + b'{"value": "test"}', Serialization.JSON_UTF8, Compression.NONE, str + ) + == "test" + ) + + assert ( + await serialize("test", Serialization.JSON_BASE64, Compression.NONE) + == b"eyJ2YWx1ZSI6ICJ0ZXN0In0=" + ) + assert ( + await deserialize( + b"eyJ2YWx1ZSI6ICJ0ZXN0In0=", + Serialization.JSON_BASE64, + Compression.NONE, + str, + ) + == "test" + ) + + assert ( + await serialize("test", Serialization.PICKLE3, Compression.NONE) + == b"\x80\x03X\x04\x00\x00\x00testq\x00." + ) + assert ( + await deserialize( + b"\x80\x03X\x04\x00\x00\x00testq\x00.", + Serialization.PICKLE3, + Compression.NONE, + str, + ) + == "test" + ) + + assert ( + await serialize("test", Serialization.PICKLE4, Compression.NONE) + == b"\x80\x04\x95\x08\x00\x00\x00\x00\x00\x00\x00\x8c\x04test\x94." + ) + assert ( + await deserialize( + b"\x80\x04\x95\x08\x00\x00\x00\x00\x00\x00\x00\x8c\x04test\x94.", + Serialization.PICKLE4, + Compression.NONE, + str, + ) + == "test" + ) if pickle5_available: - assert serialize("test", Serialization.PICKLE5, Compression.NONE) == \ - b'\x80\x05\x95\x08\x00\x00\x00\x00\x00\x00\x00\x8c\x04test\x94.' 
- assert deserialize(b'\x80\x05\x95\x08\x00\x00\x00\x00\x00\x00\x00\x8c\x04test\x94.', - Serialization.PICKLE5, Compression.NONE, str) == "test" - - assert serialize(42, Serialization.JSON_UTF8, Compression.NONE) == b'{"value": 42}' - assert deserialize(b'{"value": 42}', Serialization.JSON_UTF8, Compression.NONE, int) == 42 - - assert serialize(42.5, Serialization.JSON_UTF8, Compression.NONE) == b'{"value": 42.5}' - assert deserialize(b'{"value": 42.5}', Serialization.JSON_UTF8, Compression.NONE, float) == 42.5 - - assert serialize(True, Serialization.JSON_UTF8, Compression.NONE) == b'{"value": true}' - assert deserialize(b'{"value": true}', Serialization.JSON_UTF8, Compression.NONE, bool) is True - - -def test_serialize_collections(): - assert serialize({"test": "value"}, Serialization.JSON_UTF8, Compression.NONE) == b'{"test": "value"}' - assert deserialize(b'{"test": "value"}', Serialization.JSON_UTF8, Compression.NONE, dict) == {"test": "value"} - - assert serialize(["test", "value"], Serialization.JSON_UTF8, Compression.NONE) == b'["test", "value"]' - assert deserialize(b'["test", "value"]', Serialization.JSON_UTF8, Compression.NONE, list) == ["test", "value"] - - assert serialize({"test", "value"}, Serialization.JSON_UTF8, Compression.NONE) == b'["test", "value"]' \ - or serialize({"test", "value"}, Serialization.JSON_UTF8, Compression.NONE) == b'["value", "test"]' - assert deserialize(b'["test", "value"]', Serialization.JSON_UTF8, Compression.NONE, set) == {"test", "value"} + assert ( + await serialize("test", Serialization.PICKLE5, Compression.NONE) + == b"\x80\x05\x95\x08\x00\x00\x00\x00\x00\x00\x00\x8c\x04test\x94." + ) + assert ( + await deserialize( + b"\x80\x05\x95\x08\x00\x00\x00\x00\x00\x00\x00\x8c\x04test\x94.", + Serialization.PICKLE5, + Compression.NONE, + str, + ) + == "test" + ) + + assert ( + await serialize(42, Serialization.JSON_UTF8, Compression.NONE) + == b'{"value": 42}' + ) + assert ( + await deserialize( + b'{"value": 42}', Serialization.JSON_UTF8, Compression.NONE, int + ) + == 42 + ) + + assert ( + await serialize(42.5, Serialization.JSON_UTF8, Compression.NONE) + == b'{"value": 42.5}' + ) + assert ( + await deserialize( + b'{"value": 42.5}', Serialization.JSON_UTF8, Compression.NONE, float + ) + == 42.5 + ) + + assert ( + await serialize(True, Serialization.JSON_UTF8, Compression.NONE) + == b'{"value": true}' + ) + assert ( + await deserialize( + b'{"value": true}', Serialization.JSON_UTF8, Compression.NONE, bool + ) + is True + ) + + +@pytest.mark.asyncio +async def test_serialize_collections(): + assert ( + await serialize({"test": "value"}, Serialization.JSON_UTF8, Compression.NONE) + == b'{"test": "value"}' + ) + assert await deserialize( + b'{"test": "value"}', Serialization.JSON_UTF8, Compression.NONE, dict + ) == {"test": "value"} + + assert ( + await serialize(["test", "value"], Serialization.JSON_UTF8, Compression.NONE) + == b'["test", "value"]' + ) + assert await deserialize( + b'["test", "value"]', Serialization.JSON_UTF8, Compression.NONE, list + ) == ["test", "value"] + + assert ( + await serialize({"test", "value"}, Serialization.JSON_UTF8, Compression.NONE) + == b'["test", "value"]' + or await serialize({"test", "value"}, Serialization.JSON_UTF8, Compression.NONE) + == b'["value", "test"]' + ) + assert await deserialize( + b'["test", "value"]', Serialization.JSON_UTF8, Compression.NONE, set + ) == {"test", "value"} diff --git a/plugins/build/ci-static-plugins.sh b/plugins/build/ci-static-plugins.sh index fda4638f..dbbf2242 100644 --- 
a/plugins/build/ci-static-plugins.sh +++ b/plugins/build/ci-static-plugins.sh @@ -102,6 +102,19 @@ code+=$? python3 -m pylint plugins/ops/log-streamer/src/hopeit/log_streamer/ code+=$? fi + +if [ "$1" == "" ] || [ "data/dataframes" = "$1" ] ; then +echo "data/dataframes" +export MYPYPATH=engine/src/:plugins/storage/fs/src/:plugins/data/dataframes/src/ && python3 -m mypy --namespace-packages -p hopeit.dataframes +code+=$? +export MYPYPATH=engine/src/:plugins/storage/fs/src/:plugins/data/dataframes/src/ && python3 -m mypy --namespace-packages plugins/data/dataframes/test/integration/ +code+=$? +python3 -m flake8 --max-line-length=120 plugins/data/dataframes/src/hopeit/ plugins/data/dataframes/test/integration/ +code+=$? +python3 -m pylint plugins/data/dataframes/src/hopeit/dataframes/ +code+=$? +fi + if [ $code -gt 0 ] then echo "[FAILED] CI STATIC ANALYSIS: PLUGINS" diff --git a/plugins/build/ci-test-plugins.sh b/plugins/build/ci-test-plugins.sh index 02437e08..43ae9d81 100644 --- a/plugins/build/ci-test-plugins.sh +++ b/plugins/build/ci-test-plugins.sh @@ -52,6 +52,12 @@ export PYTHONPATH=engine/src/:plugins/storage/fs/src/:plugins/ops/log-streamer/s code+=$? fi + +if [ "$1" == "" ] || [ "data/dataframes" = "$1" ] ; then +# data/dataframes +export PYTHONPATH=engine/src/:plugins/storage/fs/src/:plugins/data/dataframes/src/:plugins/data/dataframes/test/integration/ && python3 -m pytest -v --cov-fail-under=80 --cov-report=term --cov=plugins/data/dataframes/src/ plugins/data/dataframes/test/integration/ +code+=$? +fi + if [ $code -gt 0 ] then echo "[FAILED] CI TEST: PLUGINS" diff --git a/plugins/data/dataframes/README.md b/plugins/data/dataframes/README.md new file mode 100644 index 00000000..9525428e --- /dev/null +++ b/plugins/data/dataframes/README.md @@ -0,0 +1,28 @@ +# hopeit.engine dataframes plugin + + +This library is part of hopeit.engine: + +> check: https://github.com/hopeit-git/hopeit.engine + + +### Install using extras when installing hopeit.engine: + +``` +pip install hopeit.engine[dataframes] +``` + +### hopeit.dataframes + +This plugin introduces dataclass annotations to work with `pandas` dataframes +as with any other dataobjects: + +`@dataframe` annotation allows a dataclass to become the schema and container for a dataframe +`@dataframeobject` annotation acts as `@dataobject` with support for dataframe-annotated fields +`DataFrames` class provides an API to create, serialize, and access pandas dataframes + +Features: +- Type coercion for @dataframe fields +- Transparent access to series in @dataframe objects using dot notation +- Serialization of @dataframe and @dataframeobject payloads, allowing them to be transferred through streams (using file system storage to store the actual data and transferring only the metadata needed for deserialization in the stream) +- Support for @dataframeobject as payload for web requests diff --git a/plugins/data/dataframes/config/plugin-config.json b/plugins/data/dataframes/config/plugin-config.json new file mode 100644 index 00000000..b4a0a264 --- /dev/null +++ b/plugins/data/dataframes/config/plugin-config.json @@ -0,0 +1,23 @@ +{ + "app" : { + "name": "dataframes", + "version": "${HOPEIT_APPS_API_VERSION}" + }, + "engine" : { + "import_modules": ["hopeit.dataframes"] + }, + "settings" : { + "dataset_serialization": { + "protocol": "hopeit.dataframes.serialization.files.DatasetFileStorage", + "location": "apps/examples/dataframes-example/data/{auto}", + "partition_dateformat": "%Y/%m/%d/%H/" + } + }, + "events": { + "setup.dataframes": { + "type": "SETUP", + 
"plug_mode": "Standalone", + "setting_keys": ["dataset_serialization"] + } + } +} diff --git a/plugins/data/dataframes/setup.py b/plugins/data/dataframes/setup.py new file mode 100644 index 00000000..5cd43046 --- /dev/null +++ b/plugins/data/dataframes/setup.py @@ -0,0 +1,57 @@ +import setuptools + +version = {} +with open("../../../engine/src/hopeit/server/version.py") as fp: + exec(fp.read(), version) + +setuptools.setup( + name="hopeit.dataframes", + version=version["ENGINE_VERSION"], + description="Hopeit Engine Dataframes Toolkit", + license="Apache 2", + long_description=open("README.md").read(), + long_description_content_type="text/markdown", + author="Leo Smerling and Pablo Canto", + author_email="contact@hopeit.com.ar", + url="https://github.com/hopeit-git/hopeit.engine", + classifiers=[ + "License :: OSI Approved :: Apache Software License", + "Intended Audience :: Developers", + "Programming Language :: Python", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Development Status :: 4 - Beta", + "Operating System :: POSIX :: Linux", + "Operating System :: MacOS :: MacOS X", + "Operating System :: Microsoft :: Windows", + "Topic :: Internet :: WWW/HTTP", + "Topic :: Software Development :: Libraries :: Python Modules", + "Framework :: AsyncIO", + ], + project_urls={ + "CI: GitHub Actions": "https://github.com/hopeit-git/hopeit.engine/actions?query=workflow", # noqa + "Docs: RTD": "https://hopeitengine.readthedocs.io/en/latest/", + "GitHub: issues": "https://github.com/hopeit-git/hopeit.engine/issues", + "GitHub: repo": "https://github.com/hopeit-git/hopeit.engine", + }, + package_dir={"": "src"}, + packages=[ + "hopeit.dataframes", + ], + include_package_data=True, + package_data={ + "hopeit.dataframes": ["py.typed"], + }, + python_requires=">=3.8", + install_requires=[ + f"hopeit.engine[fs-storage]=={version['ENGINE_VERSION']}", + "pandas", + "numpy", + ], + extras_require={ + "pyarrow": ["pyarrow"], + }, + entry_points={}, +) diff --git a/plugins/data/dataframes/src/hopeit/dataframes/__init__.py b/plugins/data/dataframes/src/hopeit/dataframes/__init__.py new file mode 100644 index 00000000..ad2ddbe6 --- /dev/null +++ b/plugins/data/dataframes/src/hopeit/dataframes/__init__.py @@ -0,0 +1,109 @@ +""" +hopeit.engine dataframes plugin entry point + +This module exposes the 3 main constructions to be used inside apps: +`@dataframe` dataclass annotation +`@dataframeobject` dataclass annotation +`DataFrames` class to handle manipulation of dataframe/dataframeobjects + +Usage: +``` +from hopeit.dataframes import DataFrames, dataframe, dataframeobject + +@dataframe +@dataclass +class MyDataFrame: + field1: int + field2: str + ... + +@dataframeobject +@dataclass +class MyDataset: + dataset_name: str + example_data: MyDataFrame + + +df = pd.DataFrame(...) 
# create or load your pandas dataframe + +my_data = DataFrames.from_df(MyDataFrame, df) + +return MyDataset( + dataset_name="example", + example_data=my_data +) +``` +""" + +from typing import Dict, Generic, Iterator, List, Type + +import numpy as np +import pandas as pd +from hopeit.dataframes.dataframe import DataFrameT, dataframe +from hopeit.dataframes.dataframeobject import DataFrameObjectT, dataframeobject +from hopeit.dataobjects import DataObject + +__all__ = ["DataFrames", "dataframe", "dataframeobject"] + + +class DataFrames(Generic[DataFrameT, DataFrameObjectT, DataObject]): + """ + Dataframes manipulation utility methods + """ + + @staticmethod + async def serialize(obj: DataFrameObjectT) -> DataObject: + """Serializes/saves contents of dataframe fields of a `@dataframeobject` + and converts to a `DataObject` json-compatible with pointers to saved + locations. + + This method can be used e.g. to return `@dataframeobject`s as a JSON response + """ + return await obj._serialize() # type: ignore # pylint: disable=protected-access + + @staticmethod + async def deserialize( + datatype: Type[DataFrameObjectT], dataobject: DataObject + ) -> DataFrameObjectT: + """Deserializes/loads contents of serialized dataobject fields of a `@dataframeobject` + loading saved Dataset information for @dataframe fields + """ + return await datatype._deserialize(dataobject) # type: ignore # pylint: disable=protected-access + + @staticmethod + def from_df( + datatype: Type[DataFrameT], df: pd.DataFrame, **series: Dict[str, pd.Series] + ) -> DataFrameT: + """Create a `@dataframe` instance of a particular `datatype` from a pandas DataFrame. + Optionally, add or override series. + """ + return datatype._from_df(df, **series) # type: ignore # pylint: disable=protected-access + + @staticmethod + def from_dataframe( + datatype: Type[DataFrameT], obj: DataFrameT, **series: Dict[str, pd.Series] + ) -> DataFrameT: + """Creates a new `@dataframe` object extracting fields from another `@dataframe`""" + return datatype._from_df(obj._df, **series) # type: ignore # pylint: disable=protected-access + + @staticmethod + def from_dataobjects( + datatype: Type[DataFrameT], dataobjects: Iterator[DataFrameObjectT] + ) -> DataFrameT: + """Converts standard json serializable `@dataobject`s to a single `@dataframe`""" + return datatype._from_dataobjects(dataobjects) # type: ignore # pylint: disable=protected-access + + @staticmethod + def to_dataobjects(obj: DataFrameT) -> List[DataObject]: + """Converts `@dataframe` object to a list of standard `@dataobject`s""" + return obj._to_dataobjects() # type: ignore # pylint: disable=protected-access + + @staticmethod + def from_array(datatype: Type[DataFrameT], array: np.ndarray) -> DataFrameT: + """Creates `@dataframe` object from a numpy array""" + return datatype._from_array(array) # type: ignore # pylint: disable=protected-access + + @staticmethod + def df(obj: DataFrameT) -> pd.DataFrame: + """Provides access to the internal pandas dataframe of a `@dataframe` object""" + return obj._df # type: ignore # pylint: disable=protected-access diff --git a/plugins/data/dataframes/src/hopeit/dataframes/dataframe.py b/plugins/data/dataframes/src/hopeit/dataframes/dataframe.py new file mode 100644 index 00000000..1b96ad09 --- /dev/null +++ b/plugins/data/dataframes/src/hopeit/dataframes/dataframe.py @@ -0,0 +1,242 @@ +""" +DataFrames type abstractions. 
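+ +A `@dataframe`-decorated dataclass wraps an internal `pandas.DataFrame`: each declared field becomes a typed column that can be read as a pandas Series using dot notation.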
+ +Example: + + from hopeit.dataobjects import dataclass # equivalent to `dataclasses.dataclass` + from hopeit.dataframes import dataframe + + @dataframe + @dataclass + class MyObject: + name: str + number: int +""" + +from dataclasses import Field, asdict, dataclass, fields, make_dataclass +from datetime import date, datetime, timezone +from typing import Any, Callable, Dict, Generic, Iterator, List, Optional, Type, TypeVar + +import numpy as np +import pandas as pd +from dataclasses_jsonschema import JsonSchemaMixin +from hopeit.dataobjects import ( + DataObject, + StreamEventMixin, + StreamEventParams, + dataobject, +) + +DataFrameT = TypeVar("DataFrameT") + + +@dataclass +class DataFrameMetadata(Generic[DataObject]): + columns: List[str] + fields: Dict[str, Field] + serialized_type: Type[DataObject] + + +@dataclass +class DataFrameParams: + """ + Helper class used to access attributes in @dataframe + decorated objects, based on dot notation expressions + """ + + datatypes: Optional[str] + + @staticmethod + def extract_attr(obj, expr): + value = obj + for attr_name in expr.split("."): + if value: + value = getattr(value, attr_name) + return value + + +class DataFrameMixin(Generic[DataFrameT, DataObject]): + """ + MixIn class to add functionality for DataFrames dataobjects + + Do not use this class directly, instead use `@dataframe` class decorator. + """ + + DATATYPE_MAPPING = { + int: lambda x: x.astype(np.int64), + float: lambda x: x.astype(np.float64), + str: lambda x: x.astype(object), + date: pd.to_datetime, + datetime: pd.to_datetime, + } + + def __init__(self) -> None: + # Fields added here only to allow mypy to provide correct type hints + self.__data_object__: Dict[str, Any] = {} + self.__dataframe__: DataFrameMetadata = None # type: ignore + self.__df = pd.DataFrame() + raise NotImplementedError # must use @dataframe decorator # pragma: no cover + + @staticmethod + def __init_from_series__( + self, **series: pd.Series + ): # pylint: disable=bad-staticmethod-argument + if self.__data_object__["validate"]: + series = self._coerce_datatypes(series) + df = pd.DataFrame(series) + setattr(self, "__df", df[self.__dataframe__.columns]) + + @classmethod + def _from_df(cls, df: pd.DataFrame, **series: Any) -> DataFrameT: + df = df if cls.__data_object__["unsafe"] else pd.DataFrame(df) + obj = cls(**{**df._series, **series}) # pylint: disable=protected-access + return obj # type: ignore + + @classmethod + def _from_array(cls, array: np.ndarray) -> DataFrameT: + return cls._from_df(pd.DataFrame(array, columns=cls.__dataframe__.columns)) + + @classmethod + def _from_dataobjects(cls, items: Iterator[DataObject]) -> DataFrameT: + return cls._from_df(pd.DataFrame(asdict(item) for item in items)) # type: ignore + + @classmethod + def _from_df_unsafe(cls, df: pd.DataFrame, **series: pd.Series) -> DataFrameT: + for col, values in series.items(): + df[col] = values + obj = cls(**df._series) # pylint: disable=protected-access + return obj # type: ignore + + @property + def _df(self) -> pd.DataFrame: + return getattr(self, "__df") + + def __getitem__(self, key) -> "DataFrameT": + return self._from_df(self.__df[key]) + + def _to_dataobjects(self) -> List[DataObject]: + return [ + self.__dataframe__.serialized_type(**fields) + for fields in self.__df.to_dict(orient="records") + ] + + def to_json(self, *args, **kwargs) -> str: + raise NotImplementedError( + "Dataframe must be used inside `@dataobject(unsafe=True)` to be used as an output" + ) + + def to_dict(self, *args, **kwargs) -> Dict[str, Any]: + 
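+ # Like `to_json` above, direct dict conversion of a raw @dataframe instance is intentionally unsupported: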
raise NotImplementedError( + "Dataframe must be used inside `@dataobject(unsafe=True)` to be used as an output" + ) + + @classmethod + def from_json(cls, *args, **kwargs) -> DataObject: + return cls.__dataframe__.serialized_type.from_dict(*args, **kwargs) + + @classmethod + def from_dict( + cls, + *args, + **kwargs, + ) -> DataObject: + return cls.__dataframe__.serialized_type.from_dict(*args, **kwargs) + + @classmethod + def json_schema(cls, *args, **kwargs) -> Dict[str, Any]: + if cls.__data_object__["schema"]: + schema = cls.__dataframe__.serialized_type.json_schema(*args, **kwargs) + schema[cls.__name__] = schema[cls.__dataframe__.serialized_type.__name__] + return schema + return {} + + def event_id(self, *args, **kwargs) -> str: + return "" + + def event_ts(self, *args, **kwargs) -> datetime: + return datetime.now(tz=timezone.utc) + + def __getattribute__(self, name: str) -> Any: + if name[:2] == "__": + return object.__getattribute__(self, name) + if name in self.__dataframe__.columns: + return self.__df[name] + if name[:15] == "_DataFrameMixin": + return object.__getattribute__(self, name[15:]) + return object.__getattribute__(self, name) + + def __setattr__(self, name: str, value: Any) -> None: + if name in self.__dataframe__.columns: + self.__df[name] = value + else: + object.__setattr__(self, name, value) + + def _coerce_datatypes(self, series: Dict[str, pd.Series]) -> Dict[str, pd.Series]: + return { + name: self.DATATYPE_MAPPING[field.type](series[name]) # type: ignore + for name, field in self.__dataframe__.fields.items() + } + + +def dataframe( + decorated_class=None, + unsafe: bool = False, + validate: bool = True, + schema: bool = True, +) -> Callable[[Type], Type[DataFrameMixin]]: + """ + Decorator for dataclasses intended to be used as dataframes. 
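+ + A minimal sketch (illustrative names; assumes `pandas as pd` and `DataFrames` are imported): + + @dataframe + @dataclass + class Point: + x: float + y: float + + points = DataFrames.from_df(Point, pd.DataFrame({"x": [1.0], "y": [2.0]})) + points.x # the "x" column as a pandas Series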
+ """ + + def add_dataframe_mixin(cls) -> Type[DataFrameMixin]: + if hasattr(cls, "__annotations__") and hasattr(cls, "__dataclass_fields__"): + amended_class = type( + cls.__name__, + (DataFrameMixin, JsonSchemaMixin) + cls.__mro__, + dict(cls.__dict__), + ) + setattr(amended_class, "__init__", DataFrameMixin.__init_from_series__) + return amended_class + return cls + + def add_dataframe_metadata(cls): + serialized_fiels = [(field.name, field.type) for field in fields(cls)] + serialized_type = make_dataclass(cls.__name__ + "_", serialized_fiels) + serialized_type = dataobject(serialized_type, unsafe=True) + + setattr( + cls, + "__dataframe__", + DataFrameMetadata( + columns=[field.name for field in fields(cls)], + fields={field.name: field for field in fields(cls)}, + serialized_type=serialized_type, + ), + ) + + def add_dataobject_annotations(cls, unsafe: bool, validate: bool, schema: bool): + setattr( + cls, + "__data_object__", + {"unsafe": unsafe, "validate": validate, "schema": schema}, + ) + setattr(cls, "__stream_event__", StreamEventParams(None, None)) + setattr(cls, "event_id", StreamEventMixin.event_id) + setattr(cls, "event_ts", StreamEventMixin.event_ts) + + def set_fields_optional(cls): + for field in fields(cls): + field.default = None + + def wrap(cls) -> Type[DataFrameMixin]: + if hasattr(cls, "__dataframe__"): + return cls + amended_class = add_dataframe_mixin(cls) + add_dataframe_metadata(amended_class) + add_dataobject_annotations(amended_class, unsafe, validate, schema) + set_fields_optional(amended_class) + return amended_class + + if decorated_class is None: + return wrap + return wrap(decorated_class) # type: ignore diff --git a/plugins/data/dataframes/src/hopeit/dataframes/dataframeobject.py b/plugins/data/dataframes/src/hopeit/dataframes/dataframeobject.py new file mode 100644 index 00000000..361a1b9a --- /dev/null +++ b/plugins/data/dataframes/src/hopeit/dataframes/dataframeobject.py @@ -0,0 +1,184 @@ +""" +`@dataframeobject` annonation mixin to serialize a group of `@dataframe`s. + +Datasets behaves as DataObject so they can be used as payload +for endpoints and streams. +""" + +from dataclasses import Field, dataclass, fields, make_dataclass +from typing import ( + Any, + Callable, + ClassVar, + Dict, + Generic, + Optional, + Type, + TypeVar, + Union, + get_args, + get_origin, +) + +from hopeit.dataframes.serialization.dataset import Dataset +from hopeit.dataobjects import ( + DataObject, + StreamEventMixin, + StreamEventParams, + dataobject, +) + +DataFrameObjectT = TypeVar("DataFrameObjectT") +NoneType = type(None) + + +@dataclass +class DataFrameObjectMetadata(Generic[DataObject]): + serialized_type: Type[DataObject] + + +class DataFrameObjectMixin(Generic[DataFrameObjectT]): + """ + MixIn class to add functionality for `@dataframeobject`s + + Do not use this class directly, instead use `@dataframeobject` class decorator. + """ + + __storage: ClassVar[Any] = None # pylint: disable=invalid-name + + def __init__(self) -> None: + self.__dataframeobject__: DataFrameObjectMetadata = None # type: ignore + raise NotImplementedError( + "DataFrameObjectMixin() should not be called directly. 
Use `@dataframeobject` annotation" + ) + + async def _serialize(self) -> Optional[DataObject]: + """Saves internal `@dataframe`s using the configured serialization protocol + and returns a json-serializable dataobject + """ + datasets = {} + for field in fields(self): # type: ignore + if _is_dataframe_field(field): + dataframe = getattr(self, field.name) + dataset = ( + None if dataframe is None else await self.__storage.save(dataframe) + ) + datasets[field.name] = dataset + else: + datasets[field.name] = getattr(self, field.name) + return self.__dataframeobject__.serialized_type(**datasets) + + @classmethod + async def _deserialize( + cls, serialized: DataObject + ) -> "DataFrameObjectMixin[DataFrameObjectT]": + """From a serialized dataframeobject, loads inner `@dataframe` objects + and returns a `@dataframeobject` instance""" + dataframes = {} + for field in fields(cls): # type: ignore + if _is_dataframe_field(field): + dataset = getattr(serialized, field.name) + dataframe = ( + None if dataset is None else await cls.__storage.load(dataset) + ) + dataframes[field.name] = dataframe + else: + dataframes[field.name] = getattr(serialized, field.name) + return cls(**dataframes) + + @classmethod + def json_schema(cls, *args, **kwargs) -> Dict[str, Any]: + schema = cls.__dataframeobject__.serialized_type.json_schema(*args, **kwargs) + schema[cls.__name__] = schema[cls.__dataframeobject__.serialized_type.__name__] + return schema + + def to_json(self, *args, **kwargs) -> Dict[str, Any]: + raise RuntimeError( + f"`{type(self).__name__}` `@dataframeobject` cannot be converted to json directly. " + "e.g. use `return await DataFrames.serialize(obj)` to return it as a response." + ) + + +def _is_dataframe_field(field: Field) -> bool: + return any( + hasattr(field_type, "__dataframe__") + for field_type in [field.type, *get_args(field.type)] + ) + + +def _serialized_field_type(field: Field) -> Type[Any]: + """Computes the `@dataobject` datatype used as a result + of serializing a `@dataframeobject` + """ + if hasattr(field.type, "__dataframe__"): + return Dataset + if get_origin(field.type) is Union: + args = get_args(field.type) + if ( + len(args) == 2 + and any(hasattr(field_type, "__dataframe__") for field_type in args) + and any(field_type is NoneType for field_type in args) + ): + return Optional[Dataset] # type: ignore + if _is_dataframe_field(field): + raise TypeError( + f"field {field.name}: only `DataFrameT` or `Optional[DataFrameT]` are supported" + ) + return field.type + + +def dataframeobject( + decorated_class=None, +) -> Callable[[Type], Type[DataFrameObjectMixin]]: + """ + Decorator for dataclasses intended to be used as dataframe objects. 
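+ + A rough sketch of the intended round trip (illustrative names; `Point` here stands for any `@dataframe` class): + + @dataframeobject + @dataclass + class PointSet: + name: str + points: Point + + serialized = await DataFrames.serialize(PointSet(name="example", points=points)) + restored = await DataFrames.deserialize(PointSet, serialized)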
+ """ + + def add_dataframe_mixin(cls) -> Type[DataFrameObjectMixin]: + if hasattr(cls, "__annotations__") and hasattr(cls, "__dataclass_fields__"): + amended_class = type( + cls.__name__, + (DataFrameObjectMixin,) + cls.__mro__, + dict(cls.__dict__), + ) + return amended_class + return cls + + def add_dataframeobject_metadata(cls): + serialized_fiels = [ + (field.name, _serialized_field_type(field)) for field in fields(cls) + ] + serialized_type = make_dataclass(cls.__name__ + "_", serialized_fiels) + serialized_type = dataobject(serialized_type, unsafe=True) + + setattr( + cls, + "__dataframeobject__", + DataFrameObjectMetadata( + serialized_type=serialized_type, + ), + ) + + def add_dataobject_annotations(cls, unsafe: bool, validate: bool, schema: bool): + setattr( + cls, + "__data_object__", + {"unsafe": unsafe, "validate": validate, "schema": schema}, + ) + setattr(cls, "__stream_event__", StreamEventParams(None, None)) + setattr(cls, "event_id", StreamEventMixin.event_id) + setattr(cls, "event_ts", StreamEventMixin.event_ts) + + def wrap(cls) -> Type[DataFrameObjectMixin]: + if hasattr(cls, "__dataframeobject__"): + return cls + amended_class = add_dataframe_mixin(cls) + add_dataframeobject_metadata(amended_class) + add_dataobject_annotations( + amended_class, unsafe=False, validate=True, schema=True + ) + return amended_class + + if decorated_class is None: + return wrap + return wrap(decorated_class) # type: ignore diff --git a/plugins/data/dataframes/src/hopeit/dataframes/py.typed b/plugins/data/dataframes/src/hopeit/dataframes/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/plugins/data/dataframes/src/hopeit/dataframes/serialization/__init__.py b/plugins/data/dataframes/src/hopeit/dataframes/serialization/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/plugins/data/dataframes/src/hopeit/dataframes/serialization/dataset.py b/plugins/data/dataframes/src/hopeit/dataframes/serialization/dataset.py new file mode 100644 index 00000000..759766da --- /dev/null +++ b/plugins/data/dataframes/src/hopeit/dataframes/serialization/dataset.py @@ -0,0 +1,28 @@ +"""Dataset objects definition, used as a result of serialized dataframes +""" + +from importlib import import_module +from typing import Type, TypeVar + +from hopeit.dataobjects import dataclass, dataobject + +DataFrameT = TypeVar("DataFrameT") + + +@dataobject +@dataclass +class Dataset: + protocol: str + partition_key: str + key: str + datatype: str + + +def find_protocol_impl(qual_type_name: str) -> Type: + mod_name, type_name = ( + ".".join(qual_type_name.split(".")[:-1]), + qual_type_name.split(".")[-1], + ) + module = import_module(mod_name) + datatype = getattr(module, type_name) + return datatype diff --git a/plugins/data/dataframes/src/hopeit/dataframes/serialization/files.py b/plugins/data/dataframes/src/hopeit/dataframes/serialization/files.py new file mode 100644 index 00000000..d7c75e28 --- /dev/null +++ b/plugins/data/dataframes/src/hopeit/dataframes/serialization/files.py @@ -0,0 +1,116 @@ +"""Support for `@dataframes` serialization to files +""" + +import io +from importlib import import_module +from typing import Callable, Generic, Optional, Type, TypeVar, Union +from uuid import uuid4 + +import pandas as pd + +try: + import pyarrow # type: ignore # noqa # pylint: disable=unused-import +except ImportError as e: + raise ImportError( + "`pyarrow` needs to be installed to use `DatasetFileStorage`", + "Run `pip install hopeit.dataframes[pyarrow]`", + ) from e + +from 
hopeit.dataframes.dataframe import DataFrameMixin +from hopeit.dataframes.serialization.dataset import Dataset +from hopeit.dataobjects import EventPayloadType +from hopeit.fs_storage import FileStorage + +DataFrameT = TypeVar("DataFrameT", bound=DataFrameMixin) + + +class DatasetFileStorage(Generic[DataFrameT]): + """Support for storing dataframes as files, + using pandas parquet format in combination + with `hopeit.engine` file storage plugins + """ + + def __init__(self, *, location: str, partition_dateformat: Optional[str], **kwargs): + self.storage: FileStorage = FileStorage( + path=location, partition_dateformat=partition_dateformat + ) + + async def save(self, dataframe: DataFrameT) -> Dataset: + """Saves @dataframe annotated object as parquet to file system + and returns Dataset metadata to be used for retrieval + """ + datatype = type(dataframe) + key = f"{datatype.__qualname__.lower()}_{uuid4()}.parquet" + data = io.BytesIO( + dataframe._df.to_parquet( # pylint: disable=protected-access + engine="pyarrow" + ) + ) + location = await self.storage.store_file(file_name=key, value=data) + partition_key = self.storage.partition_key(location) + + return Dataset( + protocol=f"{__name__}.{type(self).__name__}", + partition_key=partition_key, + key=key, + datatype=f"{datatype.__module__}.{datatype.__qualname__}", + ) + + async def load(self, dataset: Dataset) -> EventPayloadType: + """Loads @dataframe annotated object using Dataset metadata""" + datatype: Type[DataFrameT] = find_dataframe_type(dataset.datatype) + data = await self.storage.get_file( + dataset.key, partition_key=dataset.partition_key + ) + if data is None: + raise FileNotFoundError(dataset.key) + df = pd.read_parquet(io.BytesIO(data), engine="pyarrow") + return datatype._from_df(df) # pylint: disable=protected-access + + async def ser_wrapper( + self, + base_serialization: Callable, + data: Union[EventPayloadType, DataFrameT], + level: int, + ) -> bytes: + """Serialization wrapper that plugs into hopeit.engine + serialization when the dataframes plugin is initialized + """ + if hasattr(data, "__dataframeobject__"): + data = await data._serialize() # type: ignore # pylint: disable=protected-access + if hasattr(data, "__dataframe__"): + data = await self.save(data) # type: ignore + return await base_serialization(data, level) + + async def deser_wrapper( + self, + base_deserialization: Callable, + data: bytes, + datatype: Union[Type[EventPayloadType], Type[DataFrameT]], + ) -> Union[EventPayloadType, DataFrameT]: + """Deserialization wrapper that plugs into hopeit.engine + deserialization when the dataframes plugin is initialized + """ + if hasattr(datatype, "__dataframeobject__"): + dataset = await base_deserialization( + data, datatype.__dataframeobject__.serialized_type # type: ignore + ) + return await datatype._deserialize(dataset) # type: ignore # pylint: disable=protected-access + if hasattr(datatype, "__dataframe__"): + dataset = await base_deserialization(data, Dataset) + return await self.load(dataset) + return await base_deserialization(data, datatype) + + +def find_dataframe_type(qual_type_name: str) -> Type[DataFrameT]: + """Returns dataframe class based on type name used during serialization""" + mod_name, type_name = ( + ".".join(qual_type_name.split(".")[:-1]), + qual_type_name.split(".")[-1], + ) + module = import_module(mod_name) + datatype = getattr(module, type_name) + assert hasattr( + datatype, "__dataframe__" + ), f"Type {qual_type_name} must be annotated with `@dataframe`." 
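+ # `datatype` is now guaranteed to be a `@dataframe` class, able to rebuild instances from a loaded pandas DataFrame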
diff --git a/plugins/data/dataframes/src/hopeit/dataframes/serialization/settings.py b/plugins/data/dataframes/src/hopeit/dataframes/serialization/settings.py
new file mode 100644
index 00000000..fb359776
--- /dev/null
+++ b/plugins/data/dataframes/src/hopeit/dataframes/serialization/settings.py
@@ -0,0 +1,14 @@
+"""Support for plugin configuration
+"""
+
+from typing import Optional
+
+from hopeit.dataobjects import dataclass, dataobject
+
+
+@dataobject
+@dataclass
+class DatasetSerialization:
+    protocol: str
+    location: str
+    partition_dateformat: Optional[str] = None
diff --git a/plugins/data/dataframes/src/hopeit/dataframes/setup/__init__.py b/plugins/data/dataframes/src/hopeit/dataframes/setup/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/plugins/data/dataframes/src/hopeit/dataframes/setup/dataframes.py b/plugins/data/dataframes/src/hopeit/dataframes/setup/dataframes.py
new file mode 100644
index 00000000..cb08010e
--- /dev/null
+++ b/plugins/data/dataframes/src/hopeit/dataframes/setup/dataframes.py
@@ -0,0 +1,52 @@
+"""hopeit.engine dataframes plugin SETUP event.
+
+This event executes when the engine starts with the dataframes plugin configuration
+file loaded, and ensures that the engine supports serialization of `@dataframe`
+and `@dataframeobject` types
+"""
+
+from functools import partial
+
+from hopeit.app.context import EventContext
+from hopeit.app.logger import app_logger
+from hopeit.dataframes.dataframeobject import DataFrameObjectMixin
+from hopeit.dataframes.serialization.dataset import find_protocol_impl
+from hopeit.dataframes.serialization.settings import DatasetSerialization
+from hopeit.server import serialization
+
+logger = app_logger()
+
+__steps__ = ["register_serialization"]
+
+
+def register_serialization(payload: None, context: EventContext) -> None:
+    """Sets up serialization wrappers in hopeit.engine based on the
+    `DatasetSerialization` settings configured in the plugin configuration file
+    """
+    logger.info(context, "Registering serialization methods...")
+
+    settings: DatasetSerialization = context.settings(
+        key="dataset_serialization", datatype=DatasetSerialization
+    )
+    impl = find_protocol_impl(settings.protocol)
+
+    storage = impl(
+        protocol=settings.protocol,
+        location=settings.location,
+        partition_dateformat=settings.partition_dateformat,
+    )
+    setattr(DataFrameObjectMixin, "_DataFrameObjectMixin__storage", storage)
+
+    serdeser_wrappers = {}
+    for (
+        serdeser,
+        methods,
+    ) in serialization._SERDESER.items():  # pylint: disable=protected-access
+        serdeser_wrappers[serdeser] = (
+            partial(storage.ser_wrapper, methods[0]),
+            methods[1],
+            partial(storage.deser_wrapper, methods[2]),
+        )
+
+    for serdeser, methods in serdeser_wrappers.items():
+        serialization._SERDESER[serdeser] = methods  # pylint: disable=protected-access
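The loop above swaps each entry of the engine's serializer table for a `partial` that routes through the storage wrappers before reaching the original function. A minimal sketch of that layering, using a stand-in storage and base serializer; neither is a real engine object, and the real entries live in `hopeit.server.serialization._SERDESER`:

    # Hedged sketch: how partial() layers a dataframes-aware wrapper over a
    # base serializer. `base_ser` and `StubStorage` are stand-ins only.
    import asyncio
    from functools import partial

    async def base_ser(data, level: int) -> bytes:
        # Stand-in for one of the engine's base serializers
        return repr(data).encode()

    class StubStorage:
        async def ser_wrapper(self, base, data, level: int) -> bytes:
            # A real DatasetFileStorage would first replace @dataframe payloads
            # with saved Dataset metadata; plain payloads fall through unchanged.
            return await base(data, level)

    async def main() -> None:
        wrapped = partial(StubStorage().ser_wrapper, base_ser)
        assert await wrapped({"plain": "payload"}, 0) == b"{'plain': 'payload'}"

    asyncio.run(main())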
diff --git a/plugins/data/dataframes/test/integration/conftest.py b/plugins/data/dataframes/test/integration/conftest.py
new file mode 100644
index 00000000..b400c0b2
--- /dev/null
+++ b/plugins/data/dataframes/test/integration/conftest.py
@@ -0,0 +1,103 @@
+from dataclasses import dataclass
+from datetime import datetime, timezone
+
+import pandas as pd
+import pytest
+from hopeit.app.config import (
+    AppConfig,
+    AppDescriptor,
+    AppEngineConfig,
+    EventDescriptor,
+    EventType,
+)
+from hopeit.app.context import EventContext
+from hopeit.dataframes import dataframe, dataframeobject
+from hopeit.testing.apps import create_test_context, execute_event, server_config
+
+
+@dataframe
+@dataclass
+class MyTestData:
+    number: int
+    name: str
+    timestamp: datetime
+
+
+@dataframe
+@dataclass
+class MyNumericalData:
+    number: int
+    value: float
+
+
+@dataframe
+@dataclass
+class MyPartialTestData:
+    number: int
+    name: str
+
+
+@dataframeobject
+@dataclass
+class MyTestDataObject:
+    name: str
+    data: MyTestData
+
+
+@pytest.fixture
+def one_element_pandas_df() -> pd.DataFrame:
+    return pd.DataFrame(
+        [
+            {
+                "number": 1,
+                "name": "test1",
+                "timestamp": datetime.now(tz=timezone.utc),
+            }
+        ]
+    )
+
+
+@pytest.fixture
+def sample_pandas_df() -> pd.DataFrame:
+    return pd.DataFrame(
+        [
+            {
+                "number": n,
+                "name": f"test{n}",
+                "timestamp": datetime.now(tz=timezone.utc),
+            }
+            for n in range(100)
+        ]
+    )
+
+
+@pytest.fixture
+def plugin_config() -> AppConfig:
+    return AppConfig(
+        app=AppDescriptor(name="hopeit.dataframes.test", version="test"),
+        engine=AppEngineConfig(
+            import_modules=["hopeit.dataframes"],
+        ),
+        settings={
+            "dataset_serialization": {
+                "protocol": "hopeit.dataframes.serialization.files.DatasetFileStorage",
+                "location": "/tmp/hopeit/dataframes/test",
+                "partition_dateformat": "%Y/%m/%d/%H/",
+            }
+        },
+        events={
+            "setup.dataframes": EventDescriptor(
+                type=EventType.SETUP, setting_keys=["dataset_serialization"]
+            )
+        },
+        server=server_config(),
+    ).setup()
+
+
+async def setup_serialization_context(plugin_config) -> EventContext:
+    context = create_test_context(
+        app_config=plugin_config,
+        event_name="setup.dataframes",
+    )
+    await execute_event(plugin_config, "setup.dataframes", payload=None)
+    return context
diff --git a/plugins/data/dataframes/test/integration/test_dataframes_api.py b/plugins/data/dataframes/test/integration/test_dataframes_api.py
new file mode 100644
index 00000000..d8ae1902
--- /dev/null
+++ b/plugins/data/dataframes/test/integration/test_dataframes_api.py
@@ -0,0 +1,63 @@
+import numpy as np
+import pandas as pd
+
+from conftest import (
+    MyNumericalData,
+    MyPartialTestData,
+    MyTestData,
+    MyTestDataObject,
+    setup_serialization_context,
+)
+from hopeit.app.config import AppConfig
+from hopeit.dataframes import DataFrames
+from pandas.testing import assert_frame_equal, assert_series_equal
+
+
+def test_dataframes_from_df(sample_pandas_df: pd.DataFrame):
+    initial_data = DataFrames.from_df(MyTestData, sample_pandas_df)
+    assert len(DataFrames.df(initial_data)) == 100
+
+
+def test_dataframes_from_dataframe(sample_pandas_df: pd.DataFrame):
+    initial_data = DataFrames.from_df(MyTestData, sample_pandas_df)
+    partial_data = DataFrames.from_dataframe(MyPartialTestData, initial_data)
+    assert len(DataFrames.df(partial_data)) == 100
+    assert_series_equal(partial_data.number, initial_data.number)  # type: ignore
+    assert_series_equal(partial_data.name, initial_data.name)  # type: ignore
+
+
+def test_dataframes_from_array():
+    array = np.array([(n, 1.1 * n) for n in range(100)])
+    numerical_data = DataFrames.from_array(MyNumericalData, array)
+    assert_series_equal(
+        numerical_data.number, pd.Series(array.T[0], name="number").astype(int)
+    )
+    assert_series_equal(numerical_data.value, pd.Series(array.T[1], name="value"))
+
+
+def test_dataobject_dataframes_conversion(one_element_pandas_df):
+    data = DataFrames.from_df(MyTestData, one_element_pandas_df)
+    objects = DataFrames.to_dataobjects(data)
+    assert objects == [
+        MyTestData.__dataframe__.serialized_type(
+            number=1, name="test1", timestamp=objects[0].timestamp
+        )
+    ]
+    back_to_dataframe = DataFrames.from_dataobjects(MyTestData, objects)
+    assert_frame_equal(DataFrames.df(data), DataFrames.df(back_to_dataframe))
+
+
+async def test_dataframe_object_serialization(
+    sample_pandas_df: pd.DataFrame, plugin_config: AppConfig
+):
+    await setup_serialization_context(plugin_config)
+
+    initial_data = DataFrames.from_df(MyTestData, sample_pandas_df)
+    dataobject = MyTestDataObject(
+        name="test",
+        data=initial_data,
+    )
+    saved_obj = await DataFrames.serialize(dataobject)  # type: ignore
+    loaded_obj = await DataFrames.deserialize(MyTestDataObject, saved_obj)
+
+    assert_frame_equal(DataFrames.df(dataobject.data), DataFrames.df(loaded_obj.data))
diff --git a/plugins/data/dataframes/test/integration/test_dataframes_imports.py b/plugins/data/dataframes/test/integration/test_dataframes_imports.py
new file mode 100644
index 00000000..d3168050
--- /dev/null
+++ b/plugins/data/dataframes/test/integration/test_dataframes_imports.py
@@ -0,0 +1,22 @@
+import pandas as pd
+from conftest import MyTestData, MyTestDataObject
+from hopeit.dataframes import DataFrames, dataframe, dataframeobject
+from hopeit.dataframes.dataframe import DataFrameMixin
+from hopeit.dataframes.dataframeobject import DataFrameObjectMixin
+
+
+def test_dataframes_imports():
+    assert DataFrames is not None
+    assert dataframe is not None
+    assert dataframeobject is not None
+
+
+def test_dataframe_object_construction(one_element_pandas_df: pd.DataFrame):
+    test_data = DataFrames.from_df(MyTestData, one_element_pandas_df)
+    assert isinstance(test_data, DataFrameMixin)
+
+    test_dataobject = MyTestDataObject(
+        name="test",
+        data=test_data,
+    )
+    assert isinstance(test_dataobject, DataFrameObjectMixin)
diff --git a/plugins/ops/apps-visualizer/api/openapi.json b/plugins/ops/apps-visualizer/api/openapi.json
index 75bc6f8f..9c777e75 100644
--- a/plugins/ops/apps-visualizer/api/openapi.json
+++ b/plugins/ops/apps-visualizer/api/openapi.json
@@ -1,12 +1,12 @@
 {
   "openapi": "3.0.3",
   "info": {
-    "version": "0.23",
+    "version": "0.24",
     "title": "Simple Example",
     "description": "Simple Example"
   },
   "paths": {
-    "/api/config-manager/0x23/runtime-apps-config": {
+    "/api/config-manager/0x24/runtime-apps-config": {
       "get": {
         "summary": "Config Manager: Runtime Apps Config",
         "description": "Returns the runtime config for the Apps running on this server",
@@ -62,11 +62,11 @@
           }
         },
         "tags": [
-          "config_manager.0x23"
+          "config_manager.0x24"
        ]
       }
     },
-    "/api/config-manager/0x23/cluster-apps-config": {
+    "/api/config-manager/0x24/cluster-apps-config": {
       "get": {
         "summary": "Config Manager: Cluster Apps Config",
         "description": "Handle remote access to runtime configuration for a group of hosts",
@@ -122,7 +122,7 @@
           }
         },
         "tags": [
-          "config_manager.0x23"
+          "config_manager.0x24"
        ]
       }
     },
@@ -209,11 +209,11 @@
           }
         },
         "tags": [
-          "apps_visualizer.0x23"
+          "apps_visualizer.0x24"
        ]
       }
     },
-    "/api/apps-visualizer/0x23/apps/events-graph": {
+    "/api/apps-visualizer/0x24/apps/events-graph": {
       "get": {
         "summary": "App Visualizer: Events Graph Data",
         "description": "App Visualizer: Events Graph Data",
@@ -287,11 +287,11 @@
           }
         },
         "tags": [
-          "apps_visualizer.0x23"
+          "apps_visualizer.0x24"
        ]
      }
     },
-    "/api/apps-visualizer/0x23/event-stats/live": {
+    "/api/apps-visualizer/0x24/event-stats/live": {
       "get": {
         "summary": "App Visualizer: Live Stats",
         "description": "App Visualizer: Live Stats",
@@ -365,7 +365,7 @@
           }
         },
         "tags": [
-          "apps_visualizer.0x23"
+          "apps_visualizer.0x24"
        ]
      }
    }
@@ -834,7 +834,7 @@
         },
         "engine_version": {
           "type": "string",
-          "default": "0.23.0"
+          "default": "0.24.0"
         }
       },
       "x-module-name": "hopeit.server.config",
diff --git a/plugins/storage/fs/test/unit/test_fs.py b/plugins/storage/fs/test/unit/test_fs.py
index 794e0f7d..461978ac 100644
--- a/plugins/storage/fs/test/unit/test_fs.py
+++ b/plugins/storage/fs/test/unit/test_fs.py
@@ -274,7 +274,6 @@ async def test_list_objects(monkeypatch):
     ]
 
     files = await fs.list_files()
-    print(files)
     assert files == [
         ItemLocator("1.json", None),
         ItemLocator("2.json", None),
diff --git a/plugins/streams/redis/src/hopeit/redis_streams/__init__.py b/plugins/streams/redis/src/hopeit/redis_streams/__init__.py
index d85d6e70..72486c23 100644
--- a/plugins/streams/redis/src/hopeit/redis_streams/__init__.py
+++ b/plugins/streams/redis/src/hopeit/redis_streams/__init__.py
@@ -93,7 +93,7 @@ async def write_stream(
         :return: number of successful written messages
         """
         try:
-            event_fields = self._encode_message(
+            event_fields = await self._encode_message(
                 payload, queue, track_ids, auth_info, compression, serialization
             )
             ok = await self._write_pool.xadd(
@@ -196,7 +196,7 @@ async def read_stream(
                     stream_events.append(TypeError(err_msg))
                 else:
                     stream_events.append(
-                        self._decode_message(
+                        await self._decode_message(
                             stream_name,
                             msg,
                             datatype,
@@ -235,7 +235,7 @@ async def ack_read_stream(
         except (OSError, RedisError, RedisConnectionError) as e:  # pragma: no cover
             raise StreamOSError(e) from e
 
-    def _encode_message(
+    async def _encode_message(
         self,
         payload: EventPayload,
         queue: str,
@@ -264,7 +264,7 @@ def _encode_message(
             "auth_info": base64.b64encode(json.dumps(auth_info).encode()),
             "ser": serialization.value,
             "comp": compression.value,
-            "payload": serialize(payload, serialization, compression),
+            "payload": await serialize(payload, serialization, compression),
             "queue": queue.encode(),
         }
         event_ts = payload.event_ts()  # type: ignore
@@ -274,7 +274,7 @@ def _encode_message(
             event_fields["event_ts"] = event_ts
         return event_fields
 
-    def _decode_message(
+    async def _decode_message(
         self,
         stream_name: str,
         msg: List[Union[bytes, Dict[bytes, bytes]]],
@@ -291,7 +291,7 @@ def _decode_message(
         serialization = Serialization(msg[1][b"ser"].decode())
         return StreamEvent(
             msg_internal_id=msg[0],
-            payload=deserialize(
+            payload=await deserialize(
                 msg[1][b"payload"], serialization, compression, datatype
             ),  # type: ignore
             queue=msg[1]