Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/firebolt/common/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

ACCOUNT_ENGINE_URL = "/core/v1/accounts/{account_id}/engines/{engine_id}"
ACCOUNT_ENGINE_START_URL = ACCOUNT_ENGINE_URL + ":start"
ACCOUNT_ENGINE_RESTART_URL = ACCOUNT_ENGINE_URL + ":restart"
ACCOUNT_ENGINE_STOP_URL = ACCOUNT_ENGINE_URL + ":stop"
ACCOUNT_ENGINES_URL = "/core/v1/accounts/{account_id}/engines"
ACCOUNT_ENGINE_BY_NAME_URL = ACCOUNT_ENGINES_URL + ":getIdByName"
Expand Down
73 changes: 53 additions & 20 deletions src/firebolt/model/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from firebolt.common.exception import NoAttachedDatabaseError
from firebolt.common.urls import (
ACCOUNT_ENGINE_RESTART_URL,
ACCOUNT_ENGINE_START_URL,
ACCOUNT_ENGINE_STOP_URL,
ACCOUNT_ENGINE_URL,
Expand Down Expand Up @@ -251,7 +252,7 @@ def start(
f"Engine (engine_id={engine.engine_id}, name={engine.name}) stopped."
)

engine = self._send_start()
engine = self._send_engine_request(ACCOUNT_ENGINE_START_URL)
logger.info(
f"Starting Engine (engine_id={engine.engine_id}, name={engine.name})"
)
Expand All @@ -277,34 +278,16 @@ def start(

return engine

def _send_start(self) -> Engine:
response = self._service.client.post(
url=ACCOUNT_ENGINE_START_URL.format(
account_id=self._service.account_id, engine_id=self.engine_id
)
)
return Engine.parse_obj_with_service(
obj=response.json()["engine"], engine_service=self._service
)

@check_attached_to_database
def stop(
self, wait_for_stop: bool = False, wait_timeout_seconds: int = 3600
) -> Engine:
"""Stop an Engine running on Firebolt."""
timeout_time = time.time() + wait_timeout_seconds

response = self._service.client.post(
url=ACCOUNT_ENGINE_STOP_URL.format(
account_id=self._service.account_id, engine_id=self.engine_id
)
)
engine = self._send_engine_request(ACCOUNT_ENGINE_STOP_URL)
logger.info(f"Stopping Engine (engine_id={self.engine_id}, name={self.name})")

engine = Engine.parse_obj_with_service(
obj=response.json()["engine"], engine_service=self._service
)

while wait_for_stop and engine.current_status_summary not in {
EngineStatusSummary.ENGINE_STATUS_SUMMARY_STOPPED,
EngineStatusSummary.ENGINE_STATUS_SUMMARY_FAILED,
Expand All @@ -320,6 +303,46 @@ def stop(

return engine

@check_attached_to_database
def restart(
self,
wait_for_startup: bool = True,
wait_timeout_seconds: int = 3600,
) -> Engine:
"""
Restart an engine.

Args:
wait_for_startup:
If True, wait for startup to complete.
If false, return immediately after requesting startup.
wait_timeout_seconds:
Number of seconds to wait for startup to complete
before raising a TimeoutError.

Returns:
The updated Engine from Firebolt.
"""
timeout_time = time.time() + wait_timeout_seconds

engine = self._send_engine_request(ACCOUNT_ENGINE_RESTART_URL)
logger.info(f"Stopping Engine (engine_id={self.engine_id}, name={self.name})")

while wait_for_startup and engine.current_status_summary not in {
EngineStatusSummary.ENGINE_STATUS_SUMMARY_RUNNING,
EngineStatusSummary.ENGINE_STATUS_SUMMARY_FAILED,
}:
wait(
seconds=5,
timeout_time=timeout_time,
error_message=f"Could not restart engine within {wait_timeout_seconds} seconds.", # noqa: E501
verbose=False,
)

engine = engine.get_latest()

return engine

def delete(self) -> Engine:
"""Delete an Engine from Firebolt."""
response = self._service.client.delete(
Expand All @@ -331,3 +354,13 @@ def delete(self) -> Engine:
return Engine.parse_obj_with_service(
obj=response.json()["engine"], engine_service=self._service
)

def _send_engine_request(self, url: str) -> Engine:
response = self._service.client.post(
url=url.format(
account_id=self._service.account_id, engine_id=self.engine_id
)
)
return Engine.parse_obj_with_service(
obj=response.json()["engine"], engine_service=self._service
)
3 changes: 2 additions & 1 deletion tests/unit/service/conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
from typing import Callable, List
from urllib.parse import urlparse

import httpx
import pytest
Expand Down Expand Up @@ -214,7 +215,7 @@ def do_mock(
request: httpx.Request = None,
**kwargs,
) -> Response:
assert request.url == engine_url
assert urlparse(engine_url).path in request.url.path
return Response(
status_code=httpx.codes.OK,
json={"engine": mock_engine.dict()},
Expand Down
37 changes: 37 additions & 0 deletions tests/unit/service/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,40 @@ def test_attach_to_database(
engine.attach_to_database(database=database)

assert engine.database == database


def test_engine_restart(
httpx_mock: HTTPXMock,
auth_callback: Callable,
auth_url: str,
provider_callback: Callable,
provider_url: str,
settings: Settings,
mock_engine: Engine,
account_id_callback: Callable,
account_id_url: str,
engine_callback: Callable,
account_engine_url: str,
bindings_callback: Callable,
bindings_url: str,
database_callback: Callable,
database_url: str,
):
httpx_mock.add_callback(auth_callback, url=auth_url)
httpx_mock.add_callback(provider_callback, url=provider_url)

httpx_mock.add_callback(account_id_callback, url=account_id_url)
httpx_mock.add_callback(auth_callback, url=auth_url)

httpx_mock.add_callback(
engine_callback, url=f"{account_engine_url}:restart", method="POST"
)
httpx_mock.add_callback(bindings_callback, url=bindings_url)
httpx_mock.add_callback(database_callback, url=database_url)

manager = ResourceManager(settings=settings)

mock_engine._service = manager.engines
engine = mock_engine.restart(wait_for_startup=False)

assert engine.name == mock_engine.name