Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/kub_cli/img_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
)
from .logging_utils import LOGGER, formatCommand
from .runtime import (
discoverRunnerExecutable,
getRunnerValue,
tryResolveRunnerExecutable,
)


Expand Down Expand Up @@ -167,13 +167,19 @@ def resolveImageRuntime(config: KubConfig) -> ImageRuntime:
return configured

apptainerRunnerValue = getRunnerValue(config, "apptainer")
apptainerRunner = tryResolveRunnerExecutable(apptainerRunnerValue)
if apptainerRunner is not None:
apptainerRunner = discoverRunnerExecutable(
apptainerRunnerValue,
runtimeName="apptainer",
)
if apptainerRunner.runnerPath is not None:
return "apptainer"

dockerRunnerValue = getRunnerValue(config, "docker")
dockerRunner = tryResolveRunnerExecutable(dockerRunnerValue)
if dockerRunner is not None:
dockerRunner = discoverRunnerExecutable(
dockerRunnerValue,
runtimeName="docker",
)
if dockerRunner.runnerPath is not None:
return "docker"

raise RuntimeSelectionError(
Expand Down
227 changes: 171 additions & 56 deletions src/kub_cli/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

ResolvedRuntime = Literal["apptainer", "docker"]
DASHBOARD_APP_NAME = "kub-dashboard"
RUNNER_PROBE_TIMEOUT_SECONDS = 5.0


@dataclass(frozen=True)
Expand All @@ -39,6 +40,14 @@ class RuntimeResolution:
imageReference: str


@dataclass(frozen=True)
class RunnerDiscovery:
"""Resolved runner executable with optional diagnostic on failure."""

runnerPath: str | None
diagnostic: str | None = None


def deriveApptainerOrasReference(dockerImageReference: str) -> str:
"""Backward-compatible export for ORAS derivation."""

Expand Down Expand Up @@ -85,65 +94,143 @@ def getRunnerValue(config: KubConfig, runtime: ResolvedRuntime) -> str:
return config.dockerRunner.strip()


def resolveRunnerExecutable(runnerValue: str, *, runtimeName: str) -> str:
"""Resolve runner executable path and validate it is runnable."""
def getRunnerInstallHint(runtimeName: str) -> str:
"""Return installation hint text for a runtime executable."""

normalized = runnerValue.strip()
if not normalized:
raise RunnerNotFoundError(
f"{runtimeName.capitalize()} runner is empty. Set --runner or runtime-specific runner config."
if runtimeName == "apptainer":
return (
"Install Apptainer: "
"https://apptainer.org/docs/admin/main/installation.html"
)

runnerPath = Path(normalized).expanduser()
hasPathSeparator = runnerPath.parent != Path(".")
if runtimeName == "docker":
return (
"Install Docker Engine: "
"https://docs.docker.com/engine/install/"
)

if runnerPath.is_absolute() or hasPathSeparator:
if runnerPath.exists() and os.access(runnerPath, os.X_OK):
return str(runnerPath)
return "Install the selected runtime executable."

raise RunnerNotFoundError(
f"{runtimeName.capitalize()} runner not executable: '{runnerPath}'."
)

resolvedRunner = shutil.which(normalized)
if resolvedRunner is None:
if runtimeName == "apptainer":
installHint = (
"Install Apptainer: "
"https://apptainer.org/docs/admin/main/installation.html"
)
elif runtimeName == "docker":
installHint = (
"Install Docker Engine: "
"https://docs.docker.com/engine/install/"
)
else:
installHint = "Install the selected runtime executable."
def buildRunnerNotFoundMessage(runtimeName: str) -> str:
"""Build a consistent missing-runner diagnostic."""

return (
f"Unable to find {runtimeName} runner in PATH. "
f"Set --runner/KUB_APP_RUNNER or install it. {getRunnerInstallHint(runtimeName)}"
)

raise RunnerNotFoundError(
f"Unable to find {runtimeName} runner in PATH. "
f"Set --runner/KUB_APP_RUNNER or install it. {installHint}"

def summarizeRunnerProbeOutput(completed: subprocess.CompletedProcess[str]) -> str:
"""Extract one compact diagnostic line from failed probe output."""

for stream in (completed.stderr, completed.stdout):
for line in stream.splitlines():
normalized = line.strip()
if normalized:
if len(normalized) > 240:
return f"{normalized[:237]}..."
return normalized

return "runner did not report any error details"


def probeRunnerExecutable(runnerPath: str, *, runtimeName: str) -> str | None:
"""Return failure details when a runner exists but cannot start cleanly."""

del runtimeName

try:
completed = subprocess.run(
[runnerPath, "--help"],
check=False,
capture_output=True,
text=True,
timeout=RUNNER_PROBE_TIMEOUT_SECONDS,
)
except subprocess.TimeoutExpired:
return (
"startup probe timed out after "
f"{RUNNER_PROBE_TIMEOUT_SECONDS:g}s"
)
except OSError as error:
return f"startup probe failed: {error}"

return resolvedRunner
if completed.returncode == 0:
return None

return (
f"startup probe exited with code {completed.returncode}: "
f"{summarizeRunnerProbeOutput(completed)}"
)

def tryResolveRunnerExecutable(runnerValue: str) -> str | None:
"""Try to resolve runner executable, returning None on failure."""

def discoverRunnerExecutable(runnerValue: str, *, runtimeName: str) -> RunnerDiscovery:
"""Resolve a runner executable and verify it starts successfully."""

normalized = runnerValue.strip()
if not normalized:
return None
return RunnerDiscovery(
runnerPath=None,
diagnostic=(
f"{runtimeName.capitalize()} runner is empty. "
"Set --runner or runtime-specific runner config."
),
)

runnerPath = Path(normalized).expanduser()
hasPathSeparator = runnerPath.parent != Path(".")

if runnerPath.is_absolute() or hasPathSeparator:
if runnerPath.exists() and os.access(runnerPath, os.X_OK):
return str(runnerPath)
return None
if not (runnerPath.exists() and os.access(runnerPath, os.X_OK)):
return RunnerDiscovery(
runnerPath=None,
diagnostic=(
f"{runtimeName.capitalize()} runner not executable: '{runnerPath}'."
),
)

resolvedRunner = str(runnerPath)
else:
resolvedRunner = shutil.which(normalized)
if resolvedRunner is None:
return RunnerDiscovery(
runnerPath=None,
diagnostic=buildRunnerNotFoundMessage(runtimeName),
)

probeFailure = probeRunnerExecutable(resolvedRunner, runtimeName=runtimeName)
if probeFailure is not None:
return RunnerDiscovery(
runnerPath=None,
diagnostic=(
f"{runtimeName.capitalize()} runner '{resolvedRunner}' is installed "
f"but not usable: {probeFailure}. "
"Fix the runtime installation or set --runner/KUB_APP_RUNNER "
"to a working executable."
),
)

return RunnerDiscovery(runnerPath=resolvedRunner)


def resolveRunnerExecutable(runnerValue: str, *, runtimeName: str) -> str:
"""Resolve runner executable path and validate it is runnable."""

discovery = discoverRunnerExecutable(runnerValue, runtimeName=runtimeName)
if discovery.runnerPath is not None:
return discovery.runnerPath

raise RunnerNotFoundError(
discovery.diagnostic or buildRunnerNotFoundMessage(runtimeName)
)


def tryResolveRunnerExecutable(runnerValue: str, *, runtimeName: str) -> str | None:
"""Try to resolve runner executable, returning None on failure."""

return shutil.which(normalized)
discovery = discoverRunnerExecutable(runnerValue, runtimeName=runtimeName)
return discovery.runnerPath


def resolveRuntimeForExecution(config: KubConfig) -> RuntimeResolution:
Expand Down Expand Up @@ -201,20 +288,23 @@ def resolveAutoRuntime(config: KubConfig) -> RuntimeResolution:
apptainerImage = getRuntimeCandidateImage(config, "apptainer")
if apptainerImage is not None:
apptainerRunnerValue = getRunnerValue(config, "apptainer")
apptainerRunner = tryResolveRunnerExecutable(apptainerRunnerValue)
if apptainerRunner is not None:
apptainerRunner = discoverRunnerExecutable(
apptainerRunnerValue,
runtimeName="apptainer",
)
if apptainerRunner.runnerPath is not None:
try:
imageReference = resolveApptainerExecutionImage(config)
return RuntimeResolution(
runtime="apptainer",
runnerPath=apptainerRunner,
runnerPath=apptainerRunner.runnerPath,
imageReference=imageReference,
)
except KubCliError as error:
diagnostics.append(f"Apptainer not selected: {error}")
else:
diagnostics.append(
"Apptainer not selected: runner not available in PATH or not executable."
f"Apptainer not selected: {apptainerRunner.diagnostic}"
)
else:
diagnostics.append(
Expand All @@ -224,8 +314,11 @@ def resolveAutoRuntime(config: KubConfig) -> RuntimeResolution:
dockerImage = getRuntimeCandidateImage(config, "docker")
if dockerImage is not None:
dockerRunnerValue = getRunnerValue(config, "docker")
dockerRunner = tryResolveRunnerExecutable(dockerRunnerValue)
if dockerRunner is not None:
dockerRunner = discoverRunnerExecutable(
dockerRunnerValue,
runtimeName="docker",
)
if dockerRunner.runnerPath is not None:
try:
imageReference = resolveDockerExecutionImage(
config,
Expand All @@ -234,14 +327,14 @@ def resolveAutoRuntime(config: KubConfig) -> RuntimeResolution:
)
return RuntimeResolution(
runtime="docker",
runnerPath=dockerRunner,
runnerPath=dockerRunner.runnerPath,
imageReference=imageReference,
)
except KubCliError as error:
diagnostics.append(f"Docker not selected: {error}")
else:
diagnostics.append(
"Docker not selected: runner not available in PATH or not executable."
f"Docker not selected: {dockerRunner.diagnostic}"
)
else:
diagnostics.append(
Expand Down Expand Up @@ -272,9 +365,15 @@ def resolveRunner(self) -> str:
def resolveImage(self) -> str:
return resolveApptainerExecutionImage(self.config)

def build(self, forwardedArgs: Sequence[str]) -> list[str]:
runner = self.resolveRunner()
image = self.resolveImage()
def build(
self,
forwardedArgs: Sequence[str],
*,
runnerPath: str | None = None,
imageReference: str | None = None,
) -> list[str]:
runner = runnerPath or self.resolveRunner()
image = imageReference or self.resolveImage()

command: list[str] = [runner, "run"]

Expand All @@ -292,9 +391,15 @@ def build(self, forwardedArgs: Sequence[str]) -> list[str]:

return command

def buildExec(self, forwardedArgs: Sequence[str]) -> list[str]:
runner = self.resolveRunner()
image = self.resolveImage()
def buildExec(
self,
forwardedArgs: Sequence[str],
*,
runnerPath: str | None = None,
imageReference: str | None = None,
) -> list[str]:
runner = runnerPath or self.resolveRunner()
image = imageReference or self.resolveImage()

command: list[str] = [runner, "exec"]

Expand Down Expand Up @@ -331,9 +436,10 @@ def build(
self,
forwardedArgs: Sequence[str],
*,
runnerPath: str | None = None,
imageReference: str | None = None,
) -> list[str]:
runner = self.resolveRunner()
runner = runnerPath or self.resolveRunner()
resolvedImageReference = imageReference or self.resolveImage()

command: list[str] = [runner, "run", "--rm"]
Expand Down Expand Up @@ -407,15 +513,23 @@ def run(

if runtimeResolution.runtime == "apptainer":
builder = ApptainerCommandBuilder(appName=appName, config=self.config)
command = builder.build(forwardedArgs)
command = builder.build(
forwardedArgs,
runnerPath=runtimeResolution.runnerPath,
imageReference=runtimeResolution.imageReference,
)
if not dryRun:
try:
if shouldUseApptainerExecForLocalImage(
runnerPath=runtimeResolution.runnerPath,
imageReference=runtimeResolution.imageReference,
appName=appName,
):
command = builder.buildExec(forwardedArgs)
command = builder.buildExec(
forwardedArgs,
runnerPath=runtimeResolution.runnerPath,
imageReference=runtimeResolution.imageReference,
)
if self.config.verbose:
LOGGER.debug(
"Apptainer app '%s' not found in local image; using exec fallback.",
Expand All @@ -427,6 +541,7 @@ def run(
builder = DockerCommandBuilder(appName=appName, config=self.config)
command = builder.build(
forwardedArgs,
runnerPath=runtimeResolution.runnerPath,
imageReference=runtimeResolution.imageReference,
)

Expand Down
Loading
Loading