Skip to content

Commit

Permalink
Merge pull request #43 from betodealmeida/schema_generation
Browse files Browse the repository at this point in the history
Schema generation
  • Loading branch information
betodealmeida committed Apr 23, 2023
2 parents e836d08 + e9dbea6 commit 7d5d4c6
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 24 deletions.
4 changes: 4 additions & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ crontab==1.0.1
durations==0.3.3
# via senor-octopus
marshmallow==3.19.0
# via
# marshmallow-jsonschema
# senor-octopus
marshmallow-jsonschema==0.13.0
# via senor-octopus
packaging==23.1
# via marshmallow
Expand Down
4 changes: 4 additions & 0 deletions requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ lazy-object-proxy==1.9.0
markupsafe==2.1.2
# via jinja2
marshmallow==3.19.0
# via
# marshmallow-jsonschema
# senor-octopus
marshmallow-jsonschema==0.13.0
# via senor-octopus
mccabe==0.7.0
# via pylint
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ install_requires =
crontab>=0.22.9
durations>=0.3.3
marshmallow>=3.19.0
marshmallow-jsonschema>=0.13.0
typing_extensions>=3.7.4.3

[options.packages.find]
Expand Down
37 changes: 19 additions & 18 deletions src/senor_octopus/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from pkg_resources import iter_entry_points

from senor_octopus.exceptions import InvalidConfigurationException
from senor_octopus.lib import build_marshmallow_schema
from senor_octopus.types import (
Event,
FilterCallable,
Expand Down Expand Up @@ -68,12 +69,21 @@ def build(
f"Invalid plugin name `{plugin_name}`",
) from ex

if not hasattr(plugin, "configuration_schema"):
plugin.configuration_schema = build_marshmallow_schema(plugin)

kwargs = section.copy()
flow = kwargs.pop("flow").strip()

if flow.startswith("->"):
plugin = cast(SourceCallable, plugin)
return Source(node_name, plugin, **kwargs)

if flow.endswith("->"):
plugin = cast(SinkCallable, plugin)
return Sink(node_name, plugin, **kwargs)

plugin = cast(FilterCallable, plugin)
return Filter(node_name, plugin, **kwargs)


Expand All @@ -90,16 +100,13 @@ def __init__(
node_name: str,
plugin: SourceCallable,
schedule: Optional[str] = None,
**extra_kwargs: Any,
**kwargs: Any,
):
super().__init__(node_name)

self.plugin = plugin
self.schedule = CronTab(schedule) if schedule else None
self.kwargs = (
self.plugin.configuration_schema.load(extra_kwargs)
if hasattr(self.plugin, "configuration_schema")
else extra_kwargs
)
self.kwargs = plugin.configuration_schema.load(kwargs)

async def run(self) -> None:
"""
Expand Down Expand Up @@ -128,14 +135,11 @@ class Filter(Node):
to their children, modified or filtered.
"""

def __init__(self, node_name: str, plugin: FilterCallable, **extra_kwargs: Any):
def __init__(self, node_name: str, plugin: FilterCallable, **kwargs: Any):
super().__init__(node_name)

self.plugin = plugin
self.kwargs = (
self.plugin.configuration_schema.load(extra_kwargs)
if hasattr(self.plugin, "configuration_schema")
else extra_kwargs
)
self.kwargs = plugin.configuration_schema.load(kwargs)

async def run(self, stream: Stream) -> None:
"""
Expand Down Expand Up @@ -170,17 +174,14 @@ def __init__(
plugin: SinkCallable,
throttle: Optional[str] = None,
batch: Optional[str] = None,
**extra_kwargs: Any,
**kwargs: Any,
):
super().__init__(node_name)

self.plugin = plugin
self.throttle = Duration(throttle).to_seconds() if throttle else None
self.batch = Duration(batch).to_seconds() if batch else None
self.kwargs = (
self.plugin.configuration_schema.load(extra_kwargs)
if hasattr(self.plugin, "configuration_schema")
else extra_kwargs
)
self.kwargs = plugin.configuration_schema.load(kwargs)

self.last_run: Optional[float] = None
self.queue: asyncio.Queue = asyncio.Queue()
Expand Down
54 changes: 49 additions & 5 deletions src/senor_octopus/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,32 @@
from __future__ import annotations

import asyncio
import inspect
from asyncio.futures import Future
from io import StringIO
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar, Union
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Optional,
Set,
Tuple,
TypeVar,
Union,
)

import asciidag.graph
import asciidag.node
from asyncstdlib.builtins import anext as anext_
from marshmallow import Schema
from marshmallow import Schema, fields

from senor_octopus.graph import Node, Source
from senor_octopus.types import Event, Stream

if TYPE_CHECKING: # pragma: no cover
from senor_octopus.graph import Node, Source


def flatten(
obj: Dict[str, Any],
Expand All @@ -39,7 +53,7 @@ def flatten(
return dict(items)


def render_dag(dag: Set[Source], **kwargs: Any) -> str:
def render_dag(dag: Set["Source"], **kwargs: Any) -> str:
"""
Render a DAG as an ASCII graph.
"""
Expand All @@ -57,7 +71,7 @@ def render_dag(dag: Set[Source], **kwargs: Any) -> str:


def build_asciidag(
node: Node,
node: "Node",
asciidag_nodes: Dict[str, asciidag.node.Node],
) -> asciidag.node.Node:
"""
Expand Down Expand Up @@ -123,3 +137,33 @@ def decorator(plugin: Plugin) -> Plugin:
return plugin

return decorator


def build_marshmallow_schema(function: Plugin) -> Schema:
"""
Build a Marshmallow schema from a function signature.
"""
type_map = {
str: fields.String,
int: fields.Integer,
}

signature = inspect.signature(function)
attributes = {}
for name, parameter in signature.parameters.items():
if name == "stream":
continue

if parameter.annotation not in type_map:
raise TypeError(
f"Unsupported type {parameter.annotation} for parameter {name}",
)
kwargs = {
"title": name,
"required": parameter.default is inspect.Parameter.empty,
}
if parameter.default is not inspect.Parameter.empty:
kwargs["default"] = parameter.default
attributes[name] = type_map[parameter.annotation](**kwargs)

return Schema.from_dict(attributes)()
100 changes: 99 additions & 1 deletion tests/lib_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@
import aiotools
import pytest
import yaml
from marshmallow_jsonschema import JSONSchema

from senor_octopus.graph import build_dag
from senor_octopus.lib import flatten, merge_streams, render_dag
from senor_octopus.lib import (
build_marshmallow_schema,
flatten,
merge_streams,
render_dag,
)
from senor_octopus.sources.awair import awair


def test_flatten() -> None:
Expand Down Expand Up @@ -114,3 +121,94 @@ async def gen(name, count, sleep):
("stream1", 0.22321073814882275),
("stream2", 0.7364712141640124),
]


