Skip to content

Commit

Permalink
wip: adding integration tests for the name strategy and the various t…
Browse files Browse the repository at this point in the history
…ypes of formats
  • Loading branch information
eliax1996 committed Nov 22, 2023
1 parent ffc5550 commit 20b7256
Show file tree
Hide file tree
Showing 5 changed files with 553 additions and 45 deletions.
240 changes: 221 additions & 19 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
from _pytest.fixtures import SubRequest
from aiohttp.pytest_plugin import AiohttpClient
from aiohttp.test_utils import TestClient
from contextlib import closing, ExitStack
from contextlib import asynccontextmanager, closing, ExitStack
from dataclasses import asdict
from filelock import FileLock
from kafka import KafkaProducer
from karapace.client import Client
from karapace.config import Config, set_config_defaults, write_config
from karapace.config import Config, ConfigDefaults, set_config_defaults, write_config
from karapace.dataclasses import default_dataclass
from karapace.kafka_admin import KafkaAdminClient, NewTopic
from karapace.kafka_rest_apis import KafkaRest
from pathlib import Path
Expand All @@ -30,7 +31,7 @@
from tests.integration.utils.synchronization import lock_path_for
from tests.integration.utils.zookeeper import configure_and_start_zk
from tests.utils import repeat_until_successful_request
from typing import AsyncIterator, Iterator, List, Optional
from typing import AsyncContextManager, AsyncIterator, Callable, Iterator, List, Optional
from urllib.parse import urlparse

import asyncio
Expand Down Expand Up @@ -215,13 +216,14 @@ def fixture_admin(kafka_servers: KafkaServers) -> Iterator[KafkaAdminClient]:
yield KafkaAdminClient(bootstrap_servers=kafka_servers.bootstrap_servers)


