Skip to content

Commit

Permalink
use kubectl cp --retries (#2781)
Browse files Browse the repository at this point in the history
  • Loading branch information
siyuan0322 committed May 31, 2023
1 parent d2c7d2e commit b014dad
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 34 deletions.
24 changes: 11 additions & 13 deletions coordinator/gscoordinator/kubernetes_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
from gscoordinator.utils import ResolveMPICmdPrefix
from gscoordinator.utils import delegate_command_to_pod
from gscoordinator.utils import parse_as_glog_level
from gscoordinator.utils import run_command
from gscoordinator.utils import run_kube_cp_command
from gscoordinator.version import __version__

logger = logging.getLogger("graphscope")
Expand Down Expand Up @@ -197,7 +197,7 @@ def __init__(
self._mars_worker_mem = mars_worker_mem

self._pod_name_list = []
self._pod_ip_list = None
self._pod_ip_list = []
self._pod_host_ip_list = None

self._analytical_engine_endpoint = None
Expand Down Expand Up @@ -335,8 +335,7 @@ def distribute_file(self, path):
except RuntimeError:
cmd = f"mkdir -p {os.path.dirname(path)}"
logger.debug(delegate_command_to_pod(cmd, pod, container))
cmd = f"kubectl cp {path} {pod}:{path} -c {container}"
logger.debug(run_command(cmd))
logger.debug(run_kube_cp_command(path, path, pod, container, True))

def close_analytical_instance(self):
pass
Expand Down Expand Up @@ -834,16 +833,16 @@ def create_analytical_instance(self):
)

# generate and distribute hostfile
kube_hosts_path = os.path.join(get_tempdir(), "kube_hosts")
with open(kube_hosts_path, "w") as f:
hosts = os.path.join(get_tempdir(), "kube_hosts")
with open(hosts, "w") as f:
for i, pod_ip in enumerate(self._pod_ip_list):
f.write(f"{pod_ip} {self._pod_name_list[i]}\n")

container = self._engine_cluster.analytical_container_name
for pod in self._pod_name_list:
container = self._engine_cluster.analytical_container_name
cmd = f"kubectl -n {self._namespace} cp {kube_hosts_path} {pod}:/tmp/hosts_of_nodes -c {container}"
cmd = shlex.split(cmd)
subprocess.check_call(cmd)
logger.debug(
run_kube_cp_command(hosts, "/tmp/hosts_of_nodes", pod, container, True)
)

# launch engine
rmcp = ResolveMPICmdPrefix(rsh_agent=True)
Expand Down Expand Up @@ -1022,10 +1021,9 @@ def create_learning_instance(self, object_id, handle, config):
container = self._engine_cluster.learning_container_name
sub_cmd = f"python3 -m gscoordinator.learning {handle} {config} {pod_index}"
cmd = f"kubectl -n {self._namespace} exec -it -c {container} {pod} -- {sub_cmd}"
logging.debug("launching learning server: %s", " ".join(cmd))
cmd = shlex.split(cmd)
logger.debug("launching learning server: %s", " ".join(cmd))
proc = subprocess.Popen(
cmd,
shlex.split(cmd),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding="utf-8",
Expand Down
50 changes: 29 additions & 21 deletions coordinator/gscoordinator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,18 +299,29 @@ def delegate_command_to_pod(args: str, pod: str, container: str):
"""Delegate a command to a pod.
Args:
args (str): Command to be delegated.
pod (str): Pod name.
container (str): Name of the container in the pod that runs the command.
args (str): Command to be delegated.
pod (str): Pod name.
container (str): Name of the container in the pod that runs the command.
Returns:
str: Output of the command.
Returns:
str: Output of the command.
"""
# logger.info("Delegate command to pod: %s, %s, %s", args, pod, container)
args = f'kubectl exec -c {container} {pod} -- bash -c "{args}"'
return run_command(args)


def run_kube_cp_command(
    src, dst, pod, container=None, host_to_pod=True, *, retries=5
):
    """Copy a file or directory between the local host and a pod via ``kubectl cp``.

    Args:
        src (str): Source path. Located on the host when ``host_to_pod`` is
            true, otherwise inside the pod.
        dst (str): Destination path. Located inside the pod when
            ``host_to_pod`` is true, otherwise on the host.
        pod (str): Name of the pod to copy to or from.
        container (str, optional): Container name passed with ``-c``. The
            flag is omitted when this is ``None``.
        host_to_pod (bool): Copy direction; ``True`` copies host -> pod,
            ``False`` copies pod -> host.
        retries (int): Value passed to ``kubectl cp --retries``. Defaults to
            5, the value that was previously hard-coded.

    Returns:
        Whatever ``run_command`` returns for the assembled command line.
    """
    # Local import so this block does not depend on a module-level import.
    import shlex

    if host_to_pod:
        source, target = src, f"{pod}:{dst}"
    else:
        source, target = f"{pod}:{src}", dst

    # Quote every user-supplied token so paths containing whitespace or shell
    # metacharacters stay a single argument. shlex.quote leaves ordinary
    # paths and pod names unchanged, so the command string is identical to
    # the unquoted form for typical inputs.
    # NOTE(review): assumes run_command tokenizes with shell-like rules
    # (e.g. shlex.split) -- confirm against its definition.
    parts = ["kubectl", "cp", shlex.quote(source), shlex.quote(target)]
    if container is not None:
        parts += ["-c", shlex.quote(container)]
    parts.append(f"--retries={int(retries)}")
    return run_command(" ".join(parts))


def compile_library(commands, workdir, output_name, launcher):
if launcher.type() == types_pb2.K8S:
return _compile_on_kubernetes(
Expand All @@ -336,51 +347,48 @@ def _compile_on_kubernetes(commands, workdir, output_name, pod, container):
container,
)
try:
full_path = get_lib_path(workdir, output_name)
lib_path = get_lib_path(workdir, output_name)
try:
# The library may already exist in the analytical pod.
test_cmd = f"test -f {full_path}"
test_cmd = f"test -f {lib_path}"
logger.debug(delegate_command_to_pod(test_cmd, pod, container))
logger.info("Library exists, skip compilation")
cp = f"kubectl cp {pod}:{full_path} {full_path} -c {container}"
logger.debug(run_command(cp))
return full_path
logger.debug(run_kube_cp_command(lib_path, lib_path, pod, container, False))
return lib_path
except RuntimeError:
pass
parent_dir = os.path.dirname(workdir)
mkdir = f"mkdir -p {parent_dir}"
logger.debug(delegate_command_to_pod(mkdir, pod, container))
cp = f"kubectl cp {workdir} {pod}:{workdir} -c {container}"
logger.debug(run_command(cp))
logger.debug(run_kube_cp_command(workdir, workdir, pod, container, True))
for command in commands:
command = f"cd {workdir} && {command}"
logger.debug(delegate_command_to_pod(command, pod, container))
cp = f"kubectl cp {pod}:{full_path} {full_path} -c {container}"
logger.debug(run_command(cp))
if not os.path.isfile(full_path):
logger.debug(run_kube_cp_command(lib_path, lib_path, pod, container, False))
if not os.path.isfile(lib_path):
logger.error("Could not find desired library, found files are:")
logger.error(os.listdir(workdir))
raise FileNotFoundError(full_path)
raise FileNotFoundError(lib_path)
except Exception as e:
raise CompilationError(f"Failed to compile {output_name} on kubernetes") from e
return full_path
return lib_path


def _compile_on_local(commands, workdir, output_name):
logger.info("compile on local, %s, %s, %s", commands, workdir, output_name)
try:
for command in commands:
logger.debug(run_command(command, cwd=workdir))
full_path = get_lib_path(workdir, output_name)
if not os.path.isfile(full_path):
lib_path = get_lib_path(workdir, output_name)
if not os.path.isfile(lib_path):
logger.error("Could not find desired library")
logger.info(os.listdir(workdir))
raise FileNotFoundError(full_path)
raise FileNotFoundError(lib_path)
except Exception as e:
raise CompilationError(
f"Failed to compile {output_name} on platform {get_platform_info()}"
) from e
return full_path
return lib_path


def compile_app(
Expand Down

0 comments on commit b014dad

Please sign in to comment.