Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable black preview style for string formatting support #8578

Merged
merged 6 commits on Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Expand Up @@ -18,3 +18,4 @@ repos:
hooks:
- id: black
language_version: python3
args: ['--preview']
8 changes: 6 additions & 2 deletions scripts/generate-release-notes.py
Expand Up @@ -111,7 +111,8 @@ def get_github_token() -> str:

if not shutil.which("gh"):
print(
"You must provide a GitHub access token via GITHUB_TOKEN or have the gh CLI installed."
"You must provide a GitHub access token via GITHUB_TOKEN or have the gh CLI"
" installed."
)
exit(1)

Expand All @@ -129,7 +130,10 @@ def get_github_token() -> str:
match = TOKEN_REGEX.search(output)
if not match:
print(
f"Failed to find token in GitHub CLI output with regex {TOKEN_REGEX.pattern!r}:",
(
"Failed to find token in GitHub CLI output with regex"
f" {TOKEN_REGEX.pattern!r}:"
),
file=sys.stderr,
)
print(output, file=sys.stderr)
Expand Down
3 changes: 2 additions & 1 deletion scripts/wait-for-server.py
Expand Up @@ -37,7 +37,8 @@ async def main(timeout):
break
if healthcheck_exc is not None:
raise RuntimeError(
"Timed out while attempting to connect to compatibility test server."
"Timed out while attempting to connect to compatibility test"
" server."
)


Expand Down
10 changes: 5 additions & 5 deletions src/prefect/__init__.py
Expand Up @@ -113,11 +113,11 @@ class Prefect1ImportInterceptor(importlib.abc.Loader):
def find_spec(self, fullname, path, target=None):
if fullname in PREFECT_1_ATTRIBUTES:
warnings.warn(
f"Attempted import of {fullname!r}, which is part of Prefect 1.x, "
f"while Prefect {__version__} is installed. If you're "
"upgrading you'll need to update your code, see the Prefect "
"2.x migration guide: `https://orion-docs.prefect.io/migration_guide/`. "
"Otherwise ensure that your code is pinned to the expected version."
f"Attempted import of {fullname!r}, which is part of Prefect 1.x, while"
f" Prefect {__version__} is installed. If you're upgrading you'll need"
" to update your code, see the Prefect 2.x migration guide:"
" `https://orion-docs.prefect.io/migration_guide/`. Otherwise ensure"
" that your code is pinned to the expected version."
)


Expand Down
5 changes: 4 additions & 1 deletion src/prefect/_internal/compatibility/deprecated.py
Expand Up @@ -28,7 +28,10 @@
M = TypeVar("M", bound=pydantic.BaseModel)


