Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
.DS_Store
node_modules
etc
lottery
chapter5-bak
9 changes: 9 additions & 0 deletions chapter5/backend/article.js

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions chapter5/backend/comment-list.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const fs = require('fs')
const protobuf = require('protocol-buffers');
const commentSchemas = protobuf(
fs.readFileSync(`${__dirname}/../proto/comment.proto`)
);

// 假数据
const commentData = require('./mockdata/comment');

/**
* 服务端的编解包逻辑
*/
const server = require('./lib/geeknode-rpc-server')(commentSchemas.CommentListRequest, commentSchemas.CommentListResponse);

server
.createServer((request, response) => {
response.end({ comments: commentData });
})
.listen(4001, ()=> {
console.log('commentlist rpc server listened: 4001')
});
32 changes: 32 additions & 0 deletions chapter5/backend/comment-praise.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
const fs = require('fs')
const protobuf = require('protocol-buffers');
const commentSchemas = protobuf(
fs.readFileSync(`${__dirname}/../proto/comment.proto`)
);

// 假数据
const commentData = require('./mockdata/comment');

/**
* 服务端的编解包逻辑
*/
const server = require('./lib/geeknode-rpc-server')(commentSchemas.PraiseRequest, commentSchemas.PraiseResponse);

server
.createServer((request, response) => {
const commentid = request.body.commentid;
const comment = commentData.filter(comment => comment.id == commentid)[0];
let praiseNum = 0;

if (comment) {
comment.praiseNum++;
praiseNum = comment.praiseNum;
}
response.end({
commentid,
praiseNum
});
})
.listen(4002, ()=> {
console.log('praise rpc server listened: 4002')
});
28 changes: 28 additions & 0 deletions chapter5/backend/detail.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
const fs = require('fs')
const protobuf = require('protocol-buffers');
const schemas = protobuf(
fs.readFileSync(`${__dirname}/proto/detail.proto`)
);

// 假数据
const columnData = require('./mockdata/column')

/**
* 服务端的编解包逻辑
*/
const server = require('./lib/geeknode-rpc-server')(schemas.ColumnRequest, schemas.ColumnResponse);

server
.createServer((request, response) => {
// 因为都是假数据,这里就没有使用栏目id。真实项目会拿这个columnid去请求数据库
const columnid = request.body;

// 直接返回假数据
response.end({
column: columnData[0],
recommendColumns: [columnData[1], columnData[2]]
});
})
.listen(4000, ()=> {
console.log('detail server listened: 4000')
});
36 changes: 36 additions & 0 deletions chapter5/backend/lib/geeknode-rpc-server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
const RPC = require('./rpc-server');

/**
* 因为所有服务用的包头格式都一样,不一样的只有protobuf协议,所以这里可以将这段逻辑封成一个模块
*
* 日常做项目的时候一定要注意把重复代码做封装
*/
module.exports = function (protobufRequestSchema, protobufResponseSchema) {
return new RPC({
// 解码请求包
decodeRequest(buffer) {
const seq = buffer.readUInt32BE();

return {
seq: seq,
result: protobufRequestSchema.decode(buffer.slice(8))
}
},
// 判断请求包是不是接收完成
isCompleteRequest(buffer) {
const bodyLength = buffer.readUInt32BE(4);

return 8 + bodyLength
},
// 编码返回包
encodeResponse(data, seq) {
const body = protobufResponseSchema.encode(data);

const head = Buffer.alloc(8);
head.writeUInt32BE(seq);
head.writeUInt32BE(body.length, 4);

return Buffer.concat([head, body]);
}
})
}
131 changes: 131 additions & 0 deletions chapter5/backend/lib/rpc-server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// 'use strict';
// const debug = require("debug")('easysock-server');
const net = require("net");

module.exports = class RPC {
constructor({ encodeResponse, decodeRequest, isCompleteRequest }) {
this.encodeResponse = encodeResponse;
this.decodeRequest = decodeRequest;
this.isCompleteRequest = isCompleteRequest;
}

createServer(callback) {
let buffer = null;

const tcpServer = net.createServer((socket) => {

socket.on('data', (data) => {
buffer = (buffer && buffer.length > 0) ?
Buffer.concat([buffer, data]) : // 有遗留数据才做拼接操作
data;

let checkLength = null;
while (buffer && (checkLength = this.isCompleteRequest(buffer))) {
let requestBuffer = null;
if (checkLength == buffer.length) {
requestBuffer = buffer;
buffer = null;

} else {
requestBuffer = buffer.slice(0, checkLength);
buffer = buffer.slice(checkLength);
}

const request = this.decodeRequest(requestBuffer);
callback(
{ // request
body: request.result,
socket
},
{ // response
end: (data) => {
const buffer = this.encodeResponse(data, request.seq)
socket.write(buffer);
}
}
);
}
})
});

return {
listen() {
tcpServer.listen.apply(tcpServer, arguments)
}
}
}
}

// /**
// * @param config
// * encode
// * decode
// * check
// * @param handleResponse:async fn
// *
// *
// * @returns {bindsocket(socket)}
// */
// let exportee = module.exports = function (config, handleResponse) {
// let {encode, decode, check} = config;
// if (!check && config.isReceiveComplete) check = config.isReceiveComplete;

// // 缓冲区
// let handleData = async function (buffer) {
// let param = null;
// let result = null;

// try {
// param = decode(buffer);
// result = await handleResponse(param.error, param.result);
// return encode(result, param.seq);
// } catch (e) {
// return debug(e);
// }
// };

// return function bindsocket(socket) {
// debug('socket connected', socket.connection);

// socket.on('data', data=> {
// debug('socket ondata');

// debug('remain', buffer && buffer.length)
// });

// socket.on('end', e=> {
// debug('socket end');
// });

// return socket;
// };
// };

// const net = require("net");
// /**
// * 直接创建一个server
// * @param config 包含encode、decode、check
// * @param handlerResponse 一个async function,参数为收到的请求结构体,返回是回包的结构体
// */
// exportee.server = function (config, handlerResponse) {
// let handleSocket = exportee(config, handlerResponse);
// let socketList = [];

// return Object.assign(
// net.createServer(function (socket) {
// handleSocket(socket);
// socketList.push(socket);
// }),

// {
// closeAllSocket: function () {
// socketList.forEach(socket=> {
// if (!socket.destroyed) {
// socket.destroy();

// }
// })
// }
// }
// )
// };
Loading