-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
_prompts.py
825 lines (712 loc) · 27.6 KB
/
_prompts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
"""
Utilities for prompting the user for input
"""
import os
import shutil
from datetime import timedelta
from getpass import GetPassWarning
from typing import Any, Dict, List, Optional
import readchar
from rich.console import Console, Group
from rich.live import Live
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.prompt import Confirm, InvalidResponse, Prompt, PromptBase
from rich.table import Table
from rich.text import Text
from prefect.cli._utilities import exit_with_error
from prefect.client.collections import get_collections_metadata_client
from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.actions import BlockDocumentCreate, WorkPoolCreate
from prefect.client.schemas.objects import MinimalDeploymentSchedule
from prefect.client.schemas.schedules import (
CronSchedule,
IntervalSchedule,
RRuleSchedule,
)
from prefect.client.utilities import inject_client
from prefect.deployments.base import (
_get_git_remote_origin_url,
_search_for_flow_functions,
)
from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound
from prefect.flows import load_flow_from_entrypoint
from prefect.infrastructure.container import DockerRegistry
from prefect.settings import PREFECT_UI_URL
from prefect.utilities.processutils import get_sys_executable, run_process
from prefect.utilities.slugify import slugify
STORAGE_PROVIDER_TO_CREDS_BLOCK = {
"s3": "aws-credentials",
"gcs": "gcp-credentials",
"azure_blob_storage": "azure-blob-storage-credentials",
}
REQUIRED_FIELDS_FOR_CREDS_BLOCK = {
"aws-credentials": ["aws_access_key_id", "aws_secret_access_key"],
"gcp-credentials": ["project", "service_account_file"],
"azure-blob-storage-credentials": ["account_url", "connection_string"],
}
def prompt(message, **kwargs):
"""Utility to prompt the user for input with consistent styling"""
return Prompt.ask(f"[bold][green]?[/] {message}[/]", **kwargs)
def confirm(message, **kwargs):
"""Utility to prompt the user for confirmation with consistent styling"""
return Confirm.ask(f"[bold][green]?[/] {message}[/]", **kwargs)
def prompt_select_from_table(
console,
prompt: str,
columns: List[Dict],
data: List[Dict],
table_kwargs: Optional[Dict] = None,
opt_out_message: Optional[str] = None,
opt_out_response: Any = None,
) -> Dict:
"""
Given a list of columns and some data, display options to user in a table
and prompt them to select one.
Args:
prompt: A prompt to display to the user before the table.
columns: A list of dicts with keys `header` and `key` to display in
the table. The `header` value will be displayed in the table header
and the `key` value will be used to lookup the value for each row
in the provided data.
data: A list of dicts with keys corresponding to the `key` values in
the `columns` argument.
table_kwargs: Additional kwargs to pass to the `rich.Table` constructor.
Returns:
dict: Data representation of the selected row
"""
current_idx = 0
selected_row = None
table_kwargs = table_kwargs or {}
def build_table() -> Table:
"""
Generate a table of options. The `current_idx` will be highlighted.
"""
table = Table(**table_kwargs)
table.add_column()
for column in columns:
table.add_column(column.get("header", ""))
rows = []
max_length = 250
for item in data:
rows.append(
tuple(
(
value[:max_length] + "...\n"
if isinstance(value := item.get(column.get("key")), str)
and len(value) > max_length
else value
)
for column in columns
)
)
for i, row in enumerate(rows):
if i == current_idx:
# Use blue for selected options
table.add_row("[bold][blue]>", f"[bold][blue]{row[0]}[/]", *row[1:])
else:
table.add_row(" ", *row)
if opt_out_message:
prefix = " > " if current_idx == len(data) else " " * 4
bottom_text = Text(prefix + opt_out_message)
if current_idx == len(data):
bottom_text.stylize("bold blue")
return Group(table, bottom_text)
return table
with Live(build_table(), auto_refresh=False, console=console) as live:
instructions_message = (
f"[bold][green]?[/] {prompt} [bright_blue][Use arrows to move; enter to"
" select"
)
if opt_out_message:
instructions_message += "; n to select none"
instructions_message += "]"
live.console.print(instructions_message)
while selected_row is None:
key = readchar.readkey()
if key == readchar.key.UP:
current_idx = current_idx - 1
# wrap to bottom if at the top
if opt_out_message and current_idx < 0:
current_idx = len(data)
elif not opt_out_message and current_idx < 0:
current_idx = len(data) - 1
elif key == readchar.key.DOWN:
current_idx = current_idx + 1
# wrap to top if at the bottom
if opt_out_message and current_idx >= len(data) + 1:
current_idx = 0
elif not opt_out_message and current_idx >= len(data):
current_idx = 0
elif key == readchar.key.CTRL_C:
# gracefully exit with no message
exit_with_error("")
elif key == readchar.key.ENTER or key == readchar.key.CR:
if current_idx >= len(data):
return opt_out_response
else:
selected_row = data[current_idx]
elif key == "n" and opt_out_message:
return opt_out_response
live.update(build_table(), refresh=True)
return selected_row
# Interval schedule prompting utilities
class IntervalValuePrompt(PromptBase[timedelta]):
response_type = timedelta
validate_error_message = (
"[prompt.invalid]Please enter a valid interval denoted in seconds"
)
def process_response(self, value: str) -> timedelta:
try:
int_value = int(value)
if int_value <= 0:
raise InvalidResponse("[prompt.invalid]Interval must be greater than 0")
return timedelta(seconds=int_value)
except ValueError:
raise InvalidResponse(self.validate_error_message)
def prompt_interval_schedule(console):
"""
Prompt the user for an interval in seconds.
"""
default_seconds = 3600
# The interval value must be a timedelta object in order to pass validation as a `PositiveDuration` type in `IntervalSchedule`.
default_duration = timedelta(seconds=default_seconds)
# We show the default in the prompt message rather than enabling `show_default=True` here because `rich` displays timedeltas in hours
# rather than seconds, which would confuse users since we ask them to enter the interval in seconds.
interval = IntervalValuePrompt.ask(
f"[bold][green]?[/] Seconds between scheduled runs ({default_seconds})",
console=console,
default=default_duration,
show_default=False,
)
return IntervalSchedule(interval=interval)
# Cron schedule prompting utilities
class CronStringPrompt(PromptBase[str]):
response_type = str
validate_error_message = "[prompt.invalid]Please enter a valid cron string"
def process_response(self, value: str) -> str:
try:
CronSchedule.valid_cron_string(value)
return value
except ValueError:
raise InvalidResponse(self.validate_error_message)
class CronTimezonePrompt(PromptBase[str]):
response_type = str
validate_error_message = "[prompt.invalid]Please enter a valid timezone."
def process_response(self, value: str) -> str:
try:
CronSchedule.valid_timezone(value)
return value
except ValueError:
raise InvalidResponse(self.validate_error_message)
def prompt_cron_schedule(console):
"""
Prompt the user for a cron string and timezone.
"""
cron = CronStringPrompt.ask(
"[bold][green]?[/] Cron string",
console=console,
default="0 0 * * *",
)
timezone = CronTimezonePrompt.ask(
"[bold][green]?[/] Timezone", console=console, default="UTC"
)
return CronSchedule(cron=cron, timezone=timezone)
# RRule schedule prompting utilities
class RRuleStringPrompt(PromptBase[str]):
response_type = str
validate_error_message = "[prompt.invalid]Please enter a valid RRule string"
def process_response(self, value: str) -> str:
try:
RRuleSchedule.validate_rrule_str(value)
return value
except ValueError:
raise InvalidResponse(self.validate_error_message)
class RRuleTimezonePrompt(PromptBase[str]):
response_type = str
validate_error_message = "[prompt.invalid]Please enter a valid timezone."
def process_response(self, value: str) -> str:
try:
RRuleSchedule.valid_timezone(value)
return value
except ValueError:
raise InvalidResponse(self.validate_error_message)
def prompt_rrule_schedule(console):
"""
Prompts the user to enter an RRule string and timezone.
"""
rrule = RRuleStringPrompt.ask(
"[bold][green]?[/] RRule string",
console=console,
default="RRULE:FREQ=DAILY;INTERVAL=1",
)
timezone = CronTimezonePrompt.ask(
"[bold][green]?[/] Timezone", console=console, default="UTC"
)
return RRuleSchedule(rrule=rrule, timezone=timezone)
# Schedule type prompting utilities
def prompt_schedule_type(console):
"""
Prompts the user to select a schedule type from a list of options.
"""
selection = prompt_select_from_table(
console,
"What type of schedule would you like to use?",
[
{"header": "Schedule Type", "key": "type"},
{"header": "Description", "key": "description"},
],
[
{
"type": "Interval",
"description": (
"Allows you to set flow runs to be executed at fixed time"
" intervals."
),
},
{
"type": "Cron",
"description": (
"Allows you to define recurring flow runs based on a specified"
" pattern using cron syntax."
),
},
{
"type": "RRule",
"description": (
"Allows you to define recurring flow runs using RFC 2445 recurrence"
" rules."
),
},
],
)
return selection["type"]
def prompt_schedules(console) -> List[MinimalDeploymentSchedule]:
"""
Prompt the user to configure schedules for a deployment.
"""
schedules = []
if confirm(
"Would you like to configure schedules for this deployment?", default=True
):
add_schedule = True
while add_schedule:
schedule_type = prompt_schedule_type(console)
if schedule_type == "Cron":
schedule = prompt_cron_schedule(console)
elif schedule_type == "Interval":
schedule = prompt_interval_schedule(console)
elif schedule_type == "RRule":
schedule = prompt_rrule_schedule(console)
else:
raise Exception("Invalid schedule type")
is_schedule_active = confirm(
"Would you like to activate this schedule?", default=True
)
schedules.append(
MinimalDeploymentSchedule(schedule=schedule, active=is_schedule_active)
)
add_schedule = confirm(
"Would you like to add another schedule?", default=False
)
return schedules
@inject_client
async def prompt_select_work_pool(
console: Console,
prompt: str = "Which work pool would you like to deploy this flow to?",
client: PrefectClient = None,
) -> str:
work_pools = await client.read_work_pools()
work_pool_options = [
work_pool.dict()
for work_pool in work_pools
if work_pool.type != "prefect-agent"
]
if not work_pool_options:
work_pool = await prompt_create_work_pool(console, client=client)
return work_pool.name
else:
selected_work_pool_row = prompt_select_from_table(
console,
prompt,
[
{"header": "Work Pool Name", "key": "name"},
{"header": "Infrastructure Type", "key": "type"},
{"header": "Description", "key": "description"},
],
work_pool_options,
)
return selected_work_pool_row["name"]
async def prompt_build_custom_docker_image(
console: Console,
deployment_config: dict,
):
if not confirm(
"Would you like to build a custom Docker image for this deployment?",
console=console,
default=False,
):
return
build_step = {
"requires": "prefect-docker>=0.3.1",
"id": "build-image",
}
if os.path.exists("Dockerfile"):
if confirm(
"Would you like to use the Dockerfile in the current directory?",
console=console,
default=True,
):
build_step["dockerfile"] = "Dockerfile"
else:
if confirm(
"A Dockerfile exists. You chose not to use it. A temporary Dockerfile"
" will be automatically built during the deployment build step. If"
" another file named 'Dockerfile' already exists at that time, the"
" build step will fail. Would you like to rename your existing"
" Dockerfile?"
):
new_dockerfile_name = prompt(
"New Dockerfile name", default="Dockerfile.backup"
)
shutil.move("Dockerfile", new_dockerfile_name)
build_step["dockerfile"] = "auto"
else:
# this will otherwise raise when build steps are run as the auto-build feature
# executed in the build_docker_image step will create a temporary Dockerfile
raise ValueError(
"A Dockerfile already exists. Please remove or rename the existing"
" one."
)
else:
build_step["dockerfile"] = "auto"
repo_name = prompt("Repository name (e.g. your Docker Hub username)").rstrip("/")
image_name = prompt("Image name", default=deployment_config["name"])
build_step["image_name"] = f"{repo_name}/{image_name}"
build_step["tag"] = prompt("Image tag", default="latest")
console.print(
"Image"
f" [bold][yellow]{build_step['image_name']}:{build_step['tag']}[/yellow][/bold]"
" will be built."
)
return {"prefect_docker.deployments.steps.build_docker_image": build_step}
async def prompt_push_custom_docker_image(
console: Console,
deployment_config: dict,
build_docker_image_step: dict,
):
if not confirm(
"Would you like to push this image to a remote registry?",
console=console,
default=False,
):
return None, build_docker_image_step
push_step = {
"requires": "prefect-docker>=0.3.1",
"image_name": "{{ build-image.image_name }}",
"tag": "{{ build-image.tag }}",
}
registry_url = prompt("Registry URL", default="docker.io").rstrip("/")
repo_and_image_name = build_docker_image_step[
"prefect_docker.deployments.steps.build_docker_image"
]["image_name"]
full_image_name = f"{registry_url}/{repo_and_image_name}"
build_docker_image_step["prefect_docker.deployments.steps.build_docker_image"][
"image_name"
] = full_image_name
if confirm("Is this a private registry?", console=console):
docker_credentials = {}
docker_credentials["registry_url"] = registry_url
if confirm(
"Would you like use prefect-docker to manage Docker registry credentials?",
console=console,
default=False,
):
try:
import prefect_docker
except ImportError:
console.print("Installing prefect-docker...")
await run_process(
[get_sys_executable(), "-m", "pip", "install", "prefect-docker"],
stream_output=True,
)
import prefect_docker
credentials_block = prefect_docker.DockerRegistryCredentials
push_step[
"credentials"
] = "{{ prefect_docker.docker-registry-credentials.docker_registry_creds_name }}"
else:
credentials_block = DockerRegistry
push_step[
"credentials"
] = "{{ prefect.docker-registry.docker_registry_creds_name }}"
docker_registry_creds_name = f"deployment-{slugify(deployment_config['name'])}-{slugify(deployment_config['work_pool']['name'])}-registry-creds"
create_new_block = False
try:
await credentials_block.load(docker_registry_creds_name)
if not confirm(
(
"Would you like to use the existing Docker registry credentials"
f" block {docker_registry_creds_name}?"
),
console=console,
default=True,
):
create_new_block = True
except ValueError:
create_new_block = True
if create_new_block:
docker_credentials["username"] = prompt(
"Docker registry username", console=console
)
try:
docker_credentials["password"] = prompt(
"Docker registry password",
console=console,
password=True,
)
except GetPassWarning:
docker_credentials["password"] = prompt(
"Docker registry password",
console=console,
)
new_creds_block = credentials_block(
username=docker_credentials["username"],
password=docker_credentials["password"],
registry_url=docker_credentials["registry_url"],
)
await new_creds_block.save(name=docker_registry_creds_name, overwrite=True)
return {
"prefect_docker.deployments.steps.push_docker_image": push_step
}, build_docker_image_step
@inject_client
async def prompt_create_work_pool(
console: Console,
client: PrefectClient = None,
):
if not confirm(
(
"Looks like you don't have any work pools this flow can be deployed to."
" Would you like to create one?"
),
default=True,
console=console,
):
raise ValueError(
"A work pool is required to deploy this flow. Please specify a work pool"
" name via the '--pool' flag or in your prefect.yaml file."
)
async with get_collections_metadata_client() as collections_client:
worker_metadata = await collections_client.read_worker_metadata()
selected_worker_row = prompt_select_from_table(
console,
prompt="What infrastructure type would you like to use for your new work pool?",
columns=[
{"header": "Type", "key": "type"},
{"header": "Description", "key": "description"},
],
data=[
worker
for collection in worker_metadata.values()
for worker in collection.values()
if worker["type"] != "prefect-agent"
],
table_kwargs={"show_lines": True},
)
work_pool_name = prompt("Work pool name")
work_pool = await client.create_work_pool(
WorkPoolCreate(name=work_pool_name, type=selected_worker_row["type"])
)
console.print(f"Your work pool {work_pool.name!r} has been created!", style="green")
return work_pool
class EntrypointPrompt(PromptBase[str]):
response_type = str
validate_error_message = "[prompt.invalid]Please enter a valid flow entrypoint."
def process_response(self, value: str) -> str:
try:
value.rsplit(":", 1)
except ValueError:
raise InvalidResponse(self.validate_error_message)
try:
load_flow_from_entrypoint(value)
except Exception:
raise InvalidResponse(
f"[prompt.invalid]Failed to load flow from entrypoint {value!r}."
f" {self.validate_error_message}"
)
return value
async def prompt_entrypoint(console: Console) -> str:
"""
Prompt the user for a flow entrypoint. Will search for flow functions in the
current working directory and nested subdirectories to prompt the user to select
from a list of discovered flows. If no flows are found, the user will be prompted
to enter a flow entrypoint manually.
"""
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
transient=True,
) as progress:
task_id = progress.add_task(
description="Scanning for flows...",
total=1,
)
discovered_flows = await _search_for_flow_functions()
progress.update(task_id, completed=1)
if not discovered_flows:
return EntrypointPrompt.ask(
(
"[bold][green]?[/] Flow entrypoint (expected format"
" path/to/file.py:function_name)"
),
console=console,
)
selected_flow = prompt_select_from_table(
console,
prompt="Select a flow to deploy",
columns=[
{"header": "Flow Name", "key": "flow_name"},
{"header": "Location", "key": "filepath"},
],
data=discovered_flows,
opt_out_message="Enter a flow entrypoint manually",
)
if selected_flow is None:
return EntrypointPrompt.ask(
(
"[bold][green]?[/] Flow entrypoint (expected format"
" path/to/file.py:function_name)"
),
console=console,
)
return f"{selected_flow['filepath']}:{selected_flow['function_name']}"
@inject_client
async def prompt_select_remote_flow_storage(
console: Console, client: PrefectClient = None
) -> Optional[str]:
valid_slugs_for_context = set()
for (
storage_provider,
creds_block_type_slug,
) in STORAGE_PROVIDER_TO_CREDS_BLOCK.items():
try:
# only return storage options for which the user has a credentials
# block type
await client.read_block_type_by_slug(creds_block_type_slug)
valid_slugs_for_context.add(storage_provider)
except ObjectNotFound:
pass
if _get_git_remote_origin_url():
valid_slugs_for_context.add("git")
flow_storage_options = [
{
"type": "Git Repo",
"slug": "git",
"description": "Use a Git repository [bold](recommended).",
},
{
"type": "S3",
"slug": "s3",
"description": "Use an AWS S3 bucket.",
},
{
"type": "GCS",
"slug": "gcs",
"description": "Use a Google Cloud Storage bucket.",
},
{
"type": "Azure Blob Storage",
"slug": "azure_blob_storage",
"description": "Use an Azure Blob Storage bucket.",
},
]
valid_storage_options_for_context = [
row
for row in flow_storage_options
if row is not None and row["slug"] in valid_slugs_for_context
]
selected_flow_storage_row = prompt_select_from_table(
console,
prompt="Please select a remote code storage option.",
columns=[
{"header": "Storage Type", "key": "type"},
{"header": "Description", "key": "description"},
],
data=valid_storage_options_for_context,
)
return selected_flow_storage_row["slug"]
@inject_client
async def prompt_select_blob_storage_credentials(
console: Console, storage_provider: str, client: PrefectClient = None
) -> str:
"""
Prompt the user for blob storage credentials.
Returns a jinja template string that references a credentials block.
"""
storage_provider_slug = storage_provider.replace("_", "-")
pretty_storage_provider = storage_provider.replace("_", " ").upper()
creds_block_type_slug = STORAGE_PROVIDER_TO_CREDS_BLOCK[storage_provider]
pretty_creds_block_type = creds_block_type_slug.replace("-", " ").title()
existing_credentials_blocks = await client.read_block_documents_by_type(
block_type_slug=creds_block_type_slug
)
if existing_credentials_blocks:
selected_credentials_block = prompt_select_from_table(
console,
prompt=(
f"Select from your existing {pretty_creds_block_type} credential blocks"
),
columns=[
{
"header": f"{pretty_storage_provider} Credentials Blocks",
"key": "name",
}
],
data=[{"name": block.name} for block in existing_credentials_blocks],
opt_out_message="Create a new credentials block",
)
if selected_credentials_block and (
selected_block := selected_credentials_block.get("name")
):
return f"{{{{ prefect.blocks.{creds_block_type_slug}.{selected_block} }}}}"
credentials_block_type = await client.read_block_type_by_slug(creds_block_type_slug)
credentials_block_schema = await client.get_most_recent_block_schema_for_block_type(
block_type_id=credentials_block_type.id
)
console.print(
f"\nProvide details on your new {pretty_storage_provider} credentials:"
)
hydrated_fields = {
field_name: prompt(f"{field_name} [yellow]({props.get('type')})[/]")
for field_name, props in credentials_block_schema.fields.get(
"properties"
).items()
if field_name in REQUIRED_FIELDS_FOR_CREDS_BLOCK[creds_block_type_slug]
}
console.print(f"[blue]\n{pretty_storage_provider} credentials specified![/]\n")
while True:
credentials_block_name = prompt(
"Give a name to your new credentials block",
default=f"{storage_provider_slug}-storage-credentials",
)
try:
new_block_document = await client.create_block_document(
block_document=BlockDocumentCreate(
name=credentials_block_name,
data=hydrated_fields,
block_schema_id=credentials_block_schema.id,
block_type_id=credentials_block_type.id,
)
)
break
except ObjectAlreadyExists:
console.print(
f"A {pretty_creds_block_type!r} block named"
f" {credentials_block_name!r} already exists. Please choose another"
" name"
)
if PREFECT_UI_URL:
console.print(
"\nView/Edit your new credentials block in the UI:"
f"\n[blue]{PREFECT_UI_URL.value()}/blocks/block/{new_block_document.id}[/]\n"
)
return f"{{{{ prefect.blocks.{creds_block_type_slug}.{new_block_document.name} }}}}"