Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into liu-247
Browse files Browse the repository at this point in the history
  • Loading branch information
awicenec committed May 5, 2022
2 parents d99db30 + 3068a2f commit fdee485
Show file tree
Hide file tree
Showing 17 changed files with 617 additions and 176 deletions.
98 changes: 98 additions & 0 deletions daliuge-engine/dlg/deploy/deployment_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
# MA 02111-1307 USA
#
import json
import logging
import re
import subprocess
import time

logger = logging.getLogger(__name__)


class ListTokens(object):
Expand Down Expand Up @@ -104,6 +110,19 @@ def list_as_string(s):
return _parse_list_tokens(iter(_list_tokenizer(s)))


def check_k8s_env():
"""
Makes sure kubectl can be called and is accessible.
"""
try:
output = subprocess.run(['kubectl version'], capture_output=True,
shell=True).stdout
pattern = re.compile(r'^Client Version:.*\nServer Version:.*')
return re.match(pattern, output.decode(encoding='utf-8'))
except subprocess.SubprocessError:
return False


def find_numislands(physical_graph_template_file):
"""
Given the physical graph data extract the graph name and the total number of
Expand Down Expand Up @@ -150,3 +169,82 @@ def num_daliuge_nodes(num_nodes: int, run_proxy: bool):
"Not enough nodes {0} to run DALiuGE.".format(num_nodes)
)
return ret


def find_node_ips():
query = subprocess.check_output([
r'kubectl get nodes --selector=kubernetes.io/role!=master -o jsonpath={.items[*].status.addresses[?\(@.type==\"InternalIP\"\)].address}'],
shell=True)
node_ips = query.decode(encoding='utf-8').split(' ')
return node_ips


def find_service_ips(num_expected, retries=3, timeout=10):
pattern = r"^daliuge-daemon-service-.*\s*ClusterIP\s*\d+\.\d+\.\d+\.\d+"
ip_pattern = r"\d+\.\d+\.\d+\.\d+"
ips = []
attempts = 0
while len(ips) < num_expected and attempts < retries:
ips = []
query = subprocess.check_output([
r'kubectl get svc -o wide'],
shell=True).decode(encoding='utf-8')
outcome = re.findall(pattern, query, re.M)
for service in outcome:
ip = re.search(ip_pattern, service)
if ip:
ips.append(ip.group(0))
logger.info(f"K8s service ips: {ips}")
time.sleep(timeout)
return ips


def find_pod_ips(num_expected, retries=3, timeout=10):
ips = []
attempts = 0
while len(ips) < num_expected and attempts < retries:
ips = []
query = str(subprocess.check_output([
r'kubectl get pods -o wide'],
shell=True).decode(encoding='utf-8'))
pattern = r"^daliuge-daemon.*"
ip_pattern = r"\d+\.\d+\.\d+\.\d+"
outcome = re.findall(pattern, query, re.M)
for pod in outcome:
ip = re.search(ip_pattern, pod)
if ip:
ips.append(ip.group(0))
logger.info(f"K8s pod ips: {ips}")
time.sleep(timeout)
return ips


def _status_all_running(statuses):
if statuses == []:
return False
for status in statuses:
if status != "Running":
return False
return True


def wait_for_pods(num_expected, retries=18, timeout=10):
all_running = False
attempts = 0
while not all_running and attempts < retries:
query = str(subprocess.check_output([
r'kubectl get pods -o wide'],
shell=True).decode(encoding='utf-8'))
logger.info(query)
pattern = r"^daliuge-daemon.*"
outcome = re.findall(pattern, query, re.M)
if len(outcome) < num_expected:
all_running = False
continue
all_running = True
for pod in outcome:
if "Running" not in pod:
all_running = False
attempts += 1
time.sleep(timeout)
return all_running
Loading

0 comments on commit fdee485

Please sign in to comment.