Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 127 additions & 77 deletions capiorun/capiorun
Original file line number Diff line number Diff line change
@@ -1,27 +1,13 @@
#!/usr/bin/env python3

import argparse
import json
import os
import signal
import subprocess
import sys
import time

try:
from loguru import logger

logger.remove()
logger.add(
sink=lambda msg: print(msg, end=''), # or use sys.stdout
format="<green>{time:DD/MM/YYYY HH:mm:ss}</green> | <cyan>capiorun</cyan> | "
"<level>{level: <8}</level> | <level>{message}</level>",
colorize=True
)
except ImportError:
import logging

logger = logging.getLogger(__name__)

parser = argparse.ArgumentParser(
prog="capiorun",
description="""
Expand All @@ -39,46 +25,80 @@ Developed by Marco Edoardo Santimaria <marcoedoardo.santimaria@unito.it>

For more information, refer to the CAPIO documentation or repository.
""",
formatter_class=argparse.RawTextHelpFormatter
formatter_class=argparse.RawTextHelpFormatter,
)

# Required arguments
parser.add_argument("-d", "--capio-dir", required=True,
help="CAPIO virtual mount point (e.g., /mnt/capio)")
parser.add_argument("-n", "--app-name", required=True,
help="Name of the CAPIO application step to launch. Must match an entry in the CAPIO-CL config.")
parser.add_argument(
"-d",
"--capio-dir",
required=True,
help="CAPIO virtual mount point (e.g., /mnt/capio)",
)
parser.add_argument(
"-n",
"--app-name",
required=True,
help="Name of the CAPIO application step to launch. Must match an entry in the CAPIO-CL config.",
)

# Optional but commonly used
parser.add_argument("-w", "--workflow-name", default="CAPIO",
help="Workflow name. Should match the name in the CAPIO-CL configuration (default: CAPIO)")
parser.add_argument("-c", "--capiocl", default="--no-config",
help="Path to the CAPIO-CL configuration file (default: --no-config)")
parser.add_argument(
"-c",
"--capiocl",
default="--no-config",
help="Path to the CAPIO-CL configuration file (default: --no-config)",
)
parser.add_argument(
"-m", "--metadata-dir", default="", help="Custom directory for metadata"
)


# Debug and logging
parser.add_argument("-L", "--log-level", default="-1",
help="CAPIO log level. Useful when running in debug mode (default: -1)")
parser.add_argument("--log-dir", default="",
help="Custom directory for CAPIO log output")
parser.add_argument("--log-prefix", default="",
help="Prefix for CAPIO log files")
parser.add_argument(
"-L",
"--log-level",
default="-1",
help="CAPIO log level. Useful when running in debug mode (default: -1)",
)
parser.add_argument(
"--log-dir", default="", help="Custom directory for CAPIO log output"
)
parser.add_argument("--log-prefix", default="", help="Prefix for CAPIO log files")

# Tuning and advanced
parser.add_argument("--cache-lines", default="",
help="Number of CAPIO shm-queue cache lines (optional tuning parameter)")
parser.add_argument("--init-file-size", default="",
help="Default file size (in bytes) when pre-allocating memory for new files")
parser.add_argument(
"--cache-lines",
default="",
help="Number of CAPIO shm-queue cache lines (optional tuning parameter)",
)
parser.add_argument(
"--init-file-size",
default="",
help="Default file size (in bytes) when pre-allocating memory for new files",
)

# Binary locations
parser.add_argument("-l", "--libcapio", default="libcapio_posix.so",
help="Path to libcapio_posix.so shared library (default: libcapio_posix.so)")
parser.add_argument("-s", "--server", default="capio_server",
help="Path to capio_server executable (default: capio_server)")
parser.add_argument(
"-l",
"--libcapio",
default="libcapio_posix.so",
help="Path to libcapio_posix.so shared library (default: libcapio_posix.so)",
)
parser.add_argument(
"-s",
"--server",
default="capio_server",
help="Path to capio_server executable (default: capio_server)",
)

