Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/xpk/commands/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def submit_job(args: Namespace) -> None:
if args.time is not None:
cmd += f' --time {args.time}'

return_code, return_value = run_command_for_value(cmd, 'submit job', args)
return_code, return_value = run_command_for_value(cmd, 'submit job')

if return_code != 0:
xpk_print(f'Running batch job returned ERROR {return_code}')
Expand Down
45 changes: 21 additions & 24 deletions src/xpk/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def cluster_adapt(args) -> None:
'Argument --num-nodes was not provided, trying to determine number of'
' nodes based on the available nodes in the cluster...'
)
args.num_nodes = count_nodes_on_cluster(args, system)
args.num_nodes = count_nodes_on_cluster(system)
if args.num_nodes == 0:
xpk_print(
'Found unexpected number of nodes. Is the --device-type correct?'
Expand Down Expand Up @@ -445,7 +445,7 @@ def cluster_describe(args) -> None:

get_cluster_credentials(args)

return_code, data_table = nodepools_build_table(args)
return_code, data_table = nodepools_build_table()
if return_code != 0:
xpk_exit(return_code)

Expand All @@ -461,7 +461,6 @@ def cluster_describe(args) -> None:
r'kubectl get node --no-headers=true'
r" --selector='cloud.google.com/gke-tpu-accelerator' | wc -l",
'Count TPU Nodes',
args,
)
if return_code_node_output != 0:
xpk_exit(return_code_node_output)
Expand All @@ -472,7 +471,6 @@ def cluster_describe(args) -> None:
"kubectl get pod -o=custom-columns='Status:.status.phase' | grep -i"
' Running | wc -l',
'Count TPU Pods',
args,
)
if return_code_pod_output != 0:
xpk_exit(return_code_pod_output)
Expand All @@ -487,7 +485,7 @@ def cluster_describe(args) -> None:
xpk_exit(0)


