Skip to content

Commit

Permalink
testing: Check removal of orphaned nodes
Browse files Browse the repository at this point in the history
Make environmentd crash on drop replica after the catalog transaction
but before the orchestrator call. Then ensure that the orphan will get cleaned
up on environmentd restart.
  • Loading branch information
lluki committed Nov 29, 2022
1 parent bc0510e commit 61e3f42
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions misc/python/materialize/cloudtest/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@

import os
import subprocess
import time
from datetime import datetime, timedelta
from typing import List, Optional

from pg8000.exceptions import InterfaceError

from materialize import ROOT, mzbuild
from materialize.cloudtest.k8s import K8sResource
from materialize.cloudtest.k8s.debezium import DEBEZIUM_RESOURCES
Expand Down Expand Up @@ -146,3 +150,32 @@ def __init__(
def create(self) -> None:
super().create()
wait(condition="condition=Ready", resource="pod/compute-cluster-u1-replica-1-0")

def wait_for_sql(self) -> None:
"""Wait until environmentd pod is ready and can accept SQL connections"""
wait(condition="condition=Ready", resource="pod/environmentd-0")

start = datetime.now()
while datetime.now() - start < timedelta(seconds=300):
try:
self.environmentd.sql("SELECT 1")
break
except InterfaceError as e:
# Since we crash environmentd, we expect some errors that we swallow.
print(f"SQL interface not ready, {e} while SELECT 1. Waiting...")
time.sleep(2)

def set_environmentd_failpoints(self, failpoints: str) -> None:
"""Set the FAILPOINTS environmentd variable in the stateful set. This
will most likely restart environmentd"""
stateful_set = [
resource
for resource in self.resources
if type(resource) == EnvironmentdStatefulSet
]
assert len(stateful_set) == 1
stateful_set = stateful_set[0]

stateful_set.env["FAILPOINTS"] = failpoints
stateful_set.replace()
self.wait_for_sql()
4 changes: 4 additions & 0 deletions misc/python/materialize/cloudtest/k8s/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ def replace(self) -> None:
apps_v1_api.replace_namespaced_stateful_set(
name=name, body=self.stateful_set, namespace=self.namespace()
)
# Despite the name "status" this kubectl command will actually wait
# until the rollout is complete.
# See https://github.com/kubernetes/kubernetes/issues/79606#issuecomment-779779928
self.kubectl("rollout", "status", f"statefulset/{name}")


class K8sConfigMap(K8sResource):
Expand Down
6 changes: 5 additions & 1 deletion misc/python/materialize/cloudtest/k8s/environmentd.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# by the Apache License, Version 2.0.

import urllib.parse
from typing import Optional
from typing import Dict, Optional

from kubernetes.client import (
V1Container,
Expand Down Expand Up @@ -60,6 +60,7 @@ def __init__(
self.tag = tag
self.release_mode = release_mode
self.log_filter = log_filter
self.env: Dict[str, str] = {}
super().__init__()

def generate_stateful_set(self) -> V1StatefulSet:
Expand All @@ -78,6 +79,9 @@ def generate_stateful_set(self) -> V1StatefulSet:
V1EnvVar(name="MZ_ANNOUNCE_EGRESS_IP", value="1.2.3.4,88.77.66.55"),
]

for (k, v) in self.env.items():
env.append(V1EnvVar(name=k, value=v))

ports = [V1ContainerPort(container_port=5432, name="sql")]

volume_mounts = [
Expand Down
1 change: 1 addition & 0 deletions misc/python/stubs/pg8000/exceptions.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
# by the Apache License, Version 2.0.

class DatabaseError(Exception): ...
class InterfaceError(Exception): ...
4 changes: 4 additions & 0 deletions src/adapter/src/coord/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1831,10 +1831,14 @@ impl<S: Append + 'static> Coordinator<S> {

self.catalog_transact(Some(session), ops).await?;

fail::fail_point!("after_catalog_drop_replica");

for (compute_id, replica_id) in replicas_to_drop {
self.drop_replica(compute_id, replica_id).await.unwrap();
}

fail::fail_point!("after_sequencer_drop_replica");

Ok(ExecuteResponse::DroppedComputeReplica)
}

Expand Down
2 changes: 1 addition & 1 deletion src/environmentd/src/bin/environmentd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ max log level: {max_log_level}",
value.to_string_lossy().into_owned(),
)
})
.filter(|(name, _value)| name.starts_with("MZ_"))
.filter(|(name, _value)| name.starts_with("MZ_") || name == "FAILPOINTS")
.map(|(name, value)| format!("{}={}", escape(&name), escape(&value)))
.chain(env::args().into_iter().map(|arg| escape(&arg).into_owned()))
.join(" ")
Expand Down
1 change: 1 addition & 0 deletions src/orchestrator-kubernetes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ anyhow = "1.0.66"
async-trait = "0.1.58"
chrono = { version = "0.4.23", default-features = false }
clap = { version = "3.2.20", features = ["derive"] }
fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.25"
maplit = "1.0.2"
mz-cloud-resources = { path = "../cloud-resources" }
Expand Down
1 change: 1 addition & 0 deletions src/orchestrator-kubernetes/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ impl NamespacedOrchestrator for NamespacedKubernetesOrchestrator {

/// Drops the identified service, if it exists.
async fn drop_service(&self, id: &str) -> Result<(), anyhow::Error> {
fail::fail_point!("kubernetes_drop_service", |_| Err(anyhow!("failpoint")));
self.service_scales
.lock()
.expect("poisoned lock")
Expand Down
28 changes: 25 additions & 3 deletions test/cloudtest/test_computed.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import pytest
from pg8000.exceptions import InterfaceError

from materialize.cloudtest.application import MaterializeApplication
from materialize.cloudtest.exists import exists, not_exists
from materialize.cloudtest.wait import wait
Expand Down Expand Up @@ -38,9 +41,26 @@ def test_computed_sizing(mz: MaterializeApplication) -> None:
mz.environmentd.sql("DROP CLUSTER sized1 CASCADE")


def test_computed_shutdown(mz: MaterializeApplication) -> None:
@pytest.mark.parametrize(
"failpoint",
["", "after_catalog_drop_replica=panic", "after_sequencer_drop_replica=panic"],
)
def test_computed_shutdown(mz: MaterializeApplication, failpoint: str) -> None:
"""Test that dropping a cluster or replica causes the associated computeds to shut down."""

print(f"Testing computed shutdown with failpoint={failpoint}")

mz.set_environmentd_failpoints(failpoint)

def sql_expect_crash(sql: str) -> None:
# We expect executing `sql` will crash environmentd. To ensure it is actually `sql`
# wait until the SQL interface is available.
mz.wait_for_sql()
try:
mz.environmentd.sql(sql)
except InterfaceError as e:
print(f"Expected SQL error: {e}")

mz.environmentd.sql(
"CREATE CLUSTER shutdown1 REPLICAS (shutdown_replica1 (SIZE '1'), shutdown_replica2 (SIZE '1'))"
)
Expand All @@ -66,10 +86,12 @@ def test_computed_shutdown(mz: MaterializeApplication) -> None:
compute_svcs[replica_name] = compute_svc
exists(resource=compute_svc)

mz.environmentd.sql("DROP CLUSTER REPLICA shutdown1.shutdown_replica1")
sql_expect_crash("DROP CLUSTER REPLICA shutdown1.shutdown_replica1")
wait(condition="delete", resource=compute_pods["shutdown_replica1"])
not_exists(resource=compute_svcs["shutdown_replica1"])

mz.environmentd.sql("DROP CLUSTER shutdown1 CASCADE")
sql_expect_crash("DROP CLUSTER shutdown1 CASCADE")
wait(condition="delete", resource=compute_pods["shutdown_replica2"])
not_exists(resource=compute_svcs["shutdown_replica2"])

mz.set_environmentd_failpoints("")
32 changes: 31 additions & 1 deletion test/cloudtest/test_crash.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
# by the Apache License, Version 2.0.

from textwrap import dedent
from typing import Tuple

from pg8000.exceptions import InterfaceError # type: ignore
from kubernetes.client import V1Pod, V1StatefulSet
from pg8000.exceptions import InterfaceError

from materialize.cloudtest.application import MaterializeApplication
from materialize.cloudtest.wait import wait
Expand Down Expand Up @@ -97,13 +99,41 @@ def test_crash_storaged(mz: MaterializeApplication) -> None:


def test_crash_environmentd(mz: MaterializeApplication) -> None:
def restarts(p: V1Pod) -> int:
assert p.status is not None
assert p.status.container_statuses is not None
return p.status.container_statuses[0].restart_count

def get_replica() -> Tuple[V1Pod, V1StatefulSet]:
"""Find the stateful set for the replica of the default cluster"""
compute_pod_name = f"compute-cluster-u1-replica-1-0"
ss_name = f"compute-cluster-u1-replica-1"
compute_pod = mz.environmentd.api().read_namespaced_pod(
compute_pod_name, mz.environmentd.namespace()
)
for ss in (
mz.environmentd.apps_api().list_stateful_set_for_all_namespaces().items
):
assert ss.metadata is not None
if ss.metadata.name == ss_name:
return (compute_pod, ss)
assert False

populate(mz, 2)

before = get_replica()

try:
mz.environmentd.sql("SELECT mz_internal.mz_panic('forced panic')")
except InterfaceError:
pass
validate(mz, 2)

after = get_replica()

# A environmentd crash must not restart other nodes
assert restarts(before[0]) == restarts(after[0])


def test_crash_computed(mz: MaterializeApplication) -> None:
populate(mz, 3)
Expand Down
21 changes: 19 additions & 2 deletions test/cloudtest/test_storaged.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

from textwrap import dedent

import pytest
from pg8000.exceptions import InterfaceError

from materialize.cloudtest.application import MaterializeApplication
from materialize.cloudtest.exists import exists, not_exists
from materialize.cloudtest.wait import wait
Expand Down Expand Up @@ -102,7 +105,12 @@ def test_storaged_resizing(mz: MaterializeApplication) -> None:
wait(condition="delete", resource=storaged)


def test_storaged_shutdown(mz: MaterializeApplication) -> None:
@pytest.mark.parametrize("failpoint", [False, True])
def test_storaged_shutdown(mz: MaterializeApplication, failpoint: bool) -> None:
print("Starting test_storaged_shutdown")
if failpoint:
mz.set_environmentd_failpoints("kubernetes_drop_service=return(error)")

"""Test that dropping a source causes its respective storaged to shut down."""
mz.testdrive.run(
input=dedent(
Expand Down Expand Up @@ -143,7 +151,16 @@ def test_storaged_shutdown(mz: MaterializeApplication) -> None:
wait(condition="condition=Ready", resource=storaged_pod)
exists(storaged_svc)

mz.environmentd.sql("DROP SOURCE source1")
mz.wait_for_sql()

try:
mz.environmentd.sql("DROP SOURCE source1")
except InterfaceError as e:
print(f"Expected SQL error: {e}")

if failpoint:
# Disable failpoint here, this should end the crash loop of environmentd
mz.set_environmentd_failpoints("")

wait(condition="delete", resource=storaged_pod)
not_exists(storaged_svc)
Expand Down

0 comments on commit 61e3f42

Please sign in to comment.