# Positional arguments
parser.add_argument('args', nargs=argparse.REMAINDER, help="Command to launch with capio")
parser.add_argument(
"args", nargs=argparse.REMAINDER, help="Command to launch with capio"
)


def build_env(args):
def build_env(args, workflow_name):
env = os.environ.copy()
if args.log_dir:
env["CAPIO_LOG_DIR"] = args.log_dir
Expand All @@ -91,15 +111,18 @@ def build_env(args):

env["CAPIO_DIR"] = args.capio_dir
env["CAPIO_LOG_LEVEL"] = args.log_level
env["CAPIO_WORKFLOW_NAME"] = args.workflow_name
env["CAPIO_WORKFLOW_NAME"] = workflow_name
env["CAPIO_METADATA_DIR"] = args.metadata_dir

return env


def count_files_starting_with(prefix):
return sum(
1 for filename in os.listdir("/dev/shm")
if os.path.isfile(os.path.join("/dev/shm", filename)) and filename.startswith(prefix)
1
for filename in os.listdir("/dev/shm")
if os.path.isfile(os.path.join("/dev/shm", filename))
and filename.startswith(prefix)
)


Expand All @@ -109,67 +132,94 @@ step_process = None
if __name__ == "__main__":
args = parser.parse_args()

if not os.path.exists(f"/dev/shm/{args.workflow_name}"):
logger.info(f"Starting capio server with config file: {args.capiocl}")
logger.info(f"CAPIO_LOG_LEVEL = {args.log_level}")
logger.info(f"CAPIO_WORKFLOW_NAME = {args.workflow_name}")
logger.info(f"CAPIO_APP_NAME = {args.app_name}")
logger.info(f"CAPIO_DIR = {args.capio_dir}")
logger.info(f"CAPIO-CL CONFIG = {args.capiocl}")
workflow_name = "CAPIO"
if args.capiocl != "--no-config":
with open(args.capiocl, "r") as f:
workflow_name = json.load(f)["name"]

if not os.path.exists(f"/dev/shm/{workflow_name}"):
print(f"Starting capio server with config file: {args.capiocl}")
print(f"CAPIO_LOG_LEVEL = {args.log_level}")
print(f"CAPIO_WORKFLOW_NAME = {workflow_name}")
print(f"CAPIO_APP_NAME = {args.app_name}")
print(f"CAPIO_DIR = {args.capio_dir}")
print(f"CAPIO-CL CONFIG = {args.capiocl}")
if not os.path.exists(args.capiocl) and args.capiocl != "--no-config":
logger.critical(f"File {args.capiocl} does not exists. aborting execution...")
print(f"File {args.capiocl} does not exists. aborting execution...")
exit(1)
server_env = build_env(args)
server_env = build_env(args, workflow_name)

server_process = subprocess.Popen(
[args.server, ("--config " + args.capiocl) if args.capiocl != "--no-config" else args.capiocl],
[
args.server,
(
("--config " + args.capiocl)
if args.capiocl != "--no-config"
else args.capiocl
),
],
env=server_env,
stdout=sys.stdout, stderr=sys.stderr)
stdout="server.log",
stderr="server.err",
cwd=args.log_dir,
start_new_session=True,
)

logger.debug(f"capio_server PID: {server_process.pid}")
print(f"capio_server PID: {server_process.pid}")
time.sleep(1)

else:
logger.debug(f"An instance of capio_server with workflow name {args.workflow_name} already exists!")
print(
f"An instance of capio_server with workflow name {workflow_name} already exists!"
)

step_env = build_env(args)
step_env = build_env(args, workflow_name)
step_env["CAPIO_APP_NAME"] = args.app_name
step_env["LD_PRELOAD"] = args.libcapio

