Skip to content

Commit

Permalink
Adds monitoring to helm_client and start_helm_cluster.py
Browse files Browse the repository at this point in the history
  • Loading branch information
pritchardn committed Mar 14, 2022
1 parent 07a25f0 commit ca86fc8
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
36 changes: 36 additions & 0 deletions daliuge-engine/dlg/deploy/helm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import shutil
import subprocess
import sys
import threading
import time

import dlg
Expand Down Expand Up @@ -247,6 +248,7 @@ def launch_helm(self, co_host=False):
else:
logger.error("K8s pods did not start in timeframe allocated")
self.teardown()
raise RuntimeWarning("K8s pods did not start in timeframe allocated")
else:
logger.info(f"Created helm chart {self._chart_name} in {self._deploy_dir}")

Expand All @@ -257,6 +259,23 @@ def teardown(self):
subprocess.check_output([f'helm uninstall daliuge-daemon-{i}'], shell=True)
subprocess.check_output([f'helm uninstall daliuge-daemon-master'], shell=True)

def _monitor(self, session_id=None):
    """
    Starts a background thread that monitors session(s) on the submission endpoint.

    The thread calls dlg.deploy.common.monitor_sessions against
    self._submission_endpoint on NODE_DEFAULT_REST_PORT. If that call raises,
    the failure is logged and monitoring is retried after a short pause; the
    thread ends once a monitoring call returns normally.

    :param session_id: passed through unchanged to monitor_sessions (defaults to None).
    :return: the started threading.Thread, so that callers can join() it.
    """
    # Seconds to wait before retrying, so a persistent failure (e.g. the endpoint
    # being unreachable) neither spins the CPU nor floods the log.
    retry_delay = 1

    def _task():
        while True:
            try:
                dlg.deploy.common.monitor_sessions(
                    session_id=session_id,
                    host=self._submission_endpoint,
                    port=NODE_DEFAULT_REST_PORT,
                )
                break
            except Exception:
                # Deliberately not a bare except: SystemExit and friends must
                # still be able to terminate the thread instead of being retried.
                logger.exception("Monitoring failed, attempting to restart")
                time.sleep(retry_delay)

    monitor_thread = threading.Thread(target=_task)
    monitor_thread.start()
    return monitor_thread

def submit_pgt(self):
"""
There is a semi-dynamic element to fetching the IPs of Node(s) to deploy to.
Expand All @@ -275,6 +294,14 @@ def submit_pgt(self):
submit(physical_graph, self._submission_endpoint, port=NODE_DEFAULT_REST_PORT,
skip_deploy=False)

def submit_and_monitor_pgt(self):
    """
    Submits the physical graph template, then blocks until monitoring finishes.

    Whatever submit_pgt() returns is handed to _monitor() as the session id, and
    the thread returned by _monitor() is joined before this method returns.

    NOTE(review): the visible end of submit_pgt() is a plain submit(...) call with
    no return statement, so the session id passed on here may be None - confirm.
    """
    # Joining the monitoring thread is what makes this call synchronous.
    self._monitor(self.submit_pgt()).join()

def submit_pg(self):
"""
There is a semi-dynamic element to fetching the IPs of Node(s) to deploy to.
Expand All @@ -286,3 +313,12 @@ def submit_pg(self):
pg_data = json.loads(self._physical_graph_file)
# TODO: Add dumping to log-dir
submit(pg_data, self._submission_endpoint, port=NODE_DEFAULT_REST_PORT, skip_deploy=False)

def submit_and_monitor_pg(self):
    """
    Submits the physical graph, then blocks until monitoring finishes.

    Whatever submit_pg() returns is handed to _monitor() as the session id, and
    the thread returned by _monitor() is joined before this method returns.

    NOTE(review): the visible end of submit_pg() is a plain submit(...) call with
    no return statement, so the session id passed on here may be None - confirm.
    """
    # Joining the monitoring thread is what makes this call synchronous.
    self._monitor(self.submit_pg()).join()

2 changes: 1 addition & 1 deletion daliuge-engine/dlg/deploy/start_helm_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def main():
helm_client.create_helm_chart(json.dumps(physical_graph))
try:
helm_client.launch_helm()
helm_client.submit_pgt()
helm_client.submit_and_monitor_pgt()
except dlg.restutils.RestClientException as exp:
raise exp
except dlg.exceptions.InvalidGraphException as exp2:
Expand Down

0 comments on commit ca86fc8

Please sign in to comment.