Skip to content

Commit

Permalink
Enable native HPA for Airflow Workers (#36174)
Browse files Browse the repository at this point in the history
  • Loading branch information
pemiranda-clgx committed Jan 14, 2024
1 parent 8767357 commit 03cf692
Show file tree
Hide file tree
Showing 6 changed files with 543 additions and 1 deletion.
3 changes: 2 additions & 1 deletion chart/templates/workers/worker-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#################################
{{- $persistence := .Values.workers.persistence.enabled }}
{{- $keda := .Values.workers.keda.enabled }}
{{- $hpa := and .Values.workers.hpa.enabled (not .Values.workers.keda.enabled) }}
{{- if or (eq .Values.executor "CeleryExecutor") (eq .Values.executor "CeleryKubernetesExecutor") }}
{{- $nodeSelector := or .Values.workers.nodeSelector .Values.nodeSelector }}
{{- $affinity := or .Values.workers.affinity .Values.affinity }}
Expand Down Expand Up @@ -59,7 +60,7 @@ spec:
{{- if $persistence }}
serviceName: {{ include "airflow.fullname" . }}-worker
{{- end }}
{{- if not $keda }}
{{- if and (not $keda) (not $hpa) }}
replicas: {{ .Values.workers.replicas }}
{{- end }}
{{- if $revisionHistoryLimit }}
Expand Down
49 changes: 49 additions & 0 deletions chart/templates/workers/worker-hpa.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
{{/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/}}

################################
## Airflow Worker HPA
#################################
{{- if and (and (not .Values.workers.keda.enabled) .Values.workers.hpa.enabled) (has .Values.executor (list "CeleryExecutor" "CeleryKubernetesExecutor")) }}
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: {{ include "airflow.fullname" . }}-worker
labels:
tier: airflow
component: worker-horizontalpodautoscaler
release: {{ .Release.Name }}
chart: "{{ .Chart.Name }}-{{ .Chart.Version }}"
heritage: {{ .Release.Service }}
deploymentName: {{ .Release.Name }}-worker
{{- if or (.Values.labels) (.Values.workers.labels) }}
{{- mustMerge .Values.workers.labels .Values.labels | toYaml | nindent 4 }}
{{- end }}
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: {{ ternary "StatefulSet" "Deployment" .Values.workers.persistence.enabled }}
name: {{ include "airflow.fullname" . }}-worker
minReplicas: {{ .Values.workers.hpa.minReplicaCount }}
maxReplicas: {{ .Values.workers.hpa.maxReplicaCount }}
metrics: {{- toYaml .Values.workers.hpa.metrics | nindent 4 }}
{{- with .Values.workers.hpa.behavior }}
behavior: {{- toYaml . | nindent 4 }}
{{- end }}
{{- end }}
253 changes: 253 additions & 0 deletions chart/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1531,6 +1531,53 @@
}
}
},
"hpa": {
"description": "HPA configuration.",
"type": "object",
"additionalProperties": false,
"properties": {
"enabled": {
"description": "Allow HPA autoscaling (KEDA must be disabled).",
"type": "boolean",
"default": false
},
"minReplicaCount": {
"description": "Minimum number of workers created by KEDA.",
"type": "integer",
"default": 0
},
"maxReplicaCount": {
"description": "Maximum number of workers created by KEDA.",
"type": "integer",
"default": 5
},
"metrics": {
"description": "Specifications for which to use to calculate the desired replica count.",
"type": "array",
"default": [
{
"type": "Resource",
"resource": {
"name": "cpu",
"target": {
"type": "Utilization",
"averageUtilization": 80
}
}
}
],
"items": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricSpec"
}
},
"behavior": {
"description": "HorizontalPodAutoscalerBehavior configures the scaling behavior of the target.",
"type": "object",
"default": {},
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.HorizontalPodAutoscalerBehavior"
}
}
},
"persistence": {
"description": "Persistence configuration.",
"type": "object",
Expand Down Expand Up @@ -7422,6 +7469,72 @@
"type": "object",
"additionalProperties": false
},
"io.k8s.api.autoscaling.v2beta2.ContainerResourceMetricSource": {
"description": "ContainerResourceMetricSource indicates how to scale on a resource metric known to Kubernetes, as specified in requests and limits, describing each pod in the current scale target (e.g. CPU or memory). The values will be averaged together before being compared to the target. Such metrics are built in to Kubernetes, and have special scaling options on top of those available to normal per-pod metrics using the \"pods\" source. Only one \"target\" type should be set.",
"properties": {
"container": {
"description": "container is the name of the container in the pods of the scaling target",
"type": "string"
},
"name": {
"description": "name is the name of the resource in question.",
"type": "string"
},
"target": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricTarget",
"description": "target specifies the target value for the given metric"
}
},
"required": [
"name",
"target",
"container"
],
"type": "object",
"additionalProperties": false
},
"io.k8s.api.autoscaling.v2beta2.CrossVersionObjectReference": {
"description": "CrossVersionObjectReference contains enough information to let you identify the referred resource.",
"properties": {
"apiVersion": {
"description": "API version of the referent",
"type": "string"
},
"kind": {
"description": "Kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds\"",
"type": "string"
},
"name": {
"description": "Name of the referent; More info: http://kubernetes.io/docs/user-guide/identifiers#names",
"type": "string"
}
},
"required": [
"kind",
"name"
],
"type": "object",
"additionalProperties": false
},
"io.k8s.api.autoscaling.v2beta2.ExternalMetricSource": {
"description": "ExternalMetricSource indicates how to scale on a metric not associated with any Kubernetes object (for example length of queue in cloud messaging service, or QPS from loadbalancer running outside of cluster).",
"properties": {
"metric": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricIdentifier",
"description": "metric identifies the target metric by name and selector"
},
"target": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricTarget",
"description": "target specifies the target value for the given metric"
}
},
"required": [
"metric",
"target"
],
"type": "object",
"additionalProperties": false
},
"io.k8s.api.autoscaling.v2beta2.HPAScalingPolicy": {
"description": "HPAScalingPolicy is a single policy which must hold true for a specified past interval.",
"properties": {
Expand Down Expand Up @@ -7486,6 +7599,146 @@
"type": "object",
"additionalProperties": false
},
"io.k8s.api.autoscaling.v2beta2.MetricIdentifier": {
"description": "MetricIdentifier defines the name and optionally selector for a metric",
"properties": {
"name": {
"description": "name is the name of the given metric",
"type": "string"
},
"selector": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.LabelSelector",
"description": "selector is the string-encoded form of a standard kubernetes label selector for the given metric When set, it is passed as an additional parameter to the metrics server for more specific metrics scoping. When unset, just the metricName will be used to gather metrics."
}
},
"required": [
"name"
],
"type": "object",
"additionalProperties": false
},
"io.k8s.api.autoscaling.v2beta2.MetricSpec": {
"description": "MetricSpec specifies how to scale based on a single metric (only `type` and one other matching field should be set at once).",
"properties": {
"containerResource": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.ContainerResourceMetricSource",
"description": "container resource refers to a resource metric (such as those specified in requests and limits) known to Kubernetes describing a single container in each pod of the current scale target (e.g. CPU or memory). Such metrics are built in to Kubernetes, and have special scaling options on top of those available to normal per-pod metrics using the \"pods\" source. This is an alpha feature and can be enabled by the HPAContainerMetrics feature flag."
},
"external": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.ExternalMetricSource",
"description": "external refers to a global metric that is not associated with any Kubernetes object. It allows autoscaling based on information coming from components running outside of cluster (for example length of queue in cloud messaging service, or QPS from loadbalancer running outside of cluster)."
},
"object": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.ObjectMetricSource",
"description": "object refers to a metric describing a single kubernetes object (for example, hits-per-second on an Ingress object)."
},
"pods": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.PodsMetricSource",
"description": "pods refers to a metric describing each pod in the current scale target (for example, transactions-processed-per-second). The values will be averaged together before being compared to the target value."
},
"resource": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.ResourceMetricSource",
"description": "resource refers to a resource metric (such as those specified in requests and limits) known to Kubernetes describing each pod in the current scale target (e.g. CPU or memory). Such metrics are built in to Kubernetes, and have special scaling options on top of those available to normal per-pod metrics using the \"pods\" source."
},
"type": {
"description": "type is the type of metric source. It should be one of \"ContainerResource\", \"External\", \"Object\", \"Pods\" or \"Resource\", each mapping to a matching field in the object. Note: \"ContainerResource\" type is available on when the feature-gate HPAContainerMetrics is enabled",
"type": "string"
}
},
"required": [
"type"
],
"type": "object",
"additionalProperties": false
},
"io.k8s.api.autoscaling.v2beta2.MetricTarget": {
"description": "MetricTarget defines the target value, average value, or average utilization of a specific metric",
"properties": {
"averageUtilization": {
"description": "averageUtilization is the target value of the average of the resource metric across all relevant pods, represented as a percentage of the requested value of the resource for the pods. Currently only valid for Resource metric source type",
"format": "int32",
"type": "integer"
},
"averageValue": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.api.resource.Quantity",
"description": "averageValue is the target value of the average of the metric across all relevant pods (as a quantity)"
},
"type": {
"description": "type represents whether the metric type is Utilization, Value, or AverageValue",
"type": "string"
},
"value": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.api.resource.Quantity",
"description": "value is the target value of the metric (as a quantity)."
}
},
"required": [
"type"
],
"type": "object",
"additionalProperties": false
},
"io.k8s.api.autoscaling.v2beta2.ObjectMetricSource": {
"description": "ObjectMetricSource indicates how to scale on a metric describing a kubernetes object (for example, hits-per-second on an Ingress object).",
"properties": {
"describedObject": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.CrossVersionObjectReference"
},
"metric": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricIdentifier",
"description": "metric identifies the target metric by name and selector"
},
"target": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricTarget",
"description": "target specifies the target value for the given metric"
}
},
"required": [
"describedObject",
"target",
"metric"
],
"type": "object",
"additionalProperties": false
},
"io.k8s.api.autoscaling.v2beta2.PodsMetricSource": {
"description": "PodsMetricSource indicates how to scale on a metric describing each pod in the current scale target (for example, transactions-processed-per-second). The values will be averaged together before being compared to the target value.",
"properties": {
"metric": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricIdentifier",
"description": "metric identifies the target metric by name and selector"
},
"target": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricTarget",
"description": "target specifies the target value for the given metric"
}
},
"required": [
"metric",
"target"
],
"type": "object",
"additionalProperties": false
},
"io.k8s.api.autoscaling.v2beta2.ResourceMetricSource": {
"description": "ResourceMetricSource indicates how to scale on a resource metric known to Kubernetes, as specified in requests and limits, describing each pod in the current scale target (e.g. CPU or memory). The values will be averaged together before being compared to the target. Such metrics are built in to Kubernetes, and have special scaling options on top of those available to normal per-pod metrics using the \"pods\" source. Only one \"target\" type should be set.",
"properties": {
"name": {
"description": "name is the name of the resource in question.",
"type": "string"
},
"target": {
"$ref": "#/definitions/io.k8s.api.autoscaling.v2beta2.MetricTarget",
"description": "target specifies the target value for the given metric"
}
},
"required": [
"name",
"target"
],
"type": "object",
"additionalProperties": false
},
"io.k8s.api.core.v1.AWSElasticBlockStoreVolumeSource": {
"description": "Represents a Persistent Disk resource in AWS.\n\nAn AWS EBS disk must exist before mounting to a container. The disk must also be in the same AWS zone as the kubelet. An AWS EBS disk can only be mounted as read/write once. AWS EBS volumes support ownership management and SELinux relabeling.",
"properties": {
Expand Down
22 changes: 22 additions & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,28 @@ workers:
# This configuration will be ignored if PGBouncer is not enabled
usePgbouncer: true

# Allow HPA (KEDA must be disabled).
hpa:
enabled: false

# Minimum number of workers created by HPA
minReplicaCount: 0

# Maximum number of workers created by HPA
maxReplicaCount: 5

# Specifications for which to use to calculate the desired replica count
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 80

# Scaling behavior of the target in both Up and Down directions
behavior: {}

persistence:
# Enable persistent volumes
enabled: true
Expand Down

0 comments on commit 03cf692

Please sign in to comment.