Skip to content
Permalink
Browse files
enhancement: add dubbo-consumer scheduler and dubbo-cluseter refresh
  • Loading branch information
hufeng committed Aug 9, 2021
1 parent 088ac54 commit 8f7d4561d09ce08fb9b5a3c12a9687e0fc7dd681
Showing 2 changed files with 52 additions and 15 deletions.
@@ -34,7 +34,8 @@ const log = debug('dubbo:dubbo-cluster')
*/

export default class DubboCluster
implements IDubboObservable<IDubboTransportSubscriber> {
implements IDubboObservable<IDubboTransportSubscriber>
{
private subscriber: IDubboTransportSubscriber
private readonly dubboClusterTransportMap: Map<
HostName,
@@ -53,21 +54,19 @@ export default class DubboCluster

// ~~~~~~~~~~~~~~~~~~~~private methods~~~~~~~~~~~~~~~~~~~~~~~~~

private handleTransportClose = (
transport: DubboTcpTransport,
hostname: string
) => (host: string) => {
log('receive dubbo-tcp-transport closed %s', transport.host)
if (!this.dubboClusterTransportMap.has(hostname)) {
return
}
private handleTransportClose =
(transport: DubboTcpTransport, hostname: string) => (host: string) => {
log('receive dubbo-tcp-transport closed %s', transport.host)
if (!this.dubboClusterTransportMap.has(hostname)) {
return
}

const transports = this.dubboClusterTransportMap.get(hostname)
log('delete dubbo-tcp-transport %s', transport.host)
transports.delete(transport)
log('current dubbo-tcp-transport map %O', this.dubboClusterTransportMap)
this.subscriber.onClose(host)
}
const transports = this.dubboClusterTransportMap.get(hostname)
log('delete dubbo-tcp-transport %s', transport.host)
transports.delete(transport)
log('current dubbo-tcp-transport map %O', this.dubboClusterTransportMap)
this.subscriber.onClose(host)
}

private updateDubboClusterTransports(hostname: HostName, hosts: Set<Host>) {
const transports = this.dubboClusterTransportMap.get(hostname)
@@ -153,6 +152,23 @@ export default class DubboCluster
return this
}

refresh(serviceHostMap: Map<HostName, Set<Host>>) {
for (let [hostname, hosts] of serviceHostMap.entries()) {
if (this.dubboClusterTransportMap.has(hostname)) {
const transports = this.dubboClusterTransportMap.get(hostname)
const transportHosts = [...transports].map(
(transport) => transport.host
)
const diff = [...hosts].filter((host) => !transportHosts.includes(host))
if (diff.length > 0) {
this.addDubboClusterTransports(hostname, new Set(diff))
}
} else {
this.addDubboClusterTransports(hostname, hosts)
}
}
}

close() {
for (let transports of this.dubboClusterTransportMap.values()) {
transports.forEach((transport) => transport.close())
@@ -49,6 +49,7 @@ export default class Scheduler {
private status: STATUS
private readonly queue: Queue
private readonly registry: IRegistry<any>
private readonly refreshTimer: NodeJS.Timer
private readonly dubboCluster: DubboCluster
private readonly dubboServiceUrlMapper: Map<TDubboInterface, Array<DubboUrl>>

@@ -72,6 +73,9 @@ export default class Scheduler {
},
onClose: this.handleTransportClose
})
this.refreshTimer = setInterval(() => {
this.refreshDubboCluster()
}, 10 * 1000)

// init registry
this.registry = registry
@@ -86,9 +90,26 @@ export default class Scheduler {
}

close() {
clearTimeout(this.refreshTimer)
this.dubboCluster.close()
}

private refreshDubboCluster() {
const serviceHostMap = new Map<HostName, Set<Host>>()
for (let urls of this.dubboServiceUrlMapper.values()) {
for (let { hostname, port } of urls) {
const host = `${hostname}:${port}`
if (serviceHostMap.has(hostname)) {
serviceHostMap.get(hostname).add(host)
} else {
serviceHostMap.set(hostname, new Set([host]))
}
}
}
log('refreshDubboCluster with map %O', serviceHostMap)
this.dubboCluster.refresh(serviceHostMap)
}

/**
* handle request in queue
* @param ctx

0 comments on commit 8f7d456

Please sign in to comment.