Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply configured tolerations to service pods #954

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions assemblyline_core/scaler/controllers/kubernetes_ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
V1PersistentVolumeClaimSpec, V1NetworkPolicy, V1NetworkPolicySpec, V1NetworkPolicyEgressRule, V1NetworkPolicyPeer, \
V1NetworkPolicyIngressRule, V1Secret, V1SecretVolumeSource, V1LocalObjectReference, V1Service, \
V1ServiceSpec, V1ServicePort, V1PodSecurityContext, V1Probe, V1ExecAction, V1SecurityContext, \
V1Affinity, V1NodeAffinity, V1NodeSelector, V1NodeSelectorTerm, V1NodeSelectorRequirement
V1Affinity, V1NodeAffinity, V1NodeSelector, V1NodeSelectorTerm, V1NodeSelectorRequirement, V1Toleration
from kubernetes.client.rest import ApiException
from assemblyline.odm.models.service import DependencyConfig, DockerConfig, PersistentVolume

Expand Down Expand Up @@ -241,7 +241,7 @@ def parse_cpu(string: str) -> float:
class KubernetesController(ControllerInterface):
def __init__(self, logger, namespace: str, prefix: str, priority: str, dependency_priority: str,
cpu_reservation: float, linux_node_selector: Selector, labels=None, log_level="INFO", core_env={},
default_service_account=None, cluster_pod_list=True):
default_service_account=None, cluster_pod_list=True, default_service_tolerations = []):
# Try loading a kubernetes connection from either the fact that we are running
# inside of a cluster, or have a config file that tells us how
try:
Expand Down Expand Up @@ -285,6 +285,7 @@ def __init__(self, logger, namespace: str, prefix: str, priority: str, dependenc
self._service_limited_env: dict[str, dict[str, str]] = defaultdict(dict)
self.default_service_account: Optional[str] = default_service_account
self.cluster_pod_list = cluster_pod_list
self.default_service_tolerations = [V1Toleration(**toleration.as_primitives()) for toleration in default_service_tolerations]

# A record of previously reported events so that we don't report the same message repeatedly, fill it with
# existing messages so we don't have a huge dump of duplicates on restart
Expand Down Expand Up @@ -849,6 +850,7 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con
security_context=V1PodSecurityContext(fs_group=1000),
service_account_name=service_account,
affinity=selector_to_node_affinity(self.linux_node_selector),
tolerations=self.default_service_tolerations
)

if use_pull_secret:
Expand Down
16 changes: 10 additions & 6 deletions assemblyline_core/scaler/scaler_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,13 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
'privilege': 'service'
}

service_defaults_config = self.config.core.scaler.service_defaults

# If Scaler has envs that set service-server env, then that should override configured values
if SERVICE_API_HOST:
self.config.core.scaler.service_defaults.environment = \
service_defaults_config.environment = \
[EnvironmentVariable(dict(name="SERVICE_API_HOST", value=SERVICE_API_HOST))] + \
[env for env in self.config.core.scaler.service_defaults.environment if env.name != "SERVICE_API_HOST"]
[env for env in service_defaults_config.environment if env.name != "SERVICE_API_HOST"]

if self.config.core.scaler.additional_labels:
labels.update({k: v for k, v in (_l.split("=") for _l in self.config.core.scaler.additional_labels)})
Expand All @@ -304,7 +306,9 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
log_level=self.config.logging.log_level,
core_env=core_env,
cluster_pod_list=self.config.core.scaler.cluster_pod_list,
default_service_account=self.config.services.service_account)
default_service_account=self.config.services.service_account,
default_service_tolerations=service_defaults_config.tolerations
)

# Add global configuration for privileged services
self.controller.add_config_mount(KUBERNETES_AL_CONFIG, config_map=KUBERNETES_AL_CONFIG, key="config",
Expand All @@ -313,7 +317,7 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
# If we're passed an override for server-server and it's defining an HTTPS connection, then add a global
# mount for the Root CA that needs to be mounted
if INTERNAL_ENCRYPT:
self.config.core.scaler.service_defaults.mounts.append(Mount(dict(
service_defaults_config.mounts.append(Mount(dict(
name="root-ca",
path="/etc/assemblyline/ssl/al_root-ca.crt",
resource_type="secret",
Expand All @@ -322,7 +326,7 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
)))

# Add default mounts for (non-)privileged services
for mount in self.config.core.scaler.service_defaults.mounts:
for mount in service_defaults_config.mounts:
# Deprecated configuration for mounting ConfigMap
# TODO: Deprecate code on next major change
if mount.config_map:
Expand Down Expand Up @@ -365,7 +369,7 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
if CLASSIFICATION_HOST_PATH:
self.controller.global_mounts.append((CLASSIFICATION_HOST_PATH, '/etc/assemblyline/classification.yml'))

for mount in self.config.core.scaler.service_defaults.mounts:
for mount in service_defaults_config.mounts:
# Mounts are all storage-based since there's no equivalent to ConfigMaps in Docker
if mount.privileged_only:
self.controller.core_mounts.append((mount.name, mount.path))
Expand Down
9 changes: 6 additions & 3 deletions assemblyline_core/updater/run_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from kubernetes.client import V1Job, V1ObjectMeta, V1JobSpec, V1PodTemplateSpec, V1PodSpec, V1Volume, \
V1VolumeMount, V1EnvVar, V1Container, V1ResourceRequirements, \
V1ConfigMapVolumeSource, V1Secret, V1SecretVolumeSource, V1LocalObjectReference
V1ConfigMapVolumeSource, V1Secret, V1SecretVolumeSource, V1LocalObjectReference, V1Toleration
from kubernetes import client, config
from kubernetes.client.rest import ApiException

Expand Down Expand Up @@ -148,7 +148,7 @@ def restart(self, service_name):

class KubernetesUpdateInterface:
def __init__(self, logger, prefix, namespace, priority_class, extra_labels, linux_node_selector: Selector,
log_level="INFO", default_service_account=None):
log_level="INFO", default_service_account=None, default_service_tolerations=[]):
# Try loading a kubernetes connection from either the fact that we are running
# inside of a cluster, or we have a configuration in the normal location
try:
Expand Down Expand Up @@ -181,6 +181,8 @@ def __init__(self, logger, prefix, namespace, priority_class, extra_labels, linu
self.default_service_account = default_service_account
self.secret_env = []
self.linux_node_selector = linux_node_selector
self.default_service_tolerations = [V1Toleration(**toleration.as_primitives()) for toleration in default_service_tolerations]


# Get the deployment of this process. Use that information to fill out the secret info
deployment = self.apps_api.read_namespaced_deployment(name='updater', namespace=self.namespace)
Expand Down Expand Up @@ -465,7 +467,8 @@ def __init__(self, redis_persist=None, redis=None, logger=None, datastore=None):
extra_labels=extra_labels,
log_level=self.config.logging.log_level,
default_service_account=self.config.services.service_account,
linux_node_selector=self.config.core.scaler.linux_node_selector)
linux_node_selector=self.config.core.scaler.linux_node_selector,
default_service_tolerations=self.config.core.scaler.service_defaults.tolerations)
# Add all additional mounts to privileged services
self.mounts = self.config.core.scaler.service_defaults.mounts
else:
Expand Down