Skip to content

Commit

Permalink
#82 fixed root cause of issue that journal comments were not displaye…
Browse files Browse the repository at this point in the history
…d in some cases

this was because in some cases it happened that the subscription for comments was done before the socket connection was authenticated.
  • Loading branch information
petersutter committed May 17, 2018
1 parent 77b8866 commit d3a6629
Showing 1 changed file with 172 additions and 135 deletions.
307 changes: 172 additions & 135 deletions frontend/src/utils/Emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,174 +21,211 @@ import Emitter from 'component-emitter'
import {ThrottledNamespacedEventEmitter} from './ThrottledEmitter'
import store from '../store'

const url = window.location.origin
const socketConfig = {
path: '/api/events',
transports: ['websocket'],
autoConnect: false
}
const shootsSocket = io(`${url}/shoots`, socketConfig)
const journalsSocket = io(`${url}/journals`, socketConfig)

/* Event Emitters */
const emitterObjForSocket = (socket) => {
return {
authenticated: false,
namespace: undefined,
filter: undefined,
auth: {
class AbstractEmitter {
constructor (socket) {
this.authenticated = false
this.auth = {
bearer: undefined
},
socket,
setUser (user) {
// could be overwritten
this._setUser(user)
},
_setUser (user) {
user = user || {}
const id_token = user.id_token
/* eslint camelcase: off */
if (!id_token) {
console.log(`Disconnect socket ${this.socket.id} because ID token is empty`)
this.auth.bearer = undefined
this.socket.disconnect()
} else if (!this.socket.connected) {
console.log(`Socket ${this.socket.id} not connected.`)
this.auth.bearer = id_token
this.socket.connect()
} else if (this.auth.bearer !== id_token) {
console.log(`Socket ${this.socket.id} connected but has different ID token`)
this.auth.bearer = id_token
const onDisconnect = (reason) => {
console.log('ON DISCONNECT')
if (reason === 'io client disconnect') {
clearTimeout(timeoutId)
this.socket.connect()
}
}
const onTimeout = () => {
this.socket.off('disconnect', onDisconnect)
}
this.socket = socket
}

authenticate () {
console.log(`socket connection ${this.socket.id} authenticating`)
if (this.auth.bearer) {
this.socket.emit('authentication', this.auth)
}
}

onAuthenticated () {
this.authenticated = true
this.emit('authenticated')
console.log(`socket connection ${this.socket.id} authenticated`)
}

onConnect (attempt) {
if (attempt) {
console.log(`socket connection ${this.socket.id} established after '${attempt}' attempt(s)`)
} else {
console.log(`socket connection ${this.socket.id} established`)
}
this.authenticate()
}

onDisconnect (reason) {
console.error(`socket connection lost because`, reason)
this.authenticated = false
this.socket.off('event')
this.emit('disconnect', reason)
}

setUser (user) {
user = user || {}
const id_token = user.id_token
/* eslint camelcase: off */
if (!id_token) {
console.log(`Disconnect socket ${this.socket.id} because ID token is empty`)
this.auth.bearer = undefined
this.socket.disconnect()
} else if (!this.socket.connected) {
this.auth.bearer = id_token
this.socket.connect()
} else if (this.auth.bearer !== id_token) {
console.log(`Socket ${this.socket.id} connected but has different ID token`)
this.auth.bearer = id_token
const onDisconnect = (reason) => {
console.log('onDisconnect', reason)
if (reason === 'io client disconnect') {
clearTimeout(timeoutId)
this.socket.connect()
}
const timeoutId = setTimeout(onTimeout, 1000)
this.socket.once('disconnect', onDisconnect)
this.socket.disconnect()
}
},
setNamespace (namespace, filter) {
this.namespace = namespace
this.filter = filter
store.dispatch('clearShoots')
if (this.namespace && this.authenticated) {
store.dispatch('setShootsLoading')
this.subscribe({namespace, filter})
}
},
authenticate () {
console.log(`socket connection ${this.socket.id} authenticating`)
if (this.auth.bearer) {
this.socket.emit('authentication', this.auth)
const onTimeout = () => {
this.socket.off('disconnect', onDisconnect)
}
},
subscribe () {
console.log('should be overwritten..', this.socket.id)
const timeoutId = setTimeout(onTimeout, 1000)
this.socket.once('disconnect', onDisconnect)
this.socket.disconnect()
}
}
}

const shootsEmitterObj = emitterObjForSocket(shootsSocket)
shootsEmitterObj.subscribe = async function ({namespace, filter}) {
if (namespace === '_all') {
const allNamespaces = await store.getters.namespaces
const namespaces = map(allNamespaces, (namespace) => { return {namespace, filter} })
this.socket.emit('subscribe', {namespaces})
} else if (namespace) {
this.socket.emit('subscribe', {namespaces: [{namespace, filter}]})
}
}
class ShootsEmitter extends AbstractEmitter {
constructor (socket) {
super(socket)

const journalsEmitterObj = emitterObjForSocket(journalsSocket)
journalsEmitterObj.setUser = function (user) {
if (!store.getters.isAdmin) {
return
this.filter = undefined
this.namespace = undefined
}
this._setUser(user)
}
journalsEmitterObj.subscribe = function () {
if (store.getters.isAdmin) {
this.socket.emit('subscribeIssues')

onAuthenticated () {
super.onAuthenticated()

/* currently we only throttle NamespacedEvents (for shoots) as for this kind
* we expect many events coming in in a short period of time */
const throttledNsEventEmitter = new ThrottledNamespacedEventEmitter({emitter: this, wait: 1000})

this.socket.on('namespacedEvents', ({kind, namespaces}) => {
throttledNsEventEmitter.emit(kind, namespaces)
})
this.socket.on('batchNamespacedEventsDone', ({kind, namespaces}) => {
if (kind === 'shoots') {
store.dispatch('unsetShootsLoading', namespaces)
throttledNsEventEmitter.flush()
}
})

if (this.subscribeAfterAuthentication) {
this.subscribe({namespace: this.namespace, filter: undefined})
this.subscribeAfterAuthentication = undefined
}
}
}
journalsEmitterObj.subscribeComments = function ({name, namespace}) {
if (store.getters.isAdmin) {
this.socket.emit('subscribeComments', {name, namespace})
this.subscribedComments = true

setNamespace = async function (namespace, filter) {
this.namespace = namespace
this.filter = filter

this.subscribe({namespace, filter})
}
}
journalsEmitterObj.unsubscribeComments = function () {
if (store.getters.isAdmin) {
if (this.subscribedComments) {
this.socket.emit('unsubscribeComments')
this.subscribedComments = false

subscribe = async function ({namespace, filter}) {
if (this.namespace) {
if (this.authenticated) {
store.dispatch('clearShoots')
store.dispatch('setShootsLoading')
if (namespace === '_all') {
const allNamespaces = await store.getters.namespaces
const namespaces = map(allNamespaces, (namespace) => { return {namespace, filter} })
this.socket.emit('subscribe', {namespaces})
} else if (namespace) {
this.socket.emit('subscribe', {namespaces: [{namespace, filter}]})
}
} else {
this.subscribeAfterAuthentication = {namespace, filter}
}
}
}
}

const shootsEmitter = Emitter(shootsEmitterObj)
const journalsEmitter = Emitter(journalsEmitterObj)
class JournalsEmitter extends AbstractEmitter {
onAuthenticated () {
super.onAuthenticated()

const emitters = [shootsEmitter, journalsEmitter]
this.socket.on('events', ({kind, events}) => {
this.emit(kind, events)
})

function onAuthenticated () {
this.authenticated = true
this.emit('authenticated')
console.log(`socket connection ${this.socket.id} authenticated`)
this.subscribeIssues()

this.socket.on('events', ({kind, events}) => {
this.emit(kind, events)
})
if (this.subscribeCommentsAfterAuthentication) {
this.subscribeComments(this.subscribeCommentsAfterAuthentication)

/* currently we only throttle NamespacedEvents (for shoots) as for this kind
* we expect many events coming in in a short period of time */
const throttledNsEventEmitter = new ThrottledNamespacedEventEmitter({emitter: this, wait: 1000})
this.subscribeCommentsAfterAuthentication = undefined
}
}

this.socket.on('namespacedEvents', ({kind, namespaces}) => {
throttledNsEventEmitter.emit(kind, namespaces)
})
this.socket.on('batchNamespacedEventsDone', ({kind, namespaces}) => {
if (kind === 'shoots') {
store.dispatch('unsetShootsLoading', namespaces)
throttledNsEventEmitter.flush()
setUser (user) {
if (!store.getters.isAdmin) {
return
}
})
super.setUser(user)
}

const namespace = this.namespace
const filter = undefined
this.subscribe({namespace, filter})
}
subscribeIssues () {
if (this.authenticated) {
if (store.getters.isAdmin) {
this.socket.emit('subscribeIssues')
}
}
}

function onConnect (attempt) {
if (attempt) {
console.log(`socket connection ${this.socket.id} established after '${attempt}' attempt(s)`)
} else {
console.log(`socket connection ${this.socket.id} established`)
subscribeComments ({name, namespace}) {
if (this.authenticated) {
if (store.getters.isAdmin) {
this.socket.emit('subscribeComments', {name, namespace})
this.subscribedComments = true
}
} else {
this.subscribeCommentsAfterAuthentication = {name, namespace}
}
}

unsubscribeComments () {
this.subscribeCommentsAfterAuthentication = undefined

if (this.authenticated) {
if (store.getters.isAdmin) {
if (this.subscribedComments) {
this.socket.emit('unsubscribeComments')
this.subscribedComments = false
}
}
}
}
this.authenticate()
}

function onDisconnect (reason) {
console.error(`socket connection lost because`, reason)
this.authenticated = false
this.socket.off('event')
this.emit('disconnect', reason)
const url = window.location.origin
const socketConfig = {
path: '/api/events',
transports: ['websocket'],
autoConnect: false
}
const shootsSocket = io(`${url}/shoots`, socketConfig)
const journalsSocket = io(`${url}/journals`, socketConfig)

const shootsEmitter = Emitter(new ShootsEmitter(shootsSocket))
const journalsEmitter = Emitter(new JournalsEmitter(journalsSocket))

const emitters = [shootsEmitter, journalsEmitter]

/* Web Socket Connection */

forEach(emitters, emitter => {
emitter.socket.on('connect', attempt => onConnect.call(emitter, attempt))
emitter.socket.on('reconnect', attempt => onConnect.call(emitter, attempt))
emitter.socket.on('authenticated', () => onAuthenticated.call(emitter))
emitter.socket.on('disconnect', reason => onDisconnect.call(emitter, reason))
emitter.socket.on('connect', attempt => emitter.onConnect(attempt))
emitter.socket.on('reconnect', attempt => emitter.onConnect(attempt))
emitter.socket.on('authenticated', () => emitter.onAuthenticated())
emitter.socket.on('disconnect', reason => emitter.onDisconnect(reason))
emitter.socket.on('connect_error', err => {
console.error(`socket connection error ${err}`)
})
Expand Down

0 comments on commit d3a6629

Please sign in to comment.