@pytest.fixture(scope="function", name="rest_async")
async def fixture_rest_async(
@asynccontextmanager
async def _kafka_rest_async(
request: SubRequest,
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
tmp_path: Path,
kafka_servers: KafkaServers,
registry_async_client: Client,
custom_values: Optional[ConfigDefaults] = None,
) -> AsyncIterator[Optional[KafkaRest]]:
# Do not start a REST api when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
Expand All @@ -234,14 +236,17 @@ async def fixture_rest_async(

config_path = tmp_path / "karapace_config.json"

config = set_config_defaults(
{
"admin_metadata_max_age": 2,
"bootstrap_uri": kafka_servers.bootstrap_servers,
# Use non-default max request size for REST producer.
"producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES,
}
)
override_values = {
"admin_metadata_max_age": 2,
"bootstrap_uri": kafka_servers.bootstrap_servers,
# Use non-default max request size for REST producer.
"producer_max_request_size": REST_PRODUCER_MAX_REQUEST_BYTES,
}

if custom_values is not None:
override_values.update(custom_values)

config = set_config_defaults(override_values)
write_config(config_path, config)
rest = KafkaRest(config=config)

Expand All @@ -253,8 +258,49 @@ async def fixture_rest_async(
await rest.close()


@pytest.fixture(scope="function", name="rest_async_client")
async def fixture_rest_async_client(
@pytest.fixture(scope="function", name="rest_async")
async def fixture_rest_async(
request: SubRequest,
loop: asyncio.AbstractEventLoop,
tmp_path: Path,
kafka_servers: KafkaServers,
registry_async_client: Client,
) -> AsyncIterator[Optional[KafkaRest]]:
async with _kafka_rest_async(
request,
loop,
tmp_path,
kafka_servers,
registry_async_client,
) as kafka_rest_async:
yield kafka_rest_async


@pytest.fixture(scope="function", name="rest_async_from_config")
def fixture_rest_async_from_config(
request: SubRequest,
loop: asyncio.AbstractEventLoop,
tmp_path: Path,
kafka_servers: KafkaServers,
registry_async_client_from_config: Callable[[ConfigDefaults], AsyncIterator[RegistryDescription]],
) -> Callable[[ConfigDefaults], AsyncContextManager[Optional[KafkaRest]]]:
@asynccontextmanager
async def async_kafka_from_custom_config(config: ConfigDefaults) -> KafkaRest:
async with registry_async_client_from_config(config) as registry_async_client:
async with _kafka_rest_async(
request,
loop,
tmp_path,
kafka_servers,
registry_async_client,
) as kafka_rest_async:
yield kafka_rest_async

return async_kafka_from_custom_config


@asynccontextmanager
async def _rest_async_client(
request: SubRequest,
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
rest_async: KafkaRest,
Expand Down Expand Up @@ -288,6 +334,43 @@ async def get_client(**kwargs) -> TestClient: # pylint: disable=unused-argument
await client.close()


@pytest.fixture(scope="function", name="rest_async_client")
async def fixture_rest_async_client(
request: SubRequest,
loop: asyncio.AbstractEventLoop,
rest_async: KafkaRest,
aiohttp_client: AiohttpClient,
) -> AsyncIterator[Client]:
async with _rest_async_client(
request,
loop,
rest_async,
aiohttp_client,
) as rest_async_client:
yield rest_async_client


@pytest.fixture(scope="function", name="rest_async_client_from_config")
async def fixture_rest_async_client_from_config(
request: SubRequest,
loop: asyncio.AbstractEventLoop,
rest_async_from_config: Callable[[ConfigDefaults], AsyncContextManager[Optional[KafkaRest]]],
aiohttp_client: AiohttpClient,
) -> Callable[[ConfigDefaults], AsyncContextManager[Client]]:
@asynccontextmanager
async def async_client_from_custom_config(config: ConfigDefaults) -> Client:
async with rest_async_from_config(config) as rest_async:
async with _rest_async_client(
request,
loop,
rest_async,
aiohttp_client,
) as rest_async_client:
yield rest_async_client

return async_client_from_custom_config


@pytest.fixture(scope="function", name="rest_async_novalidation")
async def fixture_rest_async_novalidation(
request: SubRequest,
Expand Down Expand Up @@ -453,13 +536,14 @@ async def fixture_registry_async_pair(
yield [server.endpoint.to_url() for server in endpoints]


@pytest.fixture(scope="function", name="registry_cluster")
async def fixture_registry_cluster(
@asynccontextmanager
async def _registry_cluster(
request: SubRequest,
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
session_logdir: Path,
kafka_servers: KafkaServers,
port_range: PortRangeInclusive,
custom_values: Optional[ConfigDefaults] = None,
) -> AsyncIterator[RegistryDescription]:
# Do not start a registry when the user provided an external service. Doing
# so would cause this node to join the existing group and participate in
Expand All @@ -477,12 +561,56 @@ async def fixture_registry_cluster(
config_templates=[config],
data_dir=session_logdir / _clear_test_name(request.node.name),
port_range=port_range,
custom_values=custom_values,
) as servers:
yield servers[0]


@pytest.fixture(scope="function", name="registry_async_client")
async def fixture_registry_async_client(
@pytest.fixture(scope="function", name="registry_cluster")
async def fixture_registry_cluster(
request: SubRequest,
loop: asyncio.AbstractEventLoop,
session_logdir: Path,
kafka_servers: KafkaServers,
port_range: PortRangeInclusive,
custom_values: Optional[ConfigDefaults] = None,
) -> AsyncIterator[RegistryDescription]:
async with _registry_cluster(
request,
loop,
session_logdir,
kafka_servers,
port_range,
custom_values,
) as registry_description:
yield registry_description


@pytest.fixture(scope="function", name="registry_cluster_from_custom_config")
def fixture_registry_cluster_with_custom_config(
request: SubRequest,
loop: asyncio.AbstractEventLoop,
session_logdir: Path,
kafka_servers: KafkaServers,
port_range: PortRangeInclusive,
) -> Callable[[ConfigDefaults], AsyncContextManager[RegistryDescription]]:
@asynccontextmanager
async def registry_from_custom_config(config: ConfigDefaults) -> RegistryDescription:
async with _registry_cluster(
request,
loop,
session_logdir,
kafka_servers,
port_range,
config,
) as registry_description:
yield registry_description

return registry_from_custom_config


@asynccontextmanager
async def _registry_async_client(
request: SubRequest,
registry_cluster: RegistryDescription,
loop: asyncio.AbstractEventLoop, # pylint: disable=unused-argument
Expand All @@ -508,6 +636,80 @@ async def fixture_registry_async_client(
await client.close()


@pytest.fixture(scope="function", name="registry_async_client")
async def fixture_registry_async_client(
request: SubRequest,
registry_cluster: RegistryDescription,
loop: asyncio.AbstractEventLoop,
) -> Client:
async with _registry_async_client(
request,
registry_cluster,
loop,
) as client:
yield client


@pytest.fixture(scope="function", name="registry_async_client_from_config")
def fixture_registry_async_client_custom_config(
request: SubRequest,
registry_cluster_from_custom_config: Callable[[ConfigDefaults], AsyncIterator[RegistryDescription]],
loop: asyncio.AbstractEventLoop,
) -> Callable[[ConfigDefaults], AsyncContextManager[Client]]:
@asynccontextmanager
async def client_from_custom_config(config: ConfigDefaults) -> Client:
async with registry_cluster_from_custom_config(config) as registry_description:
async with _registry_async_client(request, registry_description, loop) as client:
yield client

return client_from_custom_config


@default_dataclass
class RestClientAndRegistryClient:
registry_client: Client
rest_client: Client


@pytest.fixture(scope="function", name="rest_async_client_and_rest_async_client_from_config")
def fixture_rest_async_client_and_rest_async_client_from_config(
request: SubRequest,
loop: asyncio.AbstractEventLoop,
aiohttp_client: AiohttpClient,
session_logdir: Path,
kafka_servers: KafkaServers,
tmp_path: Path,
port_range: PortRangeInclusive,
) -> Callable[[ConfigDefaults], AsyncContextManager[RestClientAndRegistryClient]]:
@asynccontextmanager
async def client_from_custom_config(config: ConfigDefaults) -> RestClientAndRegistryClient:
# ugly but without python 3.9 we cannot join those :(
async with _registry_cluster(
request,
loop,
session_logdir,
kafka_servers,
port_range,
config,
) as registry_description:
async with _registry_async_client(request, registry_description, loop) as registry_async_client:
async with _kafka_rest_async(
request, loop, tmp_path, kafka_servers, registry_async_client, config
) as kafka_rest_async:
async with _rest_async_client(
request,
loop,
kafka_rest_async,
aiohttp_client,
) as rest_async_client:
yield RestClientAndRegistryClient(
registry_client=registry_async_client,
rest_client=rest_async_client,
)

return client_from_custom_config


@pytest.fixture(scope="function", name="credentials_folder")
def fixture_credentials_folder() -> str:
integration_test_folder = os.path.dirname(__file__)
Expand Down
Loading

0 comments on commit 20b7256

Please sign in to comment.