Skip to content

Commit

Permalink
Add circuit_breaking test (#70)
Browse files Browse the repository at this point in the history
This is a port of the circuit_breaking test defined
[here](https://github.com/grpc/grpc/blob/master/tools/run_tests/run_xds_tests.py#L2176).
I set all clients to use the Java server because as far as I can tell,
none of the others support the `keep-open` option of the `rpc-behavior`
header for both `UnaryCall` and `EmptyCall`.
  • Loading branch information
murgatroid99 committed May 20, 2024
1 parent 70348ea commit 32d1e3d
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 5 deletions.
1 change: 1 addition & 0 deletions .kokoro/psm_interop_kokoro_lib.sh
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ psm::lb::get_tests() {
"outlier_detection_test"
"remove_neg_test"
"round_robin_test"
"circuit_breaking_test"
)
# master-only tests
if [[ "${TESTING_VERSION}" =~ "master" ]]; then
Expand Down
8 changes: 7 additions & 1 deletion framework/infrastructure/gcp/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ def backend_service_patch_backends(
backend_service,
backends,
max_rate_per_endpoint: Optional[int] = None,
*,
circuit_breakers: Optional[dict[str, int]] = None,
):
if max_rate_per_endpoint is None:
max_rate_per_endpoint = 5
Expand All @@ -201,9 +203,13 @@ def backend_service_patch_backends(
for backend in backends
]

request = {"backends": backend_list}
if circuit_breakers:
request["circuitBreakers"] = circuit_breakers

self._patch_resource(
collection=self.api.backendServices(),
body={"backends": backend_list},
body=request,
backendService=backend_service.name,
)

Expand Down
18 changes: 14 additions & 4 deletions framework/infrastructure/traffic_director.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,15 +277,21 @@ def backend_service_remove_neg_backends(self, name, zones):
self.backend_service_patch_backends()

def backend_service_patch_backends(
self, max_rate_per_endpoint: Optional[int] = None
self,
max_rate_per_endpoint: Optional[int] = None,
*,
circuit_breakers: Optional[dict[str, int]] = None,
):
logging.info(
"Adding backends to Backend Service %s: %r",
self.backend_service.name,
self.backends,
)
self.compute.backend_service_patch_backends(
self.backend_service, self.backends, max_rate_per_endpoint
self.backend_service,
self.backends,
max_rate_per_endpoint,
circuit_breakers=circuit_breakers,
)

def backend_service_remove_all_backends(self):
Expand Down Expand Up @@ -348,14 +354,18 @@ def alternative_backend_service_add_neg_backends(self, name, zones):
self.alternative_backends.add(backend)
self.alternative_backend_service_patch_backends()

def alternative_backend_service_patch_backends(self):
def alternative_backend_service_patch_backends(
self, *, circuit_breakers: Optional[dict[str, int]] = None
):
logging.info(
"Adding backends to Backend Service %s: %r",
self.alternative_backend_service.name,
self.alternative_backends,
)
self.compute.backend_service_patch_backends(
self.alternative_backend_service, self.alternative_backends
self.alternative_backend_service,
self.alternative_backends,
circuit_breakers=circuit_breakers,
)

def alternative_backend_service_remove_all_backends(self):
Expand Down
73 changes: 73 additions & 0 deletions framework/xds_k8s_testcase.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,79 @@ def assertAllBackendsReceivedRpcs(self, lb_stats):
msg=f"Backend {backend} did not receive a single RPC",
)

def assertClientEventuallyReachesSteadyState(
self,
test_client: XdsTestClient,
*,
rpc_type: str,
num_rpcs: int,
threshold_percent: int = 1,
retry_timeout: dt.timedelta = dt.timedelta(minutes=12),
retry_wait: dt.timedelta = dt.timedelta(seconds=10),
steady_state_delay: dt.timedelta = dt.timedelta(seconds=5),
):
retryer = retryers.constant_retryer(
wait_fixed=retry_wait,
timeout=retry_timeout,
error_note=(
f"Timeout waiting for test client {test_client.hostname} to"
f"report {num_rpcs} pending calls ±{threshold_percent}%"
),
)
for attempt in retryer:
with attempt:
self._checkRpcsInFlight(
test_client, rpc_type, num_rpcs, threshold_percent
)
logging.info(
"Will check again in %d seconds to verify that RPC count is steady",
steady_state_delay.total_seconds(),
)
time.sleep(steady_state_delay.total_seconds())
self._checkRpcsInFlight(
test_client, rpc_type, num_rpcs, threshold_percent
)

def _checkRpcsInFlight(
self,
test_client: XdsTestClient,
rpc_type: str,
num_rpcs: int,
threshold_percent: int,
):
if not 0 <= threshold_percent <= 100:
raise ValueError(
"Value error: Threshold should be between 0 to 100"
)
threshold_fraction = threshold_percent / 100.0
stats = test_client.get_load_balancer_accumulated_stats()
logging.info(
"[%s] << Received LoadBalancerAccumulatedStatsResponse:\n%s",
test_client.hostname,
self._pretty_accumulated_stats(stats),
)
rpcs_started = stats.num_rpcs_started_by_method[rpc_type]
rpcs_succeeded = stats.num_rpcs_succeeded_by_method[rpc_type]
rpcs_failed = stats.num_rpcs_failed_by_method[rpc_type]
rpcs_in_flight = rpcs_started - rpcs_succeeded - rpcs_failed
logging.info(
"[%s] << %s RPCs in flight: %d, expected %d ±%d%%",
test_client.hostname,
rpc_type,
rpcs_in_flight,
num_rpcs,
threshold_percent,
)
self.assertBetween(
rpcs_in_flight,
minv=int(num_rpcs * (1 - threshold_fraction)),
maxv=int(num_rpcs * (1 + threshold_fraction)),
msg=(
f"Found wrong number of RPCs in flight: actual({rpcs_in_flight}"
f"), expected({num_rpcs} ± {threshold_percent}%)"
),
)


class IsolatedXdsKubernetesTestCase(
XdsKubernetesBaseTestCase, metaclass=abc.ABCMeta
Expand Down
222 changes: 222 additions & 0 deletions tests/circuit_breaking_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from absl import flags
from absl.testing import absltest

from framework import xds_k8s_flags
from framework import xds_k8s_testcase
from framework.helpers import skips
from framework.infrastructure import k8s
from framework.rpc import grpc_testing
from framework.test_app.runners.k8s import k8s_xds_server_runner

logger = logging.getLogger(__name__)
flags.adopt_module_key_flags(xds_k8s_testcase)

# Type aliases
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
_KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
_Lang = skips.Lang

_QPS = 100
_INITIAL_UNARY_MAX_REQUESTS = 500
_INITIAL_EMPTY_MAX_REQUESTS = 1000
_UPDATED_UNARY_MAX_REQUESTS = 800


class CircuitBreakingTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
@classmethod
def setUpClass(cls):
"""Force the java test server for languages not yet supporting
the `keep-open` option of the `rpc-behavior` feature.
https://github.com/grpc/grpc/blob/master/doc/xds-test-descriptions.md#server
"""
super().setUpClass()
if cls.lang_spec.client_lang is not _Lang.JAVA:
# gRPC C++, go, python and node fallback to the gRPC Java.
# TODO(https://github.com/grpc/grpc-go/issues/6288): use go server.
# TODO(https://github.com/grpc/grpc/issues/33134): use python server.
cls.server_image = xds_k8s_flags.SERVER_IMAGE_CANONICAL.value

def setUp(self):
super().setUp()
self.alternate_k8s_namespace = k8s.KubernetesNamespace(
self.k8s_api_manager, self.server_namespace
)
self.alternate_server_runner = _KubernetesServerRunner(
self.alternate_k8s_namespace,
deployment_name=self.server_name + "-alt",
image_name=self.server_image,
gcp_service_account=self.gcp_service_account,
td_bootstrap_image=self.td_bootstrap_image,
gcp_project=self.project,
gcp_api_manager=self.gcp_api_manager,
xds_server_uri=self.xds_server_uri,
network=self.network,
debug_use_port_forwarding=self.debug_use_port_forwarding,
reuse_namespace=True,
)

def cleanup(self):
super().cleanup()
if hasattr(self, "alternate_server_runner"):
self.alternate_server_runner.cleanup(
force=self.force_cleanup, force_namespace=self.force_cleanup
)

def test_circuit_breaking(self) -> None:
with self.subTest("00_create_health_check"):
self.td.create_health_check()

with self.subTest("01_create_backend_services"):
self.td.create_backend_service()
self.td.create_alternative_backend_service()

with self.subTest("02_create_url_map"):
src_address = f"{self.server_xds_host}:{self.server_xds_port}"
matcher_name = self.td.make_resource_name(
self.td.URL_MAP_PATH_MATCHER_NAME
)
route_rules = [
{
"priority": 0,
# UnaryCall -> backend_service
"matchRules": [
{"fullPathMatch": "/grpc.testing.TestService/UnaryCall"}
],
"service": self.td.backend_service.url,
},
{
"priority": 1,
# EmptyCall -> alternative_backend_service
"matchRules": [
{"fullPathMatch": "/grpc.testing.TestService/EmptyCall"}
],
"service": self.td.alternative_backend_service.url,
},
]

self.td.create_url_map_with_content(
{
"name": self.td.make_resource_name(self.td.URL_MAP_NAME),
"defaultService": self.td.backend_service.url,
"hostRules": [
{"hosts": [src_address], "pathMatcher": matcher_name}
],
"pathMatchers": [
{
"name": matcher_name,
"defaultService": self.td.backend_service.url,
"routeRules": route_rules,
}
],
}
)

with self.subTest("03_create_target_proxy"):
self.td.create_target_proxy()

with self.subTest("04_create_forwarding_rule"):
self.td.create_forwarding_rule(self.server_xds_port)

with self.subTest("05_start_test_servers"):
default_test_server: _XdsTestServer = self.startTestServers()[0]
alternate_test_server: _XdsTestServer = self.startTestServers(
server_runner=self.alternate_server_runner
)[0]

with self.subTest("06_add_server_backends_to_backend_services"):
self.setupServerBackends()
# Add backend to alternative backend service
(
neg_name_alt,
neg_zones_alt,
) = self.alternate_k8s_namespace.parse_service_neg_status(
self.alternate_server_runner.service_name, self.server_port
)
self.td.alternative_backend_service_add_neg_backends(
neg_name_alt, neg_zones_alt
)

with self.subTest("07_patch_backends_with_circuit_breakers"):
self.td.backend_service_patch_backends(
circuit_breakers={"maxRequests": _INITIAL_UNARY_MAX_REQUESTS}
)
self.td.alternative_backend_service_patch_backends(
circuit_breakers={"maxRequests": _INITIAL_EMPTY_MAX_REQUESTS}
)

test_client: _XdsTestClient
with self.subTest("08_start_test_client"):
test_client = self.startTestClient(
default_test_server, rpc="UnaryCall,EmptyCall", qps=_QPS
)

with self.subTest("09_test_client_xds_config_exists"):
self.assertXdsConfigExists(test_client)

with self.subTest("10_test_server_received_rpcs_from_test_client"):
self.assertRpcsEventuallyGoToGivenServers(
test_client, (default_test_server, alternate_test_server)
)

with self.subTest("11_configure_client_with_keep_open"):
test_client.update_config.configure(
rpc_types=grpc_testing.RPC_TYPES_BOTH_CALLS,
metadata={
(
grpc_testing.RPC_TYPE_UNARY_CALL,
"rpc-behavior",
"keep-open",
),
(
grpc_testing.RPC_TYPE_EMPTY_CALL,
"rpc-behavior",
"keep-open",
),
},
timeout_sec=20,
)

with self.subTest("12_client_reaches_target_steady_state"):
self.assertClientEventuallyReachesSteadyState(
test_client,
rpc_type=grpc_testing.RPC_TYPE_UNARY_CALL,
num_rpcs=_INITIAL_UNARY_MAX_REQUESTS,
)
self.assertClientEventuallyReachesSteadyState(
test_client,
rpc_type=grpc_testing.RPC_TYPE_EMPTY_CALL,
num_rpcs=_INITIAL_EMPTY_MAX_REQUESTS,
)

with self.subTest("13_increase_backend_max_requests"):
self.td.backend_service_patch_backends(
circuit_breakers={"maxRequests": _UPDATED_UNARY_MAX_REQUESTS}
)

with self.subTest("14_client_reaches_increased_steady_state"):
self.assertClientEventuallyReachesSteadyState(
test_client,
rpc_type=grpc_testing.RPC_TYPE_UNARY_CALL,
num_rpcs=_UPDATED_UNARY_MAX_REQUESTS,
)


if __name__ == "__main__":
absltest.main()

0 comments on commit 32d1e3d

Please sign in to comment.