From c0f0a28075d909d89145a73bd64eeaaea6469a25 Mon Sep 17 00:00:00 2001 From: Feidias Ioannidis Date: Mon, 20 Oct 2025 13:01:48 +0000 Subject: [PATCH 1/4] Use kueue_manger for Kind and clean up kueue.py --- src/xpk/commands/kind.py | 34 ++- src/xpk/core/kueue.py | 548 --------------------------------------- 2 files changed, 15 insertions(+), 567 deletions(-) delete mode 100644 src/xpk/core/kueue.py diff --git a/src/xpk/commands/kind.py b/src/xpk/commands/kind.py index 8f7a3e5a2..579058485 100644 --- a/src/xpk/commands/kind.py +++ b/src/xpk/commands/kind.py @@ -14,6 +14,7 @@ limitations under the License. """ +from ..core.kueue_manager import (KueueConfig, KueueManager) from ..core.commands import ( run_command_for_value, run_command_with_updates, @@ -24,11 +25,7 @@ prepare_kjob, apply_kjob_crds, ) -from ..core.kueue import ( - install_kueue_on_cluster, - install_kueue_crs, - wait_for_kueue_available, -) +from ..core.scheduling import get_total_chips_requested_from_args from ..core.storage import install_storage_crd from ..core.system_characteristics import ( SystemCharacteristics, @@ -67,11 +64,6 @@ def cluster_create(args) -> None: if set_jobset_on_cluster_code != 0: xpk_exit(set_jobset_on_cluster_code) - xpk_print('Enabling Kueue on the cluster') - install_kueue_on_cluster_code = install_kueue_on_cluster() - if install_kueue_on_cluster_code != 0: - xpk_exit(install_kueue_on_cluster_code) - xpk_print('Verifying kjob installation') err_code = verify_kjob_installed() if err_code > 0: @@ -90,11 +82,6 @@ def cluster_create(args) -> None: k8s_client = setup_k8s_env(args) install_storage_crd(k8s_client) - xpk_print('Wait for Kueue to be fully available') - wait_for_kueue_available_code = wait_for_kueue_available() - if wait_for_kueue_available_code != 0: - xpk_exit(wait_for_kueue_available_code) - args.num_slices = 1 args.enable_pathways = False system = SystemCharacteristics( @@ -107,10 +94,19 @@ def cluster_create(args) -> None: 'kind', ) - xpk_print('Install Kueue Custom Resources') - enable_kueue_credentials_code = install_kueue_crs(args, system, None) - if enable_kueue_credentials_code != 0: - xpk_exit(enable_kueue_credentials_code) + kueue_manager = KueueManager() + kueue_manager.install_or_upgrade( + KueueConfig( + system, + total_chips=get_total_chips_requested_from_args(args, system), + autoprovisioning_enabled=False, + num_slices=args.num_slices, + memory_limit=args.memory_limit, + cpu_limit=args.cpu_limit, + is_pathways_cluster=False, + flex=args.flex, + ), + ) xpk_print('Kind commands done! Resources are created.') xpk_exit(0) diff --git a/src/xpk/core/kueue.py b/src/xpk/core/kueue.py deleted file mode 100644 index b47dfb888..000000000 --- a/src/xpk/core/kueue.py +++ /dev/null @@ -1,548 +0,0 @@ -""" -Copyright 2024 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" - -import math -import packaging -from packaging.version import Version - -from ..utils.console import xpk_exit, xpk_print -from ..utils.file import write_tmp_file -from .capacity import B200_DEVICE_TYPE, H100_MEGA_DEVICE_TYPE, H200_DEVICE_TYPE -from .commands import ( - run_command_for_value, - run_command_with_updates, - run_command_with_updates_retry, -) -from .pathways import add_pw_resource_flavors, add_pw_resources_to_kueue -from .resources import AutoprovisioningConfig -from .scheduling import ( - create_accelerator_label, - create_machine_label, - get_total_chips_requested_from_args, -) -from .system_characteristics import ( - AcceleratorTypeToAcceleratorCharacteristics, - SystemCharacteristics, -) - -KUEUE_VERSION = 'v0.12.2' -CLUSTER_QUEUE_NAME = 'cluster-queue' -LOCAL_QUEUE_NAME = 'multislice-queue' -WAIT_FOR_KUEUE_TIMEOUT = '10m' -MEMORY_SIZE_PER_VM = 1.2 -MIN_MEMORY_LIMIT_SIZE = 4096 - -packaging.version.VERSION_PATTERN = r'^v\d+\.\d+\.\d+$' - -topology_yaml = """apiVersion: kueue.x-k8s.io/v1alpha1 -kind: Topology -metadata: - name: "gke-default" -spec: - levels: - - nodeLabel: "cloud.google.com/gce-topology-block" - - nodeLabel: "cloud.google.com/gce-topology-subblock" - - nodeLabel: "cloud.google.com/gce-topology-host" - - nodeLabel: "kubernetes.io/hostname" ---- -""" - -cluster_set_crd_yaml = """apiVersion: kueue.x-k8s.io/v1beta1 -kind: ResourceFlavor -metadata: - name: {cluster_hardware_name} -spec: - nodeLabels: - {accelerator_label} - {machine_label} - {topology_label} ---- -apiVersion: kueue.x-k8s.io/v1beta1 -kind: AdmissionCheck -metadata: - name: dws-prov -spec: - controllerName: kueue.x-k8s.io/provisioning-request - parameters: - apiGroup: kueue.x-k8s.io - kind: ProvisioningRequestConfig - name: dws-config ---- -apiVersion: kueue.x-k8s.io/v1beta1 -kind: ProvisioningRequestConfig -metadata: - name: dws-config -spec: - provisioningClassName: queued-provisioning.gke.io - podSetUpdates: - nodeSelector: - - key: autoscaling.gke.io/provisioning-request - valueFromProvisioningClassDetail: ResizeRequestName - managedResources: - - {managed_resource} ---- -{pw_resource_flavors} -apiVersion: kueue.x-k8s.io/v1beta1 -kind: ClusterQueue -metadata: - name: {cluster_queue_name} -spec: - preemption: - reclaimWithinCohort: Never # Don't preempt other queues in the cohort. - withinClusterQueue: LowerPriority - namespaceSelector: {{}} # match all. - resourceGroups: - {covered_resources_config} - {pw_resources_kueue} - {admission_checks} ---- -apiVersion: kueue.x-k8s.io/v1beta1 -kind: LocalQueue -metadata: - namespace: default - name: {local_queue_name} -spec: - clusterQueue: {cluster_queue_name} ---- -apiVersion: scheduling.k8s.io/v1 -kind: PriorityClass -metadata: - name: very-low -value: 100 -globalDefault: false -description: "Very Low" ---- -apiVersion: scheduling.k8s.io/v1 -kind: PriorityClass -metadata: - name: low -value: 250 -globalDefault: false -description: "Low" ---- -apiVersion: scheduling.k8s.io/v1 -kind: PriorityClass -metadata: - name: medium -value: 500 -globalDefault: false -description: "Medium" ---- -apiVersion: scheduling.k8s.io/v1 -kind: PriorityClass -metadata: - name: high -value: 750 -globalDefault: false -description: "High" ---- -apiVersion: scheduling.k8s.io/v1 -kind: PriorityClass -metadata: - name: very-high -value: 1000 -globalDefault: false -description: "Very High" -""" - -cluster_preheat_yml = """ -apiVersion: apps/v1 -kind: DaemonSet -metadata: - name: {cachekey} - labels: - k8s-app: {cachekey} -spec: - selector: - matchLabels: - k8s-app: {cachekey} - updateStrategy: - type: RollingUpdate - template: - metadata: - labels: - name: {cachekey} - k8s-app: {cachekey} - spec: - affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: {nodeSelectorKey} - operator: Exists - tolerations: - - operator: "Exists" - containers: - - image: {image_name} - name: {cachekey} - command: [ "sleep", "inf" ] -""" - -kueue_controller_manager_yml = """ -apiVersion: apps/v1 -kind: Deployment -metadata: - labels: - app.kubernetes.io/component: controller - app.kubernetes.io/name: kueue - control-plane: controller-manager - name: kueue-controller-manager - namespace: kueue-system -spec: - replicas: 1 - selector: - matchLabels: - control-plane: controller-manager - template: - metadata: - annotations: - kubectl.kubernetes.io/default-container: manager - labels: - app.kubernetes.io/component: controller - app.kubernetes.io/name: kueue - control-plane: controller-manager - spec: - containers: - - args: - - --config=/controller_manager_config.yaml - - --zap-log-level=2 - command: - - /manager - image: registry.k8s.io/kueue/kueue:{KUEUE_VERSION} - imagePullPolicy: Always - livenessProbe: - httpGet: - path: /healthz - port: 8081 - initialDelaySeconds: 15 - periodSeconds: 20 - name: manager - ports: - - containerPort: 8082 - name: visibility - protocol: TCP - - containerPort: 9443 - name: webhook-server - protocol: TCP - readinessProbe: - httpGet: - path: /readyz - port: 8081 - initialDelaySeconds: 5 - periodSeconds: 10 - resources: - limits: - cpu: 1000m - memory: {memory_limit_size} - requests: - cpu: 1000m - memory: 512Mi - securityContext: - allowPrivilegeEscalation: false - volumeMounts: - - mountPath: /visibility - name: visibility - - mountPath: /tmp/k8s-webhook-server/serving-certs - name: cert - readOnly: true - - mountPath: /controller_manager_config.yaml - name: manager-config - subPath: controller_manager_config.yaml - securityContext: - runAsNonRoot: true - serviceAccountName: kueue-controller-manager - terminationGracePeriodSeconds: 10 - volumes: - - name: visibility - emptyDir: {{}} - - name: cert - secret: - defaultMode: 420 - secretName: kueue-webhook-server-cert - - configMap: - name: kueue-manager-config - name: manager-config -""" - - -def verify_kueuectl() -> None: - """Verify if kueuectl is installed. - Returns: - None - """ - xpk_print('Veryfing kueuectl installation') - - command = 'kubectl kueue version' - task = 'Verify kueuectl installation on cluster' - verify_kueuectl_installed_code, _ = run_command_for_value(command, task) - - if verify_kueuectl_installed_code == 0: - xpk_print('kueuectl found') - - if verify_kueuectl_installed_code != 0: - xpk_print( - 'kueuectl not found. Please follow' - ' https://kueue.sigs.k8s.io/docs/reference/kubectl-kueue/installation/' - ' to install kueuectl.' - ) - xpk_exit(verify_kueuectl_installed_code) - - -def delete_multikueueconfigs_definitions() -> int: - command = 'kubectl delete crd multikueueconfigs.kueue.x-k8s.io' - task = 'Delete multikueueconfigs crds' - return_code = run_command_with_updates_retry(command, task) - if return_code != 0: - xpk_print(f'{task} returned ERROR {return_code}') - return return_code - - -def delete_multikueueclusters_definitions() -> int: - command = 'kubectl delete crd multikueueclusters.kueue.x-k8s.io' - task = 'Delete multikueueclusters crds' - return_code = run_command_with_updates_retry(command, task) - if return_code != 0: - xpk_print(f'{task} returned ERROR {return_code}') - return return_code - - -def get_kueue_version() -> tuple[int, str]: - command = 'kubectl kueue version' - task = 'Get kueue version on server' - return_code, val = run_command_for_value(command, task) - if return_code != 0: - return return_code, '' - lines = val.splitlines() - if len(lines) == 1: - return 1, '' - server_version_line = lines[1] - manager_image_version = server_version_line.split(':')[-1] - return return_code, manager_image_version - - -def install_kueue_on_cluster() -> int: - """Install Kueue on the cluster. - - Returns: - 0 if successful and 1 otherwise. - """ - - err_code, kueue_version_installed = get_kueue_version() - if err_code == 0: - if Version(kueue_version_installed) < Version('v0.9.0') and Version( - KUEUE_VERSION - ) >= Version('v0.9.0'): - xpk_print('Upgrading kueue on cluster from version < 0.9.0.') - upgrade_code = delete_multikueueclusters_definitions() - if upgrade_code != 0: - return upgrade_code - upgrade_code = delete_multikueueconfigs_definitions() - if upgrade_code != 0: - return upgrade_code - - command = ( - 'kubectl apply --server-side --force-conflicts -f' - f' https://github.com/kubernetes-sigs/kueue/releases/download/{KUEUE_VERSION}/manifests.yaml' - ) - task = 'Set Kueue On Cluster' - return_code = run_command_with_updates_retry(command, task) - if return_code != 0: - xpk_print(f'{task} returned ERROR {return_code}') - return return_code - - -def wait_for_kueue_available() -> int: - """Wait for Kueue to be fully available. - - Returns: - 0 if successful and 1 otherwise. - """ - command = ( - 'kubectl wait deploy/kueue-controller-manager -n kueue-system' - f' --for=condition=available --timeout={WAIT_FOR_KUEUE_TIMEOUT}' - ) - task = 'Wait for Kueue to be available' - return_code = run_command_with_updates(command, task) - if return_code != 0: - xpk_print(f'{task} returned ERROR {return_code}') - return return_code - - -def install_kueue_crs( - args, - system: SystemCharacteristics, - autoprovisioning_config: AutoprovisioningConfig | None, - flex_with_tpu=False, -) -> int: - """Install Kueue Custom Resources. - - Args: - args: user provided arguments for running the command. - system: system level arguments. - autoprovisioning_config: Autoprovisioning config to configure kueue with if - autoprovisioning is enabled. - - Returns: - 0 if successful and 1 otherwise. - """ - device_type = system.device_type - cluster_hardware_name = f'{args.num_slices}x{device_type}' - resource_type = AcceleratorTypeToAcceleratorCharacteristics[ - system.accelerator_type - ].resource_type - - autoprovisioning_enabled = False - if autoprovisioning_config: - # Determine total resources available based on autoprovisioning max chips. - autoprovisioning_enabled = True - total_chips = autoprovisioning_config.maximum_chips - cluster_hardware_name = f'{system.gke_accelerator}' - else: - # Determine total chips based on user specified topology. - total_chips = get_total_chips_requested_from_args(args, system) - if args.flex and flex_with_tpu is False: - admission_checks = """ - admissionChecks: - - dws-prov -""" - else: - admission_checks = '' - - covered_resources_config = get_kueue_covered_resources_config( - cluster_hardware_name=cluster_hardware_name, - resource_type=resource_type, - total_chips=total_chips, - cpu_limit=args.cpu_limit, - memory_limit=args.memory_limit, - ) - topology_label = '' - if system.device_type in [ - H100_MEGA_DEVICE_TYPE, - H200_DEVICE_TYPE, - B200_DEVICE_TYPE, - ]: - topology_label = 'topologyName: "gke-default"' - res_type = AcceleratorTypeToAcceleratorCharacteristics[ - system.accelerator_type - ].resource_type - yml_string = cluster_set_crd_yaml.format( - system=system, - cluster_hardware_name=cluster_hardware_name, - accelerator_label=create_accelerator_label( - system.accelerator_type, system - ), - machine_label=create_machine_label( - system.accelerator_type, system, autoprovisioning_enabled - ), - topology_label=topology_label, - covered_resources_config=covered_resources_config, - resource_type=res_type, - pw_resource_flavors=add_pw_resource_flavors(args), - pw_resources_kueue=add_pw_resources_to_kueue(args), - admission_checks=admission_checks, - managed_resource=res_type, - cluster_queue_name=CLUSTER_QUEUE_NAME, - local_queue_name=LOCAL_QUEUE_NAME, - ) - if system.device_type in [ - H100_MEGA_DEVICE_TYPE, - H200_DEVICE_TYPE, - B200_DEVICE_TYPE, - ]: - yml_string = topology_yaml + yml_string - - tmp = write_tmp_file(yml_string) - command = f'kubectl apply -f {str(tmp)}' - - task = 'Applying Kueue Custom Resources' - return_code = run_command_with_updates_retry(command, task) - if return_code != 0: - xpk_print(f'{task} returned ERROR {return_code}') - return return_code - - -def get_kueue_covered_resources_config( - cluster_hardware_name, resource_type, total_chips, cpu_limit, memory_limit -) -> str: - """Gets Kueue covered resources configuration. - - Args: - cluster_hardware_name: cluster hardware name. - resource_type: resource type of tpu or gpu. - total_chips: total number of chips for the specific resource type. - - Returns: - A string of Kueue covered resources configuration. - """ - config_format = """ - - coveredResources: {resource_types} - flavors: - - name: {cluster_hardware_name} - resources: - - name: "{resource_type}" - nominalQuota: {total_chips}""" - resource_types = [resource_type] - if cpu_limit: - config_format = config_format + """ - - name: "cpu" - nominalQuota: {cpu_limit}""" - resource_types.append('cpu') - if memory_limit: - config_format = config_format + """ - - name: "memory" - nominalQuota: {memory_limit}""" - resource_types.append('memory') - - config_string = config_format.format( - cluster_hardware_name=cluster_hardware_name, - resource_types=resource_types, - resource_type=resource_type, - total_chips=total_chips, - cpu_limit=cpu_limit, - memory_limit=memory_limit, - ) - return config_string - - -def update_kueue_resources_if_necessary(): - """Update the kueue manifest to increase the resources for the kueue controller manager. - - Returns: - 0 if successful and 1 otherwise. - """ - # Get total number of nodes - cmd_total_node_num = 'kubectl get node --no-headers | wc -l' - return_code, out = run_command_for_value( - cmd_total_node_num, 'Count total nodes' - ) - if return_code != 0: - xpk_exit(1) - # 1.2MiB per VM or 4GiB (whichever is greater). - new_memory_limit = ( - f'{max(math.ceil(int(out) * MEMORY_SIZE_PER_VM), MIN_MEMORY_LIMIT_SIZE)}Mi' - ) - yml_string = kueue_controller_manager_yml.format( - memory_limit_size=new_memory_limit, KUEUE_VERSION=KUEUE_VERSION - ) - tmp = write_tmp_file(yml_string) - command = f'kubectl apply -f {str(tmp)}' - - task = 'Updating Kueue Controller Manager resources' - return_code = run_command_with_updates_retry(command, task) - if return_code != 0: - xpk_print(f'{task} returned ERROR {return_code}') - return return_code From 6b9cab725640c35a0ce1c93bb6e0d6974b540802 Mon Sep 17 00:00:00 2001 From: Feidias Ioannidis Date: Mon, 20 Oct 2025 14:30:43 +0000 Subject: [PATCH 2/4] clean ups --- src/xpk/commands/batch.py | 2 +- src/xpk/commands/cluster.py | 37 +++++++++++++++++-- src/xpk/commands/info.py | 5 ++- src/xpk/commands/inspector.py | 2 +- src/xpk/commands/kind.py | 6 +-- src/xpk/commands/run.py | 2 +- src/xpk/commands/workload.py | 2 +- src/xpk/core/blueprint/blueprint_generator.py | 2 +- src/xpk/core/jobset.py | 2 +- src/xpk/core/kueue_manager.py | 27 +++++++++++++- 10 files changed, 72 insertions(+), 15 deletions(-) diff --git a/src/xpk/commands/batch.py b/src/xpk/commands/batch.py index 83b10f2ba..96e66e74a 100644 --- a/src/xpk/commands/batch.py +++ b/src/xpk/commands/batch.py @@ -29,7 +29,7 @@ get_storage_annotations, prepare_kjob, ) -from ..core.kueue import LOCAL_QUEUE_NAME +from ..core.kueue_manager import LOCAL_QUEUE_NAME from ..utils.console import xpk_exit, xpk_print from ..utils.execution_context import is_dry_run from ..utils.validation import validate_dependencies, should_validate_dependencies diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index 8b91bd4af..cb5db7f83 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -46,9 +46,6 @@ ) from ..core.jobset import update_jobset_resources_if_necessary from ..core.kjob import apply_kjob_crds, prepare_kjob, verify_kjob_installed -from ..core.kueue import ( - cluster_preheat_yml, -) from ..core.kueue_manager import (KueueConfig, KueueManager) from ..core.nap import enable_autoprovisioning_on_cluster from ..core.network import ( @@ -82,6 +79,40 @@ import shutil import os +cluster_preheat_yml = """ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: {cachekey} + labels: + k8s-app: {cachekey} +spec: + selector: + matchLabels: + k8s-app: {cachekey} + updateStrategy: + type: RollingUpdate + template: + metadata: + labels: + name: {cachekey} + k8s-app: {cachekey} + spec: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: {nodeSelectorKey} + operator: Exists + tolerations: + - operator: "Exists" + containers: + - image: {image_name} + name: {cachekey} + command: [ "sleep", "inf" ] +""" + def cluster_adapt(args) -> None: """Function that performs cluster adaptation. diff --git a/src/xpk/commands/info.py b/src/xpk/commands/info.py index 691efb655..927e04a37 100644 --- a/src/xpk/commands/info.py +++ b/src/xpk/commands/info.py @@ -22,7 +22,7 @@ from ..core.commands import run_command_for_value from ..core.cluster import get_cluster_credentials from ..core.gcloud_context import add_zone_and_project -from ..core.kueue import verify_kueuectl +from ..core.kueue_manager import KueueManager from ..utils.console import xpk_exit, xpk_print from ..utils.validation import validate_dependencies, should_validate_dependencies @@ -42,7 +42,8 @@ def info(args: Namespace) -> None: add_zone_and_project(args) get_cluster_credentials(args) - verify_kueuectl() + kueue_manager = KueueManager() + kueue_manager.verify_kueuectl() lq, cq = bool(args.localqueue), bool(args.clusterqueue) if not lq and not cq: lq, cq = True, True diff --git a/src/xpk/commands/inspector.py b/src/xpk/commands/inspector.py index bb819d646..49fa7ee87 100644 --- a/src/xpk/commands/inspector.py +++ b/src/xpk/commands/inspector.py @@ -17,7 +17,7 @@ from ..core.cluster import get_cluster_credentials from ..core.commands import run_command_for_value from ..core.gcloud_context import add_zone_and_project, get_cluster_location -from ..core.kueue import CLUSTER_QUEUE_NAME, LOCAL_QUEUE_NAME +from ..core.kueue_manager import CLUSTER_QUEUE_NAME, LOCAL_QUEUE_NAME from ..core.resources import CLUSTER_METADATA_CONFIGMAP, CLUSTER_RESOURCES_CONFIGMAP from ..utils.console import xpk_exit, xpk_print from ..utils.file import append_tmp_file, write_tmp_file diff --git a/src/xpk/commands/kind.py b/src/xpk/commands/kind.py index 579058485..b9e3bc6b7 100644 --- a/src/xpk/commands/kind.py +++ b/src/xpk/commands/kind.py @@ -101,10 +101,10 @@ def cluster_create(args) -> None: total_chips=get_total_chips_requested_from_args(args, system), autoprovisioning_enabled=False, num_slices=args.num_slices, - memory_limit=args.memory_limit, - cpu_limit=args.cpu_limit, + memory_limit='', + cpu_limit=0, is_pathways_cluster=False, - flex=args.flex, + flex=False, ), ) diff --git a/src/xpk/commands/run.py b/src/xpk/commands/run.py index 74a3f0bbe..fec97e7ad 100644 --- a/src/xpk/commands/run.py +++ b/src/xpk/commands/run.py @@ -28,7 +28,7 @@ get_storage_annotations, prepare_kjob, ) -from ..core.kueue import LOCAL_QUEUE_NAME +from ..core.kueue_manager import LOCAL_QUEUE_NAME from ..utils.console import xpk_exit, xpk_print from ..utils.validation import validate_dependencies, should_validate_dependencies from .kind import set_local_cluster_command diff --git a/src/xpk/commands/workload.py b/src/xpk/commands/workload.py index 76ee8cc82..4b2e89f45 100644 --- a/src/xpk/commands/workload.py +++ b/src/xpk/commands/workload.py @@ -34,7 +34,7 @@ ) from ..core.docker_resources import get_volumes, parse_env_config from ..core.gcloud_context import add_zone_and_project -from ..core.kueue import LOCAL_QUEUE_NAME +from ..core.kueue_manager import LOCAL_QUEUE_NAME from ..core.monitoring import get_gke_outlier_dashboard from ..core.nap import ( get_autoprovisioning_node_selector_args, diff --git a/src/xpk/core/blueprint/blueprint_generator.py b/src/xpk/core/blueprint/blueprint_generator.py index 1692e72f8..2d02adb66 100644 --- a/src/xpk/core/blueprint/blueprint_generator.py +++ b/src/xpk/core/blueprint/blueprint_generator.py @@ -32,7 +32,7 @@ ) from ..system_characteristics import get_system_characteristics_by_device_type from .blueprint_definitions import Blueprint, DeploymentGroup, DeploymentModule -from ..kueue import KUEUE_VERSION +from ..kueue_manager import KUEUE_VERSION yaml_parser = yaml.YAML() diff --git a/src/xpk/core/jobset.py b/src/xpk/core/jobset.py index b22e09c32..b65c6503f 100644 --- a/src/xpk/core/jobset.py +++ b/src/xpk/core/jobset.py @@ -18,7 +18,7 @@ from ..utils.console import xpk_exit, xpk_print from ..utils.file import write_tmp_file -from ..core.kueue import ( +from ..core.kueue_manager import ( MEMORY_SIZE_PER_VM, MIN_MEMORY_LIMIT_SIZE, ) diff --git a/src/xpk/core/kueue_manager.py b/src/xpk/core/kueue_manager.py index 31810a3a8..6c8319615 100644 --- a/src/xpk/core/kueue_manager.py +++ b/src/xpk/core/kueue_manager.py @@ -48,6 +48,7 @@ KUEUE_CONTROLLER_MANAGER_JINJA_FILE = "kueue_controller_manager.yaml.j2" MEMORY_SIZE_PER_VM = 1.2 MIN_MEMORY_LIMIT_SIZE = 4096 +KUEUE_VERSION = "v0.12.2" @dataclass @@ -66,7 +67,9 @@ class KueueManager: """Manages the installation and configuration of Kueue on an XPK cluster.""" def __init__( - self, kueue_version: str = "v0.12.2", template_path="src/xpk/templates/" + self, + kueue_version: str = KUEUE_VERSION, + template_path="src/xpk/templates/", ): self.kueue_version = kueue_version self.template_env = Environment(loader=FileSystemLoader(template_path)) @@ -377,3 +380,25 @@ def __update_kueue_resources_if_necessary(self) -> int: if return_code != 0: xpk_print(f"{task} returned ERROR {return_code}") return return_code + + def verify_kueuectl(self) -> None: + """Verify if kueuectl is installed. + Returns: + None + """ + xpk_print("Veryfing kueuectl installation") + + command = "kubectl kueue version" + task = "Verify kueuectl installation on cluster" + verify_kueuectl_installed_code, _ = run_command_for_value(command, task) + + if verify_kueuectl_installed_code == 0: + xpk_print("kueuectl found") + + if verify_kueuectl_installed_code != 0: + xpk_print( + "kueuectl not found. Please follow" + " https://kueue.sigs.k8s.io/docs/reference/kubectl-kueue/installation/" + " to install kueuectl." + ) + xpk_exit(verify_kueuectl_installed_code) From 8c907f11475ff39b20dbc5d2b03727a7c2a3aa21 Mon Sep 17 00:00:00 2001 From: Feidias Ioannidis Date: Mon, 20 Oct 2025 15:23:47 +0000 Subject: [PATCH 3/4] remove kueuectl verification --- src/xpk/commands/info.py | 3 --- src/xpk/core/kueue_manager.py | 22 ---------------------- 2 files changed, 25 deletions(-) diff --git a/src/xpk/commands/info.py b/src/xpk/commands/info.py index 9249d574f..84b6d01ec 100644 --- a/src/xpk/commands/info.py +++ b/src/xpk/commands/info.py @@ -22,7 +22,6 @@ from ..core.commands import run_command_for_value from ..core.cluster import get_cluster_credentials from ..core.gcloud_context import add_zone_and_project -from ..core.kueue_manager import KueueManager from ..utils.console import xpk_exit, xpk_print from ..utils.validation import validate_dependencies_list, SystemDependency, should_validate_dependencies @@ -46,8 +45,6 @@ def info(args: Namespace) -> None: add_zone_and_project(args) get_cluster_credentials(args) - kueue_manager = KueueManager() - kueue_manager.verify_kueuectl() lq, cq = bool(args.localqueue), bool(args.clusterqueue) if not lq and not cq: lq, cq = True, True diff --git a/src/xpk/core/kueue_manager.py b/src/xpk/core/kueue_manager.py index 6c8319615..d0ff0ea5f 100644 --- a/src/xpk/core/kueue_manager.py +++ b/src/xpk/core/kueue_manager.py @@ -380,25 +380,3 @@ def __update_kueue_resources_if_necessary(self) -> int: if return_code != 0: xpk_print(f"{task} returned ERROR {return_code}") return return_code - - def verify_kueuectl(self) -> None: - """Verify if kueuectl is installed. - Returns: - None - """ - xpk_print("Veryfing kueuectl installation") - - command = "kubectl kueue version" - task = "Verify kueuectl installation on cluster" - verify_kueuectl_installed_code, _ = run_command_for_value(command, task) - - if verify_kueuectl_installed_code == 0: - xpk_print("kueuectl found") - - if verify_kueuectl_installed_code != 0: - xpk_print( - "kueuectl not found. Please follow" - " https://kueue.sigs.k8s.io/docs/reference/kubectl-kueue/installation/" - " to install kueuectl." - ) - xpk_exit(verify_kueuectl_installed_code) From b19a7189e4f5683fb5bc047fcb711f6d4c371d8f Mon Sep 17 00:00:00 2001 From: Feidias Ioannidis Date: Tue, 21 Oct 2025 13:19:47 +0000 Subject: [PATCH 4/4] remove string yaml --- src/xpk/commands/cluster.py | 43 +++++------------------ src/xpk/core/kueue_manager.py | 3 +- src/xpk/templates/cluster_preheat.yaml.j2 | 31 ++++++++++++++++ src/xpk/utils/templates.py | 2 ++ 4 files changed, 43 insertions(+), 36 deletions(-) create mode 100644 src/xpk/templates/cluster_preheat.yaml.j2 diff --git a/src/xpk/commands/cluster.py b/src/xpk/commands/cluster.py index cb5db7f83..60a9b0b83 100644 --- a/src/xpk/commands/cluster.py +++ b/src/xpk/commands/cluster.py @@ -76,42 +76,12 @@ from ..utils.validation import validate_dependencies_list, SystemDependency, should_validate_dependencies from . import cluster_gcluster from .common import set_cluster_command +from jinja2 import Environment, FileSystemLoader +from ..utils.templates import TEMPLATE_PATH import shutil import os -cluster_preheat_yml = """ -apiVersion: apps/v1 -kind: DaemonSet -metadata: - name: {cachekey} - labels: - k8s-app: {cachekey} -spec: - selector: - matchLabels: - k8s-app: {cachekey} - updateStrategy: - type: RollingUpdate - template: - metadata: - labels: - name: {cachekey} - k8s-app: {cachekey} - spec: - affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: {nodeSelectorKey} - operator: Exists - tolerations: - - operator: "Exists" - containers: - - image: {image_name} - name: {cachekey} - command: [ "sleep", "inf" ] -""" +CLUSTER_PREHEAT_JINJA_FILE = 'cluster_preheat.yaml.j2' def cluster_adapt(args) -> None: @@ -455,12 +425,15 @@ def cluster_cacheimage(args) -> None: node_selector_key = AcceleratorTypeToAcceleratorCharacteristics[ system.accelerator_type ].accelerator_label - yml_string = cluster_preheat_yml.format( + + template_env = Environment(loader=FileSystemLoader(TEMPLATE_PATH)) + cluster_preheat_yaml = template_env.get_template(CLUSTER_PREHEAT_JINJA_FILE) + rendered_yaml = cluster_preheat_yaml.render( cachekey=args.cache_key, image_name=args.docker_image, nodeSelectorKey=node_selector_key, ) - tmp = write_tmp_file(yml_string) + tmp = write_tmp_file(rendered_yaml) command_apply = f'kubectl apply -f {str(tmp)}' command_delete = f'kubectl delete -f {str(tmp)} --ignore-not-found=true' diff --git a/src/xpk/core/kueue_manager.py b/src/xpk/core/kueue_manager.py index 55e70a597..65a337b5b 100644 --- a/src/xpk/core/kueue_manager.py +++ b/src/xpk/core/kueue_manager.py @@ -39,6 +39,7 @@ ) from ..utils.file import write_tmp_file from ..utils.console import xpk_print, xpk_exit +from ..utils.templates import TEMPLATE_PATH WAIT_FOR_KUEUE_TIMEOUT = "10m" CLUSTER_QUEUE_NAME = "cluster-queue" @@ -69,7 +70,7 @@ class KueueManager: def __init__( self, kueue_version: str = KUEUE_VERSION, - template_path="src/xpk/templates/", + template_path=TEMPLATE_PATH, ): self.kueue_version = kueue_version self.template_env = Environment(loader=FileSystemLoader(template_path)) diff --git a/src/xpk/templates/cluster_preheat.yaml.j2 b/src/xpk/templates/cluster_preheat.yaml.j2 new file mode 100644 index 000000000..12a5baf8f --- /dev/null +++ b/src/xpk/templates/cluster_preheat.yaml.j2 @@ -0,0 +1,31 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: {{ cachekey }} + labels: + k8s-app: {{ cachekey }} +spec: + selector: + matchLabels: + k8s-app: {{ cachekey }} + updateStrategy: + type: RollingUpdate + template: + metadata: + labels: + name: {{ cachekey }} + k8s-app: {{ cachekey }} + spec: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: {{ nodeSelectorKey }} + operator: Exists + tolerations: + - operator: "Exists" + containers: + - image: {{ image_name }} + name: {{ cachekey }} + command: [ "sleep", "inf" ] diff --git a/src/xpk/utils/templates.py b/src/xpk/utils/templates.py index f631b069c..11e9cba24 100644 --- a/src/xpk/utils/templates.py +++ b/src/xpk/utils/templates.py @@ -18,6 +18,8 @@ import ruamel.yaml +TEMPLATE_PATH = "src/xpk/templates/" + yaml = ruamel.yaml.YAML()