Skip to content
Permalink
Browse files
when scheduler call dubboInoke check availiable socket worker
  • Loading branch information
hufeng committed May 22, 2018
1 parent dc6bd6a commit 87483ad74838af91f41dcd5b7bc8c7914ce4252c
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 40 deletions.
@@ -5,14 +5,14 @@ const dubbo = new Dubbo({
register: 'localhost:2181',
dubboVersion: '2.0.0',
interfaces: [
'com.alibaba.dubbo.demo.DemoService',
'com.alibaba.dubbo.demo.BasicTypeService',
'com.alibaba.dubbo.demo.ErrorService',
'com.alibaba.dubbo.demo.DemoProvider',
'com.alibaba.dubbo.demo.BasicTypeProvider',
'com.alibaba.dubbo.demo.ErrorProvider',
],
});

const demoService = dubbo.proxyService({
dubboInterface: 'com.alibaba.dubbo.demo.DemoService',
dubboInterface: 'com.alibaba.dubbo.demo.DemoProvider',
version: '1.0.0',
methods: {
sayHello(name) {
@@ -36,7 +36,7 @@ const demoService = dubbo.proxyService({
});

const basicTypeService = dubbo.proxyService({
dubboInterface: 'com.alibaba.dubbo.demo.BasicTypeService',
dubboInterface: 'com.alibaba.dubbo.demo.BasicTypeProvider',
version: '2.0.0',
methods: {
testBasicType() {
@@ -51,7 +51,7 @@ const basicTypeService = dubbo.proxyService({
});

const errorService = dubbo.proxyService({
dubboInterface: 'com.alibaba.dubbo.demo.ErrorService',
dubboInterface: 'com.alibaba.dubbo.demo.ErrorProvider',
version: '1.0.0',
methods: {
errorTest() {
@@ -5,9 +5,9 @@ const dubbo = new Dubbo({
register: 'localhost:2181',
dubboVersion: '2.0.0',
interfaces: [
'com.alibaba.dubbo.demo.DemoService',
'com.alibaba.dubbo.demo.BasicTypeService',
'com.alibaba.dubbo.demo.ErrorService',
'com.alibaba.dubbo.demo.DemoProvider',
'com.alibaba.dubbo.demo.BasicTypeProvider',
'com.alibaba.dubbo.demo.ErrorProvider',
],
});

@@ -19,8 +19,8 @@ dubbo.use(async function costTime(ctx, next) {
console.log('end makecostTime->', endTime - startTime);
});

const demoService = dubbo.proxyService({
dubboInterface: 'com.alibaba.dubbo.demo.DemoService',
const demoProvider = dubbo.proxyService({
dubboInterface: 'com.alibaba.dubbo.demo.DemoProvider',
version: '1.0.0',
methods: {
sayHello(name) {
@@ -43,8 +43,8 @@ const demoService = dubbo.proxyService({
},
});

const basicTypeService = dubbo.proxyService({
dubboInterface: 'com.alibaba.dubbo.demo.BasicTypeService',
const basicTypeProvider = dubbo.proxyService({
dubboInterface: 'com.alibaba.dubbo.demo.BasicTypeProvider',
version: '2.0.0',
methods: {
testBasicType() {
@@ -58,8 +58,8 @@ const basicTypeService = dubbo.proxyService({
},
});

const errorService = dubbo.proxyService({
dubboInterface: 'com.alibaba.dubbo.demo.ErrorService',
const errorProvider = dubbo.proxyService({
dubboInterface: 'com.alibaba.dubbo.demo.ErrorProvider',
version: '1.0.0',
methods: {
errorTest() {
@@ -69,7 +69,7 @@ const errorService = dubbo.proxyService({
});

module.exports = {
demoService,
errorService,
basicTypeService,
demoProvider,
errorProvider,
basicTypeProvider,
};
@@ -1,7 +1,7 @@
const Koa = require('koa');
const Router = require('koa-router');
// const {tracer} = require('dubbo2.js');
const {demoService, basicTypeService, errorService} = require('./dubbo');
const {demoProvider, basicTypeProvider, errorProvider} = require('./dubbo');

const app = new Koa();
const router = new Router();
@@ -11,38 +11,38 @@ router.get('/', ctx => {
});

router.get('/hello', async ctx => {
const {res, err} = await demoService.sayHello('test');
const {res, err} = await demoProvider.sayHello('test');
ctx.body = err ? err.message : res;
});

router.get('/user', async ctx => {
const {res, err} = await demoService.getUserInfo();
const {res, err} = await demoProvider.getUserInfo();
ctx.body = res || err.message;
});

router.get('/echo', async ctx => {
ctx.body = await demoService.echo();
ctx.body = await demoProvider.echo();
});

router.get('/type', async ctx => {
const {res, err} = await basicTypeService.testBasicType();
const {res, err} = await basicTypeProvider.testBasicType();
ctx.body = res;
});

router.get('/exp', async ctx => {
const {err, res} = await errorService.errorTest();
const {err, res} = await errorProvider.errorTest();
console.log(err);
ctx.body = 'ok';
});

router.get('/tracer', async ctx => {
const {res: hello} = await demoService.sayHello('test');
const {res: userInfo} = await demoService.getUserInfo();
const {res: hello} = await demoProvider.sayHello('test');
const {res: userInfo} = await demoProvider.getUserInfo();

setTimeout(async () => {
await basicTypeService.testBasicType();
await basicTypeProvider.testBasicType();
process.nextTick(() => {
demoService.getUserInfo();
demoProvider.getUserInfo();
});
});

@@ -25,17 +25,22 @@

<dubbo:registry protocol="zookeeper" address="localhost:2181"/>

<dubbo:provider timeout="1500"/>

<!-- 用dubbo协议在20880端口暴露服务 -->
<dubbo:protocol name="dubbo" port="20880"/>
<dubbo:protocol name="dubbo" port="20881"/>
<!--<dubbo:protocol name="dubbo" port="20881"/>-->

<!-- 和本地bean一样实现服务 -->
<bean id="demoProvider" class="com.alibaba.dubbo.demo.provider.DemoProviderImpl"/>
<bean id="basicTypeProvider" class="com.alibaba.dubbo.demo.provider.BasicTypeProviderImpl"/>
<bean id="errorProvider" class="com.alibaba.dubbo.demo.provider.ErrorProviderImpl"/>

<!-- 声明需要暴露的服务接口 -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoProvider" ref="demoProvider" version="1.0.0"/>
<dubbo:service interface="com.alibaba.dubbo.demo.DemoProvider" ref="demoProvider" version="1.0.0" timeout="10000">
<dubbo:method name="sayHello" timeout="8000"/>
</dubbo:service>

<dubbo:service interface="com.alibaba.dubbo.demo.BasicTypeProvider" ref="basicTypeProvider" version="2.0.0"/>
<dubbo:service interface="com.alibaba.dubbo.demo.ErrorProvider" ref="errorProvider" version="1.0.0"/>

@@ -256,7 +256,7 @@ export default class Context<T = any> {

//===========timeout setter&&getter=================
set timeoutId(timeId: NodeJS.Timer) {
log('requestId#%d set timeoutId: %O', this._request.requestId, timeId);
log('requestId#%d set timeoutId: %O', this._request.requestId);
this._timeoutId = timeId;
}

@@ -32,6 +32,8 @@ enum SCHEDULE_STATUS {
FAILED,
//OK状态
READY,
//没有可用socket agent
NO_AVAILIABLE_SOCKET_AGENT,
}

/**
@@ -74,7 +76,10 @@ export default class Scheduler {
log('scheduler was padding');
break;
case SCHEDULE_STATUS.FAILED:
this._scheduleFailed(requestId);
this._scheduleFailed(
requestId,
new ScheduleError('Schedule error, Zk Could not connect'),
);
break;
case SCHEDULE_STATUS.READY:
log('scheduler was ready');
@@ -131,12 +136,9 @@ export default class Scheduler {
/**
* 处理schedule的failed状态
*/
private _scheduleFailed = (requestId: number) => {
log('scheduler was failed');
queue.failed(
requestId,
new ScheduleError('Schedule error, Zk Could not connect'),
);
private _scheduleFailed = (requestId: number, err: Error) => {
log('scheduler was failed, err: %s', err);
queue.failed(requestId, err);
};

/**
@@ -172,6 +174,14 @@ export default class Scheduler {
* @param agentHostList
*/
private _dubboInvoke(ctx: Context, agentHostList: Array<string>) {
if (!this._serverAgent.hasAvailableSocketAgent(agentHostList)) {
this._scheduleFailed(
ctx.requestId,
new ScheduleError('Could not find any availiable socekt agent'),
);
return;
}

const node = this._serverAgent.getAvailableSocketAgent(agentHostList)
.worker;

@@ -79,12 +79,12 @@ export class ServerAgent implements IObservable<ISocketSubscriber> {
agentHostPorts: Array<TAgentHostPort>,
): Array<SocketPool> {
let availableList = [];
agentHostPorts.forEach(agentHostPort => {
for (let agentHostPort of agentHostPorts) {
const socketPool = this._serverAgentMap.get(agentHostPort);
if (socketPool && socketPool.hasAvaliableNodes) {
availableList.push(socketPool);
}
});
}
return availableList;
}

0 comments on commit 87483ad

Please sign in to comment.