DEPRECATED_WARNING = "{name} has been deprecated{when}. It will not be available after {end_date}. {help}"
DEPRECATED_WARNING = (
"{name} has been deprecated{when}. It will not be available after {end_date}."
" {help}"
)
DEPRECATED_MOVED_WARNING = (
"{name} has moved to {new_location}. It will not be available at the old import "
"path after {end_date}. {help}"
Expand Down
7 changes: 4 additions & 3 deletions src/prefect/_internal/compatibility/experimental.py
Expand Up @@ -24,9 +24,10 @@


EXPERIMENTAL_WARNING = (
"{feature} is experimental. {help}"
"The interface or behavior may change without warning, we recommend pinning versions to prevent unexpected changes. "
"To disable warnings for this group of experiments, disable PREFECT_EXPERIMENTAL_WARN_{group}."
"{feature} is experimental. {help}The interface or behavior may change without"
" warning, we recommend pinning versions to prevent unexpected changes. To disable"
" warnings for this group of experiments, disable"
" PREFECT_EXPERIMENTAL_WARN_{group}."
)

EXPERIMENTAL_ERROR = (
Expand Down
5 changes: 4 additions & 1 deletion src/prefect/_internal/concurrency/supervisors.py
Expand Up @@ -181,7 +181,10 @@ def result(self) -> Union[Awaitable[T], T]:
raise NotImplementedError()

def __repr__(self) -> str:
return f"<{self.__class__.__name__}(id={id(self)}, owner={self.owner_thread_ident})>"
return (
f"<{self.__class__.__name__}(id={id(self)},"
f" owner={self.owner_thread_ident})>"
)


class SyncSupervisor(Supervisor[T]):
Expand Down
44 changes: 30 additions & 14 deletions src/prefect/agent.py
Expand Up @@ -54,7 +54,8 @@ def __init__(
) -> None:
if default_infrastructure and default_infrastructure_document_id:
raise ValueError(
"Provide only one of 'default_infrastructure' and 'default_infrastructure_document_id'."
"Provide only one of 'default_infrastructure' and"
" 'default_infrastructure_document_id'."
)

self.work_queues: Set[str] = set(work_queues) if work_queues else set()
Expand Down Expand Up @@ -154,7 +155,8 @@ async def get_work_queues(self) -> AsyncIterator[WorkQueue]:
)
if self.work_pool_name:
self.logger.info(
f"Created work queue {name!r} in work pool {self.work_pool_name!r}."
f"Created work queue {name!r} in work pool"
f" {self.work_pool_name!r}."
)
else:
self.logger.info(f"Created work queue '{name}'.")
Expand Down Expand Up @@ -214,7 +216,8 @@ async def get_and_submit_flow_runs(self) -> List[FlowRun]:
submittable_runs.extend(queue_runs)
except ObjectNotFound:
self.logger.error(
f"Work queue {work_queue.name!r} ({work_queue.id}) not found."
f"Work queue {work_queue.name!r} ({work_queue.id}) not"
" found."
)
except Exception as exc:
self.logger.exception(exc)
Expand All @@ -231,7 +234,8 @@ async def get_and_submit_flow_runs(self) -> List[FlowRun]:
self.limiter.acquire_on_behalf_of_nowait(flow_run.id)
except anyio.WouldBlock:
self.logger.info(
f"Flow run limit reached; {self.limiter.borrowed_tokens} flow runs in progress."
f"Flow run limit reached; {self.limiter.borrowed_tokens} flow runs"
" in progress."
)
break
else:
Expand Down Expand Up @@ -300,12 +304,16 @@ async def cancel_run(self, flow_run: FlowRun) -> None:
"""
if not flow_run.infrastructure_pid:
self.logger.error(
f"Flow run '{flow_run.id}' does not have an infrastructure pid attached. Cancellation cannot be guaranteed."
f"Flow run '{flow_run.id}' does not have an infrastructure pid"
" attached. Cancellation cannot be guaranteed."
)
await self._mark_flow_run_as_cancelled(
flow_run,
state_updates={
"message": "This flow run is missing infrastructure tracking information and cancellation cannot be guaranteed."
"message": (
"This flow run is missing infrastructure tracking information"
" and cancellation cannot be guaranteed."
)
},
)
return
Expand Down Expand Up @@ -342,7 +350,7 @@ async def cancel_run(self, flow_run: FlowRun) -> None:
self.logger.warning(f"{exc} Flow run cannot be cancelled by this agent.")
except Exception:
self.logger.exception(
f"Encountered exception while killing infrastructure for flow run "
"Encountered exception while killing infrastructure for flow run "
f"'{flow_run.id}'. Flow run may not be cancelled."
)
# We will try again on generic exceptions
Expand Down Expand Up @@ -447,8 +455,9 @@ async def submit_run(self, flow_run: FlowRun) -> None:
)
except Exception as exc:
self.logger.exception(
"An error occured while setting the `infrastructure_pid` on "
f"flow run {flow_run.id!r}. The flow run will not be cancellable."
"An error occured while setting the `infrastructure_pid`"
f" on flow run {flow_run.id!r}. The flow run will not be"
" cancellable."
)

self.logger.info(f"Completed submission of flow run '{flow_run.id}'")
Expand Down Expand Up @@ -507,7 +516,10 @@ async def _submit_run_and_capture_errors(
if result.status_code != 0:
await self._propose_crashed_state(
flow_run,
f"Flow run infrastructure exited with non-zero status code {result.status_code}.",
(
"Flow run infrastructure exited with non-zero status code"
f" {result.status_code}."
),
)

return result
Expand All @@ -518,8 +530,10 @@ async def _propose_pending_state(self, flow_run: FlowRun) -> bool:
state = await propose_state(self.client, Pending(), flow_run_id=flow_run.id)
except Abort as exc:
self.logger.info(
f"Aborted submission of flow run '{flow_run.id}'. "
f"Server sent an abort signal: {exc}",
(
f"Aborted submission of flow run '{flow_run.id}'. "
f"Server sent an abort signal: {exc}"
),
)
return False
except Exception as exc:
Expand All @@ -531,8 +545,10 @@ async def _propose_pending_state(self, flow_run: FlowRun) -> bool:

if not state.is_pending():
self.logger.info(
f"Aborted submission of flow run '{flow_run.id}': "
f"Server returned a non-pending state {state.type.value!r}",
(
f"Aborted submission of flow run '{flow_run.id}': "
f"Server returned a non-pending state {state.type.value!r}"
),
)
return False

Expand Down
35 changes: 19 additions & 16 deletions src/prefect/blocks/core.py
Expand Up @@ -202,9 +202,9 @@ def schema_extra(schema: Dict[str, Any], model: Type["Block"]):
type_._to_block_schema_reference_dict(),
]
else:
refs[
field.name
] = type_._to_block_schema_reference_dict()
refs[field.name] = (
type_._to_block_schema_reference_dict()
)

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -716,7 +716,8 @@ class Custom(Block):
)
except prefect.exceptions.ObjectNotFound as e:
raise ValueError(
f"Unable to find block document named {block_document_name} for block type {block_type_slug}"
f"Unable to find block document named {block_document_name} for block"
f" type {block_type_slug}"
) from e

try:
Expand All @@ -726,18 +727,20 @@ class Custom(Block):
missing_fields = tuple(err["loc"][0] for err in e.errors())
missing_block_data = {field: None for field in missing_fields}
warnings.warn(
f"Could not fully load {block_document_name!r} of block type {cls._block_type_slug!r} - "
"this is likely because one or more required fields were added to the schema "
f"for {cls.__name__!r} that did not exist on the class when this block was last saved. "
f"Please specify values for new field(s): {listrepr(missing_fields)}, then "
f'run `{cls.__name__}.save("{block_document_name}", overwrite=True)`, and '
"load this block again before attempting to use it."
f"Could not fully load {block_document_name!r} of block type"
f" {cls._block_type_slug!r} - this is likely because one or more"
" required fields were added to the schema for"
f" {cls.__name__!r} that did not exist on the class when this block"
" was last saved. Please specify values for new field(s):"
f" {listrepr(missing_fields)}, then run"
f' `{cls.__name__}.save("{block_document_name}", overwrite=True)`,'
" and load this block again before attempting to use it."
)
return cls.construct(**block_document.data, **missing_block_data)
raise RuntimeError(
f"Unable to load {block_document_name!r} of block type {cls._block_type_slug!r} "
"due to failed validation. To load without validation, try loading again "
"with `validate=False`."
f"Unable to load {block_document_name!r} of block type"
f" {cls._block_type_slug!r} due to failed validation. To load without"
" validation, try loading again with `validate=False`."
) from e

@staticmethod
Expand Down Expand Up @@ -858,9 +861,9 @@ async def _save(
)
else:
raise ValueError(
"You are attempting to save values with a name that is already in "
"use for this block type. If you would like to overwrite the values that are saved, "
"then save with `overwrite=True`."
"You are attempting to save values with a name that is already in"
" use for this block type. If you would like to overwrite the"
" values that are saved, then save with `overwrite=True`."
) from err

# Update metadata on block instance for later use.
Expand Down
32 changes: 20 additions & 12 deletions src/prefect/blocks/notifications.py
Expand Up @@ -33,14 +33,14 @@ class AbstractAppriseNotificationBlock(NotificationBlock, ABC):
An abstract class for sending notifications using Apprise.
"""

notify_type: Literal[
"prefect_default", "info", "success", "warning", "failure"
] = Field(
default=PrefectNotifyType.DEFAULT,
description=(
"The type of notification being performed; the prefect_default "
"is a plain notification that does not attach an image."
),
notify_type: Literal["prefect_default", "info", "success", "warning", "failure"] = (
Field(
default=PrefectNotifyType.DEFAULT,
description=(
"The type of notification being performed; the prefect_default "
"is a plain notification that does not attach an image."
),
)
)

def _start_apprise_client(self, url: SecretStr):
Expand Down Expand Up @@ -129,7 +129,9 @@ class MicrosoftTeamsWebhook(AppriseNotificationBlock):
...,
title="Webhook URL",
description="The Teams incoming webhook URL used to send notifications.",
example="https://your-org.webhook.office.com/webhookb2/XXX/IncomingWebhook/YYY/ZZZ",
example=(
"https://your-org.webhook.office.com/webhookb2/XXX/IncomingWebhook/YYY/ZZZ"
),
)


Expand Down Expand Up @@ -324,7 +326,7 @@ class OpsgenieWebhook(AbstractAppriseNotificationBlock):
apikey: SecretStr = Field(
default=...,
title="API Key",
description=("The API Key associated with your Opsgenie account."),
description="The API Key associated with your Opsgenie account.",
)

target_user: Optional[List] = Field(
Expand Down Expand Up @@ -354,13 +356,19 @@ class OpsgenieWebhook(AbstractAppriseNotificationBlock):

tags: Optional[List] = Field(
default=None,
description="A comma-separated list of tags you can associate with your Opsgenie message.",
description=(
"A comma-separated list of tags you can associate with your Opsgenie"
" message."
),
example='["tag1", "tag2"]',
)

priority: Optional[str] = Field(
default=3,
description="The priority to associate with the message. It is on a scale between 1 (LOW) and 5 (EMERGENCY).",
description=(
"The priority to associate with the message. It is on a scale between 1"
" (LOW) and 5 (EMERGENCY)."
),
)

alias: Optional[str] = Field(
Expand Down
27 changes: 18 additions & 9 deletions src/prefect/cli/agent.py
Expand Up @@ -56,8 +56,9 @@ async def start(
"-m",
"--match",
help=(
"Dynamically matches work queue names with the specified prefix for the agent to pull from,"
"for example `dev-` will match all work queues with a name that starts with `dev-`"
"Dynamically matches work queue names with the specified prefix for the"
" agent to pull from,for example `dev-` will match all work queues with a"
" name that starts with `dev-`"
),
),
work_pool_name: str = typer.Option(
Expand All @@ -77,7 +78,10 @@ async def start(
None,
"-t",
"--tag",
help="DEPRECATED: One or more optional tags that will be used to create a work queue. This option will be removed on 2023-02-23.",
help=(
"DEPRECATED: One or more optional tags that will be used to create a work"
" queue. This option will be removed on 2023-02-23."
),
),
limit: int = typer.Option(
None,
Expand All @@ -102,8 +106,11 @@ async def start(
pass
work_queues.append(work_queue)
app.console.print(
"Agents now support multiple work queues. Instead of passing a single argument, provide work queue names "
f"with the `-q` or `--work-queue` flag: `prefect agent start -q {work_queue}`\n",
(
"Agents now support multiple work queues. Instead of passing a single"
" argument, provide work queue names with the `-q` or `--work-queue`"
f" flag: `prefect agent start -q {work_queue}`\n"
),
style="blue",
)

Expand All @@ -122,10 +129,12 @@ async def start(
if tags:
work_queue_name = f"Agent queue {'-'.join(sorted(tags))}"
app.console.print(
"`tags` are deprecated. For backwards-compatibility with old "
f"versions of Prefect, this agent will create a work queue named `{work_queue_name}` "
"that uses legacy tag-based matching. "
"This option will be removed on 2023-02-23.",
(
"`tags` are deprecated. For backwards-compatibility with old versions"
" of Prefect, this agent will create a work queue named"
f" `{work_queue_name}` that uses legacy tag-based matching. This option"
" will be removed on 2023-02-23."
),
style="red",
)

Expand Down