diff --git a/pyproject.toml b/pyproject.toml index 6a114fa..64fa740 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,6 @@ lint = [ ] unit = [ "coverage[toml]==7.9.1; python_version > '3.8'", - "pytest>=8.3.5; python_version < '3.9'", "pytest==8.4.1; python_version >= '3.9'" ] @@ -41,9 +40,6 @@ branch = true [tool.coverage.report] show_missing = true -exclude_lines = [ - "logger\\.debug" -] [tool.pytest.ini_options] minversion = "6.0" diff --git a/tests/pyproject.toml b/tests/pyproject.toml new file mode 100644 index 0000000..c65b81d --- /dev/null +++ b/tests/pyproject.toml @@ -0,0 +1,51 @@ +# Copyright 2025 Canonical Ltd. +# See LICENSE file for licensing details. + +# Linting tools configuration +[tool.ruff] +# preview and explicit preview are enabled for CPY001 +preview = true +target-version = "py312" +src = ["src", "."] +line-length = 99 + +[tool.ruff.lint] +explicit-preview-rules = true +select = ["A", "E", "W", "F", "C", "N", "D", "I001", "B", "CPY001", "RUF", "S", "SIM", "UP", "TCH"] +extend-ignore = [ + "D203", + "D204", + "D213", + "D215", + "D400", + "D404", + "D406", + "D407", + "D408", + "D409", + "D413", +] +# Ignore E501 because using black creates errors with this +# Ignore D107 Missing docstring in __init__ +ignore = ["E501", "D107"] + +[tool.ruff.lint.per-file-ignores] +"*" = [ + "D100", "D101", "D102", "D103", "D104", + # Asserts + "B011", + # Disable security checks for tests + "S", +] + +[tool.ruff.lint.flake8-copyright] +# Check for properly formatted copyright header in each file +author = "Canonical Ltd." +notice-rgx = "Copyright\\s\\d{4}([-,]\\d{4})*\\s+" +min-file-size = 1 + +[tool.ruff.lint.mccabe] +max-complexity = 10 + +[tool.ruff.lint.pydocstyle] +convention = "google" diff --git a/tests/unit/test_postgresql.py b/tests/unit/test_postgresql.py index b671bb4..0c69bd0 100644 --- a/tests/unit/test_postgresql.py +++ b/tests/unit/test_postgresql.py @@ -1,22 +1,22 @@ # Copyright 2025 Canonical Ltd. # See LICENSE file for licensing details. -from unittest.mock import call, patch +from unittest.mock import call, patch, sentinel import psycopg2 import pytest from ops.testing import Harness from psycopg2.sql import SQL, Composed, Identifier, Literal - from single_kernel_postgresql.abstract_charm import AbstractPostgreSQLCharm -from single_kernel_postgresql.config.literals import ( - PEER, - SYSTEM_USERS, -) +from single_kernel_postgresql.config.literals import PEER, SYSTEM_USERS from single_kernel_postgresql.utils.postgresql import ( ACCESS_GROUP_INTERNAL, ACCESS_GROUPS, + PostgreSQL, PostgreSQLCreateDatabaseError, + PostgreSQLCreateUserError, PostgreSQLGetLastArchivedWALError, + PostgreSQLUndefinedHostError, + PostgreSQLUndefinedPasswordError, ) @@ -65,11 +65,14 @@ def test_create_access_groups(harness, users_exist): def test_create_database(harness): - with patch( - "single_kernel_postgresql.utils.postgresql.PostgreSQL.enable_disable_extensions" - ) as _enable_disable_extensions, patch( - "single_kernel_postgresql.utils.postgresql.PostgreSQL._connect_to_database" - ) as _connect_to_database: + with ( + patch( + "single_kernel_postgresql.utils.postgresql.PostgreSQL.enable_disable_extensions" + ) as _enable_disable_extensions, + patch( + "single_kernel_postgresql.utils.postgresql.PostgreSQL._connect_to_database" + ) as _connect_to_database, + ): # Test a successful database creation. database = "test_database" plugins = ["test_plugin_1", "test_plugin_2"] @@ -310,3 +313,215 @@ def test_validate_group_map(harness): assert harness.charm.postgresql.validate_group_map("ldap_group=ldap_test_group") is True assert harness.charm.postgresql.validate_group_map("ldap_group=ldap_test_group,") is False assert harness.charm.postgresql.validate_group_map("ldap_group ldap_test_group") is False + + +def test_connect_to_database(): + # Error on no host + pg = PostgreSQL(None, None, "operator", None, "postgres", None) + with pytest.raises(PostgreSQLUndefinedHostError): + pg._connect_to_database() + + # Error on no password + pg = PostgreSQL("primary", "current", "operator", None, "postgres", None) + with pytest.raises(PostgreSQLUndefinedPasswordError): + pg._connect_to_database() + + # Returns connection + pg = PostgreSQL("primary", "current", "operator", "password", "postgres", None) + with patch( + "single_kernel_postgresql.utils.postgresql.psycopg2.connect", + return_value=sentinel.connection, + ) as _connect: + assert pg._connect_to_database() == sentinel.connection + _connect.assert_called_once_with( + "dbname='postgres' user='operator' host='primary'password='password' connect_timeout=1" + ) + + +def test_is_user_in_hba(): + with patch( + "single_kernel_postgresql.utils.postgresql.PostgreSQL._connect_to_database", + ) as _connect_to_database: + pg = PostgreSQL("primary", "current", "operator", "password", "postgres", None) + _cursor = _connect_to_database().__enter__().cursor().__enter__() + + # No result + _cursor.fetchone.return_value = None + assert pg.is_user_in_hba("test-user") is False + _cursor.execute.assert_called_once_with( + Composed([ + SQL("SELECT COUNT(*) FROM pg_hba_file_rules WHERE "), + Literal("test-user"), + SQL(" = ANY(user_name);"), + ]) + ) + + # Exception + _cursor.fetchone.side_effect = psycopg2.Error + assert pg.is_user_in_hba("test-user") is False + + # Result + _cursor.fetchone.side_effect = None + _cursor.fetchone.return_value = (1,) + assert pg.is_user_in_hba("test-user") is True + + +def test_drop_hba_triggers(): + with ( + patch( + "single_kernel_postgresql.utils.postgresql.PostgreSQL._connect_to_database", + ) as _connect_to_database, + patch("single_kernel_postgresql.utils.postgresql.logger") as _logger, + ): + pg = PostgreSQL("primary", "current", "operator", "password", "postgres", None) + _cursor = _connect_to_database().__enter__().cursor().__enter__() + _cursor.fetchall.return_value = (("db1",), ("db2",)) + + pg.drop_hba_triggers() + + assert _cursor.execute.call_count == 5 + _cursor.execute.assert_any_call( + SQL( + "SELECT datname FROM pg_database WHERE datname <> 'template0' AND datname <>'postgres';" + ) + ) + _cursor.execute.assert_any_call( + SQL("DROP EVENT TRIGGER IF EXISTS update_pg_hba_on_create_schema;") + ) + _cursor.execute.assert_any_call( + SQL("DROP EVENT TRIGGER IF EXISTS update_pg_hba_on_drop_schema;") + ) + _cursor.execute.reset_mock() + + # Exception on select + _cursor.execute.side_effect = psycopg2.Error + + pg.drop_hba_triggers() + + _cursor.execute.assert_called_once_with( + SQL( + "SELECT datname FROM pg_database WHERE datname <> 'template0' AND datname <>'postgres';" + ) + ) + _logger.warning.assert_called_once_with( + "Failed to get databases when removing hba trigger: " + ) + _logger.warning.reset_mock() + + # Exception on drop + _cursor.execute.side_effect = [None, psycopg2.Error, None, None] + + pg.drop_hba_triggers() + + _logger.warning.assert_called_once_with("Failed to remove hba trigger for db1: ") + + +def test_create_user(): + with ( + patch( + "single_kernel_postgresql.utils.postgresql.PostgreSQL._connect_to_database", + ) as _connect_to_database, + patch( + "single_kernel_postgresql.utils.postgresql.PostgreSQL._process_extra_user_roles", + ) as _process_extra_user_roles, + ): + pg = PostgreSQL("primary", "current", "operator", "password", "postgres", None) + _cursor = _connect_to_database().__enter__().cursor().__enter__() + _process_extra_user_roles.return_value = (["role1", "role2"], ["priv1", "priv2"]) + + # Create user + _cursor.fetchone.return_value = None + + pg.create_user("username", "password") + + assert _cursor.execute.call_count == 8 + _cursor.execute.assert_any_call( + Composed([ + SQL("SELECT TRUE FROM pg_roles WHERE rolname="), + Literal("username"), + SQL(";"), + ]) + ) + _cursor.execute.assert_any_call(SQL("RESET ROLE;")) + _cursor.execute.assert_any_call(SQL("BEGIN;")) + _cursor.execute.assert_any_call(SQL("SET LOCAL log_statement = 'none';")) + _cursor.execute.assert_any_call( + Composed([ + SQL("CREATE ROLE "), + Identifier("username"), + SQL(" WITH LOGIN ENCRYPTED PASSWORD 'password' priv1 priv2;"), + ]) + ) + _cursor.execute.assert_any_call(SQL("COMMIT;")) + _cursor.execute.assert_any_call( + Composed([ + SQL("GRANT "), + Identifier("role1"), + SQL(" TO "), + Identifier("username"), + SQL(";"), + ]) + ) + _cursor.execute.assert_any_call( + Composed([ + SQL("GRANT "), + Identifier("role2"), + SQL(" TO "), + Identifier("username"), + SQL(";"), + ]) + ) + _cursor.execute.reset_mock() + _process_extra_user_roles.reset_mock() + + # Alter user + _cursor.fetchone.return_value = (1,) + + pg.create_user("username", "password", True, True, ["role3"], "db1", True) + + _process_extra_user_roles.assert_called_once_with("username", ["role3"]) + assert _cursor.execute.call_count == 8 + _cursor.execute.assert_any_call( + Composed([ + SQL("SELECT TRUE FROM pg_roles WHERE rolname="), + Literal("username"), + SQL(";"), + ]) + ) + _cursor.execute.assert_any_call(SQL("RESET ROLE;")) + _cursor.execute.assert_any_call(SQL("BEGIN;")) + _cursor.execute.assert_any_call(SQL("SET LOCAL log_statement = 'none';")) + _cursor.execute.assert_any_call( + Composed([ + SQL("ALTER ROLE "), + Identifier("username"), + SQL( + ' WITH LOGIN SUPERUSER REPLICATION ENCRYPTED PASSWORD \'password\' IN ROLE "charmed_db1_admin", "charmed_db1_dml" CREATEDB priv1 priv2;' + ), + ]) + ) + _cursor.execute.assert_any_call(SQL("COMMIT;")) + _cursor.execute.assert_any_call( + Composed([ + SQL("GRANT "), + Identifier("role1"), + SQL(" TO "), + Identifier("username"), + SQL(";"), + ]) + ) + _cursor.execute.assert_any_call( + Composed([ + SQL("GRANT "), + Identifier("role2"), + SQL(" TO "), + Identifier("username"), + SQL(";"), + ]) + ) + + # Exception + _cursor.execute.side_effect = psycopg2.Error + + with pytest.raises(PostgreSQLCreateUserError): + pg.create_user("username", "password") diff --git a/uv.lock b/uv.lock index 0e7dc56..9168345 100644 --- a/uv.lock +++ b/uv.lock @@ -121,7 +121,6 @@ name = "exceptiongroup" version = "1.3.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "typing-extensions", version = "4.13.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, { name = "typing-extensions", version = "4.15.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.9' and python_full_version < '3.13'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/0b/9f/a65090624ecf468cdca03533906e7c69ed7588582240cfe7cc9e770b50eb/exceptiongroup-1.3.0.tar.gz", hash = "sha256:b241f5885f560bc56a59ee63ca4c6a8bfa46ae4ad651af316d4e81817bb9fd88", size = 29749, upload-time = "2025-05-10T17:42:51.123Z" } @@ -258,26 +257,10 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" }, ] -[[package]] -name = "pluggy" -version = "1.5.0" -source = { registry = "https://pypi.org/simple" } -resolution-markers = [ - "python_full_version < '3.9'", -] -sdist = { url = "https://files.pythonhosted.org/packages/96/2d/02d4312c973c6050a18b314a5ad0b3210edb65a906f868e31c111dede4a6/pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1", size = 67955, upload-time = "2024-04-20T21:34:42.531Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/88/5f/e351af9a41f866ac3f1fac4ca0613908d9a41741cfcf2228f4ad853b697d/pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669", size = 20556, upload-time = "2024-04-20T21:34:40.434Z" }, -] - [[package]] name = "pluggy" version = "1.6.0" source = { registry = "https://pypi.org/simple" } -resolution-markers = [ - "python_full_version >= '3.10'", - "python_full_version == '3.9.*'", -] sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" } wheels = [ { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, @@ -285,6 +268,7 @@ wheels = [ [[package]] name = "postgresql-charms-single-kernel" +version = "0.0.1" source = { virtual = "." } [package.dev-dependencies] @@ -302,8 +286,7 @@ lint = [ ] unit = [ { name = "coverage", extra = ["toml"], marker = "python_full_version >= '3.9'" }, - { name = "pytest", version = "8.3.5", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, - { name = "pytest", version = "8.4.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.9'" }, + { name = "pytest", marker = "python_full_version >= '3.9'" }, ] [package.metadata] @@ -320,7 +303,6 @@ lint = [ ] unit = [ { name = "coverage", extras = ["toml"], marker = "python_full_version >= '3.9'", specifier = "==7.9.1" }, - { name = "pytest", marker = "python_full_version < '3.9'", specifier = ">=8.3.5" }, { name = "pytest", marker = "python_full_version >= '3.9'", specifier = "==8.4.1" }, ] @@ -364,40 +346,16 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d5/1a/524f832e1ff1962a22a1accc775ca7b143ba2e9f5924bb6749dce566784a/pyright-1.1.405-py3-none-any.whl", hash = "sha256:a2cb13700b5508ce8e5d4546034cb7ea4aedb60215c6c33f56cec7f53996035a", size = 5905038, upload-time = "2025-09-04T03:37:04.913Z" }, ] -[[package]] -name = "pytest" -version = "8.3.5" -source = { registry = "https://pypi.org/simple" } -resolution-markers = [ - "python_full_version < '3.9'", -] -dependencies = [ - { name = "colorama", marker = "python_full_version < '3.9' and sys_platform == 'win32'" }, - { name = "exceptiongroup", marker = "python_full_version < '3.9'" }, - { name = "iniconfig", marker = "python_full_version < '3.9'" }, - { name = "packaging", marker = "python_full_version < '3.9'" }, - { name = "pluggy", version = "1.5.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.9'" }, - { name = "tomli", marker = "python_full_version < '3.9'" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/ae/3c/c9d525a414d506893f0cd8a8d0de7706446213181570cdbd766691164e40/pytest-8.3.5.tar.gz", hash = "sha256:f4efe70cc14e511565ac476b57c279e12a855b11f48f212af1080ef2263d3845", size = 1450891, upload-time = "2025-03-02T12:54:54.503Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/30/3d/64ad57c803f1fa1e963a7946b6e0fea4a70df53c1a7fed304586539c2bac/pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820", size = 343634, upload-time = "2025-03-02T12:54:52.069Z" }, -] - [[package]] name = "pytest" version = "8.4.1" source = { registry = "https://pypi.org/simple" } -resolution-markers = [ - "python_full_version >= '3.10'", - "python_full_version == '3.9.*'", -] dependencies = [ { name = "colorama", marker = "python_full_version >= '3.9' and sys_platform == 'win32'" }, { name = "exceptiongroup", marker = "python_full_version >= '3.9' and python_full_version < '3.11'" }, { name = "iniconfig", marker = "python_full_version >= '3.9'" }, { name = "packaging", marker = "python_full_version >= '3.9'" }, - { name = "pluggy", version = "1.6.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.9'" }, + { name = "pluggy", marker = "python_full_version >= '3.9'" }, { name = "pygments", marker = "python_full_version >= '3.9'" }, { name = "tomli", marker = "python_full_version >= '3.9' and python_full_version < '3.11'" }, ]