From bdc40e2b39f63b999c76a2f707635985cf2ee8e1 Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Tue, 12 Oct 2021 13:23:19 -0700 Subject: [PATCH 01/14] clean up hooks, error handling --- src/firebolt/client/__init__.py | 6 +++++- .../{hooks.py => resource_manager_hooks.py} | 18 +++++++----------- src/firebolt/common/exception.py | 6 ------ src/firebolt/service/manager.py | 2 +- 4 files changed, 13 insertions(+), 19 deletions(-) rename src/firebolt/client/{hooks.py => resource_manager_hooks.py} (72%) diff --git a/src/firebolt/client/__init__.py b/src/firebolt/client/__init__.py index 9deaafa7b34..875ec4bf479 100644 --- a/src/firebolt/client/__init__.py +++ b/src/firebolt/client/__init__.py @@ -1,2 +1,6 @@ from firebolt.client.client import DEFAULT_API_URL, Auth, Client -from firebolt.client.hooks import log_request, log_response, raise_on_4xx_5xx +from firebolt.client.resource_manager_hooks import ( + log_request, + log_response, + raise_on_4xx_5xx, +) diff --git a/src/firebolt/client/hooks.py b/src/firebolt/client/resource_manager_hooks.py similarity index 72% rename from src/firebolt/client/hooks.py rename to src/firebolt/client/resource_manager_hooks.py index 5082da16b58..344eb3e3c8c 100644 --- a/src/firebolt/client/hooks.py +++ b/src/firebolt/client/resource_manager_hooks.py @@ -2,8 +2,6 @@ from httpx import HTTPStatusError, Request, RequestError, Response -from firebolt.common.exception import BadRequestError - logger = getLogger(__name__) @@ -33,19 +31,17 @@ def raise_on_4xx_5xx(response: Response) -> None: try: response.raise_for_status() except RequestError as exc: - logger.exception(f"An error occurred while requesting {exc.request.url!r}.") + logger.debug(f"An error occurred while requesting {exc.request.url!r}.") raise exc except HTTPStatusError as exc: response.read() # without this, you can get a ResponseNotRead error - logger.exception( + parsed_response = exc.response.json() + debug_message = ( f"Error response {exc.response.status_code} " f"while requesting {exc.request.url!r}. " - f"Response: {exc.response.json()}" + f"Response: {parsed_response}. " ) - if exc.response.status_code == 400: - raise BadRequestError( - message=exc.response.json()["message"], - request=exc.request, - response=exc.response, - ) from exc + if "message" in parsed_response: + debug_message += f"Message: {parsed_response['message']}" + logger.debug(debug_message) raise exc diff --git a/src/firebolt/common/exception.py b/src/firebolt/common/exception.py index da270921c02..9119efc42ae 100644 --- a/src/firebolt/common/exception.py +++ b/src/firebolt/common/exception.py @@ -1,7 +1,5 @@ from inspect import cleandoc -from httpx import HTTPStatusError - class FireboltError(Exception): pass @@ -77,10 +75,6 @@ class QueryError(CursorError): pass -class BadRequestError(HTTPStatusError): - pass - - class AuthenticationError(FireboltError): """ Firebolt authentication error. Stores error cause and authentication endpoint. diff --git a/src/firebolt/service/manager.py b/src/firebolt/service/manager.py index fe2f775eb16..cdd96d43493 100644 --- a/src/firebolt/service/manager.py +++ b/src/firebolt/service/manager.py @@ -30,7 +30,7 @@ def __init__(self, settings: Optional[Settings] = None): ) self.client.event_hooks = { "request": [log_request], - "response": [log_response, raise_on_4xx_5xx], + "response": [raise_on_4xx_5xx, log_response], } self._init_services(default_region_name=settings.default_region) From a3ea61cc9e86e567437490304ecdd7ebf641439e Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Tue, 12 Oct 2021 15:23:23 -0700 Subject: [PATCH 02/14] engine create refactor --- examples.ipynb | 2 +- src/firebolt/model/engine.py | 33 ++++---- src/firebolt/service/engine.py | 141 +++++++++++---------------------- src/firebolt/service/types.py | 28 +++++++ 4 files changed, 92 insertions(+), 112 deletions(-) create mode 100644 src/firebolt/service/types.py diff --git a/examples.ipynb b/examples.ipynb index a4e5a2f1f65..a14a3992dac 100644 --- a/examples.ipynb +++ b/examples.ipynb @@ -186,7 +186,7 @@ "engine.name = f\"{engine_name}_copy\"\n", "debug(engine)\n", "\n", - "new_engine = rm.engines.create_engine(\n", + "new_engine = rm.engines._send_create_engine(\n", " engine=engine, engine_revision=rm.engine_revisions\n", ")\n", "debug(new_engine)" diff --git a/src/firebolt/model/engine.py b/src/firebolt/model/engine.py index 4023deca70a..d25d2c94d98 100644 --- a/src/firebolt/model/engine.py +++ b/src/firebolt/model/engine.py @@ -9,6 +9,7 @@ from firebolt.model import FireboltBaseModel from firebolt.model.engine_revision import EngineRevisionKey from firebolt.model.region import RegionKey +from firebolt.service.types import EngineType, WarmupMethod logger = logging.getLogger(__name__) @@ -32,25 +33,25 @@ class EngineSettings(FireboltBaseModel): warm_up: str @classmethod - def analytics_default(cls) -> EngineSettings: - """Default settings for the data analytics (querying) use case.""" - return cls( - preset="ENGINE_SETTINGS_PRESET_DATA_ANALYTICS", - auto_stop_delay_duration="1200s", - minimum_logging_level="ENGINE_SETTINGS_LOGGING_LEVEL_INFO", - is_read_only=True, - warm_up="ENGINE_SETTINGS_WARM_UP_INDEXES", - ) + def default( + cls, + engine_type: EngineType = EngineType.GENERAL_PURPOSE, + auto_stop_delay_duration: str = "1200s", + warm_up: WarmupMethod = WarmupMethod.PRELOAD_INDEXES, + ) -> EngineSettings: + if engine_type == EngineType.GENERAL_PURPOSE: + preset = engine_type.GENERAL_PURPOSE.api_settings_preset_name # type: ignore # noqa: E501 + is_read_only = False + else: + preset = engine_type.DATA_ANALYTICS.api_settings_preset_name # type: ignore + is_read_only = True - @classmethod - def general_purpose_default(cls) -> EngineSettings: - """Default settings for the general purpose (data ingestion) use case.""" return cls( - preset="ENGINE_SETTINGS_PRESET_GENERAL_PURPOSE", - auto_stop_delay_duration="1200s", + preset=preset, + auto_stop_delay_duration=auto_stop_delay_duration, minimum_logging_level="ENGINE_SETTINGS_LOGGING_LEVEL_INFO", - is_read_only=False, - warm_up="ENGINE_SETTINGS_WARM_UP_INDEXES", + is_read_only=is_read_only, + warm_up=warm_up.api_name, ) diff --git a/src/firebolt/service/engine.py b/src/firebolt/service/engine.py index a0bc135fa35..87f54c37c98 100644 --- a/src/firebolt/service/engine.py +++ b/src/firebolt/service/engine.py @@ -1,14 +1,19 @@ import json import logging import time -from typing import Optional +from typing import Optional, Union from firebolt.model import FireboltBaseModel from firebolt.model.binding import Binding from firebolt.model.database import Database from firebolt.model.engine import Engine, EngineSettings -from firebolt.model.engine_revision import EngineRevision +from firebolt.model.engine_revision import ( + EngineRevision, + EngineRevisionSpecification, +) +from firebolt.model.region import Region from firebolt.service.base import BaseService +from firebolt.service.types import EngineType, WarmupMethod logger = logging.getLogger(__name__) @@ -31,110 +36,56 @@ def get_engine_by_name(self, engine_name: str) -> Engine: engine_id = response.json()["engine_id"]["engine_id"] return self.get_engine_by_id(engine_id=engine_id) - def create_analytics_engine( + def create( self, name: str, - description: Optional[str] = None, - region_name: Optional[str] = None, - compute_instance_type_name: Optional[str] = None, - compute_instance_count: Optional[int] = None, + region: Union[str, Region, None] = None, + engine_type: Union[str, EngineType] = EngineType.GENERAL_PURPOSE, + scale: int = 2, + spec: str = "i3.4xlarge", + auto_stop: int = 20, + warmup: Union[str, WarmupMethod] = WarmupMethod.PRELOAD_INDEXES, + description: str = "", ) -> Engine: - """ - Create a new engine on Firebolt, based on default Analytics settings. - - (The engine should be used for running queries on Firebolt.) + if isinstance(engine_type, str): + engine_type = EngineType[engine_type] + if isinstance(warmup, str): + warmup = WarmupMethod[warmup] - Args: - name: Name of the engine. - description: Long description of the engine. - region_name: Name of the region in which to create the engine. - If omitted, use the default region. - compute_instance_type_name: Name of the instance type to use for the Engine. - compute_instance_count: Number of instances to use for the Engine. + if region is None: + region = self.resource_manager.regions.default_region + else: + if isinstance(region, str): + region = self.resource_manager.regions.get_by_name(region_name=region) - Returns: - The newly created engine. - """ - engine = self._default( + engine = Engine( name=name, - settings=EngineSettings.analytics_default(), description=description, - region_name=region_name, - ) - return self.create_engine( - engine=engine, - engine_revision=self.resource_manager.engine_revisions.create_analytics_engine_revision( # noqa: E501 - compute_instance_type_name=compute_instance_type_name, - compute_instance_count=compute_instance_count, + compute_region_key=region.key, + settings=EngineSettings.default( + engine_type=engine_type, + auto_stop_delay_duration=f"{auto_stop * 60}s", + warm_up=warmup, ), ) - def create_general_purpose_engine( - self, - name: str, - description: Optional[str] = None, - region_name: Optional[str] = None, - compute_instance_type_name: Optional[str] = None, - compute_instance_count: Optional[int] = None, - ) -> Engine: - """ - Create a new engine on Firebolt, based on default General Purpose settings. - - (The engine should be used for ingesting data into Firebolt.) - - Args: - name: Name of the engine. - description: Long description of the engine. - region_name: Name of the region in which to create the engine. - If omitted, use the default region. - compute_instance_type_name: Name of the instance type to use for the Engine. - compute_instance_count: Number of instances to use for the Engine. - Returns: - The newly created engine. - """ - engine = self._default( - name=name, - settings=EngineSettings.general_purpose_default(), - description=description, - region_name=region_name, + instance_type_key = self.resource_manager.instance_types.get_by_name( + instance_type_name=spec + ).key + + engine_revision = EngineRevision( + specification=EngineRevisionSpecification( + db_compute_instances_type_key=instance_type_key, + db_compute_instances_count=scale, + db_compute_instances_use_spot=False, + db_version="", + proxy_instances_type_key=instance_type_key, + proxy_instances_count=1, + proxy_version="", + ) ) - return self.create_engine( - engine=engine, - engine_revision=self.resource_manager.engine_revisions.create_general_purpose_engine_revision( # noqa: E501 - compute_instance_type_name=compute_instance_type_name, - compute_instance_count=compute_instance_count, - ), - ) - - def _default( - self, - name: str, - settings: EngineSettings, - description: Optional[str] = None, - region_name: Optional[str] = None, - ) -> Engine: - """ - Create a new local Engine object with default settings. - Args: - name: Name of the engine. - settings: Engine revision settings to apply to the engine. - description: Description of the engine. - region_name: Region in which to create the engine. - - Returns: - The new local Engine object. - """ - if region_name is not None: - region = self.resource_manager.regions.get_by_name(region_name=region_name) - else: - region = self.resource_manager.regions.default_region - return Engine( - name=name, - description=description, - compute_region_key=region.key, - settings=settings, - ) + return self._send_create_engine(engine=engine, engine_revision=engine_revision) def get_engines_by_ids(self, engine_ids: list[str]) -> list[Engine]: """Get multiple Engines from Firebolt by their ids.""" @@ -167,7 +118,7 @@ def bind_engine_to_database( engine=engine, database=database, is_default_engine=is_default_engine ) - def create_engine( + def _send_create_engine( self, engine: Engine, engine_revision: Optional[EngineRevision] = None ) -> Engine: """ diff --git a/src/firebolt/service/types.py b/src/firebolt/service/types.py new file mode 100644 index 00000000000..7cb3208acdb --- /dev/null +++ b/src/firebolt/service/types.py @@ -0,0 +1,28 @@ +from enum import Enum +from types import DynamicClassAttribute + + +class EngineType(Enum): + GENERAL_PURPOSE = "GENERAL_PURPOSE" + DATA_ANALYTICS = "DATA_ANALYTICS" + + @DynamicClassAttribute + def api_settings_preset_name(self) -> str: + return { + EngineType.GENERAL_PURPOSE: "ENGINE_SETTINGS_PRESET_GENERAL_PURPOSE", + EngineType.DATA_ANALYTICS: "ENGINE_SETTINGS_PRESET_DATA_ANALYTICS", + }[self] + + +class WarmupMethod(Enum): + MINIMAL = "MINIMAL" + PRELOAD_INDEXES = "PRELOAD_INDEXES" + PRELOAD_ALL_DATA = "PRELOAD_ALL_DATA" + + @DynamicClassAttribute + def api_name(self) -> str: + return { + WarmupMethod.MINIMAL: "ENGINE_SETTINGS_WARM_UP_MINIMAL", + WarmupMethod.PRELOAD_INDEXES: "ENGINE_SETTINGS_WARM_UP_INDEXES", + WarmupMethod.PRELOAD_ALL_DATA: "ENGINE_SETTINGS_WARM_UP_ALL", + }[self] From 21585501dbd855119236557145bdbe658412b1d9 Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Wed, 13 Oct 2021 14:24:09 -0700 Subject: [PATCH 03/14] service model refactor wip --- examples.ipynb | 42 +-- src/firebolt/client/resource_manager_hooks.py | 7 +- src/firebolt/model/engine.py | 109 +++++++- src/firebolt/service/binding.py | 2 +- src/firebolt/service/engine.py | 242 ++++++++++-------- src/firebolt/service/types.py | 59 +++++ temp_example.ipynb | 166 ------------ 7 files changed, 330 insertions(+), 297 deletions(-) delete mode 100644 temp_example.ipynb diff --git a/examples.ipynb b/examples.ipynb index a14a3992dac..93c83430729 100644 --- a/examples.ipynb +++ b/examples.ipynb @@ -21,7 +21,7 @@ ")\n", "\n", "# show every web request\n", - "logging.getLogger(\"firebolt.http_client\").setLevel(\"DEBUG\")" + "logging.getLogger(\"firebolt.client\").setLevel(\"DEBUG\")" ] }, { @@ -74,9 +74,9 @@ "from firebolt.common import Settings\n", "\n", "settings = Settings(\n", + " server=\"\", # api.app.firebolt.io\n", " user=\"\",\n", " password=\"\",\n", - " server=\"\", # api.app.firebolt.io\n", " default_region=\"\", # us-east-1\n", ")" ] @@ -107,8 +107,7 @@ "id": "6aeee8cc", "metadata": {}, "source": [ - "### Create default general purpose (ingest) engine\n", - "Note: there is also `Engine.create_analytics(...)` which works the same way as `create_general_purpose(...)`." + "### Create engine" ] }, { @@ -131,10 +130,7 @@ }, "outputs": [], "source": [ - "engine = rm.engines.create_general_purpose_engine(\n", - " name=engine_name,\n", - " compute_instance_type_name=\"m5d.4xlarge\",\n", - ")\n", + "engine = rm.engines.create(name=engine_name)\n", "debug(engine)" ] }, @@ -143,7 +139,26 @@ "id": "96138b51", "metadata": {}, "source": [ - "### Get existing Engine by name, start, stop" + "### Get Engine by name" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "650eced4", + "metadata": {}, + "outputs": [], + "source": [ + "engine = rm.engines.get_by_name(engine_name=engine_name)\n", + "debug(engine)" + ] + }, + { + "cell_type": "markdown", + "id": "cf6ac3c5", + "metadata": {}, + "source": [ + "### Start, Stop Engine" ] }, { @@ -155,13 +170,10 @@ }, "outputs": [], "source": [ - "engine = rm.engines.get_engine_by_name(engine_name=engine_name)\n", - "debug(engine)\n", - "\n", "started_engine = rm.engines.start_engine(engine=engine)\n", "debug(started_engine)\n", "\n", - "stopped_engine = rm.engines.stop_engine(engine=engine)\n", + "stopped_engine = rm.engines.stop(engine=engine)\n", "debug(stopped_engine)" ] }, @@ -182,7 +194,7 @@ }, "outputs": [], "source": [ - "engine = rm.engines.get_engine_by_name(engine_name=engine_name)\n", + "engine = rm.engines.get_by_name(engine_name=engine_name)\n", "engine.name = f\"{engine_name}_copy\"\n", "debug(engine)\n", "\n", @@ -234,7 +246,7 @@ "engine = rm.engines.create_general_purpose_engine(\n", " name=engine_name, compute_instance_type_name=\"m5d.4xlarge\"\n", ")\n", - "binding = rm.engines.bind_engine_to_database(\n", + "binding = rm.engines.attach_to_database(\n", " engine=engine, database=database, is_default_engine=True\n", ")\n", "\n", diff --git a/src/firebolt/client/resource_manager_hooks.py b/src/firebolt/client/resource_manager_hooks.py index 344eb3e3c8c..2e718342422 100644 --- a/src/firebolt/client/resource_manager_hooks.py +++ b/src/firebolt/client/resource_manager_hooks.py @@ -25,8 +25,7 @@ def raise_on_4xx_5xx(response: Response) -> None: """ Hook to raise an error on http responses with codes indicating an error. - If a 400 code is found, raise a follow-on BadRequestError, attempting to - indicate more specifically how the request is bad. + If an error is message is found raise as an ApiError """ try: response.raise_for_status() @@ -42,6 +41,8 @@ def raise_on_4xx_5xx(response: Response) -> None: f"Response: {parsed_response}. " ) if "message" in parsed_response: - debug_message += f"Message: {parsed_response['message']}" + error_message = parsed_response["message"] + logger.debug(f"{debug_message} Message: {error_message}") + raise RuntimeError(error_message) from exc logger.debug(debug_message) raise exc diff --git a/src/firebolt/model/engine.py b/src/firebolt/model/engine.py index d25d2c94d98..6aa41abb086 100644 --- a/src/firebolt/model/engine.py +++ b/src/firebolt/model/engine.py @@ -1,15 +1,25 @@ from __future__ import annotations import logging +import time from datetime import datetime -from typing import Annotated, Optional +from typing import TYPE_CHECKING, Annotated, Optional -from pydantic import Field +from pydantic import Field, PrivateAttr from firebolt.model import FireboltBaseModel +from firebolt.model.binding import Binding +from firebolt.model.database import Database from firebolt.model.engine_revision import EngineRevisionKey from firebolt.model.region import RegionKey -from firebolt.service.types import EngineType, WarmupMethod +from firebolt.service.types import ( + EngineStatusSummary, + EngineType, + WarmupMethod, +) + +if TYPE_CHECKING: + from firebolt.service.engine import EngineService logger = logging.getLogger(__name__) @@ -62,6 +72,8 @@ class Engine(FireboltBaseModel): Engines are configured in Settings and in EngineRevisions. """ + _engine_service: Optional[EngineService] = PrivateAttr() + name: Annotated[str, Field(min_length=1, max_length=255, regex=r"^[0-9a-zA-Z_]+$")] compute_region_key: RegionKey = Field(alias="compute_region_id") settings: EngineSettings @@ -93,3 +105,94 @@ def engine_id(self) -> str: if self.key is None: raise ValueError("engine key is None") return self.key.engine_id + + def attach_to_database( + self, engine: Engine, database: Database, is_default_engine: bool + ) -> Binding: + """ + Attach this engine to a database. + + Args: + engine: Engine to attach to the database. + database: Database to which the engine will be attached. + is_default_engine: + Whether this engine should be used as default for this database. + Only one engine can be set as default for a single database. + This will overwrite any existing default. + """ + return self._engine_service.resource_manager.bindings.create_binding( + engine=engine, database=database, is_default_engine=is_default_engine + ) + + def start( + self, + engine: Engine, + wait_for_startup: bool = True, + wait_timeout_seconds: int = 3600, + print_dots: bool = True, + ) -> Engine: + """ + Start an engine. If it's already started, do nothing. + + Args: + engine: + The engine to start. + wait_for_startup: + If True, wait for startup to complete. + If false, return immediately after requesting startup. + wait_timeout_seconds: + Number of seconds to wait for startup to complete + before raising a TimeoutError. + print_dots: + If True, print dots periodically while waiting for engine startup. + If false, do not print any dots. + + Returns: + The updated Engine from Firebolt. + """ + response = self._engine_service.client.post( + url=f"/core/v1/account/engines/{engine.engine_id}:start", + ) + engine = self._engine_service.parse_engine_dict(response.json()["engine"]) + status = engine.current_status_summary + logger.info( + f"Starting Engine engine_id={engine.engine_id} " + f"name={engine.name} status_summary={status}" + ) + start_time = time.time() + end_time = start_time + wait_timeout_seconds + + # summary statuses: https://tinyurl.com/as7a9ru9 + while ( + wait_for_startup + and status != EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING.name + ): + if time.time() >= end_time: + raise TimeoutError( + f"Could not start engine within {wait_timeout_seconds} seconds." + ) + engine = self._engine_service.get_by_id(engine_id=engine.engine_id) + new_status = engine.current_status_summary + if new_status != status: + logger.info(f"Engine status_summary={new_status}") + elif print_dots: + print(".", end="") + time.sleep(5) + status = new_status + return engine + + def stop(self, engine: Engine) -> Engine: + """Stop an Engine running on Firebolt.""" + response = self._engine_service.client.post( + url=f"/core/v1/account/engines/{engine.engine_id}:stop", + ) + return self._engine_service.parse_engine_dict(response.json()["engine"]) + + def delete(self, engine: Engine) -> Engine: + """Delete an Engine from Firebolt.""" + response = self._engine_service.client.delete( + url=f"/core/v1" + f"/accounts/{self._engine_service.account_id}" + f"/engines/{engine.engine_id}", + ) + return self._engine_service.parse_engine_dict(response.json()["engine"]) diff --git a/src/firebolt/service/binding.py b/src/firebolt/service/binding.py index f42b66879a0..e20731909c8 100644 --- a/src/firebolt/service/binding.py +++ b/src/firebolt/service/binding.py @@ -68,7 +68,7 @@ def get_database_bound_to_engine(self, engine: Engine) -> Optional[Database]: def get_engines_bound_to_database(self, database: Database) -> list[Engine]: """Get a list of engines that are bound to a database.""" bindings = self.list_bindings(database_id=database.database_id) - return self.resource_manager.engines.get_engines_by_ids( + return self.resource_manager.engines.get_by_ids( engine_ids=[b.engine_id for b in bindings] ) diff --git a/src/firebolt/service/engine.py b/src/firebolt/service/engine.py index 87f54c37c98..948b1d31388 100644 --- a/src/firebolt/service/engine.py +++ b/src/firebolt/service/engine.py @@ -1,11 +1,8 @@ import json import logging -import time from typing import Optional, Union from firebolt.model import FireboltBaseModel -from firebolt.model.binding import Binding -from firebolt.model.database import Database from firebolt.model.engine import Engine, EngineSettings from firebolt.model.engine_revision import ( EngineRevision, @@ -19,22 +16,40 @@ class EngineService(BaseService): - def get_engine_by_id(self, engine_id: str) -> Engine: + def parse_engine_dict(self, engine_dict: dict) -> Engine: + engine = Engine.parse_obj(engine_dict) + engine._engine_service = self + return engine + + def get_by_id(self, engine_id: str) -> Engine: """Get an Engine from Firebolt by its id.""" response = self.client.get( url=f"/core/v1/accounts/{self.account_id}/engines/{engine_id}", ) - engine_spec: dict = response.json()["engine"] - return Engine.parse_obj(engine_spec) + engine_entry: dict = response.json()["engine"] + return self.parse_engine_dict(engine_entry) + + def get_by_ids(self, engine_ids: list[str]) -> list[Engine]: + """Get multiple Engines from Firebolt by their ids.""" + response = self.client.post( + url=f"/core/v1/engines:getByIds", + json={ + "engine_ids": [ + {"account_id": self.account_id, "engine_id": engine_id} + for engine_id in engine_ids + ] + }, + ) + return [self.parse_engine_dict(e) for e in response.json()["engines"]] - def get_engine_by_name(self, engine_name: str) -> Engine: + def get_by_name(self, engine_name: str) -> Engine: """Get an Engine from Firebolt by its name.""" response = self.client.get( url="/core/v1/account/engines:getIdByName", params={"engine_name": engine_name}, ) engine_id = response.json()["engine_id"]["engine_id"] - return self.get_engine_by_id(engine_id=engine_id) + return self.get_by_id(engine_id=engine_id) def create( self, @@ -47,6 +62,28 @@ def create( warmup: Union[str, WarmupMethod] = WarmupMethod.PRELOAD_INDEXES, description: str = "", ) -> Engine: + """ + Create a new Engine. + + Args: + name: An identifier that specifies the name of the engine. + region: The AWS region in which the engine runs. + engine_type: The engine type. GENERAL_PURPOSE or DATA_ANALYTICS + scale: The number of compute instances on the engine. + The scale can be any int from 1 to 128. + spec: The AWS EC2 instance type. + auto_stop: The amount of time (in minutes) after which + the engine automatically stops. + warmup: The warmup method that should be used. + MINIMAL: On-demand loading (both indexes and tables' data). + PRELOAD_INDEXES: Load indexes only. + PRELOAD_ALL_DATA: Full data auto-load + (both indexes and table data - full warmup). + description: A short description of the engine's purpose. + + Returns: + Engine with the specified settings. + """ if isinstance(engine_type, str): engine_type = EngineType[engine_type] if isinstance(warmup, str): @@ -87,36 +124,23 @@ def create( return self._send_create_engine(engine=engine, engine_revision=engine_revision) - def get_engines_by_ids(self, engine_ids: list[str]) -> list[Engine]: - """Get multiple Engines from Firebolt by their ids.""" - response = self.client.post( - url=f"/core/v1/engines:getByIds", - json={ - "engine_ids": [ - {"account_id": self.account_id, "engine_id": engine_id} - for engine_id in engine_ids - ] - }, - ) - return [Engine.parse_obj(e) for e in response.json()["engines"]] - - def bind_engine_to_database( - self, engine: Engine, database: Database, is_default_engine: bool - ) -> Binding: - """ - Attach this engine to a database. - - Args: - engine: Engine to attach to the database. - database: Database to which the engine will be attached. - is_default_engine: - Whether this engine should be used as default for this database. - Only one engine can be set as default for a single database. - This will overwrite any existing default. - """ - return self.resource_manager.bindings.create_binding( - engine=engine, database=database, is_default_engine=is_default_engine - ) + # def attach_to_database( + # self, engine: Engine, database: Database, is_default_engine: bool + # ) -> Binding: + # """ + # Attach this engine to a database. + # + # Args: + # engine: Engine to attach to the database. + # database: Database to which the engine will be attached. + # is_default_engine: + # Whether this engine should be used as default for this database. + # Only one engine can be set as default for a single database. + # This will overwrite any existing default. + # """ + # return self.resource_manager.bindings.create_binding( + # engine=engine, database=database, is_default_engine=is_default_engine + # ) def _send_create_engine( self, engine: Engine, engine_revision: Optional[EngineRevision] = None @@ -150,74 +174,74 @@ class _EngineCreateRequest(FireboltBaseModel): ).json(by_alias=True) ), ) - return Engine.parse_obj(response.json()["engine"]) - - def start_engine( - self, - engine: Engine, - wait_for_startup: bool = True, - wait_timeout_seconds: int = 3600, - print_dots: bool = True, - ) -> Engine: - """ - Start an engine. If it's already started, do nothing. - - Args: - engine: - The engine to start. - wait_for_startup: - If True, wait for startup to complete. - If false, return immediately after requesting startup. - wait_timeout_seconds: - Number of seconds to wait for startup to complete - before raising a TimeoutError. - print_dots: - If True, print dots periodically while waiting for engine startup. - If false, do not print any dots. - - Returns: - The updated Engine from Firebolt. - """ - response = self.client.post( - url=f"/core/v1/account/engines/{engine.engine_id}:start", - ) - engine = Engine.parse_obj(response.json()["engine"]) - status = engine.current_status_summary - logger.info( - f"Starting Engine engine_id={engine.engine_id} " - f"name={engine.name} status_summary={status}" - ) - start_time = time.time() - end_time = start_time + wait_timeout_seconds - - # summary statuses: https://tinyurl.com/as7a9ru9 - while wait_for_startup and status != "ENGINE_STATUS_SUMMARY_RUNNING": - if time.time() >= end_time: - raise TimeoutError( - f"Could not start engine within {wait_timeout_seconds} seconds." - ) - engine = self.get_engine_by_id(engine_id=engine.engine_id) - new_status = engine.current_status_summary - if new_status != status: - logger.info(f"Engine status_summary={new_status}") - elif print_dots: - print(".", end="") - time.sleep(5) - status = new_status - return engine - - def stop_engine(self, engine: Engine) -> Engine: - """Stop an Engine running on Firebolt.""" - response = self.client.post( - url=f"/core/v1/account/engines/{engine.engine_id}:stop", - ) - return Engine.parse_obj(response.json()["engine"]) - - def delete(self, engine: Engine) -> Engine: - """Delete an Engine from Firebolt.""" - response = self.client.delete( - url=f"/core/v1" - f"/accounts/{self.account_id}" - f"/engines/{engine.engine_id}", - ) - return Engine.parse_obj(response.json()["engine"]) + return self.parse_engine_dict(response.json()["engine"]) + + # def start( + # self, + # engine: Engine, + # wait_for_startup: bool = True, + # wait_timeout_seconds: int = 3600, + # print_dots: bool = True, + # ) -> Engine: + # """ + # Start an engine. If it's already started, do nothing. + # + # Args: + # engine: + # The engine to start. + # wait_for_startup: + # If True, wait for startup to complete. + # If false, return immediately after requesting startup. + # wait_timeout_seconds: + # Number of seconds to wait for startup to complete + # before raising a TimeoutError. + # print_dots: + # If True, print dots periodically while waiting for engine startup. + # If false, do not print any dots. + # + # Returns: + # The updated Engine from Firebolt. + # """ + # response = self.client.post( + # url=f"/core/v1/account/engines/{engine.engine_id}:start", + # ) + # engine = Engine.parse_obj(response.json()["engine"]) + # status = engine.current_status_summary + # logger.info( + # f"Starting Engine engine_id={engine.engine_id} " + # f"name={engine.name} status_summary={status}" + # ) + # start_time = time.time() + # end_time = start_time + wait_timeout_seconds + # + # # summary statuses: https://tinyurl.com/as7a9ru9 + # while wait_for_startup and status != "ENGINE_STATUS_SUMMARY_RUNNING": + # if time.time() >= end_time: + # raise TimeoutError( + # f"Could not start engine within {wait_timeout_seconds} seconds." + # ) + # engine = self.get_by_id(engine_id=engine.engine_id) + # new_status = engine.current_status_summary + # if new_status != status: + # logger.info(f"Engine status_summary={new_status}") + # elif print_dots: + # print(".", end="") + # time.sleep(5) + # status = new_status + # return engine + + # def stop(self, engine: Engine) -> Engine: + # """Stop an Engine running on Firebolt.""" + # response = self.client.post( + # url=f"/core/v1/account/engines/{engine.engine_id}:stop", + # ) + # return self.parse_engine_dict(response.json()["engine"]) + # + # def delete(self, engine: Engine) -> Engine: + # """Delete an Engine from Firebolt.""" + # response = self.client.delete( + # url=f"/core/v1" + # f"/accounts/{self.account_id}" + # f"/engines/{engine.engine_id}", + # ) + # return self.parse_engine_dict(response.json()["engine"]) diff --git a/src/firebolt/service/types.py b/src/firebolt/service/types.py index 7cb3208acdb..c3b2ebdc02f 100644 --- a/src/firebolt/service/types.py +++ b/src/firebolt/service/types.py @@ -26,3 +26,62 @@ def api_name(self) -> str: WarmupMethod.PRELOAD_INDEXES: "ENGINE_SETTINGS_WARM_UP_INDEXES", WarmupMethod.PRELOAD_ALL_DATA: "ENGINE_SETTINGS_WARM_UP_ALL", }[self] + + +class EngineStatusSummary(Enum): + ENGINE_STATUS_SUMMARY_UNSPECIFIED = "ENGINE_STATUS_SUMMARY_UNSPECIFIED" + + # Fully stopped. + ENGINE_STATUS_SUMMARY_STOPPED = "ENGINE_STATUS_SUMMARY_STOPPED" + + # Provisioning process is in progress. + # We are creating cloud infra for this engine. + ENGINE_STATUS_SUMMARY_STARTING = "ENGINE_STATUS_SUMMARY_STARTING" + + # Provisioning process is complete. + # We are now waiting for PackDB cluster to initialize and start. + ENGINE_STATUS_SUMMARY_STARTING_INITIALIZING = ( + "ENGINE_STATUS_SUMMARY_STARTING_INITIALIZING" + ) + + # Fully started. + # Engine is ready to serve requests. + ENGINE_STATUS_SUMMARY_RUNNING = "ENGINE_STATUS_SUMMARY_RUNNING" + + # Version of the PackDB is changing. + # This is zero downtime operation that does not affect engine work.s + # This status is reserved for future use (not used fow now). + ENGINE_STATUS_SUMMARY_UPGRADING = "ENGINE_STATUS_SUMMARY_UPGRADING" + + # Hard restart (full stop/start cycle) is in progress. + # Underlying infrastructure is being recreated. + ENGINE_STATUS_SUMMARY_RESTARTING = "ENGINE_STATUS_SUMMARY_RESTARTING" + + # Hard restart (full stop/start cycle) is in progress. + # Underlying infrastructure is ready, waiting for + # PackDB cluster to initialize and start. + # This status is logically the same as ENGINE_STATUS_SUMMARY_STARTING_INITIALIZING, + # but used during restart cycle. + ENGINE_STATUS_SUMMARY_RESTARTING_INITIALIZING = ( + "ENGINE_STATUS_SUMMARY_RESTARTING_INITIALIZING" + ) + + # Underlying infrastructure has issues and is being repaired. + # Engine is still running, but it's not fully healthy and some queries may fail. + ENGINE_STATUS_SUMMARY_REPAIRING = "ENGINE_STATUS_SUMMARY_REPAIRING" + + # Stop is in progress. + ENGINE_STATUS_SUMMARY_STOPPING = "ENGINE_STATUS_SUMMARY_STOPPING" + + # Termination is in progress. + # All infrastructure that belongs to this engine will be completely destroyed. + ENGINE_STATUS_SUMMARY_DELETING = "ENGINE_STATUS_SUMMARY_DELETING" + + # Infrastructure is terminated, engine data is deleted. + ENGINE_STATUS_SUMMARY_DELETED = "ENGINE_STATUS_SUMMARY_DELETED" + + # Failed to start or stop. + # This status only indicates that there were issues during provisioning operations. + # If engine enters this status, + # all infrastructure should be stopped/terminated already. + ENGINE_STATUS_SUMMARY_FAILED = "ENGINE_STATUS_SUMMARY_FAILED" diff --git a/temp_example.ipynb b/temp_example.ipynb deleted file mode 100644 index c47fda3e671..00000000000 --- a/temp_example.ipynb +++ /dev/null @@ -1,166 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 1, - "id": "5994e2ff", - "metadata": {}, - "outputs": [], - "source": [ - "# required imports\n", - "from firebolt.client.registry import init_firebolt_client\n", - "\n", - "# use debug function for pretty printing\n", - "from devtools import debug\n", - "\n", - "# configure logging\n", - "import logging\n", - "\n", - "logging.basicConfig(\n", - " format=\"{asctime} - {name} - {levelname} - {message}\", style=\"{\", level=\"INFO\"\n", - ")\n", - "\n", - "# show every web request\n", - "logging.getLogger(\"firebolt.http_client\").setLevel(\"DEBUG\")" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "id": "46043f53", - "metadata": {}, - "outputs": [], - "source": [ - "from firebolt.service.database_service import DatabaseServiceV1" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "id": "e4d4e28c", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "/var/folders/46/smjdxy0541s253bfdvwg8r_h0000gn/T/ipykernel_79451/2190712626.py:2 \n", - " fc.base_url: URL('https://api.dev.firebolt.io') (URL)\n" - ] - }, - { - "ename": "TypeError", - "evalue": "__init__() missing 1 required positional argument: 'firebolt_client'", - "output_type": "error", - "traceback": [ - "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", - "\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)", - "\u001b[0;32m/var/folders/46/smjdxy0541s253bfdvwg8r_h0000gn/T/ipykernel_79451/2190712626.py\u001b[0m in \u001b[0;36m\u001b[0;34m\u001b[0m\n\u001b[1;32m 2\u001b[0m \u001b[0mdebug\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mbase_url\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 3\u001b[0m database_service = (\n\u001b[0;32m----> 4\u001b[0;31m \u001b[0mDatabaseServiceV1\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 5\u001b[0m ) # implicitly uses fc because there is only 1 available\n\u001b[1;32m 6\u001b[0m \u001b[0mdb\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mdatabase_service\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mget_by_name\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"eg_temp\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n", - "\u001b[0;31mTypeError\u001b[0m: __init__() missing 1 required positional argument: 'firebolt_client'" - ] - } - ], - "source": [ - "with init_firebolt_client() as fc:\n", - " debug(fc.base_url)\n", - " database_service = (\n", - " DatabaseServiceV1()\n", - " ) # implicitly uses fc because there is only 1 available\n", - " db = database_service.get_by_name(\"eg_temp\")\n", - " debug(db)" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "id": "f53d989e", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "/var/folders/46/smjdxy0541s253bfdvwg8r_h0000gn/T/ipykernel_79451/3904033391.py:2 \n", - " fc.base_url: URL('https://api.dev.firebolt.io') (URL)\n", - "/var/folders/46/smjdxy0541s253bfdvwg8r_h0000gn/T/ipykernel_79451/3904033391.py:5 \n", - " db: Database(\n", - " name='eg_temp',\n", - " compute_region_key=RegionKey(\n", - " provider_id='402a51bb-1c8e-4dc4-9e05-ced3c1e2186e',\n", - " region_id='f1841f9f-4031-4a9a-b3d7-1dc27e7e61ed',\n", - " ),\n", - " database_key=DatabaseKey(\n", - " account_id='15ff9dd6-32a6-417b-b004-6f69d7cc4c8b',\n", - " database_id='358b6aad-a96a-4d95-9d19-b24740157a1e',\n", - " ),\n", - " description='',\n", - " emoji='1F3A5',\n", - " current_status='DATABASE_STATUS_RUNNING',\n", - " health_status='DATABASE_HEALTH_STATUS_UNSPECIFIED',\n", - " data_size_full=0,\n", - " data_size_compressed=0,\n", - " is_system_database=False,\n", - " storage_bucket_name='358b6aad-a96a-4d95-9d19-b24740157a1e',\n", - " create_time=datetime.datetime(2021, 8, 24, 9, 55, 52, 78923, tzinfo=datetime.timezone.utc),\n", - " create_actor='/users/6fffb386-895f-4444-809f-45214234f2f3',\n", - " last_update_time=datetime.datetime(2021, 8, 26, 18, 1, 54, 772643, tzinfo=datetime.timezone.utc),\n", - " last_update_actor='/users/6fffb386-895f-4444-809f-45214234f2f3',\n", - " desired_status='DATABASE_STATUS_UNSPECIFIED',\n", - " ) (Database)\n" - ] - } - ], - "source": [ - "with init_firebolt_client() as fc, init_firebolt_client() as fc2:\n", - " debug(fc.base_url)\n", - " database_service = DatabaseServiceV1(fc) # explicitly passed\n", - " db = database_service.get_by_name(\"eg_temp\")\n", - " debug(db)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "086a95b9", - "metadata": {}, - "outputs": [], - "source": [ - "# should raise an error because we are using 2 clients but not specifying which one\n", - "with init_firebolt_client() as fc, init_firebolt_client() as fc2:\n", - " debug(fc.base_url)\n", - " database_service = DatabaseService()\n", - " db = database_service.get_by_name(\"eg_temp\")\n", - " debug(db)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "dd2d7fea", - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.9.6" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} From d239e069d5bbbbb7ffaaa4cf732767b3df2ca1db Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Wed, 13 Oct 2021 14:59:58 -0700 Subject: [PATCH 04/14] engine wip --- examples.ipynb | 30 +++++++--- src/firebolt/model/engine.py | 29 ++++++--- src/firebolt/service/engine.py | 104 +++------------------------------ 3 files changed, 51 insertions(+), 112 deletions(-) diff --git a/examples.ipynb b/examples.ipynb index 93c83430729..3804c46240d 100644 --- a/examples.ipynb +++ b/examples.ipynb @@ -139,13 +139,13 @@ "id": "96138b51", "metadata": {}, "source": [ - "### Get Engine by name" + "### Get by name" ] }, { "cell_type": "code", "execution_count": null, - "id": "650eced4", + "id": "2a26eac0", "metadata": {}, "outputs": [], "source": [ @@ -155,10 +155,29 @@ }, { "cell_type": "markdown", - "id": "cf6ac3c5", + "id": "2a2bda8c", "metadata": {}, "source": [ - "### Start, Stop Engine" + "### Start" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e9b7d784", + "metadata": {}, + "outputs": [], + "source": [ + "engine = engine.start()\n", + "debug(engine)" + ] + }, + { + "cell_type": "markdown", + "id": "d5d3d44d", + "metadata": {}, + "source": [ + "### Stop" ] }, { @@ -170,9 +189,6 @@ }, "outputs": [], "source": [ - "started_engine = rm.engines.start_engine(engine=engine)\n", - "debug(started_engine)\n", - "\n", "stopped_engine = rm.engines.stop(engine=engine)\n", "debug(stopped_engine)" ] diff --git a/src/firebolt/model/engine.py b/src/firebolt/model/engine.py index 6aa41abb086..5ae1142f1ec 100644 --- a/src/firebolt/model/engine.py +++ b/src/firebolt/model/engine.py @@ -3,7 +3,7 @@ import logging import time from datetime import datetime -from typing import TYPE_CHECKING, Annotated, Optional +from typing import TYPE_CHECKING, Annotated, Any, Optional from pydantic import Field, PrivateAttr @@ -72,8 +72,10 @@ class Engine(FireboltBaseModel): Engines are configured in Settings and in EngineRevisions. """ - _engine_service: Optional[EngineService] = PrivateAttr() + # internal + _engine_service: EngineService = PrivateAttr() + # required name: Annotated[str, Field(min_length=1, max_length=255, regex=r"^[0-9a-zA-Z_]+$")] compute_region_key: RegionKey = Field(alias="compute_region_id") settings: EngineSettings @@ -100,6 +102,12 @@ class Engine(FireboltBaseModel): alias="endpoint_desired_revision_id" ) + @classmethod + def parse_obj_with_service(cls, obj: Any, engine_service: EngineService) -> Engine: + engine = cls.parse_obj(obj) + engine._engine_service = engine_service + return engine + @property def engine_id(self) -> str: if self.key is None: @@ -126,7 +134,6 @@ def attach_to_database( def start( self, - engine: Engine, wait_for_startup: bool = True, wait_timeout_seconds: int = 3600, print_dots: bool = True, @@ -135,8 +142,6 @@ def start( Start an engine. If it's already started, do nothing. Args: - engine: - The engine to start. wait_for_startup: If True, wait for startup to complete. If false, return immediately after requesting startup. @@ -151,9 +156,11 @@ def start( The updated Engine from Firebolt. """ response = self._engine_service.client.post( - url=f"/core/v1/account/engines/{engine.engine_id}:start", + url=f"/core/v1/account/engines/{self.engine_id}:start", + ) + engine = Engine.parse_obj_with_service( + obj=response.json()["engine"], engine_service=self._engine_service ) - engine = self._engine_service.parse_engine_dict(response.json()["engine"]) status = engine.current_status_summary logger.info( f"Starting Engine engine_id={engine.engine_id} " @@ -186,7 +193,9 @@ def stop(self, engine: Engine) -> Engine: response = self._engine_service.client.post( url=f"/core/v1/account/engines/{engine.engine_id}:stop", ) - return self._engine_service.parse_engine_dict(response.json()["engine"]) + return Engine.parse_obj_with_service( + obj=response.json()["engine"], engine_service=self._engine_service + ) def delete(self, engine: Engine) -> Engine: """Delete an Engine from Firebolt.""" @@ -195,4 +204,6 @@ def delete(self, engine: Engine) -> Engine: f"/accounts/{self._engine_service.account_id}" f"/engines/{engine.engine_id}", ) - return self._engine_service.parse_engine_dict(response.json()["engine"]) + return Engine.parse_obj_with_service( + obj=response.json()["engine"], engine_service=self._engine_service + ) diff --git a/src/firebolt/service/engine.py b/src/firebolt/service/engine.py index 948b1d31388..cfd7639ec22 100644 --- a/src/firebolt/service/engine.py +++ b/src/firebolt/service/engine.py @@ -16,18 +16,13 @@ class EngineService(BaseService): - def parse_engine_dict(self, engine_dict: dict) -> Engine: - engine = Engine.parse_obj(engine_dict) - engine._engine_service = self - return engine - def get_by_id(self, engine_id: str) -> Engine: """Get an Engine from Firebolt by its id.""" response = self.client.get( url=f"/core/v1/accounts/{self.account_id}/engines/{engine_id}", ) engine_entry: dict = response.json()["engine"] - return self.parse_engine_dict(engine_entry) + return Engine.parse_obj_with_service(obj=engine_entry, engine_service=self) def get_by_ids(self, engine_ids: list[str]) -> list[Engine]: """Get multiple Engines from Firebolt by their ids.""" @@ -40,7 +35,10 @@ def get_by_ids(self, engine_ids: list[str]) -> list[Engine]: ] }, ) - return [self.parse_engine_dict(e) for e in response.json()["engines"]] + return [ + Engine.parse_obj_with_service(obj=e, engine_service=self) + for e in response.json()["engines"] + ] def get_by_name(self, engine_name: str) -> Engine: """Get an Engine from Firebolt by its name.""" @@ -124,24 +122,6 @@ def create( return self._send_create_engine(engine=engine, engine_revision=engine_revision) - # def attach_to_database( - # self, engine: Engine, database: Database, is_default_engine: bool - # ) -> Binding: - # """ - # Attach this engine to a database. - # - # Args: - # engine: Engine to attach to the database. - # database: Database to which the engine will be attached. - # is_default_engine: - # Whether this engine should be used as default for this database. - # Only one engine can be set as default for a single database. - # This will overwrite any existing default. - # """ - # return self.resource_manager.bindings.create_binding( - # engine=engine, database=database, is_default_engine=is_default_engine - # ) - def _send_create_engine( self, engine: Engine, engine_revision: Optional[EngineRevision] = None ) -> Engine: @@ -174,74 +154,6 @@ class _EngineCreateRequest(FireboltBaseModel): ).json(by_alias=True) ), ) - return self.parse_engine_dict(response.json()["engine"]) - - # def start( - # self, - # engine: Engine, - # wait_for_startup: bool = True, - # wait_timeout_seconds: int = 3600, - # print_dots: bool = True, - # ) -> Engine: - # """ - # Start an engine. If it's already started, do nothing. - # - # Args: - # engine: - # The engine to start. - # wait_for_startup: - # If True, wait for startup to complete. - # If false, return immediately after requesting startup. - # wait_timeout_seconds: - # Number of seconds to wait for startup to complete - # before raising a TimeoutError. - # print_dots: - # If True, print dots periodically while waiting for engine startup. - # If false, do not print any dots. - # - # Returns: - # The updated Engine from Firebolt. - # """ - # response = self.client.post( - # url=f"/core/v1/account/engines/{engine.engine_id}:start", - # ) - # engine = Engine.parse_obj(response.json()["engine"]) - # status = engine.current_status_summary - # logger.info( - # f"Starting Engine engine_id={engine.engine_id} " - # f"name={engine.name} status_summary={status}" - # ) - # start_time = time.time() - # end_time = start_time + wait_timeout_seconds - # - # # summary statuses: https://tinyurl.com/as7a9ru9 - # while wait_for_startup and status != "ENGINE_STATUS_SUMMARY_RUNNING": - # if time.time() >= end_time: - # raise TimeoutError( - # f"Could not start engine within {wait_timeout_seconds} seconds." - # ) - # engine = self.get_by_id(engine_id=engine.engine_id) - # new_status = engine.current_status_summary - # if new_status != status: - # logger.info(f"Engine status_summary={new_status}") - # elif print_dots: - # print(".", end="") - # time.sleep(5) - # status = new_status - # return engine - - # def stop(self, engine: Engine) -> Engine: - # """Stop an Engine running on Firebolt.""" - # response = self.client.post( - # url=f"/core/v1/account/engines/{engine.engine_id}:stop", - # ) - # return self.parse_engine_dict(response.json()["engine"]) - # - # def delete(self, engine: Engine) -> Engine: - # """Delete an Engine from Firebolt.""" - # response = self.client.delete( - # url=f"/core/v1" - # f"/accounts/{self.account_id}" - # f"/engines/{engine.engine_id}", - # ) - # return self.parse_engine_dict(response.json()["engine"]) + return Engine.parse_obj_with_service( + obj=response.json()["engine"], engine_service=self + ) From 2cc7867f587798dbf0bae89e8586a88d17787e5e Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Wed, 13 Oct 2021 15:53:55 -0700 Subject: [PATCH 05/14] engine wip --- examples.ipynb | 80 ++++++++++++++++++++++++++++---- src/firebolt/common/__init__.py | 8 ---- src/firebolt/common/exception.py | 37 ++++----------- src/firebolt/model/engine.py | 31 +++++++++++-- src/firebolt/service/binding.py | 3 +- 5 files changed, 109 insertions(+), 50 deletions(-) diff --git a/examples.ipynb b/examples.ipynb index 3804c46240d..e94e178d980 100644 --- a/examples.ipynb +++ b/examples.ipynb @@ -102,23 +102,65 @@ "debug(rm.client.account_id)" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "5b40dfdd", + "metadata": {}, + "outputs": [], + "source": [ + "database_name = \"\"\n", + "engine_name = \"\"" + ] + }, { "cell_type": "markdown", - "id": "6aeee8cc", + "id": "fea50dfa", "metadata": {}, "source": [ - "### Create engine" + "# Database" + ] + }, + { + "cell_type": "markdown", + "id": "b111f874", + "metadata": {}, + "source": [ + "### Create database" ] }, { "cell_type": "code", "execution_count": null, - "id": "5b40dfdd", + "id": "cdbeedd3", "metadata": {}, "outputs": [], "source": [ - "database_name = \"\"\n", - "engine_name = \"\"" + "rm.databases.create(database_name=database_name, region_name=\"us-east-1\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "09bfb0c2", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "markdown", + "id": "bf78ffc5", + "metadata": {}, + "source": [ + "# Engine" + ] + }, + { + "cell_type": "markdown", + "id": "6aeee8cc", + "metadata": {}, + "source": [ + "### Create engine" ] }, { @@ -145,7 +187,7 @@ { "cell_type": "code", "execution_count": null, - "id": "2a26eac0", + "id": "fd76fb57", "metadata": {}, "outputs": [], "source": [ @@ -155,7 +197,27 @@ }, { "cell_type": "markdown", - "id": "2a2bda8c", + "id": "14aedbac", + "metadata": {}, + "source": [ + "### Attach to database" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9fc01024", + "metadata": {}, + "outputs": [], + "source": [ + "engine.attach_to_database(\n", + " database=rm.databases.get_by_name(database_name=database_name)\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "a1e60b06", "metadata": {}, "source": [ "### Start" @@ -164,7 +226,7 @@ { "cell_type": "code", "execution_count": null, - "id": "e9b7d784", + "id": "cf032a2c", "metadata": {}, "outputs": [], "source": [ @@ -174,7 +236,7 @@ }, { "cell_type": "markdown", - "id": "d5d3d44d", + "id": "b241f1a4", "metadata": {}, "source": [ "### Stop" diff --git a/src/firebolt/common/__init__.py b/src/firebolt/common/__init__.py index 9ba045faa6b..e9718def791 100644 --- a/src/firebolt/common/__init__.py +++ b/src/firebolt/common/__init__.py @@ -1,11 +1,3 @@ -from firebolt.common.exception import ( - AlreadyBoundError, - DatabaseRequiredError, - EndpointRequiredError, - FireboltClientRequiredError, - FireboltEngineError, - FireboltError, -) from firebolt.common.settings import Settings diff --git a/src/firebolt/common/exception.py b/src/firebolt/common/exception.py index 9119efc42ae..eb63d94342d 100644 --- a/src/firebolt/common/exception.py +++ b/src/firebolt/common/exception.py @@ -5,39 +5,22 @@ class FireboltError(Exception): pass -class FireboltClientLookupError(FireboltError): - pass - - -class FireboltClientRequiredError(FireboltError): - def __init__( - self, - message: str = cleandoc( - """ - Firebolt Client not found. Start one in a context manager: - ``` - with init_firebolt_client() as fc: - ... - ``` - """ - ), - ): - super().__init__(message) - - class FireboltEngineError(FireboltError): """Base error for engine errors.""" -class AlreadyBoundError(FireboltEngineError): - pass - +class NoAttachedDatabaseError(FireboltEngineError): + def __init__(self, method_name: str): + self.method_name = method_name -class EndpointRequiredError(FireboltEngineError): - pass + def __str__(self) -> str: + return ( + f"unable to call {self.method_name}: " + f"engine must to be attached to a database first." + ) -class DatabaseRequiredError(FireboltEngineError): +class AlreadyBoundError(FireboltEngineError): pass @@ -56,7 +39,6 @@ class CursorError(FireboltError): class CursorClosedError(CursorError): def __init__(self, method_name: str): self.method_name = method_name - super.__repr__ def __str__(self) -> str: return f"unable to call {self.method_name}: cursor closed" @@ -65,7 +47,6 @@ def __str__(self) -> str: class QueryNotRunError(CursorError): def __init__(self, method_name: str): self.method_name = method_name - super.__repr__ def __str__(self) -> str: return f"unable to call {self.method_name}: need to run a query first" diff --git a/src/firebolt/model/engine.py b/src/firebolt/model/engine.py index 5ae1142f1ec..2bb98d00c04 100644 --- a/src/firebolt/model/engine.py +++ b/src/firebolt/model/engine.py @@ -1,12 +1,14 @@ from __future__ import annotations +import functools import logging import time from datetime import datetime -from typing import TYPE_CHECKING, Annotated, Any, Optional +from typing import TYPE_CHECKING, Annotated, Any, Callable, Optional from pydantic import Field, PrivateAttr +from firebolt.common.exception import NoAttachedDatabaseError from firebolt.model import FireboltBaseModel from firebolt.model.binding import Binding from firebolt.model.database import Database @@ -65,6 +67,18 @@ def default( ) +def check_attached_to_database(func: Callable) -> Callable: + """(Decorator) Ensure the engine is attached to a database.""" + + @functools.wraps(func) + def inner(self: Engine, *args: Any, **kwargs: Any) -> Any: + if self.database is None: + raise NoAttachedDatabaseError(method_name=func.__name__) + return func(self, *args, **kwargs) + + return inner + + class Engine(FireboltBaseModel): """ A Firebolt engine. Responsible for performing work (queries, data ingestion). @@ -114,14 +128,21 @@ def engine_id(self) -> str: raise ValueError("engine key is None") return self.key.engine_id + @property + def database(self) -> Optional[Database]: + return ( + self._engine_service.resource_manager.bindings.get_database_bound_to_engine( + engine=self + ) + ) + def attach_to_database( - self, engine: Engine, database: Database, is_default_engine: bool + self, database: Database, is_default_engine: bool = False ) -> Binding: """ Attach this engine to a database. Args: - engine: Engine to attach to the database. database: Database to which the engine will be attached. is_default_engine: Whether this engine should be used as default for this database. @@ -129,9 +150,10 @@ def attach_to_database( This will overwrite any existing default. """ return self._engine_service.resource_manager.bindings.create_binding( - engine=engine, database=database, is_default_engine=is_default_engine + engine=self, database=database, is_default_engine=is_default_engine ) + @check_attached_to_database def start( self, wait_for_startup: bool = True, @@ -188,6 +210,7 @@ def start( status = new_status return engine + @check_attached_to_database def stop(self, engine: Engine) -> Engine: """Stop an Engine running on Firebolt.""" response = self._engine_service.client.post( diff --git a/src/firebolt/service/binding.py b/src/firebolt/service/binding.py index e20731909c8..59506081a80 100644 --- a/src/firebolt/service/binding.py +++ b/src/firebolt/service/binding.py @@ -1,6 +1,7 @@ from typing import Optional -from firebolt.common import AlreadyBoundError, prune_dict +from firebolt.common import prune_dict +from firebolt.common.exception import AlreadyBoundError from firebolt.model.binding import Binding, BindingKey from firebolt.model.database import Database from firebolt.model.engine import Engine From 6477fcd7146f6e81f61605ed4c7592561f23d41b Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Thu, 14 Oct 2021 12:30:00 -0700 Subject: [PATCH 06/14] database wip --- src/firebolt/model/database.py | 37 +++++++++++++++++--- src/firebolt/model/engine.py | 6 +++- src/firebolt/service/binding.py | 6 ++-- src/firebolt/service/database.py | 58 +++++++++++++++++++++++++------- src/firebolt/service/engine.py | 50 +++++++++++++++++++++++++-- src/firebolt/service/types.py | 58 ++++++++++++++++++++++++++++++++ 6 files changed, 191 insertions(+), 24 deletions(-) diff --git a/src/firebolt/model/database.py b/src/firebolt/model/database.py index f2149b2dfa0..0778323255b 100644 --- a/src/firebolt/model/database.py +++ b/src/firebolt/model/database.py @@ -1,13 +1,16 @@ from __future__ import annotations from datetime import datetime -from typing import Annotated, Optional +from typing import TYPE_CHECKING, Annotated, Any, Optional -from pydantic import Field +from pydantic import Field, PrivateAttr from firebolt.model import FireboltBaseModel from firebolt.model.region import RegionKey +if TYPE_CHECKING: + from firebolt.service.database import DatabaseService + class DatabaseKey(FireboltBaseModel): account_id: str @@ -15,6 +18,17 @@ class DatabaseKey(FireboltBaseModel): class Database(FireboltBaseModel): + """ + A Firebolt database. + + Databases belong to a region and have a description, + but otherwise are not configurable. + """ + + # internal + _database_service: DatabaseService = PrivateAttr() + + # required name: Annotated[str, Field(min_length=1, max_length=255, regex=r"^[0-9a-zA-Z_]+$")] compute_region_key: RegionKey = Field(alias="compute_region_id") @@ -34,11 +48,26 @@ class Database(FireboltBaseModel): last_update_actor: Optional[str] desired_status: Optional[str] - class Config: - allow_population_by_field_name = True + @classmethod + def parse_obj_with_service( + cls, obj: Any, database_service: DatabaseService + ) -> Database: + database = cls.parse_obj(obj) + database._database_service = database_service + return database @property def database_id(self) -> Optional[str]: if self.database_key is None: return None return self.database_key.database_id + + def delete(self, database_id: str) -> Database: + """Delete a database from Firebolt.""" + response = self._database_service.client.delete( + url=f"/core/v1/account/databases/{database_id}", + headers={"Content-type": "application/json"}, + ) + return Database.parse_obj_with_service( + response.json()["database"], self._database_service + ) diff --git a/src/firebolt/model/engine.py b/src/firebolt/model/engine.py index 2bb98d00c04..1b85950377e 100644 --- a/src/firebolt/model/engine.py +++ b/src/firebolt/model/engine.py @@ -136,6 +136,10 @@ def database(self) -> Optional[Database]: ) ) + @property + def url(self) -> Optional[str]: + return self.endpoint + def attach_to_database( self, database: Database, is_default_engine: bool = False ) -> Binding: @@ -200,7 +204,7 @@ def start( raise TimeoutError( f"Could not start engine within {wait_timeout_seconds} seconds." ) - engine = self._engine_service.get_by_id(engine_id=engine.engine_id) + engine = self._engine_service.get(engine_id=engine.engine_id) new_status = engine.current_status_summary if new_status != status: logger.info(f"Engine status_summary={new_status}") diff --git a/src/firebolt/service/binding.py b/src/firebolt/service/binding.py index 59506081a80..576ddea19d5 100644 --- a/src/firebolt/service/binding.py +++ b/src/firebolt/service/binding.py @@ -47,7 +47,7 @@ def list_bindings( url=f"/core/v1/accounts/{self.account_id}/bindings", params=prune_dict( { - "page.first": 5000, # FUTURE: consider changing this to a generator + "page.first": 5000, # FUTURE: pagination support w/ generator "filter.id_database_id_eq": database_id, "filter.id_engine_id_eq": engine_id, "filter.is_system_database_eq": is_system_database, @@ -60,9 +60,7 @@ def get_database_bound_to_engine(self, engine: Engine) -> Optional[Database]: """Get the Database to which an engine is bound, if any.""" try: binding = self.list_bindings(engine_id=engine.engine_id)[0] - return self.resource_manager.databases.get_by_id( - database_id=binding.database_id - ) + return self.resource_manager.databases.get(database_id=binding.database_id) except IndexError: return None diff --git a/src/firebolt/service/database.py b/src/firebolt/service/database.py index a9645542374..305966be538 100644 --- a/src/firebolt/service/database.py +++ b/src/firebolt/service/database.py @@ -1,21 +1,25 @@ +from typing import Union + from firebolt.model import FireboltBaseModel from firebolt.model.database import Database from firebolt.service.base import BaseService +from firebolt.service.types import DatabaseOrder class DatabaseService(BaseService): - def get_by_id(self, database_id: str) -> Database: + def get(self, database_id: str) -> Database: """Get a Database from Firebolt by its id.""" response = self.client.get( url=f"/core/v1/accounts/{self.account_id}/databases/{database_id}", ) - database_spec: dict = response.json()["database"] - return Database.parse_obj(database_spec) + return Database.parse_obj_with_service( + obj=response.json()["database"], database_service=self + ) def get_by_name(self, database_name: str) -> Database: """Get a Database from Firebolt by its name.""" database_id = self.get_id_by_name(database_name=database_name) - return self.get_by_id(database_id=database_id) + return self.get(database_id=database_id) def get_id_by_name(self, database_name: str) -> str: """Get a Database id from Firebolt by its name.""" @@ -26,6 +30,42 @@ def get_id_by_name(self, database_name: str) -> str: database_id = response.json()["database_id"]["database_id"] return database_id + def list( + self, + name_contains: str, + attached_engine_name_eq: str, + attached_engine_name_contains: str, + order_by: Union[str, DatabaseOrder], + ) -> list[Database]: + """ + Get a list of databases on Firebolt. + + Args: + name_contains: Filter for databases with a name containing this substring. + attached_engine_name_eq: Filter for databases by an exact engine name. + attached_engine_name_contains: Filter for databases by engines with a + name containing this substring. + order_by: Method by which to order the results. See [DatabaseOrder]. + + Returns: + A list of databases matching the filters. + """ + if isinstance(order_by, str): + order_by = DatabaseOrder[order_by] + response = self.client.get( + url=f"/core/v1/account/databases", + params={ + "filter.name_contains": name_contains, + "filter.attached_engine_name_eq": attached_engine_name_eq, + "filter.attached_engine_name_contains": attached_engine_name_contains, + "order_by": order_by.name, + }, + ) + return [ + Database.parse_obj_with_service(obj=d, database_service=self) + for d in response.json()["databases"] + ] + def create(self, database_name: str, region_name: str) -> Database: """ Create a new Database on Firebolt. @@ -57,12 +97,6 @@ class _DatabaseCreateRequest(FireboltBaseModel): database=database, ).dict(by_alias=True), ) - return Database.parse_obj(response.json()["database"]) - - def delete(self, database_id: str) -> Database: - """Delete a database from Firebolt.""" - response = self.client.delete( - url=f"/core/v1/account/databases/{database_id}", - headers={"Content-type": "application/json"}, + return Database.parse_obj_with_service( + obj=response.json()["database"], database_service=self ) - return Database.parse_obj(response.json()["database"]) diff --git a/src/firebolt/service/engine.py b/src/firebolt/service/engine.py index cfd7639ec22..f199203f97f 100644 --- a/src/firebolt/service/engine.py +++ b/src/firebolt/service/engine.py @@ -2,6 +2,7 @@ import logging from typing import Optional, Union +from firebolt.common import prune_dict from firebolt.model import FireboltBaseModel from firebolt.model.engine import Engine, EngineSettings from firebolt.model.engine_revision import ( @@ -10,13 +11,13 @@ ) from firebolt.model.region import Region from firebolt.service.base import BaseService -from firebolt.service.types import EngineType, WarmupMethod +from firebolt.service.types import EngineOrder, EngineType, WarmupMethod logger = logging.getLogger(__name__) class EngineService(BaseService): - def get_by_id(self, engine_id: str) -> Engine: + def get(self, engine_id: str) -> Engine: """Get an Engine from Firebolt by its id.""" response = self.client.get( url=f"/core/v1/accounts/{self.account_id}/engines/{engine_id}", @@ -47,7 +48,50 @@ def get_by_name(self, engine_name: str) -> Engine: params={"engine_name": engine_name}, ) engine_id = response.json()["engine_id"]["engine_id"] - return self.get_by_id(engine_id=engine_id) + return self.get(engine_id=engine_id) + + def list( + self, + name_contains: str, + current_status_eq: str, + current_status_not_eq: str, + region_eq: str, + order_by: Union[str, EngineOrder], + ) -> list[Engine]: + """ + Get a list of engines on Firebolt. + + Args: + name_contains: Filter for engines with a name containing this substring. + current_status_eq: Filter for engines with this status. + current_status_not_eq: Filter for engines that do not have this status. + region_eq: Filter for engines by region. + order_by: Method by which to order the results. See [EngineOrder]. + + Returns: + A list of engines matching the filters. + """ + if isinstance(order_by, str): + order_by = EngineOrder[order_by] + response = self.client.get( + url=f"/core/v1/account/engines", + params=prune_dict( + { + "page.first": 5000, # FUTURE: pagination support w/ generator + "filter.name_contains": name_contains, + "filter.current_status_eq": current_status_eq, + "filter.current_status_not_eq": current_status_not_eq, + "filter.compute_region_id_region_id_eq": self.resource_manager.regions.get_by_name( # noqa: E501 + region_name=region_eq + ), + "order_by": order_by.name, + } + ), + ) + return [ + Engine.parse_obj_with_service(obj=e, engine_service=self) + for e in response.json()["engines"] + ] def create( self, diff --git a/src/firebolt/service/types.py b/src/firebolt/service/types.py index c3b2ebdc02f..4d26e8fb5d5 100644 --- a/src/firebolt/service/types.py +++ b/src/firebolt/service/types.py @@ -85,3 +85,61 @@ class EngineStatusSummary(Enum): # If engine enters this status, # all infrastructure should be stopped/terminated already. ENGINE_STATUS_SUMMARY_FAILED = "ENGINE_STATUS_SUMMARY_FAILED" + + +class EngineOrder(Enum): + ENGINE_ORDER_UNSPECIFIED = "ENGINE_ORDER_UNSPECIFIED" + ENGINE_ORDER_NAME_ASC = "ENGINE_ORDER_NAME_ASC" + ENGINE_ORDER_NAME_DESC = "ENGINE_ORDER_NAME_DESC" + ENGINE_ORDER_COMPUTE_REGION_ID_ASC = "ENGINE_ORDER_COMPUTE_REGION_ID_ASC" + ENGINE_ORDER_COMPUTE_REGION_ID_DESC = "ENGINE_ORDER_COMPUTE_REGION_ID_DESC" + ENGINE_ORDER_CURRENT_STATUS_ASC = "ENGINE_ORDER_CURRENT_STATUS_ASC" + ENGINE_ORDER_CURRENT_STATUS_DESC = "ENGINE_ORDER_CURRENT_STATUS_DESC" + ENGINE_ORDER_CREATE_TIME_ASC = "ENGINE_ORDER_CREATE_TIME_ASC" + ENGINE_ORDER_CREATE_TIME_DESC = "ENGINE_ORDER_CREATE_TIME_DESC" + ENGINE_ORDER_CREATE_ACTOR_ASC = "ENGINE_ORDER_CREATE_ACTOR_ASC" + ENGINE_ORDER_CREATE_ACTOR_DESC = "ENGINE_ORDER_CREATE_ACTOR_DESC" + ENGINE_ORDER_LAST_UPDATE_TIME_ASC = "ENGINE_ORDER_LAST_UPDATE_TIME_ASC" + ENGINE_ORDER_LAST_UPDATE_TIME_DESC = "ENGINE_ORDER_LAST_UPDATE_TIME_DESC" + ENGINE_ORDER_LAST_UPDATE_ACTOR_ASC = "ENGINE_ORDER_LAST_UPDATE_ACTOR_ASC" + ENGINE_ORDER_LAST_UPDATE_ACTOR_DESC = "ENGINE_ORDER_LAST_UPDATE_ACTOR_DESC" + ENGINE_ORDER_LATEST_REVISION_CURRENT_STATUS_ASC = ( + "ENGINE_ORDER_LATEST_REVISION_CURRENT_STATUS_ASC" + ) + ENGINE_ORDER_LATEST_REVISION_CURRENT_STATUS_DESC = ( + "ENGINE_ORDER_LATEST_REVISION_CURRENT_STATUS_DESC" + ) + ENGINE_ORDER_LATEST_REVISION_SPECIFICATION_DB_COMPUTE_INSTANCES_COUNT_ASC = ( + "ENGINE_ORDER_LATEST_REVISION_SPECIFICATION_DB_COMPUTE_INSTANCES_COUNT_ASC" + ) + ENGINE_ORDER_LATEST_REVISION_SPECIFICATION_DB_COMPUTE_INSTANCES_COUNT_DESC = ( + "ENGINE_ORDER_LATEST_REVISION_SPECIFICATION_DB_COMPUTE_INSTANCES_COUNT_DESC" + ) + ENGINE_ORDER_LATEST_REVISION_SPECIFICATION_DB_COMPUTE_INSTANCES_TYPE_ID_ASC = ( + "ENGINE_ORDER_LATEST_REVISION_SPECIFICATION_DB_COMPUTE_INSTANCES_TYPE_ID_ASC" + ) + ENGINE_ORDER_LATEST_REVISION_SPECIFICATION_DB_COMPUTE_INSTANCES_TYPE_ID_DESC = ( + "ENGINE_ORDER_LATEST_REVISION_SPECIFICATION_DB_COMPUTE_INSTANCES_TYPE_ID_DESC" + ) + + +class DatabaseOrder(Enum): + DATABASE_ORDER_UNSPECIFIED = "DATABASE_ORDER_UNSPECIFIED" + DATABASE_ORDER_NAME_ASC = "DATABASE_ORDER_NAME_ASC" + DATABASE_ORDER_NAME_DESC = "DATABASE_ORDER_NAME_DESC" + DATABASE_ORDER_COMPUTE_REGION_ID_ASC = "DATABASE_ORDER_COMPUTE_REGION_ID_ASC" + DATABASE_ORDER_COMPUTE_REGION_ID_DESC = "DATABASE_ORDER_COMPUTE_REGION_ID_DESC" + DATABASE_ORDER_DATA_SIZE_FULL_ASC = "DATABASE_ORDER_DATA_SIZE_FULL_ASC" + DATABASE_ORDER_DATA_SIZE_FULL_DESC = "DATABASE_ORDER_DATA_SIZE_FULL_DESC" + DATABASE_ORDER_DATA_SIZE_COMPRESSED_ASC = "DATABASE_ORDER_DATA_SIZE_COMPRESSED_ASC" + DATABASE_ORDER_DATA_SIZE_COMPRESSED_DESC = ( + "DATABASE_ORDER_DATA_SIZE_COMPRESSED_DESC" + ) + DATABASE_ORDER_CREATE_TIME_ASC = "DATABASE_ORDER_CREATE_TIME_ASC" + DATABASE_ORDER_CREATE_TIME_DESC = "DATABASE_ORDER_CREATE_TIME_DESC" + DATABASE_ORDER_CREATE_ACTOR_ASC = "DATABASE_ORDER_CREATE_ACTOR_ASC" + DATABASE_ORDER_CREATE_ACTOR_DESC = "DATABASE_ORDER_CREATE_ACTOR_DESC" + DATABASE_ORDER_LAST_UPDATE_TIME_ASC = "DATABASE_ORDER_LAST_UPDATE_TIME_ASC" + DATABASE_ORDER_LAST_UPDATE_TIME_DESC = "DATABASE_ORDER_LAST_UPDATE_TIME_DESC" + DATABASE_ORDER_LAST_UPDATE_ACTOR_ASC = "DATABASE_ORDER_LAST_UPDATE_ACTOR_ASC" + DATABASE_ORDER_LAST_UPDATE_ACTOR_DESC = "DATABASE_ORDER_LAST_UPDATE_ACTOR_DESC" From 5f74669607516c84e1241ddff8db61c3040a4cc4 Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Thu, 14 Oct 2021 17:14:07 -0700 Subject: [PATCH 07/14] database wip --- src/firebolt/common/exception.py | 15 ++++++++++ src/firebolt/model/database.py | 47 +++++++++++++++++++++++++++++++- src/firebolt/model/engine.py | 1 - src/firebolt/service/types.py | 2 ++ 4 files changed, 63 insertions(+), 2 deletions(-) diff --git a/src/firebolt/common/exception.py b/src/firebolt/common/exception.py index eb63d94342d..bb7f022ef77 100644 --- a/src/firebolt/common/exception.py +++ b/src/firebolt/common/exception.py @@ -24,6 +24,21 @@ class AlreadyBoundError(FireboltEngineError): pass +class FireboltDatabaseError(FireboltError): + pass + + +class AttachedEngineInUseError(FireboltDatabaseError): + def __init__(self, method_name: str): + self.method_name = method_name + + def __str__(self) -> str: + return ( + f"unable to call {self.method_name}: " + f"engine must not be in starting or stopping state." + ) + + class ConnectionError(FireboltError): pass diff --git a/src/firebolt/model/database.py b/src/firebolt/model/database.py index 0778323255b..25a40da8829 100644 --- a/src/firebolt/model/database.py +++ b/src/firebolt/model/database.py @@ -1,14 +1,19 @@ from __future__ import annotations +import functools from datetime import datetime -from typing import TYPE_CHECKING, Annotated, Any, Optional +from typing import TYPE_CHECKING, Annotated, Any, Callable, Optional from pydantic import Field, PrivateAttr +from firebolt.common.exception import AttachedEngineInUseError from firebolt.model import FireboltBaseModel from firebolt.model.region import RegionKey +from firebolt.service.types import EngineStatusSummary if TYPE_CHECKING: + from firebolt.model.binding import Binding + from firebolt.model.engine import Engine from firebolt.service.database import DatabaseService @@ -17,6 +22,22 @@ class DatabaseKey(FireboltBaseModel): database_id: str +def check_no_attached_starting_stopping_engines(func: Callable) -> Callable: + """(Decorator) Ensure no attached engines are starting/stopping""" + + @functools.wraps(func) + def inner(self: Database, *args: Any, **kwargs: Any) -> Any: + for engine in self.get_attached_engines(): + if engine.current_status_summary in { + EngineStatusSummary.ENGINE_STATUS_SUMMARY_STARTING, + EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING, + }: + raise AttachedEngineInUseError(method_name=func.__name__) + return func(self, *args, **kwargs) + + return inner + + class Database(FireboltBaseModel): """ A Firebolt database. @@ -62,6 +83,30 @@ def database_id(self) -> Optional[str]: return None return self.database_key.database_id + def get_attached_engines(self) -> list[Engine]: + """Get a list of engines that are attached to this database.""" + return self._database_service.resource_manager.bindings.get_engines_bound_to_database( # noqa: E501 + database=self + ) + + def attach_to_engine( + self, engine: Engine, is_default_engine: bool = False + ) -> Binding: + """ + Attach an engine to this database. + + Args: + engine: The engine to attach. + is_default_engine: + Whether this engine should be used as default for this database. + Only one engine can be set as default for a single database. + This will overwrite any existing default. + """ + return self._database_service.resource_manager.bindings.create_binding( + engine=engine, database=self, is_default_engine=is_default_engine + ) + + @check_no_attached_starting_stopping_engines def delete(self, database_id: str) -> Database: """Delete a database from Firebolt.""" response = self._database_service.client.delete( diff --git a/src/firebolt/model/engine.py b/src/firebolt/model/engine.py index 1b85950377e..6c53bb82a78 100644 --- a/src/firebolt/model/engine.py +++ b/src/firebolt/model/engine.py @@ -195,7 +195,6 @@ def start( start_time = time.time() end_time = start_time + wait_timeout_seconds - # summary statuses: https://tinyurl.com/as7a9ru9 while ( wait_for_startup and status != EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING.name diff --git a/src/firebolt/service/types.py b/src/firebolt/service/types.py index 4d26e8fb5d5..601f216de0a 100644 --- a/src/firebolt/service/types.py +++ b/src/firebolt/service/types.py @@ -29,6 +29,8 @@ def api_name(self) -> str: class EngineStatusSummary(Enum): + """Engine summary status. See: https://tinyurl.com/as7a9ru9""" + ENGINE_STATUS_SUMMARY_UNSPECIFIED = "ENGINE_STATUS_SUMMARY_UNSPECIFIED" # Fully stopped. From 0c6321b3733bd7006f49e0900f8d8ade0361097b Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Fri, 15 Oct 2021 11:14:43 -0700 Subject: [PATCH 08/14] engine startup fix --- examples.ipynb | 18 ++++--- src/firebolt/model/engine.py | 94 +++++++++++++++++++++++++++-------- src/firebolt/service/types.py | 86 +++++++++++++++++++++++++++++++- 3 files changed, 167 insertions(+), 31 deletions(-) diff --git a/examples.ipynb b/examples.ipynb index e94e178d980..d231e4c1c91 100644 --- a/examples.ipynb +++ b/examples.ipynb @@ -139,14 +139,6 @@ "rm.databases.create(database_name=database_name, region_name=\"us-east-1\")" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "09bfb0c2", - "metadata": {}, - "outputs": [], - "source": [] - }, { "cell_type": "markdown", "id": "bf78ffc5", @@ -234,6 +226,16 @@ "debug(engine)" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "870823a5", + "metadata": {}, + "outputs": [], + "source": [ + "engine.dict()" + ] + }, { "cell_type": "markdown", "id": "b241f1a4", diff --git a/src/firebolt/model/engine.py b/src/firebolt/model/engine.py index 6c53bb82a78..a2b74e891db 100644 --- a/src/firebolt/model/engine.py +++ b/src/firebolt/model/engine.py @@ -15,6 +15,7 @@ from firebolt.model.engine_revision import EngineRevisionKey from firebolt.model.region import RegionKey from firebolt.service.types import ( + EngineStatus, EngineStatusSummary, EngineType, WarmupMethod, @@ -98,8 +99,8 @@ class Engine(FireboltBaseModel): key: Optional[EngineKey] = Field(alias="id") description: Optional[str] emoji: Optional[str] - current_status: Optional[str] - current_status_summary: Optional[str] + current_status: Optional[EngineStatus] + current_status_summary: Optional[EngineStatusSummary] latest_revision_key: Optional[EngineRevisionKey] = Field(alias="latest_revision_id") endpoint: Optional[str] endpoint_serving_revision_key: Optional[EngineRevisionKey] = Field( @@ -140,6 +141,10 @@ def database(self) -> Optional[Database]: def url(self) -> Optional[str]: return self.endpoint + def refresh(self) -> Engine: + """Get an up-to-date instance of the Engine from Firebolt.""" + return self._engine_service.get(engine_id=self.engine_id) + def attach_to_database( self, database: Database, is_default_engine: bool = False ) -> Binding: @@ -181,38 +186,83 @@ def start( Returns: The updated Engine from Firebolt. """ - response = self._engine_service.client.post( - url=f"/core/v1/account/engines/{self.engine_id}:start", - ) - engine = Engine.parse_obj_with_service( - obj=response.json()["engine"], engine_service=self._engine_service - ) - status = engine.current_status_summary + start_time = time.time() + timeout_time = start_time + wait_timeout_seconds + + engine = self.refresh() + if ( + engine.current_status_summary + == EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING + ): + logger.info( + f"Engine (engine_id={self.engine_id}, name={self.name}) " + f"is already running." + ) + return engine + + # wait for engine to stop first, if it's already stopping + elif ( + engine.current_status_summary + == EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING + ): + logger.info( + f"Engine (engine_id={engine.engine_id}, name={engine.name}) " + f"is in currently stopping, waiting for it to stop first." + ) + while ( + engine.current_status_summary + != EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPED + ): + if time.time() >= timeout_time: + raise TimeoutError( + f"Engine (engine_id={engine.engine_id}, name={engine.name}) " + f"did not stop within {wait_timeout_seconds} seconds." + ) + time.sleep(5) + engine = engine.refresh() + if print_dots: + print(".", end="") + + logger.info( + f"Engine (engine_id={engine.engine_id}, name={engine.name}) stopped." + ) + + engine = self._send_start() logger.info( - f"Starting Engine engine_id={engine.engine_id} " - f"name={engine.name} status_summary={status}" + f"Starting Engine (engine_id={engine.engine_id}, name={engine.name})" ) - start_time = time.time() - end_time = start_time + wait_timeout_seconds + # wait for engine to start while ( wait_for_startup - and status != EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING.name + and engine.current_status_summary + != EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING ): - if time.time() >= end_time: + if time.time() >= timeout_time: raise TimeoutError( f"Could not start engine within {wait_timeout_seconds} seconds." ) - engine = self._engine_service.get(engine_id=engine.engine_id) - new_status = engine.current_status_summary - if new_status != status: - logger.info(f"Engine status_summary={new_status}") - elif print_dots: - print(".", end="") + previous_status_summary = engine.current_status_summary time.sleep(5) - status = new_status + engine = engine.refresh() + if engine.current_status_summary != previous_status_summary: + logger.info( + f"Engine status_summary=" + f"{getattr(engine.current_status_summary, 'name')}" + ) + if print_dots: + print(".", end="") + return engine + def _send_start(self) -> Engine: + response = self._engine_service.client.post( + url=f"/core/v1/account/engines/{self.engine_id}:start", + ) + return Engine.parse_obj_with_service( + obj=response.json()["engine"], engine_service=self._engine_service + ) + @check_attached_to_database def stop(self, engine: Engine) -> Engine: """Stop an Engine running on Firebolt.""" diff --git a/src/firebolt/service/types.py b/src/firebolt/service/types.py index 601f216de0a..de2df6a312e 100644 --- a/src/firebolt/service/types.py +++ b/src/firebolt/service/types.py @@ -28,8 +28,92 @@ def api_name(self) -> str: }[self] +class EngineStatus(Enum): + """ + Detailed engine status. + + See: https://api.dev.firebolt.io/devDocs#operation/coreV1GetEngine + """ + + ENGINE_STATUS_UNSPECIFIED = "ENGINE_STATUS_UNSPECIFIED" + + # Logical record is created, however underlying infrastructure is not initialized. + # In other words this means that engine is stopped. + ENGINE_STATUS_CREATED = "ENGINE_STATUS_CREATED" + + # Engine initialization request was sent. + ENGINE_STATUS_PROVISIONING_PENDING = "ENGINE_STATUS_PROVISIONING_PENDING" + + # Engine initialization request was received and initialization process started. + ENGINE_STATUS_PROVISIONING_STARTED = "ENGINE_STATUS_PROVISIONING_STARTED" + + # Engine initialization was finished successfully. + ENGINE_STATUS_PROVISIONING_FINISHED = "ENGINE_STATUS_PROVISIONING_FINISHED" + + # Engine initialization failed due to error. + ENGINE_STATUS_PROVISIONING_FAILED = "ENGINE_STATUS_PROVISIONING_FAILED" + + # Engine is initialized, but there are no running or starting engine revisions. + ENGINE_STATUS_RUNNING_IDLE = "ENGINE_STATUS_RUNNING_IDLE" + + # Engine is initialized, there are no running engine revision but it's starting. + ENGINE_STATUS_RUNNING_REVISION_STARTING = "ENGINE_STATUS_RUNNING_REVISION_STARTING" + + # Engine is initialized, initial revision is failed to provision or start. + ENGINE_STATUS_RUNNING_REVISION_STARTUP_FAILED = ( + "ENGINE_STATUS_RUNNING_REVISION_STARTUP_FAILED" + ) + + # Engine is ready (serves an engine revision). + ENGINE_STATUS_RUNNING_REVISION_SERVING = "ENGINE_STATUS_RUNNING_REVISION_SERVING" + + # Engine is ready (serves an engine revision), + # zero-downtime replacement revision is starting. + ENGINE_STATUS_RUNNING_REVISION_CHANGING = "ENGINE_STATUS_RUNNING_REVISION_CHANGING" + + # Engine is ready (serves an engine revision), + # replacement revision failed to provision or start. + ENGINE_STATUS_RUNNING_REVISION_CHANGE_FAILED = ( + "ENGINE_STATUS_RUNNING_REVISION_CHANGE_FAILED" + ) + + # Engine is initialized, replacement of the revision with a downtime is in progress. + ENGINE_STATUS_RUNNING_REVISION_RESTARTING = ( + "ENGINE_STATUS_RUNNING_REVISION_RESTARTING" + ) + + # Engine is initialized, replacement revision failed to provision or start. + ENGINE_STATUS_RUNNING_REVISION_RESTART_FAILED = ( + "ENGINE_STATUS_RUNNING_REVISION_RESTART_FAILED" + ) + + # Engine is initialized, all child revisions are being terminated. + ENGINE_STATUS_RUNNING_REVISIONS_TERMINATING = ( + "ENGINE_STATUS_RUNNING_REVISIONS_TERMINATING" + ) + + # Engine termination request was sent. + ENGINE_STATUS_TERMINATION_PENDING = "ENGINE_STATUS_TERMINATION_PENDING" + + # Engine termination started. + ENGINE_STATUS_TERMINATION_ST = "ENGINE_STATUS_TERMINATION_STARTED" + + # Engine termination finished. + ENGINE_STATUS_TERMINATION_FIN = "ENGINE_STATUS_TERMINATION_FINISHED" + + # Engine termination failed. + ENGINE_STATUS_TERMINATION_F = "ENGINE_STATUS_TERMINATION_FAILED" + + # Engine is soft-deleted. + ENGINE_STATUS_DELETED = "ENGINE_STATUS_DELETED" + + class EngineStatusSummary(Enum): - """Engine summary status. See: https://tinyurl.com/as7a9ru9""" + """ + Engine summary status. + + See: https://api.dev.firebolt.io/devDocs#operation/coreV1GetEngine + """ ENGINE_STATUS_SUMMARY_UNSPECIFIED = "ENGINE_STATUS_SUMMARY_UNSPECIFIED" From 5b4d72c7b6a9f634dcc3db3c809f9cf5ffc3a91a Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Fri, 15 Oct 2021 13:01:50 -0700 Subject: [PATCH 09/14] misc refactors --- README.md | 18 +-- src/firebolt/common/__init__.py | 5 - src/firebolt/common/util.py | 3 + src/firebolt/model/__init__.py | 16 +++ src/firebolt/service/binding.py | 4 +- src/firebolt/service/database.py | 23 ++-- src/firebolt/service/engine.py | 17 ++- src/firebolt/service/engine_revision.py | 103 +----------------- .../resource_manager/test_engine.py | 47 ++++++++ .../{model => service}/test_instance_type.py | 0 tests/unit/{model => service}/test_region.py | 0 11 files changed, 98 insertions(+), 138 deletions(-) create mode 100644 src/firebolt/common/util.py create mode 100644 tests/integration/resource_manager/test_engine.py rename tests/unit/{model => service}/test_instance_type.py (100%) rename tests/unit/{model => service}/test_region.py (100%) diff --git a/README.md b/README.md index a3847cc9c1d..4bb84df698b 100644 --- a/README.md +++ b/README.md @@ -26,8 +26,8 @@ you can initialize a ResourceManager with: ```python from firebolt.service.manager import ResourceManager -with ResourceManager() as rm: - print(rm.regions.default_region) # see your default region +rm = ResourceManager() +print(rm.regions.default_region) # see your default region ``` Or you can configure settings manually: @@ -37,13 +37,13 @@ from firebolt.service.manager import ResourceManager from firebolt.common.settings import Settings from pydantic import SecretStr -with ResourceManager(settings=Settings( - server="api.app.firebolt.io", - user="email@domain.com", - password=SecretStr("*****"), - default_region="us-east-1", -)) as rm: - print(rm.client.account_id) # see your account id +rm = ResourceManager(settings=Settings( + server="api.app.firebolt.io", + user="email@domain.com", + password=SecretStr("*****"), + default_region="us-east-1", +)) +print(rm.client.account_id) # see your account id ``` Under the hood, configuration works via Pydantic, diff --git a/src/firebolt/common/__init__.py b/src/firebolt/common/__init__.py index e9718def791..f5b83f2411d 100644 --- a/src/firebolt/common/__init__.py +++ b/src/firebolt/common/__init__.py @@ -1,6 +1 @@ from firebolt.common.settings import Settings - - -def prune_dict(d: dict) -> dict: - """Prune items from dictionaries where value is None""" - return {k: v for k, v in d.items() if v is not None} diff --git a/src/firebolt/common/util.py b/src/firebolt/common/util.py new file mode 100644 index 00000000000..ddaac5e18a2 --- /dev/null +++ b/src/firebolt/common/util.py @@ -0,0 +1,3 @@ +def prune_dict(d: dict) -> dict: + """Prune items from dictionaries where value is None""" + return {k: v for k, v in d.items() if v is not None} diff --git a/src/firebolt/model/__init__.py b/src/firebolt/model/__init__.py index f9cdcd1e9d3..45e7d1289d9 100644 --- a/src/firebolt/model/__init__.py +++ b/src/firebolt/model/__init__.py @@ -1,3 +1,5 @@ +import json + from pydantic import BaseModel @@ -5,3 +7,17 @@ class FireboltBaseModel(BaseModel): class Config: allow_population_by_field_name = True extra = "forbid" + + def jsonable_dict(self, *args, **kwargs) -> dict: + """ + Generate a dictionary representation of the service that is contains serialized + primitive types, and is therefore json-ready. + + This could be replaced with something native, once this issue is resolved: + https://github.com/samuelcolvin/pydantic/issues/1409 + + This function is intended to improve the compatibility with httpx, which + expects to take in a dictionary of primitives as input to the json parameter + of its request function. See: https://www.python-httpx.org/api/#helper-functions + """ + return json.loads(self.json(*args, **kwargs)) diff --git a/src/firebolt/service/binding.py b/src/firebolt/service/binding.py index 576ddea19d5..a88d99c17a5 100644 --- a/src/firebolt/service/binding.py +++ b/src/firebolt/service/binding.py @@ -1,7 +1,7 @@ from typing import Optional -from firebolt.common import prune_dict from firebolt.common.exception import AlreadyBoundError +from firebolt.common.util import prune_dict from firebolt.model.binding import Binding, BindingKey from firebolt.model.database import Database from firebolt.model.engine import Engine @@ -108,7 +108,7 @@ def create_binding( url=f"/core/v1/accounts/{self.account_id}" f"/databases/{database.database_id}" f"/bindings/{engine.engine_id}", - json=binding.dict( + json=binding.jsonable_dict( by_alias=True, include={"binding_key": ..., "is_default_engine": ...} ), ) diff --git a/src/firebolt/service/database.py b/src/firebolt/service/database.py index 305966be538..173d2962ce0 100644 --- a/src/firebolt/service/database.py +++ b/src/firebolt/service/database.py @@ -1,4 +1,4 @@ -from typing import Union +from typing import Optional, Union from firebolt.model import FireboltBaseModel from firebolt.model.database import Database @@ -66,28 +66,31 @@ def list( for d in response.json()["databases"] ] - def create(self, database_name: str, region_name: str) -> Database: + def create(self, name: str, region: Optional[str] = None) -> Database: """ Create a new Database on Firebolt. Args: - database_name: Name of the database. - region_name: Region name in which to create the database. + name: Name of the database. + region: Region name in which to create the database. Returns: The newly created Database. """ class _DatabaseCreateRequest(FireboltBaseModel): - """Helper model for sending Database creation requests.""" + """Helper service for sending Database creation requests.""" account_id: str database: Database - region_key = self.resource_manager.regions.get_by_name( - region_name=region_name - ).key - database = Database(name=database_name, compute_region_key=region_key) + if region is None: + region_key = self.resource_manager.regions.default_region.key + else: + region_key = self.resource_manager.regions.get_by_name( + region_name=region + ).key + database = Database(name=name, compute_region_key=region_key) response = self.client.post( url=f"/core/v1/accounts/{self.account_id}/databases", @@ -95,7 +98,7 @@ class _DatabaseCreateRequest(FireboltBaseModel): json=_DatabaseCreateRequest( account_id=self.account_id, database=database, - ).dict(by_alias=True), + ).jsonable_dict(by_alias=True), ) return Database.parse_obj_with_service( obj=response.json()["database"], database_service=self diff --git a/src/firebolt/service/engine.py b/src/firebolt/service/engine.py index f199203f97f..aca1eafeca3 100644 --- a/src/firebolt/service/engine.py +++ b/src/firebolt/service/engine.py @@ -1,8 +1,7 @@ -import json import logging from typing import Optional, Union -from firebolt.common import prune_dict +from firebolt.common.util import prune_dict from firebolt.model import FireboltBaseModel from firebolt.model.engine import Engine, EngineSettings from firebolt.model.engine_revision import ( @@ -181,7 +180,7 @@ def _send_create_engine( """ class _EngineCreateRequest(FireboltBaseModel): - """Helper model for sending Engine create requests.""" + """Helper service for sending Engine create requests.""" account_id: str engine: Engine @@ -190,13 +189,11 @@ class _EngineCreateRequest(FireboltBaseModel): response = self.client.post( url="/core/v1/account/engines", headers={"Content-type": "application/json"}, - json=json.loads( - _EngineCreateRequest( - account_id=self.account_id, - engine=engine, - engine_revision=engine_revision, - ).json(by_alias=True) - ), + json=_EngineCreateRequest( + account_id=self.account_id, + engine=engine, + engine_revision=engine_revision, + ).jsonable_dict(by_alias=True), ) return Engine.parse_obj_with_service( obj=response.json()["engine"], engine_service=self diff --git a/src/firebolt/service/engine_revision.py b/src/firebolt/service/engine_revision.py index 748412d69c5..3fdf87a540f 100644 --- a/src/firebolt/service/engine_revision.py +++ b/src/firebolt/service/engine_revision.py @@ -1,10 +1,4 @@ -from typing import Optional - -from firebolt.model.engine_revision import ( - EngineRevision, - EngineRevisionKey, - EngineRevisionSpecification, -) +from firebolt.model.engine_revision import EngineRevision, EngineRevisionKey from firebolt.service.base import BaseService @@ -40,98 +34,3 @@ def get_engine_revision_by_key( ) engine_spec: dict = response.json()["engine_revision"] return EngineRevision.parse_obj(engine_spec) - - def create_analytics_engine_revision( - self, - compute_instance_type_name: Optional[str] = None, - compute_instance_count: Optional[int] = None, - ) -> EngineRevision: - """Create a local EngineRevision with default settings for analytics.""" - return EngineRevision( - specification=self.create_analytics_engine_revision_specification( - compute_instance_type_name=compute_instance_type_name, - compute_instance_count=compute_instance_count, - ) - ) - - def create_general_purpose_engine_revision( - self, - compute_instance_type_name: Optional[str] = None, - compute_instance_count: Optional[int] = None, - ) -> EngineRevision: - """ - Create a local EngineRevision with default settings for - general purpose usage / data ingestion. - """ - return EngineRevision( - specification=self.create_general_purpose_engine_revision_specification( - compute_instance_type_name=compute_instance_type_name, - compute_instance_count=compute_instance_count, - ) - ) - - def create_analytics_engine_revision_specification( - self, - compute_instance_type_name: Optional[str] = None, - compute_instance_count: Optional[int] = None, - ) -> EngineRevisionSpecification: - """ - Default EngineRevisionSpecification for analytics (querying). - - Args: - compute_instance_type_name: Name of the instance type to use for the Engine. - compute_instance_count: Number of instances to use for the Engine. - - Returns: - A default Specification, updated with any user-defined settings. - """ - if compute_instance_type_name is None: - compute_instance_type_name = "m5d.4xlarge" - if compute_instance_count is None: - compute_instance_count = 1 - - instance_type_key = self.resource_manager.instance_types.get_by_name( - instance_type_name=compute_instance_type_name - ).key - return EngineRevisionSpecification( - db_compute_instances_type_key=instance_type_key, - db_compute_instances_count=compute_instance_count, - db_compute_instances_use_spot=False, - db_version="", - proxy_instances_type_key=instance_type_key, - proxy_instances_count=1, - proxy_version="", - ) - - def create_general_purpose_engine_revision_specification( - self, - compute_instance_type_name: Optional[str] = None, - compute_instance_count: Optional[int] = None, - ) -> EngineRevisionSpecification: - """ - Default EngineRevisionSpecification for general purpose / data ingestion. - - Args: - compute_instance_type_name: Name of the instance type to use for the Engine. - compute_instance_count: Number of instances to use for the Engine. - - Returns: - A default Specification, updated with any user-defined settings. - """ - if compute_instance_type_name is None: - compute_instance_type_name = "i3.4xlarge" - if compute_instance_count is None: - compute_instance_count = 2 - - instance_type_key = self.resource_manager.instance_types.get_by_name( - instance_type_name=compute_instance_type_name - ).key - return EngineRevisionSpecification( - db_compute_instances_type_key=instance_type_key, - db_compute_instances_count=compute_instance_count, - db_compute_instances_use_spot=False, - db_version="", - proxy_instances_type_key=instance_type_key, - proxy_instances_count=1, - proxy_version="", - ) diff --git a/tests/integration/resource_manager/test_engine.py b/tests/integration/resource_manager/test_engine.py new file mode 100644 index 00000000000..3bdc7127443 --- /dev/null +++ b/tests/integration/resource_manager/test_engine.py @@ -0,0 +1,47 @@ +import time + +from firebolt.service.manager import ResourceManager +from firebolt.service.types import EngineStatusSummary + + +def test_create_start_stop_engine(): + rm = ResourceManager() + name = f"integration_test_{int(time.time())}" + + engine = rm.engines.create(name=name) + assert engine.name == name + + database = rm.databases.create(name=name) + assert database.name == name + + engine.attach_to_database(database=database) + assert engine.database == database + + engine = engine.start() + assert ( + engine.current_status_summary + == EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING + ) + + engine = engine.stop() + assert engine.current_status_summary in { + EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING, + EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPED, + } + + +def test_copy_engine(): + rm = ResourceManager() + name = f"integration_test_{int(time.time())}" + + engine = rm.engines.create(name=name) + assert engine.name == name + + engine.name = f"{engine.name}_copy" + engine_copy = rm.engines._send_create_engine( + engine=engine, + engine_revision=rm.engine_revisions.get_engine_revision_by_key( + engine.latest_revision_key + ), + ) + assert engine_copy diff --git a/tests/unit/model/test_instance_type.py b/tests/unit/service/test_instance_type.py similarity index 100% rename from tests/unit/model/test_instance_type.py rename to tests/unit/service/test_instance_type.py diff --git a/tests/unit/model/test_region.py b/tests/unit/service/test_region.py similarity index 100% rename from tests/unit/model/test_region.py rename to tests/unit/service/test_region.py From 5be6501562c68cc8dc824e36497ff02fa715f8d0 Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Fri, 15 Oct 2021 13:25:16 -0700 Subject: [PATCH 10/14] fix mypy errors --- src/firebolt/model/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/firebolt/model/__init__.py b/src/firebolt/model/__init__.py index 45e7d1289d9..94019755c47 100644 --- a/src/firebolt/model/__init__.py +++ b/src/firebolt/model/__init__.py @@ -1,4 +1,5 @@ import json +from typing import Any from pydantic import BaseModel @@ -8,7 +9,7 @@ class Config: allow_population_by_field_name = True extra = "forbid" - def jsonable_dict(self, *args, **kwargs) -> dict: + def jsonable_dict(self, *args: Any, **kwargs: Any) -> dict: """ Generate a dictionary representation of the service that is contains serialized primitive types, and is therefore json-ready. From 6cb25754d3630380365efb942d2448e8f1bac9e3 Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Tue, 19 Oct 2021 09:20:21 -0700 Subject: [PATCH 11/14] address pr feedback --- src/firebolt/model/database.py | 38 +++++++------- src/firebolt/model/engine.py | 49 +++++++++---------- src/firebolt/service/binding.py | 8 +-- src/firebolt/service/database.py | 2 +- src/firebolt/service/engine.py | 2 +- src/firebolt/service/engine_revision.py | 10 ++-- .../resource_manager/test_engine.py | 8 +-- 7 files changed, 53 insertions(+), 64 deletions(-) diff --git a/src/firebolt/model/database.py b/src/firebolt/model/database.py index 25a40da8829..3ead968241d 100644 --- a/src/firebolt/model/database.py +++ b/src/firebolt/model/database.py @@ -1,8 +1,7 @@ from __future__ import annotations -import functools from datetime import datetime -from typing import TYPE_CHECKING, Annotated, Any, Callable, Optional +from typing import TYPE_CHECKING, Annotated, Any, Optional from pydantic import Field, PrivateAttr @@ -22,22 +21,6 @@ class DatabaseKey(FireboltBaseModel): database_id: str -def check_no_attached_starting_stopping_engines(func: Callable) -> Callable: - """(Decorator) Ensure no attached engines are starting/stopping""" - - @functools.wraps(func) - def inner(self: Database, *args: Any, **kwargs: Any) -> Any: - for engine in self.get_attached_engines(): - if engine.current_status_summary in { - EngineStatusSummary.ENGINE_STATUS_SUMMARY_STARTING, - EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING, - }: - raise AttachedEngineInUseError(method_name=func.__name__) - return func(self, *args, **kwargs) - - return inner - - class Database(FireboltBaseModel): """ A Firebolt database. @@ -102,13 +85,26 @@ def attach_to_engine( Only one engine can be set as default for a single database. This will overwrite any existing default. """ - return self._database_service.resource_manager.bindings.create_binding( + return self._database_service.resource_manager.bindings.create( engine=engine, database=self, is_default_engine=is_default_engine ) - @check_no_attached_starting_stopping_engines def delete(self, database_id: str) -> Database: - """Delete a database from Firebolt.""" + """ + Delete a database from Firebolt. + + Raises an error if there are any attached engines. + + Args: + database_id: Identifier of the database to delete. + """ + for engine in self.get_attached_engines(): + if engine.current_status_summary in { + EngineStatusSummary.ENGINE_STATUS_SUMMARY_STARTING, + EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING, + }: + raise AttachedEngineInUseError(method_name="delete") + response = self._database_service.client.delete( url=f"/core/v1/account/databases/{database_id}", headers={"Content-type": "application/json"}, diff --git a/src/firebolt/model/engine.py b/src/firebolt/model/engine.py index a2b74e891db..2f7617b7e5e 100644 --- a/src/firebolt/model/engine.py +++ b/src/firebolt/model/engine.py @@ -137,10 +137,6 @@ def database(self) -> Optional[Database]: ) ) - @property - def url(self) -> Optional[str]: - return self.endpoint - def refresh(self) -> Engine: """Get an up-to-date instance of the Engine from Firebolt.""" return self._engine_service.get(engine_id=self.engine_id) @@ -158,7 +154,7 @@ def attach_to_database( Only one engine can be set as default for a single database. This will overwrite any existing default. """ - return self._engine_service.resource_manager.bindings.create_binding( + return self._engine_service.resource_manager.bindings.create( engine=self, database=database, is_default_engine=is_default_engine ) @@ -186,18 +182,17 @@ def start( Returns: The updated Engine from Firebolt. """ - start_time = time.time() - timeout_time = start_time + wait_timeout_seconds + timeout_time = time.time() + wait_timeout_seconds engine = self.refresh() if ( engine.current_status_summary == EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING ): - logger.info( - f"Engine (engine_id={self.engine_id}, name={self.name}) " - f"is already running." - ) + # logger.info( + # f"Engine (engine_id={self.engine_id}, name={self.name}) " + # f"is already running." + # ) return engine # wait for engine to stop first, if it's already stopping @@ -205,10 +200,10 @@ def start( engine.current_status_summary == EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING ): - logger.info( - f"Engine (engine_id={engine.engine_id}, name={engine.name}) " - f"is in currently stopping, waiting for it to stop first." - ) + # logger.info( + # f"Engine (engine_id={engine.engine_id}, name={engine.name}) " + # f"is in currently stopping, waiting for it to stop first." + # ) while ( engine.current_status_summary != EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPED @@ -223,14 +218,14 @@ def start( if print_dots: print(".", end="") - logger.info( - f"Engine (engine_id={engine.engine_id}, name={engine.name}) stopped." - ) + # logger.info( + # f"Engine (engine_id={engine.engine_id}, name={engine.name}) stopped." + # ) engine = self._send_start() - logger.info( - f"Starting Engine (engine_id={engine.engine_id}, name={engine.name})" - ) + # logger.info( + # f"Starting Engine (engine_id={engine.engine_id}, name={engine.name})" + # ) # wait for engine to start while ( @@ -242,14 +237,14 @@ def start( raise TimeoutError( f"Could not start engine within {wait_timeout_seconds} seconds." ) - previous_status_summary = engine.current_status_summary + # previous_status_summary = engine.current_status_summary time.sleep(5) engine = engine.refresh() - if engine.current_status_summary != previous_status_summary: - logger.info( - f"Engine status_summary=" - f"{getattr(engine.current_status_summary, 'name')}" - ) + # if engine.current_status_summary != previous_status_summary: + # logger.info( + # f"Engine status_summary=" + # f"{getattr(engine.current_status_summary, 'name')}" + # ) if print_dots: print(".", end="") diff --git a/src/firebolt/service/binding.py b/src/firebolt/service/binding.py index a88d99c17a5..a47a037dd6e 100644 --- a/src/firebolt/service/binding.py +++ b/src/firebolt/service/binding.py @@ -19,7 +19,7 @@ def get_by_key(self, binding_key: BindingKey) -> Binding: binding: dict = response.json()["binding"] return Binding.parse_obj(binding) - def list_bindings( + def get_many( self, database_id: Optional[str] = None, engine_id: Optional[str] = None, @@ -59,19 +59,19 @@ def list_bindings( def get_database_bound_to_engine(self, engine: Engine) -> Optional[Database]: """Get the Database to which an engine is bound, if any.""" try: - binding = self.list_bindings(engine_id=engine.engine_id)[0] + binding = self.get_many(engine_id=engine.engine_id)[0] return self.resource_manager.databases.get(database_id=binding.database_id) except IndexError: return None def get_engines_bound_to_database(self, database: Database) -> list[Engine]: """Get a list of engines that are bound to a database.""" - bindings = self.list_bindings(database_id=database.database_id) + bindings = self.get_many(database_id=database.database_id) return self.resource_manager.engines.get_by_ids( engine_ids=[b.engine_id for b in bindings] ) - def create_binding( + def create( self, engine: Engine, database: Database, is_default_engine: bool ) -> Binding: """ diff --git a/src/firebolt/service/database.py b/src/firebolt/service/database.py index 173d2962ce0..c70d0f6fe20 100644 --- a/src/firebolt/service/database.py +++ b/src/firebolt/service/database.py @@ -30,7 +30,7 @@ def get_id_by_name(self, database_name: str) -> str: database_id = response.json()["database_id"]["database_id"] return database_id - def list( + def get_many( self, name_contains: str, attached_engine_name_eq: str, diff --git a/src/firebolt/service/engine.py b/src/firebolt/service/engine.py index aca1eafeca3..fe75bc72088 100644 --- a/src/firebolt/service/engine.py +++ b/src/firebolt/service/engine.py @@ -49,7 +49,7 @@ def get_by_name(self, engine_name: str) -> Engine: engine_id = response.json()["engine_id"]["engine_id"] return self.get(engine_id=engine_id) - def list( + def get_many( self, name_contains: str, current_status_eq: str, diff --git a/src/firebolt/service/engine_revision.py b/src/firebolt/service/engine_revision.py index 3fdf87a540f..aefff170f6d 100644 --- a/src/firebolt/service/engine_revision.py +++ b/src/firebolt/service/engine_revision.py @@ -3,11 +3,9 @@ class EngineRevisionService(BaseService): - def get_engine_revision_by_id( - self, engine_id: str, engine_revision_id: str - ) -> EngineRevision: + def get_by_id(self, engine_id: str, engine_revision_id: str) -> EngineRevision: """Get an EngineRevision from Firebolt by engine_id and engine_revision_id.""" - return self.get_engine_revision_by_key( + return self.get_by_key( EngineRevisionKey( account_id=self.account_id, engine_id=engine_id, @@ -15,9 +13,7 @@ def get_engine_revision_by_id( ) ) - def get_engine_revision_by_key( - self, engine_revision_key: EngineRevisionKey - ) -> EngineRevision: + def get_by_key(self, engine_revision_key: EngineRevisionKey) -> EngineRevision: """ Fetch an EngineRevision from Firebolt by it's key. diff --git a/tests/integration/resource_manager/test_engine.py b/tests/integration/resource_manager/test_engine.py index 3bdc7127443..dc5e7863497 100644 --- a/tests/integration/resource_manager/test_engine.py +++ b/tests/integration/resource_manager/test_engine.py @@ -1,9 +1,12 @@ import time +import pytest + from firebolt.service.manager import ResourceManager from firebolt.service.types import EngineStatusSummary +@pytest.mark.skip(reason="manual test") def test_create_start_stop_engine(): rm = ResourceManager() name = f"integration_test_{int(time.time())}" @@ -30,6 +33,7 @@ def test_create_start_stop_engine(): } +@pytest.mark.skip(reason="manual test") def test_copy_engine(): rm = ResourceManager() name = f"integration_test_{int(time.time())}" @@ -40,8 +44,6 @@ def test_copy_engine(): engine.name = f"{engine.name}_copy" engine_copy = rm.engines._send_create_engine( engine=engine, - engine_revision=rm.engine_revisions.get_engine_revision_by_key( - engine.latest_revision_key - ), + engine_revision=rm.engine_revisions.get_by_key(engine.latest_revision_key), ) assert engine_copy From 55d77bd5541c51200895cd18a0891b41f1934f07 Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Tue, 19 Oct 2021 13:00:51 -0700 Subject: [PATCH 12/14] address pr feedback --- src/firebolt/model/database.py | 12 +++--- src/firebolt/model/engine.py | 73 ++++++++++++++++---------------- src/firebolt/service/database.py | 2 +- src/firebolt/service/engine.py | 2 +- 4 files changed, 44 insertions(+), 45 deletions(-) diff --git a/src/firebolt/model/database.py b/src/firebolt/model/database.py index 3ead968241d..1ee8b3f3e6d 100644 --- a/src/firebolt/model/database.py +++ b/src/firebolt/model/database.py @@ -30,7 +30,7 @@ class Database(FireboltBaseModel): """ # internal - _database_service: DatabaseService = PrivateAttr() + _service: DatabaseService = PrivateAttr() # required name: Annotated[str, Field(min_length=1, max_length=255, regex=r"^[0-9a-zA-Z_]+$")] @@ -57,7 +57,7 @@ def parse_obj_with_service( cls, obj: Any, database_service: DatabaseService ) -> Database: database = cls.parse_obj(obj) - database._database_service = database_service + database._service = database_service return database @property @@ -68,7 +68,7 @@ def database_id(self) -> Optional[str]: def get_attached_engines(self) -> list[Engine]: """Get a list of engines that are attached to this database.""" - return self._database_service.resource_manager.bindings.get_engines_bound_to_database( # noqa: E501 + return self._service.resource_manager.bindings.get_engines_bound_to_database( # noqa: E501 database=self ) @@ -85,7 +85,7 @@ def attach_to_engine( Only one engine can be set as default for a single database. This will overwrite any existing default. """ - return self._database_service.resource_manager.bindings.create( + return self._service.resource_manager.bindings.create( engine=engine, database=self, is_default_engine=is_default_engine ) @@ -105,10 +105,10 @@ def delete(self, database_id: str) -> Database: }: raise AttachedEngineInUseError(method_name="delete") - response = self._database_service.client.delete( + response = self._service.client.delete( url=f"/core/v1/account/databases/{database_id}", headers={"Content-type": "application/json"}, ) return Database.parse_obj_with_service( - response.json()["database"], self._database_service + response.json()["database"], self._service ) diff --git a/src/firebolt/model/engine.py b/src/firebolt/model/engine.py index 2f7617b7e5e..ecde46967cb 100644 --- a/src/firebolt/model/engine.py +++ b/src/firebolt/model/engine.py @@ -40,7 +40,7 @@ class EngineSettings(FireboltBaseModel): """ preset: str - auto_stop_delay_duration: str + auto_stop_delay_duration: Annotated[str, Field(regex=r"^[0-9]+[sm]$|^0$")] minimum_logging_level: str is_read_only: bool warm_up: str @@ -88,7 +88,7 @@ class Engine(FireboltBaseModel): """ # internal - _engine_service: EngineService = PrivateAttr() + _service: EngineService = PrivateAttr() # required name: Annotated[str, Field(min_length=1, max_length=255, regex=r"^[0-9a-zA-Z_]+$")] @@ -120,7 +120,7 @@ class Engine(FireboltBaseModel): @classmethod def parse_obj_with_service(cls, obj: Any, engine_service: EngineService) -> Engine: engine = cls.parse_obj(obj) - engine._engine_service = engine_service + engine._service = engine_service return engine @property @@ -131,15 +131,13 @@ def engine_id(self) -> str: @property def database(self) -> Optional[Database]: - return ( - self._engine_service.resource_manager.bindings.get_database_bound_to_engine( - engine=self - ) + return self._service.resource_manager.bindings.get_database_bound_to_engine( + engine=self ) - def refresh(self) -> Engine: + def get_latest(self) -> Engine: """Get an up-to-date instance of the Engine from Firebolt.""" - return self._engine_service.get(engine_id=self.engine_id) + return self._service.get(engine_id=self.engine_id) def attach_to_database( self, database: Database, is_default_engine: bool = False @@ -154,7 +152,7 @@ def attach_to_database( Only one engine can be set as default for a single database. This will overwrite any existing default. """ - return self._engine_service.resource_manager.bindings.create( + return self._service.resource_manager.bindings.create( engine=self, database=database, is_default_engine=is_default_engine ) @@ -163,7 +161,7 @@ def start( self, wait_for_startup: bool = True, wait_timeout_seconds: int = 3600, - print_dots: bool = True, + verbose: bool = False, ) -> Engine: """ Start an engine. If it's already started, do nothing. @@ -175,7 +173,7 @@ def start( wait_timeout_seconds: Number of seconds to wait for startup to complete before raising a TimeoutError. - print_dots: + verbose: If True, print dots periodically while waiting for engine startup. If false, do not print any dots. @@ -184,7 +182,14 @@ def start( """ timeout_time = time.time() + wait_timeout_seconds - engine = self.refresh() + def wait(seconds: int, error_message: str) -> None: + time.sleep(seconds) + if time.time() > timeout_time: + raise TimeoutError(error_message) + if verbose: + print(".", end="") + + engine = self.get_latest() if ( engine.current_status_summary == EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING @@ -208,15 +213,12 @@ def start( engine.current_status_summary != EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPED ): - if time.time() >= timeout_time: - raise TimeoutError( - f"Engine (engine_id={engine.engine_id}, name={engine.name}) " - f"did not stop within {wait_timeout_seconds} seconds." - ) - time.sleep(5) - engine = engine.refresh() - if print_dots: - print(".", end="") + wait( + seconds=5, + error_message=f"Engine (engine_id={engine.engine_id}, name={engine.name}) " # noqa: E501 + f"did not stop within {wait_timeout_seconds} seconds.", + ) + engine = engine.get_latest() # logger.info( # f"Engine (engine_id={engine.engine_id}, name={engine.name}) stopped." @@ -233,48 +235,45 @@ def start( and engine.current_status_summary != EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING ): - if time.time() >= timeout_time: - raise TimeoutError( - f"Could not start engine within {wait_timeout_seconds} seconds." - ) + wait( + seconds=5, + error_message=f"Could not start engine within {wait_timeout_seconds} seconds.", # noqa: E501 + ) # previous_status_summary = engine.current_status_summary - time.sleep(5) - engine = engine.refresh() + engine = engine.get_latest() # if engine.current_status_summary != previous_status_summary: # logger.info( # f"Engine status_summary=" # f"{getattr(engine.current_status_summary, 'name')}" # ) - if print_dots: - print(".", end="") return engine def _send_start(self) -> Engine: - response = self._engine_service.client.post( + response = self._service.client.post( url=f"/core/v1/account/engines/{self.engine_id}:start", ) return Engine.parse_obj_with_service( - obj=response.json()["engine"], engine_service=self._engine_service + obj=response.json()["engine"], engine_service=self._service ) @check_attached_to_database def stop(self, engine: Engine) -> Engine: """Stop an Engine running on Firebolt.""" - response = self._engine_service.client.post( + response = self._service.client.post( url=f"/core/v1/account/engines/{engine.engine_id}:stop", ) return Engine.parse_obj_with_service( - obj=response.json()["engine"], engine_service=self._engine_service + obj=response.json()["engine"], engine_service=self._service ) def delete(self, engine: Engine) -> Engine: """Delete an Engine from Firebolt.""" - response = self._engine_service.client.delete( + response = self._service.client.delete( url=f"/core/v1" - f"/accounts/{self._engine_service.account_id}" + f"/accounts/{self._service.account_id}" f"/engines/{engine.engine_id}", ) return Engine.parse_obj_with_service( - obj=response.json()["engine"], engine_service=self._engine_service + obj=response.json()["engine"], engine_service=self._service ) diff --git a/src/firebolt/service/database.py b/src/firebolt/service/database.py index c70d0f6fe20..8d67541dbef 100644 --- a/src/firebolt/service/database.py +++ b/src/firebolt/service/database.py @@ -79,7 +79,7 @@ def create(self, name: str, region: Optional[str] = None) -> Database: """ class _DatabaseCreateRequest(FireboltBaseModel): - """Helper service for sending Database creation requests.""" + """Helper model for sending Database creation requests.""" account_id: str database: Database diff --git a/src/firebolt/service/engine.py b/src/firebolt/service/engine.py index fe75bc72088..9493428b2b9 100644 --- a/src/firebolt/service/engine.py +++ b/src/firebolt/service/engine.py @@ -180,7 +180,7 @@ def _send_create_engine( """ class _EngineCreateRequest(FireboltBaseModel): - """Helper service for sending Engine create requests.""" + """Helper model for sending Engine create requests.""" account_id: str engine: Engine From 2f598399aa50b69818822f3d329b5726c471d157 Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Tue, 19 Oct 2021 14:08:50 -0700 Subject: [PATCH 13/14] fix bugs, standardize parameter naming --- examples.ipynb | 166 +++++++++++------------- src/firebolt/model/database.py | 7 +- src/firebolt/model/engine.py | 11 +- src/firebolt/service/binding.py | 4 +- src/firebolt/service/database.py | 18 ++- src/firebolt/service/engine.py | 18 +-- src/firebolt/service/engine_revision.py | 10 +- src/firebolt/service/instance_type.py | 2 +- src/firebolt/service/region.py | 16 +-- 9 files changed, 118 insertions(+), 134 deletions(-) diff --git a/examples.ipynb b/examples.ipynb index d231e4c1c91..709b5e46662 100644 --- a/examples.ipynb +++ b/examples.ipynb @@ -1,5 +1,13 @@ { "cells": [ + { + "cell_type": "markdown", + "id": "2c6619ae", + "metadata": {}, + "source": [ + "# Imports, Logging" + ] + }, { "cell_type": "code", "execution_count": null, @@ -29,7 +37,7 @@ "id": "bc5376bc", "metadata": {}, "source": [ - "### Configure ResourceManager" + "# Configure ResourceManager" ] }, { @@ -37,13 +45,14 @@ "id": "4efaddf9", "metadata": {}, "source": [ - "Option 1: create a `.env` file with the following contents (fill in values):\n", + "**Option 1**: create a `.env` file with the following contents (fill in values):\n", "```\n", "FIREBOLT_USER=''\n", "FIREBOLT_PASSWORD=''\n", "FIREBOLT_SERVER=''\n", "FIREBOLT_DEFAULT_REGION=''\n", - "```" + "```\n", + "(or ensure these env vars are already set)" ] }, { @@ -61,7 +70,7 @@ "id": "f1626c0e", "metadata": {}, "source": [ - "Option 2: Specify settings manually using the Settings object:" + "**Option 2**: Specify settings manually using the Settings object:" ] }, { @@ -86,7 +95,7 @@ "id": "97388a98", "metadata": {}, "source": [ - "### Initialize ResourceManager" + "# Initialize ResourceManager" ] }, { @@ -102,6 +111,14 @@ "debug(rm.client.account_id)" ] }, + { + "cell_type": "markdown", + "id": "cd874695", + "metadata": {}, + "source": [ + "Specify a database_name and engine_name below if you want, or run as-is to use the defaults." + ] + }, { "cell_type": "code", "execution_count": null, @@ -109,8 +126,11 @@ "metadata": {}, "outputs": [], "source": [ - "database_name = \"\"\n", - "engine_name = \"\"" + "import time\n", + "\n", + "default_name = f\"temp_{int(time.time())}\"\n", + "database_name = \"\" or default_name\n", + "engine_name = \"\" or default_name" ] }, { @@ -136,7 +156,47 @@ "metadata": {}, "outputs": [], "source": [ - "rm.databases.create(database_name=database_name, region_name=\"us-east-1\")" + "database = rm.databases.create(name=database_name, region=\"us-east-1\")\n", + "debug(database)" + ] + }, + { + "cell_type": "markdown", + "id": "4518f956", + "metadata": {}, + "source": [ + "### Get by name" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a9b0f0b0", + "metadata": {}, + "outputs": [], + "source": [ + "database = rm.databases.get_by_name(name=database_name)\n", + "debug(database)" + ] + }, + { + "cell_type": "markdown", + "id": "ea32cf4b", + "metadata": {}, + "source": [ + "### Get engines attached to a database" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "116e3a30", + "metadata": {}, + "outputs": [], + "source": [ + "engines = database.get_attached_engines()\n", + "\n", + "debug(engines)" ] }, { @@ -183,7 +243,7 @@ "metadata": {}, "outputs": [], "source": [ - "engine = rm.engines.get_by_name(engine_name=engine_name)\n", + "engine = rm.engines.get_by_name(name=engine_name)\n", "debug(engine)" ] }, @@ -202,9 +262,7 @@ "metadata": {}, "outputs": [], "source": [ - "engine.attach_to_database(\n", - " database=rm.databases.get_by_name(database_name=database_name)\n", - ")" + "engine.attach_to_database(database=rm.databases.get_by_name(name=database_name))" ] }, { @@ -226,16 +284,6 @@ "debug(engine)" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "870823a5", - "metadata": {}, - "outputs": [], - "source": [ - "engine.dict()" - ] - }, { "cell_type": "markdown", "id": "b241f1a4", @@ -253,84 +301,34 @@ }, "outputs": [], "source": [ - "stopped_engine = rm.engines.stop(engine=engine)\n", + "stopped_engine = engine.stop()\n", "debug(stopped_engine)" ] }, { "cell_type": "markdown", - "id": "db04119b", - "metadata": {}, - "source": [ - "### Copy an Engine" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "df4071a2", - "metadata": { - "scrolled": false - }, - "outputs": [], - "source": [ - "engine = rm.engines.get_by_name(engine_name=engine_name)\n", - "engine.name = f\"{engine_name}_copy\"\n", - "debug(engine)\n", - "\n", - "new_engine = rm.engines._send_create_engine(\n", - " engine=engine, engine_revision=rm.engine_revisions\n", - ")\n", - "debug(new_engine)" - ] - }, - { - "cell_type": "markdown", - "id": "ea32cf4b", + "id": "bde3d38b", "metadata": {}, "source": [ - "### Get engines for a database" + "### Get an engine's database" ] }, { "cell_type": "code", "execution_count": null, - "id": "116e3a30", + "id": "1728ae68", "metadata": {}, "outputs": [], "source": [ - "database = rm.databases.get_by_name(database_name=database_name)\n", - "engines = rm.bindings.get_engines_bound_to_database(database=database)\n", - "\n", - "debug(engines)" + "debug(engine.database)" ] }, { "cell_type": "markdown", - "id": "9e949d12", + "id": "7639a535", "metadata": {}, "source": [ - "### Create engine, database, and bind them" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "e4b8e07d", - "metadata": { - "scrolled": false - }, - "outputs": [], - "source": [ - "database = rm.databases.get_by_name(database_name=database_name)\n", - "engine = rm.engines.create_general_purpose_engine(\n", - " name=engine_name, compute_instance_type_name=\"m5d.4xlarge\"\n", - ")\n", - "binding = rm.engines.attach_to_database(\n", - " engine=engine, database=database, is_default_engine=True\n", - ")\n", - "\n", - "debug(binding)" + "# Region, Instance Types" ] }, { @@ -338,7 +336,7 @@ "id": "ddf4098d", "metadata": {}, "source": [ - "### Get default provider, region, and an instance_type\n", + "### Get default region, and an instance_type\n", "An example of these might be:\n", " * provider: AWS\n", " * region: us-east-1\n", @@ -357,14 +355,6 @@ "debug(rm.regions.default_region)\n", "debug(rm.instance_types.get_by_name(instance_type_name=\"i3.4xlarge\"))" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "c59c30a8", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/src/firebolt/model/database.py b/src/firebolt/model/database.py index 1ee8b3f3e6d..ab402003953 100644 --- a/src/firebolt/model/database.py +++ b/src/firebolt/model/database.py @@ -89,14 +89,11 @@ def attach_to_engine( engine=engine, database=self, is_default_engine=is_default_engine ) - def delete(self, database_id: str) -> Database: + def delete(self) -> Database: """ Delete a database from Firebolt. Raises an error if there are any attached engines. - - Args: - database_id: Identifier of the database to delete. """ for engine in self.get_attached_engines(): if engine.current_status_summary in { @@ -106,7 +103,7 @@ def delete(self, database_id: str) -> Database: raise AttachedEngineInUseError(method_name="delete") response = self._service.client.delete( - url=f"/core/v1/account/databases/{database_id}", + url=f"/core/v1/account/databases/{self.database_id}", headers={"Content-type": "application/json"}, ) return Database.parse_obj_with_service( diff --git a/src/firebolt/model/engine.py b/src/firebolt/model/engine.py index ecde46967cb..62002d50b22 100644 --- a/src/firebolt/model/engine.py +++ b/src/firebolt/model/engine.py @@ -137,7 +137,7 @@ def database(self) -> Optional[Database]: def get_latest(self) -> Engine: """Get an up-to-date instance of the Engine from Firebolt.""" - return self._service.get(engine_id=self.engine_id) + return self._service.get(id_=self.engine_id) def attach_to_database( self, database: Database, is_default_engine: bool = False @@ -201,6 +201,7 @@ def wait(seconds: int, error_message: str) -> None: return engine # wait for engine to stop first, if it's already stopping + # FUTURE: revisit logging and consider consolidating this if & the while below. elif ( engine.current_status_summary == EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPING @@ -258,21 +259,21 @@ def _send_start(self) -> Engine: ) @check_attached_to_database - def stop(self, engine: Engine) -> Engine: + def stop(self) -> Engine: """Stop an Engine running on Firebolt.""" response = self._service.client.post( - url=f"/core/v1/account/engines/{engine.engine_id}:stop", + url=f"/core/v1/account/engines/{self.engine_id}:stop", ) return Engine.parse_obj_with_service( obj=response.json()["engine"], engine_service=self._service ) - def delete(self, engine: Engine) -> Engine: + def delete(self) -> Engine: """Delete an Engine from Firebolt.""" response = self._service.client.delete( url=f"/core/v1" f"/accounts/{self._service.account_id}" - f"/engines/{engine.engine_id}", + f"/engines/{self.engine_id}", ) return Engine.parse_obj_with_service( obj=response.json()["engine"], engine_service=self._service diff --git a/src/firebolt/service/binding.py b/src/firebolt/service/binding.py index a47a037dd6e..cc8baf215eb 100644 --- a/src/firebolt/service/binding.py +++ b/src/firebolt/service/binding.py @@ -60,7 +60,7 @@ def get_database_bound_to_engine(self, engine: Engine) -> Optional[Database]: """Get the Database to which an engine is bound, if any.""" try: binding = self.get_many(engine_id=engine.engine_id)[0] - return self.resource_manager.databases.get(database_id=binding.database_id) + return self.resource_manager.databases.get(id_=binding.database_id) except IndexError: return None @@ -68,7 +68,7 @@ def get_engines_bound_to_database(self, database: Database) -> list[Engine]: """Get a list of engines that are bound to a database.""" bindings = self.get_many(database_id=database.database_id) return self.resource_manager.engines.get_by_ids( - engine_ids=[b.engine_id for b in bindings] + ids=[b.engine_id for b in bindings] ) def create( diff --git a/src/firebolt/service/database.py b/src/firebolt/service/database.py index 8d67541dbef..80086ff5e43 100644 --- a/src/firebolt/service/database.py +++ b/src/firebolt/service/database.py @@ -7,25 +7,25 @@ class DatabaseService(BaseService): - def get(self, database_id: str) -> Database: + def get(self, id_: str) -> Database: """Get a Database from Firebolt by its id.""" response = self.client.get( - url=f"/core/v1/accounts/{self.account_id}/databases/{database_id}", + url=f"/core/v1/accounts/{self.account_id}/databases/{id_}", ) return Database.parse_obj_with_service( obj=response.json()["database"], database_service=self ) - def get_by_name(self, database_name: str) -> Database: + def get_by_name(self, name: str) -> Database: """Get a Database from Firebolt by its name.""" - database_id = self.get_id_by_name(database_name=database_name) - return self.get(database_id=database_id) + database_id = self.get_id_by_name(name=name) + return self.get(id_=database_id) - def get_id_by_name(self, database_name: str) -> str: + def get_id_by_name(self, name: str) -> str: """Get a Database id from Firebolt by its name.""" response = self.client.get( url=f"/core/v1/account/databases:getIdByName", - params={"database_name": database_name}, + params={"database_name": name}, ) database_id = response.json()["database_id"]["database_id"] return database_id @@ -87,9 +87,7 @@ class _DatabaseCreateRequest(FireboltBaseModel): if region is None: region_key = self.resource_manager.regions.default_region.key else: - region_key = self.resource_manager.regions.get_by_name( - region_name=region - ).key + region_key = self.resource_manager.regions.get_by_name(name=region).key database = Database(name=name, compute_region_key=region_key) response = self.client.post( diff --git a/src/firebolt/service/engine.py b/src/firebolt/service/engine.py index 9493428b2b9..b69595a1ff8 100644 --- a/src/firebolt/service/engine.py +++ b/src/firebolt/service/engine.py @@ -16,22 +16,22 @@ class EngineService(BaseService): - def get(self, engine_id: str) -> Engine: + def get(self, id_: str) -> Engine: """Get an Engine from Firebolt by its id.""" response = self.client.get( - url=f"/core/v1/accounts/{self.account_id}/engines/{engine_id}", + url=f"/core/v1/accounts/{self.account_id}/engines/{id_}", ) engine_entry: dict = response.json()["engine"] return Engine.parse_obj_with_service(obj=engine_entry, engine_service=self) - def get_by_ids(self, engine_ids: list[str]) -> list[Engine]: + def get_by_ids(self, ids: list[str]) -> list[Engine]: """Get multiple Engines from Firebolt by their ids.""" response = self.client.post( url=f"/core/v1/engines:getByIds", json={ "engine_ids": [ {"account_id": self.account_id, "engine_id": engine_id} - for engine_id in engine_ids + for engine_id in ids ] }, ) @@ -40,14 +40,14 @@ def get_by_ids(self, engine_ids: list[str]) -> list[Engine]: for e in response.json()["engines"] ] - def get_by_name(self, engine_name: str) -> Engine: + def get_by_name(self, name: str) -> Engine: """Get an Engine from Firebolt by its name.""" response = self.client.get( url="/core/v1/account/engines:getIdByName", - params={"engine_name": engine_name}, + params={"engine_name": name}, ) engine_id = response.json()["engine_id"]["engine_id"] - return self.get(engine_id=engine_id) + return self.get(id_=engine_id) def get_many( self, @@ -81,7 +81,7 @@ def get_many( "filter.current_status_eq": current_status_eq, "filter.current_status_not_eq": current_status_not_eq, "filter.compute_region_id_region_id_eq": self.resource_manager.regions.get_by_name( # noqa: E501 - region_name=region_eq + name=region_eq ), "order_by": order_by.name, } @@ -134,7 +134,7 @@ def create( region = self.resource_manager.regions.default_region else: if isinstance(region, str): - region = self.resource_manager.regions.get_by_name(region_name=region) + region = self.resource_manager.regions.get_by_name(name=region) engine = Engine( name=name, diff --git a/src/firebolt/service/engine_revision.py b/src/firebolt/service/engine_revision.py index aefff170f6d..430336650eb 100644 --- a/src/firebolt/service/engine_revision.py +++ b/src/firebolt/service/engine_revision.py @@ -13,20 +13,20 @@ def get_by_id(self, engine_id: str, engine_revision_id: str) -> EngineRevision: ) ) - def get_by_key(self, engine_revision_key: EngineRevisionKey) -> EngineRevision: + def get_by_key(self, key: EngineRevisionKey) -> EngineRevision: """ Fetch an EngineRevision from Firebolt by it's key. Args: - engine_revision_key: Key of the desired EngineRevision. + key: Key of the desired EngineRevision. Returns: The requested EngineRevision """ response = self.client.get( - url=f"/core/v1/accounts/{engine_revision_key.account_id}" - f"/engines/{engine_revision_key.engine_id}" - f"/engineRevisions/{engine_revision_key.engine_revision_id}", + url=f"/core/v1/accounts/{key.account_id}" + f"/engines/{key.engine_id}" + f"/engineRevisions/{key.engine_revision_id}", ) engine_spec: dict = response.json()["engine_revision"] return EngineRevision.parse_obj(engine_spec) diff --git a/src/firebolt/service/instance_type.py b/src/firebolt/service/instance_type.py index 8a1a337d866..6872fc2ce4a 100644 --- a/src/firebolt/service/instance_type.py +++ b/src/firebolt/service/instance_type.py @@ -32,7 +32,7 @@ def instance_types_by_name(self) -> dict[InstanceTypeLookup, InstanceType]: return { InstanceTypeLookup( region_name=self.resource_manager.regions.get_by_id( - region_id=i.key.region_id + id_=i.key.region_id ).name, instance_type_name=i.name, ): i diff --git a/src/firebolt/service/region.py b/src/firebolt/service/region.py index b5640556d4d..105ec2530d3 100644 --- a/src/firebolt/service/region.py +++ b/src/firebolt/service/region.py @@ -45,20 +45,18 @@ def default_region(self) -> Region: raise ValueError( "The environment variable FIREBOLT_DEFAULT_REGION must be set." ) - return self.get_by_name(region_name=self.default_region_name) + return self.get_by_name(name=self.default_region_name) - def get_by_name(self, region_name: str) -> Region: + def get_by_name(self, name: str) -> Region: """Get an AWS region by its name (eg. us-east-1).""" - return self.regions_by_name[region_name] + return self.regions_by_name[name] - def get_by_key(self, region_key: RegionKey) -> Region: + def get_by_key(self, key: RegionKey) -> Region: """Get an AWS Region by its key.""" - return self.regions_by_key[region_key] + return self.regions_by_key[key] - def get_by_id(self, region_id: str) -> Region: + def get_by_id(self, id_: str) -> Region: """Get an AWS Region by region_id.""" return self.get_by_key( - RegionKey( - provider_id=self.resource_manager.provider_id, region_id=region_id - ) + RegionKey(provider_id=self.resource_manager.provider_id, region_id=id_) ) From 15dee25fe2deb025247d050b84e33a65e6a636f8 Mon Sep 17 00:00:00 2001 From: Eric Gustavson Date: Tue, 19 Oct 2021 14:19:40 -0700 Subject: [PATCH 14/14] update readme --- CONTRIBUTING.MD | 2 +- README.md | 12 +++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/CONTRIBUTING.MD b/CONTRIBUTING.MD index 22aea7c3f11..1bf59536ed9 100644 --- a/CONTRIBUTING.MD +++ b/CONTRIBUTING.MD @@ -27,7 +27,7 @@ Working Directory: $ProjectFileDir$ 1. The pre-commit hook should catch linting errors 2. run `mypy src` to check for type errors -3. run `pytest` to run unit tests +3. run `pytest tests/unit` to run unit tests Note: while there is a `mypy` hook for pre-commit, I found it too buggy to be worthwhile, so I just run mypy manually. diff --git a/README.md b/README.md index 4bb84df698b..6dd3a55ef73 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,17 @@ # firebolt-sdk -### Installation & Usage +### Installation * Requires Python `>=3.9` -* Clone this repo -* From the cloned directory: `pip install .` -* See [examples.ipynb](examples.ipynb) for usage +* `pip install firebolt-sdk` + +### Usage + +See: [examples.ipynb](examples.ipynb). ### Configuration -To use the client, you generally will want to set the following environment variables: +To use the SDK, you generally will want to set the following environment variables: ``` FIREBOLT_USER='email@domain.com' FIREBOLT_PASSWORD='*****'