Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2283,7 +2283,7 @@ steps:
args: [--scenario=defaults, --recreate-cluster]
ci-builder: stable
agents:
queue: hetzner-aarch64-8cpu-16gb
queue: hetzner-aarch64-16cpu-32gb

- id: orchestratord-all
label: "Orchestratord test (every property)"
Expand All @@ -2295,7 +2295,7 @@ steps:
args: [--scenario=all, --recreate-cluster]
ci-builder: stable
agents:
queue: hetzner-aarch64-8cpu-16gb
queue: hetzner-aarch64-16cpu-32gb

- id: orchestratord-combine
label: "Orchestratord test (combine properties)"
Expand All @@ -2307,4 +2307,4 @@ steps:
args: [--scenario=combine, --runtime=7200, --recreate-cluster]
ci-builder: stable
agents:
queue: hetzner-aarch64-8cpu-16gb
queue: hetzner-aarch64-16cpu-32gb
3 changes: 3 additions & 0 deletions misc/helm-charts/operator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ The following table lists the configurable parameters of the Materialize operato
| Parameter | Description | Default |
|-----------|-------------|---------|
| `balancerd.affinity` | Affinity to use for balancerd pods spawned by the operator | ``nil`` |
| `balancerd.defaultResources` | Default resources if not set in the Materialize CR | ``{"limits":{"memory":"256Mi"},"requests":{"cpu":"500m","memory":"256Mi"}}`` |
| `balancerd.enabled` | Flag to indicate whether to create balancerd pods for the environments | ``true`` |
| `balancerd.nodeSelector` | Node selector to use for balancerd pods spawned by the operator | ``nil`` |
| `balancerd.tolerations` | Tolerations to use for balancerd pods spawned by the operator | ``nil`` |
Expand All @@ -117,11 +118,13 @@ The following table lists the configurable parameters of the Materialize operato
| `clusterd.swapNodeSelector` | Additional node selector to use for clusterd pods when using swap. This will be merged with the values in `nodeSelector`. | ``{"materialize.cloud/swap":"true"}`` |
| `clusterd.tolerations` | Tolerations to use for clusterd pods spawned by the operator | ``nil`` |
| `console.affinity` | Affinity to use for console pods spawned by the operator | ``nil`` |
| `console.defaultResources` | Default resources if not set in the Materialize CR | ``{"limits":{"memory":"256Mi"},"requests":{"cpu":"500m","memory":"256Mi"}}`` |
| `console.enabled` | Flag to indicate whether to create console pods for the environments | ``true`` |
| `console.imageTagMapOverride` | Override the mapping of environmentd versions to console versions | ``{}`` |
| `console.nodeSelector` | Node selector to use for console pods spawned by the operator | ``nil`` |
| `console.tolerations` | Tolerations to use for console pods spawned by the operator | ``nil`` |
| `environmentd.affinity` | Affinity to use for environmentd pods spawned by the operator | ``nil`` |
| `environmentd.defaultResources` | Default resources if not set in the Materialize CR | ``{"limits":{"memory":"4Gi"},"requests":{"cpu":"1","memory":"4095Mi"}}`` |
| `environmentd.nodeSelector` | Node selector to use for environmentd pods spawned by the operator | ``{}`` |
| `environmentd.tolerations` | Tolerations to use for environmentd pods spawned by the operator | ``nil`` |
| `networkPolicies.egress` | egress from Materialize pods to sources and sinks | ``{"cidrs":["0.0.0.0/0"],"enabled":false}`` |
Expand Down
9 changes: 9 additions & 0 deletions misc/helm-charts/operator/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ spec:
- '--environmentd-toleration={{ toJson $toleration }}'
{{- end }}
{{- end }}
{{- if .Values.environmentd.defaultResources }}
- '--environmentd-default-resources={{ toJson .Values.environmentd.defaultResources }}'
{{- end }}
{{- if .Values.clusterd.nodeSelector }}
{{- range $key, $value := .Values.clusterd.nodeSelector }}
- "--clusterd-node-selector={{ $key }}={{ $value }}"
Expand All @@ -168,6 +171,9 @@ spec:
- '--balancerd-toleration={{ toJson $toleration }}'
{{- end }}
{{- end }}
{{- if .Values.balancerd.defaultResources }}
- '--balancerd-default-resources={{ toJson .Values.balancerd.defaultResources }}'
{{- end }}
{{- if .Values.console.nodeSelector }}
{{- range $key, $value := .Values.console.nodeSelector }}
- "--console-node-selector={{ $key }}={{ $value }}"
Expand All @@ -181,6 +187,9 @@ spec:
- '--console-toleration={{ toJson $toleration }}'
{{- end }}
{{- end }}
{{- if .Values.console.defaultResources }}
- '--console-default-resources={{ toJson .Values.console.defaultResources }}'
{{- end }}
{{- if .Values.storage.storageClass.name }}
- "--ephemeral-volume-class={{ .Values.storage.storageClass.name }}"
{{- end }}
Expand Down
23 changes: 23 additions & 0 deletions misc/helm-charts/operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,15 @@ environmentd:
affinity:
# -- Tolerations to use for environmentd pods spawned by the operator
tolerations:
# -- Default resources if not set in the Materialize CR
defaultResources:
limits:
memory: "4Gi"
requests:
cpu: "1"
# Setting the memory request to slightly less than limit
# allows the pod to use swap if enabled at the kubelet.
memory: "4095Mi"

