diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 629418f7a..9770a010b 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -7,7 +7,6 @@ import secrets import string import subprocess -import tempfile import juju.unit import yaml @@ -830,142 +829,6 @@ async def get_cluster_status(unit: Unit, cluster_set: bool | None = False) -> di return result.get("status", {}) -async def delete_file_or_directory_in_unit(ops_test: OpsTest, unit_name: str, path: str) -> bool: - """Delete a file in the provided unit. - - Args: - ops_test: The ops test framework - unit_name: The name unit on which to delete the file from - path: The path of file or directory to delete - - Returns: - boolean indicating success - """ - if path.strip() in ["/", "."]: - return - - try: - return_code, _, _ = await ops_test.juju( - "ssh", unit_name, "sudo", "find", path, "-maxdepth", "1", "-delete" - ) - - return return_code == 0 - except Exception: - return False - - -async def write_content_to_file_in_unit( - ops_test: OpsTest, unit: Unit, path: str, content: str -) -> None: - """Write content to the file in the provided unit. - - Args: - ops_test: The ops test framework - unit: THe unit in which to write to file in - path: The path at which to write the content to - content: The content to write to the file. - """ - with tempfile.NamedTemporaryFile(mode="w") as temp_file: - temp_file.write(content) - temp_file.flush() - - await unit.scp_to(temp_file.name, "/tmp/file") - - return_code, _, _ = await ops_test.juju("ssh", unit.name, "sudo", "mv", "/tmp/file", path) - assert return_code == 0 - - return_code, _, _ = await ops_test.juju( - "ssh", unit.name, "sudo", "chown", "snap_daemon:root", path - ) - assert return_code == 0 - - -async def read_contents_from_file_in_unit(ops_test: OpsTest, unit: Unit, path: str) -> str: - """Read contents from file in the provided unit. - - Args: - ops_test: The ops test framework - unit: The unit in which to read file from - path: The path from which to read content from - - Returns: - the contents of the file - """ - return_code, _, _ = await ops_test.juju("ssh", unit.name, "sudo", "cp", path, "/tmp/file") - assert return_code == 0 - - return_code, _, _ = await ops_test.juju( - "ssh", unit.name, "sudo", "chown", "ubuntu:ubuntu", "/tmp/file" - ) - assert return_code == 0 - - with tempfile.NamedTemporaryFile(mode="r+") as temp_file: - await unit.scp_from("/tmp/file", temp_file.name) - - temp_file.seek(0) - - contents = "" - for line in temp_file: - contents += line - contents += "\n" - - return_code, _, _ = await ops_test.juju("ssh", unit.name, "sudo", "rm", "/tmp/file") - assert return_code == 0 - - return contents - - -async def ls_la_in_unit(ops_test: OpsTest, unit_name: str, directory: str) -> list[str]: - """Returns the output of ls -la in unit. - - Args: - ops_test: The ops test framework - unit_name: The name of unit in which to run ls -la - path: The path from which to run ls -la - - Returns: - a list of files returned by ls -la - """ - return_code, output, _ = await ops_test.juju("ssh", unit_name, "sudo", "ls", "-la", directory) - assert return_code == 0 - - ls_output = output.split("\n")[1:] - - return [ - line.strip("\r") - for line in ls_output - if len(line.strip()) > 0 and line.split()[-1] not in [".", ".."] - ] - - -async def stop_running_flush_mysql_cronjobs(ops_test: OpsTest, unit_name: str) -> None: - """Stop running any logrotate jobs that may have been triggered by cron. 
- - Args: - ops_test: The ops test object passed into every test case - unit_name: The name of the unit to be tested - """ - # send TERM signal to mysql daemon, which trigger shutdown process - await ops_test.juju( - "ssh", - unit_name, - "sudo", - "pkill", - "-15", - "-f", - "logrotate -f /etc/logrotate.d/flush_mysql_logs", - ) - - # hold execution until process is stopped - try: - for attempt in Retrying(stop=stop_after_attempt(45), wait=wait_fixed(2)): - with attempt: - if await get_process_pid(ops_test, unit_name, "logrotate"): - raise Exception - except RetryError as e: - raise Exception("Failed to stop the flush_mysql_logs logrotate process.") from e - - def get_unit_by_index(app_name: str, units: list, index: int): """Get unit by index. diff --git a/tests/integration/high_availability/high_availability_helpers_new.py b/tests/integration/high_availability/high_availability_helpers_new.py index 7329ca0bc..a5ff3703d 100644 --- a/tests/integration/high_availability/high_availability_helpers_new.py +++ b/tests/integration/high_availability/high_availability_helpers_new.py @@ -3,24 +3,66 @@ # See LICENSE file for licensing details. import json +import os import subprocess from collections.abc import Callable +from pathlib import Path import jubilant_backports from jubilant_backports import Juju from jubilant_backports.statustypes import Status, UnitStatus -from tenacity import Retrying, stop_after_delay, wait_fixed +from tenacity import ( + Retrying, + stop_after_delay, + wait_fixed, +) from constants import SERVER_CONFIG_USERNAME from ..helpers import execute_queries_on_unit MINUTE_SECS = 60 +TEST_DATABASE_NAME = "testing" JujuModelStatusFn = Callable[[Status], bool] JujuAppsStatusFn = Callable[[Status, str], bool] +def _get_juju_keys() -> tuple[str, str]: + """Get Juju public and private keys.""" + config_dir = Path("~/.local/share/juju") + if juju_data := os.getenv("JUJU_DATA"): + config_dir = Path(juju_data) + + return ( + str(config_dir.expanduser().resolve() / "ssh" / "juju_id_rsa.pub"), + str(config_dir.expanduser().resolve() / "ssh" / "juju_id_rsa"), + ) + + +def check_mysql_instances_online(juju: Juju, app_name: str) -> bool: + """Checks whether all MySQL cluster instances are online. 
+ + Args: + juju: The Juju instance + app_name: The name of the application + """ + mysql_app_leader = get_app_leader(juju, app_name) + mysql_app_units = get_app_units(juju, app_name) + + for attempt in Retrying(stop=stop_after_delay(10 * MINUTE_SECS), wait=wait_fixed(10)): + with attempt: + mysql_cluster_status = get_mysql_cluster_status(juju, mysql_app_leader) + mysql_cluster_topology = mysql_cluster_status["defaultreplicaset"]["topology"] + assert len(mysql_cluster_topology) == len(mysql_app_units) + + for member in mysql_cluster_topology.values(): + if member["status"] != "online": + return False + + return True + + async def check_mysql_units_writes_increment( juju: Juju, app_name: str, app_units: list[str] | None = None ) -> None: @@ -77,6 +119,33 @@ def get_app_units(juju: Juju, app_name: str) -> dict[str, UnitStatus]: return app_status.units +def scale_app_units(juju: Juju, app_name: str, num_units: int) -> None: + """Scale a given application to a number of units.""" + app_units = get_app_units(juju, app_name) + app_units_diff = len(app_units) - num_units + + scale_func = None + if app_units_diff > 0: + scale_func = juju.remove_unit + if app_units_diff < 0: + scale_func = juju.add_unit + if app_units_diff == 0: + return + + for _ in range(abs(app_units_diff)): + scale_func(app_name, num_units=1) + + juju.wait( + ready=lambda status: len(status.apps[app_name].units) == num_units, + timeout=20 * MINUTE_SECS, + ) + juju.wait( + ready=wait_for_apps_status(jubilant_backports.all_active, app_name), + error=jubilant_backports.any_blocked, + timeout=20 * MINUTE_SECS, + ) + + def get_unit_by_number(juju: Juju, app_name: str, unit_number: int) -> str: """Get unit by number.""" model_status = juju.status() @@ -109,6 +178,17 @@ def get_unit_info(juju: Juju, unit_name: str) -> dict: return json.loads(output) +def get_unit_process_id(juju: Juju, unit_name: str, process_name: str) -> int | None: + """Return the pid of a process running in a given unit.""" + try: + task = juju.exec(f"pgrep -x {process_name}", unit=unit_name) + task.raise_on_failure() + + return int(task.stdout.strip()) + except Exception: + return None + + def get_unit_status_log(juju: Juju, unit_name: str, log_lines: int = 0) -> list[dict]: """Get the status log for a unit. 
@@ -126,6 +206,54 @@ def get_unit_status_log(juju: Juju, unit_name: str, log_lines: int = 0) -> list[
     return json.loads(output)


+# TODO:
+# Rely on Jubilant scp command once they make it easier
+# for package users to deal with temporary directories
+# https://github.com/canonical/jubilant/issues/201
+def scp_unit_file_from(
+    juju: Juju, app_name: str, unit_name: str, source_path: str, target_path: str
+) -> None:
+    """Copy a file from a given unit."""
+    juju_keys = _get_juju_keys()
+    unit_address = get_unit_ip(juju, app_name, unit_name)
+    unit_username = "ubuntu"
+
+    subprocess.check_output([
+        "scp",
+        "-B",
+        "-o",
+        "StrictHostKeyChecking=no",
+        "-i",
+        juju_keys[1],
+        f"{unit_username}@{unit_address}:{source_path}",
+        f"{target_path}",
+    ])
+
+
+# TODO:
+# Rely on Jubilant scp command once they make it easier
+# for package users to deal with temporary directories
+# https://github.com/canonical/jubilant/issues/201
+def scp_unit_file_into(
+    juju: Juju, app_name: str, unit_name: str, source_path: str, target_path: str
+) -> None:
+    """Copy a file into a given unit."""
+    juju_keys = _get_juju_keys()
+    unit_address = get_unit_ip(juju, app_name, unit_name)
+    unit_username = "ubuntu"
+
+    subprocess.check_output([
+        "scp",
+        "-B",
+        "-o",
+        "StrictHostKeyChecking=no",
+        "-i",
+        juju_keys[1],
+        f"{source_path}",
+        f"{unit_username}@{unit_address}:{target_path}",
+    ])
+
+
 def get_relation_data(juju: Juju, app_name: str, rel_name: str) -> list[dict]:
     """Returns a list that contains the relation-data.
@@ -244,6 +372,111 @@ async def get_mysql_variable_value(
     return output[0]


+async def insert_mysql_test_data(juju: Juju, app_name: str, table_name: str, value: str) -> None:
+    """Insert data into the MySQL database.
+
+    Args:
+        juju: The Juju model.
+        app_name: The application name.
+        table_name: The database table name.
+        value: The value to insert.
+    """
+    mysql_leader = get_app_leader(juju, app_name)
+    mysql_primary = get_mysql_primary_unit(juju, app_name)
+
+    credentials_task = juju.run(
+        unit=mysql_leader,
+        action="get-password",
+        params={"username": SERVER_CONFIG_USERNAME},
+    )
+    credentials_task.raise_on_failure()
+
+    insert_queries = [
+        f"CREATE DATABASE IF NOT EXISTS `{TEST_DATABASE_NAME}`",
+        f"CREATE TABLE IF NOT EXISTS `{TEST_DATABASE_NAME}`.`{table_name}` (id VARCHAR(255), PRIMARY KEY (id))",
+        f"INSERT INTO `{TEST_DATABASE_NAME}`.`{table_name}` (id) VALUES ('{value}')",
+    ]
+
+    await execute_queries_on_unit(
+        get_unit_ip(juju, app_name, mysql_primary),
+        credentials_task.results["username"],
+        credentials_task.results["password"],
+        insert_queries,
+        commit=True,
+    )
+
+
+async def remove_mysql_test_data(juju: Juju, app_name: str, table_name: str) -> None:
+    """Remove data from the MySQL database.
+
+    Args:
+        juju: The Juju model.
+        app_name: The application name.
+        table_name: The database table name.
+    """
+    mysql_leader = get_app_leader(juju, app_name)
+    mysql_primary = get_mysql_primary_unit(juju, app_name)
+
+    credentials_task = juju.run(
+        unit=mysql_leader,
+        action="get-password",
+        params={"username": SERVER_CONFIG_USERNAME},
+    )
+    credentials_task.raise_on_failure()
+
+    remove_queries = [
+        f"DROP TABLE IF EXISTS `{TEST_DATABASE_NAME}`.`{table_name}`",
+        f"DROP DATABASE IF EXISTS `{TEST_DATABASE_NAME}`",
+    ]
+
+    await execute_queries_on_unit(
+        get_unit_ip(juju, app_name, mysql_primary),
+        credentials_task.results["username"],
+        credentials_task.results["password"],
+        remove_queries,
+        commit=True,
+    )
+
+
+async def verify_mysql_test_data(juju: Juju, app_name: str, table_name: str, value: str) -> None:
+    """Verify data in the MySQL database.
+
+    Args:
+        juju: The Juju model.
+        app_name: The application name.
+        table_name: The database table name.
+        value: The value to check against.
+    """
+    mysql_app_leader = get_app_leader(juju, app_name)
+    mysql_app_units = get_app_units(juju, app_name)
+
+    credentials_task = juju.run(
+        unit=mysql_app_leader,
+        action="get-password",
+        params={"username": SERVER_CONFIG_USERNAME},
+    )
+    credentials_task.raise_on_failure()
+
+    select_queries = [
+        f"SELECT id FROM `{TEST_DATABASE_NAME}`.`{table_name}` WHERE id = '{value}'",
+    ]
+
+    for unit_name in mysql_app_units:
+        for attempt in Retrying(
+            reraise=True,
+            stop=stop_after_delay(5 * MINUTE_SECS),
+            wait=wait_fixed(10),
+        ):
+            with attempt:
+                output = await execute_queries_on_unit(
+                    get_unit_ip(juju, app_name, unit_name),
+                    credentials_task.results["username"],
+                    credentials_task.results["password"],
+                    select_queries,
+                )
+                assert output[0] == value
+
+
 def wait_for_apps_status(jubilant_status_func: JujuAppsStatusFn, *apps: str) -> JujuModelStatusFn:
     """Waits for Juju agents to be idle, and for applications to reach a certain status.
diff --git a/tests/integration/high_availability/test_replication_data_consistency.py b/tests/integration/high_availability/test_replication_data_consistency.py
index da6161156..dfa3138c3 100644
--- a/tests/integration/high_availability/test_replication_data_consistency.py
+++ b/tests/integration/high_availability/test_replication_data_consistency.py
@@ -1,41 +1,72 @@
 # Copyright 2022 Canonical Ltd.
 # See LICENSE file for licensing details.
- import logging -from pathlib import Path +import jubilant_backports import pytest -import yaml -from pytest_operator.plugin import OpsTest - -from .high_availability_helpers import ( - clean_up_database_and_table, - ensure_all_units_continuous_writes_incrementing, - get_application_name, - insert_data_into_mysql_and_validate_replication, +from jubilant_backports import Juju + +from ..helpers import generate_random_string +from .high_availability_helpers_new import ( + check_mysql_units_writes_increment, + insert_mysql_test_data, + remove_mysql_test_data, + verify_mysql_test_data, + wait_for_apps_status, ) -logger = logging.getLogger(__name__) +MYSQL_APP_NAME = "mysql" +MYSQL_TEST_APP_NAME = "mysql-test-app" + +MINUTE_SECS = 60 -METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) -APP_NAME = METADATA["name"] -ANOTHER_APP_NAME = f"second{APP_NAME}" -TIMEOUT = 17 * 60 +logging.getLogger("jubilant.wait").setLevel(logging.WARNING) @pytest.mark.abort_on_fail -async def test_consistent_data_replication_across_cluster( - ops_test: OpsTest, highly_available_cluster -) -> None: - """Confirm that data is replicated from the primary node to all the replicas.""" - mysql_application_name = get_application_name(ops_test, "mysql") +def test_deploy_highly_available_cluster(juju: Juju, charm: str) -> None: + """Simple test to ensure that the MySQL and application charms get deployed.""" + logging.info("Deploying MySQL cluster") + juju.deploy( + charm=charm, + app=MYSQL_APP_NAME, + base="ubuntu@22.04", + config={"profile": "testing"}, + num_units=3, + ) + juju.deploy( + charm=MYSQL_TEST_APP_NAME, + app=MYSQL_TEST_APP_NAME, + base="ubuntu@22.04", + channel="latest/edge", + config={"sleep_interval": 500}, + num_units=1, + ) + + juju.integrate( + f"{MYSQL_APP_NAME}:database", + f"{MYSQL_TEST_APP_NAME}:database", + ) - # assert that there are 3 units in the mysql cluster - assert len(ops_test.model.applications[mysql_application_name].units) == 3 + logging.info("Wait for applications to become active") + juju.wait( + ready=wait_for_apps_status( + jubilant_backports.all_active, MYSQL_APP_NAME, MYSQL_TEST_APP_NAME + ), + error=jubilant_backports.any_blocked, + timeout=20 * MINUTE_SECS, + ) + + +@pytest.mark.abort_on_fail +async def test_consistent_data_replication_across_cluster(juju: Juju) -> None: + """Confirm that data is replicated from the primary node to all the replicas.""" + table_name = "data" + table_value = generate_random_string(255) - database_name, table_name = "test-check-consistency", "data" - await insert_data_into_mysql_and_validate_replication(ops_test, database_name, table_name) - await clean_up_database_and_table(ops_test, database_name, table_name) + await insert_mysql_test_data(juju, MYSQL_APP_NAME, table_name, table_value) + await verify_mysql_test_data(juju, MYSQL_APP_NAME, table_name, table_value) + await remove_mysql_test_data(juju, MYSQL_APP_NAME, table_name) - await ensure_all_units_continuous_writes_incrementing(ops_test) + await check_mysql_units_writes_increment(juju, MYSQL_APP_NAME) diff --git a/tests/integration/high_availability/test_replication_data_isolation.py b/tests/integration/high_availability/test_replication_data_isolation.py index 0b87e7509..1e2877610 100644 --- a/tests/integration/high_availability/test_replication_data_isolation.py +++ b/tests/integration/high_availability/test_replication_data_isolation.py @@ -1,103 +1,94 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. 
- import logging -from pathlib import Path -import yaml -from pytest_operator.plugin import OpsTest +import jubilant_backports +import pytest +from jubilant_backports import Juju -from ..helpers import ( - execute_queries_on_unit, - get_primary_unit, - get_server_config_credentials, -) -from .high_availability_helpers import ( - get_application_name, +from .high_availability_helpers_new import ( + insert_mysql_test_data, + remove_mysql_test_data, + verify_mysql_test_data, + wait_for_apps_status, ) -logger = logging.getLogger(__name__) +MYSQL_APP_NAME = "mysql" +MYSQL_TEST_APP_NAME = "mysql-test-app" + +MINUTE_SECS = 60 -METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) -APP_NAME = METADATA["name"] -ANOTHER_APP_NAME = f"second{APP_NAME}" -TIMEOUT = 17 * 60 +logging.getLogger("jubilant.wait").setLevel(logging.WARNING) -async def test_cluster_isolation(ops_test: OpsTest, charm, highly_available_cluster) -> None: +@pytest.mark.abort_on_fail +def test_deploy_highly_available_cluster(juju: Juju, charm: str) -> None: + """Simple test to ensure that the MySQL and application charms get deployed.""" + logging.info("Deploying MySQL cluster") + juju.deploy( + charm=charm, + app=MYSQL_APP_NAME, + base="ubuntu@22.04", + config={"profile": "testing"}, + num_units=3, + ) + juju.deploy( + charm=MYSQL_TEST_APP_NAME, + app=MYSQL_TEST_APP_NAME, + base="ubuntu@22.04", + channel="latest/edge", + config={"sleep_interval": 500}, + num_units=1, + ) + + juju.integrate( + f"{MYSQL_APP_NAME}:database", + f"{MYSQL_TEST_APP_NAME}:database", + ) + + logging.info("Wait for applications to become active") + juju.wait( + ready=wait_for_apps_status( + jubilant_backports.all_active, MYSQL_APP_NAME, MYSQL_TEST_APP_NAME + ), + error=jubilant_backports.any_blocked, + timeout=20 * MINUTE_SECS, + ) + + +async def test_cluster_data_isolation(juju: Juju, charm: str) -> None: """Test for cluster data isolation. This test creates a new cluster, create a new table on both cluster, write a single record with the application name for each cluster, retrieve and compare these records, asserting they are not the same. 
""" - app = get_application_name(ops_test, "mysql") - apps = [app, ANOTHER_APP_NAME] + mysql_main_app_name = f"{MYSQL_APP_NAME}" + mysql_other_app_name = f"{MYSQL_APP_NAME}-other" - await ops_test.model.deploy( - charm, - application_name=ANOTHER_APP_NAME, - num_units=1, + juju.deploy( + charm=charm, + app=mysql_other_app_name, base="ubuntu@22.04", + config={"profile": "testing"}, + num_units=1, ) - async with ops_test.fast_forward("60s"): - await ops_test.model.block_until( - lambda: len(ops_test.model.applications[ANOTHER_APP_NAME].units) == 1 - ) - await ops_test.model.wait_for_idle( - apps=[ANOTHER_APP_NAME], - status="active", - raise_on_blocked=True, - timeout=TIMEOUT, - ) - - # retrieve connection data for each cluster - connection_data = {} - for application in apps: - random_unit = ops_test.model.applications[application].units[0] - server_config_credentials = await get_server_config_credentials(random_unit) - primary_unit = await get_primary_unit(ops_test, random_unit, application) - - primary_unit_address = await primary_unit.get_public_address() - - connection_data[application] = { - "host": primary_unit_address, - "username": server_config_credentials["username"], - "password": server_config_credentials["password"], - } - - # write single distinct record to each cluster - for application in apps: - create_records_sql = [ - "CREATE DATABASE IF NOT EXISTS test", - "DROP TABLE IF EXISTS test.cluster_isolation_table", - "CREATE TABLE test.cluster_isolation_table (id varchar(40), primary key(id))", - f"INSERT INTO test.cluster_isolation_table VALUES ('{application}')", - ] - - await execute_queries_on_unit( - connection_data[application]["host"], - connection_data[application]["username"], - connection_data[application]["password"], - create_records_sql, - commit=True, - ) - - result = [] - # read single record from each cluster - for application in apps: - read_records_sql = ["SELECT id FROM test.cluster_isolation_table"] - - output = await execute_queries_on_unit( - connection_data[application]["host"], - connection_data[application]["username"], - connection_data[application]["password"], - read_records_sql, - commit=False, - ) - - assert len(output) == 1, "Just one record must exist on the test table" - result.append(output[0]) - - assert result[0] != result[1], "Writes from one cluster are replicated to another cluster." + + logging.info("Wait for application to become active") + juju.wait( + ready=wait_for_apps_status(jubilant_backports.all_active, mysql_other_app_name), + error=jubilant_backports.any_blocked, + timeout=20 * MINUTE_SECS, + ) + + table_name = "cluster_isolation_table" + + for app_name in (mysql_main_app_name, mysql_other_app_name): + await insert_mysql_test_data(juju, app_name, table_name, f"{app_name}-value") + for app_name in (mysql_main_app_name, mysql_other_app_name): + await verify_mysql_test_data(juju, app_name, table_name, f"{app_name}-value") + for app_name in (mysql_main_app_name, mysql_other_app_name): + await remove_mysql_test_data(juju, app_name, table_name) + + juju.remove_application(mysql_other_app_name, destroy_storage=True, force=True) diff --git a/tests/integration/high_availability/test_replication_logs_rotation.py b/tests/integration/high_availability/test_replication_logs_rotation.py index 0913bee92..3886327b0 100644 --- a/tests/integration/high_availability/test_replication_logs_rotation.py +++ b/tests/integration/high_availability/test_replication_logs_rotation.py @@ -1,115 +1,258 @@ # Copyright 2022 Canonical Ltd. 
# See LICENSE file for licensing details. - import logging -from pathlib import Path +import tempfile +import jubilant_backports import pytest -import yaml -from pytest_operator.plugin import OpsTest +from jubilant_backports import Juju +from tenacity import ( + Retrying, + stop_after_attempt, + wait_fixed, +) from constants import CHARMED_MYSQL_COMMON_DIRECTORY -from ..helpers import ( - delete_file_or_directory_in_unit, - ls_la_in_unit, - read_contents_from_file_in_unit, - stop_running_flush_mysql_cronjobs, - write_content_to_file_in_unit, -) -from .high_availability_helpers import ( - get_application_name, +from .high_availability_helpers_new import ( + get_app_leader, + get_unit_process_id, + scp_unit_file_from, + scp_unit_file_into, + wait_for_apps_status, ) -logger = logging.getLogger(__name__) +MYSQL_APP_NAME = "mysql" +MYSQL_TEST_APP_NAME = "mysql-test-app" + +MINUTE_SECS = 60 -METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) -APP_NAME = METADATA["name"] -ANOTHER_APP_NAME = f"second{APP_NAME}" -TIMEOUT = 17 * 60 +logging.getLogger("jubilant.wait").setLevel(logging.WARNING) @pytest.mark.abort_on_fail -async def test_log_rotation(ops_test: OpsTest, highly_available_cluster) -> None: - """Test the log rotation of text files.""" - app = get_application_name(ops_test, "mysql") - unit = ops_test.model.applications[app].units[0] +def test_deploy_highly_available_cluster(juju: Juju, charm: str) -> None: + """Simple test to ensure that the MySQL and application charms get deployed.""" + logging.info("Deploying MySQL cluster") + juju.deploy( + charm=charm, + app=MYSQL_APP_NAME, + base="ubuntu@22.04", + config={"profile": "testing"}, + num_units=3, + ) + juju.deploy( + charm=MYSQL_TEST_APP_NAME, + app=MYSQL_TEST_APP_NAME, + base="ubuntu@22.04", + channel="latest/edge", + config={"sleep_interval": 500}, + num_units=1, + ) + juju.integrate( + f"{MYSQL_APP_NAME}:database", + f"{MYSQL_TEST_APP_NAME}:database", + ) + + logging.info("Wait for applications to become active") + juju.wait( + ready=wait_for_apps_status( + jubilant_backports.all_active, MYSQL_APP_NAME, MYSQL_TEST_APP_NAME + ), + error=jubilant_backports.any_blocked, + timeout=20 * MINUTE_SECS, + ) + + +@pytest.mark.abort_on_fail +def test_log_rotation(juju: Juju) -> None: + """Test the log rotation of text files.""" log_types = ["error", "audit"] log_files = ["error.log", "audit.log"] - archive_directories = [ - "archive_error", - "archive_audit", - ] + archive_dirs = ["archive_error", "archive_audit"] + + mysql_app_leader = get_app_leader(juju, MYSQL_APP_NAME) + mysql_logs_path = f"{CHARMED_MYSQL_COMMON_DIRECTORY}/var/log/mysql" + + logging.info("Removing the cron file") + delete_unit_file(juju, mysql_app_leader, "/etc/cron.d/flush_mysql_logs") + + logging.info("Stopping any running logrotate jobs") + stop_unit_flush_logs_job(juju, mysql_app_leader) - logger.info("Removing the cron file") - await delete_file_or_directory_in_unit(ops_test, unit.name, "/etc/cron.d/flush_mysql_logs") + for log_type in log_types: + archive_log_dir = f"{mysql_logs_path}/archive_{log_type}" - logger.info("Stopping any running logrotate jobs") - await stop_running_flush_mysql_cronjobs(ops_test, unit.name) + logging.info("Removing existing archive directories") + delete_unit_file(juju, mysql_app_leader, archive_log_dir) - logger.info("Removing existing archive directories") - for archive_directory in archive_directories: - await delete_file_or_directory_in_unit( - ops_test, - unit.name, - 
f"{CHARMED_MYSQL_COMMON_DIRECTORY}/var/log/mysql/{archive_directory}/", + logging.info("Writing some data to the text log files") + write_unit_file( + juju=juju, + app_name=MYSQL_APP_NAME, + unit_name=mysql_app_leader, + file_path=f"{mysql_logs_path}/{log_type}.log", + file_data=f"{log_type} content", ) - logger.info("Writing some data to the text log files") - for log in log_types: - log_path = f"{CHARMED_MYSQL_COMMON_DIRECTORY}/var/log/mysql/{log}.log" - await write_content_to_file_in_unit(ops_test, unit, log_path, f"test {log} content\n") + logging.info("Ensuring only log files exist") + log_files_listed = list_unit_files(juju, mysql_app_leader, mysql_logs_path) + log_dirs_listed = [line.split()[-1] for line in log_files_listed] - logger.info("Ensuring only log files exist") - ls_la_output = await ls_la_in_unit( - ops_test, unit.name, f"{CHARMED_MYSQL_COMMON_DIRECTORY}/var/log/mysql/" - ) + assert len(log_files_listed) == len(log_files) + assert sorted(log_dirs_listed) == sorted(log_files) - assert len(ls_la_output) == len(log_files), ( - f"❌ files other than log files exist {ls_la_output}" - ) - directories = [line.split()[-1] for line in ls_la_output] - assert sorted(directories) == sorted(log_files), ( - f"❌ file other than logs files exist: {ls_la_output}" - ) + logging.info("Executing logrotate") + start_unit_flush_logs_job(juju, mysql_app_leader) - logger.info("Executing logrotate") - return_code, stdout, _ = await ops_test.juju( - "ssh", unit.name, "sudo", "logrotate", "-f", "/etc/logrotate.d/flush_mysql_logs" - ) - assert return_code == 0, f"❌ logrotate exited with code {return_code} and stdout {stdout}" + logging.info("Ensuring log files and archive directories exist") + log_files_listed = list_unit_files(juju, mysql_app_leader, mysql_logs_path) + log_dirs_listed = [line.split()[-1] for line in log_files_listed] - logger.info("Ensuring log files and archive directories exist") - ls_la_output = await ls_la_in_unit( - ops_test, unit.name, f"{CHARMED_MYSQL_COMMON_DIRECTORY}/var/log/mysql/" - ) + assert len(log_files_listed) == len(log_files + archive_dirs) + assert sorted(log_dirs_listed) == sorted(log_files + archive_dirs) - assert len(ls_la_output) == len(log_files + archive_directories), ( - f"❌ unexpected files/directories in log directory: {ls_la_output}" - ) - directories = [line.split()[-1] for line in ls_la_output] - assert sorted(directories) == sorted(log_files + archive_directories), ( - f"❌ unexpected files/directories in log directory: {ls_la_output}" - ) + logging.info("Ensuring log files were rotated") + for log_type in log_types: + active_log_file_data = read_unit_file( + juju=juju, + app_name=MYSQL_APP_NAME, + unit_name=mysql_app_leader, + file_path=f"{mysql_logs_path}/{log_type}.log", + ) + assert f"{log_type} content" not in active_log_file_data + + archive_log_dir = f"{mysql_logs_path}/archive_{log_type}" + archive_log_files_listed = list_unit_files(juju, mysql_app_leader, archive_log_dir) + + assert len(archive_log_files_listed) == 1 - logger.info("Ensuring log files were rotated") - for log in set(log_types): - file_contents = await read_contents_from_file_in_unit( - ops_test, unit, f"{CHARMED_MYSQL_COMMON_DIRECTORY}/var/log/mysql/{log}.log" + archive_log_file_name = archive_log_files_listed[0].split()[-1] + archive_log_file_data = read_unit_file( + juju=juju, + app_name=MYSQL_APP_NAME, + unit_name=mysql_app_leader, + file_path=f"{archive_log_dir}/{archive_log_file_name}", ) - assert f"test {log} content" not in file_contents, f"❌ log file {log}.log not 
rotated" + assert f"{log_type} content" in archive_log_file_data - ls_la_output = await ls_la_in_unit( - ops_test, unit.name, f"{CHARMED_MYSQL_COMMON_DIRECTORY}/var/log/mysql/archive_{log}/" + +def delete_unit_file(juju: Juju, unit_name: str, file_path: str) -> None: + """Delete a path in the provided unit. + + Args: + juju: The Juju instance + unit_name: The unit on which to delete the file + file_path: The path or file to delete + """ + if file_path.strip() in ["/", "."]: + return + + task = juju.exec(f"sudo find {file_path} -maxdepth 1 -delete", unit=unit_name) + task.raise_on_failure() + + +def list_unit_files(juju: Juju, unit_name: str, file_path: str) -> list[str]: + """Returns the list of files in the given path. + + Args: + juju: The Juju instance + unit_name: The unit in which to list the files + file_path: The path at which to list the files + """ + task = juju.exec(f"sudo ls -la {file_path}", unit=unit_name) + task.raise_on_failure() + + output = task.stdout.split("\n")[1:] + + return [ + line.strip("\r") + for line in output + if len(line.strip()) > 0 and line.split()[-1] not in [".", ".."] + ] + + +def read_unit_file(juju: Juju, app_name: str, unit_name: str, file_path: str) -> str: + """Read contents from file in the provided unit. + + Args: + juju: The Juju instance + app_name: The name of the app + unit_name: The name of the unit to read the file from + file_path: The path of the unit to read the file + """ + temp_path = "/tmp/file" + + juju.exec(f"sudo cp {file_path} {temp_path}", unit=unit_name) + juju.exec(f"sudo chown ubuntu:ubuntu {temp_path}", unit=unit_name) + + with tempfile.NamedTemporaryFile(mode="r+") as temp_file: + scp_unit_file_from( + juju=juju, + app_name=app_name, + unit_name=unit_name, + source_path=temp_path, + target_path=temp_file.name, ) - assert len(ls_la_output) == 1, f"❌ more than 1 file in archive directory: {ls_la_output}" + contents = temp_file.read() - filename = ls_la_output[0].split()[-1] - file_contents = await read_contents_from_file_in_unit( - ops_test, - unit, - f"{CHARMED_MYSQL_COMMON_DIRECTORY}/var/log/mysql/archive_{log}/{filename}", + juju.exec(f"sudo rm {temp_path}", unit=unit_name) + return contents + + +def write_unit_file(juju: Juju, app_name: str, unit_name: str, file_path: str, file_data: str): + """Write content to the file in the provided unit. + + Args: + juju: The Juju instance + app_name: The name of the app + unit_name: The name of the unit to write the file into + file_path: The path of the unit to write the file + file_data: The data to write to the file. 
+ """ + temp_path = "/tmp/file" + + with tempfile.NamedTemporaryFile(mode="w") as temp_file: + temp_file.write(file_data) + temp_file.flush() + + scp_unit_file_into( + juju=juju, + app_name=app_name, + unit_name=unit_name, + source_path=temp_file.name, + target_path=temp_path, ) - assert f"test {log} content" in file_contents, f"❌ log file {log}.log not rotated" + + juju.exec(f"sudo mv {temp_path} {file_path}", unit=unit_name) + juju.exec(f"sudo chown snap_daemon:root {file_path}", unit=unit_name) + + +def start_unit_flush_logs_job(juju: Juju, unit_name: str) -> None: + """Start running the logrotate job.""" + # TODO: + # Rely on Jubilant exec command once they fix it + # https://github.com/canonical/jubilant/issues/206 + juju.ssh( + command="sudo logrotate -f /etc/logrotate.d/flush_mysql_logs", + target=unit_name, + ) + + +def stop_unit_flush_logs_job(juju: Juju, unit_name: str) -> None: + """Stop running any logrotate jobs that may have been triggered by cron.""" + # TODO: + # Rely on Jubilant exec command once they fix it + # https://github.com/canonical/jubilant/issues/206 + juju.ssh( + command="sudo pkill -f 'logrotate -f /etc/logrotate.d/flush_mysql_logs' --signal SIGTERM", + target=unit_name, + ) + + # Hold execution until process is stopped + for attempt in Retrying(stop=stop_after_attempt(45), wait=wait_fixed(2)): + with attempt: + if get_unit_process_id(juju, unit_name, "logrotate") is not None: + raise Exception("Failed to stop the flush_mysql_logs logrotate process") diff --git a/tests/integration/high_availability/test_replication_reelection.py b/tests/integration/high_availability/test_replication_reelection.py index 3afa2c5a7..9ad9f404e 100644 --- a/tests/integration/high_availability/test_replication_reelection.py +++ b/tests/integration/high_availability/test_replication_reelection.py @@ -1,76 +1,100 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. 
-
 import logging
-from pathlib import Path

+import jubilant_backports
 import pytest
-import yaml
-from pytest_operator.plugin import OpsTest
-
-from ..helpers import (
-    get_primary_unit_wrapper,
-    scale_application,
-)
-from .high_availability_helpers import (
-    clean_up_database_and_table,
-    ensure_all_units_continuous_writes_incrementing,
-    ensure_n_online_mysql_members,
-    get_application_name,
-    insert_data_into_mysql_and_validate_replication,
+from jubilant_backports import Juju
+
+from ..helpers import generate_random_string
+from .high_availability_helpers_new import (
+    check_mysql_instances_online,
+    check_mysql_units_writes_increment,
+    get_mysql_primary_unit,
+    insert_mysql_test_data,
+    remove_mysql_test_data,
+    scale_app_units,
+    wait_for_apps_status,
 )

-logger = logging.getLogger(__name__)
-
-METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
-APP_NAME = METADATA["name"]
-ANOTHER_APP_NAME = f"second{APP_NAME}"
-TIMEOUT = 17 * 60
-
+MYSQL_APP_NAME = "mysql"
+MYSQL_TEST_APP_NAME = "mysql-test-app"

-@pytest.mark.abort_on_fail
-async def test_kill_primary_check_reelection(ops_test: OpsTest, highly_available_cluster) -> None:
-    """Confirm that a new primary is elected when the current primary is torn down."""
-    mysql_application_name = get_application_name(ops_test, "mysql")
-    application = ops_test.model.applications[mysql_application_name]
-
-    await ensure_all_units_continuous_writes_incrementing(ops_test)
+MINUTE_SECS = 60

-    primary_unit = await get_primary_unit_wrapper(ops_test, mysql_application_name)
-    primary_unit_name = primary_unit.name
+logging.getLogger("jubilant.wait").setLevel(logging.WARNING)

-    # Destroy the primary unit and block to ensure that the
-    # juju status changed from active
-    logger.info("Destroying leader unit")
-    await ops_test.model.destroy_units(primary_unit.name)
-    async with ops_test.fast_forward("60s"):
-        await ops_test.model.block_until(lambda: len(application.units) == 2)
-        await ops_test.model.wait_for_idle(
-            apps=[mysql_application_name],
-            status="active",
-            raise_on_blocked=True,
-            timeout=TIMEOUT,
-        )

+@pytest.mark.abort_on_fail
+def test_deploy_highly_available_cluster(juju: Juju, charm: str) -> None:
+    """Simple test to ensure that the MySQL and application charms get deployed."""
+    logging.info("Deploying MySQL cluster")
+    juju.deploy(
+        charm=charm,
+        app=MYSQL_APP_NAME,
+        base="ubuntu@22.04",
+        config={"profile": "testing"},
+        num_units=3,
+    )
+    juju.deploy(
+        charm=MYSQL_TEST_APP_NAME,
+        app=MYSQL_TEST_APP_NAME,
+        base="ubuntu@22.04",
+        channel="latest/edge",
+        config={"sleep_interval": 500},
+        num_units=1,
+    )
+
+    juju.integrate(
+        f"{MYSQL_APP_NAME}:database",
+        f"{MYSQL_TEST_APP_NAME}:database",
+    )
+
+    logging.info("Wait for applications to become active")
+    juju.wait(
+        ready=wait_for_apps_status(
+            jubilant_backports.all_active, MYSQL_APP_NAME, MYSQL_TEST_APP_NAME
+        ),
+        error=jubilant_backports.any_blocked,
+        timeout=20 * MINUTE_SECS,
+    )

-    # Wait for unit to be destroyed and confirm that the new primary unit is different
-    new_primary_unit = await get_primary_unit_wrapper(ops_test, mysql_application_name)
-    assert primary_unit_name != new_primary_unit.name, "Primary has not changed"

+@pytest.mark.abort_on_fail
+async def test_kill_primary_check_reelection(juju: Juju) -> None:
+    """Confirm that a new primary is elected when the current primary is torn down."""
+    await check_mysql_units_writes_increment(juju, MYSQL_APP_NAME)
+
+    mysql_old_primary = get_mysql_primary_unit(juju, MYSQL_APP_NAME)
+
+    # Destroy the primary unit to ensure that the juju status changes from active
+    logging.info("Killing the primary unit")
+    juju.remove_unit(mysql_old_primary)
+
+    juju.wait(
+        ready=lambda status: len(status.apps[MYSQL_APP_NAME].units) == 2,
+        timeout=20 * MINUTE_SECS,
+    )
+    juju.wait(
+        ready=wait_for_apps_status(jubilant_backports.all_active, MYSQL_APP_NAME),
+        error=jubilant_backports.any_blocked,
+        timeout=20 * MINUTE_SECS,
+    )
+
+    # Confirm that the new primary unit is different
+    mysql_new_primary = get_mysql_primary_unit(juju, MYSQL_APP_NAME)
+    assert mysql_new_primary != mysql_old_primary, "Primary has not changed"

     # Add the unit back and wait until it is active
-    async with ops_test.fast_forward("60s"):
-        logger.info("Scaling back to 3 units")
-        await scale_application(ops_test, mysql_application_name, 3)
-
-        # wait (and retry) until the killed pod is back online in the mysql cluster
-        assert await ensure_n_online_mysql_members(ops_test, 3), (
-            "Old primary has not come back online after being killed"
-        )
+    logging.info("Scaling back to 3 units")
+    scale_app_units(juju, MYSQL_APP_NAME, 3)

-    await ensure_all_units_continuous_writes_incrementing(ops_test)
+    # Retry until the killed unit is back online in the mysql cluster
+    assert check_mysql_instances_online(juju, MYSQL_APP_NAME)

-    database_name, table_name = "test-kill-primary-check-reelection", "data"
-    await insert_data_into_mysql_and_validate_replication(ops_test, database_name, table_name)
-    await clean_up_database_and_table(ops_test, database_name, table_name)
+    table_name = "data"
+    table_value = generate_random_string(255)
+    await check_mysql_units_writes_increment(juju, MYSQL_APP_NAME)
+    await insert_mysql_test_data(juju, MYSQL_APP_NAME, table_name, table_value)
+    await remove_mysql_test_data(juju, MYSQL_APP_NAME, table_name)
diff --git a/tests/integration/high_availability/test_replication_scaling.py b/tests/integration/high_availability/test_replication_scaling.py
index 1b0141b7e..136909b2b 100644
--- a/tests/integration/high_availability/test_replication_scaling.py
+++ b/tests/integration/high_availability/test_replication_scaling.py
@@ -1,105 +1,92 @@
 # Copyright 2022 Canonical Ltd.
 # See LICENSE file for licensing details.
- import logging -from pathlib import Path +import jubilant_backports import pytest -import yaml -from pytest_operator.plugin import OpsTest - -from ..helpers import ( - execute_queries_on_unit, - generate_random_string, - get_primary_unit_wrapper, - get_server_config_credentials, - scale_application, -) -from .high_availability_helpers import ( - get_application_name, +from jubilant_backports import Juju + +from ..helpers import generate_random_string +from .high_availability_helpers_new import ( + get_app_units, + insert_mysql_test_data, + remove_mysql_test_data, + scale_app_units, + verify_mysql_test_data, + wait_for_apps_status, ) -logger = logging.getLogger(__name__) +MYSQL_APP_NAME = "mysql" +MYSQL_TEST_APP_NAME = "mysql-test-app" -METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) -APP_NAME = METADATA["name"] -ANOTHER_APP_NAME = f"second{APP_NAME}" -TIMEOUT = 17 * 60 +MINUTE_SECS = 60 +logging.getLogger("jubilant.wait").setLevel(logging.WARNING) -@pytest.mark.abort_on_fail -async def test_scaling_without_data_loss(ops_test: OpsTest, highly_available_cluster) -> None: - """Test that data is preserved during scale up and scale down.""" - # Insert values into test table from the primary unit - app = get_application_name(ops_test, "mysql") - application = ops_test.model.applications[app] - random_unit = application.units[0] - server_config_credentials = await get_server_config_credentials(random_unit) +@pytest.mark.abort_on_fail +def test_deploy_highly_available_cluster(juju: Juju, charm: str) -> None: + """Simple test to ensure that the MySQL and application charms get deployed.""" + logging.info("Deploying MySQL cluster") + juju.deploy( + charm=charm, + app=MYSQL_APP_NAME, + base="ubuntu@22.04", + config={"profile": "testing"}, + num_units=3, + ) + juju.deploy( + charm=MYSQL_TEST_APP_NAME, + app=MYSQL_TEST_APP_NAME, + base="ubuntu@22.04", + channel="latest/edge", + config={"sleep_interval": 500}, + num_units=1, + ) - primary_unit = await get_primary_unit_wrapper( - ops_test, - app, + juju.integrate( + f"{MYSQL_APP_NAME}:database", + f"{MYSQL_TEST_APP_NAME}:database", ) - primary_unit_address = await primary_unit.get_public_address() - - random_chars = generate_random_string(40) - create_records_sql = [ - "CREATE DATABASE IF NOT EXISTS test", - "CREATE TABLE IF NOT EXISTS test.instance_state_replication (id varchar(40), primary key(id))", - f"INSERT INTO test.instance_state_replication VALUES ('{random_chars}')", - ] - - await execute_queries_on_unit( - primary_unit_address, - server_config_credentials["username"], - server_config_credentials["password"], - create_records_sql, - commit=True, + + logging.info("Wait for applications to become active") + juju.wait( + ready=wait_for_apps_status( + jubilant_backports.all_active, MYSQL_APP_NAME, MYSQL_TEST_APP_NAME + ), + error=jubilant_backports.any_blocked, + timeout=20 * MINUTE_SECS, ) - old_unit_names = [unit.name for unit in ops_test.model.applications[app].units] - # Add a unit and wait until it is active - async with ops_test.fast_forward("60s"): - await scale_application(ops_test, app, 4) +@pytest.mark.abort_on_fail +async def test_scaling_without_data_loss(juju: Juju) -> None: + """Test that data is preserved during scale up and scale down.""" + table_name = "instance_state_replication" + table_value = generate_random_string(255) + + await insert_mysql_test_data(juju, MYSQL_APP_NAME, table_name, table_value) - added_unit = next(unit for unit in application.units if unit.name not in old_unit_names) + mysql_app_old_units = 
set(get_app_units(juju, MYSQL_APP_NAME)) + scale_app_units(juju, MYSQL_APP_NAME, 4) + mysql_app_new_units = set(get_app_units(juju, MYSQL_APP_NAME)) # Ensure that all units have the above inserted data - select_data_sql = [ - f"SELECT * FROM test.instance_state_replication WHERE id = '{random_chars}'", - ] - - for unit in application.units: - unit_address = await unit.get_public_address() - output = await execute_queries_on_unit( - unit_address, - server_config_credentials["username"], - server_config_credentials["password"], - select_data_sql, - ) - assert random_chars in output - - # Destroy the recently created unit and wait until the application is active - await ops_test.model.destroy_units(added_unit.name) - async with ops_test.fast_forward("60s"): - await ops_test.model.block_until(lambda: len(ops_test.model.applications[app].units) == 3) - await ops_test.model.wait_for_idle( - apps=[app], - status="active", - raise_on_blocked=True, - timeout=TIMEOUT, - ) + await verify_mysql_test_data(juju, MYSQL_APP_NAME, table_name, table_value) + + mysql_app_added_unit = (mysql_app_new_units - mysql_app_old_units).pop() + juju.remove_unit(mysql_app_added_unit) + juju.wait( + ready=lambda status: len(status.apps[MYSQL_APP_NAME].units) == 3, + timeout=20 * MINUTE_SECS, + ) + juju.wait( + ready=wait_for_apps_status(jubilant_backports.all_active, MYSQL_APP_NAME), + error=jubilant_backports.any_blocked, + timeout=20 * MINUTE_SECS, + ) # Ensure that the data still exists in all the units - for unit in application.units: - unit_address = await unit.get_public_address() - output = await execute_queries_on_unit( - unit_address, - server_config_credentials["username"], - server_config_credentials["password"], - select_data_sql, - ) - assert random_chars in output + await verify_mysql_test_data(juju, MYSQL_APP_NAME, table_name, table_value) + await remove_mysql_test_data(juju, MYSQL_APP_NAME, table_name) diff --git a/tests/integration/high_availability/test_replication_unit_endpoints.py b/tests/integration/high_availability/test_replication_unit_endpoints.py index 4e2416a0d..a21a90fdd 100644 --- a/tests/integration/high_availability/test_replication_unit_endpoints.py +++ b/tests/integration/high_availability/test_replication_unit_endpoints.py @@ -1,87 +1,112 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. 
- import logging import time -from pathlib import Path +import jubilant_backports import pytest import urllib3 -import yaml -from pytest_operator.plugin import OpsTest -from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed +from jubilant_backports import Juju +from tenacity import ( + Retrying, + stop_after_attempt, + wait_fixed, +) -from ..helpers import ( - fetch_credentials, +from constants import ( + CHARMED_MYSQL_SNAP_NAME, + CHARMED_MYSQLD_EXPORTER_SERVICE, + MONITORING_USERNAME, + MYSQL_EXPORTER_PORT, ) -from .high_availability_helpers import ( - get_application_name, + +from .high_availability_helpers_new import ( + get_app_units, + get_unit_ip, + wait_for_apps_status, ) -logger = logging.getLogger(__name__) +MYSQL_APP_NAME = "mysql" +MYSQL_TEST_APP_NAME = "mysql-test-app" + +MINUTE_SECS = 60 -METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) -APP_NAME = METADATA["name"] -ANOTHER_APP_NAME = f"second{APP_NAME}" -TIMEOUT = 17 * 60 +logging.getLogger("jubilant.wait").setLevel(logging.WARNING) @pytest.mark.abort_on_fail -async def test_exporter_endpoints(ops_test: OpsTest, highly_available_cluster) -> None: +def test_deploy_highly_available_cluster(juju: Juju, charm: str) -> None: + """Simple test to ensure that the MySQL and application charms get deployed.""" + logging.info("Deploying MySQL cluster") + juju.deploy( + charm=charm, + app=MYSQL_APP_NAME, + base="ubuntu@22.04", + config={"profile": "testing"}, + num_units=3, + ) + juju.deploy( + charm=MYSQL_TEST_APP_NAME, + app=MYSQL_TEST_APP_NAME, + base="ubuntu@22.04", + channel="latest/edge", + config={"sleep_interval": 500}, + num_units=1, + ) + + juju.integrate( + f"{MYSQL_APP_NAME}:database", + f"{MYSQL_TEST_APP_NAME}:database", + ) + + logging.info("Wait for applications to become active") + juju.wait( + ready=wait_for_apps_status( + jubilant_backports.all_active, MYSQL_APP_NAME, MYSQL_TEST_APP_NAME + ), + error=jubilant_backports.any_blocked, + timeout=20 * MINUTE_SECS, + ) + + +@pytest.mark.abort_on_fail +def test_exporter_endpoints(juju: Juju) -> None: """Test that endpoints are running.""" - mysql_application_name = get_application_name(ops_test, "mysql") - application = ops_test.model.applications[mysql_application_name] - http = urllib3.PoolManager() + http_client = urllib3.PoolManager() + service_name = f"{CHARMED_MYSQL_SNAP_NAME}.{CHARMED_MYSQLD_EXPORTER_SERVICE}" - for unit in application.units: - _, output, _ = await ops_test.juju( - "ssh", unit.name, "sudo", "snap", "services", "charmed-mysql.mysqld-exporter" - ) - assert output.split("\n")[1].split()[2] == "inactive" + for unit_name in get_app_units(juju, MYSQL_APP_NAME): + task = juju.exec(f"sudo snap services {service_name}", unit=unit_name) + task.raise_on_failure() - return_code, _, _ = await ops_test.juju( - "ssh", unit.name, "sudo", "snap", "set", "charmed-mysql", "exporter.user=monitoring" - ) - assert return_code == 0 - - monitoring_credentials = await fetch_credentials(unit, "monitoring") - return_code, _, _ = await ops_test.juju( - "ssh", - unit.name, - "sudo", - "snap", - "set", - "charmed-mysql", - f"exporter.password={monitoring_credentials['password']}", - ) - assert return_code == 0 + assert task.stdout.split("\n")[1].split()[2] == "inactive" - return_code, _, _ = await ops_test.juju( - "ssh", unit.name, "sudo", "snap", "start", "charmed-mysql.mysqld-exporter" + credentials_task = juju.run( + unit=unit_name, + action="get-password", + params={"username": MONITORING_USERNAME}, ) - assert return_code == 0 - - try: - for 
attempt in Retrying(stop=stop_after_attempt(45), wait=wait_fixed(2)): - with attempt: - _, output, _ = await ops_test.juju( - "ssh", - unit.name, - "sudo", - "snap", - "services", - "charmed-mysql.mysqld-exporter", - ) - assert output.split("\n")[1].split()[2] == "active" - except RetryError as e: - raise Exception("Failed to start the mysqld-exporter snap service") from e + credentials_task.raise_on_failure() - time.sleep(30) + username = credentials_task.results["username"] + password = credentials_task.results["password"] + + juju.exec(f"sudo snap set charmed-mysql exporter.user={username}", unit=unit_name) + juju.exec(f"sudo snap set charmed-mysql exporter.password={password}", unit=unit_name) + juju.exec(f"sudo snap start {service_name}", unit=unit_name) - unit_address = await unit.get_public_address() - mysql_exporter_url = f"http://{unit_address}:9104/metrics" + for attempt in Retrying(stop=stop_after_attempt(45), wait=wait_fixed(2)): + with attempt: + task = juju.exec(f"sudo snap services {service_name}", unit=unit_name) + task.raise_on_failure() + + assert task.stdout.split("\n")[1].split()[2] == "active" + + time.sleep(30) - jmx_resp = http.request("GET", mysql_exporter_url) + mysql_unit_address = get_unit_ip(juju, MYSQL_APP_NAME, unit_name) + mysql_unit_exporter_url = f"http://{mysql_unit_address}:{MYSQL_EXPORTER_PORT}/metrics" + mysql_unit_response = http_client.request("GET", mysql_unit_exporter_url) - assert jmx_resp.status == 200 + assert mysql_unit_response.status == 200 diff --git a/tests/integration/high_availability/test_replication_variables.py b/tests/integration/high_availability/test_replication_variables.py index 99ed9f2b7..1512c9b55 100644 --- a/tests/integration/high_availability/test_replication_variables.py +++ b/tests/integration/high_availability/test_replication_variables.py @@ -1,39 +1,67 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. 
- import logging -from pathlib import Path +import jubilant_backports import pytest -import yaml -from pytest_operator.plugin import OpsTest +from jubilant_backports import Juju -from ..helpers import ( - retrieve_database_variable_value, -) -from .high_availability_helpers import ( - get_application_name, +from .high_availability_helpers_new import ( + get_app_units, + get_mysql_variable_value, + wait_for_apps_status, ) -logger = logging.getLogger(__name__) +MYSQL_APP_NAME = "mysql" +MYSQL_TEST_APP_NAME = "mysql-test-app" + +MINUTE_SECS = 60 + +logging.getLogger("jubilant.wait").setLevel(logging.WARNING) + + +@pytest.mark.abort_on_fail +def test_deploy_highly_available_cluster(juju: Juju, charm: str) -> None: + """Simple test to ensure that the MySQL and application charms get deployed.""" + logging.info("Deploying MySQL cluster") + juju.deploy( + charm=charm, + app=MYSQL_APP_NAME, + base="ubuntu@22.04", + config={"profile": "testing"}, + num_units=3, + ) + juju.deploy( + charm=MYSQL_TEST_APP_NAME, + app=MYSQL_TEST_APP_NAME, + base="ubuntu@22.04", + channel="latest/edge", + config={"sleep_interval": 500}, + num_units=1, + ) + + juju.integrate( + f"{MYSQL_APP_NAME}:database", + f"{MYSQL_TEST_APP_NAME}:database", + ) -METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) -APP_NAME = METADATA["name"] -ANOTHER_APP_NAME = f"second{APP_NAME}" -TIMEOUT = 17 * 60 + logging.info("Wait for applications to become active") + juju.wait( + ready=wait_for_apps_status( + jubilant_backports.all_active, MYSQL_APP_NAME, MYSQL_TEST_APP_NAME + ), + error=jubilant_backports.any_blocked, + timeout=20 * MINUTE_SECS, + ) @pytest.mark.abort_on_fail -async def test_custom_variables(ops_test: OpsTest, highly_available_cluster) -> None: +async def test_custom_variables(juju: Juju) -> None: """Query database for custom variables.""" - mysql_application_name = get_application_name(ops_test, "mysql") - application = ops_test.model.applications[mysql_application_name] + for unit in get_app_units(juju, MYSQL_APP_NAME): + custom_vars = {"max_connections": 100} - for unit in application.units: - custom_vars = {} - custom_vars["max_connections"] = 100 for k, v in custom_vars.items(): - logger.info(f"Checking that {k} is set to {v} on {unit.name}") - value = await retrieve_database_variable_value(ops_test, unit, k) - assert value == v, f"Variable {k} is not set to {v}" + logging.info(f"Checking that {k} is set to {v} on {unit}") + assert await get_mysql_variable_value(juju, MYSQL_APP_NAME, unit, k) == v