-
Notifications
You must be signed in to change notification settings - Fork 30
/
api.py
321 lines (254 loc) · 13.1 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
import dataclasses
import sys
import traceback
from pathlib import Path
from typing import List, Tuple, Union, Any, Optional, Dict
from lib_comfyui import global_state, ipc
ALL_TABS = ...
Tabs = Union[str, Tuple[str, ...]]
AUTO_WORKFLOW = '"auto"' # json encoded string
@dataclasses.dataclass
class WorkflowType:
"""
Describes a unique type of ComfyUI workflow
"""
base_id: str
display_name: str
tabs: Tabs = ('txt2img', 'img2img')
default_workflow: Union[str, Path] = "null"
types: Union[str, Tuple[str, ...], Dict[str, str]] = dataclasses.field(default_factory=tuple)
input_types: Union[str, Tuple[str, ...], Dict[str, str], None] = None
def __post_init__(self):
if isinstance(self.tabs, str):
self.tabs = (self.tabs,)
if self.input_types is None:
self.input_types = self.types
if not isinstance(self.types, (str, tuple, dict)):
raise TypeError(f'types should be str, tuple or dict but it is {type(self.types)}')
if not isinstance(self.input_types, (str, tuple, dict)):
raise TypeError(f'input_types should be str, tuple or dict but it is {type(self.input_types)}')
assert self.tabs, "tabs must not be empty"
if isinstance(self.default_workflow, Path):
with open(str(self.default_workflow), 'r') as f:
self.default_workflow = f.read()
elif self.default_workflow == AUTO_WORKFLOW:
if not self.is_same_io():
raise ValueError('AUTO_WORKFLOW default workflow is only supported for same input and output types')
def get_ids(self, tabs: Tabs = ALL_TABS) -> List[str]:
if isinstance(tabs, str):
tabs = (tabs,)
return [
f'{self.base_id}_{tab}'
for tab in self.tabs
if tabs == ALL_TABS or tab in tabs
]
def pretty_str(self):
return f'"{self.display_name}" ({self.base_id})'
def is_same_io(self):
def normalize_to_tuple(types):
if isinstance(types, dict):
return tuple(types.values())
elif isinstance(types, str):
return types,
return types
return normalize_to_tuple(self.input_types) == normalize_to_tuple(self.types)
def get_workflow_types(tabs: Tabs = ALL_TABS) -> List[WorkflowType]:
"""
Get the list of currently registered workflow types
To update the workflow types list, see `add_workflow_type` or `set_workflow_types`
Args:
tabs (Tabs): Whitelist of tabs
Returns:
List of workflow types that are defined on at least one of the given tabs
"""
if isinstance(tabs, str):
tabs = (tabs,)
return [
workflow_type
for workflow_type in getattr(global_state, 'workflow_types', [])
if tabs == ALL_TABS or any(tab in tabs for tab in workflow_type.tabs)
]
def add_workflow_type(new_workflow_type: WorkflowType) -> None:
"""
Register a new workflow type
You cannot call this function after the extension ui has been created
Args:
new_workflow_type (WorkflowType): new workflow type to add
Raises:
ValueError: If the id or display_name of new_workflow_type has already been registered
NotImplementedError: if the workflow types list is modified after the ui has been instantiated
"""
workflow_types = get_workflow_types()
for existing_workflow_type in workflow_types:
if existing_workflow_type.base_id == new_workflow_type.base_id:
raise ValueError(f'The id {new_workflow_type.base_id} already exists')
if existing_workflow_type.display_name == new_workflow_type.display_name:
raise ValueError(f'The display name {new_workflow_type.display_name} is already in use by workflow type {existing_workflow_type.base_id}')
if getattr(global_state, 'is_ui_instantiated', False):
raise NotImplementedError('Cannot modify workflow types after the ui has been instantiated')
workflow_types.append(new_workflow_type)
set_workflow_types(workflow_types)
def set_workflow_types(workflow_types: List[WorkflowType]) -> None:
"""
Set the list of currently registered workflow types
You cannot call this function after the extension ui has been created
Args:
workflow_types (List[WorkflowType]): the new workflow types
Raises:
NotImplementedError: if the workflow types list is modified after the ui has been instantiated
Notes:
A deep copy of workflow_types is used when calling this function from the comfyui process
No copy is made when calling this function from the webui process
"""
if getattr(global_state, 'is_ui_instantiated', False):
raise NotImplementedError('Cannot modify workflow types after the ui has been instantiated')
global_state.workflow_types = workflow_types
def clear_workflow_types() -> None:
"""
Clear the list of currently registered workflow types
You cannot call this function after the extension ui has been created
Raises:
NotImplementedError: if the workflow types list is modified after the ui has been instantiated
"""
set_workflow_types([])
def get_workflow_type_ids(tabs: Tabs = ALL_TABS) -> List[str]:
"""
Get all workflow type ids of all currently registered workflow types
Multiple ids can be assigned to each workflow type depending on how many tabs it is to be displayed on
Args:
tabs (Tabs): Whitelist of tabs for which to return the ids
Returns:
List of ids for the given tabs
"""
res = []
for workflow_type in get_workflow_types(tabs):
res.extend(workflow_type.get_ids(tabs))
return res
def get_workflow_type_display_names(tabs: Tabs = ALL_TABS) -> List[str]:
"""
Get the list of display names needed for the given tabs
Args:
tabs (Tabs): Whitelist of tabs for which to return the display names
Returns:
List of display names for the given tabs
"""
return [workflow_type.display_name for workflow_type in get_workflow_types(tabs)]
def get_default_workflow_json(workflow_type_id: str) -> str:
"""
Get the default workflow for the given workflow type id
Args:
workflow_type_id (str): The workflow type id for which to get the default workflow
Returns:
The default workflow, or None if there is no default workflow for the given workflow type
Raises:
ValueError: If workflow_type_id does not exist
"""
for workflow_type in get_workflow_types():
if workflow_type_id in workflow_type.get_ids():
return workflow_type.default_workflow
raise ValueError(workflow_type_id)
def is_workflow_type_enabled(workflow_type_id: str) -> bool:
return (
getattr(global_state, 'enable', True) and
getattr(global_state, 'enabled_workflow_type_ids', {}).get(workflow_type_id, False)
)
@ipc.restrict_to_process('webui')
def run_workflow(
workflow_type: WorkflowType,
tab: str,
batch_input: Any,
queue_front: Optional[bool] = None,
identity_on_error: Optional[bool] = False,
) -> List[Any]:
"""
Run a comfyui workflow synchronously
Args:
workflow_type (WorkflowType): Target workflow type to run
tab (str): The tab on which to run the workflow type. The workflow type must be present on the tab
batch_input (Any): Batch object to pass as input to the workflow. The number of elements in this batch object will be the size of the comfyui batch.
The particular type of batch_input depends on **workflow_type.input_types**:
- if **input_types** is a dict, **batch_input** should be a dict. For each **k, v** pair of **batch_input**, **v** should match the type expected by **input_types[k]**.
- if **input_types** is a tuple, **batch_input** should be a tuple. Each element **v** at index **i** of **batch_input** should match the type expected by **input_types[i]**.
- if **input_types** is a str, **batch_input** should be a single value that should match the type expected by **input_types**.
queue_front (Optional[bool]): Whether to queue the workflow before or after the currently queued workflows
identity_on_error (Optional[bool]): Whether to return batch_input (converted to the type expected by workflow_type.types) instead of raising a RuntimeError when the workflow fails to run
Returns:
The outputs of the workflow, as a list.
Each element of the list corresponds to one output node in the workflow.
You can expect multiple values when a user has multiple output nodes in their workflow.
The particular type of each returned element depends on workflow_type.types:
- **types** is a dict: each element is a dict. For each **k, v** pair, **v** matches the type expected by **types[k]**
- **types** is a tuple: each element is a tuple. For each element **v** at index **i**, **v** matches the type expected by **types[i]**
- **types** is a str: each element is a single value that matches the type expected by **types**
Each element of the list will have the same batch size as **batch_input**
Raises:
ValueError: If workflow_type is not present on the given tab
TypeError: If the type of batch_input does not match the type expected by workflow_type.input_types
RuntimeError: If identity_on_error is False and workflow execution fails for any reason
AssertionError: If multiple candidate ids exist for workflow_type
"""
from lib_comfyui.comfyui.iframe_requests import ComfyuiIFrameRequests
candidate_ids = workflow_type.get_ids(tab)
assert len(candidate_ids) <= 1, (
f'Found multiple candidate workflow type ids for tab {tab} and workflow type {workflow_type.pretty_str()}: {candidate_ids}\n'
'The workflow type is likely to be incorrectly configured'
)
if queue_front is None:
queue_front = getattr(global_state, 'queue_front', True)
batch_input_args = _normalize_batch_input_to_tuple(batch_input, workflow_type)
if not candidate_ids:
raise ValueError(f'The workflow type {workflow_type.pretty_str()} does not exist on tab {tab}. Valid tabs for the given workflow type are: {workflow_type.tabs}')
workflow_type_id = candidate_ids[0]
try:
if not is_workflow_type_enabled(workflow_type_id):
raise WorkflowTypeDisabled(f'Workflow type {workflow_type.pretty_str()} is not enabled on tab {tab}')
batch_output_params = ComfyuiIFrameRequests.start_workflow_sync(
batch_input_args=batch_input_args,
workflow_type_id=workflow_type_id,
queue_front=queue_front,
)
except RuntimeError as e:
if not identity_on_error:
raise e
# don't print just because the workflow type is disabled
if not isinstance(e, WorkflowTypeDisabled):
print('\n'.join(traceback.format_exception_only(e)))
if not workflow_type.is_same_io():
print('[sd-webui-comfyui]', f'Returning input of type {workflow_type.input_types}, which likely does not match the expected output type {workflow_type.types}', file=sys.stderr)
# denormalize tuple -> tuple|str|dict
if isinstance(workflow_type.types, tuple):
return [batch_input_args]
elif isinstance(workflow_type.types, str):
return [batch_input_args[0]]
else:
return [dict(zip(workflow_type.types.keys(), batch_input_args))]
# denormalize dict -> tuple|str|dict
if isinstance(workflow_type.types, tuple):
return [tuple(params.values()) for params in batch_output_params]
elif isinstance(workflow_type.types, str):
return [next(iter(params.values())) for params in batch_output_params]
else:
return batch_output_params
class WorkflowTypeDisabled(RuntimeError):
pass
def _normalize_batch_input_to_tuple(batch_input, workflow_type):
if isinstance(workflow_type.input_types, dict):
if not isinstance(batch_input, dict):
raise TypeError(f'batch_input should be dict but is instead {type(batch_input)}')
expected_keys = set(workflow_type.input_types.keys())
actual_keys = set(batch_input.keys())
if expected_keys - actual_keys:
raise TypeError(f'batch_input is missing keys: {expected_keys - actual_keys}')
# convert to tuple in the same order as the items in input_types
return tuple(batch_input[k] for k in workflow_type.input_types.keys())
elif isinstance(workflow_type.input_types, str):
return batch_input,
elif isinstance(workflow_type.input_types, tuple):
if not isinstance(batch_input, tuple):
raise TypeError(f'batch_input should be tuple but is instead {type(batch_input)}')
if len(batch_input) != len(workflow_type.input_types):
raise TypeError(
f'batch_input received {len(batch_input)} values instead of {len(workflow_type.input_types)} (signature is {workflow_type.input_types})')
return batch_input
else:
raise TypeError(f'batch_input should be str, tuple or dict but is instead {type(batch_input)}')