Skip to content

Commit

Permalink
implement mqService via websocket in frontend
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre Tardy <pierre.tardy@intel.com>
  • Loading branch information
Pierre Tardy committed Mar 20, 2015
1 parent 7c7ffbf commit bb90e0f
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 79 deletions.
4 changes: 2 additions & 2 deletions master/buildbot/test/unit/test_www_ws.py
Expand Up @@ -49,10 +49,10 @@ def test_no_id(self):
'{"_id": null, "code": 400, "error": "no \'_id\' in websocket frame"}')

def test_startConsuming(self):
self.proto.onMessage(json.dumps(dict(cmd="startConsuming", path="builds.*.*", _id=1)), False)
self.proto.onMessage(json.dumps(dict(cmd="startConsuming", path="builds/*/*", _id=1)), False)
self.proto.sendMessage.assert_called_with(
'{"msg": "OK", "code": 200, "_id": 1}')
self.master.mq.verifyMessages = False
self.master.mq.callConsumer(("builds", "1", "new"), {"buildid": 1})
self.proto.sendMessage.assert_called_with(
'{"path": "builds.*.*", "message": {"buildid": 1}, "_id": 1, "key": ["builds", "1", "new"]}')
'{"message": {"buildid": 1}, "key": "builds/1/new"}')
4 changes: 2 additions & 2 deletions master/buildbot/www/ws.py
Expand Up @@ -63,7 +63,7 @@ def ack(self, _id):
return self.sendJsonMessage(msg="OK", code=200, _id=_id)

def parsePath(self, path):
path = path.split(".")
path = path.split("/")
return tuple([str(p) if p != "*" else None for p in path])

@defer.inlineCallbacks
Expand All @@ -74,7 +74,7 @@ def cmd_startConsuming(self, path, _id):
return

def callback(key, message):
self.sendJsonMessage(path=path, key=key, message=message, _id=_id)
self.sendJsonMessage(key="/".join(key), message=message)

qref = yield self.master.mq.startConsuming(callback, self.parsePath(path))

Expand Down
13 changes: 10 additions & 3 deletions www/base/src/app/common/common.constant.coffee
Expand Up @@ -6,9 +6,16 @@ invert_constant = (constant_name, inverted_constant_name) ->
class Baseurlapi extends Constant('common')
constructor: ->
return 'api/v2/'
class Baseurlsse extends Constant('common')

class Baseurlws extends Constant('common')
constructor: ->
return 'sse/'
href = location.href.toString()
if location.hash != ""
href = href.replace(location.hash, "")
if href[href.length - 1] != "/"
href = href + "/"

return href.replace(/^http/, "ws") + "ws"

