# %% [markdown]
"""
# 5. Asynchronous groups and services (full)
This tutorial shows how to use asynchronous services
and service groups in `pipeline`.

It is a more advanced version of the
[previous tutorial](
%doclink(tutorial,pipeline.5_asynchronous_groups_and_services_basic)
).
"""
# %pip install dff
# %%
import asyncio
import json
import logging
import urllib.request

from dff.script import Context
from dff.pipeline import ServiceGroup, Pipeline, ServiceRuntimeInfo, ACTOR

from dff.utils.testing.common import (
    check_happy_path,
    is_interactive_mode,
    run_interactive_mode,
)
from dff.utils.testing.toy_script import HAPPY_PATH, TOY_SCRIPT

logger = logging.getLogger(__name__)

# %% [markdown]
"""
Services and service groups can be synchronous or asynchronous.
In synchronous service groups, services are executed sequentially;
some of them (`ACTOR`) can even return a `Context` object,
modifying it.
In asynchronous service groups, all services
are executed simultaneously and should neither return anything
nor modify the `Context`.

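
The difference between the two execution modes can be sketched with plain
`asyncio`, independently of `dff`. The helpers `make_step`,
`run_sequentially`, `run_concurrently` and `demo` are hypothetical names
invented for this illustration; they are not part of the `dff` API and
are not meant to show how `Pipeline` is implemented.

```python
import asyncio


def make_step(name, delay, log):
    # Hypothetical stand-in for a service: waits, then records its name.
    async def step():
        await asyncio.sleep(delay)
        log.append(name)

    return step


async def run_sequentially(steps):
    # Synchronous-group analogue: each step finishes before the next starts.
    for step in steps:
        await step()


async def run_concurrently(steps):
    # Asynchronous-group analogue: all steps are started together.
    await asyncio.gather(*(step() for step in steps))


def demo(runner):
    # Runs a slow step declared first and a fast step declared second.
    log = []
    steps = [make_step("slow", 0.2, log), make_step("fast", 0.0, log)]
    asyncio.run(runner(steps))
    return log
```

With the sequential runner the log keeps the declaration order;
with the concurrent runner the order depends on which step finishes first.
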
To become asynchronous, a service or service group
should _be able_ to be asynchronous
and should not be marked synchronous.
A service can be asynchronous if its handler is an async function.
A service group can be asynchronous if all services
and service groups inside it are asynchronous.
If a service or service group can be asynchronous,
the `asynchronous` constructor parameter is checked.
If the parameter is not set,
the component becomes asynchronous; if it is set, its value is used instead.
If a service cannot be asynchronous
but is marked asynchronous, an exception is thrown.

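
The decision rule above can be written down as a small function.
This is only a sketch of the rule as described in the text:
`resolve_asynchronous` is a hypothetical helper, and the use of
`ValueError` is an assumption, since the text does not say which
exception type is thrown.

```python
from typing import Optional


def resolve_asynchronous(can_be_async: bool, requested: Optional[bool]) -> bool:
    # `requested` models the `asynchronous` constructor parameter;
    # None means that the parameter was not set.
    if can_be_async:
        # Not set: asynchronous by default. Set: the given value wins.
        return True if requested is None else requested
    if requested:
        # Assumption: the real exception type may differ.
        raise ValueError("component is marked asynchronous but cannot be")
    return False
```

For example, `resolve_asynchronous(True, False)` models `balanced_group`
below: the group could be asynchronous, but is requested to be synchronous.
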
The `ACTOR` service is asynchronous.

The `timeout` field only works for asynchronous services and service groups.
If a service takes longer than its timeout to execute,
it is aborted and marked as failed.

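
The timeout behaviour resembles what `asyncio.wait_for` provides.
The sketch below uses plain `asyncio`; `run_with_timeout`, `quick`
and `slow` are hypothetical names, the returned labels are illustrative,
and whether `dff` uses `asyncio.wait_for` internally is not claimed here.

```python
import asyncio


async def run_with_timeout(handler, timeout):
    # Returns "finished" or "failed"; these labels are illustrative only.
    try:
        await asyncio.wait_for(handler(), timeout=timeout)
    except asyncio.TimeoutError:
        # The handler was cancelled because it exceeded the timeout.
        return "failed"
    return "finished"


async def quick():
    # Completes almost immediately.
    await asyncio.sleep(0)


async def slow():
    # Takes far longer than the timeouts used in the example.
    await asyncio.sleep(1.0)
```

Here `asyncio.run(run_with_timeout(slow, 0.05))` cancels the handler
after 0.05 seconds instead of waiting for the full second.
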
The `optimization_warnings` argument of `Pipeline` can be used to
display optimization warnings during pipeline construction.
Generally, for optimization purposes, asynchronous
services should be combined into asynchronous
groups so that they run simultaneously.
Synchronous services should be moved out of (mostly) asynchronous groups.

Here the service group `balanced_group` can be asynchronous;
however, it is requested to be synchronous,
so its services are executed sequentially.
Service group `service_group_0` is asynchronous:
it does not exceed its timeout of 0.02 seconds
even though it contains 6 time-consuming services,
each of which sleeps for 0.01 seconds.

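
The arithmetic behind this: six sleeps of 0.01 seconds take about
0.06 seconds when run one after another, which is over the 0.02 second
timeout, but only about 0.01 seconds when run together.
The plain `asyncio` sketch below measures both cases;
all function names in it are hypothetical and exact timings
depend on the machine.

```python
import asyncio
import time


async def sleeper(delay):
    # Stand-in for a time-consuming service.
    await asyncio.sleep(delay)


async def timed(awaitable):
    # Awaits the given awaitable and returns the elapsed wall-clock time.
    start = time.perf_counter()
    await awaitable
    return time.perf_counter() - start


async def concurrent_elapsed(count, delay):
    # All sleepers run together, so the total is roughly one `delay`.
    return await timed(asyncio.gather(*(sleeper(delay) for _ in range(count))))


async def sequential_elapsed(count, delay):
    # Sleepers run one by one, so the total is roughly `count * delay`.
    async def one_by_one():
        for _ in range(count):
            await sleeper(delay)

    return await timed(one_by_one())
```

Calling `asyncio.run(concurrent_elapsed(6, 0.01))` and
`asyncio.run(sequential_elapsed(6, 0.01))` allows the two totals to be compared.
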
Service group `service_group_1` is also asynchronous:
it logs HTTPS requests (from 1 to 15)
that run simultaneously, in random order.
Service group `pipeline` can't be asynchronous because
`balanced_group` and `ACTOR` are synchronous.
"""
# %%
async def simple_asynchronous_service(_, __, info: ServiceRuntimeInfo):
    logger.info(f"Service '{info.name}' is running")


async def time_consuming_service(_):
    await asyncio.sleep(0.01)


def meta_web_querying_service(
    photo_number: int,
):  # This function returns services, a service factory
    async def web_querying_service(ctx: Context, _, info: ServiceRuntimeInfo):
        if ctx.misc.get("web_query", None) is None:
            ctx.misc["web_query"] = {}
        with urllib.request.urlopen(
            f"https://jsonplaceholder.typicode.com/photos/{photo_number}"
        ) as webpage:
            web_content = webpage.read().decode(
                webpage.headers.get_content_charset()
            )
            ctx.misc["web_query"].update(
                {
                    f"{ctx.last_request}"
                    f":photo_number_{photo_number}": json.loads(web_content)[
                        "title"
                    ]
                }
            )
        logger.info(f"Service '{info.name}' has completed HTTPS request")

    return web_querying_service


def context_printing_service(ctx: Context):
    logger.info(f"Context misc: {json.dumps(ctx.misc, indent=4, default=str)}")


# %%
pipeline_dict = {
    "script": TOY_SCRIPT,
    "start_label": ("greeting_flow", "start_node"),
    "fallback_label": ("greeting_flow", "fallback_node"),
    "optimization_warnings": True,
    # There are no warnings - pipeline is well-optimized
    "components": [
        ServiceGroup(
            name="balanced_group",
            asynchronous=False,
            components=[
                simple_asynchronous_service,
                ServiceGroup(
                    timeout=0.02,
                    components=[time_consuming_service for _ in range(0, 6)],
                ),
                simple_asynchronous_service,
            ],
        ),
        ACTOR,
        [meta_web_querying_service(photo) for photo in range(1, 16)],
        context_printing_service,
    ],
}

# %%
pipeline = Pipeline.from_dict(pipeline_dict)

if __name__ == "__main__":
    check_happy_path(pipeline, HAPPY_PATH)
    if is_interactive_mode():
        run_interactive_mode(pipeline)