Skip to content
Permalink
Browse files
Merge branch 'master' into master
  • Loading branch information
realbillwong committed Sep 1, 2021
2 parents 44f089c + 28e499b commit 31a1edc01a851bb4efdee9ee4a130d42e35f14e3
Showing 6 changed files with 61 additions and 56 deletions.
@@ -137,9 +137,9 @@ export default class Dubbo<TService = Object> {
category: 'consumers',
method: '',
revision: version,
version,
group,
timeout,
version: version,
group: group,
timeout: timeout,
side: 'consumer',
check: false,
pid: process.pid
@@ -0,0 +1,4 @@
# Please don't delete this file

# In order to enable the process of dubbo-service to obtain reusable ports, we use file locks
# Make each child process obtain available ports in turn to solve the problem of simultaneous port conflicts
@@ -22,13 +22,10 @@ import { portManager } from '../port'

describe('port test suite', () => {
it('test master process', async () => {
expect(portManager.isMasterProcess).toBeTruthy()
const port = await portManager.getReusedPort()
expect(port).toBeTruthy()
expect(
fs.existsSync(path.join(process.cwd(), '.dubbojs/dubbo'))
).toBeTruthy()
fs.writeFileSync(path.join(process.cwd(), '.dubbojs', `${port}`), '')
expect(fs.existsSync(path.join(process.cwd(), '.dubbojs'))).toBeTruthy()
fs.unlinkSync(path.join(process.cwd(), '.dubbojs', `${port}`))
})

it('test cluster mode', async () => {})
@@ -16,76 +16,74 @@
*/

import path from 'path'
import cluster from 'cluster'
import getPort from 'get-port'
import debug from 'debug'
import fs from 'fs-extra'
import lockfile from 'proper-lockfile'

const dlog = debug('dubbo-server:get-port')
// port cache file
const ROOT = path.join(process.cwd(), '.dubbojs')
const LOCK_FILE = path.join(ROOT, 'dubbo')
// dubbo lock file path
const LOCK_FILE = path.join(__dirname, '..', '.dubbo')

export class PortManager {
private port: number

constructor() {
if (this.isMasterProcess) {
// create dubbo lock file
fs.ensureFileSync(LOCK_FILE)
}
// listen process exit event
// and clean port/pid file content
// listen process exit event and clean port/pid file content
this.clearPidPort()
}

async getReusedPort(): Promise<number> {
try {
// set file lock
const release = await lockfile.lock(LOCK_FILE, {
retries: { retries: 5, maxTimeout: 5000 }
})
dlog('pid %d get lock', process.pid)
// find available reused port
const dirs = await fs.readdir(ROOT)
dlog('scan %s dir includes %O', ROOT, dirs)
const excludes = []
const portPidFiles = dirs.filter((dir) => !dir.startsWith('dubbo'))
for (let portPid of portPidFiles) {
const file = fs.readFileSync(path.join(ROOT, portPid)).toString()
if (file === '') {
fs.writeFileSync(path.join(ROOT, portPid), String(process.pid))
this.port = Number(portPid)
await release()
return this.port
} else {
excludes.push(Number(portPid))
}
// set file lock
const release = await lockfile.lock(LOCK_FILE, {
retries: { retries: 5, maxTimeout: 5000 }
})
dlog('pid %d get lock', process.pid)
fs.ensureDirSync(ROOT)
// find available reused port
const dirs = await fs.readdir(ROOT)
dlog('scan %s dir includes %O', ROOT, dirs)
const excludes = []
for (let portPid of dirs) {
const fullFilePath = path.join(ROOT, portPid)
// if current file name not number
// delete it, because it was invalid file
if (!/\d+/.test(portPid)) {
fs.rmSync(fullFilePath)
continue
}

this.port = await this.getFreePort(excludes)
fs.writeFileSync(path.join(ROOT, String(this.port)), String(process.pid))
await release()
return this.port
} catch (err) {
throw err
// if current file content was empty
// the file name port was reused
const file = fs.readFileSync(fullFilePath).toString()
if (file === '') {
// write current port
fs.writeFileSync(path.join(ROOT, portPid), String(process.pid))
this.port = Number(portPid)
await release()
return this.port
} else {
excludes.push(Number(portPid))
}
}

this.port = await this.getFreePort(excludes)
fs.writeFileSync(path.join(ROOT, String(this.port)), String(process.pid))
await release()
return this.port
}

async getFreePort(exclude: Array<number> = []) {
const ports = []
for (let i = 0; i < 10; i++) {
// gen new port
const port = await getPort({ port: getPort.makeRange(20888, 30000) })
ports.push(port)
}

const availablePort = ports.filter((port) => !exclude.includes(port))[0]
dlog(
'get random port %d in %s mode',
availablePort,
this.isMasterProcess ? 'master' : 'worker'
)
dlog('get random port %d', availablePort)
return availablePort
}

@@ -107,13 +105,6 @@ export class PortManager {
process.on(event, cleanup)
})
}

get isMasterProcess() {
const isClusterMode = cluster.isMaster
const isPm2MasterMode =
process.env.NODE_APP_INSTANCE && process.env.NODE_APP_INSTANCE === '0'
return isClusterMode || isPm2MasterMode
}
}

export const portManager = new PortManager()
@@ -15,6 +15,8 @@
* limitations under the License.
*/

import path from 'path'
import fs from 'fs-extra'
import { Zk } from 'apache-dubbo-registry'
import { DubboDirectlyInvoker, java } from 'apache-dubbo-consumer'
import { DubboService } from 'apache-dubbo-service'
@@ -43,6 +45,10 @@ beforeAll(async () => {
})

afterAll(async () => {
// clear port file
fs.unlinkSync(
path.join(process.cwd(), '.dubbojs', String(dubboService.getPort()))
)
dubbo.close()
await dubboService.close()
})
@@ -15,6 +15,8 @@
* limitations under the License.
*/

import path from 'path'
import fs from 'fs-extra'
import { Zk } from 'apache-dubbo-registry'
import { Dubbo, java } from 'apache-dubbo-consumer'
import { DubboService } from 'apache-dubbo-service'
@@ -44,6 +46,11 @@ beforeAll(async () => {
})

afterAll(async () => {
// clear port file
fs.unlinkSync(
path.join(process.cwd(), '.dubbojs', String(dubboService.getPort()))
)

dubbo.close()
await dubboService.close()
})

0 comments on commit 31a1edc

Please sign in to comment.