From 830e7584bf49d3f5fdb43dc1ab84d3c1ae9b3622 Mon Sep 17 00:00:00 2001 From: Per Johansson Date: Thu, 14 Dec 2023 14:03:25 +0100 Subject: [PATCH] Treat read timeouts similar to the server hanging up the connection. That is, stop yielding more data and return normally. A warning is also logged, however. A read timeout might happen in empty clusters if there aren't any new events appearing for the specified time. In most cases, the default apiserver timeout will make it hang up the connection before that happens, which is why this problem hasn't been noticed before. --- k8s/base.py | 45 +++++++++++++++++++++++++++++++----------- tests/k8s/test_base.py | 44 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 76 insertions(+), 13 deletions(-) diff --git a/k8s/base.py b/k8s/base.py index 3df6e5b..cd7c7f8 100644 --- a/k8s/base.py +++ b/k8s/base.py @@ -19,6 +19,9 @@ import logging from collections import namedtuple +import requests +import requests.packages.urllib3 as urllib3 + from . import config from .client import Client, NotFound from .fields import Field @@ -128,20 +131,38 @@ def watch_list(cls, namespace=None): if not url: raise NotImplementedError("Cannot watch_list, no watch_list_url defined on class {}".format(cls)) - resp = cls._client.get(url, stream=True, timeout=config.stream_timeout) - for line in resp.iter_lines(chunk_size=None): - if line: - try: - event_json = json.loads(line) + try: + # The timeout here appears to be per call to the poll (or similar) system call, so each time data is received, the timeout will reset. + resp = cls._client.get(url, stream=True, timeout=config.stream_timeout) + for line in resp.iter_lines(chunk_size=None): + if line: try: - event = WatchEvent(event_json, cls) - yield event - except TypeError: + event_json = json.loads(line) + try: + event = WatchEvent(event_json, cls) + yield event + except TypeError: + LOG.exception( + "Unable to create instance of %s from watch event json, discarding event. 
event_json=%r", + cls.__name__, + event_json, + ) + except ValueError: LOG.exception( - "Unable to create instance of %s from watch event json, discarding event. event_json=%r", - cls.__name__, event_json) - except ValueError: - LOG.exception("Unable to parse JSON on watch event, discarding event. Line: %r", line) + "Unable to parse JSON on watch event, discarding event. Line: %r", + line, + ) + except requests.ConnectionError as e: + # ConnectionError is fairly generic, but check for ReadTimeoutError from urllib3. + # If we get this, there were no events received for the timeout period, which might not be an error, just a quiet period. + underlying = e.args[0] + if isinstance(underlying, urllib3.exceptions.ReadTimeoutError): + LOG.warning( + "Read timeout while streaming from API server. Error: %s", + e, + ) + return + raise @classmethod def get(cls, name, namespace="default"): diff --git a/tests/k8s/test_base.py b/tests/k8s/test_base.py index ac7d8f4..404e001 100644 --- a/tests/k8s/test_base.py +++ b/tests/k8s/test_base.py @@ -17,6 +17,8 @@ import mock import pytest +import requests +import requests.packages.urllib3 as urllib3 from k8s.base import Model, Field, WatchEvent, Equality, Inequality, In, NotIn, Exists from k8s.models.common import DeleteOptions, Preconditions @@ -113,4 +115,44 @@ def test_delete_with_options(self, client): }, "propagationPolicy": "Foreground" } - client.delete.assert_called_once_with("/example", params={"labelSelector": "foo=bar"}, body=expected_body) \ No newline at end of file + client.delete.assert_called_once_with("/example", params={"labelSelector": "foo=bar"}, body=expected_body) + + +class TestWatchList(object): + @pytest.fixture + def client(self): + with mock.patch.object(Example, "_client") as m: + yield m + + def test_watch_list(self, client): + client.get.return_value.iter_lines.return_value = [ + '{"type": "ADDED", "object": {"value": 1}}', + ] + gen = Example.watch_list() + assert next(gen) == WatchEvent( + {"type": 
"ADDED", "object": {"value": 1}}, Example + ) + client.get.assert_called_once_with( + "/watch/example", stream=True, timeout=3600, params={} + ) + assert list(gen) == [] + + def test_watch_list_with_timeout(self, client): + client.get.return_value.iter_lines.return_value.__getitem__.side_effect = [ + '{"type": "ADDED", "object": {"value": 1}}', + requests.ConnectionError(urllib3.exceptions.ReadTimeoutError("", "", "")), + '{"type": "MODIFIED", "object": {"value": 2}}', # Not reached + ] + # Seal to avoid __iter__ being used instead of __getitem__ + mock.seal(client) + gen = Example.watch_list() + assert next(gen) == WatchEvent( + {"type": "ADDED", "object": {"value": 1}}, Example + ) + assert list(gen) == [] + assert ( + client.get.return_value.iter_lines.return_value.__getitem__.call_count == 2 + ) + client.get.assert_called_once_with( + "/watch/example", stream=True, timeout=3600, params={} + )