generated from ApeWorX/project-template
-
Notifications
You must be signed in to change notification settings - Fork 7
/
_cli.py
132 lines (105 loc) · 3.93 KB
/
_cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor
import click
from ape.cli import (
AccountAliasPromptChoice,
ConnectedProviderCommand,
ape_cli_context,
network_option,
verbosity_option,
)
from ape.exceptions import Abort
from taskiq import AsyncBroker
from taskiq.cli.worker.run import shutdown_broker
from taskiq.receiver import Receiver
from silverback._importer import import_from_string
from silverback.runner import PollingRunner
@click.group()
def cli():
"""Work with Silverback applications in local context (using Ape)."""
def _runner_callback(ctx, param, val):
if not val:
return PollingRunner
elif runner := import_from_string(val):
return runner
raise ValueError(f"Failed to import runner '{val}'.")
def _recorder_callback(ctx, param, val):
if not val:
return None
elif recorder := import_from_string(val):
return recorder()
raise ValueError(f"Failed to import recorder '{val}'.")
def _account_callback(ctx, param, val):
if val:
val = val.alias.replace("dev_", "TEST::")
os.environ["SILVERBACK_SIGNER_ALIAS"] = val
return val
def _network_callback(ctx, param, val):
# NOTE: Make sure both of these have the same setting
if env_network_choice := os.environ.get("SILVERBACK_NETWORK_CHOICE"):
if val.network_choice != env_network_choice:
raise Abort(
f"Network choice '{val.network_choice}' does not "
f"match environment variable '{env_network_choice}'."
)
# else it matches, no issue
else:
os.environ["SILVERBACK_NETWORK_CHOICE"] = val.network_choice
return val
async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90):
try:
tasks = []
with ThreadPoolExecutor(max_workers=worker_count) as pool:
for _ in range(worker_count):
receiver = Receiver(
broker=broker,
executor=pool,
validate_params=True,
max_async_tasks=1,
max_prefetch=0,
)
broker.is_worker_process = True
tasks.append(receiver.listen())
await asyncio.gather(*tasks)
finally:
await shutdown_broker(broker, shutdown_timeout)
@cli.command(cls=ConnectedProviderCommand, help="Run Silverback application client")
@ape_cli_context()
@verbosity_option()
@network_option(
default=os.environ.get("SILVERBACK_NETWORK_CHOICE", "auto"),
callback=_network_callback,
)
@click.option("--account", type=AccountAliasPromptChoice(), callback=_account_callback)
@click.option(
"--runner",
help="An import str in format '<module>:<CustomRunner>'",
callback=_runner_callback,
)
@click.option(
"--recorder",
help="An import string in format '<module>:<CustomRecorder>'",
callback=_recorder_callback,
)
@click.option("-x", "--max-exceptions", type=int, default=3)
@click.argument("path")
def run(cli_ctx, account, runner, recorder, max_exceptions, path):
app = import_from_string(path)
runner = runner(app, recorder=recorder, max_exceptions=max_exceptions)
asyncio.run(runner.run())
@cli.command(cls=ConnectedProviderCommand, help="Run Silverback application task workers")
@ape_cli_context()
@verbosity_option()
@network_option(
default=os.environ.get("SILVERBACK_NETWORK_CHOICE", "auto"),
callback=_network_callback,
)
@click.option("--account", type=AccountAliasPromptChoice(), callback=_account_callback)
@click.option("-w", "--workers", type=int, default=2)
@click.option("-x", "--max-exceptions", type=int, default=3)
@click.option("-s", "--shutdown_timeout", type=int, default=90)
@click.argument("path")
def worker(cli_ctx, account, workers, max_exceptions, shutdown_timeout, path):
app = import_from_string(path)
asyncio.run(run_worker(app.broker, worker_count=workers, shutdown_timeout=shutdown_timeout))