Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Apply configured tolerations to service pods
  • Loading branch information
cccs-rs committed May 29, 2024
2 parents e6fcd21 + 1ca5732 commit 1460dd5
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
6 changes: 4 additions & 2 deletions assemblyline_core/scaler/controllers/kubernetes_ctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
V1PersistentVolumeClaimSpec, V1NetworkPolicy, V1NetworkPolicySpec, V1NetworkPolicyEgressRule, V1NetworkPolicyPeer, \
V1NetworkPolicyIngressRule, V1Secret, V1SecretVolumeSource, V1LocalObjectReference, V1Service, \
V1ServiceSpec, V1ServicePort, V1PodSecurityContext, V1Probe, V1ExecAction, V1SecurityContext, \
V1Affinity, V1NodeAffinity, V1NodeSelector, V1NodeSelectorTerm, V1NodeSelectorRequirement
V1Affinity, V1NodeAffinity, V1NodeSelector, V1NodeSelectorTerm, V1NodeSelectorRequirement, V1Toleration
from kubernetes.client.rest import ApiException
from assemblyline.odm.models.service import DependencyConfig, DockerConfig, PersistentVolume

Expand Down Expand Up @@ -241,7 +241,7 @@ def parse_cpu(string: str) -> float:
class KubernetesController(ControllerInterface):
def __init__(self, logger, namespace: str, prefix: str, priority: str, dependency_priority: str,
cpu_reservation: float, linux_node_selector: Selector, labels=None, log_level="INFO", core_env={},
default_service_account=None, cluster_pod_list=True):
default_service_account=None, cluster_pod_list=True, default_service_tolerations = []):
# Try loading a kubernetes connection from either the fact that we are running
# inside of a cluster, or have a config file that tells us how
try:
Expand Down Expand Up @@ -285,6 +285,7 @@ def __init__(self, logger, namespace: str, prefix: str, priority: str, dependenc
self._service_limited_env: dict[str, dict[str, str]] = defaultdict(dict)
self.default_service_account: Optional[str] = default_service_account
self.cluster_pod_list = cluster_pod_list
self.default_service_tolerations = [V1Toleration(**toleration.as_primitives()) for toleration in default_service_tolerations]

# A record of previously reported events so that we don't report the same message repeatedly. Fill it with
# existing messages so we don't have a huge dump of duplicates on restart
Expand Down Expand Up @@ -849,6 +850,7 @@ def _create_deployment(self, service_name: str, deployment_name: str, docker_con
security_context=V1PodSecurityContext(fs_group=1000),
service_account_name=service_account,
affinity=selector_to_node_affinity(self.linux_node_selector),
tolerations=self.default_service_tolerations
)

if use_pull_secret:
Expand Down
16 changes: 10 additions & 6 deletions assemblyline_core/scaler/scaler_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,13 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
'privilege': 'service'
}

service_defaults_config = self.config.core.scaler.service_defaults

# If Scaler has envs that set service-server env, then that should override configured values
if SERVICE_API_HOST:
self.config.core.scaler.service_defaults.environment = \
service_defaults_config.environment = \
[EnvironmentVariable(dict(name="SERVICE_API_HOST", value=SERVICE_API_HOST))] + \
[env for env in self.config.core.scaler.service_defaults.environment if env.name != "SERVICE_API_HOST"]
[env for env in service_defaults_config.environment if env.name != "SERVICE_API_HOST"]

if self.config.core.scaler.additional_labels:
labels.update({k: v for k, v in (_l.split("=") for _l in self.config.core.scaler.additional_labels)})
Expand All @@ -304,7 +306,9 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
log_level=self.config.logging.log_level,
core_env=core_env,
cluster_pod_list=self.config.core.scaler.cluster_pod_list,
default_service_account=self.config.services.service_account)
default_service_account=self.config.services.service_account,
default_service_tolerations=service_defaults_config.tolerations
)

# Add global configuration for privileged services
self.controller.add_config_mount(KUBERNETES_AL_CONFIG, config_map=KUBERNETES_AL_CONFIG, key="config",
Expand All @@ -313,7 +317,7 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
# If we're passed an override for service-server and it's defining an HTTPS connection, then add a global
# mount for the Root CA that needs to be mounted
if INTERNAL_ENCRYPT:
self.config.core.scaler.service_defaults.mounts.append(Mount(dict(
service_defaults_config.mounts.append(Mount(dict(
name="root-ca",
path="/etc/assemblyline/ssl/al_root-ca.crt",
resource_type="secret",
Expand All @@ -322,7 +326,7 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
)))

# Add default mounts for (non-)privileged services
for mount in self.config.core.scaler.service_defaults.mounts:
for mount in service_defaults_config.mounts:
# Deprecated configuration for mounting ConfigMap
# TODO: Deprecate code on next major change
if mount.config_map:
Expand Down Expand Up @@ -365,7 +369,7 @@ def __init__(self, config=None, datastore=None, redis=None, redis_persist=None):
if CLASSIFICATION_HOST_PATH:
self.controller.global_mounts.append((CLASSIFICATION_HOST_PATH, '/etc/assemblyline/classification.yml'))

for mount in self.config.core.scaler.service_defaults.mounts:
for mount in service_defaults_config.mounts:
# Mounts are all storage-based since there's no equivalent to ConfigMaps in Docker
if mount.privileged_only:
self.controller.core_mounts.append((mount.name, mount.path))
Expand Down
9 changes: 6 additions & 3 deletions assemblyline_core/updater/run_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from kubernetes.client import V1Job, V1ObjectMeta, V1JobSpec, V1PodTemplateSpec, V1PodSpec, V1Volume, \
V1VolumeMount, V1EnvVar, V1Container, V1ResourceRequirements, \
V1ConfigMapVolumeSource, V1Secret, V1SecretVolumeSource, V1LocalObjectReference
V1ConfigMapVolumeSource, V1Secret, V1SecretVolumeSource, V1LocalObjectReference, V1Toleration
from kubernetes import client, config
from kubernetes.client.rest import ApiException

Expand Down Expand Up @@ -148,7 +148,7 @@ def restart(self, service_name):

class KubernetesUpdateInterface:
def __init__(self, logger, prefix, namespace, priority_class, extra_labels, linux_node_selector: Selector,
log_level="INFO", default_service_account=None):
log_level="INFO", default_service_account=None, default_service_tolerations=[]):
# Try loading a kubernetes connection from either the fact that we are running
# inside of a cluster, or we have a configuration in the normal location
try:
Expand Down Expand Up @@ -181,6 +181,8 @@ def __init__(self, logger, prefix, namespace, priority_class, extra_labels, linu
self.default_service_account = default_service_account
self.secret_env = []
self.linux_node_selector = linux_node_selector
self.default_service_tolerations = [V1Toleration(**toleration.as_primitives()) for toleration in default_service_tolerations]


# Get the deployment of this process. Use that information to fill out the secret info
deployment = self.apps_api.read_namespaced_deployment(name='updater', namespace=self.namespace)
Expand Down Expand Up @@ -465,7 +467,8 @@ def __init__(self, redis_persist=None, redis=None, logger=None, datastore=None):
extra_labels=extra_labels,
log_level=self.config.logging.log_level,
default_service_account=self.config.services.service_account,
linux_node_selector=self.config.core.scaler.linux_node_selector)
linux_node_selector=self.config.core.scaler.linux_node_selector,
default_service_tolerations=self.config.core.scaler.service_defaults.tolerations)
# Add all additional mounts to privileged services
self.mounts = self.config.core.scaler.service_defaults.mounts
else:
Expand Down

0 comments on commit 1460dd5

Please sign in to comment.