From 2f4099da9adcdee161b0cf7cd203f3ba4ca2458b Mon Sep 17 00:00:00 2001 From: Konrad Kaim Date: Tue, 23 Sep 2025 11:02:28 +0000 Subject: [PATCH 1/2] feat: remove args from run_command_for_value --- src/xpk/commands/batch.py | 2 +- src/xpk/commands/cluster.py | 13 +++++-------- src/xpk/commands/cluster_gcluster.py | 1 - src/xpk/commands/info.py | 4 ++-- src/xpk/commands/inspector.py | 2 +- src/xpk/commands/job.py | 6 +----- src/xpk/commands/kind.py | 4 ++-- src/xpk/commands/shell.py | 1 - src/xpk/core/capacity.py | 4 ++-- src/xpk/core/cluster.py | 8 +------- src/xpk/core/cluster_private.py | 2 -- src/xpk/core/commands.py | 5 ++--- src/xpk/core/gcloud_context.py | 1 - src/xpk/core/jobset.py | 2 +- src/xpk/core/kjob.py | 2 +- src/xpk/core/kueue.py | 6 +++--- src/xpk/core/monitoring.py | 2 +- src/xpk/core/network.py | 10 ++++------ src/xpk/core/nodepool.py | 8 ++++---- src/xpk/core/pathways.py | 2 +- src/xpk/core/ray.py | 6 +++--- src/xpk/core/resources.py | 1 - src/xpk/core/workload.py | 9 ++++----- src/xpk/utils/validation.py | 4 +--- 24 files changed, 40 insertions(+), 65 deletions(-) diff --git a/src/xpk/commands/batch.py b/src/xpk/commands/batch.py index e52edb9a8..65c333907 100644 --- a/src/xpk/commands/batch.py +++ b/src/xpk/commands/batch.py @@ -126,7 +126,7 @@ def submit_job(args: Namespace) -> None: if args.time is not None: cmd += f' --time {args.time}' - return_code, return_value = run_command_for_value(cmd, 'submit job', args) + return_code, return_value = run_command_for_value(cmd, 'submit job') if return_code != 0: xpk_print(f'Running batch job returned ERROR {return_code}') diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index d8bf77d45..28810bb71 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -461,7 +461,6 @@ def cluster_describe(args) -> None: r'kubectl get node --no-headers=true' r" --selector='cloud.google.com/gke-tpu-accelerator' | wc -l", 'Count TPU Nodes', - args, ) if return_code_node_output != 0: xpk_exit(return_code_node_output) @@ -472,7 +471,6 @@ def cluster_describe(args) -> None: "kubectl get pod -o=custom-columns='Status:.status.phase' | grep -i" ' Running | wc -l', 'Count TPU Pods', - args, ) if return_code_pod_output != 0: xpk_exit(return_code_pod_output) @@ -563,7 +561,7 @@ def get_node_pools_name(args) -> tuple[list[str], int]: " custom-columns='NODEPOOL:.metadata.labels.cloud\\.google\\.com/gke-nodepool'" " | grep -v 'none' | sort | uniq" ) - return_code, out = run_command_for_value(cmd_nodepools, 'Nodepool list', args) + return_code, out = run_command_for_value(cmd_nodepools, 'Nodepool list') if return_code != 0: return [], return_code @@ -579,7 +577,7 @@ def get_slice_node_pool_size(args) -> tuple[list[str], int]: ' | uniq -c' ) return_code, out = run_command_for_value( - cmd_slices, 'Count nodes per nodepool slice', args + cmd_slices, 'Count nodes per nodepool slice' ) if return_code != 0: return [], return_code @@ -595,7 +593,7 @@ def get_node_pool_instance_type(args) -> tuple[list[str], int]: " 'none' | sort | uniq" ) return_code, out = run_command_for_value( - cmd_type_nodepool, 'Instance type of nodepools', args + cmd_type_nodepool, 'Instance type of nodepools' ) if return_code != 0: return [], return_code @@ -614,7 +612,6 @@ def get_expected_healthy_nodes(args) -> tuple[list[str], int]: return_code, out = run_command_for_value( cmd_expected_healthy_nodes, 'Count expected healthy nodes per nodepool', - args, ) if return_code != 0: return [], return_code @@ -635,7 +632,7 @@ def get_actual_healthy_nodes(args) -> tuple[list[str], int]: ' | uniq -c' ) return_code, out = run_command_for_value( - cmd_actual_healthy_nodes, 'Count actual healthy nodes per nodepool', args + cmd_actual_healthy_nodes, 'Count actual healthy nodes per nodepool' ) if return_code != 0: return [], return_code @@ -655,7 +652,7 @@ def get_total_nodes_per_node_pool(args) -> tuple[list[str], int]: ' | uniq -c' ) return_code, out = run_command_for_value( - cmd_total_nodes, 'Count total nodes per nodepool', args + cmd_total_nodes, 'Count total nodes per nodepool' ) if return_code != 0: return [], return_code diff --git a/src/xpk/commands/cluster_gcluster.py b/src/xpk/commands/cluster_gcluster.py index 600a58bba..cb98d8d90 100644 --- a/src/xpk/commands/cluster_gcluster.py +++ b/src/xpk/commands/cluster_gcluster.py @@ -213,7 +213,6 @@ def validate_state_gcs_bucket(args): err_code, _ = run_command_for_value( bucket_validate_cmd, 'Validate remote state bucket existence.', - global_args=args, ) if err_code != 0: xpk_exit(err_code) diff --git a/src/xpk/commands/info.py b/src/xpk/commands/info.py index 5dcc0ba86..a2525e0f9 100644 --- a/src/xpk/commands/info.py +++ b/src/xpk/commands/info.py @@ -214,7 +214,7 @@ def run_kueuectl_list_localqueue(args: Namespace) -> str: command = 'kubectl kueue list localqueue -o json' if args.namespace != '': command += f' --namespace {args.namespace}' - return_code, val = run_command_for_value(command, 'list localqueue', args) + return_code, val = run_command_for_value(command, 'list localqueue') if return_code != 0: xpk_print(f'Cluster info request returned ERROR {return_code}') @@ -233,7 +233,7 @@ def run_kueuectl_list_clusterqueue(args: Namespace) -> str: """ command = 'kubectl kueue list clusterqueue -o json' - return_code, val = run_command_for_value(command, 'list clusterqueue', args) + return_code, val = run_command_for_value(command, 'list clusterqueue') if return_code != 0: xpk_print(f'Cluster info request returned ERROR {return_code}') diff --git a/src/xpk/commands/inspector.py b/src/xpk/commands/inspector.py index 3e8d783f0..8c3c8673f 100644 --- a/src/xpk/commands/inspector.py +++ b/src/xpk/commands/inspector.py @@ -41,7 +41,7 @@ def inspector_run_command_helper( prefix = f'Command: {command}\nCommand Description: {command_description}\n' postfix = '========================================================' return_code, command_output = run_command_for_value( - command, f'{command_description}', args + command, f'{command_description}' ) if return_code != 0: diff --git a/src/xpk/commands/job.py b/src/xpk/commands/job.py index 4d9d21457..284655a6b 100644 --- a/src/xpk/commands/job.py +++ b/src/xpk/commands/job.py @@ -62,9 +62,7 @@ def job_info(args): job_name = args.name desc_command = f'kubectl-kjob describe slurm {job_name}' - desc_code, desc_text = run_command_for_value( - desc_command, 'Getting job data', args - ) + desc_code, desc_text = run_command_for_value(desc_command, 'Getting job data') if desc_code != 0: xpk_print(f'Data info request returned ERROR {desc_code}') xpk_exit(desc_code) @@ -76,7 +74,6 @@ def job_info(args): job_code, job_text = run_command_for_value( job_command, 'Getting job info', - args, dry_run_return_val=JOBS_DRY_RUN_YAML, ) if job_code != 0: @@ -87,7 +84,6 @@ def job_info(args): pods_code, pods_text = run_command_for_value( pods_command, 'Getting pods list', - args, dry_run_return_val=PODS_DRY_RUN_RESULT, ) if pods_code != 0: diff --git a/src/xpk/commands/kind.py b/src/xpk/commands/kind.py index 274a2bb43..f0d67d181 100644 --- a/src/xpk/commands/kind.py +++ b/src/xpk/commands/kind.py @@ -240,7 +240,7 @@ def get_all_local_clusters_programmatic(args) -> tuple[list[str], int]: """ command = 'kind get clusters' return_code, raw_cluster_output = run_command_for_value( - command, 'Find if Cluster Exists', args + command, 'Find if Cluster Exists' ) if return_code != 0: xpk_print(f'Find if Cluster Exists returned ERROR {return_code}') @@ -261,7 +261,7 @@ def set_local_cluster_command(args) -> int: if not args.cluster: command = 'kubectl config current-context' return_code, current_context = run_command_for_value( - command, 'get current-context', args + command, 'get current-context' ) xpk_print( 'No local cluster name specified. Using current-context' diff --git a/src/xpk/commands/shell.py b/src/xpk/commands/shell.py index 4974b0c3d..e1f235161 100644 --- a/src/xpk/commands/shell.py +++ b/src/xpk/commands/shell.py @@ -60,7 +60,6 @@ def get_existing_shell_pod_name(args: Namespace) -> str | None: ' -o custom-columns=":metadata.name"' ), task='Get existing interactive shell pod name.', - global_args=args, ) if return_code != 0: xpk_print( diff --git a/src/xpk/core/capacity.py b/src/xpk/core/capacity.py index f07e660b7..b8dfaaff9 100644 --- a/src/xpk/core/capacity.py +++ b/src/xpk/core/capacity.py @@ -119,7 +119,7 @@ def get_reservation_maintenance_interval( f' --project={project} --zone={zone} --format="value(specificReservation.instanceProperties.maintenanceInterval)"' ) return_code, output = run_command_for_value( - command, 'Get reservation maintenance interval', None + command, 'Get reservation maintenance interval' ) if return_code != 0: xpk_print(f'Get reservation maintenance interval ERROR {return_code}') @@ -143,7 +143,7 @@ def get_reservation_placement_policy( f' --project={project} --zone={zone} --format="value(resourcePolicies.policy)"' ) return_code, output = run_command_for_value( - command, 'Get reservation placement policy', None + command, 'Get reservation placement policy' ) if return_code != 0: xpk_print(f'Get reservation placement policy ERROR {return_code}') diff --git a/src/xpk/core/cluster.py b/src/xpk/core/cluster.py index b2c9d8e1c..f99a72759 100644 --- a/src/xpk/core/cluster.py +++ b/src/xpk/core/cluster.py @@ -213,7 +213,6 @@ def get_cluster_nodes_info(args) -> list[dict]: err_code, val = run_command_for_value( command=command, task='Get cluster nodes info', - global_args=args, ) if err_code != 0: xpk_exit(err_code) @@ -248,7 +247,6 @@ def get_cluster_network(args) -> str: err_code, val = run_command_for_value( command=cluster_network_cmd, task='Get network cluster is in', - global_args=args, ) if err_code != 0: xpk_exit(err_code) @@ -361,7 +359,6 @@ def is_driver_enabled_on_cluster( command, f"Checks if {driver} driver's {config_key} is enabled in cluster" ' describe.', - args, ) if return_code != 0: xpk_exit(return_code) @@ -412,7 +409,7 @@ def get_all_clusters_programmatic(args) -> tuple[list[str], int]: ' --format="csv[no-heading](name)"' ) return_code, raw_cluster_output = run_command_for_value( - command, 'Find if Cluster Exists', args + command, 'Find if Cluster Exists' ) if return_code != 0: xpk_print(f'Find if Cluster Exists returned ERROR {return_code}') @@ -734,7 +731,6 @@ def is_cluster_using_clouddns(args) -> bool: return_code, _ = run_command_for_value( command, 'Check if Cloud DNS is enabled in cluster describe.', - args, ) if return_code == 0: xpk_print('Cloud DNS is enabled on the cluster, no update needed.') @@ -757,7 +753,6 @@ def is_workload_identity_enabled_on_cluster(args) -> bool: return_code, workload_pool = run_command_for_value( command, 'Checks if Workload Identity Federation is enabled in cluster describe.', - args, ) if return_code != 0: xpk_exit(return_code) @@ -785,7 +780,6 @@ def is_gcsfuse_driver_enabled_on_cluster(args) -> bool: return_code, gcsfuse_driver_enabled = run_command_for_value( command, 'Checks if GCSFuse CSI driver is enabled in cluster describe.', - args, ) if return_code != 0: xpk_exit(return_code) diff --git a/src/xpk/core/cluster_private.py b/src/xpk/core/cluster_private.py index 3dd4b7f8e..16169ef0a 100644 --- a/src/xpk/core/cluster_private.py +++ b/src/xpk/core/cluster_private.py @@ -133,7 +133,6 @@ def is_cluster_private(args) -> bool: return_code, private_nodes_enabled = run_command_for_value( command, 'Check if Private Nodes is enabled in cluster.', - args, ) if return_code != 0: @@ -164,7 +163,6 @@ def get_cluster_authorized_networks(args) -> list[str]: return_code, authorized_networks = run_command_for_value( command, 'Fetching the list of authorized network from cluster describe.', - args, dry_run_return_val='127.0.0.1/32', ) diff --git a/src/xpk/core/commands.py b/src/xpk/core/commands.py index cc3b266b7..f00c2d18b 100644 --- a/src/xpk/core/commands.py +++ b/src/xpk/core/commands.py @@ -23,6 +23,7 @@ from ..utils.objects import chunks from ..utils.file import make_tmp_files, write_tmp_file from ..utils.console import xpk_print +from ..utils.execution_context import is_dry_run def run_commands(commands, jobname, per_command_name, batch=10, dry_run=False): @@ -226,7 +227,6 @@ def run_command_with_updates(command, task, global_args, verbose=True) -> int: def run_command_for_value( command, task, - global_args, dry_run_return_val='0', print_timer=False, hide_error=False, @@ -239,7 +239,6 @@ def run_command_for_value( Args: command: user provided command to run. task: user provided task name for running the command. - global_args: user provided arguments for running the command. dry_run_return_val: return value of this command for dry run. print_timer: print out the time the command is running. hide_error: hide the error from the command output upon success. @@ -249,7 +248,7 @@ def run_command_for_value( int: return_code, default is 0 str: return_val, default is '0' """ - if global_args is not None and global_args.dry_run: + if is_dry_run(): xpk_print( f'Task: `{task}` is implemented by the following command' ' not running since it is a dry run.' diff --git a/src/xpk/core/gcloud_context.py b/src/xpk/core/gcloud_context.py index a96fa2ee3..0d3688f1d 100644 --- a/src/xpk/core/gcloud_context.py +++ b/src/xpk/core/gcloud_context.py @@ -139,7 +139,6 @@ def get_gke_server_config(args) -> tuple[int, GkeServerConfig | None]: return_code, cmd_output = run_command_for_value( command, command_description, - args, hide_error=True, ) if return_code != 0: diff --git a/src/xpk/core/jobset.py b/src/xpk/core/jobset.py index e47346796..d20ea6e74 100644 --- a/src/xpk/core/jobset.py +++ b/src/xpk/core/jobset.py @@ -122,7 +122,7 @@ def update_jobset_resources_if_necessary(args): # Get total number of nodes cmd_total_node_num = 'kubectl get node --no-headers | wc -l' return_code, out = run_command_for_value( - cmd_total_node_num, 'Count total nodes', args + cmd_total_node_num, 'Count total nodes' ) if return_code != 0: xpk_exit(1) diff --git a/src/xpk/core/kjob.py b/src/xpk/core/kjob.py index c2c43f27b..4033ca2d6 100644 --- a/src/xpk/core/kjob.py +++ b/src/xpk/core/kjob.py @@ -214,7 +214,7 @@ def verify_kjob_installed(args: Namespace) -> int: """ command = "kubectl-kjob help" task = "Verify kjob installation " - verify_kjob_installed_code, _ = run_command_for_value(command, task, args) + verify_kjob_installed_code, _ = run_command_for_value(command, task) if verify_kjob_installed_code == 0: xpk_print("kjob found") diff --git a/src/xpk/core/kueue.py b/src/xpk/core/kueue.py index 49f57a4fd..4fdc8f325 100644 --- a/src/xpk/core/kueue.py +++ b/src/xpk/core/kueue.py @@ -292,7 +292,7 @@ def verify_kueuectl(args: Namespace) -> None: command = 'kubectl kueue version' task = 'Verify kueuectl installation on cluster' - verify_kueuectl_installed_code, _ = run_command_for_value(command, task, args) + verify_kueuectl_installed_code, _ = run_command_for_value(command, task) if verify_kueuectl_installed_code == 0: xpk_print('kueuectl found') @@ -327,7 +327,7 @@ def delete_multikueueclusters_definitions(args) -> int: def get_kueue_version(args) -> tuple[int, str]: command = 'kubectl kueue version' task = 'Get kueue version on server' - return_code, val = run_command_for_value(command, task, args) + return_code, val = run_command_for_value(command, task) if return_code != 0: return return_code, '' lines = val.splitlines() @@ -524,7 +524,7 @@ def update_kueue_resources_if_necessary(args): # Get total number of nodes cmd_total_node_num = 'kubectl get node --no-headers | wc -l' return_code, out = run_command_for_value( - cmd_total_node_num, 'Count total nodes', args + cmd_total_node_num, 'Count total nodes' ) if return_code != 0: xpk_exit(1) diff --git a/src/xpk/core/monitoring.py b/src/xpk/core/monitoring.py index a1a791824..41726674e 100644 --- a/src/xpk/core/monitoring.py +++ b/src/xpk/core/monitoring.py @@ -40,7 +40,7 @@ def get_gke_dashboard(args, dashboard_filter) -> tuple[bool, str | None]: ) return_code, return_value = run_command_for_value( - command, 'GKE Dashboard List', args + command, 'GKE Dashboard List' ) if return_code != 0: diff --git a/src/xpk/core/network.py b/src/xpk/core/network.py index 18f844c59..44ec54888 100644 --- a/src/xpk/core/network.py +++ b/src/xpk/core/network.py @@ -245,9 +245,7 @@ def get_cluster_subnetworks(args) -> list[str]: list[str]: list of cluster networks """ command = 'kubectl get GKENetworkParamSet' - return_code, stdout = run_command_for_value( - command, 'Get Cluster Networks', args - ) + return_code, stdout = run_command_for_value(command, 'Get Cluster Networks') if return_code != 0: xpk_print('GKE Cluster Get NetworkParamSet failed') xpk_exit(return_code) @@ -328,7 +326,7 @@ def get_all_networks_programmatic(args) -> tuple[list[str], int]: f' --project={args.project}' ) return_code, raw_network_output = run_command_for_value( - command, 'Get All Networks', args + command, 'Get All Networks' ) if return_code != 0: xpk_print(f'Get All Networks returned ERROR {return_code}') @@ -353,7 +351,7 @@ def get_all_subnets_programmatic(args) -> tuple[list[str], int]: f' --filter=name~"{subnet_name_filter}" --project={args.project}' ) return_code, raw_subnets_output = run_command_for_value( - command, 'Get All Subnets', args + command, 'Get All Subnets' ) if return_code != 0: xpk_print(f'Get All Subnets returned ERROR {return_code}') @@ -380,7 +378,7 @@ def get_all_firewall_rules_programmatic(args) -> tuple[list[str], int]: f' --project={args.project}' ) return_code, raw_subnets_output = run_command_for_value( - command, 'Get All Firewall Rules', args + command, 'Get All Firewall Rules' ) if return_code != 0: xpk_print(f'Get All Firewall Rules returned ERROR {return_code}') diff --git a/src/xpk/core/nodepool.py b/src/xpk/core/nodepool.py index 85ab6aba9..4dd25014c 100644 --- a/src/xpk/core/nodepool.py +++ b/src/xpk/core/nodepool.py @@ -436,7 +436,7 @@ def get_all_nodepools_programmatic(args) -> tuple[list[str], int]: ' --format="csv[no-heading](name)"' ) return_code, raw_nodepool_output = run_command_for_value( - command, 'Get All Node Pools', args + command, 'Get All Node Pools' ) if return_code != 0: xpk_print(f'Get All Node Pools returned ERROR {return_code}') @@ -463,7 +463,7 @@ def get_nodepool_zone(args, nodepool_name) -> tuple[int, str | None]: f' --region={zone_to_region(args.zone)} --format="value(locations)"' ) return_code, nodepool_zone = run_command_for_value( - command, 'Get Node Pool Zone', args, dry_run_return_val=args.zone + command, 'Get Node Pool Zone', dry_run_return_val=args.zone ) if return_code != 0: xpk_print(f'Get Node Pool Zone returned ERROR {return_code}') @@ -496,7 +496,7 @@ def get_gke_node_pool_version( ) return_code, current_gke_master_version = run_command_for_value( - command, command_description, args + command, command_description ) if return_code != 0: xpk_print( @@ -604,7 +604,7 @@ def get_nodepool_workload_metadata_mode( f' --region={zone_to_region(args.zone)} --format="value(config.workloadMetadataConfig.mode)"' ) return_code, nodepool_WI_mode = run_command_for_value( - command, 'Get Node Pool Workload Identity Metadata Mode', args + command, 'Get Node Pool Workload Identity Metadata Mode' ) if return_code != 0: xpk_print( diff --git a/src/xpk/core/pathways.py b/src/xpk/core/pathways.py index 017fb885e..d62b60c62 100644 --- a/src/xpk/core/pathways.py +++ b/src/xpk/core/pathways.py @@ -116,7 +116,7 @@ def check_if_pathways_job_is_installed(args) -> bool: ' custom-columns=NAME:.metadata.name' ) task = f'Check if PathwaysJob is installed on {args.cluster}' - return_code, return_msg = run_command_for_value(command, task, args) + return_code, return_msg = run_command_for_value(command, task) # return_msg contains the name of the controller pod, if found. xpk_print('check_if_pathways_job_is_installed', return_code, return_msg) diff --git a/src/xpk/core/ray.py b/src/xpk/core/ray.py index 50e391025..3c1759186 100644 --- a/src/xpk/core/ray.py +++ b/src/xpk/core/ray.py @@ -184,13 +184,13 @@ def generate_available_resources(label, args, percent) -> tuple: f"kubectl get nodes -l {label} -o jsonpath='{{.items[0].metadata.name}}'" ) task = f'Getting nodes with label {label}' - _, node_name = run_command_for_value(command, task, args) + _, node_name = run_command_for_value(command, task) command = ( f"kubectl get node {node_name} -o jsonpath='{{.status.allocatable.cpu}}'" ) task = 'Fetching available CPU on node' - _, available_cpu = run_command_for_value(command, task, args) + _, available_cpu = run_command_for_value(command, task) match = re.match(r'(\d+)([a-zA-Z]+)', available_cpu) if not match: xpk_print( @@ -207,7 +207,7 @@ def generate_available_resources(label, args, percent) -> tuple: " jsonpath='{.status.allocatable.memory}'" ) task = 'Fetching available memory on node' - _, available_memory = run_command_for_value(command, task, args) + _, available_memory = run_command_for_value(command, task) match = re.match(r'(\d+)([a-zA-Z]+)', available_memory) if not match: xpk_print( diff --git a/src/xpk/core/resources.py b/src/xpk/core/resources.py index f215e1063..88351cb56 100644 --- a/src/xpk/core/resources.py +++ b/src/xpk/core/resources.py @@ -68,7 +68,6 @@ def get_cluster_configmap(args, configmap_name) -> dict[str, str] | None: return_code, return_value = run_command_for_value( command, 'GKE Cluster Get ConfigMap', - args, dry_run_return_val='map[]', ) if return_code != 0: diff --git a/src/xpk/core/workload.py b/src/xpk/core/workload.py index 5ec937941..cebc1b455 100644 --- a/src/xpk/core/workload.py +++ b/src/xpk/core/workload.py @@ -131,7 +131,7 @@ def get_workload_list(args) -> tuple[int, str]: if hasattr(args, 'filter_by_job'): task += f' with filter-by-job={args.filter_by_job}' - return_code, return_value = run_command_for_value(command, task, args) + return_code, return_value = run_command_for_value(command, task) return return_code, return_value @@ -152,7 +152,7 @@ def check_if_workload_exists(args) -> bool: command = f"kubectl get workloads -o=custom-columns='{s}'" return_code, return_msg = run_command_for_value( - command, 'Check if Workload Already Exists', args + command, 'Check if Workload Already Exists' ) if return_code != 0: @@ -186,7 +186,7 @@ def wait_for_job_completion(args) -> int: # Get the full workload name get_workload_name_cmd = f'kubectl get workloads | grep jobset-{args.workload}' return_code, return_value = run_command_for_value( - get_workload_name_cmd, 'Get full workload name', args + get_workload_name_cmd, 'Get full workload name' ) if return_code != 0: xpk_print(f'Get full workload name request returned ERROR {return_code}') @@ -205,7 +205,6 @@ def wait_for_job_completion(args) -> int: return_code, return_value = run_command_for_value( wait_cmd, f'Wait for workload to finish with timeout of {timeout_msg}', - args, print_timer=True, ) if return_code != 0: @@ -231,7 +230,7 @@ def wait_for_job_completion(args) -> int: " jsonpath='{.status.conditions[-1].type}'" ) return_code, return_value = run_command_for_value( - status_cmd, 'Get jobset status', args + status_cmd, 'Get jobset status' ) if return_code != 0: xpk_print(f'Get workload status request returned ERROR {return_code}') diff --git a/src/xpk/utils/validation.py b/src/xpk/utils/validation.py index 87c12befb..a572cd923 100644 --- a/src/xpk/utils/validation.py +++ b/src/xpk/utils/validation.py @@ -71,9 +71,7 @@ def validate_dependencies(): if deps_version is None or deps_version != xpk_version: for name, check in validation_commands.items(): cmd, message = check['command'], check['message'] - code, _ = run_command_for_value( - cmd, f'Validate {name} installation.', None - ) + code, _ = run_command_for_value(cmd, f'Validate {name} installation.') if code != 0: xpk_print(message) xpk_exit(code) From 02fe0aa894c4b0c8cf1702ec71e849c358479c79 Mon Sep 17 00:00:00 2001 From: Konrad Kaim Date: Tue, 23 Sep 2025 11:31:11 +0000 Subject: [PATCH 2/2] feat: remove redundant args --- src/xpk/commands/cluster.py | 32 ++++++++++++++++---------------- src/xpk/commands/info.py | 9 +++------ src/xpk/commands/kind.py | 9 +++------ src/xpk/commands/kjob_common.py | 6 +++--- src/xpk/commands/workload.py | 4 ++-- src/xpk/core/cluster.py | 9 +++------ src/xpk/core/kjob.py | 18 +++++++----------- src/xpk/core/kueue.py | 8 +++----- src/xpk/core/nap.py | 6 +++--- src/xpk/core/network.py | 5 +---- src/xpk/core/ray.py | 7 +++---- src/xpk/core/resources.py | 9 ++++----- src/xpk/core/scheduling.py | 2 +- src/xpk/core/vertex.py | 2 +- 14 files changed, 53 insertions(+), 73 deletions(-) diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index 28810bb71..b6b28ab14 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -109,7 +109,7 @@ def cluster_adapt(args) -> None: 'Argument --num-nodes was not provided, trying to determine number of' ' nodes based on the available nodes in the cluster...' ) - args.num_nodes = count_nodes_on_cluster(args, system) + args.num_nodes = count_nodes_on_cluster(system) if args.num_nodes == 0: xpk_print( 'Found unexpected number of nodes. Is the --device-type correct?' @@ -445,7 +445,7 @@ def cluster_describe(args) -> None: get_cluster_credentials(args) - return_code, data_table = nodepools_build_table(args) + return_code, data_table = nodepools_build_table() if return_code != 0: xpk_exit(return_code) @@ -485,7 +485,7 @@ def cluster_describe(args) -> None: xpk_exit(0) -def nodepools_build_table(args) -> tuple[int, list[list]]: +def nodepools_build_table() -> tuple[int, list[list]]: table = [[ 'NODEPOOL_NAME', 'SLICE', @@ -497,14 +497,14 @@ def nodepools_build_table(args) -> tuple[int, list[list]]: nodepools_data = {} - nodepools, return_code = get_node_pools_name(args) + nodepools, return_code = get_node_pools_name() if return_code != 0: xpk_print(f'Get node pools name returned ERROR {return_code}') for name in nodepools: nodepools_data[name] = [name] - slices, return_code = get_slice_node_pool_size(args) + slices, return_code = get_slice_node_pool_size() if return_code != 0: xpk_print(f'Get slice node pool size returned ERROR {return_code}') @@ -513,7 +513,7 @@ def nodepools_build_table(args) -> tuple[int, list[list]]: count, nodepool_name = s[0], s[1] nodepools_data[nodepool_name].append(count) - type_nodepool, return_code = get_node_pool_instance_type(args) + type_nodepool, return_code = get_node_pool_instance_type() if return_code != 0: xpk_print(f'Get node pool instance type returned ERROR {return_code}') @@ -522,7 +522,7 @@ def nodepools_build_table(args) -> tuple[int, list[list]]: nodepool_name, instance_type = tn[0], tn[1] nodepools_data[nodepool_name].append(instance_type) - expected_healthy_nodes, return_code = get_expected_healthy_nodes(args) + expected_healthy_nodes, return_code = get_expected_healthy_nodes() if return_code != 0: xpk_print(f'Get expected healthy nodes returned ERROR {return_code}') @@ -531,7 +531,7 @@ def nodepools_build_table(args) -> tuple[int, list[list]]: count, nodepool_name = ehn[0], ehn[1] nodepools_data[nodepool_name].append(count) - actual_healthy_nodes, return_code = get_actual_healthy_nodes(args) + actual_healthy_nodes, return_code = get_actual_healthy_nodes() if return_code != 0: xpk_print(f'Get actual healthy nodes returned ERROR {return_code}') @@ -540,7 +540,7 @@ def nodepools_build_table(args) -> tuple[int, list[list]]: count, nodepool_name = ahn[0], ahn[1] nodepools_data[nodepool_name].append(count) - total_nodes, return_code = get_total_nodes_per_node_pool(args) + total_nodes, return_code = get_total_nodes_per_node_pool() if return_code != 0: xpk_print(f'Get total nodes per node pool returned ERROR {return_code}') @@ -555,7 +555,7 @@ def nodepools_build_table(args) -> tuple[int, list[list]]: return 0, table -def get_node_pools_name(args) -> tuple[list[str], int]: +def get_node_pools_name() -> tuple[list[str], int]: cmd_nodepools = ( 'kubectl get node --no-headers=true -o' " custom-columns='NODEPOOL:.metadata.labels.cloud\\.google\\.com/gke-nodepool'" @@ -568,7 +568,7 @@ def get_node_pools_name(args) -> tuple[list[str], int]: return out.splitlines(), 0 -def get_slice_node_pool_size(args) -> tuple[list[str], int]: +def get_slice_node_pool_size() -> tuple[list[str], int]: cmd_slices = ( 'kubectl get node --no-headers=true -o' " custom-columns=':metadata.labels.cloud\\.google\\.com/gke-nodepool'" @@ -585,7 +585,7 @@ def get_slice_node_pool_size(args) -> tuple[list[str], int]: return out.splitlines(), 0 -def get_node_pool_instance_type(args) -> tuple[list[str], int]: +def get_node_pool_instance_type() -> tuple[list[str], int]: cmd_type_nodepool = ( 'kubectl get node --no-headers=true -o' " custom-columns='NODEPOOL:.metadata.labels.cloud\\.google\\.com/gke-nodepool," @@ -601,7 +601,7 @@ def get_node_pool_instance_type(args) -> tuple[list[str], int]: return out.splitlines(), 0 -def get_expected_healthy_nodes(args) -> tuple[list[str], int]: +def get_expected_healthy_nodes() -> tuple[list[str], int]: cmd_expected_healthy_nodes = ( 'kubectl get node --no-headers=true -o' " custom-columns=':metadata.labels.cloud\\.google\\.com/gke-nodepool'" @@ -619,7 +619,7 @@ def get_expected_healthy_nodes(args) -> tuple[list[str], int]: return out.splitlines(), 0 -def get_actual_healthy_nodes(args) -> tuple[list[str], int]: +def get_actual_healthy_nodes() -> tuple[list[str], int]: cmd_actual_healthy_nodes = ( 'kubectl get node --no-headers=true -o' " custom-columns='NODE_NAME:metadata.name," @@ -640,7 +640,7 @@ def get_actual_healthy_nodes(args) -> tuple[list[str], int]: return out.splitlines(), 0 -def get_total_nodes_per_node_pool(args) -> tuple[list[str], int]: +def get_total_nodes_per_node_pool() -> tuple[list[str], int]: cmd_total_nodes = ( 'kubectl get node --no-headers=true -o' " custom-columns='NODE_NAME:metadata.name," @@ -1201,7 +1201,7 @@ def install_storage_csis(args): def install_kjob(args): xpk_print('Verifying kjob installation') - err_code = verify_kjob_installed(args) + err_code = verify_kjob_installed() if err_code > 0: xpk_exit(err_code) diff --git a/src/xpk/commands/info.py b/src/xpk/commands/info.py index a2525e0f9..fce87ce14 100644 --- a/src/xpk/commands/info.py +++ b/src/xpk/commands/info.py @@ -39,7 +39,7 @@ def info(args: Namespace) -> None: add_zone_and_project(args) get_cluster_credentials(args) - verify_kueuectl(args) + verify_kueuectl() lq, cq = bool(args.localqueue), bool(args.clusterqueue) if not lq and not cq: lq, cq = True, True @@ -48,7 +48,7 @@ def info(args: Namespace) -> None: if lq: lqs = run_kueuectl_list_localqueue(args) - cqs = run_kueuectl_list_clusterqueue(args) + cqs = run_kueuectl_list_clusterqueue() quotas = get_nominal_quotas(cqs) if lq and lqs is not None: @@ -222,12 +222,9 @@ def run_kueuectl_list_localqueue(args: Namespace) -> str: return val -def run_kueuectl_list_clusterqueue(args: Namespace) -> str: +def run_kueuectl_list_clusterqueue() -> str: """Run the kueuectl list clusterqueue command. - Args: - args: user provided arguments for running the command. - Returns: kueuectl list clusterqueue formatted as json string """ diff --git a/src/xpk/commands/kind.py b/src/xpk/commands/kind.py index f0d67d181..90a7fd870 100644 --- a/src/xpk/commands/kind.py +++ b/src/xpk/commands/kind.py @@ -70,7 +70,7 @@ def cluster_create(args) -> None: xpk_exit(install_kueue_on_cluster_code) xpk_print('Verifying kjob installation') - err_code = verify_kjob_installed(args) + err_code = verify_kjob_installed() if err_code > 0: xpk_exit(err_code) @@ -154,7 +154,7 @@ def create_cluster_if_necessary(args) -> int: Returns: 0 if successful and 1 otherwise. """ - all_clusters, return_code = get_all_local_clusters_programmatic(args) + all_clusters, return_code = get_all_local_clusters_programmatic() if return_code > 0: xpk_print('Listing all clusters failed!') return 1 @@ -229,12 +229,9 @@ def run_kind_cluster_create_command(args) -> int: return 0 -def get_all_local_clusters_programmatic(args) -> tuple[list[str], int]: +def get_all_local_clusters_programmatic() -> tuple[list[str], int]: """Gets all the local clusters. - Args: - args: user provided arguments for running the command. - Returns: List of cluster names and 0 if successful and 1 otherwise. """ diff --git a/src/xpk/commands/kjob_common.py b/src/xpk/commands/kjob_common.py index a13485c91..5df178ab1 100644 --- a/src/xpk/commands/kjob_common.py +++ b/src/xpk/commands/kjob_common.py @@ -35,11 +35,11 @@ def add_gpu_networking_annotations_to_command(args, cmd: str) -> str: annotations: tuple if gpu_type == H100_MEGA_DEVICE_TYPE: - annotations = get_a3mega_pod_template_annotations(args) + annotations = get_a3mega_pod_template_annotations() elif gpu_type == H200_DEVICE_TYPE: - annotations = get_a3ultra_pod_template_annotations(args) + annotations = get_a3ultra_pod_template_annotations() elif gpu_type == B200_DEVICE_TYPE: - annotations = get_a4_pod_template_annotations(args) + annotations = get_a4_pod_template_annotations() else: annotations = tuple() diff --git a/src/xpk/commands/workload.py b/src/xpk/commands/workload.py index 66e061de8..205e4bb4e 100644 --- a/src/xpk/commands/workload.py +++ b/src/xpk/commands/workload.py @@ -334,7 +334,7 @@ def workload_create(args) -> None: xpk_print('Starting workload create', flush=True) metadata_configmap_name = f'{args.cluster}-{CLUSTER_METADATA_CONFIGMAP}' - cluster_config_map = get_cluster_configmap(args, metadata_configmap_name) + cluster_config_map = get_cluster_configmap(metadata_configmap_name) cluster_xpk_version = None if cluster_config_map is None: xpk_print( @@ -507,7 +507,7 @@ def workload_create(args) -> None: annotations=annotations, ) - sub_networks = get_cluster_subnetworks(args) + sub_networks = get_cluster_subnetworks() if args.device_type == a3high_device_type: yml_string = tcpx_decorator.decorate_jobset(yml_string) elif args.device_type == a3mega_device_type: diff --git a/src/xpk/core/cluster.py b/src/xpk/core/cluster.py index f99a72759..29696ef4a 100644 --- a/src/xpk/core/cluster.py +++ b/src/xpk/core/cluster.py @@ -199,12 +199,9 @@ def install_nri_on_cluster(args) -> int: return 0 -def get_cluster_nodes_info(args) -> list[dict]: +def get_cluster_nodes_info() -> list[dict]: """Get list of cluster's nodes descrition in yaml format - Args: - args: user provided arguments for running the command. - Returns: List of nodes info yaml objects. """ @@ -220,9 +217,9 @@ def get_cluster_nodes_info(args) -> list[dict]: return data['items'] -def count_nodes_on_cluster(args, system: SystemCharacteristics) -> int: +def count_nodes_on_cluster(system: SystemCharacteristics) -> int: """Count cluster nodes by accelerator type""" - nodes_info = get_cluster_nodes_info(args) + nodes_info = get_cluster_nodes_info() accelerators = [ node['metadata']['labels']['cloud.google.com/gke-accelerator'] for node in nodes_info diff --git a/src/xpk/core/kjob.py b/src/xpk/core/kjob.py index 4033ca2d6..16cc735e2 100644 --- a/src/xpk/core/kjob.py +++ b/src/xpk/core/kjob.py @@ -167,8 +167,8 @@ class PodTemplateDefaults(Enum): default_interface_annotation = "networking.gke.io/default-interface=eth0" -def get_a4_pod_template_annotations(args) -> tuple[str, str]: - sub_networks = get_cluster_subnetworks(args) +def get_a4_pod_template_annotations() -> tuple[str, str]: + sub_networks = get_cluster_subnetworks() interfaces_key, interfaces_value = rdma_decorator.get_interfaces_entry( sub_networks ) @@ -179,8 +179,8 @@ def get_a4_pod_template_annotations(args) -> tuple[str, str]: ) -def get_a3ultra_pod_template_annotations(args: Namespace) -> tuple[str, str]: - sub_networks = get_cluster_subnetworks(args) +def get_a3ultra_pod_template_annotations() -> tuple[str, str]: + sub_networks = get_cluster_subnetworks() interfaces_key, interfaces_value = rdma_decorator.get_interfaces_entry( sub_networks ) @@ -191,11 +191,9 @@ def get_a3ultra_pod_template_annotations(args: Namespace) -> tuple[str, str]: ) -def get_a3mega_pod_template_annotations( - args: Namespace, -) -> tuple[str, str, str]: +def get_a3mega_pod_template_annotations() -> tuple[str, str, str]: """Adds or updates annotations in the Pod template.""" - sub_networks = get_cluster_subnetworks(args) + sub_networks = get_cluster_subnetworks() tcpxo_deamon_key, tcpxo_deamon_paths = get_tcpxo_deamon_entry() interfaces_key, interfaces_value = tcpxo_decorator.get_interfaces_entry( sub_networks @@ -205,10 +203,8 @@ def get_a3mega_pod_template_annotations( return tcpxo, interfaces, default_interface_annotation -def verify_kjob_installed(args: Namespace) -> int: +def verify_kjob_installed() -> int: """Check if kjob is installed. If not provide user with proper communicate and exit. - Args: - args - user provided arguments. Returns: error code > if kjob not installed, otherwise 0 """ diff --git a/src/xpk/core/kueue.py b/src/xpk/core/kueue.py index 4fdc8f325..c0e40b815 100644 --- a/src/xpk/core/kueue.py +++ b/src/xpk/core/kueue.py @@ -281,10 +281,8 @@ """ -def verify_kueuectl(args: Namespace) -> None: +def verify_kueuectl() -> None: """Verify if kueuectl is installed. - Args: - args: user provided arguments. Returns: None """ @@ -324,7 +322,7 @@ def delete_multikueueclusters_definitions(args) -> int: return return_code -def get_kueue_version(args) -> tuple[int, str]: +def get_kueue_version() -> tuple[int, str]: command = 'kubectl kueue version' task = 'Get kueue version on server' return_code, val = run_command_for_value(command, task) @@ -348,7 +346,7 @@ def install_kueue_on_cluster(args) -> int: 0 if successful and 1 otherwise. """ - err_code, kueue_version_installed = get_kueue_version(args) + err_code, kueue_version_installed = get_kueue_version() if err_code == 0: if Version(kueue_version_installed) < Version('v0.9.0') and Version( KUEUE_VERSION diff --git a/src/xpk/core/nap.py b/src/xpk/core/nap.py index 0b21e8ee8..bccb50c90 100644 --- a/src/xpk/core/nap.py +++ b/src/xpk/core/nap.py @@ -272,7 +272,7 @@ def is_autoprovisioning_enabled( """ resources_configmap_name = f'{args.cluster}-{CLUSTER_RESOURCES_CONFIGMAP}' - cluster_config_map = get_cluster_configmap(args, resources_configmap_name) + cluster_config_map = get_cluster_configmap(resources_configmap_name) if cluster_config_map is None: xpk_print( @@ -325,7 +325,7 @@ def get_autoprovisioning_node_selector_args(args) -> tuple[str, int]: if capacity_type_str == CapacityType.UNKNOWN.name: # Use default settings from cluster creation. metadata_configmap_name = f'{args.cluster}-{CLUSTER_METADATA_CONFIGMAP}' - cluster_config_map = get_cluster_configmap(args, metadata_configmap_name) + cluster_config_map = get_cluster_configmap(metadata_configmap_name) # Error out if the metadata config map doesn't exist, and is attempting to use # autoprovisioning. @@ -369,7 +369,7 @@ def get_autoprovisioning_node_selector_args(args) -> tuple[str, int]: def get_cluster_provisioner(args) -> str: metadata_configmap_name = f'{args.cluster}-{CLUSTER_METADATA_CONFIGMAP}' - cluster_config_map = get_cluster_configmap(args, metadata_configmap_name) + cluster_config_map = get_cluster_configmap(metadata_configmap_name) cluster_provisioner = 'gcloud' if not cluster_config_map is None: provisioner = cluster_config_map.get('provisioner') diff --git a/src/xpk/core/network.py b/src/xpk/core/network.py index 44ec54888..58f2849e4 100644 --- a/src/xpk/core/network.py +++ b/src/xpk/core/network.py @@ -235,12 +235,9 @@ def create_cluster_network_config(args) -> int: return 0 -def get_cluster_subnetworks(args) -> list[str]: +def get_cluster_subnetworks() -> list[str]: """Gets the list of cluster networks. - Args: - args: user provided arguments for running the command. - Returns: list[str]: list of cluster networks """ diff --git a/src/xpk/core/ray.py b/src/xpk/core/ray.py index 3c1759186..4fff877c0 100644 --- a/src/xpk/core/ray.py +++ b/src/xpk/core/ray.py @@ -106,12 +106,12 @@ def install_ray_cluster(args, system) -> int: label = 'cloud.google.com/gke-nodepool=default-pool' available_head_cpu, available_head_mem = generate_available_resources( - label, args, HEAD_CPU + label, HEAD_CPU ) label = f'cloud.google.com/gke-tpu-accelerator={system.gke_accelerator}' available_worker_cpu, available_worker_mem = generate_available_resources( - label, args, WORKER_CPU + label, WORKER_CPU ) yml_string = ray_cluster_crd_yaml.format( @@ -168,12 +168,11 @@ def delete_ray_cluster(args) -> None: return -def generate_available_resources(label, args, percent) -> tuple: +def generate_available_resources(label, percent) -> tuple: """Generate the available resources for the nodes that match the given label Args: label: the label used to match the appropriate nodes - args: user provided arguments for running the command percent: the percent of the available resources to use Returns: diff --git a/src/xpk/core/resources.py b/src/xpk/core/resources.py index 88351cb56..2e034d35e 100644 --- a/src/xpk/core/resources.py +++ b/src/xpk/core/resources.py @@ -50,11 +50,10 @@ class AutoprovisioningConfig: maximum_chips: int -def get_cluster_configmap(args, configmap_name) -> dict[str, str] | None: +def get_cluster_configmap(configmap_name) -> dict[str, str] | None: """Run the Get GKE Cluster ConfigMap request. Args: - args: user provided arguments for running the command. configmap_name: name of the configmap. Returns: @@ -205,7 +204,7 @@ def check_cluster_resources(args, system) -> tuple[bool, bool]: True if device_type/gke_accelerator exists in the cluster, False otherwise. """ resources_configmap_name = f'{args.cluster}-{CLUSTER_RESOURCES_CONFIGMAP}' - resources_config_map = get_cluster_configmap(args, resources_configmap_name) + resources_config_map = get_cluster_configmap(resources_configmap_name) if resources_config_map is None: xpk_print( f'No ConfigMap exist for cluster with the name {resources_config_map}.' @@ -228,7 +227,7 @@ def get_cluster_system_characteristics(args) -> SystemCharacteristics | None: returns system characteristics """ resources_configmap_name = f'{args.cluster}-{CLUSTER_RESOURCES_CONFIGMAP}' - cluster_config_map = get_cluster_configmap(args, resources_configmap_name) + cluster_config_map = get_cluster_configmap(resources_configmap_name) if cluster_config_map is None: return None @@ -250,7 +249,7 @@ def get_cluster_capacity_type(args) -> CapacityType | None: returns system characteristics """ metadata_configmap_name = f'{args.cluster}-{CLUSTER_METADATA_CONFIGMAP}' - cluster_config_map = get_cluster_configmap(args, metadata_configmap_name) + cluster_config_map = get_cluster_configmap(metadata_configmap_name) if cluster_config_map is None: return None diff --git a/src/xpk/core/scheduling.py b/src/xpk/core/scheduling.py index ef6b469ce..3032eb086 100644 --- a/src/xpk/core/scheduling.py +++ b/src/xpk/core/scheduling.py @@ -36,7 +36,7 @@ def check_if_workload_can_schedule(args, system: SystemCharacteristics) -> bool: returns true if workload can schedule, otherwise returns false. """ resources_configmap_name = f'{args.cluster}-{CLUSTER_RESOURCES_CONFIGMAP}' - cluster_config_map = get_cluster_configmap(args, resources_configmap_name) + cluster_config_map = get_cluster_configmap(resources_configmap_name) # Prevents workload creation failure for existing clusters with no ConfigMap if cluster_config_map is None: diff --git a/src/xpk/core/vertex.py b/src/xpk/core/vertex.py index 6507e1856..8fe47257a 100644 --- a/src/xpk/core/vertex.py +++ b/src/xpk/core/vertex.py @@ -66,7 +66,7 @@ def create_vertex_experiment(args) -> dict | None: ) metadata_configmap_name = f'{args.cluster}-{CLUSTER_METADATA_CONFIGMAP}' - cluster_config_map = get_cluster_configmap(args, metadata_configmap_name) + cluster_config_map = get_cluster_configmap(metadata_configmap_name) if cluster_config_map is None or 'tensorboard_name' not in cluster_config_map: xpk_print(