/
test_www_ws.py
58 lines (49 loc) · 2.49 KB
/
test_www_ws.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# This file is part of Buildbot. Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members
from buildbot.test.util import www
from buildbot.util import json
from buildbot.www import ws
from mock import Mock
from twisted.trial import unittest
class EventResource(www.WwwTestMixin, unittest.TestCase):
def setUp(self):
self.master = master = self.make_master(url='h:/a/b/')
self.ws = ws.WsResource(master)
self.proto = self.ws._factory.buildProtocol("me")
self.gotMsg = []
self.proto.sendMessage = Mock(spec=self.proto.sendMessage)
def test_ping(self):
self.proto.onMessage(json.dumps(dict(cmd="ping", _id=1)), False)
self.proto.sendMessage.assert_called_with('{"msg": "pong", "code": 200, "_id": 1}')
def test_bad_cmd(self):
self.proto.onMessage(json.dumps(dict(cmd="poing", _id=1)), False)
self.proto.sendMessage.assert_called_with(
'{"_id": 1, "code": 404, "error": "no such command \'poing\'"}')
def test_no_cmd(self):
self.proto.onMessage(json.dumps(dict(_id=1)), False)
self.proto.sendMessage.assert_called_with(
'{"_id": null, "code": 400, "error": "no \'cmd\' in websocket frame"}')
def test_no_id(self):
self.proto.onMessage(json.dumps(dict(cmd="ping")), False)
self.proto.sendMessage.assert_called_with(
'{"_id": null, "code": 400, "error": "no \'_id\' in websocket frame"}')
def test_startConsuming(self):
self.proto.onMessage(json.dumps(dict(cmd="startConsuming", path="builds/*/*", _id=1)), False)
self.proto.sendMessage.assert_called_with(
'{"msg": "OK", "code": 200, "_id": 1}')
self.master.mq.verifyMessages = False
self.master.mq.callConsumer(("builds", "1", "new"), {"buildid": 1})
self.proto.sendMessage.assert_called_with(
'{"message": {"buildid": 1}, "key": "builds/1/new"}')