def nodepools_build_table(args) -> tuple[int, list[list]]:
def nodepools_build_table() -> tuple[int, list[list]]:
table = [[
'NODEPOOL_NAME',
'SLICE',
Expand All @@ -499,14 +497,14 @@ def nodepools_build_table(args) -> tuple[int, list[list]]:

nodepools_data = {}

nodepools, return_code = get_node_pools_name(args)
nodepools, return_code = get_node_pools_name()
if return_code != 0:
xpk_print(f'Get node pools name returned ERROR {return_code}')

for name in nodepools:
nodepools_data[name] = [name]

slices, return_code = get_slice_node_pool_size(args)
slices, return_code = get_slice_node_pool_size()
if return_code != 0:
xpk_print(f'Get slice node pool size returned ERROR {return_code}')

Expand All @@ -515,7 +513,7 @@ def nodepools_build_table(args) -> tuple[int, list[list]]:
count, nodepool_name = s[0], s[1]
nodepools_data[nodepool_name].append(count)

type_nodepool, return_code = get_node_pool_instance_type(args)
type_nodepool, return_code = get_node_pool_instance_type()
if return_code != 0:
xpk_print(f'Get node pool instance type returned ERROR {return_code}')

Expand All @@ -524,7 +522,7 @@ def nodepools_build_table(args) -> tuple[int, list[list]]:
nodepool_name, instance_type = tn[0], tn[1]
nodepools_data[nodepool_name].append(instance_type)

expected_healthy_nodes, return_code = get_expected_healthy_nodes(args)
expected_healthy_nodes, return_code = get_expected_healthy_nodes()
if return_code != 0:
xpk_print(f'Get expected healthy nodes returned ERROR {return_code}')

Expand All @@ -533,7 +531,7 @@ def nodepools_build_table(args) -> tuple[int, list[list]]:
count, nodepool_name = ehn[0], ehn[1]
nodepools_data[nodepool_name].append(count)

actual_healthy_nodes, return_code = get_actual_healthy_nodes(args)
actual_healthy_nodes, return_code = get_actual_healthy_nodes()
if return_code != 0:
xpk_print(f'Get actual healthy nodes returned ERROR {return_code}')

Expand All @@ -542,7 +540,7 @@ def nodepools_build_table(args) -> tuple[int, list[list]]:
count, nodepool_name = ahn[0], ahn[1]
nodepools_data[nodepool_name].append(count)

total_nodes, return_code = get_total_nodes_per_node_pool(args)
total_nodes, return_code = get_total_nodes_per_node_pool()
if return_code != 0:
xpk_print(f'Get total nodes per node pool returned ERROR {return_code}')

Expand All @@ -557,20 +555,20 @@ def nodepools_build_table(args) -> tuple[int, list[list]]:
return 0, table


def get_node_pools_name(args) -> tuple[list[str], int]:
def get_node_pools_name() -> tuple[list[str], int]:
cmd_nodepools = (
'kubectl get node --no-headers=true -o'
" custom-columns='NODEPOOL:.metadata.labels.cloud\\.google\\.com/gke-nodepool'"
" | grep -v 'none' | sort | uniq"
)
return_code, out = run_command_for_value(cmd_nodepools, 'Nodepool list', args)
return_code, out = run_command_for_value(cmd_nodepools, 'Nodepool list')
if return_code != 0:
return [], return_code

return out.splitlines(), 0


def get_slice_node_pool_size(args) -> tuple[list[str], int]:
def get_slice_node_pool_size() -> tuple[list[str], int]:
cmd_slices = (
'kubectl get node --no-headers=true -o'
" custom-columns=':metadata.labels.cloud\\.google\\.com/gke-nodepool'"
Expand All @@ -579,31 +577,31 @@ def get_slice_node_pool_size(args) -> tuple[list[str], int]:
' | uniq -c'
)
return_code, out = run_command_for_value(
cmd_slices, 'Count nodes per nodepool slice', args
cmd_slices, 'Count nodes per nodepool slice'
)
if return_code != 0:
return [], return_code

return out.splitlines(), 0


def get_node_pool_instance_type(args) -> tuple[list[str], int]:
def get_node_pool_instance_type() -> tuple[list[str], int]:
cmd_type_nodepool = (
'kubectl get node --no-headers=true -o'
" custom-columns='NODEPOOL:.metadata.labels.cloud\\.google\\.com/gke-nodepool,"
" TYPE:.metadata.labels.node\\.kubernetes\\.io/instance-type' | grep -v"
" 'none' | sort | uniq"
)
return_code, out = run_command_for_value(
cmd_type_nodepool, 'Instance type of nodepools', args
cmd_type_nodepool, 'Instance type of nodepools'
)
if return_code != 0:
return [], return_code

return out.splitlines(), 0


def get_expected_healthy_nodes(args) -> tuple[list[str], int]:
def get_expected_healthy_nodes() -> tuple[list[str], int]:
cmd_expected_healthy_nodes = (
'kubectl get node --no-headers=true -o'
" custom-columns=':metadata.labels.cloud\\.google\\.com/gke-nodepool'"
Expand All @@ -614,15 +612,14 @@ def get_expected_healthy_nodes(args) -> tuple[list[str], int]:
return_code, out = run_command_for_value(
cmd_expected_healthy_nodes,
'Count expected healthy nodes per nodepool',
args,
)
if return_code != 0:
return [], return_code

return out.splitlines(), 0


def get_actual_healthy_nodes(args) -> tuple[list[str], int]:
def get_actual_healthy_nodes() -> tuple[list[str], int]:
cmd_actual_healthy_nodes = (
'kubectl get node --no-headers=true -o'
" custom-columns='NODE_NAME:metadata.name,"
Expand All @@ -635,15 +632,15 @@ def get_actual_healthy_nodes(args) -> tuple[list[str], int]:
' | uniq -c'
)
return_code, out = run_command_for_value(
cmd_actual_healthy_nodes, 'Count actual healthy nodes per nodepool', args
cmd_actual_healthy_nodes, 'Count actual healthy nodes per nodepool'
)
if return_code != 0:
return [], return_code

return out.splitlines(), 0


def get_total_nodes_per_node_pool(args) -> tuple[list[str], int]:
def get_total_nodes_per_node_pool() -> tuple[list[str], int]:
cmd_total_nodes = (
'kubectl get node --no-headers=true -o'
" custom-columns='NODE_NAME:metadata.name,"
Expand All @@ -655,7 +652,7 @@ def get_total_nodes_per_node_pool(args) -> tuple[list[str], int]:
' | uniq -c'
)
return_code, out = run_command_for_value(
cmd_total_nodes, 'Count total nodes per nodepool', args
cmd_total_nodes, 'Count total nodes per nodepool'
)
if return_code != 0:
return [], return_code
Expand Down Expand Up @@ -1204,7 +1201,7 @@ def install_storage_csis(args):

def install_kjob(args):
xpk_print('Verifying kjob installation')
err_code = verify_kjob_installed(args)
err_code = verify_kjob_installed()
if err_code > 0:
xpk_exit(err_code)

Expand Down
1 change: 0 additions & 1 deletion src/xpk/commands/cluster_gcluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ def validate_state_gcs_bucket(args):
err_code, _ = run_command_for_value(
bucket_validate_cmd,
'Validate remote state bucket existence.',
global_args=args,
)
if err_code != 0:
xpk_exit(err_code)
Expand Down
13 changes: 5 additions & 8 deletions src/xpk/commands/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def info(args: Namespace) -> None:
add_zone_and_project(args)
get_cluster_credentials(args)

verify_kueuectl(args)
verify_kueuectl()
lq, cq = bool(args.localqueue), bool(args.clusterqueue)
if not lq and not cq:
lq, cq = True, True
Expand All @@ -48,7 +48,7 @@ def info(args: Namespace) -> None:
if lq:
lqs = run_kueuectl_list_localqueue(args)

cqs = run_kueuectl_list_clusterqueue(args)
cqs = run_kueuectl_list_clusterqueue()
quotas = get_nominal_quotas(cqs)

if lq and lqs is not None:
Expand Down Expand Up @@ -214,26 +214,23 @@ def run_kueuectl_list_localqueue(args: Namespace) -> str:
command = 'kubectl kueue list localqueue -o json'
if args.namespace != '':
command += f' --namespace {args.namespace}'
return_code, val = run_command_for_value(command, 'list localqueue', args)
return_code, val = run_command_for_value(command, 'list localqueue')

if return_code != 0:
xpk_print(f'Cluster info request returned ERROR {return_code}')
xpk_exit(return_code)
return val


def run_kueuectl_list_clusterqueue(args: Namespace) -> str:
def run_kueuectl_list_clusterqueue() -> str:
"""Run the kueuectl list clusterqueue command.

Args:
args: user provided arguments for running the command.

Returns:
kueuectl list clusterqueue formatted as json string
"""
command = 'kubectl kueue list clusterqueue -o json'

return_code, val = run_command_for_value(command, 'list clusterqueue', args)
return_code, val = run_command_for_value(command, 'list clusterqueue')

if return_code != 0:
xpk_print(f'Cluster info request returned ERROR {return_code}')
Expand Down
2 changes: 1 addition & 1 deletion src/xpk/commands/inspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def inspector_run_command_helper(
prefix = f'Command: {command}\nCommand Description: {command_description}\n'
postfix = '========================================================'
return_code, command_output = run_command_for_value(
command, f'{command_description}', args
command, f'{command_description}'
)

if return_code != 0:
Expand Down
6 changes: 1 addition & 5 deletions src/xpk/commands/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ def job_info(args):
job_name = args.name

desc_command = f'kubectl-kjob describe slurm {job_name}'
desc_code, desc_text = run_command_for_value(
desc_command, 'Getting job data', args
)
desc_code, desc_text = run_command_for_value(desc_command, 'Getting job data')
if desc_code != 0:
xpk_print(f'Data info request returned ERROR {desc_code}')
xpk_exit(desc_code)
Expand All @@ -76,7 +74,6 @@ def job_info(args):
job_code, job_text = run_command_for_value(
job_command,
'Getting job info',
args,
dry_run_return_val=JOBS_DRY_RUN_YAML,
)
if job_code != 0:
Expand All @@ -87,7 +84,6 @@ def job_info(args):
pods_code, pods_text = run_command_for_value(
pods_command,
'Getting pods list',
args,
dry_run_return_val=PODS_DRY_RUN_RESULT,
)
if pods_code != 0:
Expand Down
13 changes: 5 additions & 8 deletions src/xpk/commands/kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def cluster_create(args) -> None:
xpk_exit(install_kueue_on_cluster_code)

xpk_print('Verifying kjob installation')
err_code = verify_kjob_installed(args)
err_code = verify_kjob_installed()
if err_code > 0:
xpk_exit(err_code)

Expand Down Expand Up @@ -154,7 +154,7 @@ def create_cluster_if_necessary(args) -> int:
Returns:
0 if successful and 1 otherwise.
"""
all_clusters, return_code = get_all_local_clusters_programmatic(args)
all_clusters, return_code = get_all_local_clusters_programmatic()
if return_code > 0:
xpk_print('Listing all clusters failed!')
return 1
Expand Down Expand Up @@ -229,18 +229,15 @@ def run_kind_cluster_create_command(args) -> int:
return 0


def get_all_local_clusters_programmatic(args) -> tuple[list[str], int]:
def get_all_local_clusters_programmatic() -> tuple[list[str], int]:
"""Gets all the local clusters.

Args:
args: user provided arguments for running the command.

Returns:
List of cluster names and 0 if successful and 1 otherwise.
"""
command = 'kind get clusters'
return_code, raw_cluster_output = run_command_for_value(
command, 'Find if Cluster Exists', args
command, 'Find if Cluster Exists'
)
if return_code != 0:
xpk_print(f'Find if Cluster Exists returned ERROR {return_code}')
Expand All @@ -261,7 +258,7 @@ def set_local_cluster_command(args) -> int:
if not args.cluster:
command = 'kubectl config current-context'
return_code, current_context = run_command_for_value(
command, 'get current-context', args
command, 'get current-context'
)
xpk_print(
'No local cluster name specified. Using current-context'
Expand Down
6 changes: 3 additions & 3 deletions src/xpk/commands/kjob_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ def add_gpu_networking_annotations_to_command(args, cmd: str) -> str:

annotations: tuple
if gpu_type == H100_MEGA_DEVICE_TYPE:
annotations = get_a3mega_pod_template_annotations(args)
annotations = get_a3mega_pod_template_annotations()
elif gpu_type == H200_DEVICE_TYPE:
annotations = get_a3ultra_pod_template_annotations(args)
annotations = get_a3ultra_pod_template_annotations()
elif gpu_type == B200_DEVICE_TYPE:
annotations = get_a4_pod_template_annotations(args)
annotations = get_a4_pod_template_annotations()
else:
annotations = tuple()

Expand Down
1 change: 0 additions & 1 deletion src/xpk/commands/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def get_existing_shell_pod_name(args: Namespace) -> str | None:
' -o custom-columns=":metadata.name"'
),
task='Get existing interactive shell pod name.',
global_args=args,
)
if return_code != 0:
xpk_print(
Expand Down
4 changes: 2 additions & 2 deletions src/xpk/commands/workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ def workload_create(args) -> None:
xpk_print('Starting workload create', flush=True)

metadata_configmap_name = f'{args.cluster}-{CLUSTER_METADATA_CONFIGMAP}'
cluster_config_map = get_cluster_configmap(args, metadata_configmap_name)
cluster_config_map = get_cluster_configmap(metadata_configmap_name)
cluster_xpk_version = None
if cluster_config_map is None:
xpk_print(
Expand Down Expand Up @@ -507,7 +507,7 @@ def workload_create(args) -> None:
annotations=annotations,
)

sub_networks = get_cluster_subnetworks(args)
sub_networks = get_cluster_subnetworks()
if args.device_type == a3high_device_type:
yml_string = tcpx_decorator.decorate_jobset(yml_string)
elif args.device_type == a3mega_device_type:
Expand Down
Loading
Loading