Skip to content
137 changes: 0 additions & 137 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import secrets
import string
import subprocess
import tempfile

import juju.unit
import yaml
Expand Down Expand Up @@ -830,142 +829,6 @@ async def get_cluster_status(unit: Unit, cluster_set: bool | None = False) -> di
return result.get("status", {})


async def delete_file_or_directory_in_unit(ops_test: OpsTest, unit_name: str, path: str) -> bool:
"""Delete a file in the provided unit.

Args:
ops_test: The ops test framework
unit_name: The name unit on which to delete the file from
path: The path of file or directory to delete

Returns:
boolean indicating success
"""
if path.strip() in ["/", "."]:
return

try:
return_code, _, _ = await ops_test.juju(
"ssh", unit_name, "sudo", "find", path, "-maxdepth", "1", "-delete"
)

return return_code == 0
except Exception:
return False


async def write_content_to_file_in_unit(
ops_test: OpsTest, unit: Unit, path: str, content: str
) -> None:
"""Write content to the file in the provided unit.

Args:
ops_test: The ops test framework
unit: THe unit in which to write to file in
path: The path at which to write the content to
content: The content to write to the file.
"""
with tempfile.NamedTemporaryFile(mode="w") as temp_file:
temp_file.write(content)
temp_file.flush()

await unit.scp_to(temp_file.name, "/tmp/file")

return_code, _, _ = await ops_test.juju("ssh", unit.name, "sudo", "mv", "/tmp/file", path)
assert return_code == 0

return_code, _, _ = await ops_test.juju(
"ssh", unit.name, "sudo", "chown", "snap_daemon:root", path
)
assert return_code == 0


async def read_contents_from_file_in_unit(ops_test: OpsTest, unit: Unit, path: str) -> str:
"""Read contents from file in the provided unit.

Args:
ops_test: The ops test framework
unit: The unit in which to read file from
path: The path from which to read content from

Returns:
the contents of the file
"""
return_code, _, _ = await ops_test.juju("ssh", unit.name, "sudo", "cp", path, "/tmp/file")
assert return_code == 0

return_code, _, _ = await ops_test.juju(
"ssh", unit.name, "sudo", "chown", "ubuntu:ubuntu", "/tmp/file"
)
assert return_code == 0

with tempfile.NamedTemporaryFile(mode="r+") as temp_file:
await unit.scp_from("/tmp/file", temp_file.name)

temp_file.seek(0)

contents = ""
for line in temp_file:
contents += line
contents += "\n"

return_code, _, _ = await ops_test.juju("ssh", unit.name, "sudo", "rm", "/tmp/file")
assert return_code == 0

return contents


async def ls_la_in_unit(ops_test: OpsTest, unit_name: str, directory: str) -> list[str]:
"""Returns the output of ls -la in unit.

Args:
ops_test: The ops test framework
unit_name: The name of unit in which to run ls -la
path: The path from which to run ls -la

Returns:
a list of files returned by ls -la
"""
return_code, output, _ = await ops_test.juju("ssh", unit_name, "sudo", "ls", "-la", directory)
assert return_code == 0

ls_output = output.split("\n")[1:]

return [
line.strip("\r")
for line in ls_output
if len(line.strip()) > 0 and line.split()[-1] not in [".", ".."]
]


async def stop_running_flush_mysql_cronjobs(ops_test: OpsTest, unit_name: str) -> None:
"""Stop running any logrotate jobs that may have been triggered by cron.

Args:
ops_test: The ops test object passed into every test case
unit_name: The name of the unit to be tested
"""
# send TERM signal to mysql daemon, which trigger shutdown process
await ops_test.juju(
"ssh",
unit_name,
"sudo",
"pkill",
"-15",
"-f",
"logrotate -f /etc/logrotate.d/flush_mysql_logs",
)

# hold execution until process is stopped
try:
for attempt in Retrying(stop=stop_after_attempt(45), wait=wait_fixed(2)):
with attempt:
if await get_process_pid(ops_test, unit_name, "logrotate"):
raise Exception
except RetryError as e:
raise Exception("Failed to stop the flush_mysql_logs logrotate process.") from e


def get_unit_by_index(app_name: str, units: list, index: int):
"""Get unit by index.

Expand Down
Loading