Add timeout parameters for coordinations (#2915)
Add timeout parameters for the coordinator so that they can be tuned via arguments

Fixes #2734
TaoLbr1993 committed Jul 6, 2023
1 parent 209ff2e commit 6ade1d5
Showing 3 changed files with 39 additions and 14 deletions.
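
At a high level, the commit turns several hard-coded waits (the 120-second communicate() timeout, the 2-second log poll, the 1- and 2-second retry sleeps, and the 60-second close wait) into parameters whose defaults preserve the previous behaviour. A minimal sketch of that pattern, using hypothetical names rather than the actual GraphScope classes and assuming only the standard library:

    import subprocess

    class EngineLauncher:
        # Hypothetical illustration of the pattern applied throughout this commit:
        # the old literals become keyword defaults, so existing callers keep the
        # previous behaviour while new callers can tune the timeouts explicitly.
        def __init__(self, comm_timeout_seconds=120, poll_timeout_seconds=2):
            self._comm_timeout_seconds = comm_timeout_seconds
            self._poll_timeout_seconds = poll_timeout_seconds

        def run(self, args):
            proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            # communicate() raises subprocess.TimeoutExpired if the budget is exceeded
            outs, _ = proc.communicate(timeout=self._comm_timeout_seconds)
            return outs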
13 changes: 11 additions & 2 deletions coordinator/gscoordinator/coordinator.py
@@ -172,6 +172,8 @@ def __init__(

# dangling check
self._dangling_timeout_seconds = dangling_timeout_seconds
self._comm_timeout_seconds = 120
self._poll_timeout_seconds = 2
self._dangling_detecting_timer = None
self._cleanup_instance = False

@@ -237,6 +239,9 @@ def ConnectSession(self, request, context):
self._connected = True
# Cleanup after timeout seconds
self._dangling_timeout_seconds = request.dangling_timeout_seconds
# Other timeout seconds; fall back to defaults when the client does not set them
self._comm_timeout_seconds = getattr(request, "comm_timeout_seconds", 120)
self._poll_timeout_seconds = getattr(request, "poll_timeout_seconds", 2)
# If true, also delete graphscope instance (such as pods) in closing process
self._cleanup_instance = request.cleanup_instance

@@ -382,7 +387,9 @@ def _RunStep(self, request_iterator, context):
def FetchLogs(self, request, context):
while self._streaming_logs:
try:
info_message, error_message = self._pipe_merged.poll(timeout=2)
info_message, error_message = self._pipe_merged.poll(
timeout=self._poll_timeout_seconds
)
except queue.Empty:
info_message, error_message = "", ""
except Exception as e:
@@ -466,7 +473,9 @@ def _match_frontend_endpoint(pattern, lines):
self._object_manager.put(object_id, gie_manager)
# 60 seconds is enough, see also GH#1024; try 120
# already add errs to outs
outs, _ = proc.communicate(timeout=120) # throws TimeoutError
outs, _ = proc.communicate(
timeout=self._comm_timeout_seconds
) # throws TimeoutError
return_code = proc.poll()
if return_code != 0:
raise RuntimeError(f"Error code: {return_code}, message {outs}")
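The ConnectSession hunk reads the new fields defensively with getattr, so a client whose request message does not carry them still gets the old defaults (120 and 2 seconds). A small self-contained sketch of that fallback; the request class below is a hypothetical stand-in, not the real protobuf message:

    class FakeConnectRequest:
        # Hypothetical stand-in carrying only one of the two new fields.
        comm_timeout_seconds = 300

    request = FakeConnectRequest()
    comm_timeout = getattr(request, "comm_timeout_seconds", 120)  # 300, taken from the request
    poll_timeout = getattr(request, "poll_timeout_seconds", 2)    # 2, field absent so the default applies
    print(comm_timeout, poll_timeout)  # -> 300 2
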
22 changes: 16 additions & 6 deletions coordinator/gscoordinator/kubernetes_launcher.py
@@ -99,6 +99,9 @@ def __init__(
preemptive=None,
service_type=None,
timeout_seconds=None,
kube_timeout_seconds=1,
retry_time_seconds=2,
del_retry_time_seconds=1,
vineyard_cpu=None,
vineyard_deployment=None,
vineyard_image=None,
@@ -162,6 +165,12 @@ def __init__(

assert timeout_seconds is not None
self._timeout_seconds = timeout_seconds
# timeout seconds waiting for kube service ready
self._kube_timeout_seconds = kube_timeout_seconds
# retry time when waiting for kube service ready
self._retry_time_seconds = retry_time_seconds
# retry time when deleting dangling coordinators
self._del_retry_time_seconds = del_retry_time_seconds

self._waiting_for_delete = waiting_for_delete

@@ -1018,7 +1027,7 @@ def _waiting_for_services_ready(self):
self._core_api.list_namespaced_event,
namespace,
field_selector=field_selector,
timeout_seconds=1,
timeout_seconds=self._kube_timeout_seconds,
)
for event in stream:
msg = f"[{pod_name}]: {event['object'].message}"
@@ -1032,7 +1041,7 @@ def _waiting_for_services_ready(self):
break
if self._timeout_seconds + start_time < time.time():
raise TimeoutError("GraphScope Engines launching timeout.")
time.sleep(2)
time.sleep(self._retry_time_seconds)

self._pod_name_list = []
self._pod_ip_list = []
@@ -1244,12 +1253,12 @@ def _delete_dangling_coordinator(self):
)
break
else:
time.sleep(1)
if time.time() - start_time > self._timeout_seconds:
logger.error(
"Deleting dangling coordinator %s timeout",
self._coordinator_name,
)
time.sleep(self._del_retry_time_seconds)

def _get_owner_reference_as_json(self):
owner_reference = [
@@ -1362,18 +1371,19 @@ def stop(self, is_dangling=False):
)
break
else:
time.sleep(1)
if time.time() - start_time > self._timeout_seconds:
logger.error(
"Deleting namespace %s timeout", self._namespace
)
time.sleep(self._del_retry_time_seconds)

else:
# delete coordinator deployment and service
self._delete_dangling_coordinator()
self._serving = False
logger.info("Kubernetes launcher stopped")

def _allocate_learining_engine(self, object_id):
def _allocate_learning_engine(self, object_id):
# check the learning engine flag
if not self._with_learning:
raise NotImplementedError("Learning engine not enabled")
@@ -1442,7 +1452,7 @@ def _distribute_learning_process(
)

def create_learning_instance(self, object_id, handle, config):
pod_name_list, _, pod_host_ip_list = self._allocate_learining_engine(object_id)
pod_name_list, _, pod_host_ip_list = self._allocate_learning_engine(object_id)
if not pod_name_list or not pod_host_ip_list:
raise RuntimeError("Failed to allocate learning engine")
return self._distribute_learning_process(
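The Kubernetes launcher threads three new knobs through its waiting code: kube_timeout_seconds bounds each event-stream read, while retry_time_seconds and del_retry_time_seconds replace the literal sleeps in the readiness and deletion loops. Those loops share one polling shape (check a condition, compare against a deadline, sleep), sketched generically below with hypothetical names and nothing beyond the standard library:

    import time

    def wait_until(predicate, timeout_seconds, retry_time_seconds):
        # Poll `predicate` until it returns True, sleeping `retry_time_seconds`
        # between attempts and raising TimeoutError once `timeout_seconds` elapse.
        start_time = time.time()
        while not predicate():
            if time.time() - start_time > timeout_seconds:
                raise TimeoutError("operation timed out")
            time.sleep(retry_time_seconds)

    # Usage sketch: wait_until(lambda: pods_ready(), timeout_seconds=600, retry_time_seconds=2)
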
18 changes: 12 additions & 6 deletions coordinator/gscoordinator/local_launcher.py
@@ -61,6 +61,8 @@ def __init__(
log_level: str,
instance_id: str,
timeout_seconds: int,
close_timeout_seconds: int = 60,
retry_time_seconds: int = 1,
):
super().__init__()
self._num_workers = num_workers
@@ -75,7 +77,8 @@ def __init__(
self._glog_level = parse_as_glog_level(log_level)
self._instance_id = instance_id
self._timeout_seconds = timeout_seconds

self._close_timeout_seconds = close_timeout_seconds
self._retry_time_seconds = retry_time_seconds
self._vineyard_socket_prefix = os.path.join(get_tempdir(), "vineyard.sock.")

# A graphscope instance may have multiple session by reconnecting to coordinator
@@ -184,10 +187,11 @@ def create_analytical_instance(self):

start_time = time.time()
while is_free_port(rpc_port):
time.sleep(1)
if self._timeout_seconds + start_time < time.time():
self._analytical_engine_process.kill()
raise RuntimeError("Launch analytical engine failed due to timeout.")
time.sleep(self._retry_time_seconds)

logger.info(
"Analytical engine is listening on %s", self._analytical_engine_endpoint
)
@@ -341,7 +345,7 @@ def close_interactive_instance(self, object_id):
bufsize=1,
)
# 60 seconds is enough
process.wait(timeout=60)
process.wait(timeout=self._close_timeout_seconds)
return process

def close_learning_instance(self, object_id):
@@ -424,14 +428,15 @@ def launch_etcd(self):

start_time = time.time()
while is_free_port(self._etcd_client_port):
time.sleep(1)
if self._timeout_seconds + start_time < time.time():
self._etcd_process.kill()
_, errs = self._etcd_process.communicate()
logger.error("Start etcd timeout, %s", errs)
msg = "Launch etcd service failed due to timeout: "
msg += "\n".join([line for line in stdout_watcher.poll_all()])
raise RuntimeError(msg)
time.sleep(self._retry_time_seconds)

stdout_watcher.drop(True)
stdout_watcher.suppress(not logger.isEnabledFor(logging.DEBUG))
logger.info("Etcd is ready, endpoint is %s", self._etcd_endpoint)
@@ -494,10 +499,9 @@ def launch_vineyard(self):

start_time = time.time()
if len(hosts) > 1:
time.sleep(5) # should be OK
time.sleep(5 * self._retry_time_seconds) # should be OK
else:
while not os.path.exists(self._vineyard_socket):
time.sleep(1)
if self._vineyardd_process.poll() is not None:
msg = "Launch vineyardd failed: "
msg += "\n".join([line for line in stdout_watcher.poll_all()])
@@ -509,6 +513,8 @@
# outs, _ = self._vineyardd_process.communicate()
# logger.error("Start vineyardd timeout, %s", outs)
raise RuntimeError("Launch vineyardd failed due to timeout.")
time.sleep(self._retry_time_seconds)

stdout_watcher.drop(True)
stdout_watcher.suppress(not logger.isEnabledFor(logging.DEBUG))
logger.info(
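In the local launcher the retry sleeps and the interactive-instance shutdown wait become constructor arguments (retry_time_seconds and close_timeout_seconds) with the old values as defaults. A hedged, self-contained sketch of that kind of bounded shutdown wait; the helper name and command here are illustrative, not GraphScope APIs:

    import subprocess

    def close_with_timeout(cmd, close_timeout_seconds=60):
        # Run the shutdown command and wait at most `close_timeout_seconds`;
        # subprocess.TimeoutExpired is raised if it takes longer, in which case
        # the process is killed so nothing is left dangling.
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        try:
            process.wait(timeout=close_timeout_seconds)
        except subprocess.TimeoutExpired:
            process.kill()
            raise
        return process

    # Usage sketch: close_with_timeout(["sleep", "1"], close_timeout_seconds=5)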