class Plurals extends Constant('common')
constructor: ->
Expand Down Expand Up @@ -41,4 +48,4 @@ class Results extends Constant('common')
RETRY: 5
CANCELLED: 6
}
invert_constant('RESULTS', 'RESULTS_TEXT')
invert_constant('RESULTS', 'RESULTS_TEXT')
@@ -1,5 +1,5 @@
class BuildbotService extends Factory
constructor: ($log, Restangular, mqService, $rootScope, BASEURLAPI, BASEURLSSE,
constructor: ($log, Restangular, mqService, $rootScope, BASEURLAPI, BASEURLWS,
SINGULARS, $q, $timeout, config) ->
jsonrpc2_id = 1
referenceid = 1
Expand Down Expand Up @@ -185,7 +185,7 @@ class BuildbotService extends Factory
RestangularConfigurer.setBaseUrl(BASEURLAPI)
RestangularConfigurer.setOnElemRestangularized(onElemRestangularized)
RestangularConfigurer.setResponseExtractor(responseExtractor)
mqService.setBaseUrl(BASEURLSSE)
mqService.setBaseUrl(BASEURLWS)

self = Restangular.withConfig(configurer)
self.bindHierarchy = ($scope, $stateParams, paths) ->
Expand All @@ -197,3 +197,7 @@ class BuildbotService extends Factory
return $q.all(l)
addSomeAndMemoize(self)
return self

class BuildbotServiceConfig extends Config
constructor: (RestangularProvider) ->
RestangularProvider.setDefaultHeaders({'content-type': "application/json"})
71 changes: 44 additions & 27 deletions www/base/src/app/common/services/mq/mq.service.coffee
Expand Up @@ -7,9 +7,9 @@ class MqService extends Factory('common')
return matcher.test(value)

listeners = {}
eventsource = null
cid = null
basepath = null
ws = null
curid = 1
pending_msgs = {}
deferred = null
lostConnection = false
self =
Expand All @@ -30,10 +30,12 @@ class MqService extends Factory('common')
unsub = ->
namedListeners.splice(namedListeners.indexOf(listener), 1)
if namedListeners.length == 0
self.stopConsuming(name)
p = self.stopConsuming(name)
delete listeners[name]
else
p = $q.when(0)
return p
$scope?.$on("$destroy", unsub)

return p.then -> unsub

broadcast: (eventname, message) ->
Expand All @@ -51,31 +53,31 @@ class MqService extends Factory('common')
throw Error("broadcasting #{eventname} without listeners!")

# this is intended to be mocked in unittests
getEventSource: (url) ->
return new EventSource(url + "listen")
getWebSocket: (url) ->
return new WebSocket(url)

setBaseUrl: (url) ->
cid = null
pending_msgs = {}
makedeferred = ->
deferred = $q.defer()
deferred.promise.then(makedeferred)
makedeferred()
basepath = url
eventsource = self.getEventSource(url)
eventsource.onopen = (e) ->
cid = null
ws = self.getWebSocket(url)

eventsource.onerror = (e) ->
ws.onerror = (e) ->
console.error(e)
lostConnection = true
eventsource.onmessage = (e) ->
console.log "got message!", e
if navigator.onLine
self.setBaseUrl(url)
else
window.addEventListener "online", ->
self.setBaseUrl(url)

eventsource.addEventListener "handshake", (e) ->
cid = e.data
ws.onopen = (e) ->
# now we got our handshake, we can start consuming
# what was registered in between
# this is still racy, as we can have miss some events during this handshake time
pending_msgs = {}
allp = []
for k, v of listeners
allp.push(self.startConsuming(k))
Expand All @@ -85,18 +87,33 @@ class MqService extends Factory('common')
if lostConnection
$rootScope.$broadcast("lost-sync")

eventsource.addEventListener "event", (e) ->
e.msg = JSON.parse(e.data)
$rootScope.$apply ->
self.broadcast(e.msg.key, e.msg.message)
ws.onmessage = (e) ->
msg = JSON.parse(e.data)
if msg._id? and pending_msgs[msg._id]?
if msg.code != 200
pending_msgs[msg._id].reject(msg)
else
pending_msgs[msg._id].resolve(msg)
delete pending_msgs[msg._id]
else
$rootScope.$apply ->
self.broadcast(msg.key, msg.message)
sendMessage: (args) ->
curid += 1
args._id = curid
d = $q.defer()
d.promise._id = curid
pending_msgs[curid] = d
ws.send(JSON.stringify(args))
return d.promise

startConsuming: (name) ->
if cid?
return $http.get(basepath + "add/#{cid}/#{name}")
startConsuming: (path) ->
if ws.readyState == 1
return self.sendMessage(cmd:"startConsuming", path:path)
else
return deferred.promise
stopConsuming: (name) ->
if cid?
$http.get(basepath + "remove/#{cid}/#{name}")
stopConsuming: (path) ->
if ws.readyState == 1
return self.sendMessage(cmd:"stopConsuming", path:path)

return self
98 changes: 55 additions & 43 deletions www/base/src/app/common/services/mq/mq.service.spec.coffee
@@ -1,23 +1,22 @@
beforeEach module 'app'

describe 'mq service', ->
mqService = $scope = $httpBackend = null
es =
addEventListener: (event, cb) ->
this["on#{event}"] = cb
mqService = $scope = $httpBackend = $rootScope = null
ws =
send: ->

# common event receiver
event_receiver =
receiver1: ->
receiver2: ->

injected = ($injector) ->
$httpBackend = $injector.get('$httpBackend')
decorateHttpBackend($httpBackend)
$scope = $injector.get('$rootScope').$new()
$rootScope = $injector.get('$rootScope')
$scope = $rootScope.$new()
mqService = $injector.get('mqService')
# stub out the actual backend of mqservice
spyOn(mqService,"getEventSource").and.returnValue(es)
spyOn(mqService,"getWebSocket").and.returnValue(ws)
spyOn(ws,"send").and.returnValue(null)
spyOn(event_receiver,"receiver1").and.returnValue(null)
spyOn(event_receiver,"receiver2").and.returnValue(null)

Expand All @@ -30,14 +29,12 @@ describe 'mq service', ->
expect(mqService._match("a/b/*/*", "a/b/c/d/e")).toBe(false)

it 'should setup everything in setBaseURL', ->
expect(es.onopen).toBeUndefined()
mqService.setBaseUrl("sse/")
expect(mqService.getEventSource).toHaveBeenCalled()
expect(es.onopen).toBeDefined()
expect(es.onerror).toBeDefined()
expect(es.onmessage).toBeDefined()
expect(es.onhandshake).toBeDefined()
expect(es.onevent).toBeDefined()
expect(ws.onopen).toBeUndefined()
mqService.setBaseUrl("ws")
expect(mqService.getWebSocket).toHaveBeenCalled()
expect(ws.onopen).toBeDefined()
expect(ws.onerror).toBeDefined()
expect(ws.onmessage).toBeDefined()

it 'should work with simple pub/sub usecase', ->
mqService.setBaseUrl("sse/")
Expand Down Expand Up @@ -70,31 +67,46 @@ describe 'mq service', ->
expect(event_receiver.receiver1).not.toHaveBeenCalled()

it 'should use the backend to register to messages', ->
$httpBackend.expectGET('sse/add/<cid>/1/bla').respond("")
$httpBackend.expectGET('sse/add/<cid>/*/bla').respond("")
mqService.setBaseUrl("sse/")
mqService.on("1/bla", event_receiver.receiver1)
mqService.on("*/bla", event_receiver.receiver2)
es.onopen()
$httpBackend.verifyNoOutstandingRequest()
es.onhandshake({data:"<cid>"})
$httpBackend.flush()
mqService.setBaseUrl("ws/")
called = []
unregs = []
p1 = mqService.on("1/bla", event_receiver.receiver1)
p2 = mqService.on("*/bla", event_receiver.receiver2)
p1.then (unreg) ->
called.push('p1')
unregs.push(unreg)
p2.then (unreg) ->
called.push('p2')
unregs.push(unreg)
ws.readyState = 1
ws.onopen()
expect(ws.send).toHaveBeenCalledWith('{"cmd":"startConsuming","path":"1/bla","_id":2}')
expect(ws.send).toHaveBeenCalledWith('{"cmd":"startConsuming","path":"*/bla","_id":3}')
# fake the response
ws.onmessage(data: '{"msg":"OK","code":200,"_id":3}')
ws.onmessage(data: '{"msg":"OK","code":200,"_id":2}')
$rootScope.$apply()
expect(called).toEqual(["p1", "p2"])

it 'should unregister on scope close', ->
$httpBackend.expectGET('sse/add/<cid>/1/bla').respond("")
$httpBackend.expectGET('sse/add/<cid>/*/bla').respond("")
mqService.setBaseUrl("sse/")
mqService.on("1/bla", event_receiver.receiver1, $scope)
mqService.on("*/bla", event_receiver.receiver2, $scope)
es.onopen()
$httpBackend.verifyNoOutstandingRequest()
es.onhandshake({data:"<cid>"})
$httpBackend.flush()
$httpBackend.expectGET('sse/remove/<cid>/1/bla').respond("")
$httpBackend.expectGET('sse/remove/<cid>/*/bla').respond("")
$scope.$destroy()
$httpBackend.flush()
expect ->
mqService.broadcast("1/bla", {"msg":true})
.toThrow()
expect(event_receiver.receiver1).not.toHaveBeenCalled()
# fake the message
msg = '{"message": {"buildid": 1}, "key": "1/bla"}'
ws.onmessage(data: msg)
expect(event_receiver.receiver1).toHaveBeenCalledWith({"buildid": 1}, "1/bla")
expect(event_receiver.receiver2).toHaveBeenCalledWith({"buildid": 1}, "1/bla")

# unregister
called = []
p1 = unregs[0]()
p2 = unregs[1]()
$rootScope.$apply()
expect(ws.send).toHaveBeenCalledWith('{"cmd":"stopConsuming","path":"1/bla","_id":4}')
expect(ws.send).toHaveBeenCalledWith('{"cmd":"stopConsuming","path":"*/bla","_id":5}')
p1.then (unreg) ->
called.push('p1')
p2.then (unreg) ->
called.push('p2')
expect(called).toEqual([])
ws.onmessage(data: '{"msg":"OK","code":200,"_id":4}')
ws.onmessage(data: '{"msg":"OK","code":200,"_id":5}')
$rootScope.$apply()
expect(called).toEqual(["p1", "p2"])

0 comments on commit bb90e0f

Please sign in to comment.