def test_build_marshmallow_schema() -> None:
"""
Test the ``build_marshmallow_schema`` function.
"""
json_schema = JSONSchema()
schema = build_marshmallow_schema(awair)
assert json_schema.dump(schema) == {
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"GeneratedSchema": {
"type": "object",
"properties": {
"access_token": {"title": "access_token", "type": "string"},
"device_id": {"title": "device_id", "type": "integer"},
"device_type": {
"title": "device_type",
"type": "string",
"default": "awair-element",
},
"prefix": {
"title": "prefix",
"type": "string",
"default": "hub.awair",
},
},
"required": ["access_token", "device_id"],
"additionalProperties": False,
},
},
"$ref": "#/definitions/GeneratedSchema",
}

json_schema = JSONSchema()
assert json_schema.dump(awair.configuration_schema) == { # type: ignore
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"AwairConfig": {
"required": ["access_token", "device_id"],
"properties": {
"access_token": {
"title": "Awair API access token",
"type": "string",
"default": None,
"description": (
"An Awair API access token. Can be obtained from "
"https://developer.getawair.com/console/access-token."
),
},
"device_id": {
"title": "Device ID",
"type": "integer",
"default": None,
"description": (
"The ID of the device to read data from. To find "
"the device ID: `curl "
"'https://developer-apis.awair.is/v1/users/self/devices' "
"-H 'Authorization: Bearer example-token'`"
),
},
"device_type": {
"title": "Device type",
"type": "string",
"default": "awair-element",
"description": "The type of device to read data from.",
},
"prefix": {
"title": "The prefix for events from this source",
"type": "string",
"default": "hub.awair",
"description": (
"The prefix for events from this source. For example, if "
"the prefix is `awair` an event name `awair.score` will "
"be emitted for the air quality score."
),
},
},
"type": "object",
"additionalProperties": False,
},
},
"$ref": "#/definitions/AwairConfig",
}

def some_func(arg: object) -> object:
return arg

with pytest.raises(TypeError) as excinfo:
build_marshmallow_schema(some_func) # type: ignore
assert str(excinfo.value) == "Unsupported type <class 'object'> for parameter arg"

0 comments on commit 7d5d4c6

Please sign in to comment.