Skip to content
This repository has been archived by the owner on Oct 3, 2020. It is now read-only.

Commit

Permalink
Remember the streaming response object in watch-queries (#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
nolar authored and hjacobs committed Jul 11, 2019
1 parent 497f24f commit c6610ad
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pykube/query.py
Expand Up @@ -157,6 +157,7 @@ class WatchQuery(BaseQuery):
def __init__(self, *args, **kwargs):
self.resource_version = kwargs.pop("resource_version", None)
super(WatchQuery, self).__init__(*args, **kwargs)
self._response = None

def object_stream(self):
params = {"watch": "true"}
Expand All @@ -172,6 +173,7 @@ def object_stream(self):
kwargs["version"] = self.api_obj_class.version
r = self.api.get(**kwargs)
self.api.raise_for_status(r)
self._response = r
WatchEvent = namedtuple("WatchEvent", "type object")
for line in r.iter_lines():
we = json.loads(line.decode("utf-8"))
Expand All @@ -180,6 +182,10 @@ def object_stream(self):
def __iter__(self):
return iter(self.object_stream())

@property
def response(self):
return self._response


def as_selector(value):
if isinstance(value, str):
Expand Down
40 changes: 40 additions & 0 deletions tests/test_watch.py
@@ -0,0 +1,40 @@
import json
from unittest.mock import MagicMock

import pytest

from pykube import Pod
from pykube.query import Query


@pytest.fixture
def api():
return MagicMock()


def test_watch_response_exists(api):
stream = Query(api, Pod).watch()
assert hasattr(stream, 'response')
assert stream.response is None # not yet executed


def test_watch_response_is_readonly(api):
stream = Query(api, Pod).watch()
with pytest.raises(AttributeError):
stream.response = object()


def test_watch_response_is_set_on_iter(api):
line1 = json.dumps({'type': 'ADDED', 'object': {}}).encode('utf-8')
expected_response = MagicMock()
expected_response.iter_lines.return_value = [line1]
api.get.return_value = expected_response

stream = Query(api, Pod).watch()
next(iter(stream))

assert stream.response is expected_response

assert api.get.call_count == 1
assert api.get.call_args_list[0][1]['stream'] is True
assert 'watch=true' in api.get.call_args_list[0][1]['url']

0 comments on commit c6610ad

Please sign in to comment.