Skip to content

Commit

Permalink
fix(realtime): 💅 Remove async
Browse files Browse the repository at this point in the history
  • Loading branch information
CPatchane committed Feb 5, 2019
1 parent 3849ac6 commit c426ffa
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 176 deletions.
275 changes: 134 additions & 141 deletions packages/cozy-realtime/src/index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/* global WebSocket */

// cozySocket is a custom object wrapping logic to websocket and exposing a subscription
// interface, here we store its promise
let cozySocketPromise
// interface
let cozySocket

// Important, must match the spec,
// see https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
Expand Down Expand Up @@ -115,7 +115,7 @@ const configTypes = {

const validateConfig = validate(configTypes)

async function connectWebSocket(
export function connectWebSocket(
config,
onmessage,
onclose,
Expand All @@ -124,168 +124,162 @@ async function connectWebSocket(
isRetry
) {
validateConfig(config)
return new Promise((resolve, reject) => {
const options = {
secure: config.url ? isSecureURL(config.url) : true,
...config
}
const options = {
secure: config.url ? isSecureURL(config.url) : true,
...config
}

const protocol = options.secure ? 'wss:' : 'ws:'
const domain = options.domain || getDomainFromUrl(options.url)
const protocol = options.secure ? 'wss:' : 'ws:'
const domain = options.domain || getDomainFromUrl(options.url)

if (!domain) {
throw new Error('Unable to detect domain')
}
if (!domain) {
throw new Error('Unable to detect domain')
}

const socket = new WebSocket(
`${protocol}//${domain}/realtime/`,
'io.cozy.websocket'
)
const socket = new WebSocket(
`${protocol}//${domain}/realtime/`,
'io.cozy.websocket'
)

socket.onopen = () => {
try {
socket.send(
JSON.stringify({
method: 'AUTH',
payload: options.token
})
)
} catch (error) {
return reject(error)
}

const windowUnloadHandler = () => socket.close()
window.addEventListener('beforeunload', windowUnloadHandler)
socket.onopen = () => {
try {
socket.send(
JSON.stringify({
method: 'AUTH',
payload: options.token
})
)
} catch (error) {
throw error
}
}

socket.onmessage = onmessage
socket.onclose = event => {
window.removeEventListener('beforeunload', windowUnloadHandler)
if (typeof onclose === 'function')
onclose(event, numRetries, retryDelay)
}
socket.onerror = error =>
console.error(`WebSocket error: ${error.message}`)
const windowUnloadHandler = () => socket.close()
window.addEventListener('beforeunload', windowUnloadHandler)

if (isRetry && subscriptionsState.size) {
for (let doctype of subscriptionsState) {
subscribeWhenReady(doctype, socket)
}
}
socket.onmessage = onmessage
socket.onclose = event => {
window.removeEventListener('beforeunload', windowUnloadHandler)
if (typeof onclose === 'function') onclose(event, numRetries, retryDelay)
}
socket.onerror = error => console.error(`WebSocket error: ${error.message}`)

resolve(socket)
if (isRetry && subscriptionsState.size) {
for (let doctype of subscriptionsState) {
subscribeWhenReady(doctype, socket)
}
})
}

return socket
}

export function getCozySocket(config) {
return new Promise(async (resolve, reject) => {
const listeners = {}
const listeners = {}

let socket
let socket

const onSocketMessage = event => {
const data = JSON.parse(event.data)
const eventType = data.event.toLowerCase()
const payload = data.payload
const onSocketMessage = event => {
const data = JSON.parse(event.data)
const eventType = data.event.toLowerCase()
const payload = data.payload

if (eventType === 'error') {
const realtimeError = new Error(payload.title)
const errorFields = ['status', 'code', 'source']
errorFields.forEach(property => {
realtimeError[property] = payload[property]
})
if (eventType === 'error') {
const realtimeError = new Error(payload.title)
const errorFields = ['status', 'code', 'source']
errorFields.forEach(property => {
realtimeError[property] = payload[property]
})

throw realtimeError
}
throw realtimeError
}

if (listeners[payload.type] && listeners[payload.type][eventType]) {
listeners[payload.type][eventType].forEach(listener => {
listener(payload.doc)
})
}
if (listeners[payload.type] && listeners[payload.type][eventType]) {
listeners[payload.type][eventType].forEach(listener => {
listener(payload.doc)
})
}
}

const onSocketClose = async (event, numRetries, retryDelay) => {
if (!event.wasClean) {
console.warn(
`WebSocket closed unexpectedly with code ${event.code} and ${
event.reason ? `reason: '${event.reason}'` : 'no reason'
}.`
)
const onSocketClose = (event, numRetries, retryDelay) => {
if (!event.wasClean) {
console.warn(
`WebSocket closed unexpectedly with code ${event.code} and ${
event.reason ? `reason: '${event.reason}'` : 'no reason'
}.`
)

if (numRetries) {
console.warn(`Reconnecting ... ${numRetries} tries left.`)
setTimeout(async () => {
try {
socket = await connectWebSocket(
config,
onSocketMessage,
onSocketClose,
--numRetries,
retryDelay + 1000,
true
)
} catch (error) {
console.error(
`Unable to reconnect to realtime. Error: ${error.message}`
)
}
}, retryDelay)
}
if (numRetries) {
console.warn(`Reconnecting ... ${numRetries} tries left.`)
setTimeout(() => {
try {
socket = connectWebSocket(
config,
onSocketMessage,
onSocketClose,
--numRetries,
retryDelay + 1000,
true
)
} catch (error) {
console.error(
`Unable to reconnect to realtime. Error: ${error.message}`
)
}
}, retryDelay)
}
}
}

try {
socket = await connectWebSocket(
config,
onSocketMessage,
onSocketClose,
NUM_RETRIES,
RETRY_BASE_DELAY
)
} catch (error) {
reject(error)
}
try {
socket = connectWebSocket(
config,
onSocketMessage,
onSocketClose,
NUM_RETRIES,
RETRY_BASE_DELAY
)
} catch (error) {
throw error
}

return {
subscribe: (doctype, event, listener) => {
if (typeof listener !== 'function')
throw new Error('Realtime event listener must be a function')

if (!listeners[doctype]) {
listeners[doctype] = {}
subscribeWhenReady(doctype, socket)
}

listeners[doctype][event] = (listeners[doctype][event] || []).concat([
listener
])

resolve({
subscribe: (doctype, event, listener) => {
if (typeof listener !== 'function')
throw new Error('Realtime event listener must be a function')

if (!listeners[doctype]) {
listeners[doctype] = {}
subscribeWhenReady(doctype, socket)
}

listeners[doctype][event] = (listeners[doctype][event] || []).concat([
listener
])

if (!subscriptionsState.has(doctype)) {
subscriptionsState.add(doctype)
}
},
unsubscribe: (doctype, event, listener) => {
if (
listeners[doctype] &&
listeners[doctype][event] &&
listeners[doctype][event].includes(listener)
) {
listeners[doctype][event] = listeners[doctype][event].filter(
l => l !== listener
)
}
if (subscriptionsState.has(doctype)) {
subscriptionsState.delete(doctype)
}
if (!subscriptionsState.has(doctype)) {
subscriptionsState.add(doctype)
}
})
})
},
unsubscribe: (doctype, event, listener) => {
if (
listeners[doctype] &&
listeners[doctype][event] &&
listeners[doctype][event].includes(listener)
) {
listeners[doctype][event] = listeners[doctype][event].filter(
l => l !== listener
)
}
if (subscriptionsState.has(doctype)) {
subscriptionsState.delete(doctype)
}
}
}
}

// Returns the Promise of a subscription to a given doctype and document
export async function subscribe(config, doctype, doc, parse = doc => doc) {
const subscription = await subscribeAll(config, doctype, parse)
export function subscribe(config, doctype, doc, parse = doc => doc) {
const subscription = subscribeAll(config, doctype, parse)
// We will call the listener only for the given document, so let's curry it
const docListenerCurried = listener => {
return syncedDoc => {
Expand All @@ -303,9 +297,8 @@ export async function subscribe(config, doctype, doc, parse = doc => doc) {
}

// Returns the Promise of a subscription to a given doctype (all documents)
export async function subscribeAll(config, doctype, parse = doc => doc) {
if (!cozySocketPromise) cozySocketPromise = getCozySocket(config)
const cozySocket = await cozySocketPromise
export function subscribeAll(config, doctype, parse = doc => doc) {
if (!cozySocket) cozySocket = getCozySocket(config)
// Some document need to have specific parsing, for example, decoding
// base64 encoded properties
const parseCurried = listener => {
Expand Down

0 comments on commit c426ffa

Please sign in to comment.