clusterd:
# -- Node selector to use for all clusterd pods spawned by the operator
Expand All @@ -239,6 +248,13 @@ balancerd:
affinity:
# -- Tolerations to use for balancerd pods spawned by the operator
tolerations:
# -- Default resources if not set in the Materialize CR
defaultResources:
limits:
memory: "256Mi"
requests:
cpu: "500m"
memory: "256Mi"

console:
# -- Flag to indicate whether to create console pods for the environments
Expand All @@ -251,6 +267,13 @@ console:
affinity:
# -- Tolerations to use for console pods spawned by the operator
tolerations:
# -- Default resources if not set in the Materialize CR
defaultResources:
limits:
memory: "256Mi"
requests:
cpu: "500m"
memory: "256Mi"

# RBAC (Role-Based Access Control) settings
rbac:
Expand Down
12 changes: 11 additions & 1 deletion src/orchestratord/src/controller/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{

use http::HeaderValue;
use k8s_openapi::{
api::core::v1::{Affinity, Toleration},
api::core::v1::{Affinity, ResourceRequirements, Toleration},
apimachinery::pkg::apis::meta::v1::{Condition, Time},
};
use kube::{Api, Client, Resource, ResourceExt, api::PostParams, runtime::controller::Action};
Expand Down Expand Up @@ -91,6 +91,8 @@ pub struct MaterializeControllerArgs {
environmentd_affinity: Option<Affinity>,
#[clap(long = "environmentd-toleration", value_parser = parse_tolerations)]
environmentd_tolerations: Option<Vec<Toleration>>,
#[clap(long, value_parser = parse_resources)]
environmentd_default_resources: Option<ResourceRequirements>,
#[clap(long)]
clusterd_node_selector: Vec<KeyValueArg<String, String>>,
#[clap(long, value_parser = parse_affinity)]
Expand All @@ -103,12 +105,16 @@ pub struct MaterializeControllerArgs {
balancerd_affinity: Option<Affinity>,
#[clap(long = "balancerd-toleration", value_parser = parse_tolerations)]
balancerd_tolerations: Option<Vec<Toleration>>,
#[clap(long, value_parser = parse_resources)]
balancerd_default_resources: Option<ResourceRequirements>,
#[clap(long)]
console_node_selector: Vec<KeyValueArg<String, String>>,
#[clap(long, value_parser = parse_affinity)]
console_affinity: Option<Affinity>,
#[clap(long = "console-toleration", value_parser = parse_tolerations)]
console_tolerations: Option<Vec<Toleration>>,
#[clap(long, value_parser = parse_resources)]
console_default_resources: Option<ResourceRequirements>,
#[clap(long, default_value = "always", value_enum)]
image_pull_policy: KubernetesImagePullPolicy,
#[clap(flatten)]
Expand Down Expand Up @@ -181,6 +187,10 @@ fn parse_tolerations(s: &str) -> anyhow::Result<Toleration> {
Ok(serde_json::from_str(s)?)
}

fn parse_resources(s: &str) -> anyhow::Result<ResourceRequirements> {
Ok(serde_json::from_str(s)?)
}

#[derive(Clone, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct DefaultCertificateSpecs {
Expand Down
6 changes: 5 additions & 1 deletion src/orchestratord/src/controller/materialize/balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,11 @@ fn create_balancerd_deployment_object(
startup_probe: Some(startup_probe),
readiness_probe: Some(readiness_probe),
liveness_probe: Some(liveness_probe),
resources: mz.spec.balancerd_resource_requirements.clone(),
resources: mz
.spec
.balancerd_resource_requirements
.clone()
.or_else(|| config.balancerd_default_resources.clone()),
security_context: security_context.clone(),
volume_mounts: Some(volume_mounts),
..Default::default()
Expand Down
6 changes: 5 additions & 1 deletion src/orchestratord/src/controller/materialize/console.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,11 @@ ssl_certificate_key /nginx/tls/tls.key;",
period_seconds: Some(30),
..probe.clone()
}),
resources: mz.spec.console_resource_requirements.clone(),
resources: mz
.spec
.console_resource_requirements
.clone()
.or_else(|| config.console_default_resources.clone()),
security_context,
volume_mounts: Some(volume_mounts),
..Default::default()
Expand Down
6 changes: 5 additions & 1 deletion src/orchestratord/src/controller/materialize/environmentd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1429,7 +1429,11 @@ fn create_environmentd_statefulset_object(
volume_mounts: Some(volume_mounts),
liveness_probe: Some(probe.clone()),
readiness_probe: Some(probe),
resources: mz.spec.environmentd_resource_requirements.clone(),
resources: mz
.spec
.environmentd_resource_requirements
.clone()
.or_else(|| config.environmentd_default_resources.clone()),
security_context: security_context.clone(),
..Default::default()
};
Expand Down
156 changes: 156 additions & 0 deletions test/orchestratord/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ def get_environmentd_data() -> dict[str, Any]:
)


def get_console_data() -> dict[str, Any]:
return get_pod_data(
labels={"materialize.cloud/app": "console"},
)


def retry(fn: Callable, timeout: int) -> None:
end_time = (
datetime.datetime.now() + datetime.timedelta(seconds=timeout)
Expand Down Expand Up @@ -681,6 +687,156 @@ def check_pods() -> None:
retry(check_pods, 5)


class EnvironmentdResources(Modification):
@classmethod
def values(cls) -> list[Any]:
return [
None,
{
"limits": {
"cpu": "1",
"memory": "1Gi",
},
"requests": {
"cpu": "1",
"memory": "1Gi",
},
},
]

@classmethod
def default(cls) -> Any:
return None

def modify(self, definition: dict[str, Any]) -> None:
definition["materialize"]["spec"][
"environmentdResourceRequirements"
] = self.value

def validate(self, mods: dict[type[Modification], Any]) -> None:
expected = self.value
if self.value is None:
expected = {
"limits": {
"memory": "4Gi",
},
"requests": {
"cpu": "1",
"memory": "4095Mi",
},
}

def check_pods() -> None:
environmentd = get_environmentd_data()["items"][0]

resources = environmentd["spec"]["containers"][0]["resources"]
assert (
resources == expected
), f"Expected environmentd resources {expected}, but got {resources}"

retry(check_pods, 120)


class BalancerdResources(Modification):
@classmethod
def values(cls) -> list[Any]:
return [
None,
{
"limits": {
"cpu": "1",
"memory": "512Mi",
},
"requests": {
"cpu": "1",
"memory": "512Mi",
},
},
]

@classmethod
def default(cls) -> Any:
return None

def modify(self, definition: dict[str, Any]) -> None:
definition["materialize"]["spec"]["balancerdResourceRequirements"] = self.value

def validate(self, mods: dict[type[Modification], Any]) -> None:
if mods[BalancerdEnabled] == False:
return
expected = self.value
if self.value is None:
expected = {
"limits": {
"memory": "256Mi",
},
"requests": {
"cpu": "500m",
"memory": "256Mi",
},
}

def check_pods() -> None:
balancerd = get_balancerd_data()["items"][0]

resources = balancerd["spec"]["containers"][0]["resources"]
assert (
resources == expected
), f"Expected balancerd resources {expected}, but got {resources}"

retry(check_pods, 120)


class ConsoleResources(Modification):
@classmethod
def values(cls) -> list[Any]:
return [
None,
{
"limits": {
"cpu": "100m",
"memory": "128Mi",
},
"requests": {
"cpu": "100m",
"memory": "128Mi",
},
},
]

@classmethod
def default(cls) -> Any:
return None

def modify(self, definition: dict[str, Any]) -> None:
definition["materialize"]["spec"]["consoleResourceRequirements"] = self.value

def validate(self, mods: dict[type[Modification], Any]) -> None:
if mods[ConsoleEnabled] == False:
return
expected = self.value
if self.value is None:
expected = {
"limits": {
"memory": "256Mi",
},
"requests": {
"cpu": "500m",
"memory": "256Mi",
},
}

def check_pods() -> None:
console = get_console_data()["items"][0]

resources = console["spec"]["containers"][0]["resources"]
assert (
resources == expected
), f"Expected console resources {expected}, but got {resources}"

retry(check_pods, 240)


def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
parser.add_argument(
"--recreate-cluster",
Expand Down