Skip to content

Commit

Permalink
Merge pull request #4686 from StackStorm/stream_workaround
Browse files Browse the repository at this point in the history
Replace sseclient with sseclient-py (fix hanging of "st2 execution tail" command)
  • Loading branch information
Kami committed May 22, 2019
2 parents 1e72ed0 + 1408654 commit e190294
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 15 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -35,6 +35,9 @@ Fixed
``service`` or ``action`` parameter. (bug fix) #4675

Reported by James Robinson (Netskope and Veracode).
* Replace ``sseclient`` library on which st2client and CLI depends on with ``sseclient-py``.
``sseclient`` has various issue which cause client to sometimes hang and keep the connection open
which also causes ``st2 execution tail`` command to hang for a long time. (improvement) #4686

3.0.0 - April 18, 2019
----------------------
Expand Down
2 changes: 2 additions & 0 deletions Makefile
Expand Up @@ -281,6 +281,8 @@ flake8: requirements .flake8
chmod +x $(VIRTUALENV_ST2CLIENT_DIR)/bin/activate

$(VIRTUALENV_ST2CLIENT_DIR)/bin/pip install --upgrade "pip>=9.0,<9.1"
# NOTE We need to upgrade setuptools to avoid bug with dependency resolving in old versions
$(VIRTUALENV_ST2CLIENT_DIR)/bin/pip install --upgrade "setuptools==41.0.1"
$(VIRTUALENV_ST2CLIENT_DIR)/bin/activate; cd st2client ; ../$(VIRTUALENV_ST2CLIENT_DIR)/bin/python setup.py install ; cd ..
$(VIRTUALENV_ST2CLIENT_DIR)/bin/st2 --version
$(VIRTUALENV_ST2CLIENT_DIR)/bin/python -c "import st2client"
Expand Down
3 changes: 2 additions & 1 deletion fixed-requirements.txt
Expand Up @@ -35,7 +35,8 @@ cryptography==2.6.1
retrying==1.3.3
# Note: We use latest version of virtualenv which uses pip 9.0
virtualenv==15.1.0
sseclient==0.0.19
# NOTE: sseclient has various issues which sometimes hang the connection for a long time, etc.
sseclient-py==1.7
python-editor==1.0.4
prompt-toolkit==1.0.15
tooz==1.64.2
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Expand Up @@ -50,7 +50,7 @@ retrying==1.3.3
routes==2.4.1
semver==2.8.1
six==1.12.0
sseclient==0.0.19
sseclient-py==1.7
stevedore==1.30.1
tooz==1.64.2
ujson==1.35
Expand Down
2 changes: 1 addition & 1 deletion st2client/in-requirements.txt
Expand Up @@ -8,7 +8,7 @@ jsonschema
jsonpath-rw
requests
six
sseclient
sseclient-py
python-editor
prompt-toolkit
cryptography
2 changes: 1 addition & 1 deletion st2client/requirements.txt
Expand Up @@ -11,4 +11,4 @@ pytz==2019.1
pyyaml==5.1
requests[security]<2.15,>=2.14.1
six==1.12.0
sseclient==0.0.19
sseclient-py==1.7
6 changes: 5 additions & 1 deletion st2client/st2client/models/core.py
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from __future__ import absolute_import

import os
import json
import logging
Expand All @@ -21,6 +22,7 @@
import six
from six.moves import urllib
from six.moves import http_client
import requests

from st2client.utils import httpclient

Expand Down Expand Up @@ -631,8 +633,10 @@ def listen(self, events=None, **kwargs):
query_string = '?' + urllib.parse.urlencode(query_params)
url = url + query_string

for message in SSEClient(url, **request_params):
response = requests.get(url, stream=True, **request_params)
client = SSEClient(response)

for message in client.events():
# If the execution on the API server takes too long, the message
# can be empty. In this case, rerun the query.
if not message.data:
Expand Down
29 changes: 19 additions & 10 deletions st2client/tests/unit/test_models.py
Expand Up @@ -256,32 +256,41 @@ def test_resource_delete_failed(self):
instance = mgr.get_by_name('abc')
self.assertRaises(Exception, mgr.delete, instance)

@mock.patch('requests.get')
@mock.patch('sseclient.SSEClient')
def test_stream_resource_listen(self, mock):
def test_stream_resource_listen(self, mock_sseclient, mock_requests):
mock_msg = mock.Mock()
mock_msg.data = json.dumps(base.RESOURCES)

# checking the case to specify valid 'cacert' parameter to the StreamManager
def side_effect_checking_verify_parameter_is(endpoint_url, **kwargs):
self.assertEqual(endpoint_url, 'https://example.com/stream?events=foo%2Cbar')
self.assertEqual(kwargs['verify'], '/path/ca.crt')
def side_effect_checking_verify_parameter_is():
return [mock_msg]

mock.side_effect = side_effect_checking_verify_parameter_is
mock_sseclient.return_value.events.side_effect = side_effect_checking_verify_parameter_is
mgr = models.StreamManager('https://example.com', cacert='/path/ca.crt')

resp = mgr.listen(events=['foo', 'bar'])
self.assertEqual(list(resp), [base.RESOURCES])

call_args = tuple(['https://example.com/stream?events=foo%2Cbar'])
call_kwargs = {'stream': True, 'verify': '/path/ca.crt'}

self.assertEqual(mock_requests.call_args_list[0][0], call_args)
self.assertEqual(mock_requests.call_args_list[0][1], call_kwargs)

# checking the case not to specify valid 'cacert' parameter to the StreamManager
def side_effect_checking_verify_parameter_is_not(endpoint_url, **kwargs):
# checking endpoint_url in case of no event specification
self.assertEqual(endpoint_url, 'https://example.com/stream?')
self.assertFalse('verify' in kwargs)
def side_effect_checking_verify_parameter_is_not():
return [mock_msg]

mock.side_effect = side_effect_checking_verify_parameter_is_not
mock_sseclient.return_value.events.side_effect = \
side_effect_checking_verify_parameter_is_not
mgr = models.StreamManager('https://example.com')

resp = mgr.listen()
self.assertEqual(list(resp), [base.RESOURCES])

call_args = tuple(['https://example.com/stream?'])
call_kwargs = {'stream': True}

self.assertEqual(mock_requests.call_args_list[1][0], call_args)
self.assertEqual(mock_requests.call_args_list[1][1], call_kwargs)

0 comments on commit e190294

Please sign in to comment.