Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HPA for paasta services(k8s deployments) #2362

Merged
merged 9 commits into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 107 additions & 23 deletions paasta_tools/kubernetes/application/controller_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from paasta_tools.kubernetes_tools import KubernetesDeploymentConfig
from paasta_tools.kubernetes_tools import KubernetesDeploymentConfigDict
from paasta_tools.kubernetes_tools import load_kubernetes_service_config_no_cache
from paasta_tools.kubernetes_tools import max_unavailable
from paasta_tools.kubernetes_tools import pod_disruption_budget_for_service_instance
from paasta_tools.kubernetes_tools import update_deployment
from paasta_tools.kubernetes_tools import update_stateful_set
Expand All @@ -31,6 +30,14 @@ def __init__(
item: Union[V1Deployment, V1StatefulSet],
logging=logging.getLogger(__name__),
) -> None:
"""
This Application wrapper is an interface for creating/deleting k8s deployments and statefulsets
soa_config is KubernetesDeploymentConfig. It is not loaded in init because it is not always required.
:param item: Kubernetes Object(V1Deployment/V1StatefulSet) that has already been filled up.
:param logging: where logs go
"""
if not item.metadata.namespace:
item.metadata.namespace = "paasta"
self.kube_deployment = KubeDeployment(
service=item.metadata.labels["yelp.com/paasta_service"],
instance=item.metadata.labels["yelp.com/paasta_instance"],
Expand Down Expand Up @@ -103,7 +110,7 @@ def delete_pod_disruption_budget(self, kube_client: KubeClient) -> None:
)
)
else:
raise e
raise
else:
self.logging.info(
"deleted pod disruption budget/{} from namespace/{}".format(
Expand All @@ -117,10 +124,8 @@ def ensure_pod_disruption_budget(
pdr = pod_disruption_budget_for_service_instance(
service=self.kube_deployment.service,
instance=self.kube_deployment.instance,
min_instances=self.soa_config.get_desired_instances()
- max_unavailable(
instance_count=self.soa_config.get_desired_instances(),
bounce_margin_factor=self.soa_config.get_bounce_margin_factor(),
max_unavailable="{}%".format(
int((1 - self.soa_config.get_bounce_margin_factor()) * 100)
),
)
try:
Expand All @@ -134,21 +139,10 @@ def ensure_pod_disruption_budget(
raise

if existing_pdr:
if existing_pdr.spec.min_available != pdr.spec.min_available:
# poddisruptionbudget objects are not mutable like most things in the kubernetes api,
# so we have to do a delete/replace.
# unfortunately we can't really do this transactionally, but I guess we'll just hope for the best?
logging.debug(
f"existing poddisruptionbudget {pdr.metadata.name} is out of date; deleting"
)
kube_client.policy.delete_namespaced_pod_disruption_budget(
name=pdr.metadata.name,
namespace=pdr.metadata.namespace,
body=V1DeleteOptions(),
)
logging.debug(f"creating poddisruptionbudget {pdr.metadata.name}")
return create_pod_disruption_budget(
kube_client=kube_client, pod_disruption_budget=pdr
if existing_pdr.spec.max_unavailable != pdr.spec.max_unavailable:
logging.debug(f"Updating poddisruptionbudget {pdr.metadata.name}")
return kube_client.policy.patch_namespaced_pod_disruption_budget(
name=pdr.metadata.name, namespace=pdr.metadata.namespace, body=pdr
)
else:
logging.debug(f"poddisruptionbudget {pdr.metadata.name} up to date")
Expand Down Expand Up @@ -180,22 +174,112 @@ def deep_delete(self, kube_client: KubeClient) -> None:
)
)
else:
raise e
raise
else:
self.logging.info(
"deleted deploy/{} from namespace/{}".format(
self.item.metadata.name, self.item.metadata.namespace
)
)
self.delete_pod_disruption_budget(kube_client)
self.delete_horizontal_pod_autoscaler(kube_client)

def get_existing_app(self, kube_client: KubeClient):
return kube_client.deployments.read_namespaced_deployment(
name=self.item.metadata.name, namespace=self.item.metadata.namespace
)

def create(self, kube_client: KubeClient) -> None:
create_deployment(kube_client=kube_client, formatted_deployment=self.item)
self.ensure_pod_disruption_budget(kube_client)
self.sync_horizontal_pod_autoscaler(kube_client)

def update(self, kube_client: KubeClient) -> None:
# If autoscaling is enabled, do not update replicas.
# In all other cases, replica is set to max(instances, min_instances)
if not self.get_soa_config().get("instances"):
mzq592 marked this conversation as resolved.
Show resolved Hide resolved
self.item.spec.replicas = self.get_existing_app(kube_client).spec.replicas
update_deployment(kube_client=kube_client, formatted_deployment=self.item)
self.ensure_pod_disruption_budget(kube_client)
self.sync_horizontal_pod_autoscaler(kube_client)

def sync_horizontal_pod_autoscaler(self, kube_client: KubeClient) -> None:
"""
In order for autoscaling to work, there needs to be at least two configurations
min_instnace, max_instance, and there cannot be instance.
"""
self.logging.info(
f"Syncing HPA setting for {self.item.metadata.name}/name in {self.item.metadata.namespace}"
)
hpa_exists = self.exists_hpa(kube_client)
# NO autoscaling
if self.get_soa_config().get("instances"):
# Remove HPA if autoscaling is disabled
if hpa_exists:
self.delete_horizontal_pod_autoscaler(kube_client)
return
body = self.soa_config.get_autoscaling_metric_spec(
mzq592 marked this conversation as resolved.
Show resolved Hide resolved
name=self.item.metadata.name, namespace=self.item.metadata.namespace
)
if not body:
raise Exception(
f"CRIT: autoscaling misconfigured for {self.kube_deployment.service}.\
{self.kube_deployment.instance}. Please correct the configuration and update pre-commit hook."
)
self.logging.debug(body)
if not hpa_exists:
self.logging.info(
f"Creating new HPA for {self.item.metadata.name}/name in {self.item.metadata.namespace}"
)
kube_client.autoscaling.create_namespaced_horizontal_pod_autoscaler(
namespace=self.item.metadata.namespace, body=body, pretty=True
)
else:
self.logging.info(
f"Updating new HPA for {self.item.metadata.name}/name in {self.item.metadata.namespace}/namespace"
)
kube_client.autoscaling.patch_namespaced_horizontal_pod_autoscaler(
name=self.item.metadata.name,
namespace=self.item.metadata.namespace,
body=body,
pretty=True,
)

def exists_hpa(self, kube_client: KubeClient) -> bool:
return (
len(
kube_client.autoscaling.list_namespaced_horizontal_pod_autoscaler(
field_selector=f"metadata.name={self.item.metadata.name}",
namespace=self.item.metadata.namespace,
).items
)
> 0
)

def delete_horizontal_pod_autoscaler(self, kube_client: KubeClient) -> None:
try:
kube_client.autoscaling.delete_namespaced_horizontal_pod_autoscaler(
name=self.item.metadata.name,
namespace=self.item.metadata.namespace,
body=V1DeleteOptions(),
)
except ApiException as e:
if e.status == 404:
# Deployment does not exist, nothing to delete but
# we can consider this a success.
self.logging.debug(
"not deleting nonexistent HPA/{self.item.metadata.name} from namespace/{self.item.metadata.namespace}".format(
self.item.metadata.name, self.item.metadata.namespace
)
)
else:
raise
else:
self.logging.info(
"deleted HPA/{} from namespace/{}".format(
self.item.metadata.name, self.item.metadata.namespace
)
)


class StatefulSetWrapper(Application):
Expand All @@ -219,7 +303,7 @@ def deep_delete(self, kube_client: KubeClient) -> None:
)
)
else:
raise e
raise
else:
self.logging.info(
"deleted statefulset/{} from namespace/{}".format(
Expand Down
Loading