logger.info(f"Starting workflow steps with following environment variables:")
logger.info(f"CAPIO_LOG_LEVEL = {args.log_level}")
logger.info(f"CAPIO_WORKFLOW_NAME = {args.workflow_name}")
logger.info(f"CAPIO_APP_NAME = {args.app_name}")
logger.info(f"CAPIO_DIR = {args.capio_dir}")
logger.info(f"LD_PRELOAD = {args.libcapio}")
logger.info(f"command = {" ".join(args.args)}")
print(f"Starting workflow steps with following environment variables:")
print(f"CAPIO_LOG_LEVEL = {args.log_level}")
print(f"CAPIO_WORKFLOW_NAME = {workflow_name}")
print(f"CAPIO_APP_NAME = {args.app_name}")
print(f"CAPIO_DIR = {args.capio_dir}")
print(f"LD_PRELOAD = {args.libcapio}")
print(f"command = {' '.join(args.args)}")
try:
step_process = subprocess.Popen(
args.args,
env=step_env,
stdout=sys.stdout, stderr=sys.stderr
stdout=sys.stdout,
stderr=sys.stderr,
)

step_process.wait()
logger.success(f"Step {args.app_name} terminated successfully")
print(f"Step {args.app_name} terminated successfully")
except Exception as e:
logger.critical(f"An error occurred in startup/execution of workflow app <{args.app_name}>: {e}")
print(
f"An error occurred in startup/execution of workflow app <{args.app_name}>: {e}"
)

if server_process is not None:
if count_files_starting_with(args.workflow_name) > 6:
logger.debug("Server instance is used by other applications... skipping server termination")
if count_files_starting_with(workflow_name) >= 4:
print(
"Server instance is used by other applications... skipping server termination"
)
else:
logger.info(f"Terminating instance of capio_server")
print(f"Terminating instance of capio_server")
server_process.send_signal(signal.SIGTERM)
time.sleep(2)
logger.success("Terminated CAPIO server instance")
print("Terminated CAPIO server instance")
else:
if count_files_starting_with(args.workflow_name) <= 6:
logger.info("Terminating instance of capio_server started by other capiorun command")
result = subprocess.run(["killall", "capio_server"], stdout=sys.stdout, stderr=sys.stderr)
if count_files_starting_with(workflow_name) < 4:
print(
"Terminating instance of capio_server started by other capiorun command"
)
result = subprocess.run(
["killall", "capio_server"], stdout=sys.stdout, stderr=sys.stderr
)
if result.returncode == 0:
logger.success("Terminated CAPIO server instance")
print("Terminated CAPIO server instance")
else:
logger.critical("Error terminating capio_server instance!")
print("Error terminating capio_server instance!")
else:
logger.debug("Skipping termination of capio_server")
print("Skipping termination of capio_server")
9 changes: 2 additions & 7 deletions src/common/capio/filesystem.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,7 @@ inline bool is_capio_path(const std::filesystem::path &path_to_check) {
syscall_no_intercept_flag = true;
#endif

auto input_abs_path = std::filesystem::absolute(input_path);

std::filesystem::path resolved;
std::filesystem::path resolved, input_abs_path = std::filesystem::absolute(input_path);

for (const auto &part : input_abs_path) {
resolved /= part;
Expand All @@ -156,16 +154,13 @@ inline bool is_capio_path(const std::filesystem::path &path_to_check) {
} else {
resolved = target;
}

resolved = std::filesystem::absolute(resolved);
}
}
auto return_value = std::filesystem::absolute(input_path);
#ifdef __CAPIO_POSIX
syscall_no_intercept_flag = false;
#endif

return return_value;
return resolved;
}

#endif // CAPIO_COMMON_FILESYSTEM_HPP
4 changes: 2 additions & 2 deletions src/posix/handlers/open.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ int open_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg
std::string resolved_path = resolve_possible_symlink(path);
if ((flags & O_CREAT) == O_CREAT) {
LOG("O_CREAT");
create_request(-1, path.data(), tid);
create_request(-1, resolved_path.data(), tid);
} else {
LOG("not O_CREAT");
open_request(-1, resolved_path.data(), tid);
Expand Down Expand Up @@ -121,7 +121,7 @@ int openat_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long a

if ((flags & O_CREAT) == O_CREAT) {
LOG("O_CREAT");
create_request(-1, path.data(), tid);
create_request(-1, resolved_path.data(), tid);
} else {
LOG("not O_CREAT");
open_request(-1, resolved_path.data(), tid);
Expand Down