Skip to content

Commit

Permalink
🐙 octavia-cli: implement init command (#9665)
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere committed Feb 3, 2022
1 parent 9aade15 commit a73ed08
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 33 deletions.
1 change: 1 addition & 0 deletions octavia-cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ We welcome community contributions!

| Date | Milestone |
|------------|-------------------------------------|
| 2022-01-25 | Implement `octavia init` + some context checks|
| 2022-01-19 | Implement `octavia list workspace sources`, `octavia list workspace destinations`, `octavia list workspace connections`|
| 2022-01-17 | Implement `octavia list connectors source` and `octavia list connectors destinations`|
| 2022-01-17 | Generate an API Python client from our Open API spec |
Expand Down
78 changes: 78 additions & 0 deletions octavia-cli/octavia_cli/check_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import os

import airbyte_api_client
import click
from airbyte_api_client.api import health_api, workspace_api
from airbyte_api_client.model.workspace_id_request_body import WorkspaceIdRequestBody
from urllib3.exceptions import MaxRetryError

from .init.commands import DIRECTORIES_TO_CREATE as REQUIRED_PROJECT_DIRECTORIES


class UnhealthyApiError(click.ClickException):
pass


class UnreachableAirbyteInstanceError(click.ClickException):
pass


class WorkspaceIdError(click.ClickException):
pass


def check_api_health(api_client: airbyte_api_client.ApiClient) -> None:
"""Check if the Airbyte API is network reachable and healthy.
Args:
api_client (airbyte_api_client.ApiClient): Airbyte API client.
Raises:
click.ClickException: Raised if the Airbyte api server is unavailable according to the API response.
click.ClickException: Raised if the Airbyte URL is not reachable.
"""
api_instance = health_api.HealthApi(api_client)
try:
api_response = api_instance.get_health_check()
if not api_response.available:
raise UnhealthyApiError(
"Your Airbyte instance is not ready to receive requests: the health endpoint returned 'available: False.'"
)
except (airbyte_api_client.ApiException, MaxRetryError) as e:
raise UnreachableAirbyteInstanceError(
f"Could not reach your Airbyte instance, make sure the instance is up and running an network reachable: {e}"
)


def check_workspace_exists(api_client: airbyte_api_client.ApiClient, workspace_id: str) -> None:
"""Check if the provided workspace id corresponds to an existing workspace on the Airbyte instance.
Args:
api_client (airbyte_api_client.ApiClient): Airbyte API client.
workspace_id (str): Id of the workspace whose existence we are trying to verify.
Raises:
click.ClickException: Raised if the workspace does not exist on the Airbyte instance.
"""
api_instance = workspace_api.WorkspaceApi(api_client)
try:
api_instance.get_workspace(WorkspaceIdRequestBody(workspace_id=workspace_id), _check_return_type=False)
except airbyte_api_client.ApiException:
raise WorkspaceIdError("The workspace you are trying to use does not exist in your Airbyte instance")


def check_is_initialized(project_directory: str = ".") -> bool:
"""Check if required project directories exist to consider the project as initialized.
Args:
project_directory (str, optional): Where the project should be initialized. Defaults to ".".
Returns:
bool: [description]
"""
sub_directories = [f.name for f in os.scandir(project_directory) if f.is_dir()]
return set(REQUIRED_PROJECT_DIRECTORIES).issubset(sub_directories)
53 changes: 35 additions & 18 deletions octavia-cli/octavia_cli/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,58 @@
import click
from airbyte_api_client.api import workspace_api

from .check_context import check_api_health, check_is_initialized, check_workspace_exists
from .init import commands as init_commands
from .list import commands as list_commands

AVAILABLE_COMMANDS: List[click.Command] = [list_commands._list]
AVAILABLE_COMMANDS: List[click.Command] = [list_commands._list, init_commands.init]


@click.group()
@click.option("--airbyte-url", envvar="AIRBYTE_URL", default="http://localhost:8000", help="The URL of your Airbyte instance.")
@click.option(
"--workspace-id",
envvar="AIRBYTE_WORKSPACE_ID",
default=None,
help="The id of the workspace on which you want octavia-cli to work. Defaults to the first one found on your Airbyte instance.",
)
@click.pass_context
def octavia(ctx: click.Context, airbyte_url: str) -> None:
def octavia(ctx: click.Context, airbyte_url: str, workspace_id: str) -> None:
ctx.ensure_object(dict)
ctx.obj["API_CLIENT"] = get_api_client(airbyte_url)
ctx.obj["WORKSPACE_ID"] = get_workspace_id(ctx.obj["API_CLIENT"], workspace_id)
ctx.obj["PROJECT_IS_INITIALIZED"] = check_is_initialized()
click.echo(
click.style(
f"🐙 - Octavia is targetting your Airbyte instance running at {airbyte_url} on workspace {ctx.obj['WORKSPACE_ID']}.", fg="green"
)
)
if not ctx.obj["PROJECT_IS_INITIALIZED"]:
click.echo(click.style("🐙 - Project is not yet initialized.", fg="red", bold=True))


def get_api_client(airbyte_url):
client_configuration = airbyte_api_client.Configuration(host=f"{airbyte_url}/api")
api_client = airbyte_api_client.ApiClient(client_configuration)
# TODO alafanechere workspace check might deserve its own function
api_instance = workspace_api.WorkspaceApi(api_client)
# open-api-generator consider non-required field as not nullable
# This will break validation of WorkspaceRead object for firstCompletedSync and feedbackDone fields
# This is why we bypass _check_return_type
api_response = api_instance.list_workspaces(_check_return_type=False)
# TODO alafanechere prompt user to chose a workspace if multiple workspaces exist
workspace_id = api_response.workspaces[0]["workspaceId"]
click.echo(f"🐙 - Octavia is targetting your Airbyte instance running at {airbyte_url} on workspace {workspace_id}")
ctx.obj["API_CLIENT"] = api_client
ctx.obj["WORKSPACE_ID"] = workspace_id
check_api_health(api_client)
return api_client


def get_workspace_id(api_client, user_defined_workspace_id):
if user_defined_workspace_id:
check_workspace_exists(api_client, user_defined_workspace_id)
return user_defined_workspace_id
else:
api_instance = workspace_api.WorkspaceApi(api_client)
api_response = api_instance.list_workspaces(_check_return_type=False)
return api_response.workspaces[0]["workspaceId"]


def add_commands_to_octavia():
for command in AVAILABLE_COMMANDS:
octavia.add_command(command)


@octavia.command(help="Scaffolds a local project directories.")
def init():
raise click.ClickException("The init command is not yet implemented.")


@octavia.command(name="import", help="Import an existing resources from the Airbyte instance.")
def _import() -> None:
raise click.ClickException("The import command is not yet implemented.")
Expand Down
3 changes: 3 additions & 0 deletions octavia-cli/octavia_cli/init/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
34 changes: 34 additions & 0 deletions octavia-cli/octavia_cli/init/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import os
from typing import Iterable, Tuple

import click

DIRECTORIES_TO_CREATE = {"connections", "destinations", "sources"}


def create_directories(directories_to_create: Iterable[str]) -> Tuple[Iterable[str], Iterable[str]]:
created_directories = []
not_created_directories = []
for directory in directories_to_create:
try:
os.mkdir(directory)
created_directories.append(directory)
except FileExistsError:
not_created_directories.append(directory)
return created_directories, not_created_directories


@click.command(help="Initialize required directories for the project.")
def init():
click.echo("🔨 - Initializing the project.")
created_directories, not_created_directories = create_directories(DIRECTORIES_TO_CREATE)
if created_directories:
message = f"✅ - Created the following directories: {', '.join(created_directories)}."
click.echo(click.style(message, fg="green"))
if not_created_directories:
message = f"❓ - Already existing directories: {', '.join(not_created_directories) }."
click.echo(click.style(message, fg="yellow", bold=True))
85 changes: 85 additions & 0 deletions octavia-cli/unit_tests/test_check_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import os
import shutil
import tempfile
from pathlib import Path

import airbyte_api_client
import pytest
from airbyte_api_client.model.workspace_id_request_body import WorkspaceIdRequestBody
from octavia_cli import check_context
from urllib3.exceptions import MaxRetryError


@pytest.fixture
def mock_api_client(mocker):
return mocker.Mock()


def test_api_check_health_available(mock_api_client, mocker):
mocker.patch.object(check_context, "health_api")
mock_api_response = mocker.Mock(available=True)
check_context.health_api.HealthApi.return_value.get_health_check.return_value = mock_api_response

assert check_context.check_api_health(mock_api_client) is None
check_context.health_api.HealthApi.assert_called_with(mock_api_client)
api_instance = check_context.health_api.HealthApi.return_value
api_instance.get_health_check.assert_called()


def test_api_check_health_unavailable(mock_api_client, mocker):
mocker.patch.object(check_context, "health_api")
mock_api_response = mocker.Mock(available=False)
check_context.health_api.HealthApi.return_value.get_health_check.return_value = mock_api_response
with pytest.raises(check_context.UnhealthyApiError):
check_context.check_api_health(mock_api_client)


def test_api_check_health_unreachable_api_exception(mock_api_client, mocker):
mocker.patch.object(check_context, "health_api")
check_context.health_api.HealthApi.return_value.get_health_check.side_effect = airbyte_api_client.ApiException()
with pytest.raises(check_context.UnreachableAirbyteInstanceError):
check_context.check_api_health(mock_api_client)


def test_api_check_health_unreachable_max_retry_error(mock_api_client, mocker):
mocker.patch.object(check_context, "health_api")
check_context.health_api.HealthApi.return_value.get_health_check.side_effect = MaxRetryError("foo", "bar")
with pytest.raises(check_context.UnreachableAirbyteInstanceError):
check_context.check_api_health(mock_api_client)


def test_check_workspace_exists(mock_api_client, mocker):
mocker.patch.object(check_context, "workspace_api")
mock_api_instance = mocker.Mock()
check_context.workspace_api.WorkspaceApi.return_value = mock_api_instance
assert check_context.check_workspace_exists(mock_api_client, "foo") is None
check_context.workspace_api.WorkspaceApi.assert_called_with(mock_api_client)
mock_api_instance.get_workspace.assert_called_with(WorkspaceIdRequestBody("foo"), _check_return_type=False)


def test_check_workspace_exists_error(mock_api_client, mocker):
mocker.patch.object(check_context, "workspace_api")
check_context.workspace_api.WorkspaceApi.return_value.get_workspace.side_effect = airbyte_api_client.ApiException()
with pytest.raises(check_context.WorkspaceIdError):
check_context.check_workspace_exists(mock_api_client, "foo")


@pytest.fixture
def project_directories():
dirpath = tempfile.mkdtemp()
yield str(Path(dirpath).parent.absolute()), [os.path.basename(dirpath)]
shutil.rmtree(dirpath)


def test_check_is_initialized(mocker, project_directories):
project_directory, sub_directories = project_directories
mocker.patch.object(check_context, "REQUIRED_PROJECT_DIRECTORIES", sub_directories)
assert check_context.check_is_initialized(project_directory)


def test_check_not_initialized():
assert not check_context.check_is_initialized(".")
73 changes: 58 additions & 15 deletions octavia-cli/unit_tests/test_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from unittest import mock

import click
import pytest
from click.testing import CliRunner
Expand All @@ -17,25 +15,70 @@ def dumb(ctx):


def test_octavia(mocker):
mocker.patch.object(entrypoint, "workspace_api")
mocker.patch.object(entrypoint, "airbyte_api_client")

mocker.patch.object(entrypoint, "click")
mocker.patch.object(entrypoint, "get_api_client")
mocker.patch.object(entrypoint, "get_workspace_id", mocker.Mock(return_value="api-defined-workspace-id"))
mocker.patch.object(entrypoint, "check_is_initialized", mocker.Mock(return_value=True))
context_object = {}
mock_api_instance = entrypoint.workspace_api.WorkspaceApi.return_value
mock_api_instance.list_workspaces.return_value = mock.MagicMock(workspaces=[{"workspaceId": "expected_workspace_id"}])
entrypoint.octavia.add_command(dumb)
runner = CliRunner()
result = runner.invoke(entrypoint.octavia, ["--airbyte-url", "test-airbyte-url", "dumb"], obj=context_object)
entrypoint.get_api_client.assert_called()
entrypoint.get_workspace_id.assert_called_with(entrypoint.get_api_client.return_value, None)
expected_message = "🐙 - Octavia is targetting your Airbyte instance running at test-airbyte-url on workspace api-defined-workspace-id."
entrypoint.click.style.assert_called_with(expected_message, fg="green")
entrypoint.click.echo.assert_called_with(entrypoint.click.style.return_value)
assert context_object == {
"API_CLIENT": entrypoint.get_api_client.return_value,
"WORKSPACE_ID": entrypoint.get_workspace_id.return_value,
"PROJECT_IS_INITIALIZED": entrypoint.check_is_initialized.return_value,
}
assert result.exit_code == 0


def test_octavia_not_initialized(mocker):
mocker.patch.object(entrypoint, "click")
mocker.patch.object(entrypoint, "get_api_client")
mocker.patch.object(entrypoint, "get_workspace_id", mocker.Mock(return_value="api-defined-workspace-id"))
mocker.patch.object(entrypoint, "check_is_initialized", mocker.Mock(return_value=False))
context_object = {}
entrypoint.octavia.add_command(dumb)
runner = CliRunner()
result = runner.invoke(entrypoint.octavia, ["--airbyte-url", "test-airbyte-url", "dumb"], obj=context_object)
entrypoint.airbyte_api_client.Configuration.assert_called_with(host="test-airbyte-url/api")
entrypoint.airbyte_api_client.ApiClient.assert_called_with(entrypoint.airbyte_api_client.Configuration.return_value)
entrypoint.workspace_api.WorkspaceApi.assert_called_with(entrypoint.airbyte_api_client.ApiClient.return_value)
mock_api_instance.list_workspaces.assert_called_once()
assert context_object["API_CLIENT"] == entrypoint.airbyte_api_client.ApiClient.return_value
assert context_object["WORKSPACE_ID"] == "expected_workspace_id"
entrypoint.click.style.assert_called_with("🐙 - Project is not yet initialized.", fg="red", bold=True)
entrypoint.click.echo.assert_called_with(entrypoint.click.style.return_value)
assert result.exit_code == 0


def test_get_api_client(mocker):
mocker.patch.object(entrypoint, "airbyte_api_client")
mocker.patch.object(entrypoint, "check_api_health")
api_client = entrypoint.get_api_client("test-url")
entrypoint.airbyte_api_client.Configuration.assert_called_with(host="test-url/api")
entrypoint.airbyte_api_client.ApiClient.assert_called_with(entrypoint.airbyte_api_client.Configuration.return_value)
entrypoint.check_api_health.assert_called_with(entrypoint.airbyte_api_client.ApiClient.return_value)
assert api_client == entrypoint.airbyte_api_client.ApiClient.return_value


def test_get_workspace_id_user_defined(mocker):
mock_api_client = mocker.Mock()
mocker.patch.object(entrypoint, "check_workspace_exists")
mocker.patch.object(entrypoint, "workspace_api")
assert entrypoint.get_workspace_id(mock_api_client, "user-defined-workspace-id") == "user-defined-workspace-id"
entrypoint.check_workspace_exists.assert_called_with(mock_api_client, "user-defined-workspace-id")


def test_get_workspace_id_api_defined(mocker):
mock_api_client = mocker.Mock()
mocker.patch.object(entrypoint, "check_workspace_exists")
mocker.patch.object(entrypoint, "workspace_api")
mock_api_instance = entrypoint.workspace_api.WorkspaceApi.return_value
mock_api_instance.list_workspaces.return_value = mocker.Mock(workspaces=[{"workspaceId": "api-defined-workspace-id"}])
assert entrypoint.get_workspace_id(mock_api_client, None) == "api-defined-workspace-id"
entrypoint.workspace_api.WorkspaceApi.assert_called_with(mock_api_client)
mock_api_instance.list_workspaces.assert_called_with(_check_return_type=False)


def test_commands_in_octavia_group():
octavia_commands = entrypoint.octavia.commands.values()
for command in entrypoint.AVAILABLE_COMMANDS:
Expand All @@ -44,7 +87,7 @@ def test_commands_in_octavia_group():

@pytest.mark.parametrize(
"command",
[entrypoint.init, entrypoint.apply, entrypoint.create, entrypoint.delete, entrypoint._import],
[entrypoint.apply, entrypoint.create, entrypoint.delete, entrypoint._import],
)
def test_not_implemented_commands(command):
runner = CliRunner()
Expand All @@ -54,4 +97,4 @@ def test_not_implemented_commands(command):


def test_available_commands():
assert entrypoint.AVAILABLE_COMMANDS == [entrypoint.list_commands._list]
assert entrypoint.AVAILABLE_COMMANDS == [entrypoint.list_commands._list, entrypoint.init_commands.init]
3 changes: 3 additions & 0 deletions octavia-cli/unit_tests/test_init/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

0 comments on commit a73ed08

Please sign in to comment.