From 4459c76e002dba212ef6bf4e938e2df943566cf7 Mon Sep 17 00:00:00 2001 From: Luciano Scarpulla Date: Fri, 16 Feb 2024 11:19:31 +0100 Subject: [PATCH 1/5] feat: Add compatible methods for Pydantic V1.8 and V2 This allows to use both V1.8 and V2 without triggering any deprecation warnings --- src/firebolt/model/V1/__init__.py | 72 +++- src/firebolt/model/V1/database.py | 364 ++++++++--------- src/firebolt/model/V1/engine.py | 544 ++++++++++++------------- src/firebolt/service/V1/binding.py | 296 +++++++------- src/firebolt/service/V1/database.py | 4 +- src/firebolt/service/V1/provider.py | 20 +- src/firebolt/service/V1/region.py | 150 +++---- tests/unit/service/V1/conftest.py | 15 +- tests/unit/service/V1/test_bindings.py | 6 +- tests/unit/util.py | 2 +- 10 files changed, 766 insertions(+), 707 deletions(-) diff --git a/src/firebolt/model/V1/__init__.py b/src/firebolt/model/V1/__init__.py index 1a1114c232..d34ae497af 100644 --- a/src/firebolt/model/V1/__init__.py +++ b/src/firebolt/model/V1/__init__.py @@ -1,16 +1,70 @@ import json -from typing import Any +from typing import Any, Callable, Final, Type, TypeVar, Union -from pydantic import BaseModel +import pydantic +from typing_extensions import TypeAlias +Model = TypeVar("Model", bound="pydantic.BaseModel") -class FireboltBaseModel(BaseModel): +GenericCallable: TypeAlias = Callable[..., Any] - # Using Pydantic 1.* config class for backwards compatibility - class Config: - extra = "forbid" - allow_population_by_field_name = True # Pydantic 1.8 - populate_by_name = True # Pydantic 2.0 +# Using `.VERSION` instead of `.__version__` for backward compatibility: +PYDANTIC_VERSION: Final[int] = int(pydantic.VERSION[0]) + + +def use_if_version_ge( + version_ge: int, + obj: Union[pydantic.BaseModel, Type[Model]], + previous_method: str, + latest_method: str, +) -> GenericCallable: + """ + Utility function to get desired method from base model. + + Args: + version_ge: The version number that will be used to determine + the desired method. + obj: The object on which the method will be taken from + previous_method: The method previously available in a version + smaller than `version_ge`. + latest_method: The method available from `version_ge` onwards. + + """ + if PYDANTIC_VERSION >= version_ge: + return getattr(obj, latest_method) + else: + return getattr(obj, previous_method) + + +if PYDANTIC_VERSION >= 2: + # This import can only happen outside the BaseModel, + # or it will raise PydanticUserError + from pydantic import ConfigDict + + +class FireboltBaseModel(pydantic.BaseModel): + if PYDANTIC_VERSION >= 2: + # Pydantic V2 config + model_config = ConfigDict(populate_by_name=True, from_attributes=True) + + else: + # Using Pydantic 1.* config class for backwards compatibility + class Config: + extra = "forbid" + allow_population_by_field_name = True # Pydantic 1.8 + + def model_dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]: + """Pydantic V2 and V1 compatible method for `dict` -> `model_dump`.""" + return use_if_version_ge(2, self, "dict", "model_dump")(*args, **kwargs) + + @classmethod + def parse_model(cls: Type[Model], *args: Any, **kwargs: Any) -> Model: + """Pydantic V2 and V1 compatible method for `parse_obj` -> `model_validate`.""" + return use_if_version_ge(2, cls, "parse_obj", "model_validate")(*args, **kwargs) + + def model_json(self, *args: Any, **kwargs: Any) -> str: + """Pydantic V2 and V1 compatible method for `json` -> `model_dump_json`.""" + return use_if_version_ge(2, self, "json", "model_dump_json")(*args, **kwargs) def jsonable_dict(self, *args: Any, **kwargs: Any) -> dict: """ @@ -24,4 +78,4 @@ def jsonable_dict(self, *args: Any, **kwargs: Any) -> dict: expects to take in a dictionary of primitives as input to the JSON parameter of its request function. See: https://www.python-httpx.org/api/#helper-functions """ - return json.loads(self.json(*args, **kwargs)) + return json.loads(self.model_json(*args, **kwargs)) diff --git a/src/firebolt/model/V1/database.py b/src/firebolt/model/V1/database.py index 6ad19c1ef6..533266e828 100644 --- a/src/firebolt/model/V1/database.py +++ b/src/firebolt/model/V1/database.py @@ -1,182 +1,182 @@ -from __future__ import annotations - -import logging -from datetime import datetime -from typing import TYPE_CHECKING, Any, List, Optional, Sequence - -from pydantic import Field, PrivateAttr - -from firebolt.model.V1 import FireboltBaseModel -from firebolt.model.V1.region import RegionKey -from firebolt.service.V1.engine import EngineService -from firebolt.service.V1.types import EngineStatusSummary -from firebolt.utils.exception import AttachedEngineInUseError -from firebolt.utils.urls import ACCOUNT_DATABASE_URL - -if TYPE_CHECKING: - from firebolt.model.V1.binding import Binding - from firebolt.model.V1.engine import Engine - from firebolt.service.V1.database import DatabaseService - -logger = logging.getLogger(__name__) - - -class DatabaseKey(FireboltBaseModel): - account_id: str - database_id: str - - -class FieldMask(FireboltBaseModel): - paths: Sequence[str] = Field(alias="paths") - - -class Database(FireboltBaseModel): - """ - A Firebolt database. - - Databases belong to a region and have a description, - but otherwise are not configurable. - """ - - # internal - _service: DatabaseService = PrivateAttr() - - # required - name: str = Field(min_length=1, max_length=255, pattern=r"^[0-9a-zA-Z_]+$") - compute_region_key: RegionKey = Field(alias="compute_region_id") - - # optional - database_key: Optional[DatabaseKey] = Field(default=None, alias="id") - description: Optional[str] = Field(default=None, max_length=255) - emoji: Optional[str] = Field(default=None, max_length=255) - current_status: Optional[str] = None - health_status: Optional[str] = None - data_size_full: Optional[int] = None - data_size_compressed: Optional[int] = None - is_system_database: Optional[bool] = None - storage_bucket_name: Optional[str] = None - create_time: Optional[datetime] = None - create_actor: Optional[str] = None - last_update_time: Optional[datetime] = None - last_update_actor: Optional[str] = None - desired_status: Optional[str] = None - - @classmethod - def parse_obj_with_service( - cls, obj: Any, database_service: DatabaseService - ) -> Database: - database = cls.parse_obj(obj) - database._service = database_service - return database - - @property - def database_id(self) -> Optional[str]: - if self.database_key is None: - return None - return self.database_key.database_id - - def get_attached_engines(self) -> List[Engine]: - """Get a list of engines that are attached to this database.""" - - return self._service.resource_manager.bindings.get_engines_bound_to_database( # noqa: E501 - database=self - ) - - def attach_to_engine( - self, engine: Engine, is_default_engine: bool = False - ) -> Binding: - """ - Attach an engine to this database. - - Args: - engine: The engine to attach. - is_default_engine: - Whether this engine should be used as default for this database. - Only one engine can be set as default for a single database. - This will overwrite any existing default. - """ - - return self._service.resource_manager.bindings.create( - engine=engine, database=self, is_default_engine=is_default_engine - ) - - def delete(self) -> Database: - """ - Delete a database from Firebolt. - - Raises an error if there are any attached engines. - """ - - for engine in self.get_attached_engines(): - if engine.current_status_summary in { - EngineStatusSummary.ENGINE_STATUS_SUMMARY_STARTING, - EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING, - }: - raise AttachedEngineInUseError(method_name="delete") - - logger.info( - f"Deleting Database (database_id={self.database_id}, name={self.name})" - ) - response = self._service.client.delete( - url=ACCOUNT_DATABASE_URL.format( - account_id=self._service.account_id, database_id=self.database_id - ), - headers={"Content-type": "application/json"}, - ) - return Database.parse_obj_with_service( - response.json()["database"], self._service - ) - - def update(self, description: str) -> Database: - """ - Updates a database description. - """ - - class _DatabaseUpdateRequest(FireboltBaseModel): - """Helper model for sending Database creation requests.""" - - account_id: str - database: Database - database_id: str - update_mask: FieldMask - - self.description = description - - assert self.database_id is not None, "Database must have database_id" - logger.info( - f"Updating Database (database_id={self.database_id}, " - f"name={self.name}, description={self.description})" - ) - - payload = _DatabaseUpdateRequest( - account_id=self._service.account_id, - database=self, - database_id=self.database_id, - update_mask=FieldMask(paths=["description"]), - ).jsonable_dict(by_alias=True) - - response = self._service.client.patch( - url=ACCOUNT_DATABASE_URL.format( - account_id=self._service.account_id, database_id=self.database_id - ), - headers={"Content-type": "application/json"}, - json=payload, - ) - - return Database.parse_obj_with_service( - response.json()["database"], self._service - ) - - def get_default_engine(self) -> Optional[Engine]: - """ - Returns: default engine of the database, or None if default engine is missing - """ - rm = self._service.resource_manager - assert isinstance(rm.engines, EngineService), "Expected EngineService V1" - default_engines: List[Engine] = [ - rm.engines.get(binding.engine_id) - for binding in rm.bindings.get_many(database_id=self.database_id) - if binding.is_default_engine - ] - - return None if len(default_engines) == 0 else default_engines[0] +from __future__ import annotations + +import logging +from datetime import datetime +from typing import TYPE_CHECKING, Any, List, Optional, Sequence + +from pydantic import Field, PrivateAttr + +from firebolt.model.V1 import FireboltBaseModel +from firebolt.model.V1.region import RegionKey +from firebolt.service.V1.engine import EngineService +from firebolt.service.V1.types import EngineStatusSummary +from firebolt.utils.exception import AttachedEngineInUseError +from firebolt.utils.urls import ACCOUNT_DATABASE_URL + +if TYPE_CHECKING: + from firebolt.model.V1.binding import Binding + from firebolt.model.V1.engine import Engine + from firebolt.service.V1.database import DatabaseService + +logger = logging.getLogger(__name__) + + +class DatabaseKey(FireboltBaseModel): + account_id: str + database_id: str + + +class FieldMask(FireboltBaseModel): + paths: Sequence[str] = Field(alias="paths") + + +class Database(FireboltBaseModel): + """ + A Firebolt database. + + Databases belong to a region and have a description, + but otherwise are not configurable. + """ + + # internal + _service: DatabaseService = PrivateAttr() + + # required + name: str = Field(min_length=1, max_length=255, pattern=r"^[0-9a-zA-Z_]+$") + compute_region_key: RegionKey = Field(alias="compute_region_id") + + # optional + database_key: Optional[DatabaseKey] = Field(default=None, alias="id") + description: Optional[str] = Field(default=None, max_length=255) + emoji: Optional[str] = Field(default=None, max_length=255) + current_status: Optional[str] = None + health_status: Optional[str] = None + data_size_full: Optional[int] = None + data_size_compressed: Optional[int] = None + is_system_database: Optional[bool] = None + storage_bucket_name: Optional[str] = None + create_time: Optional[datetime] = None + create_actor: Optional[str] = None + last_update_time: Optional[datetime] = None + last_update_actor: Optional[str] = None + desired_status: Optional[str] = None + + @classmethod + def parse_obj_with_service( + cls, obj: Any, database_service: DatabaseService + ) -> Database: + database = cls.parse_model(obj) + database._service = database_service + return database + + @property + def database_id(self) -> Optional[str]: + if self.database_key is None: + return None + return self.database_key.database_id + + def get_attached_engines(self) -> List[Engine]: + """Get a list of engines that are attached to this database.""" + + return self._service.resource_manager.bindings.get_engines_bound_to_database( # noqa: E501 + database=self + ) + + def attach_to_engine( + self, engine: Engine, is_default_engine: bool = False + ) -> Binding: + """ + Attach an engine to this database. + + Args: + engine: The engine to attach. + is_default_engine: + Whether this engine should be used as default for this database. + Only one engine can be set as default for a single database. + This will overwrite any existing default. + """ + + return self._service.resource_manager.bindings.create( + engine=engine, database=self, is_default_engine=is_default_engine + ) + + def delete(self) -> Database: + """ + Delete a database from Firebolt. + + Raises an error if there are any attached engines. + """ + + for engine in self.get_attached_engines(): + if engine.current_status_summary in { + EngineStatusSummary.ENGINE_STATUS_SUMMARY_STARTING, + EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING, + }: + raise AttachedEngineInUseError(method_name="delete") + + logger.info( + f"Deleting Database (database_id={self.database_id}, name={self.name})" + ) + response = self._service.client.delete( + url=ACCOUNT_DATABASE_URL.format( + account_id=self._service.account_id, database_id=self.database_id + ), + headers={"Content-type": "application/json"}, + ) + return Database.parse_obj_with_service( + response.json()["database"], self._service + ) + + def update(self, description: str) -> Database: + """ + Updates a database description. + """ + + class _DatabaseUpdateRequest(FireboltBaseModel): + """Helper model for sending Database creation requests.""" + + account_id: str + database: Database + database_id: str + update_mask: FieldMask + + self.description = description + + assert self.database_id is not None, "Database must have database_id" + logger.info( + f"Updating Database (database_id={self.database_id}, " + f"name={self.name}, description={self.description})" + ) + + payload = _DatabaseUpdateRequest( + account_id=self._service.account_id, + database=self, + database_id=self.database_id, + update_mask=FieldMask(paths=["description"]), + ).jsonable_dict(by_alias=True) + + response = self._service.client.patch( + url=ACCOUNT_DATABASE_URL.format( + account_id=self._service.account_id, database_id=self.database_id + ), + headers={"Content-type": "application/json"}, + json=payload, + ) + + return Database.parse_obj_with_service( + response.json()["database"], self._service + ) + + def get_default_engine(self) -> Optional[Engine]: + """ + Returns: default engine of the database, or None if default engine is missing + """ + rm = self._service.resource_manager + assert isinstance(rm.engines, EngineService), "Expected EngineService V1" + default_engines: List[Engine] = [ + rm.engines.get(binding.engine_id) + for binding in rm.bindings.get_many(database_id=self.database_id) + if binding.is_default_engine + ] + + return None if len(default_engines) == 0 else default_engines[0] diff --git a/src/firebolt/model/V1/engine.py b/src/firebolt/model/V1/engine.py index 4a97eb4c4c..12eedee4aa 100644 --- a/src/firebolt/model/V1/engine.py +++ b/src/firebolt/model/V1/engine.py @@ -1,272 +1,272 @@ -from __future__ import annotations - -import logging -import time -from datetime import datetime -from typing import TYPE_CHECKING, Any, Optional, Sequence - -from pydantic import Field, PrivateAttr - -from firebolt.model.V1 import FireboltBaseModel -from firebolt.model.V1.engine_revision import EngineRevisionKey -from firebolt.model.V1.region import RegionKey -from firebolt.service.V1.types import ( - EngineStatus, - EngineStatusSummary, - EngineType, - WarmupMethod, -) -from firebolt.utils.urls import ( - ACCOUNT_ENGINE_START_URL, - ACCOUNT_ENGINE_STOP_URL, -) - -if TYPE_CHECKING: - from firebolt.service.V1.engine import EngineService - -logger = logging.getLogger(__name__) - - -class EngineKey(FireboltBaseModel): - account_id: str - engine_id: str - - -def wait(seconds: int, timeout_time: float, error_message: str, verbose: bool) -> None: - time.sleep(seconds) - if time.time() > timeout_time: - raise TimeoutError(error_message) - if verbose: - print(".", end="") - - -class EngineSettings(FireboltBaseModel): - """ - Engine settings. - - See also: :py:class:`EngineRevisionSpecification - ` - which also contains engine configuration. - """ - - preset: str - auto_stop_delay_duration: str = Field(pattern=r"^[0-9]+[sm]$|^0$") - minimum_logging_level: str - is_read_only: bool - warm_up: str - - @classmethod - def default( - cls, - engine_type: EngineType = EngineType.GENERAL_PURPOSE, - auto_stop_delay_duration: str = "1200s", - warm_up: WarmupMethod = WarmupMethod.PRELOAD_INDEXES, - minimum_logging_level: str = "ENGINE_SETTINGS_LOGGING_LEVEL_INFO", - ) -> EngineSettings: - if engine_type == EngineType.GENERAL_PURPOSE: - preset = engine_type.GENERAL_PURPOSE.api_settings_preset_name # type: ignore # noqa: E501 - is_read_only = False - else: - preset = engine_type.DATA_ANALYTICS.api_settings_preset_name # type: ignore - is_read_only = True - - return cls( - preset=preset, - auto_stop_delay_duration=auto_stop_delay_duration, - minimum_logging_level=minimum_logging_level, - is_read_only=is_read_only, - warm_up=warm_up.api_name, - ) - - -class FieldMask(FireboltBaseModel): - paths: Sequence[str] = Field(alias="paths") - - -class Engine(FireboltBaseModel): - """ - A Firebolt engine. Responsible for performing work (queries, ingestion). - - Engines are configured in :py:class:`Settings - ` - and in :py:class:`EngineRevisionSpecification - `. - """ - - # internal - _service: EngineService = PrivateAttr() - - # required - name: str = Field(min_length=1, max_length=255, pattern=r"^[0-9a-zA-Z_]+$") - compute_region_key: RegionKey = Field(alias="compute_region_id") - settings: EngineSettings - - # optional - key: Optional[EngineKey] = Field(None, alias="id") - description: Optional[str] = None - emoji: Optional[str] = None - current_status: Optional[EngineStatus] = None - current_status_summary: Optional[EngineStatusSummary] = None - latest_revision_key: Optional[EngineRevisionKey] = Field( - None, alias="latest_revision_id" - ) - endpoint: Optional[str] = None - endpoint_serving_revision_key: Optional[EngineRevisionKey] = Field( - None, alias="endpoint_serving_revision_id" - ) - create_time: Optional[datetime] = None - create_actor: Optional[str] = None - last_update_time: Optional[datetime] = None - last_update_actor: Optional[str] = None - last_use_time: Optional[datetime] = None - desired_status: Optional[str] = None - health_status: Optional[str] = None - endpoint_desired_revision_key: Optional[EngineRevisionKey] = Field( - None, alias="endpoint_desired_revision_id" - ) - - @classmethod - def parse_obj_with_service(cls, obj: Any, engine_service: EngineService) -> Engine: - engine = cls.parse_obj(obj) - engine._service = engine_service - return engine - - @property - def engine_id(self) -> str: - if self.key is None: - raise ValueError("engine key is None") - return self.key.engine_id - - def get_latest(self) -> Engine: - """Get an up-to-date instance of the engine from Firebolt.""" - return self._service.get(id_=self.engine_id) - - def start( - self, - wait_for_startup: bool = True, - wait_timeout_seconds: int = 3600, - verbose: bool = False, - ) -> Engine: - """ - Start an engine. If it's already started, do nothing. - - Args: - wait_for_startup: - If True, wait for startup to complete. - If False, return immediately after requesting startup. - wait_timeout_seconds: - Number of seconds to wait for startup to complete - before raising a TimeoutError - verbose: - If True, print dots periodically while waiting for engine start. - If False, do not print any dots. - - Returns: - The updated engine from Firebolt. - """ - timeout_time = time.time() + wait_timeout_seconds - - engine = self.get_latest() - if ( - engine.current_status_summary - == EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING - ): - logger.info( - f"Engine (engine_id={self.engine_id}, name={self.name}) " - "is already running." - ) - return engine - - # wait for engine to stop first, if it's already stopping - # FUTURE: revisit logging and consider consolidating this if & the while below. - elif ( - engine.current_status_summary - == EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING - ): - logger.info( - f"Engine (engine_id={engine.engine_id}, name={engine.name}) " - "is in currently stopping, waiting for it to stop first." - ) - while ( - engine.current_status_summary - != EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPED - ): - wait( - seconds=5, - timeout_time=timeout_time, - error_message=( - "Engine " - f"(engine_id={engine.engine_id}, name={engine.name}) " - f"did not stop within {wait_timeout_seconds} seconds." - ), - verbose=True, - ) - engine = engine.get_latest() - - logger.info( - f"Engine (engine_id={engine.engine_id}, name={engine.name}) stopped." - ) - - engine = self._send_engine_request(ACCOUNT_ENGINE_START_URL) - logger.info( - f"Starting Engine (engine_id={engine.engine_id}, name={engine.name})" - ) - - # wait for engine to start - while wait_for_startup and engine.current_status_summary not in { - EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING, - EngineStatusSummary.ENGINE_STATUS_SUMMARY_FAILED, - }: - wait( - seconds=5, - timeout_time=timeout_time, - error_message=( # noqa: E501 - f"Could not start engine within {wait_timeout_seconds} seconds." - ), - verbose=verbose, - ) - previous_status_summary = engine.current_status_summary - engine = engine.get_latest() - if engine.current_status_summary != previous_status_summary: - logger.info( - "Engine status_summary=" - f"{getattr(engine.current_status_summary, 'name')}" - ) - - return engine - - def stop( - self, wait_for_stop: bool = False, wait_timeout_seconds: int = 3600 - ) -> Engine: - """Stop an Engine running on Firebolt.""" - timeout_time = time.time() + wait_timeout_seconds - - engine = self._send_engine_request(ACCOUNT_ENGINE_STOP_URL) - logger.info(f"Stopping Engine (engine_id={self.engine_id}, name={self.name})") - - while wait_for_stop and engine.current_status_summary not in { - EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPED, - EngineStatusSummary.ENGINE_STATUS_SUMMARY_FAILED, - }: - wait( - seconds=5, - timeout_time=timeout_time, - error_message=( # noqa: E501 - f"Could not stop engine within {wait_timeout_seconds} seconds." - ), - verbose=False, - ) - - engine = engine.get_latest() - - return engine - - def _send_engine_request(self, url: str) -> Engine: - response = self._service.client.post( - url=url.format( - account_id=self._service.account_id, engine_id=self.engine_id - ) - ) - return Engine.parse_obj_with_service( - obj=response.json()["engine"], engine_service=self._service - ) +from __future__ import annotations + +import logging +import time +from datetime import datetime +from typing import TYPE_CHECKING, Any, Optional, Sequence + +from pydantic import Field, PrivateAttr + +from firebolt.model.V1 import FireboltBaseModel +from firebolt.model.V1.engine_revision import EngineRevisionKey +from firebolt.model.V1.region import RegionKey +from firebolt.service.V1.types import ( + EngineStatus, + EngineStatusSummary, + EngineType, + WarmupMethod, +) +from firebolt.utils.urls import ( + ACCOUNT_ENGINE_START_URL, + ACCOUNT_ENGINE_STOP_URL, +) + +if TYPE_CHECKING: + from firebolt.service.V1.engine import EngineService + +logger = logging.getLogger(__name__) + + +class EngineKey(FireboltBaseModel): + account_id: str + engine_id: str + + +def wait(seconds: int, timeout_time: float, error_message: str, verbose: bool) -> None: + time.sleep(seconds) + if time.time() > timeout_time: + raise TimeoutError(error_message) + if verbose: + print(".", end="") + + +class EngineSettings(FireboltBaseModel): + """ + Engine settings. + + See also: :py:class:`EngineRevisionSpecification + ` + which also contains engine configuration. + """ + + preset: str + auto_stop_delay_duration: str = Field(pattern=r"^[0-9]+[sm]$|^0$") + minimum_logging_level: str + is_read_only: bool + warm_up: str + + @classmethod + def default( + cls, + engine_type: EngineType = EngineType.GENERAL_PURPOSE, + auto_stop_delay_duration: str = "1200s", + warm_up: WarmupMethod = WarmupMethod.PRELOAD_INDEXES, + minimum_logging_level: str = "ENGINE_SETTINGS_LOGGING_LEVEL_INFO", + ) -> EngineSettings: + if engine_type == EngineType.GENERAL_PURPOSE: + preset = engine_type.GENERAL_PURPOSE.api_settings_preset_name # type: ignore # noqa: E501 + is_read_only = False + else: + preset = engine_type.DATA_ANALYTICS.api_settings_preset_name # type: ignore + is_read_only = True + + return cls( + preset=preset, + auto_stop_delay_duration=auto_stop_delay_duration, + minimum_logging_level=minimum_logging_level, + is_read_only=is_read_only, + warm_up=warm_up.api_name, + ) + + +class FieldMask(FireboltBaseModel): + paths: Sequence[str] = Field(alias="paths") + + +class Engine(FireboltBaseModel): + """ + A Firebolt engine. Responsible for performing work (queries, ingestion). + + Engines are configured in :py:class:`Settings + ` + and in :py:class:`EngineRevisionSpecification + `. + """ + + # internal + _service: EngineService = PrivateAttr() + + # required + name: str = Field(min_length=1, max_length=255, pattern=r"^[0-9a-zA-Z_]+$") + compute_region_key: RegionKey = Field(alias="compute_region_id") + settings: EngineSettings + + # optional + key: Optional[EngineKey] = Field(None, alias="id") + description: Optional[str] = None + emoji: Optional[str] = None + current_status: Optional[EngineStatus] = None + current_status_summary: Optional[EngineStatusSummary] = None + latest_revision_key: Optional[EngineRevisionKey] = Field( + None, alias="latest_revision_id" + ) + endpoint: Optional[str] = None + endpoint_serving_revision_key: Optional[EngineRevisionKey] = Field( + None, alias="endpoint_serving_revision_id" + ) + create_time: Optional[datetime] = None + create_actor: Optional[str] = None + last_update_time: Optional[datetime] = None + last_update_actor: Optional[str] = None + last_use_time: Optional[datetime] = None + desired_status: Optional[str] = None + health_status: Optional[str] = None + endpoint_desired_revision_key: Optional[EngineRevisionKey] = Field( + None, alias="endpoint_desired_revision_id" + ) + + @classmethod + def parse_obj_with_service(cls, obj: Any, engine_service: EngineService) -> Engine: + engine = cls.parse_model(obj) + engine._service = engine_service + return engine + + @property + def engine_id(self) -> str: + if self.key is None: + raise ValueError("engine key is None") + return self.key.engine_id + + def get_latest(self) -> Engine: + """Get an up-to-date instance of the engine from Firebolt.""" + return self._service.get(id_=self.engine_id) + + def start( + self, + wait_for_startup: bool = True, + wait_timeout_seconds: int = 3600, + verbose: bool = False, + ) -> Engine: + """ + Start an engine. If it's already started, do nothing. + + Args: + wait_for_startup: + If True, wait for startup to complete. + If False, return immediately after requesting startup. + wait_timeout_seconds: + Number of seconds to wait for startup to complete + before raising a TimeoutError + verbose: + If True, print dots periodically while waiting for engine start. + If False, do not print any dots. + + Returns: + The updated engine from Firebolt. + """ + timeout_time = time.time() + wait_timeout_seconds + + engine = self.get_latest() + if ( + engine.current_status_summary + == EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING + ): + logger.info( + f"Engine (engine_id={self.engine_id}, name={self.name}) " + "is already running." + ) + return engine + + # wait for engine to stop first, if it's already stopping + # FUTURE: revisit logging and consider consolidating this if & the while below. + elif ( + engine.current_status_summary + == EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING + ): + logger.info( + f"Engine (engine_id={engine.engine_id}, name={engine.name}) " + "is in currently stopping, waiting for it to stop first." + ) + while ( + engine.current_status_summary + != EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPED + ): + wait( + seconds=5, + timeout_time=timeout_time, + error_message=( + "Engine " + f"(engine_id={engine.engine_id}, name={engine.name}) " + f"did not stop within {wait_timeout_seconds} seconds." + ), + verbose=True, + ) + engine = engine.get_latest() + + logger.info( + f"Engine (engine_id={engine.engine_id}, name={engine.name}) stopped." + ) + + engine = self._send_engine_request(ACCOUNT_ENGINE_START_URL) + logger.info( + f"Starting Engine (engine_id={engine.engine_id}, name={engine.name})" + ) + + # wait for engine to start + while wait_for_startup and engine.current_status_summary not in { + EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING, + EngineStatusSummary.ENGINE_STATUS_SUMMARY_FAILED, + }: + wait( + seconds=5, + timeout_time=timeout_time, + error_message=( # noqa: E501 + f"Could not start engine within {wait_timeout_seconds} seconds." + ), + verbose=verbose, + ) + previous_status_summary = engine.current_status_summary + engine = engine.get_latest() + if engine.current_status_summary != previous_status_summary: + logger.info( + "Engine status_summary=" + f"{getattr(engine.current_status_summary, 'name')}" + ) + + return engine + + def stop( + self, wait_for_stop: bool = False, wait_timeout_seconds: int = 3600 + ) -> Engine: + """Stop an Engine running on Firebolt.""" + timeout_time = time.time() + wait_timeout_seconds + + engine = self._send_engine_request(ACCOUNT_ENGINE_STOP_URL) + logger.info(f"Stopping Engine (engine_id={self.engine_id}, name={self.name})") + + while wait_for_stop and engine.current_status_summary not in { + EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPED, + EngineStatusSummary.ENGINE_STATUS_SUMMARY_FAILED, + }: + wait( + seconds=5, + timeout_time=timeout_time, + error_message=( # noqa: E501 + f"Could not stop engine within {wait_timeout_seconds} seconds." + ), + verbose=False, + ) + + engine = engine.get_latest() + + return engine + + def _send_engine_request(self, url: str) -> Engine: + response = self._service.client.post( + url=url.format( + account_id=self._service.account_id, engine_id=self.engine_id + ) + ) + return Engine.parse_obj_with_service( + obj=response.json()["engine"], engine_service=self._service + ) diff --git a/src/firebolt/service/V1/binding.py b/src/firebolt/service/V1/binding.py index f21b09cea9..5a75075704 100644 --- a/src/firebolt/service/V1/binding.py +++ b/src/firebolt/service/V1/binding.py @@ -1,148 +1,148 @@ -import logging -from typing import List, Optional - -from firebolt.model.V1.binding import Binding, BindingKey -from firebolt.model.V1.database import Database -from firebolt.model.V1.engine import Engine -from firebolt.service.V1.base import BaseService -from firebolt.service.V1.database import DatabaseService -from firebolt.service.V1.engine import EngineService -from firebolt.utils.exception import AlreadyBoundError -from firebolt.utils.urls import ( - ACCOUNT_BINDINGS_URL, - ACCOUNT_DATABASE_BINDING_URL, -) -from firebolt.utils.util import prune_dict - -logger = logging.getLogger(__name__) - - -class BindingService(BaseService): - def get_by_key(self, binding_key: BindingKey) -> Binding: - """Get a binding by its BindingKey""" - response = self.client.get( - url=ACCOUNT_DATABASE_BINDING_URL.format( - account_id=binding_key.account_id, - database_id=binding_key.database_id, - engine_id=binding_key.engine_id, - ) - ) - binding: dict = response.json()["binding"] - return Binding.parse_obj(binding) - - def get_many( - self, - database_id: Optional[str] = None, - engine_id: Optional[str] = None, - is_system_database: Optional[bool] = None, - ) -> List[Binding]: - """ - List bindings on Firebolt, optionally filtering by database and engine. - - Args: - database_id: - Return bindings matching the database_id. - If None, match any databases. - engine_id: - Return bindings matching the engine_id. - If None, match any engines. - is_system_database: - If True, return only system databases. - If False, return only non-system databases. - If None, do not filter on this parameter. - - Returns: - List of bindings matching the filter parameters - """ - - response = self.client.get( - url=ACCOUNT_BINDINGS_URL.format(account_id=self.account_id), - params=prune_dict( - { - "page.first": 5000, # FUTURE: pagination support w/ generator - "filter.id_database_id_eq": database_id, - "filter.id_engine_id_eq": engine_id, - "filter.is_system_database_eq": is_system_database, - } - ), - ) - return [Binding.parse_obj(i["node"]) for i in response.json()["edges"]] - - def get_database_bound_to_engine(self, engine: Engine) -> Optional[Database]: - """Get the database to which an engine is bound, if any.""" - try: - binding = self.get_many(engine_id=engine.engine_id)[0] - except IndexError: - return None - try: - assert isinstance( - self.resource_manager.databases, DatabaseService - ), "Expected DatabaseService V1" - return self.resource_manager.databases.get(id_=binding.database_id) - except (KeyError, IndexError): - return None - - def get_engines_bound_to_database(self, database: Database) -> List[Engine]: - """Get a list of engines that are bound to a database.""" - - bindings = self.get_many(database_id=database.database_id) - if not bindings: - return [] - assert isinstance( - self.resource_manager.engines, EngineService - ), "Expected EngineService V1" - return self.resource_manager.engines.get_by_ids( - ids=[b.engine_id for b in bindings] - ) - - def create( - self, engine: Engine, database: Database, is_default_engine: bool - ) -> Binding: - """ - Create a new binding between an engine and a database. - - Args: - engine: Engine to bind. - database: Database to bind. - is_default_engine: - Whether this engine should be used as default for this database. - Only one engine can be set as default for a single database. - This will overwrite any existing default. - - Returns: - New binding between the engine and database. - """ - - existing_database = self.get_database_bound_to_engine(engine=engine) - if existing_database is not None: - raise AlreadyBoundError( - f"The engine {engine.name} is already bound " - f"to {existing_database.name}!" - ) - - logger.info( - f"Attaching Engine (engine_id={engine.engine_id}, name={engine.name}) " - f"to Database (database_id={database.database_id}, " - f"name={database.name})" - ) - assert database.database_id is not None, "Database must have database_id" - binding = Binding( - binding_key=BindingKey( - account_id=self.account_id, - database_id=database.database_id, - engine_id=engine.engine_id, - ), - is_default_engine=is_default_engine, - ) - - response = self.client.post( - url=ACCOUNT_DATABASE_BINDING_URL.format( - account_id=self.account_id, - database_id=database.database_id, - engine_id=engine.engine_id, - ), - json=binding.jsonable_dict( - by_alias=True, include={"binding_key": ..., "is_default_engine": ...} - ), - ) - return Binding.parse_obj(response.json()["binding"]) +import logging +from typing import List, Optional + +from firebolt.model.V1.binding import Binding, BindingKey +from firebolt.model.V1.database import Database +from firebolt.model.V1.engine import Engine +from firebolt.service.V1.base import BaseService +from firebolt.service.V1.database import DatabaseService +from firebolt.service.V1.engine import EngineService +from firebolt.utils.exception import AlreadyBoundError +from firebolt.utils.urls import ( + ACCOUNT_BINDINGS_URL, + ACCOUNT_DATABASE_BINDING_URL, +) +from firebolt.utils.util import prune_dict + +logger = logging.getLogger(__name__) + + +class BindingService(BaseService): + def get_by_key(self, binding_key: BindingKey) -> Binding: + """Get a binding by its BindingKey""" + response = self.client.get( + url=ACCOUNT_DATABASE_BINDING_URL.format( + account_id=binding_key.account_id, + database_id=binding_key.database_id, + engine_id=binding_key.engine_id, + ) + ) + binding: dict = response.json()["binding"] + return Binding.parse_model(binding) + + def get_many( + self, + database_id: Optional[str] = None, + engine_id: Optional[str] = None, + is_system_database: Optional[bool] = None, + ) -> List[Binding]: + """ + List bindings on Firebolt, optionally filtering by database and engine. + + Args: + database_id: + Return bindings matching the database_id. + If None, match any databases. + engine_id: + Return bindings matching the engine_id. + If None, match any engines. + is_system_database: + If True, return only system databases. + If False, return only non-system databases. + If None, do not filter on this parameter. + + Returns: + List of bindings matching the filter parameters + """ + + response = self.client.get( + url=ACCOUNT_BINDINGS_URL.format(account_id=self.account_id), + params=prune_dict( + { + "page.first": 5000, # FUTURE: pagination support w/ generator + "filter.id_database_id_eq": database_id, + "filter.id_engine_id_eq": engine_id, + "filter.is_system_database_eq": is_system_database, + } + ), + ) + return [Binding.parse_model(i["node"]) for i in response.json()["edges"]] + + def get_database_bound_to_engine(self, engine: Engine) -> Optional[Database]: + """Get the database to which an engine is bound, if any.""" + try: + binding = self.get_many(engine_id=engine.engine_id)[0] + except IndexError: + return None + try: + assert isinstance( + self.resource_manager.databases, DatabaseService + ), "Expected DatabaseService V1" + return self.resource_manager.databases.get(id_=binding.database_id) + except (KeyError, IndexError): + return None + + def get_engines_bound_to_database(self, database: Database) -> List[Engine]: + """Get a list of engines that are bound to a database.""" + + bindings = self.get_many(database_id=database.database_id) + if not bindings: + return [] + assert isinstance( + self.resource_manager.engines, EngineService + ), "Expected EngineService V1" + return self.resource_manager.engines.get_by_ids( + ids=[b.engine_id for b in bindings] + ) + + def create( + self, engine: Engine, database: Database, is_default_engine: bool + ) -> Binding: + """ + Create a new binding between an engine and a database. + + Args: + engine: Engine to bind. + database: Database to bind. + is_default_engine: + Whether this engine should be used as default for this database. + Only one engine can be set as default for a single database. + This will overwrite any existing default. + + Returns: + New binding between the engine and database. + """ + + existing_database = self.get_database_bound_to_engine(engine=engine) + if existing_database is not None: + raise AlreadyBoundError( + f"The engine {engine.name} is already bound " + f"to {existing_database.name}!" + ) + + logger.info( + f"Attaching Engine (engine_id={engine.engine_id}, name={engine.name}) " + f"to Database (database_id={database.database_id}, " + f"name={database.name})" + ) + assert database.database_id is not None, "Database must have database_id" + binding = Binding( + binding_key=BindingKey( # type: ignore[call-arg] + account_id=self.account_id, + database_id=database.database_id, + engine_id=engine.engine_id, + ), + is_default_engine=is_default_engine, # type: ignore[call-arg] + ) + + response = self.client.post( + url=ACCOUNT_DATABASE_BINDING_URL.format( + account_id=self.account_id, + database_id=database.database_id, + engine_id=engine.engine_id, + ), + json=binding.jsonable_dict( + by_alias=True, include={"binding_key": ..., "is_default_engine": ...} + ), + ) + return Binding.parse_model(response.json()["binding"]) diff --git a/src/firebolt/service/V1/database.py b/src/firebolt/service/V1/database.py index 2673f0a740..4ff7523f19 100644 --- a/src/firebolt/service/V1/database.py +++ b/src/firebolt/service/V1/database.py @@ -110,7 +110,9 @@ class _DatabaseCreateRequest(FireboltBaseModel): else: region_key = self.resource_manager.regions.get_by_name(name=region).key database = Database( - name=name, compute_region_key=region_key, description=description + name=name, + compute_region_key=region_key, # type: ignore[call-arg] + description=description, ) logger.info(f"Creating Database (name={name})") diff --git a/src/firebolt/service/V1/provider.py b/src/firebolt/service/V1/provider.py index 2d9ddec269..9c3742e767 100644 --- a/src/firebolt/service/V1/provider.py +++ b/src/firebolt/service/V1/provider.py @@ -1,10 +1,10 @@ -from firebolt.client import Client -from firebolt.model.V1.provider import Provider -from firebolt.utils.urls import PROVIDERS_URL - - -def get_provider_id(client: Client) -> str: - """Get the AWS provider_id.""" - response = client.get(url=PROVIDERS_URL) - providers = [Provider.parse_obj(i["node"]) for i in response.json()["edges"]] - return providers[0].provider_id +from firebolt.client import Client +from firebolt.model.V1.provider import Provider +from firebolt.utils.urls import PROVIDERS_URL + + +def get_provider_id(client: Client) -> str: + """Get the AWS provider_id.""" + response = client.get(url=PROVIDERS_URL) + providers = [Provider.parse_model(i["node"]) for i in response.json()["edges"]] + return providers[0].provider_id diff --git a/src/firebolt/service/V1/region.py b/src/firebolt/service/V1/region.py index f688e26fa3..289f7eeafe 100644 --- a/src/firebolt/service/V1/region.py +++ b/src/firebolt/service/V1/region.py @@ -1,75 +1,75 @@ -from typing import TYPE_CHECKING, Dict, List - -from firebolt.model.V1.region import Region, RegionKey -from firebolt.service.V1.base import BaseService -from firebolt.utils.urls import REGIONS_URL -from firebolt.utils.util import cached_property - -if TYPE_CHECKING: - from firebolt.service.manager import ResourceManager - - -class RegionService(BaseService): - def __init__(self, resource_manager: "ResourceManager"): - """ - Service to manage AWS regions (us-east-1, etc) - - Args: - resource_manager: Resource manager to use - """ - - super().__init__(resource_manager=resource_manager) - - @cached_property - def regions(self) -> List[Region]: - """List of available AWS regions on Firebolt.""" - - response = self.client.get(url=REGIONS_URL, params={"page.first": 5000}) - return [Region.parse_obj(i["node"]) for i in response.json()["edges"]] - - @cached_property - def regions_by_name(self) -> Dict[str, Region]: - """Dict of {RegionLookup to Region}""" - - return {r.name: r for r in self.regions} - - @cached_property - def regions_by_key(self) -> Dict[RegionKey, Region]: - """Dict of {RegionKey to Region}""" - - return {r.key: r for r in self.regions} - - @cached_property - def default_region(self) -> Region: - """Default AWS region, could be provided from environment.""" - - if not self.default_region_setting: - raise ValueError( - "default_region parameter must be set when initializing " - "the resource manager." - ) - return self.get_by_name(name=self.default_region_setting) - - def get_by_name(self, name: str) -> Region: - """Get an AWS region by its name (eg. us-east-1).""" - - return self.regions_by_name[name] - - def get_by_key(self, key: RegionKey) -> Region: - """Get an AWS region by its key.""" - - return self.regions_by_key[key] - - def get_by_id(self, id_: str) -> Region: - """Get an AWS region by region_id.""" - - # error if provider_id is not set - if not self.resource_manager.provider_id: - raise ValueError( - "provider_id parameter must be set when initializing " - "the resource manager." - ) - - return self.get_by_key( - RegionKey(provider_id=self.resource_manager.provider_id, region_id=id_) - ) +from typing import TYPE_CHECKING, Dict, List + +from firebolt.model.V1.region import Region, RegionKey +from firebolt.service.V1.base import BaseService +from firebolt.utils.urls import REGIONS_URL +from firebolt.utils.util import cached_property + +if TYPE_CHECKING: + from firebolt.service.manager import ResourceManager + + +class RegionService(BaseService): + def __init__(self, resource_manager: "ResourceManager"): + """ + Service to manage AWS regions (us-east-1, etc) + + Args: + resource_manager: Resource manager to use + """ + + super().__init__(resource_manager=resource_manager) + + @cached_property + def regions(self) -> List[Region]: + """List of available AWS regions on Firebolt.""" + + response = self.client.get(url=REGIONS_URL, params={"page.first": 5000}) + return [Region.parse_model(i["node"]) for i in response.json()["edges"]] + + @cached_property + def regions_by_name(self) -> Dict[str, Region]: + """Dict of {RegionLookup to Region}""" + + return {r.name: r for r in self.regions} + + @cached_property + def regions_by_key(self) -> Dict[RegionKey, Region]: + """Dict of {RegionKey to Region}""" + + return {r.key: r for r in self.regions} + + @cached_property + def default_region(self) -> Region: + """Default AWS region, could be provided from environment.""" + + if not self.default_region_setting: + raise ValueError( + "default_region parameter must be set when initializing " + "the resource manager." + ) + return self.get_by_name(name=self.default_region_setting) + + def get_by_name(self, name: str) -> Region: + """Get an AWS region by its name (eg. us-east-1).""" + + return self.regions_by_name[name] + + def get_by_key(self, key: RegionKey) -> Region: + """Get an AWS region by its key.""" + + return self.regions_by_key[key] + + def get_by_id(self, id_: str) -> Region: + """Get an AWS region by region_id.""" + + # error if provider_id is not set + if not self.resource_manager.provider_id: + raise ValueError( + "provider_id parameter must be set when initializing " + "the resource manager." + ) + + return self.get_by_key( + RegionKey(provider_id=self.resource_manager.provider_id, region_id=id_) + ) diff --git a/tests/unit/service/V1/conftest.py b/tests/unit/service/V1/conftest.py index d65f7f9b1c..bc0c409add 100644 --- a/tests/unit/service/V1/conftest.py +++ b/tests/unit/service/V1/conftest.py @@ -234,7 +234,7 @@ def do_mock( assert urlparse(engine_url).path in request.url.path return Response( status_code=httpx.codes.OK, - json={"engine": mock_engine.dict()}, + json={"engine": mock_engine.model_dict()}, ) return do_mock @@ -254,7 +254,7 @@ def do_mock( assert request.url == account_engine_url return Response( status_code=httpx.codes.OK, - json={"engine": mock_engine.dict()}, + json={"engine": mock_engine.model_dict()}, ) return do_mock @@ -297,7 +297,7 @@ def do_mock( assert request.url == databases_url return Response( status_code=httpx.codes.OK, - json={"database": mock_database.dict()}, + json={"database": mock_database.model_dict()}, ) return do_mock @@ -309,7 +309,8 @@ def get_databases_callback_inner( request: httpx.Request = None, **kwargs ) -> Response: return Response( - status_code=httpx.codes.OK, json={"edges": [{"node": mock_database.dict()}]} + status_code=httpx.codes.OK, + json={"edges": [{"node": mock_database.model_dict()}]}, ) return get_databases_callback_inner @@ -329,7 +330,7 @@ def do_mock( assert request.url == database_url return Response( status_code=httpx.codes.OK, - json={"database": mock_database.dict()}, + json={"database": mock_database.model_dict()}, ) return do_mock @@ -407,7 +408,7 @@ def do_mock( assert request.url == database_get_url return Response( status_code=httpx.codes.OK, - json={"database": mock_database.dict()}, + json={"database": mock_database.model_dict()}, ) return do_mock @@ -474,7 +475,7 @@ def do_mock( assert request.url == create_binding_url return Response( status_code=httpx.codes.OK, - json={"binding": binding.dict()}, + json={"binding": binding.model_dict()}, ) return do_mock diff --git a/tests/unit/service/V1/test_bindings.py b/tests/unit/service/V1/test_bindings.py index b4c3bf6054..95328520a8 100644 --- a/tests/unit/service/V1/test_bindings.py +++ b/tests/unit/service/V1/test_bindings.py @@ -56,7 +56,7 @@ def test_create_binding( httpx_mock.add_callback(account_id_callback, url=account_id_url) httpx_mock.add_response(url=bindings_url, method="GET", json={"edges": []}) httpx_mock.add_response( - url=create_binding_url, method="POST", json={"binding": binding.dict()} + url=create_binding_url, method="POST", json={"binding": binding.model_dict()} ) resource_manager = ResourceManager(settings=settings) @@ -116,7 +116,9 @@ def test_get_engines_bound_to_db( httpx_mock.add_callback(account_id_callback, url=account_id_url) httpx_mock.add_callback(bindings_database_callback, url=database_bindings_url) httpx_mock.add_response( - url=engines_by_id_url, method="POST", json={"engines": [mock_engine.dict()]} + url=engines_by_id_url, + method="POST", + json={"engines": [mock_engine.model_dict()]}, ) resource_manager = ResourceManager(settings=settings) diff --git a/tests/unit/util.py b/tests/unit/util.py index 0cf3fbc11a..df5b672ac9 100644 --- a/tests/unit/util.py +++ b/tests/unit/util.py @@ -23,7 +23,7 @@ def list_to_paginated_response(items: List[FireboltBaseModel]) -> Dict: def list_to_paginated_response_v1(items: List[FireboltBaseModelV1]) -> Dict: - return {"edges": [{"node": i.dict()} for i in items]} + return {"edges": [{"node": i.model_dict()} for i in items]} def execute_generator_requests( From ed7e046d730126758bbeb0e4af92e5310b0bd0c3 Mon Sep 17 00:00:00 2001 From: Luciano Scarpulla Date: Fri, 16 Feb 2024 12:42:38 +0100 Subject: [PATCH 2/5] fix: Revert to LF line endings --- src/firebolt/model/V1/database.py | 364 +++++++++---------- src/firebolt/model/V1/engine.py | 544 ++++++++++++++--------------- src/firebolt/service/V1/binding.py | 296 ++++++++-------- src/firebolt/service/V1/region.py | 150 ++++---- 4 files changed, 677 insertions(+), 677 deletions(-) diff --git a/src/firebolt/model/V1/database.py b/src/firebolt/model/V1/database.py index 533266e828..33b0eb2aa5 100644 --- a/src/firebolt/model/V1/database.py +++ b/src/firebolt/model/V1/database.py @@ -1,182 +1,182 @@ -from __future__ import annotations - -import logging -from datetime import datetime -from typing import TYPE_CHECKING, Any, List, Optional, Sequence - -from pydantic import Field, PrivateAttr - -from firebolt.model.V1 import FireboltBaseModel -from firebolt.model.V1.region import RegionKey -from firebolt.service.V1.engine import EngineService -from firebolt.service.V1.types import EngineStatusSummary -from firebolt.utils.exception import AttachedEngineInUseError -from firebolt.utils.urls import ACCOUNT_DATABASE_URL - -if TYPE_CHECKING: - from firebolt.model.V1.binding import Binding - from firebolt.model.V1.engine import Engine - from firebolt.service.V1.database import DatabaseService - -logger = logging.getLogger(__name__) - - -class DatabaseKey(FireboltBaseModel): - account_id: str - database_id: str - - -class FieldMask(FireboltBaseModel): - paths: Sequence[str] = Field(alias="paths") - - -class Database(FireboltBaseModel): - """ - A Firebolt database. - - Databases belong to a region and have a description, - but otherwise are not configurable. - """ - - # internal - _service: DatabaseService = PrivateAttr() - - # required - name: str = Field(min_length=1, max_length=255, pattern=r"^[0-9a-zA-Z_]+$") - compute_region_key: RegionKey = Field(alias="compute_region_id") - - # optional - database_key: Optional[DatabaseKey] = Field(default=None, alias="id") - description: Optional[str] = Field(default=None, max_length=255) - emoji: Optional[str] = Field(default=None, max_length=255) - current_status: Optional[str] = None - health_status: Optional[str] = None - data_size_full: Optional[int] = None - data_size_compressed: Optional[int] = None - is_system_database: Optional[bool] = None - storage_bucket_name: Optional[str] = None - create_time: Optional[datetime] = None - create_actor: Optional[str] = None - last_update_time: Optional[datetime] = None - last_update_actor: Optional[str] = None - desired_status: Optional[str] = None - - @classmethod - def parse_obj_with_service( - cls, obj: Any, database_service: DatabaseService - ) -> Database: - database = cls.parse_model(obj) - database._service = database_service - return database - - @property - def database_id(self) -> Optional[str]: - if self.database_key is None: - return None - return self.database_key.database_id - - def get_attached_engines(self) -> List[Engine]: - """Get a list of engines that are attached to this database.""" - - return self._service.resource_manager.bindings.get_engines_bound_to_database( # noqa: E501 - database=self - ) - - def attach_to_engine( - self, engine: Engine, is_default_engine: bool = False - ) -> Binding: - """ - Attach an engine to this database. - - Args: - engine: The engine to attach. - is_default_engine: - Whether this engine should be used as default for this database. - Only one engine can be set as default for a single database. - This will overwrite any existing default. - """ - - return self._service.resource_manager.bindings.create( - engine=engine, database=self, is_default_engine=is_default_engine - ) - - def delete(self) -> Database: - """ - Delete a database from Firebolt. - - Raises an error if there are any attached engines. - """ - - for engine in self.get_attached_engines(): - if engine.current_status_summary in { - EngineStatusSummary.ENGINE_STATUS_SUMMARY_STARTING, - EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING, - }: - raise AttachedEngineInUseError(method_name="delete") - - logger.info( - f"Deleting Database (database_id={self.database_id}, name={self.name})" - ) - response = self._service.client.delete( - url=ACCOUNT_DATABASE_URL.format( - account_id=self._service.account_id, database_id=self.database_id - ), - headers={"Content-type": "application/json"}, - ) - return Database.parse_obj_with_service( - response.json()["database"], self._service - ) - - def update(self, description: str) -> Database: - """ - Updates a database description. - """ - - class _DatabaseUpdateRequest(FireboltBaseModel): - """Helper model for sending Database creation requests.""" - - account_id: str - database: Database - database_id: str - update_mask: FieldMask - - self.description = description - - assert self.database_id is not None, "Database must have database_id" - logger.info( - f"Updating Database (database_id={self.database_id}, " - f"name={self.name}, description={self.description})" - ) - - payload = _DatabaseUpdateRequest( - account_id=self._service.account_id, - database=self, - database_id=self.database_id, - update_mask=FieldMask(paths=["description"]), - ).jsonable_dict(by_alias=True) - - response = self._service.client.patch( - url=ACCOUNT_DATABASE_URL.format( - account_id=self._service.account_id, database_id=self.database_id - ), - headers={"Content-type": "application/json"}, - json=payload, - ) - - return Database.parse_obj_with_service( - response.json()["database"], self._service - ) - - def get_default_engine(self) -> Optional[Engine]: - """ - Returns: default engine of the database, or None if default engine is missing - """ - rm = self._service.resource_manager - assert isinstance(rm.engines, EngineService), "Expected EngineService V1" - default_engines: List[Engine] = [ - rm.engines.get(binding.engine_id) - for binding in rm.bindings.get_many(database_id=self.database_id) - if binding.is_default_engine - ] - - return None if len(default_engines) == 0 else default_engines[0] +from __future__ import annotations + +import logging +from datetime import datetime +from typing import TYPE_CHECKING, Any, List, Optional, Sequence + +from pydantic import Field, PrivateAttr + +from firebolt.model.V1 import FireboltBaseModel +from firebolt.model.V1.region import RegionKey +from firebolt.service.V1.engine import EngineService +from firebolt.service.V1.types import EngineStatusSummary +from firebolt.utils.exception import AttachedEngineInUseError +from firebolt.utils.urls import ACCOUNT_DATABASE_URL + +if TYPE_CHECKING: + from firebolt.model.V1.binding import Binding + from firebolt.model.V1.engine import Engine + from firebolt.service.V1.database import DatabaseService + +logger = logging.getLogger(__name__) + + +class DatabaseKey(FireboltBaseModel): + account_id: str + database_id: str + + +class FieldMask(FireboltBaseModel): + paths: Sequence[str] = Field(alias="paths") + + +class Database(FireboltBaseModel): + """ + A Firebolt database. + + Databases belong to a region and have a description, + but otherwise are not configurable. + """ + + # internal + _service: DatabaseService = PrivateAttr() + + # required + name: str = Field(min_length=1, max_length=255, pattern=r"^[0-9a-zA-Z_]+$") + compute_region_key: RegionKey = Field(alias="compute_region_id") + + # optional + database_key: Optional[DatabaseKey] = Field(default=None, alias="id") + description: Optional[str] = Field(default=None, max_length=255) + emoji: Optional[str] = Field(default=None, max_length=255) + current_status: Optional[str] = None + health_status: Optional[str] = None + data_size_full: Optional[int] = None + data_size_compressed: Optional[int] = None + is_system_database: Optional[bool] = None + storage_bucket_name: Optional[str] = None + create_time: Optional[datetime] = None + create_actor: Optional[str] = None + last_update_time: Optional[datetime] = None + last_update_actor: Optional[str] = None + desired_status: Optional[str] = None + + @classmethod + def parse_obj_with_service( + cls, obj: Any, database_service: DatabaseService + ) -> Database: + database = cls.parse_model(obj) + database._service = database_service + return database + + @property + def database_id(self) -> Optional[str]: + if self.database_key is None: + return None + return self.database_key.database_id + + def get_attached_engines(self) -> List[Engine]: + """Get a list of engines that are attached to this database.""" + + return self._service.resource_manager.bindings.get_engines_bound_to_database( # noqa: E501 + database=self + ) + + def attach_to_engine( + self, engine: Engine, is_default_engine: bool = False + ) -> Binding: + """ + Attach an engine to this database. + + Args: + engine: The engine to attach. + is_default_engine: + Whether this engine should be used as default for this database. + Only one engine can be set as default for a single database. + This will overwrite any existing default. + """ + + return self._service.resource_manager.bindings.create( + engine=engine, database=self, is_default_engine=is_default_engine + ) + + def delete(self) -> Database: + """ + Delete a database from Firebolt. + + Raises an error if there are any attached engines. + """ + + for engine in self.get_attached_engines(): + if engine.current_status_summary in { + EngineStatusSummary.ENGINE_STATUS_SUMMARY_STARTING, + EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING, + }: + raise AttachedEngineInUseError(method_name="delete") + + logger.info( + f"Deleting Database (database_id={self.database_id}, name={self.name})" + ) + response = self._service.client.delete( + url=ACCOUNT_DATABASE_URL.format( + account_id=self._service.account_id, database_id=self.database_id + ), + headers={"Content-type": "application/json"}, + ) + return Database.parse_obj_with_service( + response.json()["database"], self._service + ) + + def update(self, description: str) -> Database: + """ + Updates a database description. + """ + + class _DatabaseUpdateRequest(FireboltBaseModel): + """Helper model for sending Database creation requests.""" + + account_id: str + database: Database + database_id: str + update_mask: FieldMask + + self.description = description + + assert self.database_id is not None, "Database must have database_id" + logger.info( + f"Updating Database (database_id={self.database_id}, " + f"name={self.name}, description={self.description})" + ) + + payload = _DatabaseUpdateRequest( + account_id=self._service.account_id, + database=self, + database_id=self.database_id, + update_mask=FieldMask(paths=["description"]), + ).jsonable_dict(by_alias=True) + + response = self._service.client.patch( + url=ACCOUNT_DATABASE_URL.format( + account_id=self._service.account_id, database_id=self.database_id + ), + headers={"Content-type": "application/json"}, + json=payload, + ) + + return Database.parse_obj_with_service( + response.json()["database"], self._service + ) + + def get_default_engine(self) -> Optional[Engine]: + """ + Returns: default engine of the database, or None if default engine is missing + """ + rm = self._service.resource_manager + assert isinstance(rm.engines, EngineService), "Expected EngineService V1" + default_engines: List[Engine] = [ + rm.engines.get(binding.engine_id) + for binding in rm.bindings.get_many(database_id=self.database_id) + if binding.is_default_engine + ] + + return None if len(default_engines) == 0 else default_engines[0] diff --git a/src/firebolt/model/V1/engine.py b/src/firebolt/model/V1/engine.py index 12eedee4aa..458f1db089 100644 --- a/src/firebolt/model/V1/engine.py +++ b/src/firebolt/model/V1/engine.py @@ -1,272 +1,272 @@ -from __future__ import annotations - -import logging -import time -from datetime import datetime -from typing import TYPE_CHECKING, Any, Optional, Sequence - -from pydantic import Field, PrivateAttr - -from firebolt.model.V1 import FireboltBaseModel -from firebolt.model.V1.engine_revision import EngineRevisionKey -from firebolt.model.V1.region import RegionKey -from firebolt.service.V1.types import ( - EngineStatus, - EngineStatusSummary, - EngineType, - WarmupMethod, -) -from firebolt.utils.urls import ( - ACCOUNT_ENGINE_START_URL, - ACCOUNT_ENGINE_STOP_URL, -) - -if TYPE_CHECKING: - from firebolt.service.V1.engine import EngineService - -logger = logging.getLogger(__name__) - - -class EngineKey(FireboltBaseModel): - account_id: str - engine_id: str - - -def wait(seconds: int, timeout_time: float, error_message: str, verbose: bool) -> None: - time.sleep(seconds) - if time.time() > timeout_time: - raise TimeoutError(error_message) - if verbose: - print(".", end="") - - -class EngineSettings(FireboltBaseModel): - """ - Engine settings. - - See also: :py:class:`EngineRevisionSpecification - ` - which also contains engine configuration. - """ - - preset: str - auto_stop_delay_duration: str = Field(pattern=r"^[0-9]+[sm]$|^0$") - minimum_logging_level: str - is_read_only: bool - warm_up: str - - @classmethod - def default( - cls, - engine_type: EngineType = EngineType.GENERAL_PURPOSE, - auto_stop_delay_duration: str = "1200s", - warm_up: WarmupMethod = WarmupMethod.PRELOAD_INDEXES, - minimum_logging_level: str = "ENGINE_SETTINGS_LOGGING_LEVEL_INFO", - ) -> EngineSettings: - if engine_type == EngineType.GENERAL_PURPOSE: - preset = engine_type.GENERAL_PURPOSE.api_settings_preset_name # type: ignore # noqa: E501 - is_read_only = False - else: - preset = engine_type.DATA_ANALYTICS.api_settings_preset_name # type: ignore - is_read_only = True - - return cls( - preset=preset, - auto_stop_delay_duration=auto_stop_delay_duration, - minimum_logging_level=minimum_logging_level, - is_read_only=is_read_only, - warm_up=warm_up.api_name, - ) - - -class FieldMask(FireboltBaseModel): - paths: Sequence[str] = Field(alias="paths") - - -class Engine(FireboltBaseModel): - """ - A Firebolt engine. Responsible for performing work (queries, ingestion). - - Engines are configured in :py:class:`Settings - ` - and in :py:class:`EngineRevisionSpecification - `. - """ - - # internal - _service: EngineService = PrivateAttr() - - # required - name: str = Field(min_length=1, max_length=255, pattern=r"^[0-9a-zA-Z_]+$") - compute_region_key: RegionKey = Field(alias="compute_region_id") - settings: EngineSettings - - # optional - key: Optional[EngineKey] = Field(None, alias="id") - description: Optional[str] = None - emoji: Optional[str] = None - current_status: Optional[EngineStatus] = None - current_status_summary: Optional[EngineStatusSummary] = None - latest_revision_key: Optional[EngineRevisionKey] = Field( - None, alias="latest_revision_id" - ) - endpoint: Optional[str] = None - endpoint_serving_revision_key: Optional[EngineRevisionKey] = Field( - None, alias="endpoint_serving_revision_id" - ) - create_time: Optional[datetime] = None - create_actor: Optional[str] = None - last_update_time: Optional[datetime] = None - last_update_actor: Optional[str] = None - last_use_time: Optional[datetime] = None - desired_status: Optional[str] = None - health_status: Optional[str] = None - endpoint_desired_revision_key: Optional[EngineRevisionKey] = Field( - None, alias="endpoint_desired_revision_id" - ) - - @classmethod - def parse_obj_with_service(cls, obj: Any, engine_service: EngineService) -> Engine: - engine = cls.parse_model(obj) - engine._service = engine_service - return engine - - @property - def engine_id(self) -> str: - if self.key is None: - raise ValueError("engine key is None") - return self.key.engine_id - - def get_latest(self) -> Engine: - """Get an up-to-date instance of the engine from Firebolt.""" - return self._service.get(id_=self.engine_id) - - def start( - self, - wait_for_startup: bool = True, - wait_timeout_seconds: int = 3600, - verbose: bool = False, - ) -> Engine: - """ - Start an engine. If it's already started, do nothing. - - Args: - wait_for_startup: - If True, wait for startup to complete. - If False, return immediately after requesting startup. - wait_timeout_seconds: - Number of seconds to wait for startup to complete - before raising a TimeoutError - verbose: - If True, print dots periodically while waiting for engine start. - If False, do not print any dots. - - Returns: - The updated engine from Firebolt. - """ - timeout_time = time.time() + wait_timeout_seconds - - engine = self.get_latest() - if ( - engine.current_status_summary - == EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING - ): - logger.info( - f"Engine (engine_id={self.engine_id}, name={self.name}) " - "is already running." - ) - return engine - - # wait for engine to stop first, if it's already stopping - # FUTURE: revisit logging and consider consolidating this if & the while below. - elif ( - engine.current_status_summary - == EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING - ): - logger.info( - f"Engine (engine_id={engine.engine_id}, name={engine.name}) " - "is in currently stopping, waiting for it to stop first." - ) - while ( - engine.current_status_summary - != EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPED - ): - wait( - seconds=5, - timeout_time=timeout_time, - error_message=( - "Engine " - f"(engine_id={engine.engine_id}, name={engine.name}) " - f"did not stop within {wait_timeout_seconds} seconds." - ), - verbose=True, - ) - engine = engine.get_latest() - - logger.info( - f"Engine (engine_id={engine.engine_id}, name={engine.name}) stopped." - ) - - engine = self._send_engine_request(ACCOUNT_ENGINE_START_URL) - logger.info( - f"Starting Engine (engine_id={engine.engine_id}, name={engine.name})" - ) - - # wait for engine to start - while wait_for_startup and engine.current_status_summary not in { - EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING, - EngineStatusSummary.ENGINE_STATUS_SUMMARY_FAILED, - }: - wait( - seconds=5, - timeout_time=timeout_time, - error_message=( # noqa: E501 - f"Could not start engine within {wait_timeout_seconds} seconds." - ), - verbose=verbose, - ) - previous_status_summary = engine.current_status_summary - engine = engine.get_latest() - if engine.current_status_summary != previous_status_summary: - logger.info( - "Engine status_summary=" - f"{getattr(engine.current_status_summary, 'name')}" - ) - - return engine - - def stop( - self, wait_for_stop: bool = False, wait_timeout_seconds: int = 3600 - ) -> Engine: - """Stop an Engine running on Firebolt.""" - timeout_time = time.time() + wait_timeout_seconds - - engine = self._send_engine_request(ACCOUNT_ENGINE_STOP_URL) - logger.info(f"Stopping Engine (engine_id={self.engine_id}, name={self.name})") - - while wait_for_stop and engine.current_status_summary not in { - EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPED, - EngineStatusSummary.ENGINE_STATUS_SUMMARY_FAILED, - }: - wait( - seconds=5, - timeout_time=timeout_time, - error_message=( # noqa: E501 - f"Could not stop engine within {wait_timeout_seconds} seconds." - ), - verbose=False, - ) - - engine = engine.get_latest() - - return engine - - def _send_engine_request(self, url: str) -> Engine: - response = self._service.client.post( - url=url.format( - account_id=self._service.account_id, engine_id=self.engine_id - ) - ) - return Engine.parse_obj_with_service( - obj=response.json()["engine"], engine_service=self._service - ) +from __future__ import annotations + +import logging +import time +from datetime import datetime +from typing import TYPE_CHECKING, Any, Optional, Sequence + +from pydantic import Field, PrivateAttr + +from firebolt.model.V1 import FireboltBaseModel +from firebolt.model.V1.engine_revision import EngineRevisionKey +from firebolt.model.V1.region import RegionKey +from firebolt.service.V1.types import ( + EngineStatus, + EngineStatusSummary, + EngineType, + WarmupMethod, +) +from firebolt.utils.urls import ( + ACCOUNT_ENGINE_START_URL, + ACCOUNT_ENGINE_STOP_URL, +) + +if TYPE_CHECKING: + from firebolt.service.V1.engine import EngineService + +logger = logging.getLogger(__name__) + + +class EngineKey(FireboltBaseModel): + account_id: str + engine_id: str + + +def wait(seconds: int, timeout_time: float, error_message: str, verbose: bool) -> None: + time.sleep(seconds) + if time.time() > timeout_time: + raise TimeoutError(error_message) + if verbose: + print(".", end="") + + +class EngineSettings(FireboltBaseModel): + """ + Engine settings. + + See also: :py:class:`EngineRevisionSpecification + ` + which also contains engine configuration. + """ + + preset: str + auto_stop_delay_duration: str = Field(pattern=r"^[0-9]+[sm]$|^0$") + minimum_logging_level: str + is_read_only: bool + warm_up: str + + @classmethod + def default( + cls, + engine_type: EngineType = EngineType.GENERAL_PURPOSE, + auto_stop_delay_duration: str = "1200s", + warm_up: WarmupMethod = WarmupMethod.PRELOAD_INDEXES, + minimum_logging_level: str = "ENGINE_SETTINGS_LOGGING_LEVEL_INFO", + ) -> EngineSettings: + if engine_type == EngineType.GENERAL_PURPOSE: + preset = engine_type.GENERAL_PURPOSE.api_settings_preset_name # type: ignore # noqa: E501 + is_read_only = False + else: + preset = engine_type.DATA_ANALYTICS.api_settings_preset_name # type: ignore + is_read_only = True + + return cls( + preset=preset, + auto_stop_delay_duration=auto_stop_delay_duration, + minimum_logging_level=minimum_logging_level, + is_read_only=is_read_only, + warm_up=warm_up.api_name, + ) + + +class FieldMask(FireboltBaseModel): + paths: Sequence[str] = Field(alias="paths") + + +class Engine(FireboltBaseModel): + """ + A Firebolt engine. Responsible for performing work (queries, ingestion). + + Engines are configured in :py:class:`Settings + ` + and in :py:class:`EngineRevisionSpecification + `. + """ + + # internal + _service: EngineService = PrivateAttr() + + # required + name: str = Field(min_length=1, max_length=255, pattern=r"^[0-9a-zA-Z_]+$") + compute_region_key: RegionKey = Field(alias="compute_region_id") + settings: EngineSettings + + # optional + key: Optional[EngineKey] = Field(None, alias="id") + description: Optional[str] = None + emoji: Optional[str] = None + current_status: Optional[EngineStatus] = None + current_status_summary: Optional[EngineStatusSummary] = None + latest_revision_key: Optional[EngineRevisionKey] = Field( + None, alias="latest_revision_id" + ) + endpoint: Optional[str] = None + endpoint_serving_revision_key: Optional[EngineRevisionKey] = Field( + None, alias="endpoint_serving_revision_id" + ) + create_time: Optional[datetime] = None + create_actor: Optional[str] = None + last_update_time: Optional[datetime] = None + last_update_actor: Optional[str] = None + last_use_time: Optional[datetime] = None + desired_status: Optional[str] = None + health_status: Optional[str] = None + endpoint_desired_revision_key: Optional[EngineRevisionKey] = Field( + None, alias="endpoint_desired_revision_id" + ) + + @classmethod + def parse_obj_with_service(cls, obj: Any, engine_service: EngineService) -> Engine: + engine = cls.parse_model(obj) + engine._service = engine_service + return engine + + @property + def engine_id(self) -> str: + if self.key is None: + raise ValueError("engine key is None") + return self.key.engine_id + + def get_latest(self) -> Engine: + """Get an up-to-date instance of the engine from Firebolt.""" + return self._service.get(id_=self.engine_id) + + def start( + self, + wait_for_startup: bool = True, + wait_timeout_seconds: int = 3600, + verbose: bool = False, + ) -> Engine: + """ + Start an engine. If it's already started, do nothing. + + Args: + wait_for_startup: + If True, wait for startup to complete. + If False, return immediately after requesting startup. + wait_timeout_seconds: + Number of seconds to wait for startup to complete + before raising a TimeoutError + verbose: + If True, print dots periodically while waiting for engine start. + If False, do not print any dots. + + Returns: + The updated engine from Firebolt. + """ + timeout_time = time.time() + wait_timeout_seconds + + engine = self.get_latest() + if ( + engine.current_status_summary + == EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING + ): + logger.info( + f"Engine (engine_id={self.engine_id}, name={self.name}) " + "is already running." + ) + return engine + + # wait for engine to stop first, if it's already stopping + # FUTURE: revisit logging and consider consolidating this if & the while below. + elif ( + engine.current_status_summary + == EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING + ): + logger.info( + f"Engine (engine_id={engine.engine_id}, name={engine.name}) " + "is in currently stopping, waiting for it to stop first." + ) + while ( + engine.current_status_summary + != EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPED + ): + wait( + seconds=5, + timeout_time=timeout_time, + error_message=( + "Engine " + f"(engine_id={engine.engine_id}, name={engine.name}) " + f"did not stop within {wait_timeout_seconds} seconds." + ), + verbose=True, + ) + engine = engine.get_latest() + + logger.info( + f"Engine (engine_id={engine.engine_id}, name={engine.name}) stopped." + ) + + engine = self._send_engine_request(ACCOUNT_ENGINE_START_URL) + logger.info( + f"Starting Engine (engine_id={engine.engine_id}, name={engine.name})" + ) + + # wait for engine to start + while wait_for_startup and engine.current_status_summary not in { + EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING, + EngineStatusSummary.ENGINE_STATUS_SUMMARY_FAILED, + }: + wait( + seconds=5, + timeout_time=timeout_time, + error_message=( # noqa: E501 + f"Could not start engine within {wait_timeout_seconds} seconds." + ), + verbose=verbose, + ) + previous_status_summary = engine.current_status_summary + engine = engine.get_latest() + if engine.current_status_summary != previous_status_summary: + logger.info( + "Engine status_summary=" + f"{getattr(engine.current_status_summary, 'name')}" + ) + + return engine + + def stop( + self, wait_for_stop: bool = False, wait_timeout_seconds: int = 3600 + ) -> Engine: + """Stop an Engine running on Firebolt.""" + timeout_time = time.time() + wait_timeout_seconds + + engine = self._send_engine_request(ACCOUNT_ENGINE_STOP_URL) + logger.info(f"Stopping Engine (engine_id={self.engine_id}, name={self.name})") + + while wait_for_stop and engine.current_status_summary not in { + EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPED, + EngineStatusSummary.ENGINE_STATUS_SUMMARY_FAILED, + }: + wait( + seconds=5, + timeout_time=timeout_time, + error_message=( # noqa: E501 + f"Could not stop engine within {wait_timeout_seconds} seconds." + ), + verbose=False, + ) + + engine = engine.get_latest() + + return engine + + def _send_engine_request(self, url: str) -> Engine: + response = self._service.client.post( + url=url.format( + account_id=self._service.account_id, engine_id=self.engine_id + ) + ) + return Engine.parse_obj_with_service( + obj=response.json()["engine"], engine_service=self._service + ) diff --git a/src/firebolt/service/V1/binding.py b/src/firebolt/service/V1/binding.py index 5a75075704..f141ebee6c 100644 --- a/src/firebolt/service/V1/binding.py +++ b/src/firebolt/service/V1/binding.py @@ -1,148 +1,148 @@ -import logging -from typing import List, Optional - -from firebolt.model.V1.binding import Binding, BindingKey -from firebolt.model.V1.database import Database -from firebolt.model.V1.engine import Engine -from firebolt.service.V1.base import BaseService -from firebolt.service.V1.database import DatabaseService -from firebolt.service.V1.engine import EngineService -from firebolt.utils.exception import AlreadyBoundError -from firebolt.utils.urls import ( - ACCOUNT_BINDINGS_URL, - ACCOUNT_DATABASE_BINDING_URL, -) -from firebolt.utils.util import prune_dict - -logger = logging.getLogger(__name__) - - -class BindingService(BaseService): - def get_by_key(self, binding_key: BindingKey) -> Binding: - """Get a binding by its BindingKey""" - response = self.client.get( - url=ACCOUNT_DATABASE_BINDING_URL.format( - account_id=binding_key.account_id, - database_id=binding_key.database_id, - engine_id=binding_key.engine_id, - ) - ) - binding: dict = response.json()["binding"] - return Binding.parse_model(binding) - - def get_many( - self, - database_id: Optional[str] = None, - engine_id: Optional[str] = None, - is_system_database: Optional[bool] = None, - ) -> List[Binding]: - """ - List bindings on Firebolt, optionally filtering by database and engine. - - Args: - database_id: - Return bindings matching the database_id. - If None, match any databases. - engine_id: - Return bindings matching the engine_id. - If None, match any engines. - is_system_database: - If True, return only system databases. - If False, return only non-system databases. - If None, do not filter on this parameter. - - Returns: - List of bindings matching the filter parameters - """ - - response = self.client.get( - url=ACCOUNT_BINDINGS_URL.format(account_id=self.account_id), - params=prune_dict( - { - "page.first": 5000, # FUTURE: pagination support w/ generator - "filter.id_database_id_eq": database_id, - "filter.id_engine_id_eq": engine_id, - "filter.is_system_database_eq": is_system_database, - } - ), - ) - return [Binding.parse_model(i["node"]) for i in response.json()["edges"]] - - def get_database_bound_to_engine(self, engine: Engine) -> Optional[Database]: - """Get the database to which an engine is bound, if any.""" - try: - binding = self.get_many(engine_id=engine.engine_id)[0] - except IndexError: - return None - try: - assert isinstance( - self.resource_manager.databases, DatabaseService - ), "Expected DatabaseService V1" - return self.resource_manager.databases.get(id_=binding.database_id) - except (KeyError, IndexError): - return None - - def get_engines_bound_to_database(self, database: Database) -> List[Engine]: - """Get a list of engines that are bound to a database.""" - - bindings = self.get_many(database_id=database.database_id) - if not bindings: - return [] - assert isinstance( - self.resource_manager.engines, EngineService - ), "Expected EngineService V1" - return self.resource_manager.engines.get_by_ids( - ids=[b.engine_id for b in bindings] - ) - - def create( - self, engine: Engine, database: Database, is_default_engine: bool - ) -> Binding: - """ - Create a new binding between an engine and a database. - - Args: - engine: Engine to bind. - database: Database to bind. - is_default_engine: - Whether this engine should be used as default for this database. - Only one engine can be set as default for a single database. - This will overwrite any existing default. - - Returns: - New binding between the engine and database. - """ - - existing_database = self.get_database_bound_to_engine(engine=engine) - if existing_database is not None: - raise AlreadyBoundError( - f"The engine {engine.name} is already bound " - f"to {existing_database.name}!" - ) - - logger.info( - f"Attaching Engine (engine_id={engine.engine_id}, name={engine.name}) " - f"to Database (database_id={database.database_id}, " - f"name={database.name})" - ) - assert database.database_id is not None, "Database must have database_id" - binding = Binding( - binding_key=BindingKey( # type: ignore[call-arg] - account_id=self.account_id, - database_id=database.database_id, - engine_id=engine.engine_id, - ), - is_default_engine=is_default_engine, # type: ignore[call-arg] - ) - - response = self.client.post( - url=ACCOUNT_DATABASE_BINDING_URL.format( - account_id=self.account_id, - database_id=database.database_id, - engine_id=engine.engine_id, - ), - json=binding.jsonable_dict( - by_alias=True, include={"binding_key": ..., "is_default_engine": ...} - ), - ) - return Binding.parse_model(response.json()["binding"]) +import logging +from typing import List, Optional + +from firebolt.model.V1.binding import Binding, BindingKey +from firebolt.model.V1.database import Database +from firebolt.model.V1.engine import Engine +from firebolt.service.V1.base import BaseService +from firebolt.service.V1.database import DatabaseService +from firebolt.service.V1.engine import EngineService +from firebolt.utils.exception import AlreadyBoundError +from firebolt.utils.urls import ( + ACCOUNT_BINDINGS_URL, + ACCOUNT_DATABASE_BINDING_URL, +) +from firebolt.utils.util import prune_dict + +logger = logging.getLogger(__name__) + + +class BindingService(BaseService): + def get_by_key(self, binding_key: BindingKey) -> Binding: + """Get a binding by its BindingKey""" + response = self.client.get( + url=ACCOUNT_DATABASE_BINDING_URL.format( + account_id=binding_key.account_id, + database_id=binding_key.database_id, + engine_id=binding_key.engine_id, + ) + ) + binding: dict = response.json()["binding"] + return Binding.parse_model(binding) + + def get_many( + self, + database_id: Optional[str] = None, + engine_id: Optional[str] = None, + is_system_database: Optional[bool] = None, + ) -> List[Binding]: + """ + List bindings on Firebolt, optionally filtering by database and engine. + + Args: + database_id: + Return bindings matching the database_id. + If None, match any databases. + engine_id: + Return bindings matching the engine_id. + If None, match any engines. + is_system_database: + If True, return only system databases. + If False, return only non-system databases. + If None, do not filter on this parameter. + + Returns: + List of bindings matching the filter parameters + """ + + response = self.client.get( + url=ACCOUNT_BINDINGS_URL.format(account_id=self.account_id), + params=prune_dict( + { + "page.first": 5000, # FUTURE: pagination support w/ generator + "filter.id_database_id_eq": database_id, + "filter.id_engine_id_eq": engine_id, + "filter.is_system_database_eq": is_system_database, + } + ), + ) + return [Binding.parse_model(i["node"]) for i in response.json()["edges"]] + + def get_database_bound_to_engine(self, engine: Engine) -> Optional[Database]: + """Get the database to which an engine is bound, if any.""" + try: + binding = self.get_many(engine_id=engine.engine_id)[0] + except IndexError: + return None + try: + assert isinstance( + self.resource_manager.databases, DatabaseService + ), "Expected DatabaseService V1" + return self.resource_manager.databases.get(id_=binding.database_id) + except (KeyError, IndexError): + return None + + def get_engines_bound_to_database(self, database: Database) -> List[Engine]: + """Get a list of engines that are bound to a database.""" + + bindings = self.get_many(database_id=database.database_id) + if not bindings: + return [] + assert isinstance( + self.resource_manager.engines, EngineService + ), "Expected EngineService V1" + return self.resource_manager.engines.get_by_ids( + ids=[b.engine_id for b in bindings] + ) + + def create( + self, engine: Engine, database: Database, is_default_engine: bool + ) -> Binding: + """ + Create a new binding between an engine and a database. + + Args: + engine: Engine to bind. + database: Database to bind. + is_default_engine: + Whether this engine should be used as default for this database. + Only one engine can be set as default for a single database. + This will overwrite any existing default. + + Returns: + New binding between the engine and database. + """ + + existing_database = self.get_database_bound_to_engine(engine=engine) + if existing_database is not None: + raise AlreadyBoundError( + f"The engine {engine.name} is already bound " + f"to {existing_database.name}!" + ) + + logger.info( + f"Attaching Engine (engine_id={engine.engine_id}, name={engine.name}) " + f"to Database (database_id={database.database_id}, " + f"name={database.name})" + ) + assert database.database_id is not None, "Database must have database_id" + binding = Binding( + binding_key=BindingKey( # type: ignore[call-arg] + account_id=self.account_id, + database_id=database.database_id, + engine_id=engine.engine_id, + ), + is_default_engine=is_default_engine, # type: ignore[call-arg] + ) + + response = self.client.post( + url=ACCOUNT_DATABASE_BINDING_URL.format( + account_id=self.account_id, + database_id=database.database_id, + engine_id=engine.engine_id, + ), + json=binding.jsonable_dict( + by_alias=True, include={"binding_key": ..., "is_default_engine": ...} + ), + ) + return Binding.parse_model(response.json()["binding"]) diff --git a/src/firebolt/service/V1/region.py b/src/firebolt/service/V1/region.py index 289f7eeafe..8191a7f494 100644 --- a/src/firebolt/service/V1/region.py +++ b/src/firebolt/service/V1/region.py @@ -1,75 +1,75 @@ -from typing import TYPE_CHECKING, Dict, List - -from firebolt.model.V1.region import Region, RegionKey -from firebolt.service.V1.base import BaseService -from firebolt.utils.urls import REGIONS_URL -from firebolt.utils.util import cached_property - -if TYPE_CHECKING: - from firebolt.service.manager import ResourceManager - - -class RegionService(BaseService): - def __init__(self, resource_manager: "ResourceManager"): - """ - Service to manage AWS regions (us-east-1, etc) - - Args: - resource_manager: Resource manager to use - """ - - super().__init__(resource_manager=resource_manager) - - @cached_property - def regions(self) -> List[Region]: - """List of available AWS regions on Firebolt.""" - - response = self.client.get(url=REGIONS_URL, params={"page.first": 5000}) - return [Region.parse_model(i["node"]) for i in response.json()["edges"]] - - @cached_property - def regions_by_name(self) -> Dict[str, Region]: - """Dict of {RegionLookup to Region}""" - - return {r.name: r for r in self.regions} - - @cached_property - def regions_by_key(self) -> Dict[RegionKey, Region]: - """Dict of {RegionKey to Region}""" - - return {r.key: r for r in self.regions} - - @cached_property - def default_region(self) -> Region: - """Default AWS region, could be provided from environment.""" - - if not self.default_region_setting: - raise ValueError( - "default_region parameter must be set when initializing " - "the resource manager." - ) - return self.get_by_name(name=self.default_region_setting) - - def get_by_name(self, name: str) -> Region: - """Get an AWS region by its name (eg. us-east-1).""" - - return self.regions_by_name[name] - - def get_by_key(self, key: RegionKey) -> Region: - """Get an AWS region by its key.""" - - return self.regions_by_key[key] - - def get_by_id(self, id_: str) -> Region: - """Get an AWS region by region_id.""" - - # error if provider_id is not set - if not self.resource_manager.provider_id: - raise ValueError( - "provider_id parameter must be set when initializing " - "the resource manager." - ) - - return self.get_by_key( - RegionKey(provider_id=self.resource_manager.provider_id, region_id=id_) - ) +from typing import TYPE_CHECKING, Dict, List + +from firebolt.model.V1.region import Region, RegionKey +from firebolt.service.V1.base import BaseService +from firebolt.utils.urls import REGIONS_URL +from firebolt.utils.util import cached_property + +if TYPE_CHECKING: + from firebolt.service.manager import ResourceManager + + +class RegionService(BaseService): + def __init__(self, resource_manager: "ResourceManager"): + """ + Service to manage AWS regions (us-east-1, etc) + + Args: + resource_manager: Resource manager to use + """ + + super().__init__(resource_manager=resource_manager) + + @cached_property + def regions(self) -> List[Region]: + """List of available AWS regions on Firebolt.""" + + response = self.client.get(url=REGIONS_URL, params={"page.first": 5000}) + return [Region.parse_model(i["node"]) for i in response.json()["edges"]] + + @cached_property + def regions_by_name(self) -> Dict[str, Region]: + """Dict of {RegionLookup to Region}""" + + return {r.name: r for r in self.regions} + + @cached_property + def regions_by_key(self) -> Dict[RegionKey, Region]: + """Dict of {RegionKey to Region}""" + + return {r.key: r for r in self.regions} + + @cached_property + def default_region(self) -> Region: + """Default AWS region, could be provided from environment.""" + + if not self.default_region_setting: + raise ValueError( + "default_region parameter must be set when initializing " + "the resource manager." + ) + return self.get_by_name(name=self.default_region_setting) + + def get_by_name(self, name: str) -> Region: + """Get an AWS region by its name (eg. us-east-1).""" + + return self.regions_by_name[name] + + def get_by_key(self, key: RegionKey) -> Region: + """Get an AWS region by its key.""" + + return self.regions_by_key[key] + + def get_by_id(self, id_: str) -> Region: + """Get an AWS region by region_id.""" + + # error if provider_id is not set + if not self.resource_manager.provider_id: + raise ValueError( + "provider_id parameter must be set when initializing " + "the resource manager." + ) + + return self.get_by_key( + RegionKey(provider_id=self.resource_manager.provider_id, region_id=id_) + ) From f4797d64c4ed43ffde879f6f91de1e4dfbd547f7 Mon Sep 17 00:00:00 2001 From: Luciano Scarpulla Date: Fri, 16 Feb 2024 12:44:23 +0100 Subject: [PATCH 3/5] fix: Change dict typehint --- src/firebolt/model/V1/__init__.py | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/src/firebolt/model/V1/__init__.py b/src/firebolt/model/V1/__init__.py index d34ae497af..cb611ae099 100644 --- a/src/firebolt/model/V1/__init__.py +++ b/src/firebolt/model/V1/__init__.py @@ -18,18 +18,6 @@ def use_if_version_ge( previous_method: str, latest_method: str, ) -> GenericCallable: - """ - Utility function to get desired method from base model. - - Args: - version_ge: The version number that will be used to determine - the desired method. - obj: The object on which the method will be taken from - previous_method: The method previously available in a version - smaller than `version_ge`. - latest_method: The method available from `version_ge` onwards. - - """ if PYDANTIC_VERSION >= version_ge: return getattr(obj, latest_method) else: @@ -53,7 +41,7 @@ class Config: extra = "forbid" allow_population_by_field_name = True # Pydantic 1.8 - def model_dict(self, *args: Any, **kwargs: Any) -> dict[str, Any]: + def model_dict(self, *args: Any, **kwargs: Any) -> dict: """Pydantic V2 and V1 compatible method for `dict` -> `model_dump`.""" return use_if_version_ge(2, self, "dict", "model_dump")(*args, **kwargs) From 36790f0c8a4a1e83eb5cdb9f3797aefc6a5c6a7c Mon Sep 17 00:00:00 2001 From: Luciano Scarpulla Date: Fri, 16 Feb 2024 12:52:47 +0100 Subject: [PATCH 4/5] fix: Bring back docstring --- src/firebolt/model/V1/__init__.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/firebolt/model/V1/__init__.py b/src/firebolt/model/V1/__init__.py index cb611ae099..23123723c3 100644 --- a/src/firebolt/model/V1/__init__.py +++ b/src/firebolt/model/V1/__init__.py @@ -18,6 +18,18 @@ def use_if_version_ge( previous_method: str, latest_method: str, ) -> GenericCallable: + """ + Utility function to get desired method from base model. + + Args: + version_ge: The version number that will be used to determine + the desired method. + obj: The object on which the method will be taken from + previous_method: The method previously available in a version + smaller than `version_ge`. + latest_method: The method available from `version_ge` onwards. + + """ if PYDANTIC_VERSION >= version_ge: return getattr(obj, latest_method) else: From e3579d06666decdcee4aee3c9a87a0b0af7916d1 Mon Sep 17 00:00:00 2001 From: Luciano Scarpulla Date: Wed, 21 Feb 2024 10:55:39 +0100 Subject: [PATCH 5/5] fix: Change line separators back to LF --- src/firebolt/service/V1/provider.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/firebolt/service/V1/provider.py b/src/firebolt/service/V1/provider.py index 9c3742e767..fa53e8129d 100644 --- a/src/firebolt/service/V1/provider.py +++ b/src/firebolt/service/V1/provider.py @@ -1,10 +1,10 @@ -from firebolt.client import Client -from firebolt.model.V1.provider import Provider -from firebolt.utils.urls import PROVIDERS_URL - - -def get_provider_id(client: Client) -> str: - """Get the AWS provider_id.""" - response = client.get(url=PROVIDERS_URL) - providers = [Provider.parse_model(i["node"]) for i in response.json()["edges"]] - return providers[0].provider_id +from firebolt.client import Client +from firebolt.model.V1.provider import Provider +from firebolt.utils.urls import PROVIDERS_URL + + +def get_provider_id(client: Client) -> str: + """Get the AWS provider_id.""" + response = client.get(url=PROVIDERS_URL) + providers = [Provider.parse_model(i["node"]) for i in response.json()["edges"]] + return providers[0].provider_id