Skip to content

Commit

Permalink
fix: Wrong semaphore and service state synced in some corner cases
Browse files Browse the repository at this point in the history
  • Loading branch information
ci010 committed Dec 24, 2022
1 parent 02b1d9a commit 5a91001
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 23 deletions.
2 changes: 1 addition & 1 deletion xmcl-electron-app/preload/semaphore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ function createResourceMonitor(): ISemaphoreChannel {
emitter.emit('acquire', semaphores)
})
return {
subscribe(): Promise<Record<string, number>> {
subscribe(): Promise<[Record<string, number>, number]> {
return ipcRenderer.invoke('semaphore')
},
unsubscribe(): Promise<void> {
Expand Down
33 changes: 30 additions & 3 deletions xmcl-keystone-ui/src/composables/semaphore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,28 @@ export function useSemaphore(semaphore: string | Ref<string>) {

export function useSemaphores() {
const container: Record<string, number> = reactive({})
let totalOrder = 0

const { refresh, refreshing } = useRefreshable(() => resourceMonitor.subscribe().then((sem) => {
const { refresh, refreshing } = useRefreshable(() => resourceMonitor.subscribe().then(([sem, order]) => {
totalOrder = order
console.log(`Refreshed semaphores by total order ${totalOrder}`)
for (const [key, val] of Object.entries(sem)) {
set(container, key, val)
}
}))

resourceMonitor.on('acquire', (res) => {
resourceMonitor.on('acquire', ([res, order]) => {
const nextOrder = totalOrder + 1
if (nextOrder < order) {
if (!refreshing.value) {
refresh()
}
return
}
if (nextOrder > order) {
// discord old message
return
}
const sem = res instanceof Array ? res : [res]
for (const s of sem) {
if (s in container) {
Expand All @@ -46,8 +60,20 @@ export function useSemaphores() {
set(container, s, 1)
}
}
totalOrder = nextOrder
})
resourceMonitor.on('release', (res) => {
resourceMonitor.on('release', ([res, order]) => {
const nextOrder = totalOrder + 1
if (nextOrder < order) {
if (!refreshing.value) {
refresh()
}
return
}
if (nextOrder > order) {
// discord old message
return
}
const sem = res instanceof Array ? res : [res]
for (const s of sem) {
if (s in container) {
Expand All @@ -56,6 +82,7 @@ export function useSemaphores() {
set(container, s, 0)
}
}
totalOrder = nextOrder
})

onMounted(() => {
Expand Down
24 changes: 23 additions & 1 deletion xmcl-keystone-ui/src/vuexServiceProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ export class VuexServiceFactory implements ServiceFactory {
return
}
const newId = lastId + 1
if (id <= lastId) {
// discard old packet
console.log(`Discard stale sync mutation ${id}(${mutation.type}) as last id is ${lastId}`)
return
}
if (id !== newId) {
console.log(`Sync conflict from main. Last id in renderer: ${lastId}. Sync from main ${id}`)
sync()
Expand All @@ -184,7 +189,18 @@ export class VuexServiceFactory implements ServiceFactory {
const sync = () => {
this.store.commit('syncStart', { service })
console.log(`Sync ${service}.`)
proxy.sync(lastId).then((syncInfo) => {
proxy.sync(lastId).then((result) => {
if (typeof result === 'string') {
if (result === 'NOT_FOUND_SERVICE') {
console.log(`Does not found service ${service}`)
} else if (result === 'NOT_STATE_SERVICE') {
console.log(`The service ${service} does not have state`)
} else {
console.log(`Fail to sync service ${service} ${result}`)
}
return
}
const syncInfo = result
if (!syncInfo) {
console.log(`The ${service} is not syncable.`)
this.store.commit('sync', {})
Expand All @@ -199,6 +215,12 @@ export class VuexServiceFactory implements ServiceFactory {
this.store.commit('sync', { [service]: state })
lastId = length

for (const [id, m] of Object.entries(syncingQueue)) {
if (Number(id) > lastId) {
this.store.commit(m.type, m.payload)
}
}

syncingQueue = {}
})
}
Expand Down
6 changes: 3 additions & 3 deletions xmcl-runtime-api/src/semaphore.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { GenericEventEmitter } from './events'

interface SemaphoreChannelEventMap {
acquire: string[] | string
release: string[] | string
acquire: [string[] | string, number]
release: [string[] | string, number]
}

/**
Expand All @@ -12,7 +12,7 @@ export interface ResourceMonitor extends GenericEventEmitter<SemaphoreChannelEve
/**
* Subscribe the resource/semaphore update. This will make this object start to emit event
*/
subscribe(): Promise<Record<string, number>>
subscribe(): Promise<[Record<string, number>, number]>
/**
* Stop subscribe the resource/semaphore update.
*/
Expand Down
12 changes: 9 additions & 3 deletions xmcl-runtime/lib/managers/SemaphoreManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,17 @@ export default class SemaphoreManager extends Manager {
private locks: Record<string, ReadWriteLock> = {}

private semaphore: Record<string, number> = {}
/**
* Total order is used to check if client/server is sync
*/
private order = 0
private semaphoreWaiter: Record<string, Array<() => void>> = {}
private logger = this.app.logManager.getLogger('SemaphoreManager')

constructor(app: LauncherApp) {
super(app)
app.controller.handle('semaphore', () => {
return this.semaphore
return [this.semaphore, this.order]
})
app.controller.handle('semaphoreAbort', (_, key) => {
this.logger.log(`Force release the semaphore: ${key}`)
Expand Down Expand Up @@ -44,7 +48,8 @@ export default class SemaphoreManager extends Manager {
} else {
this.semaphore[key] = 1
}
this.app.controller.broadcast('acquire', key)
this.order += 1
this.app.controller.broadcast('acquire', [key, this.order])
}

wait(key: string) {
Expand All @@ -67,6 +72,7 @@ export default class SemaphoreManager extends Manager {
} else {
this.semaphore[key] = 0
}
this.order += 1
if (this.semaphore[key] === 0) {
nextTick(() => {
if (this.semaphore[key] === 0) {
Expand All @@ -78,6 +84,6 @@ export default class SemaphoreManager extends Manager {
}
})
}
this.app.controller.broadcast('release', key)
this.app.controller.broadcast('release', [key, this.order])
}
}
15 changes: 6 additions & 9 deletions xmcl-runtime/lib/managers/ServiceStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,13 @@ export default class ServiceStateManager extends Manager {

constructor(app: LauncherApp) {
super(app)
app.controller.handle('sync', (_, serviceName, id) => {
app.controller.handle('sync', async (_, serviceName, id) => {
const service = app.serviceManager.getServiceByKey(serviceName)
if (service) {
return (service as AbstractService).initialize().then(() => {
const stateProxy = this.registeredState[serviceName]
if (stateProxy) {
return stateProxy.takeSnapshot(id)
}
})
}
if (!service) return 'NOT_FOUND_SERVICE'
await (service as AbstractService).initialize()
const stateProxy = this.registeredState[serviceName]
if (!stateProxy) return 'NOT_STATE_SERVICE'
return stateProxy.takeSnapshot(id)
})
app.controller.handle('commit', (event, serviceName, type, payload) => {
const stateProxy = this.registeredState[serviceName]
Expand Down
3 changes: 0 additions & 3 deletions xmcl-runtime/lib/util/serviceProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@ export class ServiceStateProxy {
takeSnapshot(currentId: number) {
const checkPointId = this.checkPointId
this.logger.log(`Sync from renderer: ${currentId}, service ${this.serviceName}: ${checkPointId}.`)
if (currentId === checkPointId) {
return undefined
}
return {
state: JSON.parse(JSON.stringify(this.snapshot)),
length: checkPointId,
Expand Down

0 comments on commit 5a91001

Please sign in to comment.